OK, so to move forward, shall we update the default SlidingWindows to not do this timestamp shifting, perhaps with a (deprecated) timestamp-shifting opt-in variant to ease the transition for those that want the old (marked experimental) behavior?
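Below is a minimal, hypothetical model of this proposal. It uses the example from the quoted thread: 60s windows sliding every 10s, one element at timestamp 42, and the EARLIEST timestamp combiner. The function names and the `shift_timestamps` flag are illustrative assumptions, not Beam's API. The shift rule, max(t, end - period), is our reading of the behavior Kenn describes: in the [0, 60) window the element moves to 50 so it does not overlap the prior [-10, 50) window. Treat it as a sketch, not the Java implementation.

```python
# Hypothetical pure-Python model, not Beam's API; times are integer seconds.


def assign_sliding_windows(t, size, period):
    """Return every [start, end) window (offset 0) containing t, earliest first."""
    last_start = t - t % period
    starts = range(last_start, t - size, -period)
    return sorted((s, s + size) for s in starts)


def output_timestamp(t, window, period, shift_timestamps=False):
    """Timestamp given to an EARLIEST-combined element t in `window`.

    shift_timestamps=False models the proposed default: keep t unchanged.
    shift_timestamps=True models the legacy opt-in: move t forward to the end
    of the preceding overlapping window, max(t, end - period), so this window
    never holds back that earlier one.
    """
    _, end = window
    return max(t, end - period) if shift_timestamps else t


windows = assign_sliding_windows(42, size=60, period=10)
print(windows)
# [(-10, 50), (0, 60), (10, 70), (20, 80), (30, 90), (40, 100)]
print([output_timestamp(42, w, period=10) for w in windows])
# [42, 42, 42, 42, 42, 42]
print([output_timestamp(42, w, period=10, shift_timestamps=True) for w in windows])
# [42, 50, 60, 70, 80, 90]
```

Under the default, every copy of the element keeps its EARLIEST timestamp of 42. With the opt-in shift, only the first window keeps 42, because its predecessor ends at 40.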
On Fri, Feb 12, 2021 at 9:12 AM Kenneth Knowles <k...@apache.org> wrote:

> All of this is right. Things have changed a lot. Nowadays the default will
> work well, and we can caveat to users that EARLIEST may hold up downstream
> output for overlapping windows.
>
> I'm slightly concerned about the fact that EARLIEST is necessary for CoGBK
> joins, unless there is some special consideration why it doesn't matter. So
> I wonder what happens when a pipeline has a few different joins.
>
> Kenn
>
> On Fri, Feb 12, 2021 at 12:37 AM Robert Bradshaw <rober...@google.com> wrote:
>
>> Yes, unless you manually set the timestamp combiner to earliest, which in
>> this case gives earliest + shifted.
>>
>> On Fri, Feb 12, 2021 at 12:33 AM Reuven Lax <re...@google.com> wrote:
>>
>>> The default now is end of window, right? Doesn't that alleviate the
>>> problem that the original change was supposed to fix?
>>>
>>> On Fri, Feb 12, 2021 at 12:25 AM Robert Bradshaw <rober...@google.com> wrote:
>>>
>>>> The default timestamp combiner used to be earliest as well.
>>>>
>>>> On Fri, Feb 12, 2021 at 12:10 AM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> IIRC, this was introduced because at the time users complained that
>>>>> sliding windows were virtually unusable for reasonably-sized windows.
>>>>> However this was before we allowed customizing the timestamp combiner, so
>>>>> maybe this is less of a problem now?
>>>>>
>>>>> On Thu, Feb 11, 2021 at 10:53 PM Robert Bradshaw <rober...@google.com> wrote:
>>>>>
>>>>>> On Wed, Feb 10, 2021 at 8:03 PM Kenneth Knowles <k...@apache.org> wrote:
>>>>>>
>>>>>>> On Wed, Feb 10, 2021 at 2:24 PM Alex Amato <ajam...@google.com> wrote:
>>>>>>>
>>>>>>>> On Wed, Feb 10, 2021 at 12:14 PM Kenneth Knowles <k...@apache.org> wrote:
>>>>>>>>
>>>>>>>>> On a PR (https://github.com/apache/beam/pull/13927) we got into a
>>>>>>>>> discussion of a very old and strange feature of Beam that I think we
>>>>>>>>> should revisit.
>>>>>>>>>
>>>>>>>>> The WindowFn has the ability to shift timestamps forward in order
>>>>>>>>> to unblock downstream watermarks. Why? Specifically in this situation:
>>>>>>>>>
>>>>>>>>> - aggregation/GBK with overlapping windows like SlidingWindows
>>>>>>>>> - timestamp combiner of the aggregated outputs is EARLIEST of the
>>>>>>>>>   inputs
>>>>>>>>> - there is another downstream aggregation/GBK
>>>>>>>>>
>>>>>>>>> The output watermark of the upstream aggregation is held to the
>>>>>>>>> minimum of the inputs. When an output is emitted, we desire the output
>>>>>>>>> to flow through the rest of the pipeline without delay. However, the
>>>>>>>>> downstream aggregation can (and often will) be delayed by the window
>>>>>>>>> size because of *watermark holds in other later windows that are not
>>>>>>>>> released until those windows output.*
>>>>>>>>>
>>>>>>>> Could you describe this a bit more? Why would later windows hold up
>>>>>>>> the watermark for upstream steps? (Is it due to some subtlety? Such as
>>>>>>>> tracking the watermark for each stage, rather than for each step?)
>>>>>>>>
>>>>>>>
>>>>>>> It does not have to do with stages/fusion (a runner-specific
>>>>>>> concept) but is a necessity of watermarks being per-PCollection.
>>>>>>>
>>>>>>> Suppose:
>>>>>>>
>>>>>>> - Default triggering
>>>>>>> - Timestamp combiner EARLIEST
>>>>>>> - 60s windows sliding every 10s
>>>>>>> - An element with timestamp 42
>>>>>>> - Aggregation (A) with downstream aggregation (B)
>>>>>>>
>>>>>>> Here is what happens:
>>>>>>>
>>>>>>> - The element falls into [-10, 50) and [0, 60) and [10, 70) and
>>>>>>>   [20, 80) and [30, 90) and [40, 100)
>>>>>>> - For each of those windows the output watermark hold is set to 42
>>>>>>>   (the element's timestamp)
>>>>>>> - At time 50 the aggregation (A) over the first window is emitted;
>>>>>>>   the other windows remain buffered and held
>>>>>>> - The element arrives at aggregation (B) and is buffered because
>>>>>>>   the input watermark (which is the held output watermark from A) is
>>>>>>>   still 42, even though no other data will arrive for that window
>>>>>>>   (WLOG if elements from other keys are shuffled in)
>>>>>>> - The input watermark for aggregation (B) does not advance past 42
>>>>>>>   until the [40, 100) window is fired and releases its watermark hold
>>>>>>>
>>>>>>> It is, indeed, subtle. To me, anyhow. I was wrong: it is not
>>>>>>> delayed by the window size, but by the difference between the
>>>>>>> end-of-window timestamps of all assigned windows (window size minus
>>>>>>> slide?).
>>>>>>>
>>>>>>> So to avoid this, what actually happens in Java today is that the
>>>>>>> watermark hold, and output timestamp, is set not to 42 but altered to
>>>>>>> 50 to not overlap the prior window. A timestamp of 50 is very
>>>>>>> unintuitive since you asked for the EARLIEST of input timestamps. The
>>>>>>> EARLIEST combiner plays an important role in CoGBK-based joins in SQL,
>>>>>>> where the iterables are re-exploded with timestamps that may be the
>>>>>>> minimum of input elements. This shifting may actually break SQL...
>>>>>>>
>>>>>>> This predated our switch away from "delta from watermark" late data
>>>>>>> dropping to "window expiry" data dropping.
>>>>>>> So maybe there is some new way to set a hold that does not make data
>>>>>>> late or droppable but still uses the EARLIEST timestamp. That is my
>>>>>>> question, for which I have not figured out the answer.
>>>>>>>
>>>>>>
>>>>>> This is, indeed, a very tough question...
>>>>>>
>>>>>> I'd say this is generally a problem with EARLIEST and non-aligned
>>>>>> windows. E.g. for sessions, a long key can hold up the watermark for all
>>>>>> others. Here we "know" what the holdup is, and can adjust for it. But I
>>>>>> don't think doing this adjustment is the right thing. It would certainly
>>>>>> seem to mess up the timestamp of the outputs from a join. And it's
>>>>>> possible that the values get re-windowed, in which case this element
>>>>>> should get joined with itself from a later window (which I'll admit is a
>>>>>> bit odd, but maybe a reflection that multiple-windowing, like
>>>>>> multi-firing triggering, is non-local).
>>>>>>
>>>>>> Logically, the reason we want the [-10, 50) window for B to fire shortly
>>>>>> after the input watermark for A passes 50 is that no non-late data
>>>>>> coming out of A could influence it. In some sense, the "watermark" for
>>>>>> the [-10, 50) windows has indeed passed, but not that for later windows.
>>>>>> I don't think the Beam model requires that we have a single watermark,
>>>>>> just that we fire triggers/timers once we have seen all the on-time data
>>>>>> that we think we could, and a runner could be smart about this.
>>>>>>
>>>>>> We may want to keep the ability to shift timestamps for WindowFns, but
>>>>>> I think we shouldn't be doing so for the default sliding windows.
>>>>>> Correctness (of output timestamps) over latency unless one asks
>>>>>> otherwise.
>>>>>>
>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>>> To avoid this problem, element x in window w will have its
>>>>>>>>> timestamp shifted to not overlap with any earlier windows.
>>>>>>>>> It is a weird behavior. It fixes the watermark hold problem but
>>>>>>>>> introduces a strange output with a mysterious timestamp that is hard
>>>>>>>>> to justify.
>>>>>>>>>
>>>>>>>>> Any other ideas?
>>>>>>>>>
>>>>>>>>> Kenn
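The delay Kenn walks through in the quoted thread can be checked with a simplified, hypothetical model. It uses integer seconds, one key, and default end-of-window triggering, and calls no Beam APIs. All function names here are illustrative. The model treats A's output watermark, which is B's input watermark, as the minimum of A's input watermark and the holds of A's windows that have not fired yet. It then asks how far A's input watermark must advance before B can fire the [-10, 50) window. The shifted holds use max(t, end - period), which is our reading of the Java behavior described above and should be treated as an assumption.

```python
# Simplified, hypothetical model of the watermark-hold problem; not Beam code.
# Integer seconds; a window of A fires (and releases its hold) once A's input
# watermark reaches the window's end, modelling default end-of-window triggering.


def sliding_windows(t, size, period):
    """All [start, end) windows (offset 0) containing t, earliest first."""
    starts = range(t - t % period, t - size, -period)
    return sorted((s, s + size) for s in starts)


def output_watermark_of_a(input_wm, holds):
    """A's output watermark (= B's input watermark): the minimum of A's input
    watermark and the holds of windows that have not fired yet."""
    pending = [hold for (_, end), hold in holds.items() if end > input_wm]
    return min([input_wm] + pending)


def b_first_window_ready_at(t, size, period, shift):
    """A's input watermark at which B can fire the first window holding t."""
    windows = sliding_windows(t, size, period)
    if shift:
        # Shifted hold: never earlier than the end of the preceding window.
        holds = {w: max(t, w[1] - period) for w in windows}
    else:
        # Plain EARLIEST: every window holds at the element's own timestamp.
        holds = {w: t for w in windows}
    target = windows[0][1]  # end of the first window ([-10, 50) -> 50)
    wm = target
    while output_watermark_of_a(wm, holds) < target:
        wm += 1
    return wm


print(b_first_window_ready_at(42, size=60, period=10, shift=False))  # 100
print(b_first_window_ready_at(42, size=60, period=10, shift=True))   # 50
```

With plain EARLIEST holds, B waits until A's input watermark reaches 100. That is 50s after the window's end, which agrees with the "window size minus slide" guess (60 - 10). With shifted holds, B can fire at 50 with no extra delay, at the cost of the unintuitive timestamps discussed in the thread.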