On Wed, Jun 16, 2021 at 4:44 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:

>
>
> On Wed, Jun 16, 2021 at 1:38 AM Kenneth Knowles <k...@apache.org> wrote:
>
>> In general, Beam only deals with keys and grouping by key. I think
>> expanding this idea to some more abstract notion of a sharding function
>> could make sense.
>>
>> For FileIO specifically, I wonder if you can use writeDynamic() to get
>> the behavior you are seeking.
>>
>
> The change in mind looks like this:
>
> https://github.com/JozoVilcek/beam/commit/9c5a7fe35388f06f72972ec4c1846f1dbe85eb18
>
> Dynamic Destinations, in my mind, are aimed more at the need to
> "partition" data (destination as a directory level), or at handling groups
> of events differently, e.g. writing some events in FormatA and others in
> FormatB.
>
> Shards are now used for distributing writes or bucketing events within a
> particular destination group. More specifically, each element is currently
> assigned a `ShardedKey<Integer>` [1] before the GBK operation. The sharded
> key is a compound of the destination and the assigned shard.
>

I agree with your perspective. This is what the feature was designed for,
but I think it can also be used as a workaround for a sharding function. If
you read the code, it is basically the same. I am actually in the middle of
using this workaround, so I hope it works :-). And so I also somewhat agree
with your proposal: I would not have to use this workaround if we exposed a
sharding function.
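
For reference, here is a rough sketch of that workaround (all names here
are illustrative, not from an actual patch): compute the shard assignment
from the data, use it as the dynamic destination, and pin the shard count
per destination to 1.

  import org.apache.beam.sdk.coders.VarIntCoder;
  import org.apache.beam.sdk.io.FileIO;
  import org.apache.beam.sdk.io.TextIO;
  import org.apache.beam.sdk.transforms.Contextful;

  // Assumes a PCollection<Event> named "events"; Event#getBucketKey() and
  // NUM_BUCKETS are stand-ins for whatever drives the shard assignment.
  events.apply(
      FileIO.<Integer, Event>writeDynamic()
          // The "destination" is really the shard id, computed from the data.
          .by(e -> Math.floorMod(e.getBucketKey(), NUM_BUCKETS))
          .withDestinationCoder(VarIntCoder.of())
          .via(Contextful.fn(Event::toString), TextIO.sink())
          .to("/output/path")
          // One deterministic file name per bucket.
          .withNaming(b -> FileIO.Write.defaultNaming("bucket-" + b, ".txt"))
          // Exactly one shard per destination, so the bucket is the shard.
          .withNumShards(1));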

There are some reasons we didn't. I think they are:

 - avoiding accidental data loss or poor performance due to a bad sharding
function
 - avoiding a UDF allows runners to execute the write natively
 - with fixed sharding, a runner can also know all the expected files (part
of executing natively)
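
For concreteness, the hook in question would look roughly like this. This
is a speculative sketch based on the description above, not a final API;
Event and getBucketKey() are hypothetical.

  import java.io.Serializable;
  import org.apache.beam.sdk.values.ShardedKey;

  // Same shape as the internal RandomShardingFunction, but user-supplied:
  // given a destination, an element, and the shard count, produce the
  // compound key used for the GBK.
  public interface ShardingFunction<UserT, DestinationT> extends Serializable {
    ShardedKey<Integer> assignShardKey(
        DestinationT destination, UserT element, int shardCount) throws Exception;
  }

  // A deterministic, data-driven assignment, e.g. for Hive-style bucketing:
  // the key part still identifies the destination, while the shard number
  // is derived from a field of the element rather than chosen at random.
  ShardingFunction<Event, String> hiveBucketing =
      (destination, element, shardCount) ->
          ShardedKey.of(
              destination.hashCode(),
              Math.floorMod(element.getBucketKey(), shardCount));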

Someone else may have better history on this. I'll ping +Robert Bradshaw
<rober...@google.com> and +Reuven Lax <re...@google.com> with that question.

Tyson's suggestion of a model feature is pretty interesting as a way to
avoid shuffles entirely in some cases. That sounds like a big design
project, to work out what sort of metadata we would have to track, and
where and how a user would specify sharding. But I see it is out of scope
for your simpler goals.

Kenn


>
> Having said that, I might be able to use dynamic destinations for this,
> possibly with a custom FileNaming, and set shards to always be 1. But it
> feels less natural than allowing the user to swap the already-present
> `RandomShardingFunction` [2] for something of their own choosing.
>
>
> [1]
> https://github.com/apache/beam/blob/release-2.29.0/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ShardedKey.java
>
> [2]
> https://github.com/apache/beam/blob/release-2.29.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L856
>
> Kenn
>>
>> On Tue, Jun 15, 2021 at 3:49 PM Tyson Hamilton <tyso...@google.com>
>> wrote:
>>
>>> Adding sharding to the model may require a wider discussion than FileIO
>>> alone. I'm not entirely sure how wide, or if this has been proposed before,
>>> but IMO it warrants a design doc or proposal.
>>>
>>> A couple of high-level questions I can think of:
>>>   - What runners support sharding?
>>>       * There will be some work required in Dataflow to support this,
>>> but I'm not sure how much.
>>>   - What does sharding mean for streaming pipelines?
>>>
>>> A more nitty-gritty question:
>>>   - How can this be achieved performantly? For example, if a shuffle is
>>> required to achieve a particular sharding constraint, should we allow
>>> transforms to declare that they don't modify the sharding property (e.g.
>>> key preserving)? That may allow a runner to avoid an additional shuffle
>>> when a preceding shuffle can already guarantee the sharding requirements.
>>>
>>> Where X is the shuffle that could be avoided: input -> shuffle (key
>>> sharding fn A) -> transform1 (key preserving) -> transform2 (key
>>> preserving) -> X -> fileio (key sharding fn A)
>>>
>>> On Tue, Jun 15, 2021 at 1:02 AM Jozef Vilcek <jozo.vil...@gmail.com>
>>> wrote:
>>>
>>>> I would like to extend FileIO with the possibility to specify a custom
>>>> sharding function:
>>>> https://issues.apache.org/jira/browse/BEAM-12493
>>>>
>>>> I have 2 use-cases for this:
>>>>
>>>>    1. I need to generate shards which are compatible with Hive
>>>>    bucketing, and therefore need to decide shard assignment based on
>>>>    data fields of the input element.
>>>>    2. When running e.g. on Spark, if the job encounters a failure
>>>>    which causes a loss of some data from previous stages, Spark
>>>>    recomputes the necessary tasks in the necessary stages to recover
>>>>    the data. Because the shard assignment function is random by
>>>>    default, some data will end up in different shards on recompute
>>>>    and cause duplicates in the final output.
>>>>
>>>> Please let me know your thoughts, in case you see a reason not to add
>>>> such an improvement.
>>>>
>>>> Thanks,
>>>> Jozef
>>>>
>>>
