OK, so to move forward, shall we update the default Sessions to not do this
timestamp shifting, perhaps with a (deprecated) timestamp-shifting opt-in
variant to ease the transition for those that want the old (marked
experimental) behavior?

On Fri, Feb 12, 2021 at 9:12 AM Kenneth Knowles <k...@apache.org> wrote:

> All of this is right. Things have changed a lot. Nowadays the default will
> work well, and we can caveat to users that EARLIEST may hold up downstream
> output for overlapping windows.
>
> I'm slightly concerned about the fact that EARLIEST is necessary for CoGBK
> joins, unless there is some special consideration why it doesn't matter. So
> I wonder what happens when a pipeline has a few different joins.
>
> Kenn
>
> On Fri, Feb 12, 2021 at 12:37 AM Robert Bradshaw <rober...@google.com>
> wrote:
>
>> Yes, unless you manually set the timestamp combiner to earliest, which in
>> this case gives earliest + shifted.
>>
>> On Fri, Feb 12, 2021 at 12:33 AM Reuven Lax <re...@google.com> wrote:
>>
>>> The default now is end of window, right? Doesn't that alleviate the
>>> problem that the original change was supposed to fix?
>>>
>>> On Fri, Feb 12, 2021 at 12:25 AM Robert Bradshaw <rober...@google.com>
>>> wrote:
>>>
>>>> The default timestamp combiner used to be earliest as well.
>>>>
>>>> On Fri, Feb 12, 2021 at 12:10 AM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> IIRC, this was introduced because at the time users complained that
>>>>> sliding windows were virtually unusable for reasonably-sized windows.
>>>>> However this was before we allowed customizing the timestamp combiner, so
>>>>> maybe this is less of a problem now?
>>>>>
>>>>> On Thu, Feb 11, 2021 at 10:53 PM Robert Bradshaw <rober...@google.com>
>>>>> wrote:
>>>>>
>>>>>> On Wed, Feb 10, 2021 at 8:03 PM Kenneth Knowles <k...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Feb 10, 2021 at 2:24 PM Alex Amato <ajam...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Feb 10, 2021 at 12:14 PM Kenneth Knowles <k...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> On a PR (https://github.com/apache/beam/pull/13927) we got into a
>>>>>>>>> discussion of a very old and strange feature of Beam that I think we
>>>>>>>>> should revisit.
>>>>>>>>>
>>>>>>>>> The WindowFn has the ability to shift timestamps forward in order
>>>>>>>>> to unblock downstream watermarks. Why? Specifically in this situation:
>>>>>>>>>
>>>>>>>>>  - aggregation/GBK with overlapping windows like SlidingWindows
>>>>>>>>>  - timestamp combiner of the aggregated outputs is EARLIEST of the
>>>>>>>>> inputs
>>>>>>>>>  - there is another downstream aggregation/GBK
>>>>>>>>>
>>>>>>>>> The output watermark of the upstream aggregation is held to the
>>>>>>>>> minimum of the inputs. When an output is emitted, we desire the
>>>>>>>>> output to flow through the rest of the pipeline without delay.
>>>>>>>>> However, the downstream aggregation can (and often will) be delayed
>>>>>>>>> by the window size because of *watermark holds in other later
>>>>>>>>> windows that are not released until those windows output.*
>>>>>>>>>
>>>>>>>> Could you describe this a bit more? Why would later windows hold up
>>>>>>>> the watermark for upstream steps? (Is it due to some subtlety, such as
>>>>>>>> tracking the watermark for each stage, rather than for each step?)
>>>>>>>>
>>>>>>>
>>>>>>> It does not have to do with stages/fusion (a runner-specific
>>>>>>> concept) but is a necessity of watermarks being per-PCollection.
>>>>>>>
>>>>>>> Suppose:
>>>>>>>
>>>>>>>  - Default triggering
>>>>>>>  - Timestamp combiner EARLIEST
>>>>>>>  - 60s windows sliding every 10s
>>>>>>>  - An element with timestamp 42
>>>>>>>  - Aggregation (A) with downstream aggregation (B)
>>>>>>>
>>>>>>> Here is what happens:
>>>>>>>
>>>>>>>  - The element falls into [-10, 50) and [0, 60) and [10, 70) and
>>>>>>> [20, 80) and [30, 90) and [40, 100)
>>>>>>>  - For each of those windows the output watermark hold is set to 42
>>>>>>> (the element's timestamp)
>>>>>>>  - At time 50 the aggregation (A) over the first window is emitted;
>>>>>>> the other windows remain buffered and held
>>>>>>>  - The element arrives at aggregation (B) and is buffered because
>>>>>>> the input watermark (which is the held output watermark from A) is still
>>>>>>> 42, even though no other data will arrive for that window (WLOG if
>>>>>>> elements from other keys are shuffled in)
>>>>>>>  - The input watermark for aggregation (B) does not advance past 42
>>>>>>> until the [40, 100) window is fired and releases its watermark hold
>>>>>>>
>>>>>>> It is, indeed, subtle. To me, anyhow. I was wrong - it is not
>>>>>>> delayed by the window size, but by the difference in end-of-window
>>>>>>> timestamps to all assigned windows (window size minus slide?)
>>>>>>>
>>>>>>> So to avoid this, what actually happens in Java today is that the
>>>>>>> watermark hold, and output timestamp, is set not to 42 but altered to
>>>>>>> 50 to not overlap the prior window. A timestamp of 50 is very
>>>>>>> unintuitive since you asked for the EARLIEST of input timestamps. The
>>>>>>> EARLIEST combiner plays an important role in CoGBK-based joins in SQL,
>>>>>>> where the iterables are re-exploded with timestamps that may be the
>>>>>>> minimum of input elements. This shifting may actually break SQL...
>>>>>>>
>>>>>>> This predated our switch away from "delta from watermark" late data
>>>>>>> dropping to "window expiry" data dropping. So maybe there is some new
>>>>>>> way to set a hold that does not make data late or droppable but still
>>>>>>> use the EARLIEST timestamp. That is my question, for which I have not
>>>>>>> figured out the answer.
>>>>>>>
>>>>>>
>>>>>> This is, indeed, a very tough question...
>>>>>>
>>>>>> I'd say this is generally a problem with EARLIEST and non-aligned
>>>>>> windows. E.g. for sessions, a long key can hold up the watermark for all
>>>>>> others. Here we "know" what the hold up is, and can adjust for it. But I
>>>>>> don't think doing this adjustment is the right thing. It would certainly
>>>>>> seem to mess up the timestamp of the outputs from a join. And it's
>>>>>> possible that the values get re-windowed in which case this element
>>>>>> should get joined with itself from a later window (which I'll admit is a
>>>>>> bit odd, but maybe a reflection that multiple-windowing, like
>>>>>> multi-firing triggering, is non-local).
>>>>>>
>>>>>> Logically, we want the [-10, 50) window for B to fire shortly after
>>>>>> the input watermark for A passes 50, because no non-late data coming
>>>>>> out of A could influence it. In some sense, the "watermark" for the
>>>>>> [-10, 50) window has indeed passed, but not that for later windows. I
>>>>>> don't think the Beam model requires that we have a single watermark,
>>>>>> just that we fire triggers/timers once we have seen all the on-time
>>>>>> data that we think we could, and a runner could be smart about this.
>>>>>>
>>>>>> We may want to keep the ability to shift timestamps for WindowFns,
>>>>>> but I think we shouldn't be doing so for the default sliding windows.
>>>>>> Correctness (of output timestamps) over latency unless one asks
>>>>>> otherwise.
>>>>>>
>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>>> To avoid this problem, element x in window w will have its
>>>>>>>>> timestamp shifted to not overlap with any earlier windows. It is a
>>>>>>>>> weird behavior. It fixes the watermark hold problem but introduces a
>>>>>>>>> strange output with a mysterious timestamp that is hard to justify.
>>>>>>>>>
>>>>>>>>> Any other ideas?
>>>>>>>>>
>>>>>>>>> Kenn
>>>>>>>>>
>>>>>>>>
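
For concreteness, here is a small Python sketch of the sliding-window example
quoted above (60s windows sliding every 10s, one element at timestamp 42). It
is not Beam code: the function names are made up for illustration, timestamps
are plain integer seconds, and the shift rule (raise the timestamp to at least
the window end minus the slide) is an assumption, chosen only because it
reproduces the 42 -> 50 shift described for the [0, 60) window.

```python
def assign_sliding_windows(ts, size, period):
    """Return all [start, end) windows of length `size`, with starts at
    multiples of `period`, that contain timestamp `ts` (earliest first)."""
    last_start = ts - (ts % period)
    # A window starting at s contains ts iff s <= ts < s + size,
    # i.e. s > ts - size, so walk backwards until that bound.
    starts = range(last_start, ts - size, -period)
    return sorted((s, s + size) for s in starts)


def downstream_delay(ts, size, period):
    """With EARLIEST and no shifting, every assigned window holds the output
    watermark at `ts` until it fires at its end. The first window's output is
    therefore stuck downstream until the last assigned window fires."""
    windows = assign_sliding_windows(ts, size, period)
    first_fire = windows[0][1]     # end of the earliest window
    last_release = windows[-1][1]  # end of the latest window
    return last_release - first_fire


def shifted_output_timestamp(ts, window, period):
    """ASSUMED shift rule (illustrative, not taken from the Beam source):
    never emit a timestamp earlier than the window end minus the slide."""
    _, end = window
    return max(ts, end - period)


if __name__ == "__main__":
    for w in assign_sliding_windows(42, 60, 10):
        print(w, "unshifted hold: 42, shifted:",
              shifted_output_timestamp(42, w, 10))
    print("delay seen by B without shifting:", downstream_delay(42, 60, 10))
```

With these numbers the gap between the first window firing and the last hold
being released comes out to the window size minus the slide, which matches the
guess in the quoted message.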
