Hi, I will be merging the PR (https://github.com/apache/apex-malhar/pull/486) soon unless there are any comments from the community.
~ Bhupesh

Bhupesh Chawda
E: bhup...@datatorrent.com | Twitter: @bhupeshsc
www.datatorrent.com | apex.apache.org

On Sat, Apr 8, 2017 at 2:06 AM, Amol Kekre <a...@datatorrent.com> wrote:

Ananth,

This is a good proposal. We will work with you.

Thanks,
Amol
E: a...@datatorrent.com | M: 510-449-2606 | Twitter: @amolhkekre
www.datatorrent.com

On Sat, Apr 1, 2017 at 4:29 PM, ananth <ananthg.a...@gmail.com> wrote:

Hello All,

I would like the community's opinion on the implementation of a Kudu output operator. A first-cut implementation was made available in November last year, but I guess we did not get time to discuss it thoroughly on the mailing list, and hence the PR did not get merged.

This operator would allow Apex to stream data into Kudu. A brief description of Kudu is here: https://kudu.apache.org/. At a high level, this would enable the following use cases from an Apex point of view:

- Low-latency writes into the Kudu store, which supports SQL queries on the stored data. This essentially means sub-second data updates become available for SQL querying. As opposed to Parquet-style data dumps, which typically need a few minutes to accumulate enough data to take advantage of the Parquet format, this would make same-second queries on very large datasets possible on Kudu with Impala.

- Another very interesting use case would be to use Kudu as a source store and stream from it based on SQL queries. The Kudu input operator is covered by a separate JIRA (https://issues.apache.org/jira/browse/APEXMALHAR-2472), which would address mechanisms for streaming data from Kudu into Apex. This would enable use cases like de-duplication, selective streaming, and out-of-band data handling in a different way when Kudu is part of the ecosystem in a given setup.

Here is the design of the Kudu output operator:

1. The operator would be an abstract operator and would allow concrete implementations to set a few behavioral aspects of the operator. (A skeleton sketch appears at the end of this mail.)

2. The following are the major phases of the operator:
   - During the activate() phase: establish a connection to the cluster and fetch the metadata of the table that is being used as the sink.
   - During the setup() phase: fetch the current window information and use it to decide whether we are recovering from a failure (see point 8 below).
   - During process() on the input port: inspect the incoming execution-context tuple (see point 6 below) and perform one of the operations (insert/update/delete/upsert).

3. The following parameters are tunable while establishing a Kudu connection: table name, boss threads, worker threads, socket read timeouts, and external consistency mode. (See the connection sketch at the end of this mail.)

4. The user need not specify any schema up front. The POJO fields are automatically mapped to the table column names identified when the schema is parsed during the activate phase.

5. The concrete implementation of the operator can override the mapping from POJO field names to table column names. This allows flexibility in use cases where the table's column names are not compatible with Java bean conventions, or where the column names cannot be controlled because the POJO comes from an upstream operator.

6. The input tuple supplied to this operator is of type "Kudu Execution Context".
This tuple encompasses the actual POJO that is going to be persisted to the Kudu store. Additionally, it allows the upstream operator to specify the operation that needs to be performed. One of the following operations is permitted as part of the context: insert, upsert, update, or delete of the POJO that acts as the payload of the execution context. (See the execution-context sketch at the end of this mail.)

7. The concrete implementation of the operator would allow the user to specify the actual POJO class definition used for writes to the table. The execution context would contain this POJO as well as the metadata that defines how that tuple is to be processed.

8. The operator would allow for a special execution mode for the first window processed after the operator is activated. There are two modes for the operator's first window of processing (see the reconciling-mode sketch at the end of this mail):
   a. Safe mode: the "happy path" execution, in which no extra processing is required to perform the Kudu mutation.
   b. Reconciling mode: an additional function is called to decide whether the user would like the tuple to be used for a mutation. This mode is set automatically when OperatorContext.ACTIVATION_WINDOW_ID != Stateless.WINDOW_ID during the first window processed by the operator.

This feature is deemed useful when the operator is recovering from a crashed instance of the application and we do not want to perform multiple mutations for the same tuple, given that AT_LEAST_ONCE is the default processing semantics.

9. The operator is a stateless operator.

10. The operator would generate the following autometrics (see the metrics sketch at the end of this mail):
   a. Counts of inserts, upserts, deletes, and updates (separate counters for each mutation) in a given window
   b. Bytes written in a given window
   c. Write RPCs in a given window
   d. Total RPC errors in a given window
   e. All of the above metrics accumulated over the entire lifetime of the operator

Could you please share your thoughts on whether the above design looks good?

Regards,

Ananth
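P.S. To make the above more concrete, here are a few rough sketches in Java. They are illustrative only: every class, field, and method name that is not part of the Apex or Kudu public API (KuduExecutionContext, KuduMutationType, AbstractKuduOutputOperator, and so on) is a hypothetical placeholder rather than the final code in the PR.

First, the "Kudu Execution Context" tuple from points 5 and 6: a payload POJO, the requested mutation type, and optional field-to-column name overrides.

import java.util.HashMap;
import java.util.Map;

// Sketch of the tuple type consumed by the output operator (points 5 and 6).
public class KuduExecutionContext<POJO>
{
  // The four mutations permitted as part of the context (point 6).
  public enum KuduMutationType
  {
    INSERT, UPSERT, UPDATE, DELETE
  }

  private POJO payload;
  private KuduMutationType mutationType = KuduMutationType.UPSERT;
  // Optional POJO-field-name to table-column-name overrides (point 5).
  private Map<String, String> overridingColumnNames = new HashMap<>();

  public POJO getPayload()
  {
    return payload;
  }

  public void setPayload(POJO payload)
  {
    this.payload = payload;
  }

  public KuduMutationType getMutationType()
  {
    return mutationType;
  }

  public void setMutationType(KuduMutationType mutationType)
  {
    this.mutationType = mutationType;
  }

  public Map<String, String> getOverridingColumnNames()
  {
    return overridingColumnNames;
  }

  public void setOverridingColumnNames(Map<String, String> overrides)
  {
    this.overridingColumnNames = overrides;
  }
}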
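Next, a skeleton of the abstract operator showing how the phases from point 2 line up with the Apex operator lifecycle. setup(), activate(), and the input port's process() are real Apex hooks; the Kudu-specific pieces are a sketch that builds on the KuduExecutionContext above (assumed to be in the same package).

import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.common.util.BaseOperator;

import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;

// Skeleton of the abstract output operator (points 1, 2 and 7).
public abstract class AbstractKuduOutputOperator extends BaseOperator
    implements Operator.ActivationListener<OperatorContext>
{
  private String masterAddresses;
  private String tableName;

  private transient KuduClient kuduClient;
  private transient KuduTable kuduTable;
  private transient KuduSession kuduSession;

  // Concrete implementations declare the POJO class used for writes (point 7).
  protected abstract Class<?> getTuplePayloadClass();

  // Concrete implementations apply the requested mutation for a tuple.
  protected abstract void processForMutation(KuduExecutionContext<?> tuple);

  @Override
  public void setup(OperatorContext context)
  {
    // Fetch the current window information and decide whether we are
    // recovering from a failure (see the reconciling-mode sketch below).
  }

  @Override
  public void activate(OperatorContext context)
  {
    // Establish the connection and fetch the table metadata that drives
    // the automatic POJO-field-to-column mapping (points 2 and 4).
    kuduClient = new KuduClient.KuduClientBuilder(masterAddresses).build();
    try {
      kuduTable = kuduClient.openTable(tableName);
    } catch (KuduException e) {
      throw new RuntimeException("Could not open Kudu table " + tableName, e);
    }
    kuduSession = kuduClient.newSession();
  }

  public final transient DefaultInputPort<KuduExecutionContext<?>> input =
      new DefaultInputPort<KuduExecutionContext<?>>()
  {
    @Override
    public void process(KuduExecutionContext<?> tuple)
    {
      // Inspect the execution context and perform one of
      // insert/update/delete/upsert on the payload POJO.
      processForMutation(tuple);
    }
  };

  @Override
  public void deactivate()
  {
    try {
      kuduClient.close();
    } catch (KuduException e) {
      throw new RuntimeException(e);
    }
  }
}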
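The connection tunables from point 3 would map onto the Kudu Java client's builder. To the best of my knowledge, bossCount, workerCount, and defaultSocketReadTimeoutMs are the relevant builder methods in the 1.x Java client, and external consistency is set per session; the host name and values below are placeholders.

import org.apache.kudu.client.ExternalConsistencyMode;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduSession;

public class KuduConnectionSketch
{
  public static void main(String[] args) throws Exception
  {
    // Boss/worker thread counts and the socket read timeout are builder options.
    KuduClient client = new KuduClient.KuduClientBuilder("kudu-master:7051")
        .bossCount(1)
        .workerCount(4)
        .defaultSocketReadTimeoutMs(10_000)
        .build();

    // External consistency mode is a per-session setting.
    KuduSession session = client.newSession();
    session.setExternalConsistencyMode(ExternalConsistencyMode.CLIENT_PROPAGATED);

    client.close();
  }
}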
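For point 8, the safe-versus-reconciling decision can be made in setup() by comparing the activation window id against the stateless window id. OperatorContext.ACTIVATION_WINDOW_ID and Stateless.WINDOW_ID are existing Apex constants; the surrounding fields and hooks are hypothetical, and the tuple type is the KuduExecutionContext sketch above.

import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.annotation.Stateless;

// Sketch of the first-window mode selection from point 8.
public class ReconcilingModeSketch
{
  private transient boolean safeMode;

  public void setup(OperatorContext context)
  {
    // A clean start activates at Stateless.WINDOW_ID. Anything else means we
    // are restarting from a checkpoint, and with AT_LEAST_ONCE semantics the
    // first window may replay tuples that were already written to Kudu.
    long activationWindowId = context.getValue(OperatorContext.ACTIVATION_WINDOW_ID);
    safeMode = (activationWindowId == Stateless.WINDOW_ID);
  }

  // During the first window, defer to a user callback in reconciling mode.
  public boolean shouldApplyMutation(KuduExecutionContext<?> tuple)
  {
    return safeMode || isEligibleForPersistenceInReconcilingWindow(tuple);
  }

  // Hypothetical user hook: decide whether a replayed tuple should mutate.
  protected boolean isEligibleForPersistenceInReconcilingWindow(KuduExecutionContext<?> tuple)
  {
    return true;
  }
}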
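Finally, the autometrics from point 10 could be plain @AutoMetric fields on the operator (@AutoMetric is the real Apex annotation, collected by the platform at each window boundary); per-window counters are reset in beginWindow(), while lifetime counters keep accumulating. The field names are placeholders.

import com.datatorrent.api.AutoMetric;
import com.datatorrent.common.util.BaseOperator;

// Sketch of the per-window and lifetime metrics from point 10.
public class KuduMetricsSketch extends BaseOperator
{
  // Per-window counters, published at each window boundary.
  @AutoMetric
  private long numInserts;
  @AutoMetric
  private long numUpserts;
  @AutoMetric
  private long numUpdates;
  @AutoMetric
  private long numDeletes;
  @AutoMetric
  private long bytesWritten;
  @AutoMetric
  private long writeRpcs;
  @AutoMetric
  private long rpcErrors;

  // Lifetime counterpart, never reset.
  @AutoMetric
  private long totalMutations;

  @Override
  public void beginWindow(long windowId)
  {
    // Reset only the per-window counters; lifetime metrics keep accumulating.
    numInserts = numUpserts = numUpdates = numDeletes = 0;
    bytesWritten = writeRpcs = rpcErrors = 0;
  }
}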