@Amit: Yes, Flink is more "what you write is what you get". For example, in
Flink we have a Fold function for windows which cannot be efficiently
computed with merging windows (it would require using a "group by" window
and then folding the iterable). We just don't allow this.

For Beam, I think it's ok if we clearly define Combine in terms of
GroupByKey | CombineValues (which we do). With different runners it's hard
to enforce common optimisation strategies.
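
As a rough illustration of that definition, here is a minimal plain-Python sketch (not the Beam API; every function name below is hypothetical). It treats GroupByKey followed by CombineValues as the reference semantics, and shows that pre-combining per bundle gives the same answer when the CombineFn is associative and uses no side inputs:

```python
from collections import defaultdict


def group_by_key(pairs):
    """Collect all values for each key (stand-in for GroupByKey)."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return dict(grouped)


def combine_values(grouped, combine_fn):
    """Apply combine_fn to each key's full list of values."""
    return {key: combine_fn(values) for key, values in grouped.items()}


def combine_per_key(pairs, combine_fn):
    """Reference semantics: GroupByKey followed by CombineValues."""
    return combine_values(group_by_key(pairs), combine_fn)


def lifted_combine_per_key(bundles, combine_fn):
    """Optimised execution: pre-combine inside each bundle, then
    combine the partial results. Only safe because combine_fn is
    associative and does not look at windows or side inputs."""
    partials = []
    for bundle in bundles:
        partials.extend(combine_per_key(bundle, combine_fn).items())
    return combine_per_key(partials, combine_fn)


# Hypothetical input split across two bundles.
bundles = [[("a", 1), ("b", 2)], [("a", 3), ("a", 4), ("b", 5)]]
all_pairs = [pair for bundle in bundles for pair in bundle]

reference = combine_per_key(all_pairs, sum)
lifted = lifted_combine_per_key(bundles, sum)
print(reference == lifted)
```

The optimised path is only acceptable as long as it cannot produce anything the reference path could not have produced.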

On Sun, 23 Oct 2016 at 06:02 Robert Bradshaw <rober...@google.com.invalid>
wrote:

> On Sat, Oct 22, 2016 at 2:38 AM, Amit Sela <amitsel...@gmail.com> wrote:
> > I understand the semantics, but I feel like there might be a different
> > point of view for open-source runners.
>
> It seems we're losing a major promise of the runner interchangeability
> story if different runners can give different results for a
> well-defined transformation. I strongly feel we should avoid that path
> whenever possible. Specifically in this case Combine.perKey should
> mean the same thing on all runners (namely its composite definition),
> and only be executed differently when it's safe to do so.
>
> > Dataflow is a service, and it tries to do its best to optimize execution
> > while users don't have to worry about internal implementation (they are
> > not aware of it).
> > I can assure
> > <https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html>
> > you that for Spark users, applying groupByKey instead of combinePerKey is
> > an important note.
>
> For sure. Dataflow calls this out too. See the second star at
> https://cloud.google.com/dataflow/model/combine#using-combine-perkey
> (though it's not called out as prominently as it is for Spark
> users--likely should be more). Beam documentation should make this
> point as well.
>
> > @Aljoscha do Flink users (working on the Flink native API) usually care
> > about this difference of implementation?
> > Any other runners that can provide input?
>
> IIRC, Flink and Dataflow (and, trivially, the direct runner) all avoid
> this unsafe optimization when merging windows are mixed with
> non-global side inputs.
>
> Note also that the user of the Combine.perKey transform may not know
> the choice of windowing of the main or side inputs, so can't make this
> determination of whether it's safe to use this optimization. (As a
> concrete example, suppose I created a TopNPercent transform that did a
> global count and passed that as a side input to the Top CombineFn.)
>
> > On Sat, Oct 22, 2016 at 2:25 AM Robert Bradshaw <rober...@google.com.invalid>
> > wrote:
> >
> > Combine.perKey() is defined as GroupByKey() | Combine.values().
> >
> > A runner is free, in fact encouraged, to take advantage of the
> > associative properties of CombineFn to compute the result of
> > GroupByKey() | Combine.values() as cheaply as possible, but it is
> > incorrect to produce something that could not have been produced by
> > this composite implementation. (In the case of deterministic trigger
> > firing (e.g. the default trigger), and assuming of course an
> > associative, deterministic CombineFn, there is exactly one correct
> > output for every input no matter the WindowFns.)
> >
> > A corollary to this is that we cannot apply combining operations that
> > inspect the main input window (including side inputs where the mapping
> > is anything but the constant map (like to GlobalWindow)) until the
> > main input window is known.
> >
> >
> > On Fri, Oct 21, 2016 at 3:50 PM, Amit Sela <amitsel...@gmail.com> wrote:
> >> Please excuse my typos and apply "s/differ/defer/g" ;-).
> >> Amit.
> >>
> >> On Fri, Oct 21, 2016 at 2:59 PM Amit Sela <amitsel...@gmail.com> wrote:
> >>
> >>> I'd like to raise an issue that was discussed in BEAM-696
> >>> <https://issues.apache.org/jira/browse/BEAM-696>.
> >>> I won't recap here because it would be extensive (and probably
> >>> exhaustive), and I'd also like to restart the discussion here rather
> >>> than summarize it.
> >>>
> >>> *The problem*
> >>> In the case of (main) input in a merging window (e.g. Sessions) with
> >>> sideInputs, pre-combining might lead to non-deterministic behaviour,
> >>> for example:
> >>> Main input: e1 (time: 3), e2 (time: 5)
> >>> Session: gap duration of 3 -> e1 alone belongs to [3, 6), e2 alone to
> >>> [5, 8); combined, the merging of their windows yields [3, 8).
> >>> Matching sideInputs with FixedWindows of size 2 should yield: e1
> >>> matching sideInput window [4, 6), e2 [6, 8), merged [6, 8).
> >>> Now, if the sideInput is used in a merging step of the combine, and
> >>> both elements are part of the same bundle, the sideInput accessed will
> >>> correspond to [6, 8), which is the expected behaviour, but if e1 is
> >>> pre-combined in a separate bundle, it will access the sideInput for
> >>> [4, 6), which is wrong.
> >>> ** this tends to be a bit confusing, so any clarifications/corrections
> >>> are most welcome.*
> >>>
> >>> *Solutions*
> >>> The optimal solution would be to defer until trigger in the case of
> >>> merging windows with sideInputs that are not "agnostic" to such
> >>> behaviour, but this is clearly not feasible since the nature and use of
> >>> sideInputs in CombineFns are opaque.
> >>> Second best would be to defer until trigger *only* if sideInputs are
> >>> used for merging windows - pretty sure this is how the Flink and
> >>> Dataflow (soon Spark) runners do that.
> >>>
> >>> *Tradeoffs*
> >>> This seems like a very user-friendly way to apply authored pipelines
> >>> correctly, but this also means that users who called for a Combine
> >>> transformation will get a Grouping transformation instead (sort of the
> >>> opposite of combiner lifting? a combiner unwrapping?).
> >>> For the SDK, Combine is simply a composite transform, but keep in mind
> >>> that this affects runner optimization.
> >>> The price to pay here is (1) shuffling all elements into a single
> >>> bundle (the cost varies according to a runner's typical bundle size)
> >>> and (2) state growth, as processing is deferred and not compacted until
> >>> triggered.
> >>>
> >>> IMHO, the execution should remain faithful to what the pipeline states,
> >>> and if this results in errors, well... it happens.
> >>> There are many legitimate use cases where an actual GroupByKey should
> >>> be used (regardless of sideInputs), such as sequencing of events in a
> >>> window, and I don't see the difference here.
> >>>
> >>> As stated above, I'm (almost) not recapping anyone's notes as they are
> >>> persisted in BEAM-696, so if you had something to say please provide
> >>> your input here.
> >>> I will note that Ben Chambers and Pei He mentioned that even with
> >>> deferring, this could still run into some non-determinism if there are
> >>> triggers controlling when we extract output because non-merging
> >>> windows' trigger firing is non-deterministic.
> >>>
> >>> Thanks,
> >>> Amit
> >>>
> >>>
>
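
The session-window example in the quoted message (e1 at time 3, e2 at time 5, gap 3, side input in fixed windows of size 2) can be worked through with a small plain-Python sketch. This is not Beam code and all names are hypothetical. It assumes integer timestamps and assumes that a main-input window is mapped to the fixed side-input window containing its last instant, which is consistent with the numbers given in the thread:

```python
def session_window(timestamp, gap):
    """Proto-session for a single element: [timestamp, timestamp + gap)."""
    return (timestamp, timestamp + gap)


def merge_sessions(windows):
    """Merge overlapping half-open windows, as a session WindowFn would."""
    merged = []
    for start, end in sorted(windows):
        if merged and start < merged[-1][1]:
            merged[-1] = (merged[-1][0], max(merged[-1][1], end))
        else:
            merged.append((start, end))
    return merged


def side_input_window(main_window, size):
    """Assumed mapping: the fixed window of the given size that contains
    the last instant of the main window (end - 1 for integer timestamps)."""
    last_instant = main_window[1] - 1
    start = last_instant - last_instant % size
    return (start, start + size)


GAP, SIZE = 3, 2
w1 = session_window(3, GAP)           # e1 alone
w2 = session_window(5, GAP)           # e2 alone
merged = merge_sessions([w1, w2])[0]  # e1 and e2 after merging

# Side-input window seen before merging vs. after merging.
print(w1, "->", side_input_window(w1, SIZE))
print(w2, "->", side_input_window(w2, SIZE))
print(merged, "->", side_input_window(merged, SIZE))
```

Under these assumptions, e1 pre-combined on its own reads a different side-input window than it would after merging, which is why the side input can only be consulted once the final main-input window is known.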
