On Wed, Jun 16, 2021 at 4:44 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
> On Wed, Jun 16, 2021 at 1:38 AM Kenneth Knowles <k...@apache.org> wrote:
>
>> In general, Beam only deals with keys and grouping by key. I think
>> expanding this idea to some more abstract notion of a sharding function
>> could make sense.
>>
>> For FileIO specifically, I wonder if you can use writeDynamic() to get
>> the behavior you are seeking.
>
> The change I have in mind looks like this:
> https://github.com/JozoVilcek/beam/commit/9c5a7fe35388f06f72972ec4c1846f1dbe85eb18
>
> Dynamic destinations, in my mind, are more towards the need to
> "partition" data (destination as a directory level), or for when one
> needs to handle groups of events differently, e.g. write some events in
> FormatA and others in FormatB.
> Shards are now used for distributing writes or bucketing events within a
> particular destination group. More specifically, each element is
> currently assigned a `ShardedKey<Integer>` [1] before the GBK operation.
> The sharded key is a compound of destination and assigned shard.

I agree with your perspective. This is what the feature was designed for.
But I think it can also be used as a workaround for a sharding function.
If you read the code, it is basically the same. I am actually in the
middle of using this workaround, so I hope it works :-).
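To make the workaround concrete, here is a minimal sketch (untested; it
assumes a PCollection<String> of events, a text sink, and a hypothetical
bucketOf() function that computes a bucket id from an element):

    import org.apache.beam.sdk.coders.VarIntCoder;
    import org.apache.beam.sdk.io.FileIO;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.values.PCollection;

    PCollection<String> events = ...;

    events.apply(FileIO.<Integer, String>writeDynamic()
        // The "destination" carries the bucket id, standing in for the shard.
        .by(element -> bucketOf(element))
        .withDestinationCoder(VarIntCoder.of())
        .via(TextIO.sink())
        .to("/path/to/output")
        .withNaming(bucket -> FileIO.Write.defaultNaming("part-" + bucket, ".txt"))
        // Exactly one shard per destination, so each bucket becomes one file.
        .withNumShards(1));

Each distinct bucket id becomes its own destination, and withNumShards(1)
collapses every destination to a single file, which is effectively a
user-controlled sharding function.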
And so I also agree with your proposal, somewhat. I would not have to use
this workaround if we exposed a sharding function. There are some reasons
we didn't. I think they are:

 - avoid accidental data loss or poor performance due to a bad sharding
   function
 - avoiding a UDF allows runners to execute natively
 - for fixed sharding, a runner can also know all the expected files (part
   of executing natively)

Someone else may have better history on this. I'll ping +Robert Bradshaw
<rober...@google.com> and +Reuven Lax <re...@google.com> with that
question.

Tyson's suggestion of a model feature is pretty interesting as a way to
avoid shuffles entirely in some cases. That sounds like a big design
project to work out what sort of metadata we would have to track, and
where and how a user would specify sharding. But I see it is out of scope
for your simpler goals.

Kenn

> Having said that, I might be able to use dynamic destinations for this,
> possibly with a custom FileNaming, and set shards to always be 1. But it
> feels less natural than allowing the user to swap the already present
> `RandomShardingFunction` [2] for something of their own choosing.
>
> [1] https://github.com/apache/beam/blob/release-2.29.0/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ShardedKey.java
> [2] https://github.com/apache/beam/blob/release-2.29.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L856
>
>> Kenn
>>
>> On Tue, Jun 15, 2021 at 3:49 PM Tyson Hamilton <tyso...@google.com> wrote:
>>
>>> Adding sharding to the model may require a wider discussion than FileIO
>>> alone. I'm not entirely sure how wide, or whether this has been
>>> proposed before, but IMO it warrants a design doc or proposal.
>>>
>>> A couple of high-level questions I can think of are:
>>> - What runners support sharding?
>>>   * There will be some work required in Dataflow to support this, but
>>>     I'm not sure how much.
>>> - What does sharding mean for streaming pipelines?
>>>
>>> A more nitty-gritty question:
>>> - How can this be achieved performantly? For example, if a shuffle is
>>> required to achieve a particular sharding constraint, should we allow
>>> transforms to declare that they don't modify the sharding property
>>> (e.g. key preserving)? That may allow a runner to avoid an additional
>>> shuffle when a preceding shuffle can already guarantee the sharding
>>> requirements.
>>>
>>> Where X is the shuffle that could be avoided: input -> shuffle (key
>>> sharding fn A) -> transform1 (key preserving) -> transform2 (key
>>> preserving) -> X -> fileio (key sharding fn A)
>>>
>>> On Tue, Jun 15, 2021 at 1:02 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>>>
>>>> I would like to extend FileIO with the possibility to specify a custom
>>>> sharding function: https://issues.apache.org/jira/browse/BEAM-12493
>>>>
>>>> I have two use cases for this:
>>>>
>>>> 1. I need to generate shards which are compatible with Hive bucketing,
>>>> and therefore need to decide shard assignment based on data fields of
>>>> the input element.
>>>>
>>>> 2. When running e.g. on Spark, if the job encounters a failure that
>>>> causes a loss of some data from previous stages, Spark recomputes the
>>>> necessary tasks in the necessary stages to recover that data. Because
>>>> the shard assignment function is random by default, some data will end
>>>> up in different shards and cause duplicates in the final output.
>>>>
>>>> Please let me know your thoughts in case you see a reason not to add
>>>> such an improvement.
>>>>
>>>> Thanks,
>>>> Jozef
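To illustrate what the proposed hook could look like for use case 1, here
is a sketch of a Hive-style sharding function, assuming the
ShardingFunction shape from the linked commit, i.e. (destination, element,
shardCount) -> ShardedKey<Integer>. The Row elements, the "user_id"
bucketing column, and plain hashCode() standing in for Hive's bucket hash
are all illustrative:

    import org.apache.beam.sdk.values.Row;
    import org.apache.beam.sdk.values.ShardedKey;

    // Deterministic bucketing: the same element always lands in the same
    // shard, which also avoids the Spark recompute duplicates of use case 2.
    class HiveBucketShardingFunction implements ShardingFunction<Row, String> {
      @Override
      public ShardedKey<Integer> assignShardKey(
          String destination, Row element, int shardCount) {
        // Hash the bucketing column, mod the bucket count, to mirror Hive
        // bucketing; hashCode() is a stand-in for Hive's own hash.
        int bucket =
            (element.getString("user_id").hashCode() & Integer.MAX_VALUE)
                % shardCount;
        return ShardedKey.of(destination.hashCode(), bucket);
      }
    }

Under the proposal, something like
.withShardingFunction(new HiveBucketShardingFunction()) would then swap
this in for RandomShardingFunction.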