Please excuse my typos and apply "s/differ/defer/g" ;-).
Amit.

On Fri, Oct 21, 2016 at 2:59 PM Amit Sela wrote:

> I'd like to raise an issue that was discussed in BEAM-696
> <https://issues.apache.org/jira/browse/BEAM-696>.
> I won't recap here because it would be extensive (and probably
> exhaustive), and I'd also like to restart the discussion here rather than
> summarize it.
>
> *The problem*
> In the case of (main) input in a merging window (e.g. Sessions) with
> sideInputs, pre-combining might lead to non-deterministic behaviour, for
> example:
> Main input: e1 (time: 3), e2 (time: 5)
> Session: gap duration of 3 -> e1 alone belongs to [3, 6), e2 alone [5, 8),
> combined together the merging of their windows yields [3, 8).
> Matching SideInputs with FixedWindows of size 2 should yield - e1 matching
> sideInput window [4, 6), e2 [6, 8), merged [6, 8).
> Now, if the sideInput is used in a merging step of the combine, and both
> elements are a part of the same bundle, the sideInput accessed will
> correspond to [6, 8) which is the expected behaviour, but if e1 is
> pre-combined in a separate bundle, it will access sideInput for [4, 6)
> which is wrong.
> ** this can tend to be a bit confusing, so any clarifications/corrections
> are most welcome.*
>
> *Solutions*
> The optimal solution would be to defer until trigger in case of merging
> windows with sideInputs that are not "agnostic" to such behaviour, but this
> is clearly not feasible since the nature and use of sideInputs in
> CombineFns are opaque.
> Second best would be to defer until trigger *only* if sideInputs are
> used for merging windows - pretty sure this is how Flink and Dataflow (soon
> Spark) runners do that.
>
> *Tradeoffs*
> This seems like a very user-friendly way to apply authored pipelines
> correctly, but this also means that users who called for a Combine
> transformation will get a Grouping transformation instead (sort of the
> opposite of combiner lifting? a combiner unwrapping?).
> For the SDK, Combine is simply a composite transform, but keep in mind
> that this affects runner optimization.
> The price to pay here is (1) shuffle all elements into a single bundle
> (the cost varies according to a runner's typical bundle size) (2) state can
> grow as processing is deferred and not compacted until triggered.
>
> IMHO, the execution should remain faithful to what the pipeline states,
> and if this results in errors, well... it happens.
> There are many legitimate use cases where an actual GroupByKey should be
> used (regardless of sideInputs), such as sequencing of events in a window,
> and I don't see the difference here.
>
> As stated above, I'm (almost) not recapping anyone's notes as they are
> persisted in BEAM-696, so if you had something to say please provide your
> input here.
> I will note that Ben Chambers and Pei He mentioned that even with
> deferring, this could still run into some non-determinism if there are
> triggers controlling when we extract output because non-merging windows'
> trigger firing is non-deterministic.
>
> Thanks,
> Amit
>
>
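
The window arithmetic in the quoted example can be checked with a small sketch. It is plain Python, not Beam code, and every name in it (Window, session_window, merge_sessions, side_input_window) is hypothetical. It assumes integer timestamps and assumes the side-input window is the fixed window containing the last instant of the main-input window, a rule chosen here because it reproduces the numbers in the message.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Window:
    start: int  # inclusive
    end: int    # exclusive


def session_window(timestamp, gap):
    """Proto-session for a single element: [timestamp, timestamp + gap)."""
    return Window(timestamp, timestamp + gap)


def merge_sessions(windows):
    """Merge overlapping windows, as a session (merging) window would."""
    merged = []
    for w in sorted(windows, key=lambda w: w.start):
        if merged and w.start < merged[-1].end:
            last = merged[-1]
            merged[-1] = Window(last.start, max(last.end, w.end))
        else:
            merged.append(w)
    return merged


def side_input_window(main, size):
    """Assumed mapping: the fixed window of `size` that contains the last
    instant of the main-input window (integer time, so end - 1)."""
    last_instant = main.end - 1
    start = (last_instant // size) * size
    return Window(start, start + size)


GAP, SIDE_SIZE = 3, 2
e1 = session_window(3, GAP)  # e1 alone
e2 = session_window(5, GAP)  # e2 alone

# Both elements in one bundle: windows merge first, then the side input
# is looked up for the merged window.
merged = merge_sessions([e1, e2])[0]
same_bundle = side_input_window(merged, SIDE_SIZE)

# e1 pre-combined in its own bundle: the side input is looked up for
# e1's unmerged window.
separate_bundle_e1 = side_input_window(merge_sessions([e1])[0], SIDE_SIZE)

print("e1:", e1, "e2:", e2, "merged:", merged)
print("same bundle side-input window:", same_bundle)
print("e1 pre-combined side-input window:", separate_bundle_e1)
```

Under these assumptions the two lookups disagree, which is the bundle-dependent behaviour the message describes: the result depends on whether merging happens before or after the side input is read.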
