Hi Kenn,
I see that my terminology seems not to be 100% aligned with Beam's. I'll
work on that. :-)
I agree with what you say, and by "late" I mostly meant "droppable"
(arriving too late after watermark).
I'm definitely not proposing to get back to something like "out of
order" == "late" or anything like that. I'm also aware that stateful
operation is a windowed operation, but the semantics of the windowing
differ from those of a GBK. The difference is how time moves in GBK and how
it moves in stateful DoFn. Setting aside some details (early triggers, late
data triggers), the main difference is that in GBK case, time hops just
between window boundaries, while in stateful DoFn time moves "smoothly"
(with each watermark update). Now, this difference brings the question
about why the definition of "droppable" data is the same for both types
of operations, when there is a difference in how users "perceive" time.
As the more generic operation, stateful DoFn might deserve a more
general definition of droppable data, which should degrade naturally to
the one of GBK in presence of "discrete time hops".
This might have some consequences for how droppable data should be
handled in the presence of (early) triggers, because triggering is actually
what makes time "hop", so we might arrive at the conclusion that we
might actually drop any data that has a timestamp less than "last trigger
time + allowed lateness". This looks appealing to me, because IMO it has
strong internal logical consistency. Although it is possible that it
would drop more data, which is generally undesirable, I admit that.
I'm looking for an explanation of why the current approach was chosen
instead of the other.
Jan
On 1/7/20 12:52 AM, Kenneth Knowles wrote:
This thread has a lot in it, so I am just top-posting.
- Stateful DoFn is a windowed operation; state is per-window. When
the window expires, any further inputs are dropped.
- "Late" is not synonymous with out-of-order. It doesn't really have
an independent meaning.
- For a GBK/Combine "late" means "not included prior to the
on-time output", and "droppable" means "arriving after window expiry".
- For Stateful DoFn there is no real meaning to "late" except if
one is talking about "droppable", which still means "arriving after
window expiry". A user may have a special timer where they flip a flag
and treat elements after the timer differently.
I think the definition of when data is droppable is very simple. We
explicitly moved to this definition, away from the "out of order ==
late", because it is more robust and simpler to think about. Users saw
lots of confusing behavior when we had "out of order by allowed
lateness == droppable" logic.
Kenn
On Mon, Jan 6, 2020 at 1:42 AM Jan Lukavský wrote:
> Generally the watermark update can overtake elements, because
runners can explicitly ignore late data in the watermark
calculation (for good reason - those elements are already late, so
no need to hold up the watermark advancing any more).
This does not seem to affect the decision of _not late_ vs. _late_, does
it? If an element is late and gets ignored in the watermark calculation
(whatever that includes in this context), then the watermark
cannot move past elements that were not marked as _not late_ and
thus nothing can make them _late_.
> For GBK on-time data simply means the first pane marked as on
time. For state+timers I don't think it makes sense for Beam to
define on-time vs. late; rather, I think the user can come up with
their own definition depending on their use case. For example, if
you are buffering data into BagState and setting a timer to
process it, it would be logical to say that any element that was
buffered before the timer expired is on time, and any data that
showed up after the timer fired is late. This would roughly
correspond to what GBK does, and the answer would be very similar
to simply comparing against the watermark (as the timers fire when
the watermark advances).
Yes, I'd say that stateful DoFns don't have a (well-defined) concept
of a pane, because that is related to the concept of a trigger, and this is
a concept of GBK (or windowed operations in general). The only
semantic meaning of a window in a stateful DoFn is that it "scopes" state.
This discussion might have got a little off the original question,
so I'll try to rephrase it:
Should stateful DoFn drop *all* late data, not just data that
arrive after window boundary + allowed lateness? Some arguments
why I think it should:
* in windowed operations (GBK), it is correct to drop data on
window boundaries only, because time (as seen by user) effectively
hops only on these discrete time points
* in stateful DoFn, on the other hand, time moves "smoothly" (yes,
with some granularity, millisecond, nanosecond, whatever, and with
watermark updates only, but still)
* dropping late data immediately as time (again, from the user's
perspective) moves, rather than on some more or less artificial
boundary with little semantic meaning, can be viewed as
consistent with both of the above properties
The negative side effect of this would be that more data could be
dropped, but ... isn't this what defines allowed lateness? I don't
want to discuss the implications on user pipelines of such a
change (and if we can or cannot do it), just trying to build some
theoretical understanding of the problem as a whole. The decision
if any change could / should be made can be done afterwards.
Thanks,
Jan
On 1/4/20 10:35 PM, Reuven Lax wrote:
On Sat, Jan 4, 2020 at 12:13 PM Jan Lukavský wrote:
> Yes, but invariants should hold. If I add a ParDo that
drops late elements (or, more commonly, diverts the late
elements to a different PCollection), then the result of
that ParDo should _never_ introduce any more late data. This
cannot be guaranteed simply with watermark checks. The ParDo
may decide that the element was not late, but by the time it
outputs the element the watermark may have advanced, causing
the element to actually be late.
This is actually very interesting. The question is - if I
decide about lateness based on the output watermark of a
PTransform, is it still the case that in downstream
operator(s) the element could be changed from "not late" to
"late"? Provided the output watermark is updated
synchronously based on input data (which it should be) and
watermark update cannot "overtake" elements, I think that the
downstream decision should not be changed, so the invariant
should hold. Or am I missing something?
Generally the watermark update can overtake elements, because
runners can explicitly ignore late data in the watermark
calculation (for good reason - those elements are already late,
so no need to hold up the watermark advancing any more).
For GBK on-time data simply means the first pane marked as on
time. For state+timers I don't think it makes sense for Beam to
define on-time vs. late; rather, I think the user can come up
with their own definition depending on their use case. For
example, if you are buffering data into BagState and setting a
timer to process it, it would be logical to say that any element
that was buffered before the timer expired is on time, and any
data that showed up after the timer fired is late. This would
roughly correspond to what GBK does, and the answer would be very
similar to simply comparing against the watermark (as the timers
fire when the watermark advances).
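
The buffering example above can be sketched as a small plain-Python model. This is only an illustration under stated assumptions, not Beam code: BufferingFn, advance_watermark and the timer_fired flag are hypothetical names, the list stands in for BagState, and the timer is assumed to fire once the watermark reaches its timestamp.

```python
class BufferingFn:
    """Toy model of a stateful DoFn that buffers elements until a timer fires."""

    def __init__(self, timer_timestamp):
        self.timer_timestamp = timer_timestamp
        self.buffer = []          # stands in for BagState (assumption)
        self.timer_fired = False  # user-managed flag, flipped by the timer
        self.on_time = []
        self.late = []

    def process(self, element):
        if self.timer_fired:
            # Arrived after the timer fired: "late" by the user's own definition.
            self.late.append(element)
        else:
            self.buffer.append(element)

    def advance_watermark(self, watermark):
        # Assumed firing rule: the timer fires once the watermark reaches it.
        if not self.timer_fired and watermark >= self.timer_timestamp:
            self.timer_fired = True
            self.on_time.extend(self.buffer)
            self.buffer.clear()


fn = BufferingFn(timer_timestamp=10)
fn.process("a")
fn.process("b")
fn.advance_watermark(10)  # timer fires; buffered elements count as on time
fn.process("c")           # shows up after the timer fired
```

Here "a" and "b" end up in fn.on_time and "c" in fn.late, so the on-time/late split is decided by the timer, not by comparing each element against the watermark.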
Reuven
On 1/4/20 8:11 PM, Reuven Lax wrote:
On Sat, Jan 4, 2020 at 11:03 AM Jan Lukavský wrote:
On 1/4/20 6:14 PM, Reuven Lax wrote:
There is a very good reason not to define lateness
directly in terms of the watermark. The model does not
make any guarantees that the watermark advances
synchronously, and in fact for the Dataflow runner the
watermark advances asynchronously (i.e. independent of
element processing). This means that simply comparing
an element timestamp against the watermark creates a
race condition. There are cases where the answer could
change depending on exactly when you examine the
watermark, and if you examine again while processing
the same bundle you might come to a different
conclusion about lateness.
Due to monotonicity of watermark, I don't think that the
asynchronous updates of watermark can change the answer
from "late" to "not late". That seems fine to me.
It's the other way around. You check to see whether an
element is late and the answer is "not late." An instant
later the answer changes to "late." This does cause many
problems, and is why this was changed.
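
A minimal sketch of that flip, in plain Python rather than Beam: AsyncWatermark and is_late are hypothetical names, and the watermark is modelled as a value that may have advanced between any two reads.

```python
class AsyncWatermark:
    """Toy watermark that may advance between any two reads."""

    def __init__(self, values):
        self._values = iter(values)
        self._current = float("-inf")

    def read(self):
        # Each read may observe a newer value; the watermark never moves back.
        self._current = max(self._current, next(self._values, self._current))
        return self._current


def is_late(timestamp, watermark, allowed_lateness=0):
    # Naive check: compare the element timestamp against the current watermark.
    return timestamp < watermark.read() - allowed_lateness


wm = AsyncWatermark([5, 12])
first = is_late(10, wm)   # watermark is observed at 5
second = is_late(10, wm)  # watermark is observed at 12
```

The same element with timestamp 10 is "not late" on the first check and "late" on the second, although nothing about the element changed. Because the watermark is monotonic, the answer can never flip back.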
This nondeterminism is undesirable when considering
lateness, as it can break many invariants that users
may rely on (e.g. I could write a ParDo that
filters all late data, yet still find late data
showing up downstream of the ParDo, which would be very
surprising). For that reason, the SDK always marks
things as late based on deterministic signals. e.g. for
a triggered GBK everything in the first post-watermark
pane is marked as on time (no matter what the watermark
is) and everything in subsequent panes is marked as late.
Dropping latecomers will always be nondeterministic,
that is certain. This is true even in the case where the
watermark is updated synchronously with element
processing, due to shuffling and varying (random)
differences between processing time and event time in upstream
operator(s). The question was only whether a latecomer should
be dropped only at window boundaries (which are a
sort of artificial time boundary), or right away when
spotted (in stateful DoFns only). Another question would
be whether latecomers should be dropped based on the input or
output watermark; dropping based on the output watermark
even seems to be stable, in the sense that all
downstream operators should come to the same conclusion
(this is a bit of speculation).
Yes, but invariants should hold. If I add a ParDo that drops
late elements (or, more commonly, diverts the late elements
to a different PCollection), then the result of that ParDo
should _never_ introduce any more late data. This cannot be
guaranteed simply with watermark checks. The ParDo may
decide that the element was not late, but by the time it
outputs the element the watermark may have advanced, causing
the element to actually be late.
In practice this is important. An early version of Dataflow
(pre Beam) implemented lateness by comparing against the
watermark, and it caused no end of trouble for users.
FYI - this is also the reason why Beam does not
currently provide users direct access to the watermark.
The asynchronous nature of it can be very confusing,
and often results in users writing bugs in their
pipelines. We decided instead to expose
easier-to-reason-about signals such as timers
(triggered by the watermark), windows, and lateness.
Reuven
On Sat, Jan 4, 2020 at 1:15 AM Jan Lukavský wrote:
I realized the problem. I misinterpreted the
LateDataDroppingDoFnRunner. It doesn't drop *all*
late (arriving after watermark - allowed lateness)
data, but only data that arrive after maxTimestamp
+ allowedLateness of their respective windows.
Stateful DoFn can run on global window (which was
the case of my tests) and there is no dropping then.
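
The two rules can be compared in a small plain-Python sketch. This is an illustration, not the actual runner code: Window, droppable_by_window_expiry and late_by_watermark are hypothetical names, and the global window is approximated by an infinite max timestamp.

```python
from dataclasses import dataclass

# Assumption: the global window is modelled as ending at "infinity".
GLOBAL_WINDOW_MAX_TIMESTAMP = float("inf")


@dataclass(frozen=True)
class Window:
    max_timestamp: float  # last timestamp that belongs to the window


def droppable_by_window_expiry(window, watermark, allowed_lateness):
    """Drop only once the watermark has passed maxTimestamp + allowedLateness."""
    return watermark > window.max_timestamp + allowed_lateness


def late_by_watermark(timestamp, watermark, allowed_lateness):
    """Stricter rule: anything older than watermark - allowed lateness is late."""
    return timestamp < watermark - allowed_lateness


fixed = Window(max_timestamp=9)  # e.g. a fixed window covering [0, 10)
global_window = Window(max_timestamp=GLOBAL_WINDOW_MAX_TIMESTAMP)

# Element with timestamp 3, watermark at 8, allowed lateness 2:
kept_by_expiry_rule = not droppable_by_window_expiry(fixed, 8, 2)
late_by_strict_rule = late_by_watermark(3, 8, 2)
```

With these numbers the element is already late under the stricter rule but is not dropped under the window-expiry rule, and in the global window the expiry rule never drops anything.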
Two questions arise then:
a) does it mean that this is one more argument to
move this logic to StatefulDoFnRunner?
StatefulDoFnRunner performs state cleanup on window
GC time, so without LateDataDroppingDoFnRunner,
late data will see empty state and will produce
wrong results.
b) is this behavior generally intentional and
correct? Windows and triggers are (from my point of
view) features of GBK, not stateful DoFn. Stateful
DoFn is a low-level primitive, which can be viewed
as operating on "instant" windows, which should then
probably be defined as dropping every single
element arriving after allowed lateness. This
probably relates to the question of whether operations should be
built bottom-up from the most primitive and generic
ones to more specific ones - that is, GBK would be
implemented on top of stateful DoFn and not vice versa.
Thoughts?
Jan
On 1/4/20 1:03 AM, Steve Niemitz wrote:
I do agree that the direct runner doesn't drop
late data arriving at a stateful DoFn (I just
tested as well).
However, I believe this is consistent with other
runners. I'm fairly certain (at least last time I
checked) that at least Dataflow will also only
drop late data at GBK operations, and NOT stateful
DoFns. Whether or not this is intentional is
debatable; however, without being able to inspect
the watermark inside the stateful DoFn, it'd be
very difficult to do anything useful with late data.
On Fri, Jan 3, 2020 at 5:47 PM Jan Lukavský wrote:
I did write a test that tested if data is
dropped in a plain stateful DoFn. I did this
as part of validating that PR [1] didn't drop
more data when using @RequiresTimeSortedInput
than it would without this annotation. This
test failed and I didn't commit it yet.
The test was basically as follows:
- use TestStream to generate three elements
with timestamps 2, 1 and 0
- between elements with timestamp 1 and 0
move watermark to 1
- use allowed lateness of zero
- use stateful dofn that just emits arbitrary
data for each input element
- use Count.globally to count outputs
The outcome was that stateful dofn using
@RequiresTimeSortedInput output 2 elements,
without the annotation it was 3 elements. I
think the correct one would be 2 elements in
this case. The difference is caused by the
annotation having (currently) its own logic
for dropping data, which could be removed if
we agree that the data should be dropped in
all cases.
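
The scenario of that test can be replayed in a plain-Python simulation. This is a sketch and not the actual TestStream-based test: the run function, the event tuples and the dropping rule (timestamp < watermark - allowed lateness) are assumptions made for illustration.

```python
def run(events, allowed_lateness, drop_late):
    """Count outputs of a DoFn that emits one output per accepted element.

    events: ("element", timestamp) or ("watermark", value), in arrival order.
    """
    watermark = float("-inf")
    outputs = 0
    for kind, value in events:
        if kind == "watermark":
            watermark = max(watermark, value)  # the watermark is monotonic
        elif drop_late and value < watermark - allowed_lateness:
            continue  # element is behind the watermark: dropped
        else:
            outputs += 1
    return outputs


# Elements with timestamps 2, 1 and 0; the watermark moves to 1 before the last one.
events = [("element", 2), ("element", 1), ("watermark", 1), ("element", 0)]

with_dropping = run(events, allowed_lateness=0, drop_late=True)
without_dropping = run(events, allowed_lateness=0, drop_late=False)
```

Under these assumptions with_dropping is 2 and without_dropping is 3, which matches the two outcomes described above.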
On 1/3/20 11:23 PM, Kenneth Knowles wrote:
Did you write such
a @Category(ValidatesRunner.class) test? I
believe the Java direct runner does drop
late data, for both GBK and stateful ParDo.
Stateful ParDo is implemented on top of GBK:
https://github.com/apache/beam/blob/64262a61402fad67d9ad8a66eaf6322593d3b5dc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java#L172
And GroupByKey, via DirectGroupByKey, via
DirectGroupAlsoByWindow, does drop late data:
https://github.com/apache/beam/blob/c2f0d282337f3ae0196a7717712396a5a41fdde1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java#L220
I'm not sure why it has its own code, since
ReduceFnRunner also drops late data, and it
does use ReduceFnRunner (the same code path
all Java-based runners use).
Kenn
On Fri, Jan 3, 2020 at 1:02 PM Jan Lukavský wrote:
Yes, the non-reliability of late data
dropping in a distributed runner is
understood. But this is exactly where
DirectRunner can play its role, because
only there is it actually possible to
emulate and test specific watermark
conditions. A question regarding this for
the Java DirectRunner - should we
completely drop
LateDataDroppingDoFnRunner and delegate
the late data dropping to
StatefulDoFnRunner? That seems logical to me,
because if we agree that late data should
always be dropped, then there would be no
"valid" use of StatefulDoFnRunner without
the late data dropping functionality.
On 1/3/20 9:32 PM, Robert Bradshaw wrote:
I agree; in fact, we just recently
enabled late data dropping in the direct
runner in Python to be able to develop
better tests for Dataflow.
It should be noted, however, that in a
distributed runner (absent the
quiescence of TestStream) one can't
*count* on late data being dropped at a
certain point, and in fact (due to
delays in fully propagating the
watermark) late data can even become
on-time, so the promises about what
happens behind the watermark are
necessarily a bit loose.
On Fri, Jan 3, 2020 at 9:15 AM Luke Cwik wrote:
I agree that the DirectRunner should
drop late data. Late data dropping
is optional but the DirectRunner is
used by many for testing and we
should have the same behaviour they
would get on other runners or users
may be surprised.
On Fri, Jan 3, 2020 at 3:33 AM Jan Lukavský wrote:
Hi,
I just found out that DirectRunner is apparently not using
LateDataDroppingDoFnRunner, which means that it doesn't drop late data
in cases where there is no GBK operation involved (dropping in GBK seems
to be correct). There is apparently no @Category(ValidatesRunner) test
for that behavior (because DirectRunner would fail it), so the question
is - should late data dropping be considered part of the model (of which
DirectRunner should be a canonical implementation) and therefore
be fixed there, or is late data dropping an optional feature
of a runner?
I'm strongly in favor of the first option, and I think it is likely that
all real-world runners adhere to that (I didn't check
that, though).
Opinions?
Jan