As I was about to add the 5th copy-paste of the same "fusion break" pattern
in FileIO.match(), I sent a PR to just abstract away the copy-paste,
deliberately not promising anything semantically - but it's related to the
current thread: https://github.com/apache/beam/pull/3890

On Wed, Oct 12, 2016 at 10:39 PM Jean-Baptiste Onofré <[email protected]>
wrote:

> Hi Eugene,
>
> thanks for the update on the mailing list, much appreciated.
>
> Let me take a deeper look on that.
>
> Regards
> JB
>
> On 10/13/2016 02:03 AM, Eugene Kirpichov wrote:
> > So, based on some offline discussion, the problem is more complex. There
> > are several classes of ultimate user needs which are potentially
> > orthogonal, even though the current Reshuffle transform, as implemented by
> > the Dataflow runner, happens to satisfy all of them at the same time:
> >
> > 1. Checkpointing a collection.
> >
> > Suppose you have something like:
> >
> > PCollection<Data> data = something.apply(ParDo.of(new GenerateFn()));
> > data.apply(ParDo.of(new WriteFn()))
> >
> > Suppose GenerateFn is non-deterministic - it can generate different
> > output on the same input element.
> > Such Fn's are not forbidden by the Beam model per se, and are obviously
> > useful (e.g. it can query an external API, or pair each output element
> > with a random key, etc).
> >
> > The Beam model guarantees that the PCollection "data" will logically
> > consist of elements produced by *some* sequential execution of GenerateFn
> > on elements of the PCollection "something" - i.e., even if GenerateFn is
> > invoked on the same element multiple times and produces different results,
> > only one of those results will make it into the PCollection. The act of
> > invoking a DoFn, and the DoFn's side effects, are not part of the Beam
> > model - only PCollection contents are.
> >
> > However, these factors can be important. E.g. imagine WriteFn writes data
> > to a third-party service. Suppose it even does so in an idempotent way
> > (e.g. suppose the data has a primary key, and WriteFn inserts or
> > overwrites the row in a database with each primary key) - then, each
> > element on which WriteFn is invoked will be written to the database
> > exactly once.
> >
> > However, "each element on which WriteFn is invoked" is not the same as
> > "each element in the PCollection data" - because, again, the Beam model
> > does not make guarantees about what DoFn's are invoked on.
> > In particular, in case of failures, WriteFn might be applied arbitrarily
> > many times to arbitrarily many different results of GenerateFn. Say,
> > imagine a hypothetical runner that executes the whole pipeline, and on any
> > failure, re-executes the whole pipeline - over the course of that, even if
> > "data" logically has just 1 element, this element may be produced multiple
> > times, and WriteFn might be applied to multiple different versions of this
> > element; in the example above, it may insert extra rows into the database.
> >
> > Checkpointing can look like this:
> >
> >   PCollection<Data> dataCheckpoint = data.apply(Checkpoint.create());
> >
> > It limits the scope of non-determinism, and guarantees that an immediate
> > consumer of the collection "dataCheckpoint" will be invoked *only* on the
> > committed logical contents of the collection. It may still be invoked
> > multiple times, but on the same element, so it is sufficient to have
> > idempotency of side effects at the level of the consumer. Note: the
> > hypothetical "rerun full pipeline on failure" runner will not be able to
> > implement this transform correctly.
> >
> > Reshuffle is used in this capacity in BigQueryIO:
> > https://github.com/apache/incubator-beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L2718
> >
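[Editor's note: to make the failure scenario above concrete, here is a small standalone Java model of a hypothetical "re-execute everything on failure" runner, with a non-deterministic GenerateFn and an idempotent per-key WriteFn. None of these names are Beam APIs; everything here is illustrative only.]

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (not Beam code) of a runner that re-executes the whole
// pipeline on any failure. All names are illustrative.
public class RerunRunnerDemo {
    // "Database" keyed by primary key; WriteFn's upsert is idempotent
    // per key, but different keys create different rows.
    static final Map<String, String> database = new HashMap<>();

    // Non-deterministic GenerateFn: different output on each invocation
    // (modeled here by folding the attempt number into the element).
    static String generate(String input, int attempt) {
        return input + "-v" + attempt;
    }

    // Idempotent WriteFn: writing the same element twice is harmless.
    static void write(String element) {
        database.put(element, "written");
    }

    static void runPipelineOnce(int attempt) {
        String element = generate("data", attempt);
        write(element);
    }

    public static void main(String[] args) {
        // The first run "fails" after its side effect lands; the runner
        // then re-executes the whole pipeline from scratch.
        runPipelineOnce(1);
        runPipelineOnce(2);
        // Logically the PCollection has one element, but the database now
        // has two rows - the duplicate the thread is describing.
        System.out.println("rows = " + database.size()); // rows = 2
    }
}
```

A Checkpoint between GenerateFn and WriteFn would pin down which produced version the writer may observe, so idempotency at the writer level would then suffice.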
> > 2. Preventing a runner "producer-consumer fusion" optimization that would
> > have limited parallelism.
> >
> > Suppose "pc" is a collection with 3 elements, and "GenerateFn" generates
> > 1000 elements for each of them.
> >
> > Then, consider running:
> >
> > pc.apply(ParDo.of(new GenerateFn())).apply(ParDo.of(new ProcessFn()));
> >
> > Obviously, GenerateFn can at best be applied in parallel to the 3
> > elements - no more parallelism can be achieved at this level.
> > However, ideally ProcessFn would be applied to the 3000 elements all in
> > parallel - but some runners implement "fusion", where they collapse a
> > sequence of ParDo's into a single ParDo whose DoFn is the composition of
> > the component DoFn's. In that case, the pipeline will apply the
> > composition of GenerateFn and ProcessFn in parallel to 3 elements,
> > achieving a parallelism of only 3.
> >
> > This behavior, too, is not part of the Beam model. But we need a
> > transform that can disable this optimization - e.g. prevent fusion across
> > a particular PCollection, and guarantee that it is processed with as much
> > parallelism as if it had been read from a perfectly parallelizable
> > location (say, an Avro file): we could call it "Redistribute".
> >
> > pc.apply(ParDo.of(new GenerateFn()))
> >   .apply(Redistribute.create())
> >   .apply(ParDo.of(new ProcessFn()));
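[Editor's note: the parallelism arithmetic above can be sanity-checked with a standalone Java sketch - again not Beam code, just a toy model of how many independent tasks ProcessFn can get with and without a fusion break.]

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (not Beam code) of why fusing GenerateFn and ProcessFn caps
// parallelism at the size of the *input* collection.
public class FusionDemo {
    // GenerateFn: 1000 outputs per input element.
    static List<Integer> generate(int x) {
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            out.add(x * 1000 + i);
        }
        return out;
    }

    // Fused execution: one task per *input* element runs the composition
    // of GenerateFn and ProcessFn, so at most input.size() tasks exist.
    static int fusedParallelism(List<Integer> input) {
        return input.size();
    }

    // With a fusion break, the intermediate collection is materialized,
    // so ProcessFn can get one task per *intermediate* element.
    static int redistributedParallelism(List<Integer> input) {
        int n = 0;
        for (int x : input) {
            n += generate(x).size();
        }
        return n;
    }

    public static void main(String[] args) {
        List<Integer> pc = List.of(1, 2, 3);
        System.out.println("fused: " + fusedParallelism(pc));                 // 3
        System.out.println("redistributed: " + redistributedParallelism(pc)); // 3000
    }
}
```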
> >
> > 3. Preventing a runner "sibling fusion" optimization
> >
> > Suppose FooFn and BarFn are DoFn<Integer, String>.
> > Suppose the pipeline looks like:
> >
> > PCollection<String> foos = ints.apply(ParDo.of(new FooFn()));
> > PCollection<String> bars = ints.apply(ParDo.of(new BarFn()));
> > PCollectionList.of(foos).and(bars).apply(PubsubIO.Write.to(topic));
> >
> > In this case, a runner might perform an optimization, fusing FooFn and
> > BarFn into an Fn that takes an element "x" and produces the concatenation
> > FooFn(x) + BarFn(x). In some cases, this can be undesirable - e.g.
> > suppose that BarFn is much slower to compute than FooFn, but the results
> > of FooFn need to be sent to pubsub as quickly as possible. In that case,
> > we don't want to wait for both FooFn and BarFn on a particular element to
> > complete before sending the result of FooFn to pubsub.
> >
> > It seems like a Redistribute transform should be the answer to this as
> > well:
> >
> > PCollection<String> foos =
> >     ints.apply(Redistribute.create()).apply(ParDo.of(new FooFn()));
> > PCollection<String> bars =
> >     ints.apply(Redistribute.create()).apply(ParDo.of(new BarFn()));
> > PCollectionList.of(foos).and(bars).apply(PubsubIO.Write.to(topic));
> >
> > Here, it would cause FooFn and BarFn to be applied to logically
> > independent collections, whose contents are equivalent to "ints".
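[Editor's note: the emission-order consequence of sibling fusion can be modeled in a few lines of standalone Java - illustrative only, not Beam code. When FooFn and BarFn are fused, each fast foo result is held back until the slow bar result for the same element is ready; on independent collections, all foo results can be emitted first.]

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (not Beam code) of sibling fusion's effect on emission order.
public class SiblingFusionDemo {
    static String foo(int x) { return "foo" + x; } // fast
    static String bar(int x) { return "bar" + x; } // imagine this is slow

    // Fused: one pass per element emits FooFn(x) + BarFn(x) together,
    // so every foo result waits for the bar result of the same element.
    static List<String> fused(List<Integer> ints) {
        List<String> emitted = new ArrayList<>();
        for (int x : ints) {
            String f = foo(x);
            String b = bar(x); // the foo result is held until this finishes
            emitted.add(f);
            emitted.add(b);
        }
        return emitted;
    }

    // Unfused (e.g. each branch behind its own Redistribute): all foo
    // results can be emitted before any bar work happens.
    static List<String> unfused(List<Integer> ints) {
        List<String> emitted = new ArrayList<>();
        for (int x : ints) emitted.add(foo(x));
        for (int x : ints) emitted.add(bar(x));
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println("fused:   " + fused(List.of(1, 2)));   // foo1, bar1, foo2, bar2
        System.out.println("unfused: " + unfused(List.of(1, 2))); // foo1, foo2, bar1, bar2
    }
}
```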
> >
> > ***
> >
> > The current Reshuffle transform happens to do all 3 at the same time, and
> > does it in a potentially non-portable way: in particular, it relies on
> > the fact that GroupByKey provides all 3 semantics, but the Beam model
> > does not require this (and in fact, in Spark it probably wouldn't provide
> > #1). It is also potentially non-optimal: there can exist better
> > implementations of each of these 3 semantics not involving a global
> > group-by-key.
> >
> > It's not clear yet what to do about all this. Thoughts welcome.
> >
> > On Tue, Oct 11, 2016 at 11:35 AM Kenneth Knowles <[email protected]>
> > wrote:
> >
> >> On Tue, Oct 11, 2016 at 10:56 AM Eugene Kirpichov
> >> <[email protected]> wrote:
> >>
> >>> Yeah, I'm starting to lean towards removing Redistribute.byKey() from
> >>> the public API - because it only makes sense for getting access to
> >>> per-key state, and 1) we don't have it yet and 2) the runner should
> >>> insert it automatically - so there's no use case for it.
> >>
> >>
> >> +1 to removing Redistribute.byKey() from the public API.
> >>
> >>
> >>> The "checkpointing keys" use
> >>> case should be done via Redistribute.arbitrarily(), I believe.
> >>>
> >>
> >> Actually I think it really does have to be a GroupByKey followed by
> >> writing the groups without breaking them up:
> >>
> >>  - With GBK each element must appear in exactly one output group, so you
> >> have to checkpoint or be able to retract groupings (nice easy explanation
> >> from Thomas Groh; any faults in my paraphrase are my own).
> >>
> >>  - But GBK followed by "output the elements one by one" actually removes
> >> this property. Now you can replace the whole thing with a no-op and fuse
> >> with downstream and still get exactly once processing according to the
> >> model but not as observed via side effects to external systems. So sinks
> >> should really be doing that, and I'll retract this use case for
> >> Redistribute.
> >>
> >>> As for Redistribute.arbitrarily():
> >>> In a batch runner, we could describe it as "identity transform, but the
> >>> runner is required to process the resulting PCollection with downstream
> >>> transforms as well as if it had been created from elements via
> >>> Create.of(), in terms of ability to parallelize processing and minimize
> >>> the amount of re-executed work in case of failures" (which is a fancy
> >>> way of saying "materialize" without really saying "materialize" :) ).
> >>>
> >>
> >> How can you observe if the runner ignored you?
> >>
> >
>
> --
> Jean-Baptiste Onofré
> [email protected]
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
