Hello All,
I would like to the community's opinion on the implementation of Kudu
output operator. A first cut implementation was made available in
November last year but I guess we did not get time to discuss this
thoroughly on the mailing list and hence the PR did not get merged.
This operator would allow Apex to stream data into Kudu. A brief
description of Kudu is here : https://kudu.apache.org/. This would allow
at a high level the following use cases from Apex point of view:
- Low latency writes into Kudu store that allows SQL queries on the Kudu
store. This essentially means sub-second data updates available for SQL
querying. As opposed to parquet styled data dumps which would ideally
need a few minutes to accumulate data to take advantage of Parquet
formats, this would make same second queries on very large datasets on
Kudu with Impala.
- Another very interesting use cases would be to allow Kudu as a source
store to stream based on SQL queries. The kudu input operator is another
JIRA(https://issues.apache.org/jira/browse/APEXMALHAR-2472) and would be
covering mechanisms to stream data from Kudu into Apex. This will bring
in interesting use cases like de-dupe and selective streaming and out of
band data in a different way if Kudu is part of the eco system in a
given setup.
Here is the design of the Kudu output operator:
1. The operator would be an AbstractOperator and would allow the
concrete implementations to set a few behavioral aspects of the operator.
2. The following are the major phases of the operator:
During activate() phase of the operator : Establish a connection to the
cluster and get the metadata about the table that is being used as the sink.
During setup() phase of the operator: Fetch the current window
information and use it decide if we are recovering from a failure mode.
(See point 8 below )
During process() of Input port : Inspect the incoming ExecutionContext (
see below ) tuple and perform one of the operations (
Insert/Update/Delete/Upsert)
3. The following parameters are tunable while establishing a Kudu
connection:
Table name, Boss worker threads, Worker threads, Socket read time outs
and External Consistency mode.
4. The user need not specify any schema outright. The pojo fields are
automatically mapped to the table column names as identified in the
schema parse in the activate phase.
5. Allow the concrete implementation of the operator to override the
Pojo field name to the table schema column name. This would allow
flexibility in use cases like table schema column names are not
compatible with java bean frameworks or in situations when column names
cant be controlled as POJO is coming from an upstream operator.
6. The input tuple that is to be supplied to this operator is of type
"Kudu Execution Context". This tuple encompasses the actual Pojo that is
going to be persisted to the Kudu store. Additionally it allows the
upstream operator to specify the operation that needs to be performed.
One of the following operations is permitted as part of the context :
Insert, Upsert, Update and delete on the Pojo that is acting as the
payload in the Execution Context.
7. The concrete implementation of the operator would allow the user to
specify the actual POJO class definition that would be used to the write
to the table. The execution context would contain this POJO as well as
the metadata that defines the behavior of the processing that needs to
be done on that tuple.
8. The operator would allow for a special case of execution mode for the
first window that is being processed as the operator gets activated.
There are two modes for the first window of processing of the operator :
a. Safe Mode : Safe mode is the "happy path execution" as in no
extra processing is required to perform the Kudu mutation.
b. Reconciling Mode: There is an additional function that would be
called to see if the user would like the tuple to be used for mutation.
This mode is automatically set when OperatorContext.ACTIVATION_WINDOW_ID
!= Stateless.WINDOW_ID during the first window of processing by the
operator.
This feature is deemed to be useful when an operator is recovering from
a crash instance of the application and we do not want to perform
multiple mutations of the same tuple given ATLEAST_ONCE is the default
semantics.
9. The operator is a stateless operator.
10. The operator would generate the following autometrics :
a. Counts of Inserts, Upserts, Deletes and Updates (separate
counters for each mutation) for a given window
b. Bytes written in a given window
c. Write RPCs in the given window
d. Total RPC errors in this window
e. All of the above metrics for the operator for its entire
lifetime of the operator.
Could you please provide your thoughts if the above design looks good ?
Regards,
Ananth