I wanted to give a quick update on Apex-Calcite integration work.
Currently I'm able to run a SQL statement as a DAG against registered table
abstractions of a data endpoint and message type.
Here is the SQL support that is currently implemented:
1. Data Endpoint (Source/Destination):
2. Message Types from Data endpoint (source/destination):
3. SQL Functionality Support:
- SELECT (Projection) - Select from Source
- INSERT - Insert into Destination
- WHERE (Filter)
- Scalar functions that are provided in Calcite core
- Custom scalar functions can be defined and provided to SQL.
4. A table can be defined as an abstraction of a Data Endpoint (source/dest) and
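To make the custom scalar function point concrete: Calcite can expose a plain public static method as a SQL scalar function (for example via org.apache.calcite.schema.impl.ScalarFunctionImpl.create), so a UDF like the APEXCONCAT used in the demo SQL below could be backed by something as simple as the sketch here. The class and method names are my own illustration, not the project's actual code:

```java
// Hedged sketch: a custom SQL scalar function is just a public static
// method; Calcite can register such a method under a SQL name (e.g.
// via org.apache.calcite.schema.impl.ScalarFunctionImpl.create).
public class ApexSqlFunctions {
    // Would back a call like APEXCONCAT('OILPAINT', SUBSTRING(PRODUCT, 6, 7))
    public static String apexConcat(String left, String right) {
        return left + right;
    }
}
```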
Currently the Calcite integration with Apex is exposed as a small piece of
boilerplate code in populateDAG, as follows:

    .registerTable("ORDERS", new KafkaEndpoint(broker, sourceTopic, ...))
    .registerTable("SALES", new KafkaEndpoint(broker, destTopic, new ...))
    .executeSQL("INSERT INTO SALES " +
                "SELECT STREAM ROWTIME, " +
                "FLOOR(ROWTIME TO DAY), " +
                "APEXCONCAT('OILPAINT', SUBSTRING(PRODUCT, 6, 7)) " +
                "FROM ORDERS " +
                "WHERE ID > 3 AND PRODUCT LIKE 'paint%'");
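The fluent registerTable/executeSQL pattern above can be sketched as a small, self-contained toy. This is NOT the actual Apex/Calcite API; the class name, the string-typed endpoint, and the regex-based "planning" stand-in are all assumptions made purely to illustrate the shape of the boilerplate:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Toy sketch (not the real API): tables are registered by name against
// an endpoint description, and executeSQL checks that the statement
// only references registered tables before anything would be planned.
public class SqlEnvSketch {
    private final Map<String, String> tables = new HashMap<>();

    // Fluent registration, mirroring .registerTable("ORDERS", ...) above
    public SqlEnvSketch registerTable(String name, String endpoint) {
        tables.put(name.toUpperCase(), endpoint);
        return this;
    }

    // Stand-in for planning: verify FROM/INTO targets are registered
    public boolean executeSQL(String sql) {
        Matcher m = Pattern.compile("(?:FROM|INTO)\\s+(\\w+)",
                                    Pattern.CASE_INSENSITIVE).matcher(sql);
        while (m.find()) {
            if (!tables.containsKey(m.group(1).toUpperCase())) {
                return false;  // unknown table reference
            }
        }
        return true;
    }
}
```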
Following is a video recording of the demo of the apex-calcite integration:
Currently I'm working on adding inner join functionality.
Once the inner join functionality is implemented, I think the code will be
ready for a Review-Only PR for the first cut of the Calcite integration.
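For reference, the kind of streaming inner join being targeted would look roughly like the following; SHIPMENTS and its columns are hypothetical names used only for illustration:

```sql
-- Hypothetical streaming inner join over two registered tables
SELECT STREAM o.ROWTIME, o.PRODUCT, s.QUANTITY
FROM ORDERS AS o
JOIN SHIPMENTS AS s
  ON o.ID = s.ORDERID
```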
Please share your opinions on the above.
On Fri, Aug 12, 2016 at 9:55 PM, Chinmay Kolhatkar <chin...@datatorrent.com>
> Hi All,
> I wanted to give an update on the Apex-Calcite integration work being done,
> for visibility and feedback from the community.
> In the first phase, the target is to use the Calcite core library for SQL parsing
> and transformation of relational algebra to Apex-specific components
> Once this is achieved, one would be able to define inputs and outputs using a
> Calcite model file and define the processing from input to output using SQL
> The status for above work as of now is as follows:
> 1. I'm able to traverse the relational algebra for a simple select statement.
> 2. A DAG is getting generated for a simple statement SELECT STREAM * FROM
> 3. DAG is getting validated.
> 4. Operators are being set with properties and streams, and the schema is also
> being set using the TUPLE_CLASS attribute. For the schema, the class is
> generated on the fly and put on the classpath using the LIBRARY_JAR attribute.
> 5. Able to run generated DAG in local mode.
> 6. The code is currently being developed at (WIP):
> Currently, for ease of development and because the code is fairly large, I've
> added a new module malhar-sql in malhar in my fork. But I'm open to other
> suggestions here.
> Next step:
> 1. Run the generated DAG in distributed mode.
> 2. Expand the source and destination definitions (Calcite model file) to
> include Kafka as a source and destination.
> 3. Expand the scope to include filter operator (WHERE clause, HAVING too
> if possible) and inner join when it gets merged.
> 4. Write extensive unit tests for above.
> I'll send an update on this thread at every logical step of achieving
> I request the community to provide feedback on the above approach/targets
> and, if possible, take a look at the code in the above link.