[ 
https://issues.apache.org/jira/browse/SAMZA-390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14297979#comment-14297979
 ] 

Yi Pan (Data Infrastructure) commented on SAMZA-390:
----------------------------------------------------

Posting the proposal from [~julianhyde]:
Julian Hyde [email protected] via samza.apache.org
Let me propose an alternative approach. The deliverables and the technology 
stack would be different, but I think we still fulfill the spirit of the 
proposal, and there are benefits from better interoperability, standards 
compliance, and building on existing code that already works.

First, I propose that we implement not a SQL-like language, but standard SQL 
with streaming extensions. The extensions should be minimal, and should be 
consistent with the look and feel of the language (e.g. SQL tends to use 
keywords like OVER rather than punctuation marks like '['). If there is a way 
to achieve something within standard SQL, we should use it. And any extensions 
we make should preserve SQL's principles of being a closed language (you can 
nest structures to arbitrary depth, and you can re-use queries as if they were 
relations) and of having a sound semantics.

The language would allow queries involving streams, mixtures of streams and 
relations, and only relations. A query that only used relations would be 100% 
standard SQL, and a query that used a stream S would, with luck, be very 
similar to one that used a table T with the same contents as S.

Second, I propose that we use Calcite's SQL parser, validator, and logical 
algebra. We could also use its JDBC driver infrastructure.

I agree with the consensus that CQL is a good basis for extending relational 
algebra to streams. The question is, can we shoehorn CQL's algebra extensions 
into SQL? I believe we can, as follows:
- The ISTREAM operator is represented by the STREAM keyword after SELECT 
(DSTREAM and RSTREAM are much less important, so can be deferred).
- Streams included in the FROM clause implicitly become relations (but they are 
"streamable" relations, and the planner will very likely leverage this when 
finding a viable implementation).
- To use a particular window of a stream, not its entire history, follow it 
with an OVER clause.

Use SQL standard constructs for datetime literals (TIMESTAMP '2015-01-29 
12:18:34.123', DATE '2015-01-29'), interval literals (INTERVAL '2:30' HOUR TO 
MINUTE, INTERVAL '5' MONTH), and windowed aggregates ("AVG(price) OVER 
(PARTITION BY productId ORDER BY rowtime ROWS BETWEEN 10 PRECEDING AND 5 
FOLLOWING)").
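These window constructs are plain standard SQL, so they can be tried on any 
conventional engine. A minimal sketch using SQLite (3.25+ for window 
functions) via Python's sqlite3; the Orders table and its rows are 
hypothetical, chosen only to exercise the frame syntax:

```python
import sqlite3

# Hypothetical Orders data to illustrate a standard ROWS-frame window
# aggregate. Requires SQLite >= 3.25 (window function support).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE Orders (productId TEXT, price REAL)")
conn.executemany("INSERT INTO Orders VALUES (?, ?)",
                 [("p1", 10.0), ("p1", 20.0), ("p1", 30.0), ("p2", 5.0)])

# One output row per input row; the frame spans up to 10 preceding and
# 5 following rows within each productId partition.
rows = conn.execute("""
    SELECT productId, price,
           AVG(price) OVER (PARTITION BY productId
                            ORDER BY price
                            ROWS BETWEEN 10 PRECEDING AND 5 FOLLOWING)
               AS avg_price
    FROM Orders
""").fetchall()
for r in rows:
    print(r)
```

Here the frame is wide enough to cover each whole partition, so every "p1" 
row carries the partition average (20.0).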

Some examples.

# Identity
SELECT STREAM *
FROM Orders;

# Filter and project
SELECT STREAM state, quantity
FROM Orders
WHERE state = 'CA';

# Windowed aggregation
SELECT STREAM product,
   AVG(price) OVER this_week AS avg_price
FROM Orders
WINDOW this_week AS (PARTITION BY product ORDER BY rowtime RANGE INTERVAL '7' 
DAY PRECEDING);

# Aggregation
# At the top of each hour, emit totals for each product
SELECT STREAM product, trunc(rowtime to hour) AS rowtime, COUNT(*) AS c
FROM Orders
GROUP BY product, trunc(rowtime to hour);

# Relational query on recent history of a stream
SELECT product, COUNT(*)
FROM Orders OVER (ORDER BY rowtime RANGE INTERVAL '1' HOUR PRECEDING)
GROUP BY product;

or alternatively

SELECT product, COUNT(*)
FROM Orders
WHERE rowtime BETWEEN CURRENT_TIMESTAMP - INTERVAL '1' HOUR AND CURRENT_TIMESTAMP
GROUP BY product;

# Stream-table join producing a stream
SELECT STREAM *
FROM Orders AS o
  JOIN Products AS p ON o.productId = p.productId
WHERE o.price < p.list_price / 2;

# Stream-stream join producing a stream
SELECT STREAM o.rowtime, o.orderId, s.shipmentId, s.rowtime AS shipTime
FROM Orders AS o
  JOIN Shipments AS s
  ON o.orderId = s.orderId
   AND s.rowtime BETWEEN o.rowtime AND o.rowtime + INTERVAL '1' HOUR
ORDER BY o.rowtime;

# Union
SELECT STREAM rowtime, customerId FROM Orders
UNION ALL
SELECT STREAM rowtime, customerId FROM SupportCalls;

Note that, as in standard SQL, windowed aggregation emits the same number of 
rows as it consumes, whereas GROUP BY collapses rows. The windowed aggregation 
and GROUP BY examples both leverage the fact that "rowtime" is sorted (and 
"trunc(rowtime to hour)" can be deduced to be sorted). If it were not, the 
system would not allow the queries.
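The cardinality difference is already observable in ordinary non-streaming 
SQL. A small sketch using SQLite via Python's sqlite3 (table and data 
hypothetical) contrasts the two:

```python
import sqlite3

# Windowed aggregation preserves cardinality (one output row per input row);
# GROUP BY collapses each group to a single row. Requires SQLite >= 3.25.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE Orders (product TEXT, price REAL)")
conn.executemany("INSERT INTO Orders VALUES (?, ?)",
                 [("a", 1.0), ("a", 2.0), ("b", 3.0)])

windowed = conn.execute(
    "SELECT product, AVG(price) OVER (PARTITION BY product) FROM Orders"
).fetchall()
grouped = conn.execute(
    "SELECT product, COUNT(*) FROM Orders GROUP BY product"
).fetchall()

print(len(windowed))  # 3 rows: same as the input
print(len(grouped))   # 2 rows: one per product
```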

Calcite already has a SQL parser, validator, metadata SPI (that you can use to 
declare what schemas, tables, streams, columns are available), and a logical 
algebra. The logical algebra consists of TableScan, Filter, Project, Union, 
Join, Aggregate, Window, Sort, Values (and a few others). Calcite allows you to 
define transformation rules that combine operator calls to produce semantically 
equivalent operator calls, and has an engine that applies lots of 
transformation rules, optionally guided by a cost model.

I am working on a prototype of Calcite that adds streaming [ 
https://github.com/julianhyde/incubator-calcite/tree/chi ]. Just two new
operators are needed: Delta (converts a relation to a stream) and Chi (converts 
a stream to a relation). And a few rules, such as one that maps 
Delta(Filter(condition, r)) to Filter(condition, Delta(r)), are sufficient to 
transform the logical algebra into something that could be implemented. My 
prototype can parse

SELECT STREAM * FROM Orders

convert it to

LogicalDelta
  LogicalProject(id=[$0], product=[$1], quantity=[$2])
    EnumerableTableScan(table=[[STREAMS, ORDERS]])

and simplify to

EnumerableStreamScan(table=[[STREAMS, ORDERS]])

The next step would be to write rules to convert EnumerableStreamScan (and 
several other operators) to physical algebra operators SamzaStreamScan etc.
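The Delta push-down rule can be sketched outside Calcite as a toy term 
rewrite over operator trees. Everything below is illustrative (nested tuples 
standing in for logical operators, not Calcite classes or APIs); it only 
shows how pushing Delta below Filter exposes a Delta-over-scan that can be 
fused into a stream scan:

```python
# Toy sketch (not Calcite): apply Delta(Filter(cond, r)) ->
# Filter(cond, Delta(r)) bottom-up, then fuse Delta(TableScan) into a
# StreamScan. Operator trees are nested tuples: (opName, *children).
def rewrite(node):
    if not isinstance(node, tuple):
        return node  # leaf: condition string, table name, etc.
    # Rewrite children first, then try rules at this node.
    node = (node[0],) + tuple(rewrite(c) for c in node[1:])
    if node[0] == "Delta" and isinstance(node[1], tuple):
        if node[1][0] == "Filter":
            _, cond, child = node[1]
            # Push Delta below the Filter and keep rewriting.
            return ("Filter", cond, rewrite(("Delta", child)))
        if node[1][0] == "TableScan":
            # Delta directly over a scan becomes a stream scan.
            return ("StreamScan", node[1][1])
    return node

plan = ("Delta", ("Filter", "state = 'CA'", ("TableScan", "Orders")))
print(rewrite(plan))
# -> ('Filter', "state = 'CA'", ('StreamScan', 'Orders'))
```

A real planner would express these as cost-guided transformation rules over a 
proper operator class hierarchy, but the shape of the rewrite is the same.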


In summary, I think this approach should be given serious consideration. An 
extended standard SQL is much more useful than a SQL-like language, and I 
believe I have shown that we can add the necessary extensions to SQL without 
destroying it.

Building a SQL parser, validator, relational algebra, JDBC driver, and 
planning framework is a massive amount of work, and 90% of the functionality 
is identical in streaming and non-streaming systems.

Lastly, building a stack based on extended standard SQL does not preclude 
adding other high-level languages on top of the algebra at a later date.

> High-Level Language for Samza
> -----------------------------
>
>                 Key: SAMZA-390
>                 URL: https://issues.apache.org/jira/browse/SAMZA-390
>             Project: Samza
>          Issue Type: New Feature
>          Components: sql
>            Reporter: Raul Castro Fernandez
>            Priority: Minor
>              Labels: project
>         Attachments: StreamSQLforSAMZA-v0.1.docx.docx
>
>
> Discussion about high-level languages to define Samza queries. Queries are 
> defined in this language and transformed to a dataflow graph where the nodes 
> are Samza jobs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
