I think that allowing getAllowedTimestampSkew() to return Long.MAX_VALUE
and using this value to hold the watermark are mutually exclusive. We should
either disallow Long.MAX_VALUE as the allowed skew, or not block the
watermark at all.
As I understand it, the allowed timestamp skew exists so that you
don't accidentally output droppable data that actually arrived on time
(which could happen if we allowed arbitrarily shifting the output
timestamp back in time). If the user specifies an allowed skew, we can
(optimistically) assume that the user is aware of the implications, sets
the downstream allowed lateness accordingly, and handles late data appropriately.
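To make the droppability concern concrete, here is a simplified, hypothetical model in plain Java (not Beam code). It assumes fixed windows and treats an element as droppable when its window's maximum timestamp plus the downstream allowed lateness is before the downstream watermark. The class and method names are made up for illustration.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical, simplified model (not Beam code): fixed windows, and an element
 * is droppable once its window's max timestamp plus the allowed lateness is
 * before the downstream watermark.
 */
final class DroppabilityModel {
  private DroppabilityModel() {}

  /** Largest timestamp (millisecond precision) of the fixed window containing ts. */
  static Instant windowMaxTimestamp(Instant ts, Duration windowSize) {
    long size = windowSize.toMillis();
    long windowStart = Math.floorDiv(ts.toEpochMilli(), size) * size;
    return Instant.ofEpochMilli(windowStart + size - 1);
  }

  /** True if an element re-stamped to outputTs would be dropped downstream. */
  static boolean isDroppable(
      Instant outputTs, Duration windowSize, Duration allowedLateness, Instant watermark) {
    Instant expiry = windowMaxTimestamp(outputTs, windowSize).plus(allowedLateness);
    return expiry.isBefore(watermark);
  }
}
```

With 10-minute windows and the downstream watermark at 11:59, an element stamped 12:00 is kept, but the same element shifted back to 11:00 lands in an already expired window unless the allowed lateness covers the shift.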
An additional argument against holding the watermark by the allowed timestamp
skew is that the method is valid on a stateless DoFn as well, yet holding the
watermark effectively makes it stateful, which is an unexpected side effect.
If we believe that holding the watermark with a timestamp skew > 0 is a
contract of some runner (it is definitely not a contract of the
model), then we might remove the method (after all, it is deprecated) and
replace it with some other method with appropriate semantics (no
watermark hold, only sanity checking). Alternatively, we might allow arbitrarily
shifting the timestamp back in time, but under certain circumstances that
might look to users like strange data loss.
Jan
On 9/17/21 2:38 AM, Robert Bradshaw wrote:
Is it possible to only allow this when the watermark is being held
(with that limit)?
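One possible reading of this suggestion, sketched as a hypothetical plain-Java check (no such Beam API exists; `HeldSkewCheck` and its parameters are invented for illustration): a backwards output timestamp is accepted only if the DoFn currently holds the watermark at or before that timestamp, for example through a timer set with Timer.withOutputTimestamp().

```java
import java.util.OptionalLong;

/**
 * Hypothetical check (not an existing Beam API). Timestamps are epoch millis;
 * activeHold is the earliest watermark hold this DoFn currently owns, if any.
 */
final class HeldSkewCheck {
  private HeldSkewCheck() {}

  static boolean isAllowed(long elementTs, long outputTs, OptionalLong activeHold) {
    if (outputTs >= elementTs) {
      return true; // not moving backwards, nothing to check
    }
    // Moving backwards is accepted only while a hold keeps the watermark at or before outputTs.
    return activeHold.isPresent() && outputTs >= activeHold.getAsLong();
  }
}
```

The design choice here is that the limit comes from the hold itself rather than from a fixed skew, so the watermark is never delayed by more than what the DoFn explicitly asked for.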
On Thu, Sep 16, 2021 at 11:09 AM Lara Schmidt wrote:
Timer.withOutputTimestamp doesn't seem to take
DoFn#getAllowedTimestampSkew into account at all. I have a PR requiring this functionality
(backward timestamps with separate watermark handling), and I had to remove the
check in Timer.withOutputTimestamp. PR (see SimpleDoFnRunner.java).
On Thu, Sep 16, 2021 at 11:06 AM Reuven Lax wrote:
That's what that function is supposed to do and used to do. Given that it's a
deprecated function that was never used much in the first place (due to the
watermark delays), it's possible that not much effort was put into maintaining
that behavior.
On Thu, Sep 16, 2021 at 11:00 AM Jan Lukavský wrote:
Is it possible that the effect of getAllowedTimestampSkew on the watermark
is runner-dependent? I have never observed the watermark being delayed
by this parameter on FlinkRunner. Also, looking into SimpleDoFnRunner,
the call seems to be only part of validation, not of any watermark logic
[1]. More notably, the javadoc explicitly states that
Long.MAX_VALUE is allowed, which would cause an infinite delay of the
watermark [2].
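For reference, a validation-only check of this kind can be modelled roughly as follows. This is a simplified plain-Java sketch with millisecond timestamps, not the actual SimpleDoFnRunner code, and the names are hypothetical. It only rejects timestamps that are too early and never touches the watermark.

```java
/**
 * Simplified, hypothetical model of a validation-only skew check (not the
 * actual SimpleDoFnRunner code). Timestamps and skew are in millis.
 * It rejects outputs that are too early and never affects the watermark.
 */
final class TimestampSkewCheck {
  private TimestampSkewCheck() {}

  /** elementTs - allowedSkew, saturating at Long.MIN_VALUE on overflow. */
  static long lowerBound(long elementTs, long allowedSkew) {
    try {
      return Math.subtractExact(elementTs, allowedSkew);
    } catch (ArithmeticException overflow) {
      return Long.MIN_VALUE;
    }
  }

  static boolean isAllowed(long elementTs, long outputTs, long allowedSkew) {
    return outputTs >= lowerBound(elementTs, allowedSkew);
  }

  static void checkTimestamp(long elementTs, long outputTs, long allowedSkew) {
    if (!isAllowed(elementTs, outputTs, allowedSkew)) {
      throw new IllegalArgumentException(
          "Output timestamp " + outputTs + " is before element timestamp " + elementTs
              + " minus allowed skew " + allowedSkew + " ms");
    }
  }
}
```

Using milliseconds since midnight for readability: with the element at 12:00 (43_200_000) and zero skew, an output at 11:00 (39_600_000) is rejected; a skew of one hour accepts it; and Long.MAX_VALUE pushes the bound to (or near) Long.MIN_VALUE, so in practice no earlier timestamp is rejected.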
Jan
[1]
https://github.com/apache/beam/blob/081cb9a51384bba1e66bb60bf1d61e84e817b4e1/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java#L442
[2]
https://beam.apache.org/releases/javadoc/2.5.0/org/apache/beam/sdk/transforms/DoFn.html#getAllowedTimestampSkew--
On 9/16/21 7:17 PM, Reuven Lax wrote:
tl;dr Beam prevents users from ever moving timestamps backwards, with
no good way to bypass this check. This made sense when the check was
introduced, but now I propose that we add a way to bypass that
restriction.
Historically, Beam has prevented users from moving timestamps backwards. A
processElement call processing an element with timestamp 12:00 cannot
output an element with timestamp 11:00. This holds true for timers as
well: one cannot set a timer earlier than the current element time.
This was done for a good reason: there was no way for Beam to
maintain a good watermark if time could suddenly and arbitrarily move
backwards. DoFn#getAllowedTimestampSkew (deprecated) somewhat
mitigated this, but was unsatisfactory. It worked by shifting the
watermark back by the specified amount: if you returned 1 day, then
the watermark (and all subsequent aggregations) would be delayed by a
day.
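The watermark effect described here amounts to a subtraction. A hypothetical plain-Java model (not runner code; names invented for illustration), with millisecond watermarks and saturating arithmetic:

```java
import java.time.Duration;

/**
 * Hypothetical model (not runner code) of holding the output watermark back by
 * the allowed skew. Watermarks are in millis; subtraction saturates at
 * Long.MIN_VALUE instead of overflowing.
 */
final class SkewWatermarkHold {
  private SkewWatermarkHold() {}

  static long outputWatermark(long inputWatermark, Duration allowedSkew) {
    try {
      return Math.subtractExact(inputWatermark, allowedSkew.toMillis());
    } catch (ArithmeticException overflow) {
      return Long.MIN_VALUE;
    }
  }
}
```

A skew of one day delays every downstream aggregation by exactly one day. A skew of Duration.ofMillis(Long.MAX_VALUE) leaves the output watermark roughly 292 million years in the past, i.e. it never usefully advances.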
I propose that we provide a way for users to disable this check
entirely, with no effect on the watermark.
- Today there are techniques for the user to hold the watermark
using Timer.withOutputTimestamp(). In such cases there is no reason
for Beam to have this restriction at all.
- While it's possible for users to misuse this feature, resulting
in late data, I don't think that's a reason not to add it. We should
clearly document that this is an advanced method that can be misused;
beyond that, it's up to users not to misuse it.
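A hypothetical model of the proposal in plain Java (no such Beam API exists; the `checkEnabled` flag and the other names are assumptions): with the check disabled, the only things holding the output watermark are the input watermark and explicit holds, such as timers set with Timer.withOutputTimestamp(). Backwards output at or after a hold is on time; backwards output without a covering hold is late, which is the misuse case mentioned in the list.

```java
import java.util.Collection;

/** Hypothetical model of "disable the check, no effect on the watermark". */
final class ProposedBypassModel {
  private ProposedBypassModel() {}

  /** With the check disabled, any output timestamp is accepted. */
  static boolean isOutputAllowed(boolean checkEnabled, long elementTs, long outputTs) {
    return !checkEnabled || outputTs >= elementTs;
  }

  /** Output watermark = min(input watermark, all active holds); no skew term. */
  static long outputWatermark(long inputWatermark, Collection<Long> holds) {
    long watermark = inputWatermark;
    for (long hold : holds) {
      watermark = Math.min(watermark, hold);
    }
    return watermark;
  }

  /** In this model, output behind the output watermark is late downstream. */
  static boolean isLate(long outputTs, long outputWatermark) {
    return outputTs < outputWatermark;
  }
}
```

For example, with the element at 12:00, the input watermark at 11:30 and a user-set hold at 11:00, an output at 11:00 is on time; without the hold, the same output is behind the 11:30 watermark and therefore late.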
Reuven