Hi All,

I wanted to give a quick update on Apex-Calcite integration work.

Currently I'm able to run a SQL statement as a DAG against registered table
abstractions of a data endpoint and a message type.

Here is the SQL support that is currently implemented:
1. Data Endpoint (Source/Destination):
   - File
   - Kafka
2. Message Types from Data Endpoint (Source/Destination):
   - CSV
3. SQL Functionality Support:
   - SELECT (Projection) - Select from Source
   - INSERT - Insert into Destination
   - WHERE (Filter)
   - Scalar functions which are provided in Calcite core
   - Custom scalar functions can be defined and provided to SQL
4. A table can be defined as an abstraction of a Data Endpoint (source/destination)
and a message type

Currently, Calcite integration with Apex is exposed as a small piece of
boilerplate code in populateDAG, as follows:

SQLExecEnvironment.getEnvironment(dag)
    .registerTable("ORDERS",
        new KafkaEndpoint(broker, sourceTopic, new CSVMessageFormat(schemaIn)))
    .registerTable("SALES",
        new KafkaEndpoint(broker, destTopic, new CSVMessageFormat(schemaOut)))
    .registerFunction("APEXCONCAT", FileEndpointTest.class, "apex_concat_str")
    .executeSQL("INSERT INTO SALES " +
        "SELECT STREAM ROWTIME, FLOOR(ROWTIME TO DAY), " +
        "APEXCONCAT('OILPAINT', SUBSTRING(PRODUCT, 6, 7)) " +
        "FROM ORDERS WHERE ID > 3 AND PRODUCT LIKE 'paint%'");

Here is a video recording of the demo of apex-calcite integration:
https://drive.google.com/open?id=0B_Tb-ZDtsUHeUVM5NWRYSFg0Z3c

Currently I'm working on adding inner join functionality.
Once inner join is implemented, I think the code will be in good shape to
create a Review Only PR for the first cut of Calcite integration.
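
For concreteness, this is the kind of statement that inner join support
should enable (the CUSTOMERS table and its columns are hypothetical, used
here purely for illustration):

// env is the SQLExecEnvironment from the boilerplate above; CUSTOMERS,
// CUSTOMER_ID and NAME are hypothetical names, not part of the current code.
env.executeSQL("INSERT INTO SALES " +
    "SELECT STREAM o.ROWTIME, o.PRODUCT, c.NAME " +
    "FROM ORDERS AS o " +
    "JOIN CUSTOMERS AS c ON o.CUSTOMER_ID = c.ID");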

Please share your opinions on the above.

Thanks,
Chinmay.


On Fri, Aug 12, 2016 at 9:55 PM, Chinmay Kolhatkar <chin...@datatorrent.com>
wrote:

> Hi All,
>
> I wanted to give an update on the Apex-Calcite integration work being done,
> for visibility and feedback from the community.
>
> In the first phase, the target is to use the Calcite core library for SQL
> parsing and for transformation of relational algebra to Apex-specific
> components (operators).
> Once this is achieved, one would be able to define inputs and outputs using a
> Calcite model file and define the processing from input to output using a SQL
> statement.
>
> The status for above work as of now is as follows:
> 1. I'm able to traverse the relational algebra for a simple select statement.
> 2. A DAG is getting generated for the simple statement SELECT STREAM * FROM
> TABLE.
> 3. The DAG is getting validated.
> 4. Operators are being set with properties and streams; the schema is also
> being set using the TUPLE_CLASS attr (see the sketch after this list). For
> the schema, the class is generated on the fly and put on the classpath using
> the LIBRARY_JAR attr.
> 5. Able to run the generated DAG in local mode.
> 6. The code is currently being developed (WIP) at the link below. For ease of
> development, the code being fairly large, I've added a new module malhar-sql
> to malhar in my fork, but I'm open to other suggestions here:
> https://github.com/chinmaykolhatkar/apex-malhar/tree/calcite/sql
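>
> As a sketch of what item (4) above amounts to (the operator, port, and
> variable names here are placeholders, not names from the actual code):
>
>   // Hedged sketch: give downstream operators the generated POJO class via
>   // the port's TUPLE_CLASS attribute, and ship the on-the-fly-generated jar
>   // with the application via LIBRARY_JARS.
>   dag.setOutputPortAttribute(parser.out, Context.PortContext.TUPLE_CLASS,
>       generatedPojoClass);
>   dag.setAttribute(Context.DAGContext.LIBRARY_JARS, generatedJarPath);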
>
> Next steps:
> 1. Run the generated DAG in distributed mode.
> 2. Expand the source and destination definitions (Calcite model file) to
> include Kafka as a source and destination.
> 3. Expand the scope to include the filter operator (WHERE clause, HAVING too
> if possible) and inner join when it gets merged.
> 4. Write extensive unit tests for the above.
>
> I'll send an update on this thread at every logical milestone.
>
> I request the community to provide feedback on the above approach/targets
> and, if possible, to take a look at the code at the link above.
>
> Thanks,
> Chinmay.
>
>
