My understanding matches Jan's. Using getAllowedTimestampSkew removes the
check but has no effect on the watermark. It is up to the user to _also_
set allowed lateness so that this data is not dropped.
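To make the distinction concrete, here is a minimal, simplified model in plain Java with no Beam dependencies. The class and method names are hypothetical, and this is not Beam's implementation. It assumes the skew check rejects an output timestamp that lies more than the allowed skew before the input element's timestamp. It also approximates droppability as "window end plus allowed lateness is behind the watermark". Raising the skew lets the 11:00 output pass the check. But nothing holds the watermark, so it stays at 12:00, and the output is still dropped unless allowed lateness covers the gap.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical, simplified model of two separate concerns:
 * (1) the per-output skew validation, and (2) downstream droppability,
 * which depends on the watermark and allowed lateness. Not Beam's actual code;
 * uses java.time instead of Beam's Joda types.
 */
public class SkewCheckSketch {

    /** Model of the skew check: an output may lag its input element by at most allowedSkew. */
    static boolean passesSkewCheck(Instant elementTimestamp, Instant outputTimestamp, Duration allowedSkew) {
        if (!outputTimestamp.isBefore(elementTimestamp)) {
            return true; // moving forward (or staying put) is always accepted
        }
        Duration shiftBack = Duration.between(outputTimestamp, elementTimestamp);
        return shiftBack.compareTo(allowedSkew) <= 0;
    }

    /**
     * Simplified droppability model: data for a window is dropped once the
     * watermark has passed the window end plus the allowed lateness.
     */
    static boolean isDroppable(Instant windowEnd, Duration allowedLateness, Instant watermark) {
        return windowEnd.plus(allowedLateness).isBefore(watermark);
    }

    public static void main(String[] args) {
        Instant noon = Instant.parse("2021-09-16T12:00:00Z");
        Instant elevenAm = Instant.parse("2021-09-16T11:00:00Z");
        // End of a hypothetical [11:00, 11:30) window that the 11:00 output falls into.
        Instant windowEnd = Instant.parse("2021-09-16T11:30:00Z");

        // Without skew, moving 12:00 -> 11:00 is rejected.
        System.out.println("skew=0, passes check: " + passesSkewCheck(noon, elevenAm, Duration.ZERO));
        // With a 1-day skew the check passes...
        System.out.println("skew=1d, passes check: " + passesSkewCheck(noon, elevenAm, Duration.ofDays(1)));
        // ...but the watermark was not held, so it is still at 12:00 and the output is droppable...
        System.out.println("lateness=0, droppable: " + isDroppable(windowEnd, Duration.ZERO, noon));
        // ...unless the user also sets enough allowed lateness downstream.
        System.out.println("lateness=1h, droppable: " + isDroppable(windowEnd, Duration.ofHours(1), noon));
    }
}
```

The two methods are deliberately independent. Relaxing the skew only changes the validation, and avoiding dropped data is a separate downstream setting.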

It was always known to be risky and also insufficient. There is a Jira
filed at the very beginning of Beam to try to find a proper semantic
solution: https://issues.apache.org/jira/browse/BEAM-644. It is a new core
primitive in the model, but not very complicated. I would guess we need
experts in many runners to evaluate feasibility. And maybe there is another
way to do it now - I would expect SDF to have similar needs on occasion.

Kenn

On Fri, Sep 17, 2021 at 1:30 AM Jan Lukavský <[email protected]> wrote:

> I think that allowing getAllowedTimestampSkew() to return Long.MAX_VALUE
> and using this value to hold the watermark are mutually exclusive. We should
> either disallow Long.MAX_VALUE as allowed skew, or not block the
> watermark at all.
>
> As I understood the allowed timestamp skew, it is there so that you
> don't accidentally output droppable data that actually arrived on time
> (which could happen, if we allow arbitrarily shifting the output
> timestamp back in time). If the user specifies the allowed skew, we can
> assume (optimistically) that the user is aware of the implications and
> sets downstream allowed lateness and handles late data appropriately.
>
> An additional argument against holding the watermark until the allowed
> timestamp skew is that the method is valid on stateless DoFns as well,
> but the hold actually makes the DoFn stateful, which is an unexpected
> side effect.
>
> If we believe that holding the watermark with timestamp skew > 0 is a
> contract for some runner (it is definitely not a contract of the
> model), then we might remove the method (after all, it is deprecated) and
> replace it with some other method with appropriate semantics (without
> watermark hold, only sanity checking). Or we might allow arbitrarily
> shifting the timestamp back in time, but that might under certain
> circumstances look to users like strange data loss.
>
>   Jan
>
> On 9/17/21 2:38 AM, Robert Bradshaw wrote:
> > Is it possible to only allow this when the watermark is being held
> > (with that limit)?
> >
> > On Thu, Sep 16, 2021 at 11:09 AM Lara Schmidt <[email protected]> wrote:
> >> Timer.withOutputTimestamp doesn't seem to take into account
> >> DoFn#getAllowedTimestampSkew at all. I have a PR requiring this
> >> functionality (backward timestamps with separate watermark handling), and I
> >> had to remove the check in Timer.withOutputTimestamp. PR (see
> >> SimpleDoFnRunner.java).
> >>
> >> On Thu, Sep 16, 2021 at 11:06 AM Reuven Lax <[email protected]> wrote:
> >>> That's what that function is supposed to do and used to do. Given that
> >>> it's a deprecated function that was never used much in the first place
> >>> (due to the watermark delays), it's possible that not much effort was put
> >>> into maintaining that behavior.
> >>>
> >>> On Thu, Sep 16, 2021 at 11:00 AM Jan Lukavský <[email protected]> wrote:
> >>>> Is it possible that the effect of getAllowedTimestampSkew on watermark
> >>>> is runner-dependent? I have never observed the watermark being delayed
> >>>> by this parameter on FlinkRunner. Also, looking into SimpleDoFnRunner,
> >>>> the call seems to be only part of validation, not any watermark logic
> >>>> [1]. More notably, the javadoc explicitly states that
> >>>> Long.MAX_VALUE is allowed, and that would cause infinite delay of the
> >>>> watermark [2].
> >>>>
> >>>>    Jan
> >>>>
> >>>> [1]
> >>>> https://github.com/apache/beam/blob/081cb9a51384bba1e66bb60bf1d61e84e817b4e1/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java#L442
> >>>>
> >>>> [2]
> >>>> https://beam.apache.org/releases/javadoc/2.5.0/org/apache/beam/sdk/transforms/DoFn.html#getAllowedTimestampSkew--
> >>>>
> >>>> On 9/16/21 7:17 PM, Reuven Lax wrote:
> >>>>> tl;dr Beam prevents users from ever moving timestamps backwards, with
> >>>>> no good way to bypass this check. This made sense when the check was
> >>>>> introduced, but now I propose that we add a way to bypass that
> >>>>> restriction.
> >>>>>
> >>>>> Historically, Beam prevents users from moving timestamps backwards. A
> >>>>> processElement processing an element with timestamp 12:00 cannot
> >>>>> output an element with timestamp 11:00. This holds true for timers as
> >>>>> well - one cannot set a timer earlier than the current element time.
> >>>>> This was done for a good reason - there was no way for Beam to
> >>>>> maintain a good watermark if time could suddenly and arbitrarily move
> >>>>> backwards. DoFn#getAllowedTimestampSkew (deprecated) somewhat
> >>>>> mitigated this, but was unsatisfactory. It worked by shifting the
> >>>>> watermark back by the specified amount - if you returned 1 day, then
> >>>>> the watermark (and all subsequent aggregations) would be delayed by a
> >>>>> day.
> >>>>>
> >>>>> I propose that we provide a way for users to disable this check
> >>>>> entirely, with no effect on the watermark.
> >>>>>     - Today there are techniques for the user to hold the watermark
> >>>>> using Timer.withOutputTimestamp(). In such cases there is no reason
> >>>>> for Beam to have this restriction at all.
> >>>>>     - While it's possible for users to misuse this feature, resulting
> >>>>> in late data, I don't think that's a reason not to add it. We should
> >>>>> well document that this is an advanced method that can be misused,
> and
> >>>>> beyond that it's up to users not to misuse it.
> >>>>>
> >>>>> Reuven
>
