Hi Amit,

The transform, the way it's implemented, actually does several things at
the same time and that's why it's tricky to document it.

Redistribute.arbitrarily():
- Introduces a fusion barrier (in runners that have it), making sure that
the runner can fully parallelize processing the output PCollection with
DoFn's
- Introduces a fault-tolerance barrier, effectively "checkpointing" the
input PCollection (again, in runners where it makes sense) and making sure
that processing elements of the output PCollection with a DoFn, if the DoFn
fails, will redo only that processing, but not need to recompute the input
PCollection.

Redistribute.byKey():
- All of the above and also makes the collection "key-partitioned", giving
access to per-key state to downstream key-preserving DoFns. However, this
is also runner-specific, because it's conceivable that a runner might not
need this "key-partitioned" property (in fact it's best if a runner
inserted such a "redistribute by key" automatically if it needs it...), and
it currently isn't exposed anyway.

Still thinking about the best way to describe this in a way that's least
confusing to users.

Regarding redistributing into N shards: this is problematic because it
doesn't seem to make sense in the unified model (in streaming in particular
- having N keys doesn't mean you have N bundles), and breaks down if you
add dynamic work rebalancing, backups and other magic. So I decided not to
bother with this in that PR.

Agreed with Robert that limiting the parallelism, or throttling, are very
useful features, but Redistribute is not the right place to introduce them.

On Mon, Oct 10, 2016 at 12:58 PM Amit Sela <[email protected]> wrote:

> On Mon, Oct 10, 2016 at 9:21 PM Robert Bradshaw
> <[email protected]>
> wrote:
>
> > On Sat, Oct 8, 2016 at 7:31 AM, Amit Sela <[email protected]> wrote:
> >
> > > Hi Eugene,
> >
> > >
> >
> > > This is very interesting.
> >
> > > Let me see if I get this right, the "Redistribute"  transformation
> > assigns
> >
> > > a "running id" key (per-bundle) , calls "Redistribute.byKey", and
> > extracts
> >
> > > back the values, correct ?
> >
> >
> >
> > The keys are (pseudorandomly) unique per element.
> >
> >
> >
> > > As for "Redistribute.byKey" - it's made of a GroupByKey transformation
> > that
> >
> > > follows a Window transformation that neutralises the "resolution" of
> >
> > > triggers and panes that usually occurs in GroupByKey, correct ?
> >
> > >
> >
> > > So this is basically a "FanOut" transformation which will depend on the
> >
> > > available resources of the runner (and the uniqueness of the assigned
> > keys)
> >
> > > ?
> >
> > >
> >
> > > Would we want to Redistribute into a user-defined number of bundles (>
> >
> > > current) ?
> >
> >
> >
> > I don't think there's any advantage to letting the user specify a
> >
> > number here; the data is spread out among as many machines as are
> >
> > handling the shuffling (for N elements, there are ~N unique keys,
> >
> > which gets partitioned by the system to the M workers).
> >
> >
> >
> > > How about "FanIn" ?
> >
> >
> >
> > Could you clarify what you would hope to use this for?
> >
> Well, what if for some reason I would want to limit parallelism for a step
> in the Pipeline ? like calling an external service without "DDoS"ing it ?
>
> >
> >
> >
> > > On Fri, Oct 7, 2016 at 10:49 PM Eugene Kirpichov
> >
> > > <[email protected]> wrote:
> >
> > >
> >
> > >> Hello,
> >
> > >>
> >
> > >> Heads up that https://github.com/apache/incubator-beam/pull/1036 will
> >
> > >> introduce a transform called "Redistribute", encapsulating a
> relatively
> >
> > >> common pattern - a "fusion break" [see
> >
> > >>
> >
> > >>
> >
> https://cloud.google.com/dataflow/service/dataflow-service-desc#preventing-fusion
> >
> > >> previously
> >
> > >> providing advice on that] - useful e.g. when you write an IO as a
> > sequence
> >
> > >> of ParDo's: split a query into parts, read each part, and you want to
> >
> > >> prevent fusing these ParDo's because that would make the whole thing
> >
> > >> execute sequentially, and in other similar cases.
> >
> > >>
> >
> > >> The PR also uses it, as an example, in DatastoreIO and JdbcIO, both of
> >
> > >> which used to have a hand-rolled implementation of the same. The Write
> >
> > >> transform has something similar, but not quite identical, so I skipped
> > it.
> >
> > >>
> >
> > >> This is not a model change - merely providing a common implementation
> of
> >
> > >> something useful that already existed but was scattered across the
> >
> > >> codebase.
> >
> > >>
> >
> > >> Redistribute also subsumes the old mostly-internal Reshuffle transform
> > via
> >
> > >> Redistribute.byKey().
> >
> > >>
> >
> > >> I tried finding more cases in the Beam codebase that have an ad-hoc
> >
> > >> implementation of this; I did not find any, but I might have missed
> >
> > >> something. I suppose the transform will need to be advertised in
> >
> > >> documentation on best-practices for connector development; perhaps
> some
> >
> > >> StackOverflow answers should be updated; any other places?
> >
> > >>
> >
> >
>

Reply via email to