[
https://issues.apache.org/jira/browse/FLINK-6077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Flink Jira Bot updated FLINK-6077:
----------------------------------
Labels: stale-major (was: )
> Support In/Exists/Except/Any /Some/All for Stream SQL
> -----------------------------------------------------
>
> Key: FLINK-6077
> URL: https://issues.apache.org/jira/browse/FLINK-6077
> Project: Flink
> Issue Type: New Feature
> Components: Table SQL / API
> Reporter: radu
> Priority: Major
> Labels: stale-major
> Attachments: in.png
>
>
> Time target: Proc Time
> SQL targeted query examples:
> ----------------------------
> With inner query
> Q1. ```SELECT client FROM stream1 WHERE id
> IN
> ((SELECT id FROM stream2 WHERE salary > 4500 GROUP BY FLOOR(proctime TO HOUR)
> ))```
> Comment: A concrete example of this query is selecting the customers whose
> country is in the list of supplier countries (`SELECT customer FROM
> customers WHERE Country IN (SELECT Country FROM suppliers)`)
> Comment: This implementation depends on the implementation of the inner
> query. The structure can be the same as for inner query support with the
> difference that the LogicalJoin between main query and inner query is
> conditional.
> Comment: The inner query needs a bound as otherwise it cannot be decided
> when to trigger.
> Comment: If the value is not triggered by the grouping expression, then
> the inner query must trigger based on when that expression changes value.
> Comment: Boundaries should be supported over all options: GROUP BY
> clauses, windows, or time expressions ([floor/ceil](rowtime/proctime to
> hour)).
> With collection
> Q2. ```SELECT * FROM stream1 WHERE b
> IN
> (5000, 7000, 8000, 9000)```
> Comment: It should first be checked whether the DataStreamCalc
> implementation already supports this. If not, a sub-JIRA task can be
> created to extend the DataStreamCalc functionality to implement this
> conditional behavior.
> Comment: A similar functionality can be provided if the collection is a
> table rather than a set of values.
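To make the intended behavior of Q2 concrete, here is a minimal sketch in plain Java (no Flink dependency). The class `StaticInFilter` and its method are hypothetical names used only for illustration; they say nothing about how DataStreamCalc actually evaluates the predicate.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Flink: keeps values that are in a fixed set.
final class StaticInFilter {
    // The literal values from the IN (...) list of Q2.
    static final Set<Integer> ALLOWED = Set.of(5000, 7000, 8000, 9000);

    // Returns the values of column b that satisfy "b IN (5000, 7000, 8000, 9000)".
    static List<Integer> filter(List<Integer> bValues) {
        return bValues.stream()
                .filter(ALLOWED::contains)
                .collect(Collectors.toList());
    }
}
```

Because the collection is static, each row can be checked as it arrives, with no bound or trigger needed.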
> With table
> ```SELECT client FROM stream1 WHERE id
> IN
> ((SELECT id FROM table1 where stream1.id = table1.id))```
> Comment: This can be a sub-JIRA issue, perhaps within the context of dynamic
> tables, to support the join with tables and filtering operations based on
> contents from an external table
> General comments: **Except** is similar in behavior to IN or EXISTS as
> it filters out outputs of the main stream based on data from a secondary
> stream. The implementation will follow exactly the same logic as for
> IN/Exists by filtering the outputs in the join function between the main
> stream and the secondary stream. Additionally, we apply the same
> restrictions for the secondary/inner queries.
> ```SELECT ID, NAME FROM CUSTOMERS LEFT JOIN ORDERS ON CUSTOMERS.ID =
> ORDERS.CUSTOMER_ID
> EXCEPT
> SELECT ID, NAME FROM CUSTOMERS RIGHT JOIN ORDERS ON CUSTOMERS.ID =
> ORDERS.CUSTOMER_ID GROUP BY FLOOR(procTime TO HOUR);```
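The filtering idea behind EXCEPT can be sketched for a single bounded batch as follows. This is plain Java under the assumption of set semantics (duplicates removed, as in EXCEPT DISTINCT); `ExceptSketch` is a hypothetical name and not a Flink operator.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: EXCEPT as "keep main rows that are absent from the secondary input".
final class ExceptSketch {
    // Returns the distinct rows of main that do not occur in secondary.
    static <T> Set<T> except(List<T> main, List<T> secondary) {
        Set<T> result = new LinkedHashSet<>(main); // de-duplicates, keeps arrival order
        result.removeAll(secondary);               // drop rows seen on the secondary side
        return result;
    }
}
```

In the streaming case the secondary input would be the bounded result of the inner query, subject to the same restrictions as for IN/EXISTS.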
> Description:
> ------------
> The IN and EXISTS operators are conditional clauses in the WHERE clause
> that check for values in certain collections. The collections on which
> the restriction of the values is based can be static (values or
> tables) or parts of a stream. This JIRA issue is concerned with the
> latter case of checking for values over a stream. In order for this
> operation to work, the stream needs to be bounded such that the result
> can trigger and the collection can be formed. This calls for
> some boundaries or groupings over the sub-query that forms the
> collection over which IN is applied. This should be supported via 3
> options as shown below. Each of these options can be a sub-JIRA issue.
> 1) Group By clauses that are applied over some monotonic order of the
> stream based on which ranges are defined.
> ` [...] GROUP BY prodId`
> 3) Window clauses to define rolling partitions of the data of the
> stream, which evolve in time.
> ` [...] WINDOW HOUR AS (RANGE INTERVAL '10' MINUTE TO SECOND(3)
> PRECEDING);`
> Functionality example
> ---------------------
> We exemplify below the functionality of the IN/Exists when working with
> streams.
> ```SELECT * FROM stream1 WHERE id IN ((SELECT id2 FROM stream2 WHERE b>10
> GROUP BY FLOOR(PROCTIME TO HOUR) ))```
> Note: The inner query triggers only once an hour. For the next hour the
> result of the previous hour from the inner query will be the one used to
> filter the results from the main query as they come. This is also consistent
> with how the inner queries are translated (see inner queries).
> ||IngestionTime(Event)||Stream1||Stream2||Output||
> |10:00:01| Id1,10| |nil|
> |10:02:00| |Id2,2| |
> |11:25:00| |Id3,15| |
> |12:03:00| Id2,15| |nil|
> |12:05:00| Id3,11| |Id3,11|
> |12:06:00| |Id2,30| |
> |12:07:00| |Id3,2| |
> |12:09:00| Id2,17| |nil|
> |12:10:00| Id3,20| |Id3,20|
> |...|
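The table can be reproduced with a small simulation. This is a sketch in plain Java, assuming events arrive in time order and that main-stream rows in hour h are checked against the ids the inner query collected in hour h-1. `HourlyInSimulation` and `Event` are hypothetical names, not Flink classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical simulation of the hourly-bounded IN filter described in the note.
final class HourlyInSimulation {
    // One input row: hour of arrival, source stream (1 or 2), id, and value b.
    static final class Event {
        final int hour;
        final int stream;
        final String id;
        final int b;

        Event(int hour, int stream, String id, int b) {
            this.hour = hour;
            this.stream = stream;
            this.id = id;
            this.b = b;
        }
    }

    // Returns the emitted main-stream rows as "id,b" strings, in arrival order.
    static List<String> run(List<Event> events) {
        // Inner query result per hour: ids of stream2 rows with b > 10.
        Map<Integer, Set<String>> idsByHour = new HashMap<>();
        List<String> output = new ArrayList<>();
        for (Event e : events) {
            if (e.stream == 2) {
                if (e.b > 10) {
                    idsByHour.computeIfAbsent(e.hour, h -> new HashSet<>()).add(e.id);
                }
            } else {
                // Main stream: filter against the collection of the previous hour.
                Set<String> previous = idsByHour.getOrDefault(e.hour - 1, Set.of());
                if (previous.contains(e.id)) {
                    output.add(e.id + "," + e.b);
                }
            }
        }
        return output;
    }
}
```

Fed with the rows of the table, only the two Id3 rows at 12:05 and 12:10 pass, because Id3 is the only id with b > 10 in the 11:00 hour.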
> Implementation option
> ---------------------
> Considering that the query only makes sense in the context of 1) window
> boundaries and 2) sub-queries that extract collections of data, the
> main design is based on the inner query implementation with the
> following modifications. (As a recap, the inner query is implemented with
> a special join [left type with always-true condition] between the main
> stream and the output of the inner query, which is passed through a
> single-value selection aggregation):
> 1) The condition of outputting a result by the LogicalJoin is not
> always true as before. Instead the condition is done within the
> window function by checking that the input from main stream is
> within the collection from the inner query.
> 2) The check is done specifically based on the type of function used (IN,
> ANY, SOME...). The logic of each such function would need to have a direct
> implementation.
> 3) The filter on the inner query to keep a single value is removed and
> instead a collection is passed for evaluation in the join.
> 4) The boundaries of the SQL query are to be used as the boundaries to
> define the join window in which the verification is done.
> 5) From the condition point of view, the join behaves like an INNER JOIN
> (a value is emitted only if it exists on the other side).
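Point 2 says each function needs its own check against the collection passed in from the inner query. A minimal sketch of what those checks could look like, in plain Java over integers: `QuantifiedChecks` is a hypothetical name, and SQL NULL (three-valued) semantics are deliberately ignored here.

```java
import java.util.Collection;
import java.util.function.BiPredicate;

// Hypothetical per-function checks over the collection produced by the inner query.
final class QuantifiedChecks {
    // value IN (collection)
    static boolean in(int value, Collection<Integer> collection) {
        return collection.contains(value);
    }

    // value <op> ANY (collection); SOME is a synonym for ANY in SQL.
    static boolean any(int value, BiPredicate<Integer, Integer> op, Collection<Integer> collection) {
        return collection.stream().anyMatch(x -> op.test(value, x));
    }

    // value <op> ALL (collection); true for an empty collection.
    static boolean all(int value, BiPredicate<Integer, Integer> op, Collection<Integer> collection) {
        return collection.stream().allMatch(x -> op.test(value, x));
    }
}
```

For example, `QuantifiedChecks.any(5, (a, b) -> a > b, List.of(7, 4))` models `5 > ANY (7, 4)`.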
> !in.png!
> General logic of Join
> ---------------------
> leftDataStream.join(rightDataStream)
>     .where(new ConstantConditionSelector())
>     .equalTo(new ConstantConditionSelector())
>     .window([TIME/COUNT][TUMBLE/SLIDE]window.create())
>     //.trigger(new DefaultTrigger())
>     //.evictor(new DefaultEvictor())
>     .apply(new FlatJoinFunctionWithInSelection());
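What this join does for the contents of one window can be sketched without Flink. The constant key selectors put every row under the same key, so each left row is effectively compared with every right row in the window. The sketch below is plain Java; `WindowInJoinSketch` is a hypothetical name, and it assumes that a left row should be emitted at most once even if the inner query produced duplicates.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical per-window logic of a join with constant keys and an IN selection.
final class WindowInJoinSketch {
    // leftWindow: ids from the main stream; rightWindow: ids from the inner query.
    static <T> List<T> joinWindow(List<T> leftWindow, List<T> rightWindow) {
        // Collapse the right side into a collection so duplicates do not
        // cause a left row to be emitted more than once.
        Set<T> collection = new HashSet<>(rightWindow);
        List<T> out = new ArrayList<>();
        for (T left : leftWindow) {
            // INNER JOIN behavior: emit only if the value exists on the other side.
            if (collection.contains(left)) {
                out.add(left);
            }
        }
        return out;
    }
}
```

A pairwise FlatJoinFunction would see each (left, right) combination separately, so an actual implementation would need its own way to avoid duplicate emissions.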
--
This message was sent by Atlassian Jira
(v8.3.4#803005)