Hello All,

I would like to get the community's opinion on the implementation of the Kudu output operator. A first-cut implementation was made available in November last year, but I guess we did not get time to discuss it thoroughly on the mailing list, and hence the PR did not get merged.

This operator would allow Apex to stream data into Kudu. A brief description of Kudu is here: https://kudu.apache.org/. At a high level, this would enable the following use cases from the Apex point of view:

- Low-latency writes into a Kudu store that can be queried with SQL. This essentially means that data updates become available for SQL querying in under a second. As opposed to Parquet-style data dumps, which would ideally need a few minutes to accumulate data to take advantage of the Parquet format, this would allow same-second queries on very large datasets in Kudu with Impala.

- Another very interesting use case would be to use Kudu as a source store and stream from it based on SQL queries. The Kudu input operator is tracked in a separate JIRA (https://issues.apache.org/jira/browse/APEXMALHAR-2472) and would cover mechanisms to stream data from Kudu into Apex. This will bring in interesting use cases like de-dupe, selective streaming and out-of-band data handling in a different way, if Kudu is part of the ecosystem in a given setup.

Here is the design of the Kudu output operator:


1. The operator would be an abstract operator and would allow concrete implementations to set a few of its behavioral aspects.

2. The following are the major phases of the operator:

- During the activate() phase of the operator: Establish a connection to the cluster and get the metadata about the table that is being used as the sink.
- During the setup() phase of the operator: Fetch the current window information and use it to decide whether we are recovering from a failure. (See point 8 below.)
- During process() of the input port: Inspect the incoming Execution Context tuple (see point 6 below) and perform one of the operations (Insert/Update/Delete/Upsert).

3. The following parameters are tunable while establishing a Kudu connection: Table name, Boss worker threads, Worker threads, Socket read timeouts and External Consistency mode.

4. The user need not specify any schema outright. The POJO fields are automatically mapped to the table column names as identified by the schema parse in the activate phase.

5. Allow the concrete implementation of the operator to override the mapping from POJO field name to table schema column name. This would allow flexibility in cases where the table schema column names are not compatible with Java bean frameworks, or where column names can't be controlled because the POJO is coming from an upstream operator.

6. The input tuple that is to be supplied to this operator is of type "Kudu Execution Context". This tuple encompasses the actual POJO that is going to be persisted to the Kudu store. Additionally, it allows the upstream operator to specify the operation that needs to be performed. One of the following operations is permitted as part of the context: Insert, Upsert, Update and Delete on the POJO that is acting as the payload in the Execution Context.

7. The concrete implementation of the operator would allow the user to specify the actual POJO class definition that would be used to write to the table. The execution context would contain this POJO as well as the metadata that defines the behavior of the processing that needs to be done on that tuple.

8. The operator would allow for a special execution mode for the first window that is processed after the operator gets activated. There are two modes for the first window of processing of the operator:
     a. Safe Mode: Safe mode is the "happy path execution", as in no extra processing is required to perform the Kudu mutation.
     b. Reconciling Mode: There is an additional function that would be called to see if the user would like the tuple to be used for mutation. This mode is automatically set when OperatorContext.ACTIVATION_WINDOW_ID != Stateless.WINDOW_ID during the first window of processing by the operator.

This feature is useful when the operator is recovering from a crash of the application and we do not want to apply the mutation for the same tuple more than once, given that at-least-once is the default processing semantics.
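
To make points 6 and 8 a bit more concrete, here is a minimal sketch in plain Java of how the execution context tuple and the first-window reconciling check could fit together. It does not use the Apex or Kudu APIs: every class and method name below (KuduMutationType, KuduExecutionContext, ReconcilingWriter, shouldApply) is hypothetical and not taken from the PR, the two window ids are passed in as plain longs, and a list stands in for the Kudu session.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// All names here are hypothetical; they are not taken from the PR.
enum KuduMutationType { INSERT, UPSERT, UPDATE, DELETE }

// Point 6: the tuple wraps the payload POJO and the mutation requested for it.
final class KuduExecutionContext<T> {
    final T payload;
    final KuduMutationType mutationType;

    KuduExecutionContext(T payload, KuduMutationType mutationType) {
        this.payload = payload;
        this.mutationType = mutationType;
    }
}

// Point 8: decides per tuple whether a mutation should be applied.
final class ReconcilingWriter<T> {
    private final boolean reconciling;
    private final Predicate<KuduExecutionContext<T>> shouldApply;
    private boolean firstWindow = true;
    // Stand-in for the Kudu session: records what would have been written.
    final List<String> applied = new ArrayList<>();

    ReconcilingWriter(long activationWindowId, long statelessWindowId,
                      Predicate<KuduExecutionContext<T>> shouldApply) {
        // Reconciling mode is chosen only when the two window ids differ.
        this.reconciling = activationWindowId != statelessWindowId;
        this.shouldApply = shouldApply;
    }

    void process(KuduExecutionContext<T> ctx) {
        // The user callback is consulted only in the first window of a recovery.
        if (reconciling && firstWindow && !shouldApply.test(ctx)) {
            return; // callback rejected the tuple, so no mutation is applied
        }
        applied.add(ctx.mutationType + ":" + ctx.payload);
    }

    void endWindow() {
        firstWindow = false; // later windows always take the safe path
    }
}

final class ReconcilingWriterDemo {
    public static void main(String[] args) {
        long statelessWindowId = -1L; // placeholder standing in for Stateless.WINDOW_ID
        ReconcilingWriter<String> writer = new ReconcilingWriter<String>(
            42L, statelessWindowId, ctx -> !"row-1".equals(ctx.payload));
        writer.process(new KuduExecutionContext<>("row-1", KuduMutationType.INSERT));
        writer.process(new KuduExecutionContext<>("row-2", KuduMutationType.UPSERT));
        writer.endWindow();
        writer.process(new KuduExecutionContext<>("row-1", KuduMutationType.DELETE));
        System.out.println(writer.applied);
    }
}
```

In this sketch the callback is only ever invoked during the first window after a recovery, so the safe mode path pays no extra cost per tuple. How the user decides that a tuple was already written (for example by reading the row back from Kudu) is left open here.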

9. The operator is a stateless operator.
10. The operator would generate the following autometrics:
     a. Counts of Inserts, Upserts, Deletes and Updates (separate counters for each mutation) for a given window
     b. Bytes written in a given window
     c. Write RPCs in the given window
     d. Total RPC errors in this window
     e. All of the above metrics aggregated over the entire lifetime of the operator.
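
As an illustration of point 10, the per-window and lifetime counters could be kept along the following lines. This is only a sketch in plain Java under my own assumptions: the class KuduWriteMetrics and all of its methods are hypothetical names, and wiring the values into Apex's metric reporting is deliberately left out.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical metrics holder; names are illustrative, not from the PR.
final class KuduWriteMetrics {
    enum Mutation { INSERT, UPSERT, UPDATE, DELETE }

    private final Map<Mutation, Long> windowCounts = new EnumMap<>(Mutation.class);
    private final Map<Mutation, Long> lifetimeCounts = new EnumMap<>(Mutation.class);
    private long windowBytes;
    private long lifetimeBytes;
    private long windowRpcs;
    private long lifetimeRpcs;
    private long windowRpcErrors;
    private long lifetimeRpcErrors;

    // Called at the start of each window: only the per-window values are reset.
    void beginWindow() {
        windowCounts.clear();
        windowBytes = 0;
        windowRpcs = 0;
        windowRpcErrors = 0;
    }

    // One call per mutation applied, with the number of bytes written for it.
    void recordMutation(Mutation kind, long bytes) {
        windowCounts.merge(kind, 1L, Long::sum);
        lifetimeCounts.merge(kind, 1L, Long::sum);
        windowBytes += bytes;
        lifetimeBytes += bytes;
    }

    // One call per write RPC issued; failed RPCs are also counted as errors.
    void recordWriteRpc(boolean failed) {
        windowRpcs++;
        lifetimeRpcs++;
        if (failed) {
            windowRpcErrors++;
            lifetimeRpcErrors++;
        }
    }

    long getWindowCount(Mutation kind) { return windowCounts.getOrDefault(kind, 0L); }
    long getLifetimeCount(Mutation kind) { return lifetimeCounts.getOrDefault(kind, 0L); }
    long getWindowBytes() { return windowBytes; }
    long getLifetimeBytes() { return lifetimeBytes; }
    long getWindowRpcs() { return windowRpcs; }
    long getLifetimeRpcs() { return lifetimeRpcs; }
    long getWindowRpcErrors() { return windowRpcErrors; }
    long getLifetimeRpcErrors() { return lifetimeRpcErrors; }
}
```

Keeping the window and lifetime values as separate fields means that resetting at the window boundary never touches the lifetime totals.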


Could you please share your thoughts on whether the above design looks good?




Regards,

Ananth
