[
https://issues.apache.org/jira/browse/SAMZA-390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14297979#comment-14297979
]
Yi Pan (Data Infrastructure) commented on SAMZA-390:
----------------------------------------------------
Posting the proposal from [~julianhyde]:
Julian Hyde via samza.apache.org
1:54 PM
to dev
Let me propose an alternative approach. The deliverables and the technology
stack would be different, but I think we still fulfill the spirit of the
proposal, and there are benefits from better interoperability, standards
compliance, and building on existing code that already works.
First, I propose that we implement not a SQL-like language, but standard SQL
with streaming extensions. The extensions should be minimal, and should be
consistent with the look and feel of the language (e.g. SQL tends to use
keywords like OVER rather than punctuation marks like '['). If there is a way
to achieve something within standard SQL, we should use it. And any extensions
we make should preserve SQL's principles of being a closed language (you can
nest structures to arbitrary depth, and you can re-use queries as if they were
relations) and of having a sound semantics.
The language would allow queries over streams only, over mixtures of streams
and relations, and over relations only. A query that used only relations would
be 100% standard SQL, and a query that used a stream S would, with luck, be very
similar to one that used a table T with the same contents as S.
Second, I propose that we use Calcite's SQL parser, validator, and logical
algebra. We could also use its JDBC driver infrastructure.
I agree with the consensus that CQL is a good basis for extending relational
algebra to streams. The question is, can we shoehorn CQL's algebra extensions
into SQL? I believe we can, as follows:
- The ISTREAM operator is represented by the STREAM keyword after SELECT.
  (DSTREAM and RSTREAM are much less important, so they can be deferred.)
- Streams included in the FROM clause implicitly become relations. (They are
  “streamable” relations, and the planner will very likely leverage this when
  finding a viable implementation.)
- To use a particular window of a stream, rather than its entire history,
  follow it with an OVER clause.
- Use SQL standard constructs for datetime literals (TIMESTAMP '2015-01-29
  12:18:34.123', DATE '2015-01-29'), interval literals (INTERVAL '2:30' HOUR TO
  MINUTE, INTERVAL '5' MONTH), and windowed aggregates (AVG(price) OVER
  (PARTITION BY productId ORDER BY rowtime ROWS BETWEEN 10 PRECEDING AND
  5 FOLLOWING)).
Some examples.
# Identity
SELECT STREAM *
FROM Orders;
# Filter and project
SELECT STREAM state, quantity
FROM Orders
WHERE state = 'CA';
# Windowed aggregation
SELECT STREAM product,
AVG(price) OVER this_week AS avg_price
FROM Orders
WINDOW this_week AS (PARTITION BY product ORDER BY rowtime
  RANGE INTERVAL '7' DAY PRECEDING);
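The windowed aggregation can be evaluated incrementally, because the input arrives sorted by rowtime. Below is a minimal Python sketch of that idea. It is not how Calcite or Samza implement windows. It assumes each order is a `(rowtime, product, price)` tuple and that rowtimes are strictly increasing, and the name `windowed_avg` is hypothetical.

```python
from collections import defaultdict, deque
from datetime import datetime, timedelta
from typing import Iterable, Iterator, Tuple

Order = Tuple[datetime, str, float]  # (rowtime, product, price)
WINDOW = timedelta(days=7)           # RANGE INTERVAL '7' DAY PRECEDING

def windowed_avg(orders: Iterable[Order]) -> Iterator[Tuple[datetime, str, float]]:
    """Emit one (rowtime, product, avg_price) row per input row.

    Assumes strictly increasing rowtime, so a RANGE frame never has to
    wait for peer rows that share the current row's rowtime.
    """
    windows = defaultdict(deque)  # product -> (rowtime, price) rows in the frame
    sums = defaultdict(float)     # product -> sum of prices in the frame
    for rowtime, product, price in orders:
        frame = windows[product]
        frame.append((rowtime, price))
        sums[product] += price
        # Evict rows more than 7 days older than the current row.
        while frame[0][0] < rowtime - WINDOW:
            _, old_price = frame.popleft()
            sums[product] -= old_price
        yield (rowtime, product, sums[product] / len(frame))
```

Each input row produces exactly one output row. The only state kept is the last seven days of rows per product.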
# Aggregation
# At the top of each hour, emit totals for each product
SELECT STREAM product, trunc(rowtime to hour) AS rowtime, COUNT(*) AS c
FROM Orders
GROUP BY product, trunc(rowtime to hour);
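The hourly GROUP BY works on a stream because the grouping key is sorted. Once a row from a later hour arrives, the earlier hour can never change. The following is a minimal Python sketch of that emission strategy. It assumes `(rowtime, product)` input pairs sorted by rowtime, and the helper names are hypothetical.

```python
from collections import Counter
from datetime import datetime
from typing import Iterable, Iterator, Tuple

def trunc_to_hour(ts: datetime) -> datetime:
    """Plays the role of trunc(rowtime to hour) in the query above."""
    return ts.replace(minute=0, second=0, microsecond=0)

def hourly_counts(orders: Iterable[Tuple[datetime, str]]) -> Iterator[Tuple[str, datetime, int]]:
    """Yield (product, hour, count), one row per product per hour.

    Assumes orders are (rowtime, product) pairs sorted by rowtime. Then
    trunc_to_hour(rowtime) is sorted too, so an hour's totals are final as
    soon as the first row of a later hour arrives.
    """
    current_hour, counts = None, Counter()
    for rowtime, product in orders:
        hour = trunc_to_hour(rowtime)
        if current_hour is not None and hour != current_hour:
            for p in sorted(counts):
                yield (p, current_hour, counts[p])
            counts.clear()
        current_hour = hour
        counts[product] += 1
    # End of input: flush the last hour. A real stream would instead wait
    # for a row (or other progress signal) from a later hour.
    for p in sorted(counts):
        yield (p, current_hour, counts[p])
```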
# Relational query on recent history of a stream
SELECT product, COUNT(*)
FROM Orders OVER (ORDER BY rowtime RANGE INTERVAL '1' HOUR PRECEDING)
GROUP BY product;
or, alternatively:
SELECT product, COUNT(*)
FROM Orders
WHERE rowtime BETWEEN CURRENT_TIMESTAMP - INTERVAL '1' HOUR AND CURRENT_TIMESTAMP
GROUP BY product;
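Without the STREAM keyword, either form is an ordinary relational query over the stream's recent history. It returns one answer at the moment it is evaluated. A minimal Python sketch of that evaluation follows. It assumes the history is available as `(rowtime, product)` pairs, and the function name `recent_counts` and its `now` parameter are hypothetical.

```python
from collections import Counter
from datetime import datetime, timedelta
from typing import Iterable, Tuple

def recent_counts(orders: Iterable[Tuple[datetime, str]], now: datetime,
                  window: timedelta = timedelta(hours=1)) -> Counter:
    """Evaluate the query once, as an ordinary relational query, at `now`.

    `orders` is the stream's history as (rowtime, product) pairs. Only rows
    with now - window <= rowtime <= now are counted, matching the BETWEEN
    predicate; the result is a single relation, not a stream.
    """
    return Counter(product for rowtime, product in orders
                   if now - window <= rowtime <= now)
```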
# Stream-table join producing a stream
SELECT STREAM *
FROM Orders AS o
JOIN Products AS p ON o.productId = p.productId
WHERE o.price < p.list_price / 2;
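One way to picture the stream-table join is as a lookup performed as each order arrives. The minimal Python sketch below makes two assumptions that the proposal does not specify. First, Products is treated as a dictionary holding the table's current contents. Second, rows are plain dicts keyed by the column names used in the query. The name `bargain_orders` is hypothetical.

```python
from typing import Dict, Iterable, Iterator

def bargain_orders(orders: Iterable[dict], products: Dict[int, dict]) -> Iterator[dict]:
    """Join each arriving order with its Products row and apply the WHERE clause.

    `products` maps productId to a row holding at least 'list_price'; it is
    treated as the table's current contents when each order arrives.
    """
    for o in orders:
        p = products.get(o["productId"])  # inner join: orders with no product are dropped
        if p is not None and o["price"] < p["list_price"] / 2:
            yield {**o, **p}              # SELECT *: columns of both sides
```

The result is still a stream: one joined row per qualifying order, in arrival order.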
# Stream-stream join producing a stream
SELECT STREAM o.rowtime, o.orderId, s.shipmentId, s.rowtime AS shipTime
FROM Orders AS o
JOIN Shipments AS s
ON o.orderId = s.orderId
AND s.rowtime BETWEEN o.rowtime AND o.rowtime + INTERVAL '1' HOUR
ORDER BY o.rowtime;
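The one-hour bound in the ON clause is what makes this join feasible on unbounded inputs. Only orders from the last hour need to be remembered, and a joined row can be released in o.rowtime order once its order is more than an hour old. This is a minimal Python sketch of that reasoning, not a description of any Samza operator. It assumes both inputs are sorted by rowtime and that orderId is unique, and the name `join_orders_shipments` is hypothetical.

```python
import heapq
from collections import deque
from datetime import datetime, timedelta
from typing import Iterable, Iterator, Tuple

BOUND = timedelta(hours=1)  # s.rowtime BETWEEN o.rowtime AND o.rowtime + INTERVAL '1' HOUR

def join_orders_shipments(
    orders: Iterable[Tuple[datetime, str]],          # (rowtime, orderId)
    shipments: Iterable[Tuple[datetime, str, str]],  # (rowtime, shipmentId, orderId)
) -> Iterator[Tuple[datetime, str, str, datetime]]:
    """Yield (o.rowtime, orderId, shipmentId, shipTime) sorted by o.rowtime."""
    # Merge inputs by time; kind 0 (order) sorts before kind 1 (shipment),
    # so a shipment can match an order with the same rowtime.
    events = heapq.merge(
        ((t, 0, oid, None) for t, oid in orders),
        ((t, 1, oid, sid) for t, sid, oid in shipments),
    )
    open_orders, expiry = {}, deque()  # orders that may still match
    results = []                       # min-heap of joined rows, keyed by o.rowtime
    for t, kind, oid, sid in events:
        # No future shipment can match an order older than t - BOUND, so
        # joined rows for such orders are final and can be emitted in order.
        while results and results[0][0] < t - BOUND:
            yield heapq.heappop(results)
        while expiry and expiry[0][0] < t - BOUND:
            open_orders.pop(expiry.popleft()[1], None)
        if kind == 0:
            open_orders[oid] = t
            expiry.append((t, oid))
        elif oid in open_orders:
            heapq.heappush(results, (open_orders[oid], oid, sid, t))
    while results:  # end of input: flush remaining rows
        yield heapq.heappop(results)
```

Shipments may arrive in a different order from their orders. The heap re-sorts joined rows to satisfy ORDER BY o.rowtime, and it delays each row by at most the one-hour bound.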
# Union
SELECT STREAM rowtime, customerId FROM Orders
UNION ALL
SELECT STREAM rowtime, customerId FROM SupportCalls;
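If each input stream is sorted by rowtime, UNION ALL can be implemented as a lazy merge that keeps the output sorted too. Downstream operators could then keep relying on rowtime order. This is a design assumption for the sketch below, not something the proposal specifies. For brevity, rowtime is an integer.

```python
import heapq
from typing import Iterable, Iterator, Tuple

Row = Tuple[int, str]  # (rowtime, customerId); rowtime as an int for brevity

def union_all(*streams: Iterable[Row]) -> Iterator[Row]:
    """UNION ALL of streams that are each sorted by rowtime.

    heapq.merge interleaves the inputs lazily so the output is sorted by
    rowtime as well; no duplicates are removed (UNION ALL, not UNION).
    """
    return heapq.merge(*streams, key=lambda row: row[0])
```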
Note that, as in standard SQL, windowed aggregation emits the same number of
rows as it consumes, whereas GROUP BY collapses rows. The windowed aggregation
and GROUP BY examples both leverage the fact that “rowtime” is sorted (and
“trunc(rowtime to hour)” can be deduced to be sorted). If rowtime were not
sorted, the system would reject these queries.
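Deducing that "trunc(rowtime to hour)" is sorted amounts to noting that a non-decreasing function of a sorted column is itself sorted. The minimal Python sketch below shows such a check over a toy expression representation. The `MONOTONIC` set and the rejection rule in `check_streaming_group_by` are hypothetical; they are not Calcite's actual monotonicity metadata or validation logic.

```python
# Functions assumed non-decreasing in their first argument (illustrative only).
MONOTONIC = {"trunc", "floor", "ceil"}

def is_sorted(expr, sorted_columns):
    """Return True if `expr` is known to be non-decreasing over the input.

    `expr` is either a column name (str) or a tuple (function, arg, *rest).
    """
    if isinstance(expr, str):
        return expr in sorted_columns
    func, arg, *_ = expr
    return func in MONOTONIC and is_sorted(arg, sorted_columns)

def check_streaming_group_by(group_keys, sorted_columns):
    """Hypothetical rule: reject a streaming GROUP BY with no sorted key."""
    if not any(is_sorted(k, sorted_columns) for k in group_keys):
        raise ValueError("streaming GROUP BY needs a sorted key, e.g. one derived from rowtime")
```

For example, `check_streaming_group_by(["product", ("trunc", "rowtime", "hour")], {"rowtime"})` passes, while grouping by `["product"]` alone raises `ValueError`.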
Calcite already has a SQL parser, validator, metadata SPI (that you can use to
declare what schemas, tables, streams, columns are available), and a logical
algebra. The logical algebra consists of TableScan, Filter, Project, Union,
Join, Aggregate, Window, Sort, Values (and a few others). Calcite allows you to
define transformation rules that rewrite a combination of operators into a
semantically equivalent combination, and it has an engine that applies many
such rules, optionally guided by a cost model.
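To illustrate what a rule-driven engine does, here is a minimal Python sketch of heuristic rewriting to a fixpoint, with no cost model. Plans are nested tuples. The two rules, merging adjacent filters and pushing a filter through a union, were chosen for this sketch. Calcite's real planners (HepPlanner for heuristic rewriting and VolcanoPlanner for cost-based search) are far more elaborate, and the names here are not their APIs.

```python
# Which tuple positions of each operator hold child plans (not conditions).
CHILDREN = {"Scan": (), "Filter": (2,), "Union": (1, 2)}

def merge_filters(node):
    # Filter(c1, Filter(c2, r)) -> Filter(c1 AND c2, r)
    if node[0] == "Filter" and node[2][0] == "Filter":
        _, c1, (_, c2, r) = node
        return ("Filter", ("AND", c1, c2), r)
    return None

def push_filter_through_union(node):
    # Filter(c, Union(a, b)) -> Union(Filter(c, a), Filter(c, b))
    if node[0] == "Filter" and node[2][0] == "Union":
        _, c, (_, a, b) = node
        return ("Union", ("Filter", c, a), ("Filter", c, b))
    return None

RULES = [merge_filters, push_filter_through_union]

def rewrite(node):
    """Rewrite children first, then apply RULES until none fires (a fixpoint)."""
    node = tuple(rewrite(x) if i in CHILDREN[node[0]] else x
                 for i, x in enumerate(node))
    for rule in RULES:
        result = rule(node)
        if result is not None:
            return rewrite(result)
    return node
```

Each rule either removes a Filter or moves one closer to the scans, so rewriting terminates. The table name `OldOrders` in the usage below is hypothetical.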
I am working on a prototype of Calcite that adds streaming
(https://github.com/julianhyde/incubator-calcite/tree/chi). Just two new
operators are needed: Delta (converts a relation to a stream) and Chi (converts
a stream to a relation). And a few rules, such as one that maps
Delta(Filter(condition, r)) to Filter(condition, Delta(r)), are sufficient to
transform the logical algebra into something that could be implemented. My
prototype can parse
SELECT STREAM * FROM Orders
convert it to
LogicalDelta
  LogicalProject(id=[$0], product=[$1], quantity=[$2])
    EnumerableTableScan(table=[[STREAMS, ORDERS]])
and simplify to
EnumerableStreamScan(table=[[STREAMS, ORDERS]])
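The simplification above can be mimicked with a toy rule engine. This is a minimal Python sketch, not Calcite's planner or its RelNode API. Plans are plain tuples, and `Scan`, `StreamScan`, `Delta`, `Filter` and `Project` are simplified stand-ins for the operators named above. Only the Filter rule comes from the proposal. Pushing Delta through Project, turning Delta(Scan) into StreamScan, and removing a trivial Project are assumptions made here so that the example reaches the same final plan.

```python
CHILD = {"Delta": 1, "Filter": 2, "Project": 2}  # position of each operator's input

def field_count(node):
    op = node[0]
    if op in ("Scan", "StreamScan"):
        return len(node[2])           # ("Scan", table, columns)
    if op == "Project":
        return len(node[1])           # ("Project", exprs, input)
    return field_count(node[-1])      # Filter and Delta keep their input's fields

def delta_rule(node):
    if node[0] != "Delta":
        return None
    child = node[1]
    if child[0] in ("Filter", "Project"):
        # Delta(Filter(c, r)) -> Filter(c, Delta(r)); assumed to hold for Project too.
        return (child[0], child[1], ("Delta", child[2]))
    if child[0] == "Scan":
        return ("StreamScan",) + child[1:]  # the delta of a stream table is the stream
    return None

def remove_trivial_project(node):
    # Project($0, ..., $n-1) over an input with exactly n fields is a no-op.
    if node[0] == "Project":
        exprs, child = node[1], node[2]
        identity = tuple(f"${i}" for i in range(len(exprs)))
        if exprs == identity and field_count(child) == len(exprs):
            return child
    return None

def simplify(node):
    for rule in (delta_rule, remove_trivial_project):
        result = rule(node)
        if result is not None:
            return simplify(result)
    if node[0] in CHILD:
        i = CHILD[node[0]]
        new_child = simplify(node[i])
        if new_child != node[i]:
            return simplify(node[:i] + (new_child,) + node[i + 1:])
    return node
```

Applied to `("Delta", ("Project", ("$0", "$1", "$2"), ("Scan", "ORDERS", ("id", "product", "quantity"))))`, `simplify` pushes the Delta down, drops the identity Project, and leaves a single `StreamScan` of ORDERS.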
The next step would be to write rules to convert EnumerableStreamScan (and
several other operators) to physical algebra operators SamzaStreamScan etc.
In summary, I think this approach should be given serious consideration. An
extended standard SQL is much more useful than a SQL-like language, and I
believe I have shown that we can add the necessary extensions to SQL without
destroying it.
Building a SQL parser, validator, relational algebra, JDBC driver and planning
framework is a massive amount of work, and 90% of that functionality is
identical in streaming and non-streaming systems.
Lastly, building a stack based on extended standard SQL does not preclude
adding other high-level languages on top of the algebra at a later date.
> High-Level Language for Samza
> -----------------------------
>
> Key: SAMZA-390
> URL: https://issues.apache.org/jira/browse/SAMZA-390
> Project: Samza
> Issue Type: New Feature
> Components: sql
> Reporter: Raul Castro Fernandez
> Priority: Minor
> Labels: project
> Attachments: StreamSQLforSAMZA-v0.1.docx.docx
>
>
> Discussion about high-level languages to define Samza queries. Queries are
> defined in this language and transformed to a dataflow graph where the nodes
> are Samza jobs.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)