Ananth,
This is a good proposal. We will work with you.

Thanks,
Amol


E: a...@datatorrent.com | M: 510-449-2606 | Twitter: @amolhkekre

www.datatorrent.com


On Sat, Apr 1, 2017 at 4:29 PM, ananth <ananthg.a...@gmail.com> wrote:

> Hello All,
>
> I would like to get the community's opinion on the implementation of a Kudu
> output operator. A first-cut implementation was made available in November
> last year, but we did not get time to discuss it thoroughly on the mailing
> list, and hence the PR did not get merged.
>
> This operator would allow Apex to stream data into Kudu. A brief
> description of Kudu is here: https://kudu.apache.org/. At a high level,
> this would enable the following use cases from an Apex point of view:
>
> - Low-latency writes into the Kudu store, which supports SQL queries. This
> essentially means sub-second data updates become available for SQL
> querying. As opposed to Parquet-style data dumps, which typically need a
> few minutes to accumulate enough data to take advantage of the Parquet
> format, this would enable same-second queries on very large datasets in
> Kudu via Impala.
>
> - Another very interesting use case would be to use Kudu as a source store
> and stream from it based on SQL queries. The Kudu input operator is covered
> by another JIRA (https://issues.apache.org/jira/browse/APEXMALHAR-2472) and
> would cover mechanisms to stream data from Kudu into Apex. This would
> enable use cases like de-duplication, selective streaming, and out-of-band
> data handling when Kudu is part of the ecosystem in a given setup.
>
> Here is the design of the Kudu output operator:
>
>
> 1. The operator would be an abstract operator, allowing concrete
> implementations to set a few of its behavioral aspects.
>
> 2. The following are the major phases of the operator (a sketch of this
> lifecycle follows below):
>     a. During the activate() phase: establish a connection to the cluster
> and fetch the metadata of the table that is being used as the sink.
>     b. During the setup() phase: fetch the current window information and
> use it to decide whether we are recovering from a failure (see point 8
> below).
>     c. During process() of the input port: inspect the incoming
> KuduExecutionContext tuple (see point 6 below) and perform one of the
> operations (Insert/Update/Delete/Upsert).
> 3. The following parameters are tunable while establishing a Kudu
> connection: table name, boss thread count, worker thread count, socket
> read timeout, and external consistency mode.
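>
> As a sketch, assuming the Kudu 1.x Java client (whose builder exposes
> boss/worker thread counts and a socket read timeout), the tunables could
> map onto the client like this; the operator property names here are
> assumptions:
>
>   import org.apache.kudu.client.ExternalConsistencyMode;
>   import org.apache.kudu.client.KuduClient;
>   import org.apache.kudu.client.KuduSession;
>
>   KuduClient client = new KuduClient.KuduClientBuilder(masterAddresses)
>       .bossCount(numBossThreads)                       // boss worker threads
>       .workerCount(numWorkerThreads)                   // worker threads
>       .defaultSocketReadTimeoutMs(socketReadTimeoutMs) // socket read timeout
>       .build();
>   KuduSession session = client.newSession();
>   // External consistency mode is applied per session.
>   session.setExternalConsistencyMode(ExternalConsistencyMode.CLIENT_PROPAGATED);
>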
> 4. The user need not specify any schema up front. The POJO fields are
> automatically mapped to the table column names as identified when the
> schema is parsed in the activate phase.
> 5. Allow the concrete implementation of the operator to override the
> mapping from POJO field name to table schema column name. This gives
> flexibility in cases where the table's column names are not compatible
> with Java bean conventions, or where the field names cannot be controlled
> because the POJO comes from an upstream operator.
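>
> A hypothetical hook for this override could look like the following
> sketch (method names are assumptions; the default is an identity mapping
> driven by reflection):
>
>   import java.lang.reflect.Field;
>   import java.util.Collections;
>   import java.util.Map;
>
>   // Concrete operators override this to remap POJO field names to Kudu
>   // column names; an empty map means "use the field name as-is".
>   protected Map<String, String> getFieldNameToColumnNameOverrides()
>   {
>     return Collections.emptyMap();
>   }
>
>   // Resolve the Kudu column name for a given POJO field.
>   private String columnNameFor(Field pojoField)
>   {
>     return getFieldNameToColumnNameOverrides()
>         .getOrDefault(pojoField.getName(), pojoField.getName());
>   }
>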
> 6. The input tuple supplied to this operator is of type
> "KuduExecutionContext". This tuple wraps the actual POJO that is going to
> be persisted to the Kudu store, and additionally allows the upstream
> operator to specify the operation to be performed on that payload: one of
> Insert, Upsert, Update, or Delete.
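>
> The tuple type could take roughly the following shape (a sketch; field
> and enum names are not final):
>
>   // Wraps the POJO payload together with the mutation to perform on it.
>   public class KuduExecutionContext<T>
>   {
>     public enum KuduMutationType { INSERT, UPDATE, DELETE, UPSERT }
>
>     private T payload;                      // the POJO to persist
>     private KuduMutationType mutationType;  // Insert/Upsert/Update/Delete
>
>     public T getPayload() { return payload; }
>     public void setPayload(T payload) { this.payload = payload; }
>     public KuduMutationType getMutationType() { return mutationType; }
>     public void setMutationType(KuduMutationType mutationType)
>     {
>       this.mutationType = mutationType;
>     }
>   }
>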
> 7. The concrete implementation of the operator would allow the user to
> specify the POJO class definition that is used to write to the table. The
> execution context would carry an instance of this POJO as well as the
> metadata that defines how that tuple is to be processed.
> 8. The operator would allow for a special execution mode for the first
> window that is processed as the operator gets activated. There are two
> modes for the first window of processing:
>     a. Safe mode: the "happy path" execution, i.e. no extra processing is
> required to perform the Kudu mutation.
>     b. Reconciling mode: an additional function is called to check whether
> the user would like the tuple to be used for the mutation. This mode is
> automatically set when OperatorContext.ACTIVATION_WINDOW_ID !=
> Stateless.WINDOW_ID during the first window of processing by the operator.
>
> This feature is deemed useful when the operator is recovering from a
> crashed instance of the application and we do not want to perform the same
> mutation multiple times for a given tuple, since AT_LEAST_ONCE is the
> default processing semantics.
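>
> A minimal sketch of how the mode could be detected and consulted,
> assuming the Apex attribute and constant named above (the hook method
> name is hypothetical):
>
>   import com.datatorrent.api.Context.OperatorContext;
>   import com.datatorrent.api.annotation.Stateless;
>
>   private transient boolean reconcilingFirstWindow;
>
>   @Override
>   public void setup(OperatorContext context)
>   {
>     // Reconciling mode iff we are restarting from a checkpointed window.
>     reconcilingFirstWindow =
>         context.getValue(OperatorContext.ACTIVATION_WINDOW_ID)
>         != Stateless.WINDOW_ID;
>   }
>
>   // Hypothetical hook, consulted only during the reconciling first
>   // window; concrete operators decide whether to re-apply the mutation.
>   protected boolean shouldApplyMutationInReconcilingWindow(
>       KuduExecutionContext<?> tuple)
>   {
>     return true;
>   }
>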
>
> 9. The operator is stateless.
> 10. The operator would generate the following autometrics:
>      a. Counts of inserts, upserts, deletes and updates (a separate
> counter for each mutation type) for a given window
>      b. Bytes written in a given window
>      c. Write RPCs in a given window
>      d. Total RPC errors in a given window
>      e. All of the above metrics, accumulated over the entire lifetime of
> the operator.
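>
> A sketch of how the per-window metrics could be published, assuming
> Apex's @AutoMetric annotation (the metric field names are illustrative;
> the byte and RPC figures would come from the Kudu client's statistics,
> whose exact accessors are omitted here):
>
>   import com.datatorrent.api.AutoMetric;
>
>   @AutoMetric
>   private long insertsInWindow;
>   @AutoMetric
>   private long upsertsInWindow;
>   @AutoMetric
>   private long deletesInWindow;
>   @AutoMetric
>   private long updatesInWindow;
>   @AutoMetric
>   private long bytesWrittenInWindow;
>   @AutoMetric
>   private long writeRpcsInWindow;
>   @AutoMetric
>   private long rpcErrorsInWindow;
>   // Lifetime counterparts would be identical fields that are never
>   // reset, e.g.:
>   @AutoMetric
>   private long totalInserts;
>
>   @Override
>   public void beginWindow(long windowId)
>   {
>     // Per-window metrics are reset at every window boundary.
>     insertsInWindow = upsertsInWindow = deletesInWindow = updatesInWindow = 0;
>     bytesWrittenInWindow = writeRpcsInWindow = rpcErrorsInWindow = 0;
>   }
>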
>
>
> Could you please provide your thoughts on whether the above design looks
> good?
>
>
>
>
> Regards,
>
> Ananth
>
>
