Hm, what would the scenario be? Having version A running with the original random sharding, and then starting version B where I change the sharding to some custom function? So I would have to enable the pipeline to digest old keys from the restored GBK state and also work with the new keys produced to the GBK going forward?
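One possible reading of the "without changing the coder" question, sketched under assumptions: if the custom function keeps producing the same key shape as the default (an integer destination key plus a shard number), then keys restored from version A's GBK state and keys produced by version B share one encoding. Only the choice of shard number changes. The Java below is a self-contained stand-in. `IntShardedKey`, `IntShardedKeyCoder` and `ShardingVersions.customShard` are hypothetical names, not Beam's `ShardedKey` or its coder.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical stand-in for ShardedKey<Integer>: a destination key plus a shard number. */
final class IntShardedKey {
  final int key;
  final int shardNumber;

  IntShardedKey(int key, int shardNumber) {
    this.key = key;
    this.shardNumber = shardNumber;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof IntShardedKey)) return false;
    IntShardedKey other = (IntShardedKey) o;
    return key == other.key && shardNumber == other.shardNumber;
  }

  @Override
  public int hashCode() {
    return Objects.hash(key, shardNumber);
  }
}

/** One fixed 8-byte encoding shared by both pipeline versions (hypothetical coder). */
final class IntShardedKeyCoder {
  static byte[] encode(IntShardedKey k) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      out.writeInt(k.key);
      out.writeInt(k.shardNumber);
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  static IntShardedKey decode(byte[] encoded) {
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded));
      return new IntShardedKey(in.readInt(), in.readInt());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}

final class ShardingVersions {
  /** Version A: a random shard in [0, shardCount). */
  static IntShardedKey randomShard(String destination, int shardCount) {
    return new IntShardedKey(
        destination.hashCode(), ThreadLocalRandom.current().nextInt(shardCount));
  }

  /** Version B: a hypothetical custom rule, here derived from the element's hash. */
  static IntShardedKey customShard(String destination, String element, int shardCount) {
    return new IntShardedKey(
        destination.hashCode(), Math.floorMod(element.hashCode(), shardCount));
  }
}
```

Whether old and new keys can then coexist in restored GBK state also depends on how the runner restores keyed state, which this sketch does not model.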
On Tue, Apr 30, 2019 at 5:32 PM Reuven Lax <re...@google.com> wrote:
> Initial thought on the PR: we usually try to limit coder changes in these
> types of transforms, to better support runners that allow in-place updates
> of pipelines. Can this be done without changing the coder?
>
> On Tue, Apr 30, 2019 at 8:21 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>> I have created a PR that enhances WriteFiles with a custom sharding function:
>> https://github.com/apache/beam/pull/8438
>>
>> If this sort of change looks good, the next step would be to use it in a
>> Flink runner transform override. Let me know what you think.
>>
>> On Fri, Apr 26, 2019 at 9:24 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>>> I guess it is fine to enable shardingFn control only at the WriteFiles
>>> level rather than in FileIO. On WriteFiles it can be manipulated by the
>>> runner in a PTransformOverride.
>>>
>>> On Thu, Apr 25, 2019 at 6:17 PM Reuven Lax <re...@google.com> wrote:
>>>> Yes, a hook would have to be added to allow specifying a different
>>>> function for choosing the shard number (I assume the problem is that there
>>>> are cases where the current random assignment is not good?). However, this
>>>> can be set using a PTransformOverride; ideally we shouldn't force the user
>>>> to know details of the runner when writing their code.
>>>>
>>>> On Thu, Apr 25, 2019 at 7:52 AM Maximilian Michels <m...@apache.org> wrote:
>>>>> Reuven is talking about PTransformOverride, e.g. FlinkTransformOverrides.
>>>>> We already use this to determine the number of shards in case of
>>>>> runner-determined sharding.
>>>>>
>>>>> Not sure if that would work for Jozef's case, because setting the number
>>>>> of shards is not enough. We want to set the shard key directly, and that
>>>>> logic is buried inside WriteFiles.
>>>>>
>>>>> -Max
>>>>>
>>>>> On 25.04.19 16:30, Reuven Lax wrote:
>>>>>> Actually, the runner is free to perform surgery on the graph. The
>>>>>> FlinkRunner can insert a custom function to determine the sharding keys.
>>>>>>
>>>>>> On Thu, Apr 25, 2019 at 7:28 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>>>>>>> Right now, sharding can be specified only via a target `shardCount`, be
>>>>>>> it by the user or the runner. In addition to a configurable shardCount,
>>>>>>> I am proposing the ability to also pass a function that lets the user
>>>>>>> (or runner) control how the shard is determined and what key is used
>>>>>>> to represent it:
>>>>>>>
>>>>>>> interface ShardingFunction<UserT, DestinationT, ShardKeyT> extends Serializable {
>>>>>>>   ShardKeyT assign(DestinationT destination, UserT element, Integer shardCount);
>>>>>>> }
>>>>>>>
>>>>>>> The default implementation can be what exists right now => a random
>>>>>>> shard encapsulated as ShardedKey<Integer>.
>>>>>>>
>>>>>>> On Thu, Apr 25, 2019 at 4:07 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>> If sharding is not specified, then the semantics are
>>>>>>>> "runner-determined sharding." The DataflowRunner already takes
>>>>>>>> advantage of this to impose its own sharding if the user hasn't
>>>>>>>> specified an explicit one. Could the Flink runner do the same instead
>>>>>>>> of pushing this to the users?
>>>>>>>>
>>>>>>>> On Thu, Apr 25, 2019 at 6:23 AM Maximilian Michels <m...@apache.org> wrote:
>>>>>>>>> Hi Jozef,
>>>>>>>>>
>>>>>>>>> For sharding in FileIO there are basically two options:
>>>>>>>>>
>>>>>>>>> (1) num_shards ~= num_workers => bad spread of the load across workers
>>>>>>>>> (2) num_shards >> num_workers => good spread of the load across workers,
>>>>>>>>>     but a huge number of files
>>>>>>>>>
>>>>>>>>> Your approach would give users control over the sharding keys, so that
>>>>>>>>> they could be adjusted to spread the load more evenly.
>>>>>>>>>
>>>>>>>>> I'd like to hear from Beam IO experts whether that would make sense.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Max
>>>>>>>>>
>>>>>>>>> On 25.04.19 08:52, Jozef Vilcek wrote:
>>>>>>>>>> Hello,
>>>>>>>>>>
>>>>>>>>>> Right now, if someone needs sharded files via FileIO, there is only
>>>>>>>>>> one option: random (round-robin) shard assignment per element, and it
>>>>>>>>>> always uses ShardedKey<Integer> as the key for the GBK that follows.
>>>>>>>>>>
>>>>>>>>>> I would like to generalize this and make it possible to provide a
>>>>>>>>>> ShardingFn<UserT, DestinationT, ShardKeyT> via FileIO. What I am
>>>>>>>>>> mainly after is the ability to provide an optimisation for the Flink
>>>>>>>>>> runtime and pass in a special function that generates shard keys in
>>>>>>>>>> a way that spreads them evenly among workers (BEAM-5865).
>>>>>>>>>>
>>>>>>>>>> Would such an extension for FileIO make sense? If yes, I would create
>>>>>>>>>> a ticket for it and try to draft a PR.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Jozef
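A compilable Java sketch of the proposed `ShardingFunction`, assuming a single destination so that the shard key can be a plain `Integer`. With num_shards ~= num_workers, hashing shard keys onto workers can put two shards on one worker and leave another idle (option (1) in Max's summary). The balanced variant below instead precomputes one key per shard so that shard `s` lands on worker `s % numWorkers`. `WorkerPartitioner`, `RandomShardingFunction` and `BalancedShardingFunction` are hypothetical names, and any partitioner plugged in is a stand-in, not Flink's actual key-group assignment.

```java
import java.io.Serializable;
import java.util.concurrent.ThreadLocalRandom;

/** The interface proposed in the thread, written with Java generics. */
interface ShardingFunction<UserT, DestinationT, ShardKeyT> extends Serializable {
  ShardKeyT assign(DestinationT destination, UserT element, Integer shardCount);
}

/** Hypothetical model of how a runner maps a shard key to one of numWorkers workers. */
interface WorkerPartitioner extends Serializable {
  int workerOf(int key, int numWorkers);
}

/** Default-style behaviour: a random shard number per element. */
class RandomShardingFunction<UserT, DestinationT>
    implements ShardingFunction<UserT, DestinationT, Integer> {
  @Override
  public Integer assign(DestinationT destination, UserT element, Integer shardCount) {
    return ThreadLocalRandom.current().nextInt(shardCount);
  }
}

/**
 * Hypothetical balanced variant: precompute one key per shard such that shard s maps to
 * worker (s % numWorkers) under the given partitioner, then hand shards out round-robin.
 */
class BalancedShardingFunction<UserT, DestinationT>
    implements ShardingFunction<UserT, DestinationT, Integer> {
  private static final int MAX_PROBES = 1_000_000;
  private final WorkerPartitioner partitioner;
  private final int numWorkers;
  private int[] keys = new int[0]; // keys[s] is the precomputed key for shard s
  private int next = 0;

  BalancedShardingFunction(WorkerPartitioner partitioner, int numWorkers) {
    this.partitioner = partitioner;
    this.numWorkers = numWorkers;
  }

  @Override
  public Integer assign(DestinationT destination, UserT element, Integer shardCount) {
    if (keys.length != shardCount) {
      keys = precomputeKeys(shardCount);
    }
    int shard = next % shardCount;
    next = shard + 1;
    return keys[shard];
  }

  private int[] precomputeKeys(int shardCount) {
    int[] result = new int[shardCount];
    int candidate = 0;
    for (int s = 0; s < shardCount; s++) {
      int targetWorker = s % numWorkers;
      int probes = 0;
      // Probe increasing integers until one maps to the target worker; keys stay distinct.
      while (partitioner.workerOf(candidate, numWorkers) != targetWorker) {
        candidate++;
        if (++probes > MAX_PROBES) {
          throw new IllegalStateException("no key found for worker " + targetWorker);
        }
      }
      result[s] = candidate++;
    }
    return result;
  }
}
```

Keeping the shard key type fixed is one way to address the coder concern raised in the thread; a function that returns a different key type would need a different coder.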