My understanding matches Jan's. Using getAllowedTimestampSkew removes the check but has no effect on the watermark. It is up to the user to _also_ set allowed lateness so this data is not dropped.
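A minimal stdlib-only Java model of the check being discussed, assuming timestamps and skew are plain epoch milliseconds: an output timestamp is rejected if it is earlier than the element timestamp minus the allowed skew. `SkewCheckModel`, `checkOutputTimestamp` and `accepted` are hypothetical names; this is a sketch of the idea, not Beam's `SimpleDoFnRunner` code. Nothing in it touches a watermark, which is why data it lets through can still be dropped unless allowed lateness is also set downstream.

```java
public final class SkewCheckModel {
    // Hypothetical model of the "no moving timestamps backwards" validation.
    // Timestamps and skew are epoch milliseconds; this is not Beam's implementation.
    static void checkOutputTimestamp(long elementTs, long outputTs, long allowedSkewMillis) {
        long lowerBound;
        try {
            // Earliest output timestamp the check accepts.
            lowerBound = Math.subtractExact(elementTs, allowedSkewMillis);
        } catch (ArithmeticException e) {
            // A huge skew (e.g. Long.MAX_VALUE) on a negative timestamp overflows:
            // treat it as "no lower bound at all".
            lowerBound = Long.MIN_VALUE;
        }
        if (outputTs < lowerBound) {
            throw new IllegalArgumentException(
                "Output timestamp " + outputTs + " is before element timestamp "
                    + elementTs + " minus allowed skew " + allowedSkewMillis);
        }
    }

    // Convenience wrapper: true if the check passes, false if it throws.
    static boolean accepted(long elementTs, long outputTs, long allowedSkewMillis) {
        try {
            checkOutputTimestamp(elementTs, outputTs, allowedSkewMillis);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        long hour = 60L * 60 * 1000;
        long noon = 12 * hour;   // element timestamp 12:00
        long eleven = 11 * hour; // desired output timestamp 11:00

        System.out.println(accepted(noon, eleven, 0));              // false: zero skew
        System.out.println(accepted(noon, eleven, hour));           // true: one hour of skew suffices
        System.out.println(accepted(noon, eleven, Long.MAX_VALUE)); // true: check effectively disabled
    }
}
```

Note the check only decides whether the output is allowed; whether the 11:00 element is late downstream depends entirely on the watermark and the allowed lateness configured there.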
It was always known to be risky and also insufficient. There is a Jira filed at the very beginning of Beam to try to find a proper semantic solution: https://issues.apache.org/jira/browse/BEAM-644. It is a new core primitive in the model, but not very complicated. I would guess we need experts in many runners to evaluate feasibility. And maybe there is another way to do it now - I would expect SDF to have similar needs on occasion.

Kenn

On Fri, Sep 17, 2021 at 1:30 AM Jan Lukavský <[email protected]> wrote:
> I think that allowing getAllowedTimestampSkew() to return Long.MAX_VALUE
> and using this value to hold the watermark are mutually exclusive. We should
> either disallow Long.MAX_VALUE as allowed skew, or not block the
> watermark at all.
>
> As I understood the allowed timestamp skew, it is there so that you
> don't accidentally output droppable data that actually arrived on time
> (which could happen if we allow arbitrarily shifting the output
> timestamp back in time). If the user specifies the allowed skew, we can
> assume (optimistically) that the user is aware of the implications,
> sets downstream allowed lateness, and handles late data appropriately.
>
> An additional argument against holding the watermark until the allowed
> timestamp skew is that the method is valid on stateless DoFns as well, but
> actually makes them stateful, which is an unexpected side effect.
>
> If we believe that holding the watermark with timestamp skew > 0 is a
> contract for some runner (but it is definitely not a contract of the
> model), then we might remove the method (after all, it is deprecated) and
> replace it with some other method with appropriate semantics (no
> watermark hold, only sanity checking). Or we might allow arbitrarily
> shifting the timestamp back in time, but under certain circumstances
> that might look to users like strange data loss.
>
> Jan
>
> On 9/17/21 2:38 AM, Robert Bradshaw wrote:
> > Is it possible to only allow this when the watermark is being held
> > (with that limit)?
> >
> > On Thu, Sep 16, 2021 at 11:09 AM Lara Schmidt <[email protected]> wrote:
> >> Timer.withOutputTimestamp doesn't seem to take into account
> >> DoFn#getAllowedTimestampSkew at all. I have a PR requiring this functionality
> >> (backward timestamps with separate watermark handling) and I had to remove
> >> the check in Timer.withOutputTimestamp. PR (see SimpleDoFnRunner.java).
> >>
> >> On Thu, Sep 16, 2021 at 11:06 AM Reuven Lax <[email protected]> wrote:
> >>> That's what that function is supposed to do and used to do. Given that
> >>> it's a deprecated function that was never used much in the first place (due
> >>> to the watermark delays), it's possible that not much effort was put into
> >>> maintaining that behavior.
> >>>
> >>> On Thu, Sep 16, 2021 at 11:00 AM Jan Lukavský <[email protected]> wrote:
> >>>> Is it possible that the effect of getAllowedTimestampSkew on the watermark
> >>>> is runner-dependent? I have never observed the watermark being delayed
> >>>> by this parameter on FlinkRunner. Also, looking into SimpleDoFnRunner,
> >>>> the call seems to be only part of validation, not any watermark logic
> >>>> [1]. And more notably, the javadoc explicitly states that
> >>>> Long.MAX_VALUE is allowed, and that would cause an infinite delay of the
> >>>> watermark [2].
> >>>>
> >>>> Jan
> >>>>
> >>>> [1]
> >>>> https://github.com/apache/beam/blob/081cb9a51384bba1e66bb60bf1d61e84e817b4e1/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java#L442
> >>>>
> >>>> [2]
> >>>> https://beam.apache.org/releases/javadoc/2.5.0/org/apache/beam/sdk/transforms/DoFn.html#getAllowedTimestampSkew--
> >>>>
> >>>> On 9/16/21 7:17 PM, Reuven Lax wrote:
> >>>>> tl;dr Beam prevents users from ever moving timestamps backwards, with
> >>>>> no good way to bypass this check.
> >>>>> This made sense when the check was introduced, but now I propose
> >>>>> that we add a way to bypass that restriction.
> >>>>>
> >>>>> Historically, Beam prevents users from moving timestamps backwards. A
> >>>>> processElement processing an element with timestamp 12:00 cannot
> >>>>> output an element with timestamp 11:00. This holds true for timers as
> >>>>> well - one cannot set a timer earlier than the current element time.
> >>>>> This was done for a good reason - there was no way for Beam to
> >>>>> maintain a good watermark if time could suddenly and arbitrarily move
> >>>>> backwards. DoFn#getAllowedTimestampSkew (deprecated) somewhat
> >>>>> mitigated this, but was unsatisfactory. It worked by shifting the
> >>>>> watermark back by the specified amount - if you returned 1 day, then
> >>>>> the watermark (and all subsequent aggregations) would be delayed by a
> >>>>> day.
> >>>>>
> >>>>> I propose that we provide a way for users to disable this check
> >>>>> entirely, with no effect on the watermark.
> >>>>> - Today there are techniques for the user to hold the watermark
> >>>>> using Timer.withOutputTimestamp(). In such cases there is no reason
> >>>>> for Beam to have this restriction at all.
> >>>>> - While it's possible for users to misuse this feature, resulting
> >>>>> in late data, I don't think that's a reason not to add it. We should
> >>>>> document well that this is an advanced method that can be misused, and
> >>>>> beyond that it's up to users not to misuse it.
> >>>>>
> >>>>> Reuven
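The thread contrasts two ways of keeping backward-moving output from becoming late: lagging the watermark by a fixed skew (the behavior Reuven describes for getAllowedTimestampSkew), and holding it only where needed, as Timer.withOutputTimestamp() allows. A heavily simplified, stdlib-only Java sketch of both, assuming a single step whose output watermark is the minimum of its input watermark and its active holds; `WatermarkHoldModel` and all its methods are hypothetical and do not reflect how any Beam runner is implemented.

```java
import java.util.TreeMap;

public final class WatermarkHoldModel {
    // Hypothetical, heavily simplified model of one step's output watermark.
    // Times are epoch milliseconds; this is not any Beam runner's implementation.

    /** Skew-based holding: lag the input watermark by a fixed skew (saturating on overflow). */
    static long skewHeldWatermark(long inputWatermark, long allowedSkewMillis) {
        try {
            return Math.subtractExact(inputWatermark, allowedSkewMillis);
        } catch (ArithmeticException e) {
            return Long.MIN_VALUE;
        }
    }

    // Hold-based approach: active holds keyed by timestamp, with a count per timestamp.
    private final TreeMap<Long, Integer> holds = new TreeMap<>();

    /** Register a hold, e.g. for a timer set with an output timestamp of ts. */
    void addHold(long ts) {
        holds.merge(ts, 1, Integer::sum);
    }

    /** Release one hold at ts, e.g. after the timer fires and its output is emitted. */
    void releaseHold(long ts) {
        holds.computeIfPresent(ts, (k, n) -> n > 1 ? Integer.valueOf(n - 1) : null);
    }

    /** The output watermark may pass neither the input watermark nor the earliest active hold. */
    long holdBasedWatermark(long inputWatermark) {
        return holds.isEmpty() ? inputWatermark : Math.min(inputWatermark, holds.firstKey());
    }

    public static void main(String[] args) {
        long hour = 60L * 60 * 1000;
        long input = 12 * hour; // input watermark at 12:00

        // Skew-based: the output watermark always lags the input by the full skew.
        System.out.println(skewHeldWatermark(input, hour) == 11 * hour);       // true
        // With a Long.MAX_VALUE skew, even the largest input watermark only reaches 0.
        System.out.println(skewHeldWatermark(Long.MAX_VALUE, Long.MAX_VALUE)); // 0

        // Hold-based: only data that actually needs it holds the watermark back.
        WatermarkHoldModel step = new WatermarkHoldModel();
        step.addHold(11 * hour);                                         // will emit at 11:00 later
        System.out.println(step.holdBasedWatermark(input) == 11 * hour); // true: held at 11:00
        step.releaseHold(11 * hour);                                     // the 11:00 output is emitted
        System.out.println(step.holdBasedWatermark(input) == input);     // true: advances to 12:00
    }
}
```

The second approach is why disabling the check can be safe when the user holds the watermark explicitly: the 11:00 output is emitted while the output watermark is still at 11:00, so it is not late, and no other data pays the delay.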
