Please excuse my typos and apply "s/differ/defer/g" ;-).

Amit.

On Fri, Oct 21, 2016 at 2:59 PM Amit Sela <[email protected]> wrote:

> I'd like to raise an issue that was discussed in BEAM-696
> <https://issues.apache.org/jira/browse/BEAM-696>.
> I won't recap here because it would be extensive (and probably
> exhaustive), and I'd also like to restart the discussion here rather than
> summarize it.
>
> *The problem*
> In the case of (main) input in a merging window (e.g. Sessions) with
> sideInputs, pre-combining might lead to non-deterministic behaviour, for
> example:
> Main input: e1 (time: 3), e2 (time: 5)
> Session: gap duration of 3 -> e1 alone belongs to [3, 6), e2 alone [5, 8),
> combined together the merging of their windows yields [3, 8).
> Matching SideInputs with FixedWindows of size 2 should yield - e1 matching
> sideInput window [4, 6), e2 [6, 8), merged [6, 8).
> Now, if the sideInput is used in a merging step of the combine, and both
> elements are a part of the same bundle, the sideInput accessed will
> correspond to [6, 8) which is the expected behaviour, but if e1 is
> pre-combined in a separate bundle, it will access sideInput for [4, 6)
> which is wrong.
> ** this can tend to be a bit confusing, so any clarifications/corrections
> are most welcome.*
>
> *Solutions*
> The optimal solution would be to defer until trigger in case of merging
> windows with sideInputs that are not "agnostic" to such behaviour, but this
> is clearly not feasible since the nature and use of sideInputs in
> CombineFns are opaque.
> Second best would be to defer until trigger *only* if sideInputs are
> used for merging windows - pretty sure this is how Flink and Dataflow (soon
> Spark) runners do that.
>
> *Tradeoffs*
> This seems like a very user-friendly way to apply authored pipelines
> correctly, but this also means that users who called for a Combine
> transformation will get a Grouping transformation instead (sort of the
> opposite of combiner lifting? a combiner unwrapping?).
> For the SDK, Combine is simply a composite transform, but keep in mind
> that this affects runner optimization.
> The price to pay here is (1) shuffling all elements into a single bundle
> (the cost varies according to a runner's typical bundle size) and (2) state
> can grow as processing is deferred and not compacted until triggered.
>
> IMHO, the execution should remain faithful to what the pipeline states,
> and if this results in errors, well... it happens.
> There are many legitimate use cases where an actual GroupByKey should be
> used (regardless of sideInputs), such as sequencing of events in a window,
> and I don't see the difference here.
>
> As stated above, I'm (almost) not recapping anyone's notes as they are
> persisted in BEAM-696, so if you had something to say please provide your
> input here.
> I will note that Ben Chambers and Pei He mentioned that even with
> deferring, this could still run into some non-determinism if there are
> triggers controlling when we extract output because non-merging windows'
> trigger firing is non-deterministic.
>
> Thanks,
> Amit
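
To make the window arithmetic in the quoted example concrete, here is a minimal plain-Python sketch (no Beam dependency). It assumes integer timestamps, and it assumes that a main-input window maps to the side-input fixed window containing its last instant (end - 1), which reproduces the numbers in the example above. Window, session_window, merge_windows, side_input_window and side_windows_seen are hypothetical helper names for illustration, not Beam APIs.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Window:
    start: int  # inclusive
    end: int    # exclusive


def session_window(timestamp, gap):
    """Proto-session window for a single element: [timestamp, timestamp + gap)."""
    return Window(timestamp, timestamp + gap)


def merge_windows(windows):
    """Merge overlapping windows (abutting windows are kept separate)."""
    merged = []
    for w in sorted(windows, key=lambda w: w.start):
        if merged and w.start < merged[-1].end:
            last = merged[-1]
            merged[-1] = Window(last.start, max(last.end, w.end))
        else:
            merged.append(w)
    return merged


def side_input_window(main, size):
    """ASSUMPTION: map a main window to the fixed window (of the given size)
    that contains the main window's last instant, i.e. end - 1."""
    last_instant = main.end - 1
    start = last_instant - last_instant % size
    return Window(start, start + size)


def side_windows_seen(bundles, gap, size):
    """For each bundle, merge sessions within that bundle only (as a
    pre-combine would) and record which side-input window gets accessed."""
    seen = []
    for bundle in bundles:
        sessions = [session_window(t, gap) for t in bundle]
        for w in merge_windows(sessions):
            seen.append(side_input_window(w, size))
    return seen


# e1 (time 3) and e2 (time 5) in one bundle: only [6, 8) is accessed.
same_bundle = side_windows_seen([[3, 5]], gap=3, size=2)

# e1 pre-combined in its own bundle: [4, 6) is accessed as well.
split_bundles = side_windows_seen([[3], [5]], gap=3, size=2)

print(same_bundle)
print(split_bundles)
```

The two calls differ only in how the elements are bundled, yet the set of side-input windows read differs, which is the non-determinism described in the problem statement.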
