Actually, Redistribute.perKey seems a bit dangerous, as there's no guarantee the partitioning is persisted to any subsequent steps, and we don't have a concrete notion of key-partitioned elements outside of GBK in the model. I suspect it was only introduced because that's what Redistribute.arbitrarily() is built on.
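To make the relationship between the two variants concrete, here is a minimal in-memory sketch in plain Java. It assumes the usual construction: attach a random key to every element, group by that key, then discard the key. The class and method names (`RandomKeyRedistribute`, `groupByKey`, `arbitrarily`) are hypothetical, and this is not Beam's implementation.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical in-memory model: "redistribute arbitrarily" built on a per-key
// redistribution by attaching random keys, grouping, and dropping the keys.
// This illustrates the idea only; it is not Beam's implementation.
class RandomKeyRedistribute {

  // Per-key step: collect every value that shares a key into one group.
  // A runner would place each group on a single worker; a HashMap stands in here.
  static <K, V> Map<K, List<V>> groupByKey(List<Map.Entry<K, V>> keyed) {
    Map<K, List<V>> groups = new HashMap<>();
    for (Map.Entry<K, V> entry : keyed) {
      groups.computeIfAbsent(entry.getKey(), k -> new ArrayList<>()).add(entry.getValue());
    }
    return groups;
  }

  // Arbitrary step: assign each element a random key in [0, numKeys), group by
  // that key, then flatten the groups and discard the synthetic keys.
  static <T> List<T> arbitrarily(List<T> input, int numKeys, long seed) {
    Random random = new Random(seed);
    List<Map.Entry<Integer, T>> keyed = new ArrayList<>();
    for (T element : input) {
      keyed.add(new AbstractMap.SimpleEntry<>(random.nextInt(numKeys), element));
    }
    List<T> output = new ArrayList<>();
    for (List<T> group : groupByKey(keyed).values()) {
      output.addAll(group);
    }
    return output;
  }
}
```

In this sketch the output is just a permutation of the input. The keys exist only to force the grouping step and are discarded immediately, so no later step can observe which group an element passed through.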
On Tue, Oct 11, 2016 at 10:16 AM, Ben Chambers wrote:
> As Kenn points out, I think the nature of the Redistribute operation is to
> act as a hint (or requirement) to the runner that a certain distribution of
> the elements is desirable. In a perfect world this wouldn't be necessary
> because every runner would be able to do exactly the right thing. Looking
> at the different use cases may be helpful:
>
> 1. Redistribute.arbitrarily being used in IO as a fusion break and
> checkpoint. We could express this as a hint saying that we'd like to
> persist the PCollection at this point.
> 2. Redistribute.perKey being used to checkpoint keys in a keyed
> PCollection. I think this could be the same as the previous hint or a
> variant thereof.
> 3. Redistribute.perKey to ensure that the elements are distributed across
> machines such that all elements with a specific key are on the same
> machine. This should only be necessary for per-key processing (such as
> state) and can be added by the runner when necessary (this becomes easier
> once we have a notion of transforms that preserve key-partitioning, etc.)
>
> Of these, 1 and 2 seem to be the most interesting. The hint can be
> implemented in various ways: a transform that represents the hint (which
> the runner can then implement as it sees fit) or a method that sets some
> property on the PCollection, to which the runner could choose to apply a
> transform. I lean towards the former (keeping this as a transform) since
> it fits more naturally into the codebase and doesn't require extending
> PCollection (something we avoid).
>
> What if this was something like ".apply(Hints.checkpoint())" or
> ".apply(Hints.break())"? That would make it clearer that this is a hint to
> the runner and not part of the semantics.
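The "hint as a transform" option can be sketched with a toy in-memory runner. The hint's own semantics are the identity, and the runner alone decides whether to act on it. `HintsSketch`, `Transform`, `CheckpointHint`, and `run` are hypothetical names, not Beam API. One practical wrinkle for the Java SDK: `break` is a reserved word, so a method literally named `Hints.break()` would not compile and would need a different name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical model of "hints as transforms". Nothing here is Beam API.
class HintsSketch {

  // A transform maps one collection to another.
  interface Transform<T> extends Function<List<T>, List<T>> {}

  // A checkpoint hint. Its own semantics are the identity: output equals input.
  static final class CheckpointHint<T> implements Transform<T> {
    @Override
    public List<T> apply(List<T> input) {
      return input;
    }
  }

  // Toy runner. If it honors checkpoint hints, it materializes a copy of the
  // input (standing in for persisting the collection). Otherwise it just applies
  // the transform, which for a hint is a pass-through.
  static <T> List<T> run(List<T> input, Transform<T> transform, boolean honorsCheckpoints) {
    if (honorsCheckpoints && transform instanceof CheckpointHint) {
      return new ArrayList<>(input); // pretend this copy was written to durable storage
    }
    return transform.apply(input);
  }
}
```

Either branch returns a list equal to its input. That property is what makes the transform a hint rather than part of the pipeline's semantics.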
>
> On Tue, Oct 11, 2016 at 10:09 AM Kenneth Knowles wrote:
>
>> On Mon, Oct 10, 2016 at 1:38 PM Eugene Kirpichov wrote:
>>
>>> The transform, the way it's implemented, actually does several things at
>>> the same time and that's why it's tricky to document it.
>>
>> This thread has actually made me less sure about my thoughts on this
>> transform. I do know what the transform is about and I do think we need it.
>> But I don't know that it can be explained "within the model". Look at our
>> classic questions about Redistribute.arbitrarily() and Redistribute.byKey():
>>
>> - "what" is it computing? The identity on its input.
>> - "where" is the event time windowing? Same as its input.
>> - "when" is output produced? As fast as reasonable (runner-specific).
>> - "how" are refinements related? Same as its input (I think this might
>>   actually be incorrect if accumulating fired panes)
>>
>> These points don't describe any of the real goals of Redistribute. Hence
>> we describe it in terms of fusion and checkpointing, which are quite
>> runner-specific in their (optional) manifestations.
>>
>>> - Introduces a fusion barrier (in runners that have it), making sure that
>>>   the runner can fully parallelize processing the output PCollection with
>>>   DoFns
>>
>> Can a runner introduce other fusion barriers whenever it wants? Yes.
>> Can a runner ignore a proposed fusion barrier? Yes. (Or when can it not?
>> Why not?)
>>
>>> - Introduces a fault-tolerance barrier, effectively "checkpointing" the
>>>   input PCollection (again, in runners where it makes sense) and making
>>>   sure that processing elements of the output PCollection with a DoFn, if
>>>   the DoFn fails, will redo only that processing, but not need to
>>>   recompute the input PCollection.
>>
>> Can a runner introduce a checkpoint whenever appropriate? Yes.
>> Can a runner ignore a hint to checkpoint? Yes (if it can still compute the
>> same result; it may not even conceive of checkpointing in a compatible
>> way).
>>
>>> - All of the above and also makes the collection "key-partitioned", giving
>>>   access to per-key state to downstream key-preserving DoFns. However, this
>>>   is also runner-specific, because it's conceivable that a runner might not
>>>   need this "key-partitioned" property (in fact it's best if a runner
>>>   inserted such a "redistribute by key" automatically if it needs it...),
>>>   and it currently isn't exposed anyway.
>>
>> Agreed. The runner should insert the necessary keying wherever needed. One
>> might say the same for other uses of Redistribute, but in practice hints
>> are useful.
>>
>>> Still thinking about the best way to describe this in a way that's least
>>> confusing to users.
>>
>> I think it isn't just about users. I don't think the transform is quite
>> well-defined at the "what the runner must do" level. Here is a question I
>> am considering: When is it _incorrect_ for a runner to replace a
>> Redistribute with an identity transform? I have some thoughts, such as
>> committing pseudorandomly generated data, but do you have some other ideas?
>>
>> Kenn
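The "committing pseudorandomly generated data" case can be made concrete with a small sketch. It makes three assumptions:

- A toy runner retries a failed downstream step exactly once.
- The upstream step's output differs between executions, modeled by reseeding `java.util.Random` per attempt.
- The sink deduplicates by ID, so replaying identical data is harmless.

All class and method names are hypothetical. With a checkpoint, the retry replays the persisted IDs. Without one, the retry regenerates different IDs and the sink ends up mixing two runs. In that case, replacing the Redistribute with an identity would change the observable result.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;

// Hypothetical model of a checkpoint in front of a side-effecting step.
class CheckpointSketch {

  // Upstream step: generates pseudorandom IDs. Seeding by attempt number stands
  // in for the fact that re-executing it will generally not reproduce the same values.
  static List<Long> generateIds(int count, int attempt) {
    Random random = new Random(1000L + attempt);
    List<Long> ids = new ArrayList<>();
    for (int i = 0; i < count; i++) {
      ids.add(random.nextLong());
    }
    return ids;
  }

  // Downstream step: commits each ID to an external sink keyed by the ID itself,
  // so re-committing an identical ID is harmless. If failAfter >= 0, it throws
  // after committing that many IDs (a simulated worker failure).
  static void commit(List<Long> ids, Set<Long> sink, int failAfter) {
    for (int i = 0; i < ids.size(); i++) {
      if (i == failAfter) {
        throw new IllegalStateException("simulated worker failure");
      }
      sink.add(ids.get(i));
    }
  }

  // Runs upstream + downstream with at most one failure and one retry.
  // With a checkpoint, the retry reuses the persisted upstream output;
  // without one, the retry recomputes it.
  static Set<Long> runWithRetry(int count, int failAfter, boolean checkpoint) {
    Set<Long> sink = new LinkedHashSet<>();
    List<Long> firstAttempt = generateIds(count, 1);
    try {
      commit(firstAttempt, sink, failAfter);
    } catch (IllegalStateException e) {
      List<Long> retryInput = checkpoint ? firstAttempt : generateIds(count, 2);
      commit(retryInput, sink, -1); // -1: the retry does not fail
    }
    return sink;
  }
}
```

Deduplicating by ID only helps when the retried data is identical. The checkpoint is what guarantees that here, which is why simply dropping it is not always a safe optimization.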
