Your TUMBLE_ROWTIME function seems to be a reasonable short-term workaround. 
However, using values that are essentially 10:59:59.999999 seems error-prone, 
because a change of precision will cause the value to be rounded into the next 
period.
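
To illustrate the concern, here is a minimal Python sketch. It assumes the timestamp is stored with microsecond precision and is later narrowed to milliseconds; the helper names `round_to_millis` and `truncate_to_millis` are hypothetical, and whether a given engine rounds or truncates is an assumption, not something I have checked in Flink or Calcite.

```python
from datetime import datetime, timedelta


def round_to_millis(ts: datetime) -> datetime:
    """Round a timestamp to the nearest millisecond (half up). Hypothetical helper."""
    rounded_micros = (ts.microsecond + 500) // 1000 * 1000
    # rounded_micros may be 1_000_000, so add it as a timedelta to carry into seconds.
    return ts.replace(microsecond=0) + timedelta(microseconds=rounded_micros)


def truncate_to_millis(ts: datetime) -> datetime:
    """Drop everything below millisecond precision. Hypothetical helper."""
    return ts.replace(microsecond=ts.microsecond // 1000 * 1000)


# "Inclusive end" of the 10:00-11:00 window at microsecond precision.
ts = datetime(2017, 5, 18, 10, 59, 59, 999999)

rounded = round_to_millis(ts)        # 2017-05-18 11:00:00, i.e. the next period
truncated = truncate_to_millis(ts)   # 2017-05-18 10:59:59.999, still the same period
```

With rounding the value lands in the 11:00 period; with truncation it stays in the 10:00 period, so the result depends on a conversion detail that is easy to overlook.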

Do we need to have a kind of window that is closed below and open above? Or I 
wonder whether you can change how Flink propagates watermarks?
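
To make the boundary cases concrete, here is a sketch of how the last 2-minute window of an hour would be assigned to an hourly window, depending on which timestamp it carries. It assumes the usual tumbling-window convention of half-open intervals [start, end) aligned to the epoch; the helper `tumble_start` is hypothetical and is not a Calcite or Flink API.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def tumble_start(ts: datetime, size: timedelta) -> datetime:
    """Start of the half-open window [start, start + size) containing ts.

    Hypothetical helper; assumes windows are aligned to the epoch.
    """
    return ts - (ts - EPOCH) % size


hour = timedelta(hours=1)
two_minutes = timedelta(minutes=2)

# Last 2-minute window of the 10:00-11:00 hour: [10:58, 11:00).
inner_start = datetime(2017, 5, 18, 10, 58)
inner_end = inner_start + two_minutes

candidates = {
    "TUMBLE_START": inner_start,
    "TUMBLE_END": inner_end,
    "TUMBLE_END - 1ms": inner_end - timedelta(milliseconds=1),
}

# Hourly window each candidate timestamp would fall into.
outer = {name: tumble_start(ts, hour) for name, ts in candidates.items()}
```

Under these assumptions, `TUMBLE_START` and `TUMBLE_END - 1ms` both map to the 10:00 window, while `TUMBLE_END` maps to the 11:00 window.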

Also, is it possible that the problem is caused by the “as rowtime” in the 
inner query? By promoting a value to a “special” column, it becomes susceptible 
to watermark processing. And maybe watermark processing isn’t even necessary 
between the inner and outer queries.

Julian
 
 
> On May 18, 2017, at 3:56 PM, Fabian Hueske <[email protected]> wrote:
> 
> Thanks Julian. I agree, semantically TUMBLE_START would work.
> 
> Unfortunately, the window implementations of Flink's DataStream API (which
> are used to implement TUMBLE, HOP, and SESSION windows) assign (window_end
> - 1) as internal timestamp to records. The reason for this design decision
> is that otherwise we would need to hold back watermarks until a window ends
> to prevent a window from emitting late data. Holding back watermarks is tricky
> (especially in case of non-aligned windows such as SESSION) because the
> emission of watermarks would need to be coordinated across the windows for
> all grouping keys that are processed by a parallel task instance.
> 
> Of course, these are internal details of Flink which are not directly
> related to the semantics of the window start/end functions, but they make
> the use of TUMBLE_START impractical for us (at least for now). As a
> workaround for the upcoming 1.3.0 release we added another method
> TUMBLE_ROWTIME that returns TUMBLE_END - 1 which can be used for time-based
> operations.
> 
> Best, Fabian
> 
> 2017-05-18 17:59 GMT+02:00 Julian Hyde <[email protected]>:
> 
>> By the way, a principle I would follow when composing windows is that all
>> that comes out of a query is simply data values. I don’t think this is
>> controversial. But I don’t want anyone to suggest that we pass fuzzy
>> concepts like “window instances” from sub-query to enclosing query.
>> 
>> Julian
>> 
>> 
>>> On May 18, 2017, at 11:53 AM, Julian Hyde <[email protected]> wrote:
>>> 
>>> Have you considered using TUMBLE_START? This is an inclusive bound, so
>>> you should be able to compose windows.
>>> 
>>> It’s possible that TUMBLE_START isn’t returning the right value yet,
>>> e.g. see
>>> https://insight.io/github.com/apache/calcite/blob/a11d14054e9c1d2ce22f60e11536f1885faaae7c/core/src/main/java/org/apache/calcite/sql2rel/AuxiliaryConverter.java#L56
>>> but in principle it’s the right thing to use.
>>> 
>>> Julian
>>> 
>>> 
>>>> On May 17, 2017, at 4:46 AM, Timo Walther <[email protected]> wrote:
>>>> 
>>>> Hi everyone,
>>>> 
>>>> we are very happy to support TUMBLE/HOP/SESSION in our upcoming Flink
>>>> 1.3 release. However, there are some problems regarding nested window
>>>> queries that we would like to discuss with the Calcite community.
>>>> 
>>>> Take the following query:
>>>> 
>>>> SELECT
>>>>   rowtime, SUM(x)
>>>> FROM (
>>>>   SELECT
>>>>     TUMBLE_END(rowtime, INTERVAL '2' MINUTE) AS rowtime,
>>>>     MIN(x) AS x
>>>>   FROM MyTable
>>>>   GROUP BY TUMBLE(rowtime, INTERVAL '2' MINUTE)
>>>> )
>>>> GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR)
>>>> 
>>>> 
>>>> Initially, we thought that we could use the xxx_END() group auxiliary
>>>> functions to define the rowtime for the upper query. However, according to
>>>> http://calcite.apache.org/docs/stream.html, TUMBLE_END should return the
>>>> timestamp of the exclusive window end, i.e., for a window of 1 hour that
>>>> contains all elements from 12:00:00.000 until 12:59:59.999 (inclusive),
>>>> TUMBLE_END would return 13:00:00.000. The problem is that Flink uses the
>>>> inclusive window end as the new timestamp. The reason is that if you do
>>>> preaggregation with a window, say 5-minute windows that are later
>>>> aggregated into 1-hour windows, the last 5-minute window (from
>>>> 12:55:00.000 until 12:59:59.999 inclusive) would otherwise have a
>>>> timestamp of 13:00:00.000 and fall into the next window starting at
>>>> 13:00:00.000.
>>>> 
>>>> 
>>>> The question is how Calcite is planning to support nested windows.
>>>> Right now we see the following options:
>>>> 
>>>> - TUMBLE_END returns the inclusive window end
>>>> 
>>>> - we introduce an additional group auxiliary function for the inclusive
>>>>   window end like: SELECT TUMBLE_TIME(rowtime, INTERVAL '2' MINUTE) AS
>>>>   rowtime ...
>>>> 
>>>> - we allow references to the window in the select: SELECT
>>>>   TUMBLE(rowtime, INTERVAL '1' HOUR) AS rowtime ...
>>>> 
>>>> What do you think?
>>>> 
>>>> 
>>>> Regards,
>>>> 
>>>> Timo
