Hi Shaoxuan,
Thanks for the feedback!
Regarding the proposal for relational queries that you referenced, I am a bit
confused about its purpose and how it relates to the current implementation of
stream SQL: is it supposed to replace this implementation, or to complement it?
I will send a separate email about this, as I think it deserves its own
discussion thread.
Also, regarding stream-to-stream joins, I intend to start another discussion so
that we can decide together whether to start the design/implementation now or
wait.
Now, regarding inner queries and the points you raised. It is true that, in
general, an inner query would work like any other join (which obviously requires
some buffering capabilities and mechanisms to bound the growth of the join
state). However, at least some cases of inner queries can be supported without
buffering mechanisms or full support for inner/left joins. The logical operator
into which an inner query is translated (a left join with an always-true
condition) is, to some extent, closer to UNION, and to the union
implementation, than to the implementation we will have for joins. This is why
I believe we can already provide support for this (I have also tested a PoC
implementation internally, and it works). For examples of when we could use
this, please see the two examples below. Please let me know what you think and
whether it is worth drafting the JIRA issue, perhaps with some more details
(including the technical details).
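To make the UNION analogy concrete, here is a minimal sketch in plain Python (not Flink code). It assumes both inputs arrive as one interleaved sequence of tagged events, as a union of the two streams would deliver them; the names `left_join_true` and `update` are hypothetical. The only state is the current single value of the inner query, and outer rows are emitted immediately rather than buffered.

```python
from typing import Any, Callable, Iterable, Iterator, Optional, Tuple

# A tagged event, as a UNION of the two inputs would deliver it:
# ("outer", row) for the outer stream, ("inner", row) for the inner query's input.
Event = Tuple[str, Any]


def left_join_true(
    events: Iterable[Event],
    update: Callable[[Optional[Any], Any], Any],
) -> Iterator[Tuple[Any, Optional[Any]]]:
    """Left join with an always-true condition against a single-valued inner query."""
    inner_value: Optional[Any] = None  # the only state: current inner result
    for side, row in events:
        if side == "inner":
            # Refresh the single value (SINGLE_VALUE-like, evolving over time).
            inner_value = update(inner_value, row)
        else:
            # Emit right away; None plays the role of the left join's NULL.
            yield row, inner_value


# Usage: keep the latest inner value.
events = [("outer", "a"), ("inner", 3), ("outer", "b"), ("inner", 1), ("outer", "c")]
result = list(left_join_true(events, lambda _old, new: new))
print(result)  # [('a', None), ('b', 3), ('c', 1)]
```

Swapping in a different `update` function (for example a running minimum) changes what the inner query computes without changing how its result is assembled with the outer stream.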
Consider the example below:
SELECT STREAM user
FROM inputstream
WHERE amount > (SELECT STREAM Min(amount2) FROM inputstream2)
The point of this query is to restrict the values you select based on a value
derived from the other stream. Consider the following events arriving on each
stream:
inputstream | inputstream2 | Result
----------- | ------------ | ------
User1, 100  |              | User1 (there is no value in inputstream2 yet, and the left join should not restrict the output in this case)
            | X, x, 10     | nothing (there is no event in inputstream to output); Min is now 10
User2, 20   |              | User2 (20 is greater than 10, the current minimum from inputstream2)
            | X, x, 20     | nothing (there is no event in inputstream to output); Min remains 10
            | X, x, 5      | nothing (there is no event in inputstream to output); Min is now 5
User3, 8    |              | User3 (8 is greater than 5)
....
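The table above can be replayed with a small Python simulation. This is a sketch that assumes events are processed one at a time in the order listed; the function `run` and the tuple layout are illustrative only. It follows the semantics described in the table (no inner value yet means no filtering); note that under standard SQL three-valued logic, `amount > NULL` is unknown, so User1 would be filtered out instead.

```python
from typing import List, Optional

# Interleaved arrivals from the example: ("inputstream", user, amount) or
# ("inputstream2", amount2). The "X, x" fields of inputstream2 are omitted.
arrivals = [
    ("inputstream", "User1", 100),
    ("inputstream2", 10),
    ("inputstream", "User2", 20),
    ("inputstream2", 20),
    ("inputstream2", 5),
    ("inputstream", "User3", 8),
]


def run(arrivals) -> List[str]:
    current_min: Optional[int] = None  # result of the inner MIN query so far
    output: List[str] = []
    for event in arrivals:
        if event[0] == "inputstream2":
            amount2 = event[1]
            current_min = amount2 if current_min is None else min(current_min, amount2)
        else:
            _, user, amount = event
            # Left-join semantics as described: no inner value yet, no filtering.
            if current_min is None or amount > current_min:
                output.append(user)
    return output


print(run(arrivals))  # ['User1', 'User2', 'User3']
```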
Ultimately, among other things, this would make it possible to define multiple
window computations over the same input stream. Consider:
SELECT STREAM user
FROM inputstream
WHERE (SELECT STREAM AVG(amount) OVER (ORDER BY timestamp RANGE
INTERVAL '1' HOUR PRECEDING) AS hour_avg FROM inputstream) < amount
Assume the following events arrive, one every 30 minutes:
User1, 100 -> Average is 100; the topology that implements the query produces
no output (100 is not greater than 100)
User2, 10 -> Average is (100+10)/2 = 55; no output (10 is not greater than 55)
User3, 40 -> Average is (10+40)/2 = 25, assuming the User1 event from exactly
one hour earlier has dropped out of the window; the output is User3 (40 is
greater than 25)
....
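A Python sketch of the windowed example, assuming events arrive in timestamp order and timestamps are in minutes; `run` and `inclusive_lower_bound` are hypothetical names. The flag shows why the window boundary matters: excluding the row from exactly one hour earlier reproduces the averages above (100, 55, 25), while including it, as standard SQL `RANGE ... PRECEDING` frames do, gives 50 for User3 and therefore no output.

```python
from collections import deque
from typing import Deque, List, Tuple

WINDOW_MINUTES = 60

# (timestamp in minutes, user, amount): one event every 30 minutes.
events = [(0, "User1", 100), (30, "User2", 10), (60, "User3", 40)]


def run(events, inclusive_lower_bound: bool) -> List[Tuple[str, float, bool]]:
    """Return (user, window average, emitted?) for each event, in order."""
    window: Deque[Tuple[int, int]] = deque()  # (timestamp, amount) pairs
    total = 0

    def expired(ts_old: int, lower: int) -> bool:
        # Inclusive frame keeps ts_old == lower; exclusive frame drops it.
        return ts_old < lower if inclusive_lower_bound else ts_old <= lower

    rows = []
    for ts, user, amount in events:
        window.append((ts, amount))
        total += amount
        lower = ts - WINDOW_MINUTES
        while window and expired(window[0][0], lower):
            total -= window.popleft()[1]
        avg = total / len(window)
        # Outer query: WHERE (inner average) < amount
        rows.append((user, avg, amount > avg))
    return rows


for row in run(events, inclusive_lower_bound=False):
    print(row)
# ('User1', 100.0, False)
# ('User2', 55.0, False)
# ('User3', 25.0, True)
```

The deque here belongs to the inner aggregate; the step that combines the average with the outer event only reads the current value.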
Although this query depends on aggregates and windows, the operator that
implements the inner query can be implemented independently of the functions
the query contains. Moreover, no window or buffering is needed for the logic
that combines the inner query's results with the outer stream.
Best regards,
-----Original Message-----
From: Shaoxuan Wang
Sent: Thursday, January 26, 2017 4:36 AM
To: [email protected]
Subject: Re: STREAM SQL inner queries
Hi Radu,
Similar to the stream-stream join, this stream-stream inner query does not seem
to be well defined. It needs to provide at least some kind of window bounds to
complete the streaming SQL semantics. If this is an unbounded join/select, a
mechanism for storing the infinite data has to be considered. I may not fully
understand your proposal. Could you please provide more details about this
inner query, for example by giving some examples of input and output? It would
also be great if you could explain the use case for this inner query. This
would help us understand the semantics.
It should also be noted that we have recently decided to unify stream and batch
queries with the same regular (batch) SQL. Therefore, we have removed support
for the STREAM keyword in Flink streaming SQL. In the past several months,
Fabian and Xiaowei Jiang have started to work on the future relational queries
on Flink streaming. Fabian has drafted a very good design doc,
https://goo.gl/m31kkE. The design is based on a new concept of dynamic tables,
whose content changes over time and which can therefore be derived from
streams. With dynamic tables, stream queries can be expressed in regular
(batch) SQL. Apart from some syntactic sugar, there is not much difference
between a batch query and a stream query in terms of the "what" and "where" of
a query. Stream queries additionally have to address when to emit a result and
how to refine it through retraction.
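As a toy illustration of the retraction idea (plain Python, not Flink's API), the running MIN over inputstream2 from the example earlier in this thread can be modelled as a one-row dynamic table whose changes are emitted as a changelog. The function name and the `+`/`-` message encoding are assumptions made for this sketch.

```python
from typing import Iterable, Iterator, Optional, Tuple

# A change message: ("+", value) inserts a row, ("-", value) retracts one.
Change = Tuple[str, int]


def running_min_changelog(values: Iterable[int]) -> Iterator[Change]:
    """Maintain MIN(values) as a one-row dynamic table and emit its changelog.

    When the result changes, the old row is retracted before the new one is
    inserted; if a new value does not change the minimum, nothing is emitted.
    """
    current: Optional[int] = None
    for v in values:
        new = v if current is None else min(current, v)
        if new != current:
            if current is not None:
                yield ("-", current)
            yield ("+", new)
            current = new


print(list(running_min_changelog([10, 20, 5])))
# [('+', 10), ('-', 10), ('+', 5)]
```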
I hope this helps, and I look forward to working with you on streaming SQL.
Regards,
Shaoxuan
On Wed, Jan 25, 2017 at 9:49 PM, Radu Tudoran wrote:
> Hi all,
>
> I would like to open a JIRA issue (and then provide the
> implementation) for supporting inner queries. The idea is to be able
> to support SQL queries such as the ones presented in the scenarios below.
> The key idea is that supporting inner queries would require
> implementations of:
>
> - JOIN (type = left and condition = true) - Basically, this is a simple
> implementation of a join function between 2 streams that does not
> require any window support behind the scenes, as there is no condition
> on which to perform the join
>
> - SINGLE_VALUE - this operator has to provide one value to be joined
> further. In a streaming context, this value should evolve with the
> contents of the window. This could be implemented with a flatMap
> function, as left joins also allow the mapping to be done with null
> values
>
> We can then extend this initial, simple implementation to provide
> support for joins in general (conditional joins, right joins, ...), or we
> can isolate this implementation for the specific case of inner
> queries and go with a totally new design for stream-to-stream joins
> (which might be needed, depending on how we decide to support
> conditional mapping)
>
> What do you think about this?
>
> Example scenarios:
>
> SELECT STREAM amount,
> (SELECT id FROM inputstream1) AS field1 FROM inputstream2
>
> Translated to
> LogicalProject(amount=[$1], c=[$4])
>   LogicalJoin(condition=[true], joinType=[left])
>     LogicalTableScan(table=[[inputstream1]])
>     LogicalAggregate(group=[{}], agg#0=[SINGLE_VALUE($0)])
>       LogicalProject(user_id=[$0])
>         LogicalTableScan(table=[[inputstream2]])
>
> Or from the same stream, which is perhaps interesting for applying more
> complex operations within the inner query:
> SELECT STREAM amount,
> (SELECT id FROM inputstream1) AS field1 FROM inputstream1
>
> Translated to
> LogicalProject(amount=[$1], c=[$4])
>   LogicalJoin(condition=[true], joinType=[left])
>     LogicalTableScan(table=[[inputstream1]])
>     LogicalAggregate(group=[{}], agg#0=[SINGLE_VALUE($0)])
>       LogicalProject(user_id=[$0])
>         LogicalTableScan(table=[[inputstream1]])
>
> Or used for projection:
> SELECT STREAM amount, c FROM (SELECT *,id AS c FROM inputstream1)
>
> Translated to
> LogicalProject(amount=[$1], c=[$5])
>   LogicalProject(time=[$0], amount=[$1], date=[$2], id=[$4], c=[$5])
>     LogicalTableScan(table=[[inputstream1]])
>
>
> Or, in the future, even
> SELECT STREAM amount, myagg FROM (SELECT STREAM *, SUM(amount) OVER
> window AS myagg FROM inputstream1) ...