Ananth, This is a good proposal. We will work with you. Thanks, Amol
E:a...@datatorrent.com | M: 510-449-2606 | Twitter: @amolhkekre | www.datatorrent.com

On Sat, Apr 1, 2017 at 4:29 PM, ananth <ananthg.a...@gmail.com> wrote:

> Hello All,
>
> I would like to get the community's opinion on the implementation of a
> Kudu output operator. A first-cut implementation was made available in
> November last year, but I guess we did not get time to discuss it
> thoroughly on the mailing list, and hence the PR did not get merged.
>
> This operator would allow Apex to stream data into Kudu. A brief
> description of Kudu is here: https://kudu.apache.org/. At a high level,
> this would enable the following use cases from an Apex point of view:
>
> - Low-latency writes into the Kudu store, which allows SQL queries on
> the data. This essentially means sub-second data updates become
> available for SQL querying. As opposed to Parquet-styled data dumps,
> which typically need a few minutes to accumulate enough data to take
> advantage of the Parquet format, this would make same-second queries
> possible on very large datasets in Kudu with Impala.
>
> - Another very interesting use case would be to allow Kudu as a source
> store to stream from based on SQL queries. The Kudu input operator is
> covered by another JIRA
> (https://issues.apache.org/jira/browse/APEXMALHAR-2472) and would cover
> mechanisms to stream data from Kudu into Apex. This would bring in
> interesting use cases like de-dupe, selective streaming, and out-of-band
> data in a different way if Kudu is part of the ecosystem in a given
> setup.
>
> Here is the design of the Kudu output operator:
>
> 1. The operator would be an AbstractOperator and would allow the
> concrete implementations to set a few behavioral aspects of the
> operator.
>
> 2. The following are the major phases of the operator:
>
> During the activate() phase of the operator: establish a connection to
> the cluster and get the metadata about the table that is being used as
> the sink.
> During the setup() phase of the operator: fetch the current window
> information and use it to decide if we are recovering from a failure
> mode (see point 8 below).
> During process() of the input port: inspect the incoming execution
> context tuple (see below) and perform one of the operations
> (Insert/Update/Delete/Upsert).
>
> 3. The following parameters are tunable while establishing a Kudu
> connection: table name, boss worker threads, worker threads, socket read
> timeouts, and external consistency mode. (A sketch of how these might
> map onto the Kudu client API follows this message.)
>
> 4. The user need not specify any schema outright. The POJO fields are
> automatically mapped to the table column names as identified during the
> schema parse in the activate phase.
>
> 5. Allow the concrete implementation of the operator to override the
> mapping of POJO field names to table schema column names. This would
> allow flexibility in use cases where table schema column names are not
> compatible with Java bean frameworks, or in situations where column
> names can't be controlled because the POJO is coming from an upstream
> operator.
>
> 6. The input tuple that is to be supplied to this operator is of type
> "Kudu Execution Context". This tuple encompasses the actual POJO that is
> going to be persisted to the Kudu store. Additionally, it allows the
> upstream operator to specify the operation that needs to be performed.
> One of the following operations is permitted as part of the context:
> Insert, Upsert, Update, or Delete on the POJO that is acting as the
> payload in the execution context. (A sketch of such a tuple also follows
> this message.)
> 7. The concrete implementation of the operator would allow the user to
> specify the actual POJO class definition that would be used to write to
> the table. The execution context would contain this POJO as well as the
> metadata that defines the behavior of the processing that needs to be
> done on that tuple.
>
> 8. The operator would allow for a special case of execution mode for the
> first window that is processed as the operator gets activated. There are
> two modes for the first window of processing by the operator:
> a. Safe mode: safe mode is the "happy path" execution, in which no extra
> processing is required to perform the Kudu mutation.
> b. Reconciling mode: an additional function would be called to see if
> the user would like the tuple to be used for mutation. This mode is
> automatically set when OperatorContext.ACTIVATION_WINDOW_ID !=
> Stateless.WINDOW_ID during the first window of processing by the
> operator.
>
> This feature is deemed to be useful when an operator is recovering from
> a crashed instance of the application and we do not want to perform
> multiple mutations of the same tuple, given AT_LEAST_ONCE is the default
> semantics.
>
> 9. The operator is a stateless operator.
>
> 10. The operator would generate the following autometrics:
> a. Counts of Inserts, Upserts, Deletes and Updates (separate counters
> for each mutation) for a given window
> b. Bytes written in a given window
> c. Write RPCs in the given window
> d. Total RPC errors in this window
> e. All of the above metrics over the entire lifetime of the operator
>
> Could you please provide your thoughts on whether the above design looks
> good?
>
> Regards,
>
> Ananth
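A minimal sketch of how the connection tunables from point 3 could map onto
the Kudu Java client as of the 1.x releases current at the time of this
thread; the master address, table name, thread counts, and timeout value
are illustrative assumptions, not part of the proposal:

    import org.apache.kudu.client.ExternalConsistencyMode;
    import org.apache.kudu.client.KuduClient;
    import org.apache.kudu.client.KuduException;
    import org.apache.kudu.client.KuduSession;
    import org.apache.kudu.client.KuduTable;

    public class KuduConnectionSketch
    {
      public static void main(String[] args) throws KuduException
      {
        // Illustrative values; the operator would expose these as
        // configurable properties rather than hard-coding them.
        KuduClient client = new KuduClient.KuduClientBuilder("kudu-master:7051")
            .bossCount(1)                        // boss worker threads
            .workerCount(4)                      // worker threads
            .defaultSocketReadTimeoutMs(10_000)  // socket read timeout
            .build();

        // Fetch the table metadata, as the activate() phase in point 2
        // describes for the sink table.
        KuduTable table = client.openTable("transactions");

        KuduSession session = client.newSession();
        // External consistency mode is the fifth tunable from point 3.
        session.setExternalConsistencyMode(ExternalConsistencyMode.CLIENT_PROPAGATED);

        session.close();
        client.shutdown();
      }
    }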
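And a minimal sketch of what the "Kudu Execution Context" tuple from points
6 and 7 might look like; the class and enum names are hypothetical, since
the proposal only states that the tuple carries the payload POJO plus the
mutation to perform:

    // Hypothetical names; the proposal does not fix them.
    public class KuduExecutionContext<T>
    {
      // One of the four mutations permitted by point 6.
      public enum KuduMutationType
      {
        INSERT, UPSERT, UPDATE, DELETE
      }

      private T payload;                      // the POJO to persist (point 7)
      private KuduMutationType mutationType;  // operation requested upstream

      public T getPayload()
      {
        return payload;
      }

      public void setPayload(T payload)
      {
        this.payload = payload;
      }

      public KuduMutationType getMutationType()
      {
        return mutationType;
      }

      public void setMutationType(KuduMutationType mutationType)
      {
        this.mutationType = mutationType;
      }
    }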