I think that allowing getAllowedTimestampSkew() to return Long.MAX_VALUE and using this value to hold the watermark are mutually exclusive. We should either disallow Long.MAX_VALUE as the allowed skew, or not hold back the watermark at all.

As I understand the allowed timestamp skew, it exists so that you don't accidentally output droppable data that actually arrived on time (which could happen if we allowed arbitrarily shifting the output timestamp back in time). If the user specifies the allowed skew, we can (optimistically) assume that the user is aware of the implications, sets the downstream allowed lateness accordingly, and handles late data appropriately.
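To make the "droppable data" concern concrete, here is a deliberately simplified, standalone model (not Beam's implementation; the class and method names are hypothetical). It uses fixed windows and treats an element as droppable once the watermark has passed the end of its window plus the allowed lateness.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical, simplified lateness model; not Beam's actual implementation. */
final class LatenessModel {
  private LatenessModel() {}

  /** Start of the fixed window of length {@code size} that contains {@code ts}. */
  static Instant windowStart(Instant ts, Duration size) {
    long sizeMillis = size.toMillis();
    long millis = ts.toEpochMilli();
    return Instant.ofEpochMilli(millis - Math.floorMod(millis, sizeMillis));
  }

  /**
   * Treats an element as droppable once the watermark has passed the end of its
   * window plus the allowed lateness.
   */
  static boolean isDroppable(
      Instant ts, Duration windowSize, Duration allowedLateness, Instant watermark) {
    Instant windowEnd = windowStart(ts, windowSize).plus(windowSize);
    return windowEnd.plus(allowedLateness).isBefore(watermark);
  }
}
```

With one-minute windows and a watermark at 11:59:30, an element stamped 12:00 is on time, but re-emitting it stamped 11:00 makes it droppable unless the downstream allowed lateness covers the gap (here, roughly an hour).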

An additional argument against holding the watermark by the allowed timestamp skew is that the method is valid on a stateless DoFn as well, yet holding the watermark effectively makes that DoFn stateful, which is an unexpected side effect.

If we believe that holding the watermark when the timestamp skew is > 0 is part of the contract for some runner (it is definitely not part of the model's contract), then we might remove the method (it is deprecated, after all) and replace it with another method with appropriate semantics (no watermark hold, only sanity checking). Alternatively, we might allow arbitrarily shifting the timestamp back in time, but under certain circumstances that might look to users like strange data loss.

 Jan

On 9/17/21 2:38 AM, Robert Bradshaw wrote:
Is it possible to only allow this when the watermark is being held
(with that limit)?
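One way to read this suggestion is as a validation rule: a backward output timestamp is accepted only if an active watermark hold sits at or before it, so the watermark cannot already have passed the new timestamp. The sketch below is a standalone illustration under that assumption; HoldAwareCheck and its parameters are hypothetical and not part of Beam's API.

```java
import java.time.Instant;
import java.util.Optional;

/** Hypothetical check: backward output timestamps only when covered by a watermark hold. */
final class HoldAwareCheck {
  private HoldAwareCheck() {}

  static void checkOutputTimestamp(
      Instant elementTs, Instant outputTs, Optional<Instant> watermarkHold) {
    if (!outputTs.isBefore(elementTs)) {
      return; // Moving forward (or staying put) is always allowed.
    }
    // Moving backward is allowed only if a hold exists at or before outputTs,
    // so the watermark cannot have advanced past the new timestamp.
    if (watermarkHold.isPresent() && !outputTs.isBefore(watermarkHold.get())) {
      return;
    }
    throw new IllegalArgumentException(
        "Output timestamp " + outputTs + " is before element timestamp " + elementTs
            + " and is not covered by a watermark hold");
  }
}
```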

On Thu, Sep 16, 2021 at 11:09 AM Lara Schmidt wrote:
Timer.withOutputTimestamp doesn't seem to take DoFn#getAllowedTimestampSkew
into account at all. I have a PR requiring this functionality (backward
timestamps with separate watermark handling), and I had to remove the
check in Timer.withOutputTimestamp. See the PR (in particular SimpleDoFnRunner.java).

On Thu, Sep 16, 2021 at 11:06 AM Reuven Lax wrote:
That's what that function is supposed to do, and used to do. Given that it's a
deprecated function that was never used much in the first place (due to the 
watermark delays), it's possible that not much effort was put into maintaining 
that behavior.

On Thu, Sep 16, 2021 at 11:00 AM Jan Lukavský wrote:
Is it possible that the effect of getAllowedTimestampSkew on the watermark
is runner-dependent? I have never observed the watermark being delayed
by this parameter on FlinkRunner. Also, looking into SimpleDoFnRunner,
the call seems to be only part of validation, not of any watermark logic
[1]. More notably, the javadoc explicitly states that
Long.MAX_VALUE is allowed, which would cause an infinite delay of the
watermark [2].

   Jan

[1]
https://github.com/apache/beam/blob/081cb9a51384bba1e66bb60bf1d61e84e817b4e1/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java#L442

[2]
https://beam.apache.org/releases/javadoc/2.5.0/org/apache/beam/sdk/transforms/DoFn.html#getAllowedTimestampSkew--
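The validation-only role of getAllowedTimestampSkew described in [1], and the Long.MAX_VALUE case from [2], can be sketched as a standalone check on epoch-millisecond timestamps. This is a simplified illustration, not the code behind [1]; SkewCheck is a hypothetical name. Long.MAX_VALUE is special-cased because, with plain long arithmetic, elementMillis - Long.MAX_VALUE wraps around for sufficiently negative (pre-epoch) timestamps.

```java
/** Hypothetical, simplified allowed-skew check on epoch-millisecond timestamps. */
final class SkewCheck {
  private SkewCheck() {}

  /**
   * Rejects outputs earlier than elementMillis - allowedSkewMillis. This only
   * validates; it does not touch any watermark. Assumes allowedSkewMillis >= 0.
   */
  static void checkTimestamp(long elementMillis, long outputMillis, long allowedSkewMillis) {
    // Long.MAX_VALUE means "unbounded skew": every output timestamp is accepted.
    if (allowedSkewMillis == Long.MAX_VALUE) {
      return;
    }
    long lowerBound;
    try {
      lowerBound = Math.subtractExact(elementMillis, allowedSkewMillis);
    } catch (ArithmeticException e) {
      // Saturate rather than wrap around for timestamps far before the epoch.
      lowerBound = Long.MIN_VALUE;
    }
    if (outputMillis < lowerBound) {
      throw new IllegalArgumentException(
          "Output timestamp " + outputMillis + " is more than " + allowedSkewMillis
              + " ms before element timestamp " + elementMillis);
    }
  }
}
```

Note that nothing here delays a watermark, which matches the observation that the skew only shows up as validation in [1].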

On 9/16/21 7:17 PM, Reuven Lax wrote:
tl;dr Beam prevents users from ever moving timestamps backwards, with
no good way to bypass this check. This made sense when the check was
introduced, but now I propose that we add a way to bypass that
restriction.

Historically, Beam prevents users from moving timestamps backwards. A
processElement call handling an element with timestamp 12:00 cannot
output an element with timestamp 11:00. This holds true for timers as
well - one cannot set a timer earlier than the current element time.
This was done for a good reason - there was no way for Beam to
maintain a good watermark if time could suddenly and arbitrarily move
backwards. DoFn#getAllowedTimestampSkew (deprecated) somewhat
mitigated this, but was unsatisfactory. It worked by shifting the
watermark back by the specified amount - if you returned 1 day, then
the watermark (and all subsequent aggregations) would be delayed by a
day.
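The watermark-shifting mechanism described above amounts to simple arithmetic: the output watermark is the input watermark minus the skew. The standalone sketch below (hypothetical names, epoch milliseconds, Long.MIN_VALUE standing in for the runner's minimum timestamp) saturates instead of overflowing, and shows why a skew of Long.MAX_VALUE would pin the watermark forever.

```java
/** Hypothetical model of a watermark held back by a fixed allowed skew. */
final class SkewedWatermark {
  private SkewedWatermark() {}

  /** Stand-in for the runner's minimum timestamp (an assumption of this sketch). */
  static final long MIN_WATERMARK_MILLIS = Long.MIN_VALUE;

  /** Input watermark minus the skew, saturating at the minimum. Assumes skew >= 0. */
  static long heldWatermark(long inputWatermarkMillis, long allowedSkewMillis) {
    if (allowedSkewMillis == Long.MAX_VALUE) {
      // Unbounded skew: the output watermark never advances.
      return MIN_WATERMARK_MILLIS;
    }
    try {
      return Math.subtractExact(inputWatermarkMillis, allowedSkewMillis);
    } catch (ArithmeticException e) {
      return MIN_WATERMARK_MILLIS;
    }
  }
}
```

With a one-day skew, every downstream aggregation sees a watermark exactly one day behind its input.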

I propose that we provide a way for users to disable this check
entirely, with no effect on the watermark.
    - Today there are techniques for the user to hold the watermark
using Timer.withOutputTimestamp(). In such cases there is no reason
for Beam to have this restriction at all.
    - While it's possible for users to misuse this feature, resulting
in late data, I don't think that's a reason not to add it. We should
document clearly that this is an advanced method that can be misused, and
beyond that it's up to users not to misuse it.
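To contrast the proposal with the existing behaviors, here is a standalone comparison of the three policies discussed in this thread: the strict default, the deprecated allowed skew, and the proposed opt-out. TimestampPolicy and its methods are hypothetical illustrations, not Beam API; timestamps are epoch milliseconds and skews are assumed non-negative.

```java
/** Hypothetical comparison of the three behaviors discussed in this thread. */
enum TimestampPolicy {
  /** Historical default: no output earlier than the element timestamp. */
  STRICT,
  /** Deprecated allowed skew: bounded backward moves, watermark held back by the skew. */
  ALLOWED_SKEW,
  /** Proposed opt-out: no check and no watermark change; the user manages holds. */
  UNCHECKED;

  /** Subtraction that saturates at Long.MIN_VALUE instead of wrapping; assumes b >= 0. */
  private static long minusSaturating(long a, long b) {
    try {
      return Math.subtractExact(a, b);
    } catch (ArithmeticException e) {
      return Long.MIN_VALUE;
    }
  }

  /** Whether an output at outputMillis is accepted for an element at elementMillis. */
  boolean accepts(long elementMillis, long outputMillis, long skewMillis) {
    switch (this) {
      case STRICT:
        return outputMillis >= elementMillis;
      case ALLOWED_SKEW:
        return outputMillis >= minusSaturating(elementMillis, skewMillis);
      default:
        return true; // UNCHECKED: any timestamp is accepted.
    }
  }

  /** The output watermark reported downstream for a given input watermark. */
  long outputWatermark(long inputWatermarkMillis, long skewMillis) {
    return this == ALLOWED_SKEW
        ? minusSaturating(inputWatermarkMillis, skewMillis)
        : inputWatermarkMillis;
  }
}
```

Under UNCHECKED the watermark is untouched, so protecting against late data falls to the user, for example via Timer.withOutputTimestamp() as mentioned above.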

Reuven
