Thanks Eugene, I've added a few more questions inline.

So this is about rebalancing/autoscaling? When would you expect shards/bundles/partitions (this language is becoming extremely overloaded ;-) ) to be out of balance? I can think of:
- After reading from IOs - IOs don't necessarily guarantee they'd be balanced,
  or parallelized according to the Pipeline's parallelism.
- After "Key"ing / "Re-Key"ing / "Un-Key"ing - which could lead downstream
  aggregations to be unbalanced.
- After Flatten.

Anything else?

This is NOT about Fanning in/out, correct? Hence the name Re*distribute*,
meaning across existing parallelism?

Thanks,
Amit

On Mon, Oct 10, 2016 at 11:38 PM Eugene Kirpichov <[email protected]> wrote:

> Hi Amit,
>
> The transform, the way it's implemented, actually does several things at
> the same time and that's why it's tricky to document it.
>
> Redistribute.arbitrarily():
> - Introduces a fusion barrier (in runners that have it), making sure that
> the runner can fully parallelize processing the output PCollection with
> DoFn's
> - Introduces a fault-tolerance barrier, effectively "checkpointing" the
> input PCollection (again, in runners where it makes sense) and making sure
> that processing elements of the output PCollection with a DoFn, if the DoFn
> fails, will redo only that processing, but not need to recompute the input
> PCollection.

Like caching
<http://spark.apache.org/docs/1.6.2/programming-guide.html#rdd-persistence>
an RDD with Spark? So that downstream computations won't repeat the
redistribution?

> Redistribute.byKey():
> - All of the above and also makes the collection "key-partitioned", giving
> access to per-key state to downstream key-preserving DoFns. However, this
> is also runner-specific, because it's conceivable that a runner might not
> need this "key-partitioned" property (in fact it's best if a runner
> inserted such a "redistribute by key" automatically if it needs it...), and
> it currently isn't exposed anyway.

Isn't this the actual redistribution, while "arbitrarily" adds pseudo-random
keys to non-keyed records? As for per-key state, this is more for
"Autoscaling" mid-pipeline (maybe after a stateful operator which is tied to
the key), right?

> Still thinking about the best way to describe this in a way that's least
> confusing to users.
>
> Regarding redistributing into N shards: this is problematic because it
> doesn't seem to make sense in the unified model (in streaming in particular
> - having N keys doesn't mean you have N bundles), and breaks down if you
> add dynamic work rebalancing, backups and other magic. So I decided not to
> bother with this in that PR.
>
> Agreed with Robert that limiting the parallelism, or throttling, are very
> useful features, but Redistribute is not the right place to introduce them.

You're right, it fits around here somewhere (logically) but not as part of
redistribute.

> On Mon, Oct 10, 2016 at 12:58 PM Amit Sela <[email protected]> wrote:
>
> > On Mon, Oct 10, 2016 at 9:21 PM Robert Bradshaw
> > <[email protected]> wrote:
> >
> > > On Sat, Oct 8, 2016 at 7:31 AM, Amit Sela <[email protected]> wrote:
> > >
> > > > Hi Eugene,
> > > >
> > > > This is very interesting.
> > > > Let me see if I get this right, the "Redistribute" transformation
> > > > assigns a "running id" key (per-bundle), calls "Redistribute.byKey",
> > > > and extracts back the values, correct ?
> > >
> > > The keys are (pseudorandomly) unique per element.
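
For illustration, a minimal sketch of the pattern being described here -
assign a pseudo-random key per element, group by that key, then drop the
keys - written against the Beam Java SDK. The class and step names are made
up for this example and are not the code from the PR, which also handles the
windowing/trigger and other runner-facing details omitted below.

import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

/**
 * Illustrative sketch only: key each element with a pseudo-random value,
 * group by that key (the shuffle acts as the fusion / fault-tolerance
 * barrier in most runners), then drop the keys again.
 */
public class RedistributeSketch<T> extends PTransform<PCollection<T>, PCollection<T>> {

  @Override
  public PCollection<T> expand(PCollection<T> input) {
    PCollection<KV<Long, T>> keyed =
        input
            .apply("AssignRandomKeys", ParDo.of(new DoFn<T, KV<Long, T>>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
                // A (pseudo-randomly) unique key per element.
                c.output(KV.of(ThreadLocalRandom.current().nextLong(), c.element()));
              }
            }))
            .setCoder(KvCoder.of(VarLongCoder.of(), input.getCoder()));

    return keyed
        // The grouping is what forces a materialization point in most runners.
        .apply("RedistributeByKey", GroupByKey.<Long, T>create())
        // Drop the keys and re-emit the values.
        .apply("DropKeys", ParDo.of(new DoFn<KV<Long, Iterable<T>>, T>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            for (T value : c.element().getValue()) {
              c.output(value);
            }
          }
        }))
        .setCoder(input.getCoder());
  }
}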
> > > > As for "Redistribute.byKey" - it's made of a GroupByKey transformation
> > > > that follows a Window transformation that neutralises the "resolution"
> > > > of triggers and panes that usually occurs in GroupByKey, correct ?
> > > >
> > > > So this is basically a "FanOut" transformation which will depend on the
> > > > available resources of the runner (and the uniqueness of the assigned
> > > > keys) ?
> > > >
> > > > Would we want to Redistribute into a user-defined number of bundles (>
> > > > current) ?
> > >
> > > I don't think there's any advantage to letting the user specify a
> > > number here; the data is spread out among as many machines as are
> > > handling the shuffling (for N elements, there are ~N unique keys,
> > > which gets partitioned by the system to the M workers).
> > >
> > > > How about "FanIn" ?
> > >
> > > Could you clarify what you would hope to use this for?
> >
> > Well, what if for some reason I would want to limit parallelism for a step
> > in the Pipeline ? like calling an external service without "DDoS"ing it ?
> >
> > > > On Fri, Oct 7, 2016 at 10:49 PM Eugene Kirpichov
> > > > <[email protected]> wrote:
> > > >
> > > > > Hello,
> > > > >
> > > > > Heads up that https://github.com/apache/incubator-beam/pull/1036 will
> > > > > introduce a transform called "Redistribute", encapsulating a
> > > > > relatively common pattern - a "fusion break" [see
> > > > > https://cloud.google.com/dataflow/service/dataflow-service-desc#preventing-fusion
> > > > > previously providing advice on that] - useful e.g. when you write an
> > > > > IO as a sequence of ParDo's: split a query into parts, read each
> > > > > part, and you want to prevent fusing these ParDo's because that would
> > > > > make the whole thing execute sequentially, and in other similar cases.
> > > > >
> > > > > The PR also uses it, as an example, in DatastoreIO and JdbcIO, both
> > > > > of which used to have a hand-rolled implementation of the same. The
> > > > > Write transform has something similar, but not quite identical, so I
> > > > > skipped it.
> > > > >
> > > > > This is not a model change - merely providing a common implementation
> > > > > of something useful that already existed but was scattered across the
> > > > > codebase.
> > > > >
> > > > > Redistribute also subsumes the old mostly-internal Reshuffle
> > > > > transform via Redistribute.byKey().
> > > > >
> > > > > I tried finding more cases in the Beam codebase that have an ad-hoc
> > > > > implementation of this; I did not find any, but I might have missed
> > > > > something. I suppose the transform will need to be advertised in
> > > > > documentation on best-practices for connector development; perhaps
> > > > > some StackOverflow answers should be updated; any other places?
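
As a concrete illustration of the "split a query into parts, read each part"
use case above, here is a hedged sketch of where the fusion break would go.
Everything here is placeholder code invented for the example;
Redistribute.arbitrarily() is used as named in this thread, and its exact
signature and package come from the PR, so they are assumed rather than known.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
// import for Redistribute omitted: the transform comes from the PR under
// discussion (pull/1036), so its final package is not settled here.

public class FusionBreakExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Step 1: split one logical query into many parts (one cheap element
    // fans out into many).
    PCollection<String> parts =
        p.apply("Queries", Create.of("SELECT * FROM my_table"))
         .apply("SplitQuery", ParDo.of(new DoFn<String, String>() {
           @ProcessElement
           public void processElement(ProcessContext c) {
             for (int shard = 0; shard < 100; shard++) {
               c.output(c.element() + " /* shard " + shard + " */");
             }
           }
         }));

    // Step 2: fusion break. Without it, splitting and reading would likely
    // be fused into one step and execute sequentially on a single worker.
    PCollection<String> redistributed =
        parts.apply("Redistribute", Redistribute.<String>arbitrarily());

    // Step 3: "read" each part (placeholder for real connector logic).
    PCollection<String> records =
        redistributed.apply("ReadEachPart", ParDo.of(new DoFn<String, String>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            c.output("record read for: " + c.element());
          }
        }));

    p.run();
  }
}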
