I modify my above proposal: let's remove the deprecation and replace it in Javadoc with a detailed explanation of the use and risk of this method.
Big +1.

On 9/17/21 6:26 PM, Reuven Lax wrote:
Not really - at least for Dataflow, the runner would have to fetch all possible watermark holds each time to do that check.

One thing I did consider was changing the check to be based on the output watermark. In theory it seems like it would solve this, since the output watermark should be <= the current element and any watermark holds. However, on reflection there are a number of problems with that approach:
  - It leads to weird semantics in the case of late data, since in that case the output watermark might (or might not!) be ahead of the current timestamp. The runner would have to know that a current element is late and disable the check in that case (or, equivalently, the runner would base the check on min(current element ts, output ts)).
  - The check is now based on a property that cannot be directly observed by the user in the programming model, which seems problematic.
  - Practically, FnApiDoFnRunner does not have access to watermarks today AFAICT. Implementing this for portable pipelines would be hard (or at least tedious).
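
To make the late-data point concrete, here is a minimal sketch of the min(current element ts, output ts) variant, reading "output ts" as the output watermark (that reading is an assumption). It is a toy model in plain java.time, not Beam code; the class and method names are hypothetical and nothing here exists in any runner.

```java
import java.time.Instant;

/** Toy model of the alternative check; hypothetical names, not part of Beam. */
final class OutputWatermarkCheck {
  private OutputWatermarkCheck() {}

  /** Earliest timestamp an output may carry: min(element timestamp, output watermark). */
  static Instant lowerBound(Instant elementTimestamp, Instant outputWatermark) {
    return elementTimestamp.isBefore(outputWatermark) ? elementTimestamp : outputWatermark;
  }

  /** True if the output timestamp is not earlier than the lower bound. */
  static boolean isAllowed(
      Instant elementTimestamp, Instant outputWatermark, Instant outputTimestamp) {
    return !outputTimestamp.isBefore(lowerBound(elementTimestamp, outputWatermark));
  }
}
```

With an on-time element at 12:00 and an output watermark of 11:50, the bound is the watermark, so an output at 11:55 passes. With a late element at 11:00 and the same watermark, the bound falls back to the element timestamp, which is the case the runner would otherwise have to special-case.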

It appears that Jan is correct - that method does not hold up the watermark. It definitely used to! In this case it may simply be that we need to fix the timer code to examine allowed timestamp skew. Maybe we should consider removing the Deprecated tag then as well? The proposal in the JIRA is to provide an operator to shift the watermark. This is precisely what we do not need here. I modify my above proposal: let's remove the deprecation and replace it in Javadoc with a detailed explanation of the use and risk of this method.

On Thu, Sep 16, 2021 at 5:39 PM Robert Bradshaw wrote:

    Is it possible to only allow this when the watermark is being held
    (with that limit)?

    On Thu, Sep 16, 2021 at 11:09 AM Lara Schmidt wrote:
    >
    > Timer.withOutputTimestamp doesn't seem to take into account
    DoFn#getAllowedTimestampSkew at all. I have a PR requiring this
    functionality (backward timestamps with separate watermark
    handling) and I had to remove the check in
    Timer.withOutputTimestamp. PR, (see SimpleDoFnRunner.java).
    >
    > On Thu, Sep 16, 2021 at 11:06 AM Reuven Lax wrote:
    >>
    >> That's what that function is supposed to do and used to do.
    Given that it's a deprecated function that was never used much in
    the first place (due to the watermark delays), it's possible that
    not much effort was put into maintaining that behavior.
    >>
    >> On Thu, Sep 16, 2021 at 11:00 AM Jan Lukavský wrote:
    >>>
    >>> Is it possible that the effect of getAllowedTimestampSkew on
    watermark
    >>> is runner-dependent? I have never observed the watermark being
    delayed
    >>> by this parameter on FlinkRunner. Also looking into
    SimpleDoFnRunner,
    >>> the call seems to be only part of validation, not any
    watermark logic
    >>> [1]. And more notably, the javadoc explicitly states that
    >>> Long.MAX_VALUE is allowed, and that would cause infinite delay
    of the
    >>> watermark [2].
    >>>
    >>>   Jan
    >>>
    >>> [1]
    >>>
    
https://github.com/apache/beam/blob/081cb9a51384bba1e66bb60bf1d61e84e817b4e1/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java#L442
    >>>
    >>> [2]
    >>>
    
https://beam.apache.org/releases/javadoc/2.5.0/org/apache/beam/sdk/transforms/DoFn.html#getAllowedTimestampSkew--
    >>>
    >>> On 9/16/21 7:17 PM, Reuven Lax wrote:
    >>> > tl;dr Beam prevents users from ever moving timestamps
    backwards, with
    >>> > no good way to bypass this check. This made sense when the
    check was
    >>> > introduced, but now I propose that we add a way to bypass that
    >>> > restriction.
    >>> >
    >>> > Historically, Beam prevents users from moving timestamps
    backwards. A
    >>> > processElement processing an element with timestamp 12:00 cannot
    >>> > output an element with timestamp 11:00. This holds true for
    timers as
    >>> > well - one cannot set a timer earlier than the current
    element time.
    >>> > This was done for a good reason - there was no way for Beam to
    >>> > maintain a good watermark if time could suddenly and
    arbitrarily move
    >>> > backwards. DoFn#getAllowedTimestampSkew (deprecated) somewhat
    >>> > mitigated this, but was unsatisfactory. It worked by
    shifting the
    >>> > watermark back by the specified amount - if you returned 1
    day, then
    >>> > the watermark (and all subsequent aggregations) would be
    delayed by a
    >>> > day.
    >>> >
    >>> > I propose that we provide a way for users to disable this check
    >>> > entirely, with no effect on the watermark.
    >>> >    - Today there are techniques for the user to hold the
    watermark
    >>> > using Timer.withOutputTimestamp(). In such cases there is no
    reason
    >>> > for Beam to have this restriction at all.
    >>> >    - While it's possible for users to misuse this feature,
    resulting
    >>> > in late data, I don't think that's a reason not to add it.
    We should
    >>> > well document that this is an advanced method that can be
    misused, and
    >>> > beyond that it's up to users not to misuse it.
    >>> >
    >>> > Reuven
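
For readers following the thread, the restriction under discussion can be sketched as a simple validation: an output timestamp may not fall more than the allowed skew before the current element's timestamp. This is a toy model in plain java.time and only an illustration of the rule as described above; the class and method names are hypothetical and this is not the code in SimpleDoFnRunner.

```java
import java.time.DateTimeException;
import java.time.Duration;
import java.time.Instant;

/** Toy model of the backwards-timestamp check; hypothetical names, not Beam code. */
final class TimestampSkewCheck {
  private TimestampSkewCheck() {}

  /** Earliest permitted output timestamp: element timestamp minus the allowed skew. */
  static Instant lowerBound(Instant elementTimestamp, Duration allowedSkew) {
    try {
      return elementTimestamp.minus(allowedSkew);
    } catch (DateTimeException | ArithmeticException e) {
      // The subtraction left the representable range: treat the bound as unlimited.
      return Instant.MIN;
    }
  }

  /** True if the output timestamp does not move back by more than the allowed skew. */
  static boolean isAllowed(
      Instant elementTimestamp, Instant outputTimestamp, Duration allowedSkew) {
    return !outputTimestamp.isBefore(lowerBound(elementTimestamp, allowedSkew));
  }
}
```

In this model a zero skew rejects an 11:00 output for a 12:00 element, a one-hour skew accepts it, and a skew of Long.MAX_VALUE milliseconds effectively disables the check. Note that the sketch only validates timestamps and does nothing to the watermark, which matches Jan's observation about the validation-only call rather than the historical watermark-shifting behavior.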
