As Kenn points out, I think the nature of the Redistribute operation is to
act as a hint (or requirement) to the runner that a certain distribution
of the elements is desirable. In a perfect world this wouldn't be necessary
because every runner would be able to do exactly the right thing. Looking at
the different use cases may be helpful:

1. Redistribute.arbitrarily being used in IO as a fusion break and
checkpoint. We could express this as a hint saying that we'd like to
persist the PCollection at this point.
2. Redistribute.byKey being used to checkpoint keys in a keyed
PCollection. I think this could be the same as the previous hint or a
variant thereof.
3. Redistribute.byKey to ensure that the elements are distributed across
machines such that all elements with a specific key are on the same
machine. This should only be necessary for per-key processing (such as
state) and can be added by the runner when necessary (this becomes easier
once we have a notion of transforms that preserve key-partitioning, etc.).

Of these, 1 and 2 seem to be the most interesting. The hint can be
implemented in various ways: as a transform that represents the hint (which
the runner can then implement as it sees fit), or via a method that sets
some property on the PCollection, in response to which the runner could
choose to apply a transform. I lean towards the former (keeping this as a
transform) since it fits more naturally into the codebase and doesn't
require extending PCollection (something we avoid).

What if this were something like ".apply(Hints.checkpoint())" or
".apply(Hints.break())"? That would make it clearer that this is a hint to
the runner and not part of the semantics.
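
To make the "hint, not semantics" idea concrete, here is a rough sketch as a
toy model. It is not Beam code: Stage, Hints, run, and plan are all
hypothetical names, and the "runner" is just a function that groups stages
into fused units. The hint is the identity on the data, so a runner that
honors it and one that ignores it produce the same output and differ only in
how they plan execution. (The sketch uses fusionBreak() because break is a
reserved word in Java and could not be a method name.)

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Toy model only; none of these are real Beam classes.
public class HintSketch {

    /** A stage is either a real per-element function or a hint (identity on data). */
    static final class Stage {
        final String name;
        final UnaryOperator<Integer> fn;
        final boolean hint;

        Stage(String name, UnaryOperator<Integer> fn, boolean hint) {
            this.name = name;
            this.fn = fn;
            this.hint = hint;
        }
    }

    /** Hypothetical hint factory, mirroring the Hints.checkpoint() idea. */
    static final class Hints {
        static Stage checkpoint() { return new Stage("Hints.checkpoint", x -> x, true); }
        static Stage fusionBreak() { return new Stage("Hints.fusionBreak", x -> x, true); }
    }

    static Stage map(String name, UnaryOperator<Integer> fn) {
        return new Stage(name, fn, false);
    }

    /** Applies every stage in order. Hints are the identity, so they never change the result. */
    static List<Integer> run(List<Stage> pipeline, List<Integer> input) {
        List<Integer> out = new ArrayList<>();
        for (Integer x : input) {
            Integer v = x;
            for (Stage s : pipeline) {
                v = s.fn.apply(v);
            }
            out.add(v);
        }
        return out;
    }

    /**
     * Groups non-hint stages into fused units. A runner that honors hints closes
     * the current unit at each hint; a runner that ignores them fuses everything.
     */
    static List<List<String>> plan(List<Stage> pipeline, boolean honorHints) {
        List<List<String>> groups = new ArrayList<>();
        List<String> current = new ArrayList<>();
        for (Stage s : pipeline) {
            if (s.hint) {
                if (honorHints && !current.isEmpty()) {
                    groups.add(current);
                    current = new ArrayList<>();
                }
                continue; // the hint itself does no work
            }
            current.add(s.name);
        }
        if (!current.isEmpty()) {
            groups.add(current);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Stage> pipeline = List.of(
            map("expand", x -> x * 10),
            Hints.fusionBreak(),
            map("process", x -> x + 1));

        System.out.println(run(pipeline, List.of(1, 2, 3)));
        System.out.println("honoring hints: " + plan(pipeline, true));
        System.out.println("ignoring hints: " + plan(pipeline, false));
    }
}
```

The output of run is the same either way; only the plan changes, which is
the sense in which the hint sits outside the semantics.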

On Tue, Oct 11, 2016 at 10:09 AM Kenneth Knowles <k...@google.com.invalid>
wrote:

> On Mon, Oct 10, 2016 at 1:38 PM Eugene Kirpichov
> <kirpic...@google.com.invalid> wrote:
>
> > The transform, the way it's implemented, actually does several things at
> > the same time and that's why it's tricky to document it.
>
> This thread has actually made me less sure about my thoughts on this
> transform. I do know what the transform is about and I do think we need it.
> But I don't know that it can be explained "within the model". Look at our
> classic questions about Redistribute.arbitrarily() and
> Redistribute.byKey():
>
>  - "what" is it computing? The identity on its input.
>  - "where" is the event time windowing? Same as its input.
>  - "when" is output produced? As fast as reasonable (runner-specific).
>  - "how" are refinements related? Same as its input (I think this might
>    actually be incorrect if accumulating fired panes)
>
> These points don't describe any of the real goals of Redistribute. Hence
> describing it in terms of fusion and checkpointing, which are quite
> runner-specific in their (optional) manifestations.
>
> > - Introduces a fusion barrier (in runners that have it), making sure that
> > the runner can fully parallelize processing the output PCollection with
> > DoFn's
>
> Can a runner introduce other fusion barriers whenever it wants? Yes.
> Can a runner ignore a proposed fusion barrier? Yes. (or when can it not?
> why not?)
>
> > - Introduces a fault-tolerance barrier, effectively "checkpointing" the
> > input PCollection (again, in runners where it makes sense) and making sure
> > that processing elements of the output PCollection with a DoFn, if the DoFn
> > fails, will redo only that processing, but not need to recompute the input
> > PCollection.
>
> Can a runner introduce a checkpoint whenever appropriate? Yes.
> Can a runner ignore a hint to checkpoint? Yes (if it can still compute the
> same result - it may not even conceive of checkpointing in a compatible
> way).
>
> > - All of the above and also makes the collection "key-partitioned", giving
> > access to per-key state to downstream key-preserving DoFns. However, this
> > is also runner-specific, because it's conceivable that a runner might not
> > need this "key-partitioned" property (in fact it's best if a runner
> > inserted such a "redistribute by key" automatically if it needs it...), and
> > it currently isn't exposed anyway.
>
> Agreed. The runner should insert the necessary keying wherever needed. One
> might say the same for other uses of Redistribute, but in practice hints
> are useful.
>
> > Still thinking about the best way to describe this in a way that's least
> > confusing to users.
>
> I think it isn't just about users. I don't think the transform is quite
> well-defined at the "what the runner must do" level. Here is a question I
> am considering: When is it _incorrect_ for a runner to replace a
> Redistribute with an identity transform? I have some thoughts, such as
> committing pseudorandomly generated data, but do you have some other ideas?
>
> Kenn

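On the pseudorandom-data case Kenn mentions, a small toy simulation may help
show why dropping the checkpoint could matter. This is only a sketch under
assumptions, not Beam code: FlakySink, generateIds, and runWithRetry are
made-up names, and a single retry stands in for whatever a real runner does
on failure. An upstream step produces non-deterministic IDs, and a
downstream step with external side effects fails once. If the two are
effectively fused (the Redistribute replaced by the identity), the retry
recomputes the upstream and sees different data from the first attempt; with
a checkpoint, the retry sees the same data.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.function.Supplier;

// Toy simulation only; none of these are real Beam classes.
public class CheckpointSketch {

    /** A downstream step with external side effects that fails on its first attempt. */
    static final class FlakySink {
        final List<Long> seenOnFirstAttempt = new ArrayList<>();
        final List<Long> committed = new ArrayList<>();
        private boolean failedOnce = false;

        void write(List<Long> ids) {
            if (!failedOnce) {
                failedOnce = true;
                // Stands in for side effects that became visible before the failure.
                seenOnFirstAttempt.addAll(ids);
                throw new IllegalStateException("simulated worker failure");
            }
            committed.addAll(ids);
        }
    }

    /** Non-deterministic upstream step: every call yields fresh pseudorandom IDs. */
    static List<Long> generateIds(Random rng, int n) {
        List<Long> ids = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            ids.add(rng.nextLong());
        }
        return ids;
    }

    /**
     * Runs upstream and sink with one retry. With checkpoint == true the upstream
     * output is materialized once and reused; otherwise each attempt recomputes it.
     */
    static FlakySink runWithRetry(Supplier<List<Long>> upstream, boolean checkpoint) {
        FlakySink sink = new FlakySink();
        List<Long> persisted = checkpoint ? upstream.get() : null;
        for (int attempt = 0; attempt < 2; attempt++) {
            List<Long> input = checkpoint ? persisted : upstream.get();
            try {
                sink.write(input);
                break;
            } catch (IllegalStateException e) {
                // Fall through to the retry.
            }
        }
        return sink;
    }

    public static void main(String[] args) {
        Random rng = new Random();
        FlakySink fused = runWithRetry(() -> generateIds(rng, 3), false);
        FlakySink checkpointed = runWithRetry(() -> generateIds(rng, 3), true);

        System.out.println("no checkpoint, retry saw same data: "
            + fused.seenOnFirstAttempt.equals(fused.committed));
        System.out.println("checkpoint, retry saw same data: "
            + checkpointed.seenOnFirstAttempt.equals(checkpointed.committed));
    }
}
```

Whether a given runner is obliged to provide this stability is exactly the
open question in the thread; the sketch only shows what changes when it
does not.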