Hello All,

The implementation for Apex Kudu Input Operator is ready for a pull request. 
Before raising the pull request, I would like to get any inputs regarding the 
design and incorporate any feedback before raising the pull request in the next 
couple of days for the following JIRA.

https://issues.apache.org/jira/browse/APEXMALHAR-2472 
<https://issues.apache.org/jira/browse/APEXMALHAR-2472>

The following are the main features that would be supported by the Input 
operator:

- The input operator would be used to scan all or some rows of a single kudu 
table.
- Each Kudu row is translated to a POJO for downstream operators. 
- The Input operator would accept an SQL expression ( described in detail 
below) that would be parsed to generate the equivalent scanner code for the 
Kudu Table. This is because Kudu Table API does not support an SQL expressions 
- The SQL expression would have additional options that would help in Apache 
Apex design patterns ( Ex: Sending a control tuple message after a query is 
successfully processed )
- The Input operator works on a continuous basis i.e. it would accept the next 
query once the current query is complete)
- The operator will work in a distributed fashion for the input query. This 
essentially means for a single input query, the scan work is distributed among 
all of the physical instances of the input operator.
- Kudu splits a table into chunks of data regions called Tablets. The tablets 
are replicated and partitioned  (range and hash partitions are supported ) in 
Kudu according to the Kudu Table definition. The operator allows partitioning 
of the Input Operator to be done in 2 ways. 
        - Map many Kudu Tablets to one partition of the Apex Kudu Input operator
        - One Kudu Tablet maps to one partition of the Apex Kudu Input operator
- The partitioning does not change on a per query basis. This is because of the 
complex use cases that would arise. For example, if the query is touching only 
a few rows before the next query is accepted, it would result in a lot of churn 
in terms of operator serialize/deserialze, YARN allocation requests etc. Also 
supporting per query partition planning leads to possibly very complex 
implementation and poor resource usage as all physical instances of the 
operator have to wait for its peers to complete its scan and wait for next 
checkpoint to get repartitioned.
- The partitioner splits the work load of a single query in a round robin 
fashion. After a query plan is generated , each scan token range is distributed 
equally among the physical operator instances.
- The operator allows for two modes of scanning for an application ( Cannot be 
changed on a per query basis ) 
        - Consistent Order scanner - only one tablet scan thread is active at 
any given instance of time for a given query
        - Random Order scanner - Many threads are active to scan Kudu tablets 
in parallel
- As can be seen, Consistent order scanner would be slower but would help in 
better “exactly once” implementations if the correct method is overridden in 
the operator.
- The operator introduces the DisruptorBlockingQueue for a low latency buffer 
management. LMAX disruptor library was considered and based on some other 
discussion threads on other Apache projects, settled on the ConversantMedia 
implementation of the Disruptor Blocking queue. This blocking queue is used 
when the kudu scanner thread wants to send the scanned row into the input 
operators main thread emitTuples() call.
- The operator allows for exactly once semantics if the user specifies the 
logic for reconciling a possible duplicate row in situations when the operator 
is resuming from a checkpoint. This is done by overriding a method that returns 
a boolean ( true to emit the tuple and false to suppress the tuple ) when the 
operating is working in the reconciling window phase. As can be seen, this 
reconciling phase is only active at the max for one window.
- The operator uses the FSWindowManager to manage metadata at the end of every 
window. From resumption at a checkpoint, the operator will still scan the Kudu 
tablets but simply not emit all rows that were already streamed downstream. 
Subsequently when the operator is in the reconciling window, the method 
described above is invoked to allow for duplicates filter. After this 
reconciling window, the operator works in the normal mode of operation.
- The following are the additional configurable aspects of the operator
        - Max tuples per window
        - Spin policy and the buffer size for the Disruptor Blocking Queue
        - Mechanism to provide custom control tuples if required
        - Setting the number of Physical operator instances via the API if 
required. 
        - Setting the fault Tolerance. If fault tolerant , an alternative 
replica of the Kudu tablet is picked up for scanning if the initial tablet 
fails for whatever reason. However this slows down the scan throughput. Hence 
it is configurable by the end user.


Some notes regarding the SQL expression for the operator:

- The operator uses ANTLR4 to parse the SQL expression.
- The parser is based on a grammar file which is part of the source tree. The 
grammar is compiled on every build as part of the build process and code is 
generated for the parser automatically.
- The reason we had to use a custom parser are (as opposed to something like 
calcite) :
        - Kudu does not have all the features for a standard SQL expression. As 
an example != ( not equal to ) is not supported. Nor is there a concept of a 
Join etc.
        - We are providing a lot more flexibility for the user to specify what 
the control tuple message should be should the end user choose to send a 
control tuple downstream after the given query is done processing
- The SQL expression can specify a set of options for processing of the query:
        - Control tuple message : A message/string that can be sent as the 
Control tuple field. There would be other parts for this control tuple like the 
query that was just completed and whether this is a begin or end of the scan.
        - Read Snapshot time : Kudu supports specifying the read snapshot time 
for which the scan has to occur. This is because Kudu is essentially an MVCC 
engine and stores multiple versions of the same row. The Read snapshot time 
allows for the end user to specify the read snapshot time for the scan. 
- The parser supports for general syntax checking. If there is an error in the 
SQL expression , the string representing the SQL expression supplied is emitted 
onto an error port and the next query is taken for processing.
- The data types supported are only those data types as supported by the Kudu 
Engine. The parser supports data type parsing support. For example String data 
types are double quoted etc. 
- The Parser allows for a SELECT AA as BB style of expressions wherein AA is 
the column name in Kudu and BB is the name of the java POJO field name.

Please let me know if the community has any other questions regarding the above 
design. I am planning to present this operator along with the Kudu output 
operator in the Data works summit next month and any feedback would be useful.


Regards,
Ananth 

Reply via email to