Hello All,

The implementation for the Apex Kudu input operator is ready for a pull request. Before raising it, I would like to get any inputs regarding the design and incorporate the feedback; I plan to raise the pull request in the next couple of days for the following JIRA:
https://issues.apache.org/jira/browse/APEXMALHAR-2472

The following are the main features that would be supported by the input operator:

- The input operator would be used to scan all or some rows of a single Kudu table.
- Each Kudu row is translated to a POJO for downstream operators.
- The input operator would accept an SQL expression (described in detail below) that is parsed to generate the equivalent scanner code for the Kudu table, because the Kudu Table API does not support SQL expressions.
- The SQL expression would have additional options that help with Apache Apex design patterns (e.g. sending a control tuple message after a query is successfully processed).
- The input operator works on a continuous basis, i.e. it accepts the next query once the current query is complete.
- The operator works in a distributed fashion for the input query. This essentially means that for a single input query, the scan work is distributed among all of the physical instances of the input operator.
- Kudu splits a table into chunks of data called tablets. Tablets are replicated and partitioned (range and hash partitions are supported) in Kudu according to the Kudu table definition. The operator allows partitioning of the input operator to be done in two ways:
  - Map many Kudu tablets to one partition of the Apex Kudu input operator.
  - Map one Kudu tablet to one partition of the Apex Kudu input operator.
- The partitioning does not change on a per-query basis, because of the complex use cases that would arise. For example, if a query touches only a few rows before the next query is accepted, repartitioning would result in a lot of churn in terms of operator serialization/deserialization, YARN allocation requests, etc.
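For illustration, the two tablet-to-partition mapping strategies above could be sketched as follows. This is a minimal sketch only; the class and method names are hypothetical, and the real operator works with Kudu scan tokens rather than plain tablet ids:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the two tablet-to-partition mapping strategies.
public class TabletMappingSketch {

  // One Kudu tablet maps to one Apex partition: the number of physical
  // operator instances simply equals the number of tablets.
  public static int oneToOnePartitionCount(int numTablets) {
    return numTablets;
  }

  // Many Kudu tablets map to one Apex partition: tablets are dealt out
  // over a fixed number of physical partitions.
  public static List<List<Integer>> manyToOne(int numTablets, int numPartitions) {
    List<List<Integer>> partitions = new ArrayList<>();
    for (int p = 0; p < numPartitions; p++) {
      partitions.add(new ArrayList<Integer>());
    }
    for (int t = 0; t < numTablets; t++) {
      partitions.get(t % numPartitions).add(t);
    }
    return partitions;
  }
}
```

So with 10 tablets and 3 physical partitions, the many-to-one mapping would give the partitions 4, 3, and 3 tablets respectively.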
Also, supporting per-query partition planning would lead to a very complex implementation and poor resource usage, as all physical instances of the operator would have to wait for their peers to complete their scans and then wait for the next checkpoint to get repartitioned.

- The partitioner splits the workload of a single query in a round-robin fashion. After a query plan is generated, the scan token ranges are distributed equally among the physical operator instances.
- The operator allows two modes of scanning for an application (the mode cannot be changed on a per-query basis):
  - Consistent-order scanner: only one tablet scan thread is active at any given instant for a given query.
  - Random-order scanner: many threads are active to scan Kudu tablets in parallel.
- As can be seen, the consistent-order scanner is slower but helps with better "exactly once" implementations if the correct method is overridden in the operator.
- The operator introduces the DisruptorBlockingQueue for low-latency buffer management. The LMAX Disruptor library was considered, but based on discussion threads on other Apache projects we settled on the ConversantMedia implementation of the disruptor blocking queue. This blocking queue is used when the Kudu scanner thread sends a scanned row to the input operator's main thread emitTuples() call.
- The operator allows for exactly-once semantics if the user specifies the logic for reconciling a possibly duplicate row when the operator is resuming from a checkpoint. This is done by overriding a method that returns a boolean (true to emit the tuple, false to suppress it) while the operator is in the reconciling window phase. As can be seen, this reconciling phase is active for at most one window.
- The operator uses the FSWindowManager to manage metadata at the end of every window.
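A rough sketch of the duplicate-reconciliation hook described above, with hypothetical names (the actual operator exposes this as an overridable method; how already-emitted rows are tracked is an assumption here for illustration):

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the reconciliation hook. After resuming from a
// checkpoint the operator re-scans rows; while inside the reconciling
// window (at most one window long), each row passes through
// isEligibleForEmit() and is suppressed when the override returns false.
public class ReconcilingScanSketch {

  private final Set<String> emittedBeforeCheckpoint = new HashSet<>();
  private boolean inReconcilingWindow = true;

  public void markEmittedBeforeCheckpoint(String rowKey) {
    emittedBeforeCheckpoint.add(rowKey);
  }

  // End users override this: return true to emit, false to suppress.
  protected boolean isEligibleForEmit(String rowKey) {
    return !emittedBeforeCheckpoint.contains(rowKey);
  }

  // Called once the reconciling window ends; normal operation resumes.
  public void endReconcilingWindow() {
    inReconcilingWindow = false;
  }

  // Returns true when the row would be emitted downstream.
  public boolean processRow(String rowKey) {
    if (inReconcilingWindow && !isEligibleForEmit(rowKey)) {
      return false; // duplicate already streamed before the checkpoint
    }
    return true;
  }
}
```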
On resumption from a checkpoint, the operator will still scan the Kudu tablets but will simply not emit the rows that were already streamed downstream. Subsequently, when the operator is in the reconciling window, the method described above is invoked to allow duplicates to be filtered. After this reconciling window, the operator returns to its normal mode of operation.

- The following are the additional configurable aspects of the operator:
  - Maximum tuples per window.
  - Spin policy and buffer size for the disruptor blocking queue.
  - Mechanism to provide custom control tuples if required.
  - Number of physical operator instances, settable via the API if required.
  - Fault tolerance. If fault tolerant, an alternative replica of a Kudu tablet is picked up for scanning if the initial tablet fails for whatever reason. However, this slows down the scan throughput, hence it is configurable by the end user.

Some notes regarding the SQL expression for the operator:

- The operator uses ANTLR4 to parse the SQL expression.
- The parser is based on a grammar file that is part of the source tree. The grammar is compiled on every build, and code for the parser is generated automatically as part of the build process.
- The reasons we had to use a custom parser (as opposed to something like Calcite) are:
  - Kudu does not have all the features of a standard SQL expression. As an example, != (not equal to) is not supported, nor is there a concept of a join, etc.
  - We are providing a lot more flexibility for the user to specify what the control tuple message should be, should the end user choose to send a control tuple downstream after the given query is done processing.
- The SQL expression can specify a set of options for the processing of the query:
  - Control tuple message: a message/string that can be sent as the control tuple field. The control tuple would have other parts as well, such as the query that was just completed and whether this is the begin or end of the scan.
  - Read snapshot time: Kudu supports specifying the read snapshot time at which the scan has to occur. Because Kudu is essentially an MVCC engine and stores multiple versions of the same row, this allows the end user to pin the scan to a specific snapshot in time.
- The parser supports general syntax checking. If there is an error in the SQL expression, the string representing the supplied SQL expression is emitted onto an error port and the next query is taken up for processing.
- The only data types supported are those supported by the Kudu engine. The parser is data-type aware; for example, string values are double-quoted.
- The parser allows SELECT AA as BB style expressions, wherein AA is the column name in Kudu and BB is the name of the Java POJO field.

Please let me know if the community has any other questions regarding the above design. I am planning to present this operator along with the Kudu output operator at the DataWorks Summit next month, and any feedback would be useful.

Regards,
Ananth
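P.S. As a minimal illustration of the SELECT AA as BB aliasing: the real operator derives the column-to-field mapping from the generated ANTLR4 parse tree; the helper below is a hypothetical stand-in that only handles a bare projection list.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: map Kudu column names to POJO field names from a
// projection list such as "kudu_col as pojoField, other_col".
public class ProjectionAliasSketch {

  public static Map<String, String> columnToField(String projectionList) {
    Map<String, String> mapping = new LinkedHashMap<>();
    for (String item : projectionList.split(",")) {
      // Split on the AS keyword (case-insensitive); without an alias the
      // POJO field name defaults to the Kudu column name.
      String[] parts = item.trim().split("(?i)\\s+as\\s+");
      String column = parts[0].trim();
      mapping.put(column, parts.length > 1 ? parts[1].trim() : column);
    }
    return mapping;
  }
}
```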