I wanted to give a quick update on Apex-Calcite integration work.
Currently I'm able to run a SQL statement as a DAG against registered table
abstractions of a data endpoint and message type.
Here is the SQL support that is currently implemented:
1. Data Endpoint (Source/Destination):
2. Message Types from Data endpoint (source/destination):
3. SQL Functionality Support:
- SELECT (Projection) - Select from Source
- INSERT - Insert into Destination
- WHERE (Filter)
- Scalar functions that are provided in Calcite core
- Custom scalar functions can be defined and provided to SQL.
4. A table can be defined as an abstraction of a Data Endpoint (source/dest) and
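To make the custom scalar function point concrete: Calcite can expose a plain public static method as a SQL scalar function (for example via org.apache.calcite.schema.impl.ScalarFunctionImpl.create), so a UDF like the APEXCONCAT used in the demo SQL below could be backed by something as simple as the sketch here. The class and method names are my own illustration, not the project's actual code:

```java
// Hedged sketch: a custom SQL scalar function is just a public static
// method; Calcite can register such a method under a SQL name (e.g.
// via org.apache.calcite.schema.impl.ScalarFunctionImpl.create).
public class ApexSqlFunctions {
    // Would back a call like APEXCONCAT('OILPAINT', SUBSTRING(PRODUCT, 6, 7))
    public static String apexConcat(String left, String right) {
        return left + right;
    }
}
```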
Currently the Calcite integration with Apex is exposed as a small piece of
boilerplate code in populateDAG, as follows:

    .registerTable("ORDERS", new KafkaEndpoint(broker, sourceTopic, ...))
    .registerTable("SALES", new KafkaEndpoint(broker, destTopic, new ...))
    .executeSQL("INSERT INTO SALES " +
                "SELECT STREAM ROWTIME, " +
                "FLOOR(ROWTIME TO DAY), " +
                "APEXCONCAT('OILPAINT', SUBSTRING(PRODUCT, 6, 7)) " +
                "FROM ORDERS " +
                "WHERE ID > 3 AND PRODUCT LIKE 'paint%'");
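The fluent registerTable/executeSQL pattern above can be sketched as a small, self-contained toy. This is NOT the actual Apex/Calcite API; the class name, the string-typed endpoint, and the regex-based "planning" stand-in are all assumptions made purely to illustrate the shape of the boilerplate:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Toy sketch (not the real API): tables are registered by name against
// an endpoint description, and executeSQL checks that the statement
// only references registered tables before anything would be planned.
public class SqlEnvSketch {
    private final Map<String, String> tables = new HashMap<>();

    // Fluent registration, mirroring .registerTable("ORDERS", ...) above
    public SqlEnvSketch registerTable(String name, String endpoint) {
        tables.put(name.toUpperCase(), endpoint);
        return this;
    }

    // Stand-in for planning: verify FROM/INTO targets are registered
    public boolean executeSQL(String sql) {
        Matcher m = Pattern.compile("(?:FROM|INTO)\\s+(\\w+)",
                                    Pattern.CASE_INSENSITIVE).matcher(sql);
        while (m.find()) {
            if (!tables.containsKey(m.group(1).toUpperCase())) {
                return false;  // unknown table reference
            }
        }
        return true;
    }
}
```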
Following is a video recording of the demo of the apex-calcite integration:
Currently I'm working on adding inner join functionality.
Once the inner join functionality is implemented, I think the code will be
ready for a Review-Only PR for the first cut of the Calcite integration.
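For reference, the kind of streaming inner join being targeted would look roughly like the following; SHIPMENTS and its columns are hypothetical names used only for illustration:

```sql
-- Hypothetical streaming inner join over two registered tables
SELECT STREAM o.ROWTIME, o.PRODUCT, s.QUANTITY
FROM ORDERS AS o
JOIN SHIPMENTS AS s
  ON o.ID = s.ORDERID
```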
Please share your opinions on the above.
On Fri, Aug 12, 2016 at 9:55 PM, Chinmay Kolhatkar <chin...@datatorrent.com>
> Hi All,
> I wanted to give an update on the Apex-Calcite integration work being done,
> for visibility and feedback from the community.
> In the first phase, the target is to use the Calcite core library for SQL parsing
> and transformation of relational algebra to Apex-specific components
> Once this is achieved, one would be able to define inputs and outputs using a
> Calcite model file and define the processing from input to output using SQL
> The status for above work as of now is as follows:
> 1. I'm able to traverse the relational algebra for a simple select statement.
> 2. A DAG is getting generated for a simple statement SELECT STREAM * FROM
> 3. DAG is getting validated.
> 4. Operators are being set with properties and streams, and the schema is also
> being set using the TUPLE_CLASS attribute. For the schema, the class is
> generated on the fly and put on the classpath using the LIBRARY_JAR attribute.
> 5. Able to run generated DAG in local mode.
> 6. The code is currently being developed at (WIP):
> Currently, for ease of development and because the code is fairly large, I've
> added a new module malhar-sql in malhar in my fork. But I'm open to other
> suggestions here.
> Next step:
> 1. Run the generated DAG in distributed mode.
> 2. Expand the source and destination definitions (Calcite model file) to
> include Kafka as a source and destination.
> 3. Expand the scope to include filter operator (WHERE clause, HAVING too
> if possible) and inner join when it gets merged.
> 4. Write extensive unit tests for above.
> I'll send an update on this thread at every logical step of achieving
> I request the community to provide feedback on the above approach/targets
> and, if possible, take a look at the code in the above link.