Your TUMBLE_ROWTIME function seems to be a reasonable short-term workaround. However, using values that are essentially 10:59:59.999999 seems error-prone: a change of precision could cause the value to be rounded into the next period.
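The precision concern can be made concrete with a minimal Python sketch. This is not Flink or Calcite code. It assumes hourly tumbling windows aligned to the epoch and modeled as half-open intervals [start, start + size). The helpers `to_micros`, `tumble_start`, `round_to_millis`, and `truncate_to_millis` are hypothetical names used only for illustration.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
ONE_US = timedelta(microseconds=1)
HOUR_US = 3_600 * 1_000_000  # one hour, in microseconds


def to_micros(dt: datetime) -> int:
    """Integer microseconds since the epoch (avoids float rounding)."""
    return (dt - EPOCH) // ONE_US


def fmt(us: int) -> str:
    """Render a microsecond timestamp as HH:MM:SS.ffffff."""
    return (EPOCH + us * ONE_US).strftime("%H:%M:%S.%f")


def tumble_start(us: int, size_us: int) -> int:
    """Start of the half-open tumbling window [start, start + size) holding us."""
    return us - us % size_us


def round_to_millis(us: int) -> int:
    """Hypothetical precision change #1: round half-up to milliseconds."""
    return (us + 500) // 1000 * 1000


def truncate_to_millis(us: int) -> int:
    """Hypothetical precision change #2: truncate to milliseconds."""
    return us // 1000 * 1000


# "Just before 11:00" at microsecond precision, as in the mail above.
ts = to_micros(datetime(2017, 5, 18, 10, 59, 59, 999999, tzinfo=timezone.utc))

for label, value in [("original", ts),
                     ("rounded to ms", round_to_millis(ts)),
                     ("truncated to ms", truncate_to_millis(ts))]:
    print(f"{label:16} {fmt(value)} -> window starting "
          f"{fmt(tumble_start(value, HOUR_US))}")
```

Rounding half-up to milliseconds moves the 10:59:59.999999 value into the 11:00 window, while truncating keeps it in the 10:00 window. Whether a precision change breaks the "end minus epsilon" workaround therefore depends on how the conversion rounds.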
Do we need to have a kind of window that is closed below and open above? Or could you change how Flink propagates watermarks?

Also, is it possible that the problem is caused by the “as rowtime” in the inner query? Promoting a value to a “special” column makes it susceptible to watermark processing. And maybe watermark processing isn’t even necessary between the inner and outer queries.

Julian

> On May 18, 2017, at 3:56 PM, Fabian Hueske wrote:
>
> Thanks Julian. I agree, semantically TUMBLE_START would work.
>
> Unfortunately, the window implementations of Flink's DataStream API (which
> are used to implement TUMBLE, HOP, and SESSION windows) assign
> (window_end - 1) as the internal timestamp of records. The reason for this
> design decision is that otherwise we would need to hold back watermarks
> until a window ends to prevent a window from emitting late data. Holding
> back watermarks is tricky (especially in the case of non-aligned windows
> such as SESSION) because the emission of watermarks would need to be
> coordinated across the windows for all grouping keys that are processed by
> a parallel task instance.
>
> Of course, these are internal details of Flink which are not directly
> related to the semantics of the window start/end functions, but they make
> the use of TUMBLE_START impractical for us (at least for now). As a
> workaround for the upcoming 1.3.0 release, we added another method,
> TUMBLE_ROWTIME, which returns TUMBLE_END - 1 and can be used for time-based
> operations.
>
> Best, Fabian
>
> 2017-05-18 17:59 GMT+02:00 Julian Hyde:
>
>> By the way, a principle I would follow when composing windows is that all
>> that comes out of a query is simply data values. I don’t think this is
>> controversial. But I don’t want anyone to suggest that we pass fuzzy
>> concepts like “window instances” from sub-query to enclosing query.
>>
>> Julian
>>
>>> On May 18, 2017, at 11:53 AM, Julian Hyde wrote:
>>>
>>> Have you considered using TUMBLE_START? This is an inclusive bound, so
>>> you should be able to compose windows.
>>>
>>> It’s possible that TUMBLE_START isn’t returning the right value yet,
>>> e.g. see
>>> https://insight.io/github.com/apache/calcite/blob/a11d14054e9c1d2ce22f60e11536f1885faaae7c/core/src/main/java/org/apache/calcite/sql2rel/AuxiliaryConverter.java#L56
>>> but in principle it’s the right thing to use.
>>>
>>> Julian
>>>
>>>> On May 17, 2017, at 4:46 AM, Timo Walther wrote:
>>>>
>>>> Hi everyone,
>>>>
>>>> we are very happy to support TUMBLE/HOP/SESSION in our upcoming Flink
>>>> 1.3 release. However, there are some problems regarding nested window
>>>> queries that we would like to discuss with the Calcite community.
>>>>
>>>> Take the following query:
>>>>
>>>> SELECT
>>>>   rowtime, SUM(x)
>>>> FROM (
>>>>   SELECT
>>>>     TUMBLE_END(rowtime, INTERVAL '2' MINUTE) AS rowtime,
>>>>     MIN(x) AS x
>>>>   FROM MyTable
>>>>   GROUP BY TUMBLE(rowtime, INTERVAL '2' MINUTE)
>>>> )
>>>> GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR)
>>>>
>>>> Initially, we thought that we could use the xxx_END() group auxiliary
>>>> functions to define the rowtime for the upper query. However, according
>>>> to http://calcite.apache.org/docs/stream.html, TUMBLE_END should return
>>>> the timestamp of the exclusive window end, i.e., for a window of 1 hour
>>>> that contains all elements from 12:00:00.000 until 12:59:59.999
>>>> (inclusive), TUMBLE_END would return 13:00:00.000. The problem is that
>>>> Flink uses the inclusive window end as the new timestamp. The reason is
>>>> that if you pre-aggregate with, say, 5-minute windows that are later
>>>> aggregated into 1-hour windows, the last 5-minute window (from
>>>> 12:55:00.000 until 12:59:59.999 inclusive) would have a timestamp of
>>>> 13:00:00.000 and fall into the next window starting at 13:00:00.000.
>>>>
>>>> The question is how Calcite is planning to support nested windows.
>>>> Right now we see the following options:
>>>>
>>>> - TUMBLE_END returns the inclusive window end
>>>>
>>>> - we introduce an additional group auxiliary function for the inclusive
>>>>   window end like: SELECT TUMBLE_TIME(rowtime, INTERVAL '2' MINUTE) AS
>>>>   rowtime ...
>>>>
>>>> - we allow references to the window in the select: SELECT
>>>>   TUMBLE(rowtime, INTERVAL '1' HOUR) AS rowtime ...
>>>>
>>>> What do you think?
>>>>
>>>> Regards,
>>>>
>>>> Timo
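Timo's nested-window problem, and the two alternatives discussed in this thread, can be illustrated with a minimal Python sketch. This is not Flink's implementation. It assumes epoch-aligned tumbling windows modeled as half-open intervals and millisecond timestamps, as in Timo's examples. Each 5-minute inner window between 12:00 and 13:00 is stamped with one of three candidate timestamps (TUMBLE_START, the exclusive TUMBLE_END, or TUMBLE_END - 1 ms as with Fabian's TUMBLE_ROWTIME), and the sketch checks which 1-hour outer window the stamp lands in. The helper names (`tumble_start`, `outer_windows`, `STRATEGIES`) are hypothetical.

```python
MINUTE_MS = 60_000
HOUR_MS = 60 * MINUTE_MS


def tumble_start(ts_ms: int, size_ms: int) -> int:
    """Start of the half-open tumbling window [start, start + size) holding ts_ms."""
    return ts_ms - ts_ms % size_ms


def hhmmss(ts_ms: int) -> str:
    """Format milliseconds-since-midnight as HH:MM:SS.mmm."""
    h, rem = divmod(ts_ms, HOUR_MS)
    m, rem = divmod(rem, MINUTE_MS)
    s, ms = divmod(rem, 1000)
    return f"{h:02d}:{m:02d}:{s:02d}.{ms:03d}"


# Candidate timestamps for the result of an inner window [start, end).
STRATEGIES = {
    "TUMBLE_START": lambda start, end: start,
    "TUMBLE_END (exclusive)": lambda start, end: end,
    "TUMBLE_END - 1 ms": lambda start, end: end - 1,
}


def outer_windows(inner_size_ms, outer_size_ms, first_ms, last_ms, stamp):
    """Group the inner windows covering [first_ms, last_ms) by the outer
    window that their stamped timestamp falls into."""
    result = {}
    for start in range(first_ms, last_ms, inner_size_ms):
        end = start + inner_size_ms
        outer = tumble_start(stamp(start, end), outer_size_ms)
        result.setdefault(outer, []).append(start)
    return result


noon, one_pm = 12 * HOUR_MS, 13 * HOUR_MS
for name, stamp in STRATEGIES.items():
    groups = outer_windows(5 * MINUTE_MS, HOUR_MS, noon, one_pm, stamp)
    summary = {hhmmss(k): len(v) for k, v in sorted(groups.items())}
    print(f"{name:24} {summary}")
```

With the exclusive end, the 12:55 inner window is stamped 13:00:00.000 and leaks into the next hour. TUMBLE_START and the end-minus-1-ms stamp both keep all twelve inner windows in the 12:00 window. The end-minus-1 variant only works here because the sketch fixes the precision at milliseconds, which is the fragility Julian points out at the top of the thread.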
