Sorry, it took a while. I wanted to actually use this extension for WriteFiles in Flink and see that it works, and that proved to be a bit bumpy. The PR is at https://github.com/apache/beam/pull/8499
On Thu, May 2, 2019 at 3:22 PM Reuven Lax <re...@google.com> wrote:

> Great, let me know when to take another look at the PR!
>
> Reuven
>
> On Wed, May 1, 2019 at 6:47 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>
>> That coder is added as an extra re-map stage from the "original" key to
>> the new ShardAwareKey ... But the pipeline might get broken, I guess.
>> Very fair point. I am having a second-thought pass over this and will
>> try to simplify it much more.
>>
>> On Wed, May 1, 2019 at 2:12 PM Reuven Lax <re...@google.com> wrote:
>>
>>> I haven't looked at the PR in depth yet, but it appears that someone
>>> running a pipeline today who then tries to update post this PR will
>>> have the coder change to DefaultShardKeyCoder, even if they haven't
>>> picked any custom function. Is that correct, or am I misreading things?
>>>
>>> Reuven
>>>
>>> On Tue, Apr 30, 2019 at 8:42 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>>>
>>>> Hm, what would be the scenario? Have version A running with the
>>>> original random sharding and then start version B where I change
>>>> sharding to some custom function?
>>>> So I would have to enable the pipeline to digest old keys from the
>>>> restored GBK state and also work with new keys produced to the GBK
>>>> going forward?
>>>>
>>>> On Tue, Apr 30, 2019 at 5:32 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> Initial thought on the PR: we usually try to limit changing coders
>>>>> in these types of transforms to better support runners that allow
>>>>> in-place updates of pipelines. Can this be done without changing
>>>>> the coder?
>>>>>
>>>>> On Tue, Apr 30, 2019 at 8:21 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>>>>>
>>>>>> I have created a PR for enhancing WriteFiles with a custom
>>>>>> sharding function:
>>>>>> https://github.com/apache/beam/pull/8438
>>>>>>
>>>>>> If this sort of change looks good, then the next step would be to
>>>>>> use it in the Flink runner transform override.
>>>>>> Let me know what you think.
>>>>>>
>>>>>> On Fri, Apr 26, 2019 at 9:24 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>>>>>>
>>>>>>> I guess it is fine to enable shardingFn control only at the
>>>>>>> WriteFiles level rather than in FileIO. On WriteFiles it can be
>>>>>>> manipulated in a PTransformOverride by the runner.
>>>>>>>
>>>>>>> On Thu, Apr 25, 2019 at 6:17 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>>> Yes, a hook would have to be added to allow specifying a
>>>>>>>> different function for choosing the shard number (I assume the
>>>>>>>> problem is that there are cases where the current random
>>>>>>>> assignment is not good?). However, this can be set using
>>>>>>>> PTransformOverride; we ideally shouldn't force the user to know
>>>>>>>> details of the runner when writing their code.
>>>>>>>>
>>>>>>>> On Thu, Apr 25, 2019 at 7:52 AM Maximilian Michels <m...@apache.org> wrote:
>>>>>>>>
>>>>>>>>> Reuven is talking about PTransformOverride, e.g.
>>>>>>>>> FlinkTransformOverrides. We already use this to determine the
>>>>>>>>> number of shards in case of runner-determined sharding.
>>>>>>>>>
>>>>>>>>> Not sure if that would work for Jozef's case, because setting
>>>>>>>>> the number of shards is not enough. We want to set the shard
>>>>>>>>> key directly, and that logic is buried inside WriteFiles.
>>>>>>>>>
>>>>>>>>> -Max
>>>>>>>>>
>>>>>>>>> On 25.04.19 16:30, Reuven Lax wrote:
>>>>>>>>> > Actually, the runner is free to perform surgery on the graph.
>>>>>>>>> > The FlinkRunner can insert a custom function to determine the
>>>>>>>>> > sharding keys.
>>>>>>>>> >
>>>>>>>>> > On Thu, Apr 25, 2019 at 7:28 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>>>>>>>>> >
>>>>>>>>> >     Right now, sharding can be specified only via a target
>>>>>>>>> >     `shardCount`, be it by the user or the runner.
>>>>>>>>> >     Next to a configurable shardCount, I am proposing to also
>>>>>>>>> >     be able to pass a function which will allow the user (or
>>>>>>>>> >     runner) to control how the shard is determined and what
>>>>>>>>> >     key will be used to represent it:
>>>>>>>>> >
>>>>>>>>> >     interface ShardingFunction<UserT, DestinationT, ShardKeyT>
>>>>>>>>> >         extends Serializable {
>>>>>>>>> >       ShardKeyT assign(DestinationT destination, UserT element,
>>>>>>>>> >           Integer shardCount);
>>>>>>>>> >     }
>>>>>>>>> >
>>>>>>>>> >     The default implementation can be what exists right now =>
>>>>>>>>> >     a random shard encapsulated as ShardedKey<Integer>.
>>>>>>>>> >
>>>>>>>>> >     On Thu, Apr 25, 2019 at 4:07 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>>> >
>>>>>>>>> >         If sharding is not specified, then the semantics are
>>>>>>>>> >         "runner-determined sharding." The DataflowRunner
>>>>>>>>> >         already takes advantage of this to impose its own
>>>>>>>>> >         sharding if the user hasn't specified an explicit
>>>>>>>>> >         one. Could the Flink runner do the same instead of
>>>>>>>>> >         pushing this to the users?
>>>>>>>>> >
>>>>>>>>> >         On Thu, Apr 25, 2019 at 6:23 AM Maximilian Michels <m...@apache.org> wrote:
>>>>>>>>> >
>>>>>>>>> >             Hi Jozef,
>>>>>>>>> >
>>>>>>>>> >             For sharding in FileIO there are basically two
>>>>>>>>> >             options:
>>>>>>>>> >
>>>>>>>>> >             (1) num_shards ~= num_workers => bad spread of
>>>>>>>>> >                 the load across workers
>>>>>>>>> >             (2) num_shards >> num_workers => good spread of
>>>>>>>>> >                 the load across workers, but a huge number
>>>>>>>>> >                 of files
>>>>>>>>> >
>>>>>>>>> >             Your approach would give users control over the
>>>>>>>>> >             sharding keys such that they could be adjusted
>>>>>>>>> >             to spread load more evenly.
>>>>>>>>> >
>>>>>>>>> >             I'd like to hear from Beam IO experts if that
>>>>>>>>> >             would make sense.
>>>>>>>>> >
>>>>>>>>> >             Thanks,
>>>>>>>>> >             Max
>>>>>>>>> >
>>>>>>>>> >             On 25.04.19 08:52, Jozef Vilcek wrote:
>>>>>>>>> >             > Hello,
>>>>>>>>> >             >
>>>>>>>>> >             > Right now, if someone needs sharded files via
>>>>>>>>> >             > FileIO, there is only one option, which is
>>>>>>>>> >             > random (round-robin) shard assignment per
>>>>>>>>> >             > element, and it always uses ShardedKey<Integer>
>>>>>>>>> >             > as the key for the GBK which follows.
>>>>>>>>> >             >
>>>>>>>>> >             > I would like to generalize this and have a
>>>>>>>>> >             > possibility to provide some
>>>>>>>>> >             > ShardingFn[UserT, DestinationT, ShardKeyT] via
>>>>>>>>> >             > FileIO.
>>>>>>>>> >             > What I am mainly after is to have a possibility
>>>>>>>>> >             > to provide an optimisation for the Flink
>>>>>>>>> >             > runtime and pass in a special function which
>>>>>>>>> >             > generates shard keys in a way that they are
>>>>>>>>> >             > evenly spread among workers (BEAM-5865).
>>>>>>>>> >             >
>>>>>>>>> >             > Would such an extension for FileIO make sense?
>>>>>>>>> >             > If yes, I would create a ticket for it and try
>>>>>>>>> >             > to draft a PR.
>>>>>>>>> >             >
>>>>>>>>> >             > Best,
>>>>>>>>> >             > Jozef
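For readers following the thread, the ShardingFunction proposal above can be sketched as a small, self-contained Java example. Only the interface shape follows the thread; the class names and the two implementations (random assignment mirroring today's behavior, and a deterministic hash-based one of the kind a runner could substitute) are illustrative assumptions, not the actual WriteFiles internals or the code in the PR.

```java
import java.io.Serializable;
import java.util.concurrent.ThreadLocalRandom;

// Sketch of the proposed sharding hook (hypothetical names, not Beam API).
public class ShardingSketch {

  /** Assigns a shard key to an element, given its dynamic destination. */
  interface ShardingFunction<UserT, DestinationT, ShardKeyT> extends Serializable {
    ShardKeyT assign(DestinationT destination, UserT element, Integer shardCount);
  }

  /** Mirrors the current default: a random shard number per element. */
  static class RandomSharding<UserT, DestinationT>
      implements ShardingFunction<UserT, DestinationT, Integer> {
    @Override
    public Integer assign(DestinationT destination, UserT element, Integer shardCount) {
      return ThreadLocalRandom.current().nextInt(shardCount);
    }
  }

  /**
   * Illustrative alternative: deterministic, hash-based shard choice, so a
   * runner (e.g. Flink, per BEAM-5865) could map shards evenly onto workers.
   */
  static class HashSharding<UserT, DestinationT>
      implements ShardingFunction<UserT, DestinationT, Integer> {
    @Override
    public Integer assign(DestinationT destination, UserT element, Integer shardCount) {
      return Math.floorMod(element.hashCode(), shardCount);
    }
  }
}
```

In this shape, the runner-side PTransformOverride would only need to swap the function instance; the GBK key type downstream is whatever ShardKeyT the function produces, which is where the coder-compatibility concern raised in the thread comes from.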