Hm, what would be the scenario? Have version A running with original random
sharding and then start version B where I change sharding to some custom
function?
So I have to enable the pipeline to digest old keys from GBK restored state
and also work with new keys produced to GBK going forward?

On Tue, Apr 30, 2019 at 5:32 PM Reuven Lax <re...@google.com> wrote:

> Initial thought on PR: we usually try to limit changing coders in these
> types of transforms to better support runners that allow in-place updates
> of pipelines. Can this be done without changing the coder?
>
> On Tue, Apr 30, 2019 at 8:21 AM Jozef Vilcek <jozo.vil...@gmail.com>
> wrote:
>
>> I have created a PR for enhancing WriteFiles for custom sharding function.
>> https://github.com/apache/beam/pull/8438
>>
>> If this sort of change looks good, then next step would be to use in in
>> Flink runner transform override. Let me know what do you think
>>
>> On Fri, Apr 26, 2019 at 9:24 AM Jozef Vilcek <jozo.vil...@gmail.com>
>> wrote:
>>
>>> I guess it is fine to enable shardingFn control only on WriteFiles level
>>> rather than FileIO. On WriteFiles it can be manipulated in
>>> PTransformOverride by runner.
>>>
>>> On Thu, Apr 25, 2019 at 6:17 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> Yes, a hook would have to be added to allow specifying a different
>>>> function for choosing the shard number (I assume the problem is that there
>>>> are cases where the current random assignment is not good?). However this
>>>> can be set using PTransformOverride, we ideally shouldn't force the user to
>>>> know details of the runner when writing their code.
>>>>
>>>> On Thu, Apr 25, 2019 at 7:52 AM Maximilian Michels <m...@apache.org>
>>>> wrote:
>>>>
>>>>> Reuven is talking about PTransformOverride, e.g.
>>>>> FlinkTransformOverrides. We already use this to determine the number
>>>>> of
>>>>> shards in case of Runner-determined sharding.
>>>>>
>>>>> Not sure if that would work for Jozef's case because setting the
>>>>> number
>>>>> of shards is not enough. We want to set the shard key directly and
>>>>> that
>>>>> logic is buried inside WriteFiles.
>>>>>
>>>>> -Max
>>>>>
>>>>> On 25.04.19 16:30, Reuven Lax wrote:
>>>>> > Actually the runner is free to perform surgery on the graph. The
>>>>> > FlinkRunner can insert a custom function to determine the sharding
>>>>> keys.
>>>>> >
>>>>> > On Thu, Apr 25, 2019 at 7:28 AM Jozef Vilcek <jozo.vil...@gmail.com
>>>>> > <mailto:jozo.vil...@gmail.com>> wrote:
>>>>> >
>>>>> >     Right now, sharding can be specified only via target
>>>>> `shardCount`,
>>>>> >     be it user or runner. Next to configurable shardCount, I am
>>>>> >     proposing to be able to pass also a function which will allow to
>>>>> the
>>>>> >     user (or runner) control how is shard determined and what key
>>>>> will
>>>>> >     be used to represent it
>>>>> >
>>>>> >     interface ShardingFunction[UserT, DestinationT, ShardKeyT]
>>>>> extends
>>>>> >     Serializable {
>>>>> >         ShardKeyT assign(DestinationT destination, UserT element,
>>>>> >     shardCount: Integer);
>>>>> >     }
>>>>> >
>>>>> >     Default implementation can be what is right now =>  random shard
>>>>> >     encapsulated as ShardedKey<Integer>.
>>>>> >
>>>>> >     On Thu, Apr 25, 2019 at 4:07 PM Reuven Lax <re...@google.com
>>>>> >     <mailto:re...@google.com>> wrote:
>>>>> >
>>>>> >         If sharding is not specified, then the semantics are
>>>>> >         "runner-determined sharding." The DataflowRunner already
>>>>> takes
>>>>> >         advantage of this to impose its own sharding if the user
>>>>> hasn't
>>>>> >         specified an explicit one. Could the Flink runner do the same
>>>>> >         instead of pushing this to the users?
>>>>> >
>>>>> >         On Thu, Apr 25, 2019 at 6:23 AM Maximilian Michels
>>>>> >         <m...@apache.org <mailto:m...@apache.org>> wrote:
>>>>> >
>>>>> >             Hi Jozef,
>>>>> >
>>>>> >             For sharding in FileIO there are basically two options:
>>>>> >
>>>>> >             (1) num_shards ~= num_workers => bad spread of the load
>>>>> >             across workers
>>>>> >             (2) num_shards >> num_workers => good spread of the load
>>>>> >             across workers,
>>>>> >             but huge number of files
>>>>> >
>>>>> >             Your approach would give users control over the sharding
>>>>> >             keys such that
>>>>> >             they could be adjusted to spread load more evenly.
>>>>> >
>>>>> >             I'd like to hear from Beam IO experts if that would make
>>>>> sense.
>>>>> >
>>>>> >             Thanks,
>>>>> >             Max
>>>>> >
>>>>> >             On 25.04.19 08:52, Jozef Vilcek wrote:
>>>>> >              > Hello,
>>>>> >              >
>>>>> >              > Right now, if someone needs sharded files via FileIO,
>>>>> >             there is only one
>>>>> >              > option which is random (round robin) shard assignment
>>>>> per
>>>>> >             element and it
>>>>> >              > always use ShardedKey<Integer> as a key for the GBK
>>>>> which
>>>>> >             follows.
>>>>> >              >
>>>>> >              > I would like to generalize this and have a
>>>>> possibility to
>>>>> >             provide some
>>>>> >              > ShardingFn[UserT, DestinationT, ShardKeyT] via FileIO.
>>>>> >              > What I am mainly after is, to have a possibility to
>>>>> >             provide optimisation
>>>>> >              > for Flink runtime and pass in a special function which
>>>>> >             generates shard
>>>>> >              > keys in a way that they are evenly spread among
>>>>> workers
>>>>> >             (BEAM-5865).
>>>>> >              >
>>>>> >              > Would such extension for FileIO make sense? If yes, I
>>>>> >             would create a
>>>>> >              > ticket for it and try to draft a PR.
>>>>> >              >
>>>>> >              > Best,
>>>>> >              > Jozef
>>>>> >
>>>>>
>>>>

Reply via email to