On Wed, Jun 16, 2021 at 1:38 AM Kenneth Knowles <k...@apache.org> wrote:

> In general, Beam only deals with keys and grouping by key. I think
> expanding this idea to some more abstract notion of a sharding function
> could make sense.
>
> For FileIO specifically, I wonder if you can use writeDynamic() to get the
> behavior you are seeking.
>

The change I have in mind looks like this:
https://github.com/JozoVilcek/beam/commit/9c5a7fe35388f06f72972ec4c1846f1dbe85eb18

Dynamic Destinations, in my mind, are geared more towards "partitioning" the
data (destination as a directory level) or towards handling groups of events
differently, e.g. writing some events in FormatA and others in FormatB.
Shards are used for distributing writes or for bucketing events within a
particular destination group. More specifically, each element is currently
assigned a `ShardedKey<Integer>` [1] before the GBK operation; the sharded
key is a compound of the destination and the assigned shard.
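
To make that concrete, a user-supplied sharding function under such a change
could look roughly like the sketch below. The `ShardingFunction` interface and
the `assignShardKey` signature are assumptions modelled on the linked commit,
not an existing FileIO API, and the CSV element format is made up for
illustration:

import java.io.Serializable;
import org.apache.beam.sdk.values.ShardedKey;

// Hypothetical interface modelled on the linked commit; the final name and
// signature in Beam may differ.
interface ShardingFunction<UserT, DestinationT> extends Serializable {
  ShardedKey<Integer> assignShardKey(DestinationT destination, UserT element, int shardCount);
}

// Deterministic, data-derived shard assignment: elements are CSV lines and
// the shard is computed from the first column, so recomputed elements always
// land in the same shard.
class FirstColumnShardingFunction implements ShardingFunction<String, Void> {
  @Override
  public ShardedKey<Integer> assignShardKey(Void destination, String line, int shardCount) {
    int bucket = Math.floorMod(line.split(",")[0].hashCode(), shardCount);
    // The key part stands in for the destination hash (a single, constant
    // destination here); the shard number carries the data-derived bucket.
    return ShardedKey.of(0, bucket);
  }
}

WriteFiles would then use such a function in place of `RandomShardingFunction`
when assigning keys before the GBK.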

Having said that, I might be able to use dynamic destinations for this,
possibly with a custom FileNaming, and always set the number of shards to 1.
But it feels less natural than allowing the user to swap the already present
`RandomShardingFunction` [2] for something of their own choosing.
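
For comparison, the writeDynamic() workaround could look roughly like this,
treating the bucket number as the destination; the bucket count, output path
and CSV element format are made up for illustration:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;

public class WriteDynamicWorkaround {
  private static final int NUM_BUCKETS = 4;  // illustrative bucket count

  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    PCollection<String> lines = p.apply(Create.of("alice,1", "bob,2", "carol,3"));

    lines.apply(
        FileIO.<Integer, String>writeDynamic()
            // The bucket number becomes the "destination", so grouping happens per bucket.
            .by(line -> Math.floorMod(line.split(",")[0].hashCode(), NUM_BUCKETS))
            .withDestinationCoder(VarIntCoder.of())
            .via(TextIO.sink())
            .to("/tmp/output")  // illustrative output directory
            // Encode the bucket in the file name.
            .withNaming(bucket -> FileIO.Write.defaultNaming("bucket-" + bucket, ".txt"))
            // One shard per destination, so the shard layout is deterministic.
            .withNumShards(1));

    p.run().waitUntilFinish();
  }
}

This should give one file per bucket per window/pane, at the cost of treating
every bucket as its own destination.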


[1]
https://github.com/apache/beam/blob/release-2.29.0/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ShardedKey.java

[2]
https://github.com/apache/beam/blob/release-2.29.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L856

> Kenn
>
> On Tue, Jun 15, 2021 at 3:49 PM Tyson Hamilton <tyso...@google.com> wrote:
>
>> Adding sharding to the model may require a wider discussion than FileIO
>> alone. I'm not entirely sure how wide, or if this has been proposed before,
>> but IMO it warrants a design doc or proposal.
>>
>> A couple of high-level questions I can think of are:
>>   - What runners support sharding?
>>       * There will be some work in Dataflow required to support this but
>> I'm not sure how much.
>>   - What does sharding mean for streaming pipelines?
>>
>> A more nitty-gritty question:
>>   - How can this be achieved performantly? For example, if a shuffle is
>> required to achieve a particular sharding constraint, should we allow
>> transforms to declare that they don't modify the sharding property (e.g.
>> key preserving)? That may allow a runner to avoid an additional shuffle
>> when a preceding shuffle already guarantees the sharding requirements.
>>
>> Where X is the shuffle that could be avoided: input -> shuffle (key
>> sharding fn A) -> transform1 (key preserving) -> transform2 (key
>> preserving) -> X -> fileio (key sharding fn A)
>>
>> On Tue, Jun 15, 2021 at 1:02 AM Jozef Vilcek <jozo.vil...@gmail.com>
>> wrote:
>>
>>> I would like to extend FileIO with the possibility to specify a custom
>>> sharding function:
>>> https://issues.apache.org/jira/browse/BEAM-12493
>>>
>>> I have two use cases for this:
>>>
>>>    1. I need to generate shards which are compatible with Hive
>>>    bucketing, and therefore need to decide the shard assignment based on
>>>    data fields of the input element (a sketch of the bucket computation
>>>    follows below the list)
>>>    2. When running e.g. on Spark and the job hits a failure which causes
>>>    a loss of some data from previous stages, Spark re-runs the necessary
>>>    tasks in the necessary stages to recover that data. Because the default
>>>    shard assignment function is random, some of the recomputed data will
>>>    end up in different shards and cause duplicates in the final output.
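>>>
>>> For use case 1, the shard id has to be derived the same way Hive derives
>>> its bucket number, roughly (hash & Integer.MAX_VALUE) % numBuckets over
>>> the bucketing columns. The sketch below only illustrates the shape;
>>> Hive's actual hashing rules depend on the column types and on the
>>> configured bucketing version:
>>>
>>> // Illustrative only: Hive-style bucket assignment for a single bucketing
>>> // column. A real implementation must mirror Hive's own hashing rules for
>>> // the actual column types and the bucketing version in use.
>>> class HiveStyleBucketing {
>>>   static int bucketFor(Object bucketColumnValue, int numBuckets) {
>>>     int hash = (bucketColumnValue == null) ? 0 : bucketColumnValue.hashCode();
>>>     return (hash & Integer.MAX_VALUE) % numBuckets;
>>>   }
>>> }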
>>>
>>> Please let me know your thoughts in case you see a reason not to add
>>> such an improvement.
>>>
>>> Thanks,
>>> Jozef
>>>
>>
