Thanks Eugene, I've added a few more questions inline.

So this is about rebalancing/autoscaling?
When would you expect shards/bundles/partitions (this language is becoming
extremely overloaded ;-)) to be out of balance?
I can think of:

   - After reading from IOs - IOs don't necessarily guarantee that their
   output is balanced, or parallelized according to the Pipeline's
   parallelism (see the sketch below).
   - After "Key"ing / "Re-Key"ing / "Un-Key"ing - which could lead to
   downstream aggregations being unbalanced.
   - After Flatten.

Anything else?
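
To make the first case concrete, here's roughly where I'd expect to drop
it in - a sketch only, assuming the Redistribute API from the PR (exact
generics/signatures are my guess), with the IO configuration elided:

    // Sketch, not actual PR code; JdbcIO configuration elided.
    PCollection<String> rows = pipeline
        .apply(JdbcIO.<String>read() /* ...source configuration... */)
        // Break fusion so the runner can rebalance the possibly-skewed
        // IO output across its full parallelism.
        .apply(Redistribute.<String>arbitrarily());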

This is NOT about Fanning in/out, correct? Hence the name Re*distribute*,
meaning across the existing parallelism?

Thanks,
Amit

On Mon, Oct 10, 2016 at 11:38 PM Eugene Kirpichov
<[email protected]> wrote:

> Hi Amit,
>
> The transform, the way it's implemented, actually does several things at
> the same time and that's why it's tricky to document it.
>
> Redistribute.arbitrarily():
> - Introduces a fusion barrier (in runners that have it), making sure that
> the runner can fully parallelize processing the output PCollection with
> DoFn's
> - Introduces a fault-tolerance barrier, effectively "checkpointing" the
> input PCollection (again, in runners where it makes sense) and making sure
> that processing elements of the output PCollection with a DoFn, if the
> DoFn fails, will redo only that processing, but not need to recompute the
> input PCollection.
Like caching
<http://spark.apache.org/docs/1.6.2/programming-guide.html#rdd-persistence>
an RDD with Spark? So that downstream computations won't repeat the
redistribution?
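Something like this, just to make the analogy concrete (Spark 1.6 Java
API; rdd and numPartitions are made-up names):

    // Given some JavaRDD<String> rdd and a partition count numPartitions:
    JavaRDD<String> redistributed = rdd
        .repartition(numPartitions)                // the shuffle, i.e. the "redistribution"
        .persist(StorageLevel.MEMORY_AND_DISK());  // downstream actions reuse the
                                                   // shuffled data instead of
                                                   // recomputing the lineage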

>
> Redistribute.byKey():
> - All of the above and also makes the collection "key-partitioned",
> giving access to per-key state to downstream key-preserving DoFns.
> However, this is also runner-specific, because it's conceivable that a
> runner might not need this "key-partitioned" property (in fact it's best
> if a runner inserted such a "redistribute by key" automatically if it
> needs it...), and it currently isn't exposed anyway.

Isn't this the actual redistribution, while "arbitrarily" adds
pseudo-random keys to non-keyed records?
As for per-key state, this is more for "Autoscaling" mid-pipeline (maybe
after a stateful operator that is tied to the key), right?
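The way I picture arbitrarily() is roughly this, sketched with plain SDK
primitives (my guess at the mechanics, not the PR's actual code):

    // Guess at the mechanics; exact generics/signatures may differ.
    PCollection<String> out = in
        .apply(ParDo.of(new DoFn<String, KV<Integer, String>>() {
          @ProcessElement
          public void process(ProcessContext c) {
            // (pseudo-randomly) unique key per element, only to spread the data
            c.output(KV.of(ThreadLocalRandom.current().nextInt(), c.element()));
          }
        }))
        .apply(Redistribute.<Integer, String>byKey())  // the keyed redistribution
        .apply(Values.<String>create());               // drop the synthetic keys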

>
> Still thinking about the best way to describe this in a way that's least
> confusing to users.
>
> Regarding redistributing into N shards: this is problematic because it
> doesn't seem to make sense in the unified model (in streaming in
> particular - having N keys doesn't mean you have N bundles), and breaks
> down if you add dynamic work rebalancing, backups and other magic. So I
> decided not to bother with this in that PR.
>
> Agreed with Robert that limiting the parallelism, or throttling, are very
> useful features, but Redistribute is not the right place to introduce
> them.
You're right, it fits around here somewhere (logically) but not as part of
Redistribute.
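
For the record, the hand-rolled way to cap parallelism today would be
something along these lines (a sketch, separate from Redistribute;
requests and n are made-up names): bucket elements onto at most n keys
and group, so the next step runs with at most n-way parallelism:

    // Sketch: cap the parallelism of the step that follows at n.
    final int n = 10;  // e.g. max concurrent calls to the external service
    PCollection<KV<Integer, Iterable<String>>> throttled = requests
        .apply(ParDo.of(new DoFn<String, KV<Integer, String>>() {
          @ProcessElement
          public void process(ProcessContext c) {
            // bucket each element onto one of at most n keys
            c.output(KV.of(ThreadLocalRandom.current().nextInt(n), c.element()));
          }
        }))
        .apply(GroupByKey.<Integer, String>create());
    // A DoFn over the n resulting groups then calls the service with at
    // most n-way parallelism.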

> On Mon, Oct 10, 2016 at 12:58 PM Amit Sela <[email protected]> wrote:
>
> > On Mon, Oct 10, 2016 at 9:21 PM Robert Bradshaw
> > <[email protected]> wrote:
> >
> > > On Sat, Oct 8, 2016 at 7:31 AM, Amit Sela <[email protected]> wrote:
> > >
> > > > Hi Eugene,
> > > >
> > > > This is very interesting.
> > > > Let me see if I get this right, the "Redistribute" transformation
> > > > assigns a "running id" key (per-bundle), calls "Redistribute.byKey",
> > > > and extracts back the values, correct?
> > >
> > > The keys are (pseudorandomly) unique per element.
> > >
> > > > As for "Redistribute.byKey" - it's made of a GroupByKey
> > > > transformation that follows a Window transformation that neutralises
> > > > the "resolution" of triggers and panes that usually occurs in
> > > > GroupByKey, correct?
> > > >
> > > > So this is basically a "FanOut" transformation which will depend on
> > > > the available resources of the runner (and the uniqueness of the
> > > > assigned keys)?
> > > >
> > > > Would we want to Redistribute into a user-defined number of bundles
> > > > (> current)?
> > >
> > > I don't think there's any advantage to letting the user specify a
> > > number here; the data is spread out among as many machines as are
> > > handling the shuffling (for N elements, there are ~N unique keys,
> > > which get partitioned by the system to the M workers).
> > >
> > > > How about "FanIn"?
> > >
> > > Could you clarify what you would hope to use this for?
> >
> > Well, what if for some reason I would want to limit parallelism for a
> > step in the Pipeline? Like calling an external service without
> > "DDoS"ing it?
> >
> > > > On Fri, Oct 7, 2016 at 10:49 PM Eugene Kirpichov
> > > > <[email protected]> wrote:
> > > >
> > > >> Hello,
> > > >>
> > > >> Heads up that https://github.com/apache/incubator-beam/pull/1036
> > > >> will introduce a transform called "Redistribute", encapsulating a
> > > >> relatively common pattern - a "fusion break" [see
> > > >> https://cloud.google.com/dataflow/service/dataflow-service-desc#preventing-fusion
> > > >> previously providing advice on that] - useful e.g. when you write
> > > >> an IO as a sequence of ParDo's: split a query into parts, read
> > > >> each part, and you want to prevent fusing these ParDo's because
> > > >> that would make the whole thing execute sequentially, and in other
> > > >> similar cases.
> > > >>
> > > >> The PR also uses it, as an example, in DatastoreIO and JdbcIO,
> > > >> both of which used to have a hand-rolled implementation of the
> > > >> same. The Write transform has something similar, but not quite
> > > >> identical, so I skipped it.
> > > >>
> > > >> This is not a model change - merely providing a common
> > > >> implementation of something useful that already existed but was
> > > >> scattered across the codebase.
> > > >>
> > > >> Redistribute also subsumes the old mostly-internal Reshuffle
> > > >> transform via Redistribute.byKey().
> > > >>
> > > >> I tried finding more cases in the Beam codebase that have an
> > > >> ad-hoc implementation of this; I did not find any, but I might
> > > >> have missed something. I suppose the transform will need to be
> > > >> advertised in documentation on best-practices for connector
> > > >> development; perhaps some StackOverflow answers should be updated;
> > > >> any other places?
