Sorry, it took a while. I wanted to actually use this extension for
WriteFiles in Flink and see that it works, and that proved to be a bit bumpy.
The PR is at https://github.com/apache/beam/pull/8499

On Thu, May 2, 2019 at 3:22 PM Reuven Lax <re...@google.com> wrote:

> Great, let me know when to take another look at the PR!
>
> Reuven
>
> On Wed, May 1, 2019 at 6:47 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>
>> That coder is added extra as a re-map stage from the "original" key to the
>> new ShardAwareKey ... But the pipeline might get broken, I guess.
>> Very fair point. I am taking a second pass over this and will try
>> to simplify it much more.
>>
>> On Wed, May 1, 2019 at 2:12 PM Reuven Lax <re...@google.com> wrote:
>>
>>> I haven't looked at the PR in depth yet, but it appears that someone
>>> running a pipeline today who then tries to update post this PR will have
>>> the coder change to DefaultShardKeyCoder, even if they haven't picked any
>>> custom function. Is that correct, or am I misreading things?
>>>
>>> Reuven
>>>
>>> On Tue, Apr 30, 2019 at 8:42 AM Jozef Vilcek <jozo.vil...@gmail.com>
>>> wrote:
>>>
>>>> Hm, what would be the scenario? Have version A running with original
>>>> random sharding and then start version B where I change sharding to some
>>>> custom function?
>>>> So I have to enable the pipeline to digest old keys from GBK restored
>>>> state and also work with new keys produced to GBK going forward?
>>>>
>>>> On Tue, Apr 30, 2019 at 5:32 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> Initial thought on PR: we usually try to limit changing coders in
>>>>> these types of transforms to better support runners that allow in-place
>>>>> updates of pipelines. Can this be done without changing the coder?
>>>>>
>>>>> On Tue, Apr 30, 2019 at 8:21 AM Jozef Vilcek <jozo.vil...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I have created a PR to enhance WriteFiles with a custom sharding
>>>>>> function.
>>>>>> https://github.com/apache/beam/pull/8438
>>>>>>
>>>>>> If this sort of change looks good, then the next step would be to use
>>>>>> it in the Flink runner transform override. Let me know what you think.
>>>>>>
>>>>>> On Fri, Apr 26, 2019 at 9:24 AM Jozef Vilcek <jozo.vil...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I guess it is fine to enable shardingFn control only at the WriteFiles
>>>>>>> level rather than in FileIO. On WriteFiles it can be manipulated in a
>>>>>>> PTransformOverride by the runner.
>>>>>>>
>>>>>>> On Thu, Apr 25, 2019 at 6:17 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>>> Yes, a hook would have to be added to allow specifying a different
>>>>>>>> function for choosing the shard number (I assume the problem is that
>>>>>>>> there are cases where the current random assignment is not good?).
>>>>>>>> However, this can be set using PTransformOverride; ideally we
>>>>>>>> shouldn't force the user to know details of the runner when writing
>>>>>>>> their code.
>>>>>>>>
>>>>>>>> On Thu, Apr 25, 2019 at 7:52 AM Maximilian Michels <m...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Reuven is talking about PTransformOverride, e.g.
>>>>>>>>> FlinkTransformOverrides. We already use this to determine the number
>>>>>>>>> of shards in case of Runner-determined sharding.
>>>>>>>>>
>>>>>>>>> Not sure if that would work for Jozef's case because setting the
>>>>>>>>> number of shards is not enough. We want to set the shard key directly
>>>>>>>>> and that logic is buried inside WriteFiles.
>>>>>>>>>
>>>>>>>>> -Max
>>>>>>>>>
>>>>>>>>> On 25.04.19 16:30, Reuven Lax wrote:
>>>>>>>>> > Actually the runner is free to perform surgery on the graph. The
>>>>>>>>> > FlinkRunner can insert a custom function to determine the sharding
>>>>>>>>> > keys.
>>>>>>>>> >
>>>>>>>>> > On Thu, Apr 25, 2019 at 7:28 AM Jozef Vilcek <jozo.vil...@gmail.com
>>>>>>>>> > <mailto:jozo.vil...@gmail.com>> wrote:
>>>>>>>>> >
>>>>>>>>> >     Right now, sharding can be specified only via a target
>>>>>>>>> >     `shardCount`, whether set by the user or the runner. In
>>>>>>>>> >     addition to the configurable shardCount, I am proposing to also
>>>>>>>>> >     allow passing a function which lets the user (or runner)
>>>>>>>>> >     control how the shard is determined and what key will be used
>>>>>>>>> >     to represent it:
>>>>>>>>> >
>>>>>>>>> >     interface ShardingFunction<UserT, DestinationT, ShardKeyT>
>>>>>>>>> >         extends Serializable {
>>>>>>>>> >         ShardKeyT assign(DestinationT destination, UserT element,
>>>>>>>>> >             Integer shardCount);
>>>>>>>>> >     }
>>>>>>>>> >
>>>>>>>>> >     The default implementation can be what exists right now => a
>>>>>>>>> >     random shard encapsulated as ShardedKey<Integer>.
>>>>>>>>> >
>>>>>>>>> >     On Thu, Apr 25, 2019 at 4:07 PM Reuven Lax <re...@google.com
>>>>>>>>> >     <mailto:re...@google.com>> wrote:
>>>>>>>>> >
>>>>>>>>> >         If sharding is not specified, then the semantics are
>>>>>>>>> >         "runner-determined sharding." The DataflowRunner already
>>>>>>>>> >         takes advantage of this to impose its own sharding if the
>>>>>>>>> >         user hasn't specified an explicit one. Could the Flink
>>>>>>>>> >         runner do the same instead of pushing this to the users?
>>>>>>>>> >
>>>>>>>>> >         On Thu, Apr 25, 2019 at 6:23 AM Maximilian Michels
>>>>>>>>> >         <m...@apache.org <mailto:m...@apache.org>> wrote:
>>>>>>>>> >
>>>>>>>>> >             Hi Jozef,
>>>>>>>>> >
>>>>>>>>> >             For sharding in FileIO there are basically two options:
>>>>>>>>> >
>>>>>>>>> >             (1) num_shards ~= num_workers => bad spread of the load
>>>>>>>>> >             across workers
>>>>>>>>> >             (2) num_shards >> num_workers => good spread of the load
>>>>>>>>> >             across workers, but huge number of files
>>>>>>>>> >
>>>>>>>>> >             Your approach would give users control over the sharding
>>>>>>>>> >             keys such that they could be adjusted to spread load more
>>>>>>>>> >             evenly.
>>>>>>>>> >
>>>>>>>>> >             I'd like to hear from Beam IO experts if that would make
>>>>>>>>> >             sense.
>>>>>>>>> >
>>>>>>>>> >             Thanks,
>>>>>>>>> >             Max
>>>>>>>>> >
>>>>>>>>> >             On 25.04.19 08:52, Jozef Vilcek wrote:
>>>>>>>>> >              > Hello,
>>>>>>>>> >              >
>>>>>>>>> >              > Right now, if someone needs sharded files via FileIO,
>>>>>>>>> >              > there is only one option, which is random (round robin)
>>>>>>>>> >              > shard assignment per element, and it always uses
>>>>>>>>> >              > ShardedKey<Integer> as a key for the GBK which follows.
>>>>>>>>> >              >
>>>>>>>>> >              > I would like to generalize this and have a possibility
>>>>>>>>> >              > to provide some ShardingFn[UserT, DestinationT,
>>>>>>>>> >              > ShardKeyT] via FileIO.
>>>>>>>>> >              > What I am mainly after is to have a possibility to
>>>>>>>>> >              > provide an optimisation for the Flink runtime and pass
>>>>>>>>> >              > in a special function which generates shard keys in a
>>>>>>>>> >              > way that they are evenly spread among workers
>>>>>>>>> >              > (BEAM-5865).
>>>>>>>>> >              >
>>>>>>>>> >              > Would such an extension for FileIO make sense? If yes,
>>>>>>>>> >              > I would create a ticket for it and try to draft a PR.
>>>>>>>>> >              >
>>>>>>>>> >              > Best,
>>>>>>>>> >              > Jozef
>>>>>>>>> >
>>>>>>>>>
>>>>>>>>
