[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ananth updated APEXMALHAR-2278:
-------------------------------
    Fix Version/s: 3.8.0

> Implement Kudu Output Operator for non-transactional streams
> ------------------------------------------------------------
>
>                 Key: APEXMALHAR-2278
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2278
>             Project: Apache Apex Malhar
>          Issue Type: New Feature
>          Components: adapters database
>            Reporter: Ananth
>            Assignee: Ananth
>             Fix For: 3.8.0
>
>
> Here are some benefits of integrating Kudu and Apex:
>     Kudu has just reached 1.0 and been declared production ready.
>     Kudu as a store might be a good fit for many architectures in the years 
> to come because it supports mutable data (unlike HDFS) and offers optimized 
> storage formats for low-latency scans.
>     It also seems to withstand high-throughput write patterns, which makes it 
> a stable sink for Apex workflows that operate at very high volumes. 
> [Design] 
> 1. The operator would be an AbstractOperator and would allow the concrete 
> implementations to set a few behavioral aspects of the operator. 
> 2. The following are the major phases of the operator:
>     During the activate() phase of the operator: establish a connection to 
> the cluster and get the metadata about the table that is being used as the 
> sink.
>     During the setup() phase of the operator: fetch the current window 
> information and use it to decide whether we are recovering from a failure. 
> (See point 8 below.)
>     During process() of the input port: inspect the incoming ExecutionContext 
> tuple (see point 6 below) and perform one of the operations 
> (Insert/Update/Delete/Upsert). 
> 3. The following parameters are tunable while establishing a Kudu connection:
>     Table name, boss worker threads, worker threads, socket read timeouts 
> and external consistency mode.
> 4. The user need not specify any schema outright. The POJO fields are 
> automatically mapped to the table column names identified when the schema 
> is parsed in the activate phase. 
> 5. Allow the concrete implementation of the operator to override the mapping 
> from POJO field name to table schema column name. This would allow 
> flexibility in cases where table schema column names are not compatible with 
> Java bean frameworks, or where the names cannot be controlled because the 
> POJO is coming from an upstream operator.
> 6. The input tuple that is to be supplied to this operator is of type "Kudu 
> Execution Context". This tuple encompasses the actual POJO that is going to 
> be persisted to the Kudu store. Additionally, it allows the upstream operator 
> to specify the operation that needs to be performed. One of the following 
> operations is permitted as part of the context: Insert, Upsert, Update or 
> Delete on the POJO that is acting as the payload in the Execution Context.
> 7. The concrete implementation of the operator would allow the user to 
> specify the actual POJO class definition that would be used to write to 
> the table. The execution context would contain this POJO as well as the 
> metadata that defines the processing that needs to be done on that tuple.
> 8. The operator would allow for a special execution mode for the first 
> window that is processed after the operator gets activated. There are two 
> modes for the first window of processing of the operator: 
>       a. Safe Mode: Safe mode is the "happy path execution", as in no extra 
> processing is required to perform the Kudu mutation.
>       b. Reconciling Mode: An additional function is called to check whether 
> the user would like the tuple to be used for mutation. This mode is 
> automatically set when OperatorContext.ACTIVATION_WINDOW_ID != 
> Stateless.WINDOW_ID during the first window of processing by the operator. 
> This feature is deemed to be useful when an operator is recovering from a 
> crash of the application and we do not want to perform multiple mutations 
> of the same tuple, given that at-least-once is the default semantics. 
> 9. The operator is a stateless operator. 
> 10. The operator would generate the following autometrics:
>       a. Counts of Inserts, Upserts, Deletes and Updates (separate counters 
> for each mutation) for a given window
>       b. Bytes written in a given window
>       c. Write RPCs in a given window
>       d. Total RPC errors in a given window
>       e. All of the above metrics for the entire lifetime of the operator. 
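
A minimal Java sketch of how points 2, 6 and 8 above might fit together. It
assumes nothing about the eventual Malhar classes: the names KuduOutputSketch,
ExecutionContext, OperatorCore and the eligibility predicate are hypothetical,
the two window ids are passed in as plain longs rather than read from the Apex
API, and a list of strings stands in for the real Kudu write calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

class KuduOutputSketch {
    /** The four mutations named in point 6. */
    enum MutationType { INSERT, UPDATE, UPSERT, DELETE }

    /** Hypothetical stand-in for the "Kudu Execution Context" tuple. */
    static final class ExecutionContext<T> {
        final T payload;
        final MutationType mutationType;

        ExecutionContext(T payload, MutationType mutationType) {
            this.payload = payload;
            this.mutationType = mutationType;
        }
    }

    /** Hypothetical operator core; records mutations instead of calling Kudu. */
    static final class OperatorCore<T> {
        private final boolean recovering;
        private final Predicate<ExecutionContext<T>> eligibleWhenReconciling;
        private boolean firstWindowDone = false;
        final List<String> applied = new ArrayList<>();

        OperatorCore(long activationWindowId, long statelessWindowId,
                     Predicate<ExecutionContext<T>> eligibleWhenReconciling) {
            // Point 8: differing ids mean the operator is restarting after a failure.
            this.recovering = activationWindowId != statelessWindowId;
            this.eligibleWhenReconciling = eligibleWhenReconciling;
        }

        /** Reconciling mode only applies to the first window after activation. */
        boolean isReconciling() {
            return recovering && !firstWindowDone;
        }

        void endWindow() {
            firstWindowDone = true;
        }

        void process(ExecutionContext<T> ctx) {
            // In reconciling mode, let the user-supplied check veto the tuple.
            if (isReconciling() && !eligibleWhenReconciling.test(ctx)) {
                return;
            }
            switch (ctx.mutationType) {
                case INSERT: applied.add("insert:" + ctx.payload); break;
                case UPDATE: applied.add("update:" + ctx.payload); break;
                case UPSERT: applied.add("upsert:" + ctx.payload); break;
                case DELETE: applied.add("delete:" + ctx.payload); break;
                default: throw new IllegalStateException("unknown mutation");
            }
        }
    }

    public static void main(String[] args) {
        // 42 and -1 are illustrative values only; they differ, so we start in reconciling mode.
        OperatorCore<String> op =
            new OperatorCore<>(42L, -1L, ctx -> !"row-1".equals(ctx.payload));
        op.process(new ExecutionContext<>("row-1", MutationType.UPSERT)); // vetoed
        op.process(new ExecutionContext<>("row-2", MutationType.UPSERT)); // applied
        op.endWindow();                                                   // back to safe mode
        op.process(new ExecutionContext<>("row-1", MutationType.DELETE)); // applied
        System.out.println(op.applied);
    }
}
```

The predicate is where a concrete operator could, for example, look up whether
the row was already written before the crash; how that lookup is done is left
open here.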

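The field-to-column mapping in points 4 and 5 could be sketched roughly as
follows. This is only an illustration under stated assumptions: the class
ColumnMappingSketch, the Reading POJO and the overrides map are hypothetical,
the set of column names stands in for whatever the table schema returns during
activate(), and plain reflection is used instead of any particular bean
framework.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

class ColumnMappingSketch {
    /** Hypothetical POJO arriving from an upstream operator. */
    static class Reading {
        String deviceId;
        long ts;
        double value;
    }

    /**
     * Returns a map of table column name to POJO field name.
     * A field maps to the column of the same name unless an override is
     * given; fields with no matching column are left out.
     */
    static Map<String, String> mapColumns(Class<?> pojoClass,
                                          Set<String> tableColumns,
                                          Map<String, String> fieldToColumnOverrides) {
        Map<String, String> columnToField = new HashMap<>();
        for (Field field : pojoClass.getDeclaredFields()) {
            // Skip static and compiler-generated fields; they are not row data.
            if (Modifier.isStatic(field.getModifiers()) || field.isSynthetic()) {
                continue;
            }
            String column =
                fieldToColumnOverrides.getOrDefault(field.getName(), field.getName());
            if (tableColumns.contains(column)) {
                columnToField.put(column, field.getName());
            }
        }
        return columnToField;
    }
}
```

With a table whose columns are device_id, ts and value, an override of
deviceId to device_id is enough to cover the one name that does not line up.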

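For the metrics in point 10, one possible bookkeeping scheme is to keep two
sets of counters and reset only the per-window set at each window boundary.
The sketch below assumes that approach; KuduMetricsSketch and its method names
are hypothetical, only mutation counts and RPC errors are shown, and wiring
the values into Apex autometrics is not covered.

```java
import java.util.EnumMap;
import java.util.Map;

class KuduMetricsSketch {
    enum MutationType { INSERT, UPDATE, UPSERT, DELETE }

    private final Map<MutationType, Long> windowCounts = new EnumMap<>(MutationType.class);
    private final Map<MutationType, Long> lifetimeCounts = new EnumMap<>(MutationType.class);
    private long windowRpcErrors;
    private long lifetimeRpcErrors;

    /** Called at the start of every window: clear per-window values only. */
    void beginWindow() {
        windowCounts.clear();
        windowRpcErrors = 0;
    }

    /** Count one mutation in both the current window and the lifetime totals. */
    void recordMutation(MutationType type) {
        windowCounts.merge(type, 1L, Long::sum);
        lifetimeCounts.merge(type, 1L, Long::sum);
    }

    void recordRpcError() {
        windowRpcErrors++;
        lifetimeRpcErrors++;
    }

    long windowCount(MutationType type) {
        return windowCounts.getOrDefault(type, 0L);
    }

    long lifetimeCount(MutationType type) {
        return lifetimeCounts.getOrDefault(type, 0L);
    }

    long windowRpcErrors() {
        return windowRpcErrors;
    }

    long lifetimeRpcErrors() {
        return lifetimeRpcErrors;
    }
}
```

Bytes written and write RPCs would follow the same pattern as the RPC error
counter.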

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
