Actually, Redistribute.perKey seems a bit dangerous, as there's no
guarantee the partitioning is persisted to any subsequent steps, and
we don't have a concrete notion of key-partitioned elements outside of
GBK in the model. I suspect it was only introduced because that's what
Redistribute.arbitrarily() is built on.

On Tue, Oct 11, 2016 at 10:16 AM, Ben Chambers <[email protected]> wrote:
> As Kenn points out, I think the nature of the Redistribute operation is to
> act as a hint (or requirement) to the runner that a certain distribution of
> the elements is desirable. In a perfect world this wouldn't be necessary
> because every runner would be able to do exactly the right thing. Looking at
> the different use cases may be helpful:
>
> 1. Redistribute.arbitrarily being used in IO as a fusion break and
> checkpoint. We could express this as a hint saying that we'd like to
> persist the PCollection at this point.
> 2. Redistribute.perKey being used to checkpoint keys in a keyed
> PCollection. I think this could be the same as the previous hint or a
> variant thereof.
> 3. Redistribute.perKey to ensure that the elements are distributed across
> machines such that all elements with a specific key are on the same
> machine. This should only be necessary for per-key processing (such as
> state) and can be added by the runner when necessary (becomes easier once
> we have a notion of transforms that preserve key-partitioning, etc.)
>
> Of these 1 and 2 seem to be the most interesting. The hint can be
> implemented in various ways -- a transform that represents the hint (and
> the runner can then implement as it sees fit) or via a method that sets
> some property on the PCollection, to which the runner could choose to apply
> a transform. I lean towards the former (keeping this as a transform) since
> it fits more naturally into the codebase and doesn't require extending
> PCollection (something we avoid).
>
> What if this was something like: ".apply(Hints.checkpoint())" or
> ".apply(Hints.break())"? This makes it clearer that this is a hint to the
> runner and not part of the semantics?
>
> On Tue, Oct 11, 2016 at 10:09 AM Kenneth Knowles <[email protected]>
> wrote:
>
>> On Mon, Oct 10, 2016 at 1:38 PM Eugene Kirpichov
>> <[email protected]> wrote:
>>
>> > The transform, the way it's implemented, actually does several things at
>> > the same time and that's why it's tricky to document it.
>>
>> This thread has actually made me less sure about my thoughts on this
>> transform. I do know what the transform is about and I do think we need it.
>> But I don't know that it can be explained "within the model". Look at our
>> classic questions about Redistribute.arbitrarily() and
>> Redistribute.byKey():
>>
>>  - "what" is it computing? The identity on its input.
>>  - "where" is the event time windowing? Same as its input.
>>  - "when" is output produced? As fast as reasonable (runner-specific).
>>  - "how" are refinements related? Same as its input (I think this might
>>    actually be incorrect if accumulating fired panes)
>>
>> These points don't describe any of the real goals of Redistribute. Hence
>> describing it in terms of fusion and checkpointing, which are quite
>> runner-specific in their (optional) manifestations.
>>
>> > - Introduces a fusion barrier (in runners that have it), making sure that
>> > the runner can fully parallelize processing the output PCollection with
>> > DoFn's
>> >
>>
>> Can a runner introduce other fusion barriers whenever it wants? Yes.
>> Can a runner ignore a proposed fusion barrier? Yes. (or when can it not?
>> why not?)
>>
>> > - Introduces a fault-tolerance barrier, effectively "checkpointing" the
>> > input PCollection (again, in runners where it makes sense) and making sure
>> > that processing elements of the output PCollection with a DoFn, if the DoFn
>> > fails, will redo only that processing, but not need to recompute the input
>> > PCollection.
>> >
>>
>> Can a runner introduce a checkpoint whenever appropriate? Yes.
>> Can a runner ignore a hint to checkpoint? Yes (if it can still compute the
>> same result - it may not even conceive of checkpointing in a compatible
>> way).
>>
>> > - All of the above and also makes the collection "key-partitioned", giving
>> > access to per-key state to downstream key-preserving DoFns. However, this
>> > is also runner-specific, because it's conceivable that a runner might not
>> > need this "key-partitioned" property (in fact it's best if a runner
>> > inserted such a "redistribute by key" automatically if it needs it...), and
>> > it currently isn't exposed anyway.
>> >
>>
>> Agreed. The runner should insert the necessary keying wherever needed. One
>> might say the same for other uses of Redistribute, but in practice hints
>> are useful.
>>
>> > Still thinking about the best way to describe this in a way that's least
>> > confusing to users.
>> >
>>
>> I think it isn't just about users. I don't think the transform is quite
>> well-defined at the "what the runner must do" level. Here is a question I
>> am considering: When is it _incorrect_ for a runner to replace a
>> Redistribute with an identity transform? I have some thoughts, such as
>> committing pseudorandomly generated data, but do you have some other ideas?
>>
>> Kenn
>>
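
The "committing pseudorandomly generated data" case raised at the end of the thread can be sketched with a toy model. This is only an illustration under stated assumptions, not Beam code: the `run` function, the `honor_checkpoint` flag, and the idempotent set-based sink are all hypothetical stand-ins for a runner, the Redistribute hint, and an external system. The upstream step attaches a pseudorandom id to each element, and the downstream step writes to the sink, fails once after writing, and is retried.

```python
import random


def run(elements, honor_checkpoint, seed=0):
    """Toy runner (hypothetical): returns what an idempotent sink ends up holding."""
    rng = random.Random(seed)

    def assign_ids():
        # Upstream step: not reproducible across attempts, because every
        # call draws fresh pseudorandom ids from the generator.
        return [(e, rng.getrandbits(64)) for e in elements]

    # Honoring the hint: materialize the upstream output exactly once.
    checkpoint = assign_ids() if honor_checkpoint else None

    sink = set()  # idempotent sink: identical (element, id) records collapse
    for _attempt in range(2):  # first attempt fails after writing, second is the retry
        # Treating the hint as identity fuses the steps, so a retry
        # recomputes the upstream step and sees different ids.
        batch = checkpoint if honor_checkpoint else assign_ids()
        sink.update(batch)
    return sink


elements = ["a", "b", "c"]
with_checkpoint = run(elements, honor_checkpoint=True)
as_identity = run(elements, honor_checkpoint=False)
```

In this model the retry is harmless when the checkpoint is honored, since the sink deduplicates identical records. When the hint is treated as an identity, the retry commits records with new ids, so the sink holds more than one record per element. Whether that counts as "incorrect" for a real runner is exactly the open question in the thread.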
