As Kenn points out, I think the nature of the Redistribute operation is to act as a hint (or requirement) to the runner that a certain distribution of the elements is desirable. In a perfect world this wouldn't be necessary, because every runner would be able to do exactly the right thing. Looking at the different use cases may be helpful:
1. Redistribute.arbitrarily being used in IO as a fusion break and checkpoint. We could express this as a hint saying that we'd like to persist the PCollection at this point.
2. Redistribute.perKey being used to checkpoint keys in a keyed PCollection. I think this could be the same as the previous hint or a variant thereof.
3. Redistribute.perKey being used to ensure that the elements are distributed across machines such that all elements with a specific key are on the same machine. This should only be necessary for per-key processing (such as state) and can be added by the runner when necessary (this becomes easier once we have a notion of transforms that preserve key-partitioning, etc.).

Of these, 1 and 2 seem to be the most interesting. The hint can be implemented in various ways: a transform that represents the hint (which the runner can then implement as it sees fit), or a method that sets some property on the PCollection, in response to which the runner could choose to apply a transform. I lean towards the former (keeping this as a transform), since it fits more naturally into the codebase and doesn't require extending PCollection (something we avoid).

What if this was something like ".apply(Hints.checkpoint())" or ".apply(Hints.break())"? Would that make it clearer that this is a hint to the runner and not part of the semantics?

On Tue, Oct 11, 2016 at 10:09 AM Kenneth Knowles <k...@google.com.invalid> wrote:
> On Mon, Oct 10, 2016 at 1:38 PM Eugene Kirpichov
> <kirpic...@google.com.invalid> wrote:
>
> > The transform, the way it's implemented, actually does several things at
> > the same time and that's why it's tricky to document it.
> >
>
> This thread has actually made me less sure about my thoughts on this
> transform. I do know what the transform is about and I do think we need it.
> But I don't know that it can be explained "within the model".
> Look at our classic questions about Redistribute.arbitrarily() and
> Redistribute.byKey():
>
> - "what" is it computing? The identity on its input.
> - "where" is the event time windowing? Same as its input.
> - "when" is output produced? As fast as reasonable (runner-specific).
> - "how" are refinements related? Same as its input (I think this might
>   actually be incorrect if accumulating fired panes)
>
> These points don't describe any of the real goals of Redistribute. Hence
> describing it in terms of fusion and checkpointing, which are quite
> runner-specific in their (optional) manifestations.
>
> > - Introduces a fusion barrier (in runners that have it), making sure that
> > the runner can fully parallelize processing the output PCollection with
> > DoFns
> >
>
> Can a runner introduce other fusion barriers whenever it wants? Yes.
> Can a runner ignore a proposed fusion barrier? Yes. (Or when can it not?
> Why not?)
>
> > - Introduces a fault-tolerance barrier, effectively "checkpointing" the
> > input PCollection (again, in runners where it makes sense) and making sure
> > that processing elements of the output PCollection with a DoFn, if the DoFn
> > fails, will redo only that processing, but not need to recompute the input
> > PCollection.
> >
>
> Can a runner introduce a checkpoint whenever appropriate? Yes.
> Can a runner ignore a hint to checkpoint? Yes (if it can still compute the
> same result - it may not even conceive of checkpointing in a compatible
> way).
>
> > - All of the above and also makes the collection "key-partitioned", giving
> > access to per-key state to downstream key-preserving DoFns.
> > However, this is also runner-specific, because it's conceivable that a
> > runner might not need this "key-partitioned" property (in fact it's best
> > if a runner inserted such a "redistribute by key" automatically if it
> > needs it...), and it currently isn't exposed anyway.
> >
>
> Agreed. The runner should insert the necessary keying wherever needed. One
> might say the same for other uses of Redistribute, but in practice hints
> are useful.
>
> > Still thinking about the best way to describe this in a way that's least
> > confusing to users.
> >
>
> I think it isn't just about users. I don't think the transform is quite
> well-defined at the "what the runner must do" level. Here is a question I
> am considering: When is it _incorrect_ for a runner to replace a
> Redistribute with an identity transform? I have some thoughts, such as
> committing pseudorandomly generated data, but do you have some other ideas?
>
> Kenn
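Kenn's example of committing pseudorandomly generated data can be made concrete with a toy, single-process model. It is plain Java, not Beam API. The upstream "generate" step, the external "sink", the single failure followed by one retry, and the `honorHint` flag are all hypothetical stand-ins. They model a runner that either honors a checkpoint hint (persisting the generated values once) or replaces it with the identity (recomputing upstream on retry).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.function.Supplier;

// Toy model (hypothetical, not Beam API): is replacing a checkpoint hint
// with the identity observable when the upstream step is non-deterministic?
public class CheckpointHintDemo {

  // Hypothetical upstream step: emits n pseudorandom values. Each call draws
  // fresh values from rng, so recomputing the step generally gives different data.
  static List<Integer> generate(Random rng, int n) {
    List<Integer> out = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      out.add(rng.nextInt(1000));
    }
    return out;
  }

  // Simulates "generate -> checkpoint hint -> write to external sink" (n >= 1).
  // The write commits element 0, then fails and is retried once. The sink is
  // not rolled back, so the retry resumes after what was already committed.
  // honorHint == true: the runner persists the generated values and the retry
  // reads them back. honorHint == false: the hint is treated as the identity,
  // so the retry has to recompute the upstream step.
  static List<Integer> run(boolean honorHint, long seed, int n) {
    Random rng = new Random(seed);
    Supplier<List<Integer>> upstream = () -> generate(rng, n);

    List<Integer> persisted = honorHint ? upstream.get() : null;
    List<Integer> sink = new ArrayList<>();

    // Attempt 1: commit the first element, then fail.
    List<Integer> firstAttempt = honorHint ? persisted : upstream.get();
    sink.add(firstAttempt.get(0));

    // Attempt 2 (retry): commit only the elements not yet in the sink.
    List<Integer> retry = honorHint ? persisted : upstream.get();
    for (int i = sink.size(); i < retry.size(); i++) {
      sink.add(retry.get(i));
    }
    return sink;
  }

  public static void main(String[] args) {
    System.out.println("hint honored: " + run(true, 42L, 3));
    System.out.println("hint ignored: " + run(false, 42L, 3));
  }
}
```

With the hint honored, the sink holds the values from a single execution of the generator. With the hint ignored, element 0 comes from the first execution and elements 1 and 2 from the recomputation, so the committed output mixes two different executions of the upstream step. Under these assumptions, this is one case where swapping in the identity changes observable results. It suggests that, for non-deterministic inputs, the checkpoint behavior is more than a performance hint.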