[ https://issues.apache.org/jira/browse/FLINK-6082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15945036#comment-15945036 ]

Fabian Hueske commented on FLINK-6082:
--------------------------------------

I think time-based local predicates (i.e., predicates that check against a literal
or a function) cannot be used to define windows. Instead, they select a subset of
the stream (usually the tail).
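
A minimal Scala sketch of this point follows. It assumes a hypothetical `Event` row type and reuses the rows from the functionality example in the quoted description. Evaluating `proctime BETWEEN current_timestamp - INTERVAL '1' HOUR AND current_timestamp` for one fixed value of `current_timestamp` only filters the stream and keeps its most recent hour. It does not produce a sequence of windows.

```scala
import java.time.LocalTime

object LocalPredicateSketch {
  // Hypothetical row type: id, payload value and arrival (processing) time.
  final case class Event(id: String, value: Int, proctime: LocalTime)

  // Evaluates `proctime BETWEEN now - INTERVAL '1' HOUR AND now` for ONE fixed
  // value of `now` (BETWEEN is inclusive on both ends). The result is a filtered
  // subset of the stream, namely its most recent hour (the tail).
  def selectTail(events: Seq[Event], now: LocalTime): Seq[Event] = {
    val lower = now.minusHours(1)
    events.filter(e => !e.proctime.isBefore(lower) && !e.proctime.isAfter(now))
  }

  // Rows from the functionality example in the quoted issue description.
  val stream1: Seq[Event] = Seq(
    Event("Id1", 10, LocalTime.parse("10:00:01")),
    Event("Id2", 2, LocalTime.parse("10:02:00")),
    Event("Id3", 2, LocalTime.parse("11:25:00")),
    Event("Id4", 15, LocalTime.parse("12:03:00")),
    Event("Id5", 11, LocalTime.parse("12:05:00")),
    Event("Id6", 20, LocalTime.parse("12:56:00"))
  )
}
```

Evaluated at `now = 12:56:00`, only `Id4`, `Id5` and `Id6` are selected.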

In order to support windowed joins, we should start by specifying two streams
and an expected result.
Based on these tables (input and output), we should specify a batch query that
produces the output table given the input tables.
The resulting query will be based on time-based join predicates (each side of
the condition refers to the time attribute of one of the two input streams).
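
As an illustration of such a specification, the sketch below evaluates the batch semantics of a join whose time predicate references a time attribute on both sides. It assumes two hypothetical input tables, `Orders` and `Shipments`; their names, columns and rows are made up for the example. Each order defines its own one-hour window on the other stream.

```scala
import java.time.LocalTime

object TimeJoinSketch {
  // Hypothetical input tables; names and rows are illustrative only.
  final case class Order(id: String, rowtime: LocalTime)
  final case class Shipment(orderId: String, rowtime: LocalTime)

  // Batch semantics of
  //   SELECT o.id, o.rowtime, s.rowtime FROM Orders o, Shipments s
  //   WHERE o.id = s.orderId
  //     AND s.rowtime BETWEEN o.rowtime AND o.rowtime + INTERVAL '1' HOUR
  // Both sides of the BETWEEN refer to a time attribute, so the predicate is a
  // time-based join predicate rather than a local one.
  def join(orders: Seq[Order], shipments: Seq[Shipment]): Seq[(String, LocalTime, LocalTime)] =
    for {
      o <- orders
      s <- shipments
      if o.id == s.orderId
      if !s.rowtime.isBefore(o.rowtime) && !s.rowtime.isAfter(o.rowtime.plusHours(1))
    } yield (o.id, o.rowtime, s.rowtime)

  val orders: Seq[Order] =
    Seq(Order("o1", LocalTime.parse("10:00")), Order("o2", LocalTime.parse("10:30")))
  val shipments: Seq[Shipment] =
    Seq(Shipment("o1", LocalTime.parse("10:45")), Shipment("o2", LocalTime.parse("11:45")))
}
```

With these rows only `o1` is returned. The shipment for `o2` arrives 75 minutes after the order, which falls outside its window.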

> Support window definition for SQL Queries based on WHERE clause with time 
> condition
> -----------------------------------------------------------------------------------
>
>                 Key: FLINK-6082
>                 URL: https://issues.apache.org/jira/browse/FLINK-6082
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: radu
>
> Time target: Proc Time
> Calcite documentation refers to query examples where the (time)
> boundaries are defined as conditions within the WHERE clause. As the Flink
> community targets compatibility with Calcite, it makes sense to support
> the definition of windows via this method, as well as the corresponding
> aggregations on top of them.
> SQL targeted query examples:
> ----------------------------
> ```
> SELECT productId, count(*) FROM stream1
> WHERE proctime BETWEEN current_timestamp - INTERVAL '1' HOUR AND current_timestamp
> ```
> General comments:
> 1)  Window boundaries are defined as conditions in the WHERE clause.
> 2)  To indicate which notion of stream time is used, rowtime or
>     proctime can be referenced.
> 3)  The boundaries are defined using special constructs provided by
>     Calcite: current_timestamp and time operations (e.g., subtracting an INTERVAL).
> Description:
> ------------
> The logic of this operator is closely related to supporting aggregates
> over sliding windows defined with OVER
> ([FLINK-5653](https://issues.apache.org/jira/browse/FLINK-5653),
> [FLINK-5654](https://issues.apache.org/jira/browse/FLINK-5654),
> [FLINK-5655](https://issues.apache.org/jira/browse/FLINK-5655),
> [FLINK-5658](https://issues.apache.org/jira/browse/FLINK-5658),
> [FLINK-5656](https://issues.apache.org/jira/browse/FLINK-5656)). The design
> in those issues considered queries where the window is defined with the
> syntax of the OVER clause and aggregates are applied over this period.
> The behavior here is similar, with the only exception that the window
> boundaries are defined by the WHERE conditions. Apart from this, the
> logic and the types of aggregates to be supported should be the
> same (sum, count, avg, min, max). Supporting this type of query is
> related to the pie chart problem tackled by Calcite.
> As for the OVER windows, the construct should build rolling
> windows (i.e., windows that are triggered and move with every incoming
> event).
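
The rolling-window behavior described above can be sketched in Scala as follows: every incoming event triggers one evaluation, and the window always ends at that event's time. This is a minimal illustration and not the FLINK-5654 operator. `RollingWindow` and `Aggregates` are hypothetical names, events are assumed to arrive in processing-time order, and the one-hour length mirrors the example query. `sum` and `count` are maintained incrementally, while `min` and `max` are simply recomputed over the buffered rows.

```scala
import java.time.LocalTime
import scala.collection.mutable

object RollingWindowSketch {
  // Result emitted for every incoming event (hypothetical output type).
  final case class Aggregates(count: Long, sum: Long, avg: Double, min: Int, max: Int)

  // Rolling processing-time window covering [eventTime - lengthHours, eventTime].
  final class RollingWindow(lengthHours: Long) {
    private val buffer = mutable.Queue.empty[(LocalTime, Int)]
    private var sum = 0L

    def onEvent(time: LocalTime, value: Int): Aggregates = {
      buffer.enqueue((time, value))
      sum += value
      // Evict rows that fell out of the window; arrival order is assumed, so the
      // oldest rows are at the head. The current row is never evicted.
      val lower = time.minusHours(lengthHours)
      while (buffer.head._1.isBefore(lower)) {
        sum -= buffer.dequeue()._2
      }
      val values = buffer.map(_._2)
      Aggregates(buffer.size.toLong, sum, sum.toDouble / buffer.size, values.min, values.max)
    }
  }
}
```

In this sketch, the second field of each row in the functionality example is treated as the aggregated value, which is an assumption. Fed with those six rows, the emitted counts are 1, 2, 1, 2, 3, 3, matching the Output column.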
> Functionality example
> ---------------------
> We exemplify below the functionality of the time-bounded WHERE clause
> when working with streams.
> `SELECT a, count(*) FROM stream1 WHERE proctime BETWEEN current_timestamp - INTERVAL '1' HOUR AND current_timestamp;`
> ||IngestionTime(Event)||      Stream1||       Output||
> |10:00:01     |Id1,10 |Id1,1|
> |10:02:00     |Id2,2  |Id2,2|
> |11:25:00     |Id3,2  |Id3,1|
> |12:03:00     |Id4,15 |Id4,2|
> |12:05:00     |Id5,11 |Id5,3|
> |12:56:00     |Id6,20 |Id6,3|
> |...|
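
The comment at the top suggests specifying the expected output as a batch query over the input table. Following that approach, the Output column of this example can be reproduced with the sketch below. It is a hypothetical Scala formulation, not Flink code. Each row is compared with every row whose `proctime` lies between its own `proctime` minus one hour and its own `proctime`. This turns the WHERE condition into a time-based self-join predicate. Rows with identical timestamps would count each other here; the example has no such ties.

```scala
import java.time.LocalTime

object BatchSpecSketch {
  // Hypothetical row type for Stream1: id, value and processing time.
  final case class Row(id: String, value: Int, proctime: LocalTime)

  // For every row r, count the rows s with
  //   s.proctime BETWEEN r.proctime - INTERVAL '1' HOUR AND r.proctime
  // Both sides of the predicate reference the time attribute.
  def expectedOutput(rows: Seq[Row]): Seq[(String, Int)] =
    rows.map { r =>
      val lower = r.proctime.minusHours(1)
      val matches = rows.count(s => !s.proctime.isBefore(lower) && !s.proctime.isAfter(r.proctime))
      (r.id, matches)
    }

  // Input rows of the functionality example above.
  val stream1: Seq[Row] = Seq(
    Row("Id1", 10, LocalTime.parse("10:00:01")),
    Row("Id2", 2, LocalTime.parse("10:02:00")),
    Row("Id3", 2, LocalTime.parse("11:25:00")),
    Row("Id4", 15, LocalTime.parse("12:03:00")),
    Row("Id5", 11, LocalTime.parse("12:05:00")),
    Row("Id6", 20, LocalTime.parse("12:56:00"))
  )
}
```
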
> Implementation option
> ---------------------
> Considering that the query follows the same functionality as the
> aggregates over windows, the implementation should follow the same
> approach as for the OVER clause. Since the WHERE conditions here are
> time-related, this means that for a window with
> one unbounded boundary the
> [FLINK-5658](https://issues.apache.org/jira/browse/FLINK-5658) design should be
> used, while for bounded time windows the
> [FLINK-5654](https://issues.apache.org/jira/browse/FLINK-5654) design
> should be used.
> The window boundaries will be extracted from the WHERE condition.
> The query will no longer be mapped to a LogicalWindow, which means that
> the conversion would need to happen in the current
> DataStreamCalc rule. To this end, a dedicated condition will be added
> so that, if the WHERE clause contains time conditions, the operator
> implementation of the OVER clause (from the previous issues) is
> used.
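
One possible way to extract the window boundaries and select the operator is sketched below. It uses a deliberately simplified, hypothetical expression tree instead of Calcite's `RexNode` API, and the plan names are hypothetical stand-ins for the operators mentioned in the issue. Only two shapes are recognized. The first is `t BETWEEN current_timestamp - <interval> AND current_timestamp`, a bounded window for the FLINK-5654 design. The second is `t <= current_timestamp`, which is assumed here as an example of a condition with one unbounded boundary for the FLINK-5658 design. Every other condition stays a plain calc.

```scala
import java.time.Duration

object BoundExtractionSketch {
  // Hypothetical, heavily simplified condition tree (not Calcite's RexNode API).
  sealed trait Expr
  case object CurrentTimestamp extends Expr
  final case class TimeAttr(name: String) extends Expr // e.g. "proctime" or "rowtime"
  final case class Minus(left: Expr, interval: Duration) extends Expr
  final case class Between(attr: TimeAttr, lower: Expr, upper: Expr) extends Expr
  final case class LessEq(attr: TimeAttr, upper: Expr) extends Expr

  // Hypothetical plan choices standing in for the operators named in the issue.
  sealed trait Plan
  final case class BoundedOver(timeAttr: String, preceding: Duration) extends Plan // FLINK-5654 design
  final case class UnboundedOver(timeAttr: String) extends Plan                    // FLINK-5658 design
  case object PlainCalc extends Plan                                                // no time limits

  // Extracts the window boundaries from a WHERE condition and picks a plan.
  def choosePlan(where: Expr): Plan = where match {
    // t BETWEEN current_timestamp - d AND current_timestamp -> window of length d
    case Between(TimeAttr(t), Minus(CurrentTimestamp, d), CurrentTimestamp) => BoundedOver(t, d)
    // t <= current_timestamp -> the lower boundary is unbounded
    case LessEq(TimeAttr(t), CurrentTimestamp) => UnboundedOver(t)
    case _ => PlainCalc
  }
}
```
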
> ```
> class DataStreamCalcRule
>   // ... (rule definition as in the existing DataStreamCalcRule)
> {
>   def convert(rel: RelNode): RelNode = {
>     val calc: LogicalCalc = rel.asInstanceOf[LogicalCalc]
>     val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
>     val convInput: RelNode = RelOptRule.convert(calc.getInput, DataStreamConvention.INSTANCE)
>
>     // Pseudocode: whereContainsTimeLimits and isBounded are placeholders for
>     // checks on the WHERE condition (calc.getProgram); they are not existing
>     // Flink methods.
>     if (whereContainsTimeLimits(calc)) {
>       if (isBounded(calc)) {
>         // bounded time window: operator from the FLINK-5654 design
>         new DataStreamProcTimeTimeAggregate(/* ... */)
>       } else {
>         // one unbounded boundary: operator from the FLINK-5658 design
>         new DataStreamSlideEventTimeRowAgg(/* ... */)
>       }
>     } else {
>       // no time limits in the WHERE clause: plain calc, as today
>       new DataStreamCalc(
>         rel.getCluster,
>         traitSet,
>         convInput,
>         rel.getRowType,
>         calc.getProgram,
>         description)
>     }
>   }
> }
> ```



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
