I have created a PR that enhances WriteFiles to accept a custom sharding function:
https://github.com/apache/beam/pull/8438

If this sort of change looks good, the next step would be to use it in the
Flink runner transform override. Let me know what you think.
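
To make the idea concrete, here is a minimal, self-contained Java sketch of
such a hook. It is not the code in the PR and it does not use Beam's classes:
the names ShardingFunction, RandomShardingFunction, HashShardingFunction and
ShardingSketch are hypothetical, and a plain Integer stands in for
ShardedKey<Integer>. It only illustrates how a default random assignment and
a runner-supplied deterministic assignment could share one interface.

```java
import java.io.Serializable;
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical hook modelled on the interface proposed in this thread; not Beam's API. */
interface ShardingFunction<UserT, DestinationT, ShardKeyT> extends Serializable {
  ShardKeyT assign(DestinationT destination, UserT element, int shardCount);
}

/** Stand-in for the default described in the thread: a random shard per element. */
class RandomShardingFunction<UserT, DestinationT>
    implements ShardingFunction<UserT, DestinationT, Integer> {
  @Override
  public Integer assign(DestinationT destination, UserT element, int shardCount) {
    // Uniformly random shard index in [0, shardCount).
    return ThreadLocalRandom.current().nextInt(shardCount);
  }
}

/** Assumed alternative a runner might substitute: a deterministic, hash-based shard. */
class HashShardingFunction<UserT, DestinationT>
    implements ShardingFunction<UserT, DestinationT, Integer> {
  @Override
  public Integer assign(DestinationT destination, UserT element, int shardCount) {
    // floorMod keeps the result non-negative even when the hash is negative.
    return Math.floorMod(Objects.hash(destination, element), shardCount);
  }
}

class ShardingSketch {
  public static void main(String[] args) {
    ShardingFunction<String, String, Integer> random = new RandomShardingFunction<>();
    ShardingFunction<String, String, Integer> hashed = new HashShardingFunction<>();
    for (String element : new String[] {"a", "b", "c"}) {
      System.out.println(element
          + " random=" + random.assign("out", element, 4)
          + " hashed=" + hashed.assign("out", element, 4));
    }
  }
}
```

In this sketch the caller only sees ShardingFunction, so swapping the random
implementation for another one needs no change on the calling side, which is
the property a transform override would rely on.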

On Fri, Apr 26, 2019 at 9:24 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:

> I guess it is fine to enable shardingFn control only at the WriteFiles level
> rather than in FileIO. On WriteFiles it can be manipulated by the runner in
> a PTransformOverride.
>
> On Thu, Apr 25, 2019 at 6:17 PM Reuven Lax <re...@google.com> wrote:
>
>> Yes, a hook would have to be added to allow specifying a different
>> function for choosing the shard number (I assume the problem is that there
>> are cases where the current random assignment is not good?). However, this
>> can be set using PTransformOverride; ideally we shouldn't force the user to
>> know details of the runner when writing their code.
>>
>> On Thu, Apr 25, 2019 at 7:52 AM Maximilian Michels <m...@apache.org>
>> wrote:
>>
>>> Reuven is talking about PTransformOverride, e.g.
>>> FlinkTransformOverrides. We already use this to determine the number of
>>> shards in case of Runner-determined sharding.
>>>
>>> Not sure if that would work for Jozef's case because setting the number
>>> of shards is not enough. We want to set the shard key directly and that
>>> logic is buried inside WriteFiles.
>>>
>>> -Max
>>>
>>> On 25.04.19 16:30, Reuven Lax wrote:
>>> > Actually the runner is free to perform surgery on the graph. The
>>> > FlinkRunner can insert a custom function to determine the sharding
>>> > keys.
>>> >
>>> > On Thu, Apr 25, 2019 at 7:28 AM Jozef Vilcek <jozo.vil...@gmail.com
>>> > <mailto:jozo.vil...@gmail.com>> wrote:
>>> >
>>> >     Right now, sharding can be specified only via a target `shardCount`,
>>> >     whether it is set by the user or by the runner. Alongside the
>>> >     configurable shardCount, I am proposing to also allow passing a
>>> >     function that lets the user (or runner) control how the shard is
>>> >     determined and what key will be used to represent it:
>>> >
>>> >     interface ShardingFunction<UserT, DestinationT, ShardKeyT>
>>> >         extends Serializable {
>>> >       ShardKeyT assign(DestinationT destination, UserT element,
>>> >           Integer shardCount);
>>> >     }
>>> >
>>> >     The default implementation can be what exists right now: a random
>>> >     shard encapsulated as ShardedKey<Integer>.
>>> >
>>> >     On Thu, Apr 25, 2019 at 4:07 PM Reuven Lax <re...@google.com
>>> >     <mailto:re...@google.com>> wrote:
>>> >
>>> >         If sharding is not specified, then the semantics are
>>> >         "runner-determined sharding." The DataflowRunner already takes
>>> >         advantage of this to impose its own sharding if the user hasn't
>>> >         specified an explicit one. Could the Flink runner do the same
>>> >         instead of pushing this to the users?
>>> >
>>> >         On Thu, Apr 25, 2019 at 6:23 AM Maximilian Michels
>>> >         <m...@apache.org <mailto:m...@apache.org>> wrote:
>>> >
>>> >             Hi Jozef,
>>> >
>>> >             For sharding in FileIO there are basically two options:
>>> >
>>> >             (1) num_shards ~= num_workers => bad spread of the load
>>> >                 across workers
>>> >             (2) num_shards >> num_workers => good spread of the load
>>> >                 across workers, but a huge number of files
>>> >
>>> >             Your approach would give users control over the sharding
>>> >             keys such that they could be adjusted to spread load more
>>> >             evenly.
>>> >
>>> >             I'd like to hear from Beam IO experts if that would make
>>> >             sense.
>>> >
>>> >             Thanks,
>>> >             Max
>>> >
>>> >             On 25.04.19 08:52, Jozef Vilcek wrote:
>>> >              > Hello,
>>> >              >
>>> >              > Right now, if someone needs sharded files via FileIO,
>>> >              > there is only one option, which is random (round robin)
>>> >              > shard assignment per element, and it always uses
>>> >              > ShardedKey<Integer> as the key for the GBK which follows.
>>> >              >
>>> >              > I would like to generalize this and make it possible to
>>> >              > provide some ShardingFn[UserT, DestinationT, ShardKeyT]
>>> >              > via FileIO. What I am mainly after is the possibility of
>>> >              > an optimisation for the Flink runtime: passing in a
>>> >              > special function which generates shard keys in a way that
>>> >              > they are evenly spread among workers (BEAM-5865).
>>> >              >
>>> >              > Would such an extension for FileIO make sense? If yes, I
>>> >              > would create a ticket for it and try to draft a PR.
>>> >              >
>>> >              > Best,
>>> >              > Jozef
>>> >
>>>
>>