That is a really good way to describe my mental model as well.
On Tue, Jan 7, 2020 at 12:20 PM Kenneth Knowles <[email protected]> wrote:
On Tue, Jan 7, 2020 at 1:39 AM Jan Lukavský <[email protected]> wrote:
Hi Kenn,
I see that my terminology seems not to be 100% aligned with
Beam's. I'll work on that. :-)
I agree with what you say, and by "late" I mostly meant
"droppable" (arriving too late after watermark).
I'm definitely not proposing to get back to something like
"out of order" == "late" or anything like that. I'm also aware
that a stateful operation is a windowed operation, but the
semantics of the windowing is different from that of a GBK. The
difference is in how time moves in a GBK versus in a stateful
DoFn. Throwing away some details (early triggers, late data
triggers), the main difference is that in the GBK case, time hops
just between window boundaries, while in a stateful DoFn time
moves "smoothly" (with each watermark update). Now, this
difference raises the question of why the definition of
"droppable" data is the same for both types of operations,
when there is a difference in how users "perceive" time. As
the more generic operation, a stateful DoFn might deserve a more
general definition of droppable data, which should degrade
naturally to the GBK one in the presence of "discrete time hops".
I understand what you mean. On the other hand, I encourage
thinking of event time spatially, not as time passing. That is a
big part of unifying batch/streaming real-time/archival
processing. The event time window is a secondary key to partition
the data (merging windows are slightly more complex). All event
time windows exist simultaneously. So for both stateful ParDo and
GBK, I find it helpful to consider this perspective where all
windows are processed simultaneously / in an arbitrary order not
assuming windows are ordered at all. Then you see that GBK and
stateful ParDo do not really treat windows / watermark
differently: both of them process a stream of data for each (key,
window) pair until the watermark informs them that the stream is
expired, then they GC the state associated with that (key, window)
pair.
Kenn
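Kenn's per-(key, window) perspective could be sketched roughly as follows. This is a plain-Python toy (all names are illustrative, not Beam code): both a GBK and a stateful ParDo keep state per (key, window) and apply the same expiry rule against the watermark.

```python
# Hypothetical sketch (plain Python, not the Beam API). Both GBK and a
# stateful ParDo process a stream of data per (key, window) pair and
# drop input / GC state once the watermark says that stream is expired.
class KeyedWindowedOperator:
    def __init__(self, allowed_lateness):
        self.allowed_lateness = allowed_lateness
        self.state = {}  # (key, window_max_ts) -> buffered elements
        self.watermark = float("-inf")

    def _expired(self, window_max_ts):
        # A (key, window) stream is expired once the watermark passes
        # the window's max timestamp plus allowed lateness.
        return self.watermark > window_max_ts + self.allowed_lateness

    def process(self, key, window_max_ts, element):
        if self._expired(window_max_ts):
            return "dropped"  # droppable: window already expired
        self.state.setdefault((key, window_max_ts), []).append(element)
        return "accepted"

    def advance_watermark(self, ts):
        self.watermark = max(self.watermark, ts)
        # GC state for every expired (key, window) pair.
        for kw in [kw for kw in self.state if self._expired(kw[1])]:
            del self.state[kw]
```

Note that nothing here distinguishes GBK from stateful ParDo: windows are just a secondary key, all of them "exist" simultaneously, and the watermark only tells us when a given (key, window) stream is done.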
This might have some consequences for how droppable data
should be handled in the presence of (early) triggers, because
triggering is actually what makes time "hop", so we might
arrive at the conclusion that we could actually drop any data
with a timestamp less than "last trigger time + allowed
lateness". This looks appealing to me, because IMO it has
strong internal logical consistency, although it could drop
more data, which is generally undesirable, I admit.
I'm looking for an explanation of why the current approach
was chosen instead of the other.
Jan
On 1/7/20 12:52 AM, Kenneth Knowles wrote:
This thread has a lot in it, so I am just top-posting.
- Stateful DoFn is a windowed operation; state is
per-window. When the window expires, any further inputs are
dropped.
- "Late" is not synonymous with out-of-order. It doesn't
really have an independent meaning.
- For a GBK/Combine "late" means "not included prior to
the on-time output", and "droppable" means "arriving after
window expiry".
- For Stateful DoFn there is no real meaning to "late"
except if one is talking about "droppable", which still means
"arriving after window expiry". A user may have a special
timer where they flip a flag and treat elements after the
timer differently.
I think the definition of when data is droppable is very
simple. We explicitly moved to this definition, away from the
"out of order == late", because it is more robust and simpler
to think about. Users saw lots of confusing behavior when we
had "out of order by allowed lateness == droppable" logic.
Kenn
On Mon, Jan 6, 2020 at 1:42 AM Jan Lukavský <[email protected]> wrote:
> Generally the watermark update can overtake elements,
because runners can explicitly ignore late data in the
watermark calculation (for good reason - those elements
are already late, so no need to hold up the watermark
advancing any more).
This seems not to affect the decision of _not late_ vs.
_late_, does it? If an element is late and gets ignored in
the watermark calculation (whatever that includes in this
context), then the watermark cannot move past elements
that were marked as _not late_, and thus nothing can
make them _late_.
> For GBK on-time data simply means the first pane marked
as on time. For state+timers I don't think it makes sense
for Beam to define on-time v.s. late, rather I think the
user can come up with their own definition depending on
their use case. For example, if you are buffering data
into BagState and setting a timer to process it, it would
be logical to say that any element that was buffered
before the timer expired is on time, and any data that
showed up after the timer fired is late. This would
roughly correspond to what GBK does, and the answer would
be very similar to simply comparing against the watermark
(as the timers fire when the watermark advances).
Yes, I'd say that stateful DoFns don't have a (well
defined) concept of a pane, because that is related to the
concept of a trigger, which is a concept of GBK (or
windowed operations in general). The only semantic
meaning of a window in a stateful DoFn is that it "scopes" state.
This discussion might have gotten a little off the original
question, so I'll try to rephrase it:
Should a stateful DoFn drop *all* late data, not just data
that arrives after window boundary + allowed lateness?
Some arguments why I think it should:
* in windowed operations (GBK), it is correct to drop
data on window boundaries only, because time (as seen by
the user) effectively hops only at these discrete points
* in a stateful DoFn, on the other hand, time moves
"smoothly" (yes, with some granularity - millisecond,
nanosecond, whatever - and only on watermark updates, but
still)
* dropping late data immediately as time (again, from the
user's perspective) moves, rather than at some more or less
artificial boundary with little semantic meaning, is
consistent with both of the above properties
The negative side effect would be that more data
could be dropped, but ... isn't this what defines allowed
lateness? I don't want to discuss the implications of such
a change on user pipelines (or whether we can or cannot
do it), just to build some theoretical
understanding of the problem as a whole. The decision whether
any change could / should be made can come afterwards.
Thanks,
Jan
On 1/4/20 10:35 PM, Reuven Lax wrote:
On Sat, Jan 4, 2020 at 12:13 PM Jan Lukavský <[email protected]> wrote:
> Yes, but invariants should hold. If I add a ParDo
that drops late elements (or, more commonly, diverts
the late elements to a different PCollection), then
the result of that ParDo should _never_ introduce
any more late data. This cannot be guaranteed simply
with watermark checks. The ParDo may decide that the
element was not late, but by the time it outputs the
element the watermark may have advanced, causing the
element to actually be late.
This is actually very interesting. The question is -
if I decide about lateness based on the output watermark
of a PTransform, is it still the case that in
downstream operator(s) the element could be changed
from "not late" to "late"? Provided the output
watermark is updated synchronously based on input
data (which it should be) and a watermark update cannot
"overtake" elements, I think the downstream
decision should not change, so the invariant
should hold. Or am I missing something?
Generally the watermark update can overtake elements,
because runners can explicitly ignore late data in the
watermark calculation (for good reason - those elements
are already late, so no need to hold up the watermark
advancing any more).
For GBK on-time data simply means the first pane marked
as on time. For state+timers I don't think it makes
sense for Beam to define on-time v.s. late, rather I
think the user can come up with their own definition
depending on their use case. For example, if you are
buffering data into BagState and setting a timer to
process it, it would be logical to say that any element
that was buffered before the timer expired is on time,
and any data that showed up after the timer fired is
late. This would roughly correspond to what GBK does,
and the answer would be very similar to simply comparing
against the watermark (as the timers fire when the
watermark advances).
Reuven
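Reuven's BagState example could be sketched roughly like this. It is a plain-Python stand-in (class and method names are illustrative, not the Beam state/timer API): buffer elements, flush when the "timer" fires, and treat everything arriving afterwards as late by the user's own definition.

```python
# Illustrative sketch (plain Python, not the Beam API): a user-defined
# notion of on-time vs. late for a stateful DoFn. Elements buffered
# before the timer fires are "on time"; later arrivals are "late".
class BufferingFn:
    def __init__(self, timer_ts):
        self.timer_ts = timer_ts
        self.bag = []            # stands in for BagState
        self.timer_fired = False

    def process(self, element):
        if self.timer_fired:
            return "late"        # arrived after the timer fired
        self.bag.append(element)
        return "on-time"

    def on_watermark(self, watermark):
        # The timer fires when the watermark reaches its timestamp,
        # roughly like the on-time pane of a GBK.
        if not self.timer_fired and watermark >= self.timer_ts:
            self.timer_fired = True
            flushed, self.bag = self.bag, []
            return flushed
        return None
```

Since the timer fires when the watermark advances, this classification ends up very close to comparing element timestamps against the watermark, but it is driven by a deterministic signal (the timer firing) rather than by an asynchronous watermark read.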
On 1/4/20 8:11 PM, Reuven Lax wrote:
On Sat, Jan 4, 2020 at 11:03 AM Jan Lukavský <[email protected]> wrote:
On 1/4/20 6:14 PM, Reuven Lax wrote:
There is a very good reason not to define
lateness directly in terms of the watermark.
The model does not make any guarantees that
the watermark advances synchronously, and in
fact for the Dataflow runner the watermark
advances asynchronously (i.e. independent of
element processing). This means that simply
comparing an element timestamp against the
watermark creates a race condition. There are
cases where the answer could change depending
on exactly when you examine the watermark, and
if you examine again while processing the same
bundle you might come to a different
conclusion about lateness.
Due to the monotonicity of the watermark, I don't think
that asynchronous watermark updates can change the
answer from "late" to "not late".
That seems fine to me.
It's the other way around. You check to see whether
an element is late and the answer is "not late." An
instant later the answer changes to "late." This
does cause many problems, and is why this was changed.
This non-determinism is undesirable when
considering lateness, as it can break many
invariants that users may rely on (e.g. I
could write a ParDo that filtered all late
data, yet still find late data showing up
downstream of the ParDo, which would be very
surprising). For that reason, the SDK always
marks things as late based on deterministic
signals, e.g. for a triggered GBK everything
in the first post-watermark pane is marked as
on time (no matter what the watermark is) and
everything in subsequent panes is marked as late.
Dropping latecomers will always be
non-deterministic, that is certain. This is
true even in the case where the watermark is updated
synchronously with element processing, due to
shuffling and varying (random) differences between
processing and event time in upstream
operator(s). The question was only whether a
latecomer should be dropped only at window
boundaries (which are a sort of artificial
time boundary), or right away when spotted (in
stateful DoFns only). Another question would be
whether latecomers should be dropped based on the input
or output watermark; dropping based on the output
watermark even seems to be stable in the sense
that all downstream operators should come to
the same conclusion (this is a bit of
speculation).
Yes, but invariants should hold. If I add a ParDo
that drops late elements (or, more commonly, diverts
the late elements to a different PCollection),
then the result of that ParDo should _never_
introduce any more late data. This cannot be
guaranteed simply with watermark checks. The ParDo
may decide that the element was not late, but by
the time it outputs the element the watermark may
have advanced, causing the element to actually be late.
In practice this is important. An early version of
Dataflow (pre Beam) implemented lateness by
comparing against the watermark, and it caused no
end of trouble for users.
FYI - this is also the reason why Beam does
not currently provide users direct access to
the watermark. The asynchronous nature of it
can be very confusing, and often results in
users writing bugs in their pipelines. We
decided instead to expose
easier-to-reason-about signals such as timers
(triggered by the watermark), windows, and
lateness.
Reuven
On Sat, Jan 4, 2020 at 1:15 AM Jan Lukavský <[email protected]> wrote:
I realized the problem. I misinterpreted
the LateDataDroppingDoFnRunner. It doesn't
drop *all* late data (arriving after watermark
- allowed lateness), but only data
that arrives after maxTimestamp +
allowedLateness of its respective window.
A stateful DoFn can run on the global window
(which was the case in my tests), and then
there is no dropping.
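This distinction can be made concrete with a tiny check (plain Python, illustrative only; GLOBAL_WINDOW_MAX_TS stands in for the global window's maximum timestamp):

```python
# Sketch: an element is droppable only if the watermark has passed its
# *window's* max timestamp plus allowed lateness -- not merely because
# the element's own timestamp is behind the watermark.
GLOBAL_WINDOW_MAX_TS = float("inf")  # stand-in for the global window

def is_droppable(window_max_ts, allowed_lateness, watermark):
    return watermark > window_max_ts + allowed_lateness

# A late element in a small fixed window can be droppable...
assert is_droppable(10, 0, watermark=11)
# ...but on the global window the condition is never met, so a stateful
# DoFn on the global window never drops anything.
assert not is_droppable(GLOBAL_WINDOW_MAX_TS, 0, watermark=10**18)
```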
Two questions arise then:
a) does it mean that this is one more
argument for moving this logic to
StatefulDoFnRunner? StatefulDoFnRunner
performs state cleanup at window GC time,
so without LateDataDroppingDoFnRunner,
late data will see empty state and will
produce wrong results.
b) is this behavior generally intentional
and correct? Windows and triggers are (in
my point of view) features of GBK, not of a
stateful DoFn. A stateful DoFn is a low
level primitive, which can be viewed as
operating on "instant" windows, which should
then probably be defined as dropping every
single element arriving after allowed
lateness. This probably relates to the
question of whether operations should be built
bottom-up, from the most primitive and generic
ones to more specific ones - that is, GBK
implemented on top of a stateful DoFn and
not vice versa.
Thoughts?
Jan
On 1/4/20 1:03 AM, Steve Niemitz wrote:
I do agree that the direct runner doesn't
drop late data arriving at a stateful
DoFn (I just tested as well).
However, I believe this is consistent
with other runners. I'm fairly certain
(at least the last time I checked) that
Dataflow will also only drop late
data at GBK operations, and NOT at stateful
DoFns. Whether or not this is intentional
is debatable; however, without being able
to inspect the watermark inside the
stateful DoFn, it'd be very difficult to
do anything useful with late data.
On Fri, Jan 3, 2020 at 5:47 PM Jan Lukavský <[email protected]> wrote:
I did write a test that checks whether
data is dropped in a plain stateful
DoFn. I did this as part of
validating that PR [1] didn't drop
more data when using
@RequiresTimeSortedInput than it
would without the annotation. The
test failed and I haven't committed it yet.
The test was basically as follows:
- use TestStream to generate three
elements with timestamps 2, 1 and 0
- between the elements with timestamps 1
and 0, move the watermark to 1
- use allowed lateness of zero
- use a stateful DoFn that just emits
arbitrary data for each input element
- use Count.globally to count the outputs
The outcome was that the stateful DoFn
using @RequiresTimeSortedInput output
2 elements; without the annotation it
was 3 elements. I think the correct
answer would be 2 elements in this case.
The difference is caused by the
annotation having (currently) its own
logic for dropping data, which could
be removed if we agree that the data
should be dropped in all cases.
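The scenario in that test can be reproduced with a toy simulation (plain Python, illustrative only; it assumes, per the description above, that @RequiresTimeSortedInput currently drops latecomers immediately against the watermark):

```python
# Toy simulation of the test scenario: elements with timestamps 2, 1, 0,
# with the watermark advanced to 1 between the last two, and allowed
# lateness of zero. "drop_latecomers" models dropping an element as soon
# as its timestamp falls behind (watermark - allowed lateness).
def run(events, drop_latecomers, allowed_lateness=0):
    watermark = float("-inf")
    emitted = 0
    for kind, value in events:
        if kind == "watermark":
            watermark = max(watermark, value)
        elif not drop_latecomers or value >= watermark - allowed_lateness:
            emitted += 1  # the stateful DoFn emits one output per input
    return emitted

events = [("element", 2), ("element", 1), ("watermark", 1), ("element", 0)]
# With immediate dropping, the timestamp-0 element is dropped -> 2 outputs;
# without it, all three elements get through -> 3 outputs.
```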
On 1/3/20 11:23 PM, Kenneth Knowles
wrote:
Did you write such
a @Category(ValidatesRunner.class)
test? I believe the Java direct
runner does drop late data, for both
GBK and stateful ParDo.
Stateful ParDo is implemented on top
of GBK:
https://github.com/apache/beam/blob/64262a61402fad67d9ad8a66eaf6322593d3b5dc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java#L172
And GroupByKey, via
DirectGroupByKey, via
DirectGroupAlsoByWindow, does drop
late data:
https://github.com/apache/beam/blob/c2f0d282337f3ae0196a7717712396a5a41fdde1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java#L220
I'm not sure why it has its own
code, since ReduceFnRunner also
drops late data, and it does use
ReduceFnRunner (the same code path
all Java-based runners use).
Kenn
On Fri, Jan 3, 2020 at 1:02 PM Jan Lukavský <[email protected]> wrote:
Yes, the non-reliability of late
data dropping in a distributed
runner is understood. But this
is exactly where the DirectRunner can
play its role, because only
there is it actually possible to
emulate and test specific
watermark conditions. A question
regarding this for the Java
DirectRunner - should we
completely drop
LateDataDroppingDoFnRunner and
delegate the late data dropping
to StatefulDoFnRunner? Seems
logical to me, because if we agree
that late data should always be
dropped, then there would be no
"valid" use of
StatefulDoFnRunner without the
late data dropping functionality.
On 1/3/20 9:32 PM, Robert
Bradshaw wrote:
I agree; in fact we just
recently enabled late data
dropping in the direct runner
in Python to be able to develop
better tests for Dataflow.
It should be noted, however,
that in a distributed runner
(absent the quiescence of
TestStream) one can't
*count* on late data being
dropped at a certain point, and
in fact (due to delays in fully
propagating the watermark) late
data can even become on-time,
so the promises about what
happens behind the
watermark are necessarily a bit
loose.
On Fri, Jan 3, 2020 at 9:15 AM Luke Cwik <[email protected]> wrote:
I agree that the
DirectRunner should drop
late data. Late data
dropping is optional, but
the DirectRunner is used by
many for testing, and we
should have the same
behaviour they would get on
other runners, or users may
be surprised.
On Fri, Jan 3, 2020 at 3:33 AM Jan Lukavský <[email protected]> wrote:
Hi,
I just found out that the
DirectRunner is
apparently not using
LateDataDroppingDoFnRunner,
which means that it
doesn't drop late data
in cases where there is
no GBK operation
involved (dropping in
GBK seems to be correct).
There is apparently no
@Category(ValidatesRunner)
test for that behavior
(because DirectRunner
would fail it), so the
question is - should late data
dropping be considered
part of the model (of which
DirectRunner should be
a canonical
implementation), and
therefore be fixed there,
or is late data
dropping an optional
feature of a runner?
I'm strongly in favor
of the first option,
and I think it is
likely that
all real-world runners
adhere to that (I didn't
check, though).
Opinions?
Jan