radu created FLINK-6077:
---------------------------
Summary: Support In/Exists/Except/Any/Some/All for Stream SQL
Key: FLINK-6077
URL: https://issues.apache.org/jira/browse/FLINK-6077
Project: Flink
Issue Type: Bug
Components: Table API & SQL
Reporter: radu
Time target: Proc Time
SQL targeted query examples:
----------------------------
With inner query
Q1. ```SELECT client FROM stream1 WHERE id
IN
((SELECT id FROM stream2 WHERE salary > 4500 GROUP BY FLOOR(proctime TO HOUR)
))```
Comment: A concrete example of this query is selecting the customers whose
country is in the list of the suppliers' countries (`SELECT customer FROM
customers WHERE Country IN (SELECT Country FROM suppliers)`).
Comment: The implementation of this query depends on the implementation of
the inner query. The structure can be the same as for inner query support,
with the difference that the LogicalJoin between the main query and the
inner query is conditional.
Comment: The inner query needs a bound as otherwise it cannot be decided
when to trigger.
Comment: If no explicit trigger is given, the inner query must be triggered
based on when the value of the grouping expression changes.
Comment: Boundaries should be supported for all options: GROUP BY clauses,
windows, and time expressions (`[FLOOR/CEIL](rowtime/proctime TO HOUR)`).
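To make the bounding requirement concrete, here is a minimal, Flink-free Java sketch of the inner query of Q1: it filters `stream2` on `salary > 4500` and collects ids per `FLOOR(proctime TO HOUR)` bucket, and a bucket's collection is only treated as final once processing time has passed the end of that hour. The class and method names (`HourlyInnerQuery`, `add`, `closedCollection`) are hypothetical, and timestamps are assumed to be non-negative epoch milliseconds.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch (not Flink code) of Q1's inner query:
// SELECT id FROM stream2 WHERE salary > 4500, bounded by FLOOR(proctime TO HOUR).
class HourlyInnerQuery {
    static final long HOUR_MS = 60L * 60L * 1000L;

    // ids per hour bucket; the key is the bucket start in epoch millis
    private final Map<Long, Set<String>> idsPerHour = new HashMap<>();

    // FLOOR(proctime TO HOUR) for a non-negative epoch-millis timestamp
    static long floorToHour(long proctimeMs) {
        return proctimeMs - (proctimeMs % HOUR_MS);
    }

    // Apply the WHERE filter and add the id to the bucket of its arrival hour
    void add(long proctimeMs, String id, double salary) {
        if (salary > 4500) {
            idsPerHour.computeIfAbsent(floorToHour(proctimeMs), k -> new HashSet<>()).add(id);
        }
    }

    // The bucket starting at hourStart can only be "triggered" once processing
    // time has reached the end of that hour; before that, null is returned.
    Set<String> closedCollection(long hourStart, long nowMs) {
        if (nowMs < hourStart + HOUR_MS) {
            return null; // bound not reached yet: the result cannot be decided
        }
        return idsPerHour.getOrDefault(hourStart, Set.of());
    }
}
```

The time bound is what makes the collection finite: without it, `closedCollection` would never be allowed to return.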
With collection
Q2. ```SELECT * FROM stream1 WHERE b
IN
(5000, 7000, 8000, 9000)```
Comment: It should be checked whether this is already supported by the
DataStreamCalc implementation. If not, it can become a sub-JIRA task to
extend the DataStreamCalc functionality with this conditional behavior.
Comment: Similar functionality can be provided if the collection is a
table rather than a set of values.
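Because the collection in Q2 is a fixed list of literals, the IN check needs no state and no window and can be evaluated record by record, which is why a calc-style (filter) operator is a natural place for it. A minimal, Flink-free Java sketch; the class `InLiteralFilter` is hypothetical, and records are reduced to the value of column `b`.

```java
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical sketch: Q2's predicate `b IN (5000, 7000, 8000, 9000)` as a
// stateless per-record filter, the kind of check a calc operator evaluates.
class InLiteralFilter {
    // The literal collection is known at planning time, so a constant set suffices.
    static final Set<Integer> IN_LIST = Set.of(5000, 7000, 8000, 9000);

    static final Predicate<Integer> B_IN_LIST = IN_LIST::contains;

    // Keep only the records (here just the value of column b) that satisfy the predicate
    static List<Integer> filter(List<Integer> bValues) {
        return bValues.stream().filter(B_IN_LIST).collect(Collectors.toList());
    }
}
```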
With table
Q3. ```SELECT client FROM stream1 WHERE id
IN
((SELECT id FROM table1 where stream1.id = table1.id))```
Comment: This can be a sub-JIRA issue, perhaps in the context of dynamic
tables, to support joining with tables and filtering based on the contents
of an external table.
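In Q3 the correlated sub-query `SELECT id FROM table1 WHERE stream1.id = table1.id` reduces to "does `table1` contain a row with this id?". Assuming the table can be looked up by key (for example, a snapshot held in memory), the check becomes a per-record lookup. A Flink-free Java sketch; `TableLookupFilter`, the map standing in for `table1`, and the `{id, client}` record encoding are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of Q3: SELECT client FROM stream1 WHERE id IN
// (SELECT id FROM table1 WHERE stream1.id = table1.id)
class TableLookupFilter {
    // Stand-in for table1: id -> row payload (assumed to fit in memory here)
    private final Map<String, String> table1;

    TableLookupFilter(Map<String, String> table1) {
        this.table1 = table1;
    }

    // Process stream1 records {id, client} in arrival order and emit the clients
    // whose id currently exists in table1.
    List<String> selectClients(List<String[]> stream1) {
        List<String> out = new ArrayList<>();
        for (String[] record : stream1) {
            String id = record[0];
            String client = record[1];
            if (table1.containsKey(id)) {
                out.add(client);
            }
        }
        return out;
    }
}
```

If `table1` changes over time (the dynamic-table case), the lookup sees whatever version of the table is current when each record arrives.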
General comment: **Except** behaves similarly to IN or EXISTS, as it
filters outputs of the main stream based on data from a secondary stream.
The implementation will follow the same logic as for IN/EXISTS, filtering
the outputs in the join function between the main stream and the
secondary stream. The same restrictions also apply to the secondary/inner
queries.
Q4. ```SELECT ID, NAME FROM CUSTOMERS LEFT JOIN ORDERS ON CUSTOMERS.ID =
ORDERS.CUSTOMER_ID
EXCEPT
SELECT ID, NAME FROM CUSTOMERS RIGHT JOIN ORDERS ON CUSTOMERS.ID =
ORDERS.CUSTOMER_ID GROUP BY FLOOR(procTime TO HOUR);```
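Following the comment above, EXCEPT evaluated within one window keeps the distinct rows of the main side that do not appear on the secondary side (plain SQL `EXCEPT` has set semantics, unlike `EXCEPT ALL`). A minimal Java sketch of that per-window step, assuming both sides of the current window have already been collected into lists and rows are compared by `equals()`; `WindowedExcept` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: per-window EXCEPT between the main and the secondary side.
class WindowedExcept {
    // Returns the distinct main-side rows that do not occur on the secondary side,
    // in first-arrival order. Rows are encoded as strings such as "ID,NAME".
    static List<String> except(List<String> mainWindow, List<String> secondaryWindow) {
        Set<String> secondary = new HashSet<>(secondaryWindow);
        Set<String> result = new LinkedHashSet<>(); // EXCEPT removes duplicates
        for (String row : mainWindow) {
            if (!secondary.contains(row)) {
                result.add(row);
            }
        }
        return new ArrayList<>(result);
    }
}
```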
Description:
------------
The IN and EXISTS operators are conditional clauses in the WHERE clause
that check for values in certain collections. The collections that
restrict the values can be either static (values or tables) or parts of a
stream. This JIRA issue is concerned with the latter case of checking for
values over a stream. For this operation to work, the stream needs to be
bounded so that the result can be triggered and the collection can be
formed. This calls for boundaries or groupings over the sub-query that
forms the collection to which IN is applied. This should be supported via
3 options, as shown below. Each of these options can be a sub-JIRA issue.
1) Group By clauses that are applied over some monotonic order of the
stream based on which ranges are defined.
` [...] GROUP BY prodId`
3) Window clauses that define rolling partitions of the stream's data,
which evolve in time.
` [...] WINDOW HOUR AS (RANGE INTERVAL '10' MINUTE TO SECOND(3) PRECEDING);`
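The window option keeps, for each arriving main-stream record, only the inner-query rows from the preceding interval (10 minutes in the example clause). A minimal sketch of such a rolling collection, assuming processing-time timestamps in milliseconds that arrive in non-decreasing order and a range that includes its lower bound; `RollingRangeCollection` and its methods are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a collection over "RANGE INTERVAL '10' MINUTE PRECEDING".
// Rows older than (now - range) are evicted, so the collection rolls forward in time.
class RollingRangeCollection {
    private final long rangeMs;
    private final Deque<long[]> rows = new ArrayDeque<>();     // {timestamp, value}, oldest first
    private final Map<Long, Integer> counts = new HashMap<>(); // value -> multiplicity in range

    RollingRangeCollection(long rangeMs) {
        this.rangeMs = rangeMs;
    }

    // Add an inner-query row; timestamps are assumed to be non-decreasing.
    void add(long timestampMs, long value) {
        rows.addLast(new long[] {timestampMs, value});
        counts.merge(value, 1, Integer::sum);
    }

    // Drop rows that fell out of [nowMs - rangeMs, nowMs].
    private void evict(long nowMs) {
        while (!rows.isEmpty() && rows.peekFirst()[0] < nowMs - rangeMs) {
            long value = rows.pollFirst()[1];
            // decrement the multiplicity, removing the key when it reaches zero
            counts.computeIfPresent(value, (k, c) -> c == 1 ? null : c - 1);
        }
    }

    // IN check for a main-stream record arriving at nowMs.
    boolean contains(long nowMs, long value) {
        evict(nowMs);
        return counts.containsKey(value);
    }
}
```

Keeping a multiplicity per value means a value stays in the collection as long as at least one of its rows is still inside the range.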
Functionality example
---------------------
Below we illustrate how IN/EXISTS works on streams.
```SELECT * FROM stream1 WHERE id IN ((SELECT id2 FROM stream2 WHERE b > 10
GROUP BY FLOOR(PROCTIME TO HOUR)))```
Note: The inner query triggers only once an hour. During the following
hour, the result computed by the inner query for the previous hour is used
to filter the results of the main query as they arrive. This is also
consistent with how inner queries are translated (see inner queries).
||IngestionTime(Event)||Stream1||Stream2||Output||
|10:00:01| Id1,10| |nil|
|10:02:00| |Id2,2| |
|11:25:00| |Id3,15| |
|12:03:00| Id2,15| |nil|
|12:05:00| Id3,11| |Id3,11|
|12:06:00| |Id2,30| |
|12:07:00| |Id3,2| |
|12:09:00| Id2,17| |nil|
|12:10:00| Id3,20| |Id3,20|
|...|
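The table can be reproduced with a small Flink-free simulation, assuming that the inner query's result for hour h (the `stream2` rows with `b > 10`) becomes available exactly at the end of hour h and is used for all `stream1` records arriving during hour h+1. `HourlyInReplay` and its event encoding (only the hour of the ingestion time is kept) are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical replay of the example. Assumption: the inner query
// (SELECT id2 FROM stream2 WHERE b > 10, grouped by FLOOR(PROCTIME TO HOUR)) fires at
// the end of each hour, and that result filters stream1 records of the following hour.
class HourlyInReplay {
    static final class Event {
        final int hour;             // hour of the ingestion time; minutes are irrelevant here
        final boolean fromStream1;  // true: stream1 (main query), false: stream2 (inner query)
        final String id;
        final int value;

        Event(int hour, boolean fromStream1, String id, int value) {
            this.hour = hour;
            this.fromStream1 = fromStream1;
            this.id = id;
            this.value = value;
        }
    }

    // Events are assumed to arrive in ingestion-time order.
    static List<String> run(List<Event> events) {
        Map<Integer, Set<String>> innerPerHour = new HashMap<>();
        List<String> output = new ArrayList<>();
        for (Event e : events) {
            if (e.fromStream1) {
                // Main query: filter against the collection fired at the end of the previous hour
                Set<String> previous = innerPerHour.getOrDefault(e.hour - 1, Set.of());
                if (previous.contains(e.id)) {
                    output.add(e.id + "," + e.value);
                }
            } else if (e.value > 10) {
                // Inner query: WHERE b > 10, collected per hour of arrival
                innerPerHour.computeIfAbsent(e.hour, h -> new HashSet<>()).add(e.id);
            }
        }
        return output;
    }

    // The rows of the example table, minus the trailing "..."
    static List<Event> exampleTable() {
        return List.of(
            new Event(10, true, "Id1", 10),
            new Event(10, false, "Id2", 2),
            new Event(11, false, "Id3", 15),
            new Event(12, true, "Id2", 15),
            new Event(12, true, "Id3", 11),
            new Event(12, false, "Id2", 30),
            new Event(12, false, "Id3", 2),
            new Event(12, true, "Id2", 17),
            new Event(12, true, "Id3", 20));
    }

    public static void main(String[] args) {
        System.out.println(run(exampleTable())); // prints [Id3,11, Id3,20]
    }
}
```

Note that `Id2,30` arriving at 12:06 only affects the collection used during the 13:00 hour, which is why `Id2,17` at 12:09 still produces nil.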
Implementation option
---------------------
Since the query only makes sense 1) within window boundaries and 2) over
sub-queries that extract collections of data, the design builds on the
inner query implementation with the following modifications. (As a recap,
the inner query is implemented as a special join [LEFT type with an
always-true condition] between the main stream and the output of the
inner query, which is passed through a single-value selection
aggregation):
1) The condition for the LogicalJoin to output a result is no longer
always true. Instead, the condition is evaluated within the window
function by checking that the input from the main stream is within the
collection produced by the inner query.
2) The check depends on the type of function used (IN, ANY, SOME, ...).
Each such function needs its own direct implementation.
3) The filter on the inner query that keeps a single value is removed;
instead, the whole collection is passed to the join for evaluation.
4) The boundaries of the SQL query define the join window in which the
check is performed.
5) From the condition's point of view, the join behaves as an INNER JOIN
(a value is emitted only if it exists on the other side).
[See attached document for schema]
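Item 2 states that each operator needs its own check. The sketch below shows what those checks could look like when evaluated against the collection delivered by the inner query for the current window. The enum and method names are hypothetical, the comparison for ANY/SOME/ALL is fixed to `>` for brevity, and SQL three-valued logic for NULLs is deliberately left out.

```java
import java.util.Collection;

// Hypothetical sketch of the per-operator checks performed inside the join function.
// ANY/SOME/ALL use ">" only; NULL handling (three-valued logic) is omitted.
class SubQueryPredicates {
    enum Op { IN, EXISTS, GREATER_THAN_ANY, GREATER_THAN_ALL }

    // SOME is a synonym of ANY in SQL, so it maps to GREATER_THAN_ANY here.
    static boolean check(Op op, long value, Collection<Long> collection) {
        switch (op) {
            case IN:               // value = ANY (collection)
                return collection.contains(value);
            case EXISTS:           // the sub-query returned at least one row
                return !collection.isEmpty();
            case GREATER_THAN_ANY: // false for an empty collection
                return collection.stream().anyMatch(c -> value > c);
            case GREATER_THAN_ALL: // vacuously true for an empty collection
                return collection.stream().allMatch(c -> value > c);
            default:
                throw new IllegalArgumentException("unsupported operator: " + op);
        }
    }
}
```

The empty-collection cases are where the operators differ most visibly (ANY is false, ALL is true), so they are worth covering explicitly in any real implementation.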
General logic of Join
---------------------
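import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// ConstantConditionSelector and FlatJoinFunctionWithInSelection are proposed classes:
// a KeySelector returning the same key for every record, and the join function doing the IN check.
leftDataStream.join(rightDataStream)
    .where(new ConstantConditionSelector())
    .equalTo(new ConstantConditionSelector())
    // any [TIME/COUNT][TUMBLE/SLIDE] window; a one-hour tumbling processing-time window as an example
    .window(TumblingProcessingTimeWindows.of(Time.hours(1)))
    //.trigger(new DefaultTrigger())
    //.evictor(new DefaultEvictor())
    .apply(new FlatJoinFunctionWithInSelection());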
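One detail worth checking when mapping this onto the DataStream API: a windowed `join(...).apply(...)` calls the join function once per (left, right) pair that shares a key and a window, so with a constant key an IN check becomes a per-pair equality test, and a left record matching several right records would be emitted several times. `coGroup(...)` hands the function both sides of the window as iterables, so it can evaluate the check against the whole collection. The Flink-free sketch below emulates that collection-based evaluation over tumbling windows; `WindowedInJoin` and the `{timestampMs, id}` record encoding are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical emulation of the proposed windowed join with a constant key:
// all records share one key, so a window holds every left and right record whose
// timestamp falls into it. Records are {timestampMs, id} pairs.
class WindowedInJoin {
    // Start of the tumbling window containing ts (non-negative epoch millis assumed)
    static long windowStart(long ts, long sizeMs) {
        return ts - (ts % sizeMs);
    }

    static List<long[]> inJoin(List<long[]> left, List<long[]> right, long sizeMs) {
        // Build the right-side collection of each window (what coGroup would hand over)
        Map<Long, Set<Long>> rightPerWindow = new HashMap<>();
        for (long[] r : right) {
            rightPerWindow.computeIfAbsent(windowStart(r[0], sizeMs), w -> new HashSet<>()).add(r[1]);
        }
        // Emit each left record at most once if its id is in the same window's collection
        List<long[]> out = new ArrayList<>();
        for (long[] l : left) {
            Set<Long> collection = rightPerWindow.getOrDefault(windowStart(l[0], sizeMs), Set.of());
            if (collection.contains(l[1])) {
                out.add(l);
            }
        }
        return out;
    }
}
```

With two right-side records carrying the same id in one window, a pairwise join function would see the matching left record twice, while this collection-based check emits it once, which matches the IN semantics.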
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)