As I was about to add a 5th copy-paste of the same "fusion break" pattern, this time in FileIO.match(), I sent a PR that just abstracts the copy-paste away, deliberately without promising anything semantically - but it's related to the current thread: https://github.com/apache/beam/pull/3890
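For reference, the pattern being abstracted is roughly the following. This is a minimal sketch only, assuming current Beam Java SDK APIs; "BreakFusion" is an illustrative name, not necessarily what the PR calls it:

import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

// Pair each element with a random key, reshuffle, and drop the keys again.
// The reshuffle forces the runner to materialize the collection, which is
// what breaks producer-consumer fusion around it.
class BreakFusion<T> extends PTransform<PCollection<T>, PCollection<T>> {
  @Override
  public PCollection<T> expand(PCollection<T> input) {
    return input
        .apply("Pair with random key",
            WithKeys.of((SerializableFunction<T, Integer>)
                    x -> ThreadLocalRandom.current().nextInt())
                .withKeyType(TypeDescriptors.integers()))
        .apply("Reshuffle", Reshuffle.<Integer, T>of())
        .apply("Drop key", Values.<T>create());
  }
}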
On Wed, Oct 12, 2016 at 10:39 PM Jean-Baptiste Onofré <[email protected]> wrote:

> Hi Eugene,
>
> thanks for the update on the mailing list, much appreciated.
>
> Let me take a deeper look on that.
>
> Regards
> JB
>
> On 10/13/2016 02:03 AM, Eugene Kirpichov wrote:
> > So, based on some offline discussion, the problem is more complex. There are several classes of ultimate user needs which are potentially orthogonal, even though the current Reshuffle transform, as implemented by the Dataflow runner, happens to satisfy all of them at the same time:
> >
> > 1. Checkpointing a collection.
> >
> > Suppose you have something like:
> >
> > PCollection<Data> data = something.apply(ParDo.of(new GenerateFn()));
> > data.apply(ParDo.of(new WriteFn()));
> >
> > Suppose GenerateFn is non-deterministic - it can generate different output on the same input element. Such Fn's are not forbidden by the Beam model per se, and are obviously useful (e.g. it can be querying an external API, or it can be pairing each output element with a random key, etc.).
> >
> > The Beam model guarantees that the PCollection "data" will logically consist of elements produced by *some* sequential execution of GenerateFn on elements of the PCollection "something" - i.e., even if GenerateFn is invoked on the same element multiple times and produces different results, only one of those results will make it into the PCollection. The act of invoking a DoFn, and the DoFn's side effects, are not part of the Beam model - only PCollection contents are.
> >
> > However, these factors can be important. E.g. imagine WriteFn writes data to a third-party service. Suppose it even does so in an idempotent way (e.g. suppose the data has a primary key, and WriteFn inserts or overwrites the row in a database with each primary key) - then, each element on which WriteFn is invoked will be written to the database exactly once.
> >
> > However, "each element on which WriteFn is invoked" is not the same as "each element in the PCollection data" - because, again, the Beam model does not make guarantees about what DoFn's are invoked on. In particular, in case of failures, WriteFn might be applied arbitrarily many times to arbitrarily many different results of GenerateFn. Say, imagine a hypothetical runner that executes the whole pipeline and, on any failure, re-executes the whole pipeline - over the course of that, even if "data" logically has just 1 element, this element may be produced multiple times, WriteFn might be applied to multiple different versions of it, and in the example above, it may insert extra rows into the database.
> >
> > Checkpointing can look like this:
> >
> > PCollection<Data> dataCheckpoint = data.apply(Checkpoint.create());
> >
> > It limits the scope of non-determinism, and guarantees that an immediate consumer of the collection "dataCheckpoint" will be invoked *only* on the committed logical contents of the collection. It may still be invoked multiple times, but on the same element, so it is sufficient to have idempotency of side effects at the level of the consumer. Note: the hypothetical "rerun the full pipeline on failure" runner will not be able to implement this transform correctly.
> >
> > Reshuffle is used in this capacity in BigQueryIO:
> > https://github.com/apache/incubator-beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L2718
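To make this concrete: a hypothetical Checkpoint transform (not an existing Beam API) could be sketched on top of today's Reshuffle, e.g. via Reshuffle.viaRandomKey() from the Java SDK - bearing in mind that the checkpointing guarantee then comes from how particular runners implement GroupByKey, not from the Beam model itself:

import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.PCollection;

// Hypothetical transform from the discussion above; not an existing Beam API.
// An identity transform whose output is guaranteed to be the committed
// logical contents of its input, so an immediate consumer only ever observes
// one "version" of each element (though possibly more than once).
class Checkpoint<T> extends PTransform<PCollection<T>, PCollection<T>> {
  public static <T> Checkpoint<T> create() {
    return new Checkpoint<>();
  }

  @Override
  public PCollection<T> expand(PCollection<T> input) {
    // Reshuffle happens to provide this guarantee on runners (like Dataflow)
    // whose GroupByKey checkpoints its input; the Beam model does not
    // require a GroupByKey to do so.
    return input.apply(Reshuffle.<T>viaRandomKey());
  }
}

// Usage, as in the example above:
//   PCollection<Data> dataCheckpoint = data.apply(Checkpoint.<Data>create());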
> >
> > 2. Preventing a runner's "producer-consumer fusion" optimization that would otherwise limit parallelism.
> >
> > Suppose "pc" is a collection with 3 elements, and "GenerateFn" generates 1000 elements for each of them.
> >
> > Then, consider running pc.apply(ParDo.of(new GenerateFn())).apply(ParDo.of(new ProcessFn())).
> >
> > Obviously, GenerateFn can at best be applied in parallel to the 3 elements - no more parallelism can be achieved at this level. However, ideally ProcessFn would be applied to the 3000 elements all in parallel - but some runners implement "fusion", where they collapse a sequence of ParDo's into a single ParDo whose DoFn is the composition of the component DoFn's. In that case, the pipeline will apply the composition of GenerateFn and ProcessFn in parallel to the 3 elements, achieving a parallelism of only 3.
> >
> > This behavior, too, is not part of the Beam model. But we need a transform that can disable this optimization - e.g. prevent fusion across a particular PCollection, and guarantee that it is processed with as much parallelism as if it had been read from a perfectly parallelizable location (say, an Avro file). We could call it "Redistribute":
> >
> > pc.apply(ParDo.of(new GenerateFn())).apply(Redistribute.create()).apply(ParDo.of(new ProcessFn()))
> >
> > 3. Preventing a runner's "sibling fusion" optimization.
> >
> > Suppose FooFn and BarFn are DoFn<Integer, String>, and the pipeline looks like:
> >
> > PCollection<String> foos = ints.apply(ParDo.of(new FooFn()));
> > PCollection<String> bars = ints.apply(ParDo.of(new BarFn()));
> > PCollectionList.of(foos).and(bars).apply(Flatten.<String>pCollections()).apply(PubsubIO.Write.to(topic));
> >
> > In this case, a runner might perform an optimization, fusing FooFn and BarFn into a single Fn that takes an element "x" and produces the concatenation FooFn(x) + BarFn(x). In some cases, this can be undesirable - e.g. suppose BarFn is much slower to compute than FooFn, but the results of FooFn need to be sent to Pubsub as quickly as possible. In that case we don't want to wait for both FooFn and BarFn to complete on a particular element before sending the result of FooFn to Pubsub.
> >
> > It seems like a Redistribute transform should be the answer to this as well:
> >
> > PCollection<String> foos = ints.apply(Redistribute.create()).apply(ParDo.of(new FooFn()));
> > PCollection<String> bars = ints.apply(Redistribute.create()).apply(ParDo.of(new BarFn()));
> > PCollectionList.of(foos).and(bars).apply(Flatten.<String>pCollections()).apply(PubsubIO.Write.to(topic));
> >
> > Here, it would cause FooFn and BarFn to be applied to logically independent collections whose contents are equivalent to "ints".
> >
> > ***
> >
> > The current Reshuffle transform happens to do all 3 at the same time, and does it in a potentially non-portable way: in particular, it relies on the fact that GroupByKey provides all 3 semantics, but the Beam model does not require this (and in fact, in Spark it probably wouldn't provide #1). It is also potentially non-optimal: there can exist better implementations of each of these 3 semantics that do not involve a global group-by-key.
> >
> > It's not clear yet what to do about all this. Thoughts welcome.
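A runnable sketch of the parallelism example from item 2, with a reshuffle standing in for the proposed Redistribute.create() (GenerateFn and ProcessFn here are toy stand-ins, not real transforms from the discussion):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;

public class RedistributeExample {
  // Fans each input element out into 1000 elements, as in the discussion.
  static class GenerateFn extends DoFn<String, String> {
    @ProcessElement
    public void process(ProcessContext c) {
      for (int i = 0; i < 1000; i++) {
        c.output(c.element() + "-" + i);
      }
    }
  }

  static class ProcessFn extends DoFn<String, String> {
    @ProcessElement
    public void process(ProcessContext c) {
      c.output(c.element().toUpperCase());
    }
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    p.apply(Create.of("a", "b", "c"))      // 3 elements: parallelism at most 3
        .apply(ParDo.of(new GenerateFn())) // 3000 elements, but still fused
        // Standing in for Redistribute.create(): breaks fusion here so that
        // ProcessFn can be parallelized across all 3000 elements.
        .apply(Reshuffle.<String>viaRandomKey())
        .apply(ParDo.of(new ProcessFn()));
    p.run().waitUntilFinish();
  }
}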
> >
> > On Tue, Oct 11, 2016 at 11:35 AM Kenneth Knowles <[email protected]> wrote:
> >
> >> On Tue, Oct 11, 2016 at 10:56 AM Eugene Kirpichov <[email protected]> wrote:
> >>
> >>> Yeah, I'm starting to lean towards removing Redistribute.byKey() from the public API - because it only makes sense for getting access to per-key state, and 1) we don't have it yet and 2) the runner should insert it automatically - so there's no use case for it.
> >>
> >> +1 to removing Redistribute.byKey() from the public API.
> >>
> >>> The "checkpointing keys" use case should be done via Redistribute.arbitrarily(), I believe.
> >>
> >> Actually I think it really does have to be a GroupByKey followed by writing the groups without breaking them up:
> >>
> >> - With GBK, each element must appear in exactly one output group, so you have to checkpoint or be able to retract groupings (nice easy explanation from Thomas Groh; any faults in my paraphrase are my own).
> >>
> >> - But GBK followed by "output the elements one by one" actually removes this property. Now you can replace the whole thing with a no-op, fuse with downstream, and still get exactly-once processing according to the model, but not as observed via side effects to external systems. So sinks should really be doing the former, and I'll retract this use case for Redistribute.
> >>
> >> As for Redistribute.arbitrarily():
> >>
> >>> In a batch runner, we could describe it as "identity transform, but the runner is required to process the resulting PCollection with downstream transforms as well as if it had been created from elements via Create.of(), in terms of ability to parallelize processing and minimize the amount of re-executed work in case of failures" (which is a fancy way of saying "materialize" without really saying "materialize" :) ).
> >>
> >> How can you observe if the runner ignored you?
>
> --
> Jean-Baptiste Onofré
> [email protected]
> http://blog.nanthrax.net
> Talend - http://www.talend.com
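A sketch of the "GroupByKey followed by writing the groups without breaking them up" pattern Kenneth describes. writeAtomically is a hypothetical call to an external system; the point is that each group is consumed within a single DoFn invocation:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;

class WriteGrouped extends PTransform<PCollection<KV<String, String>>, PDone> {
  @Override
  public PDone expand(PCollection<KV<String, String>> input) {
    input
        .apply(GroupByKey.<String, String>create())
        .apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
          @ProcessElement
          public void process(ProcessContext c) {
            // Consuming the whole group in one invocation preserves the GBK
            // property that each element is committed to exactly one group.
            // Re-emitting the elements one by one downstream instead would
            // let a runner fuse past the GBK and lose the observable
            // exactly-once behavior at the external system.
            writeAtomically(c.element().getKey(), c.element().getValue());
          }
        }));
    return PDone.in(input.getPipeline());
  }

  // Hypothetical idempotent/atomic write to an external system, e.g.
  // overwrite the row for "key" with the full set of values.
  private static void writeAtomically(String key, Iterable<String> values) {
    // ...
  }
}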
