I'd like to raise an issue that was discussed in BEAM-696
<https://issues.apache.org/jira/browse/BEAM-696>.
I won't recap here because it would be extensive (and probably exhaustive),
and I'd also like to restart the discussion here rather then summarize it.

*The problem*
In the case of (main) input in a merging window (e.g. Sessions) with
sideInputs, pre-combining might lead to non-deterministic behaviour, for
example:
Main input: e1 (time: 3), e2 (time: 5)
Session: gap duration of 3 -> e1 alone belongs to [3, 6), e2 alone [5, 8),
combined together the merging of their windows yields [3, 8).
Matching SideInputs with FixedWindows of size 2 should yield - e1 matching
sideInput window [4, 6), e2 [6, 8), merged [6, 8).
Now, if the sideInput is used in a merging step of the combine, and both
elements are a part of the same bundle, the sideInput accessed will
correspond to [6, 8) which is the expected behaviour, but if e1 is
pre-combined in a separate bundle, it will access sideInput for [4, 6)
which is wrong.
** this can tends to be a bit confusing, so any clarifications/corrections
are most welcomed.*

*Solutions*
The optimal solution would be to differ until trigger in case of merging
windows with sideInputs that are not "agnostic" to such behaviour, but this
is clearly not feasible since the nature and use of sideInputs in
CombineFns are opaque.
Second best would be to differ until trigger *only* if sideInputs are used
for merging windows - pretty sure this is how Flink and Dataflow (soon
Spark) runners do that.

*Tradeoffs*
This seems like a very user-friendly way to apply authored pipelines
correctly, but this also means that users who called for a Combine
transformation will get a Grouping transformation instead (sort of the
opposite of combiner lifting ? a combiner unwrapping ?).
For the SDK, Combine is simply a composite transform, but keep in mind that
this affects runner optimization.
The price to pay here is (1) shuffle all elements into a single bundle (the
cost varies according to a runner's typical bundle size) (2) state can grow
as processing is differed and not compacted until triggered.

IMHO, the execution should remain faithful to what the pipeline states, and
if this results in errors, well... it happens.
There are many legitimate use cases where an actual GroupByKey should be
used (regardless of sideInputs), such as sequencing of events in a window,
and I don't see the difference here.

As stated above, I'm (almost) not recapping anyones notes as they are
persisted in BEAM-696, so if you had something to say please provide you
input here.
I will note that Ben Chambers and Pei He mentioned that even with
differing, this could still run into some non-determinism if there are
triggers controlling when we extract output because non-merging windows'
trigger firing is non-deterministic.

Thanks,
Amit

Reply via email to