On Wed, Jun 16, 2021 at 12:49 AM Tyson Hamilton <tyso...@google.com> wrote:
> Adding sharding to the model may require a wider discussion than FileIO
> alone. I'm not entirely sure how wide, or if this has been proposed before,
> but IMO it warrants a design doc or proposal.

I should have been clearer about the intent. I am not trying to add
sharding to the general model, only the possibility to control sharding
logic in FileIO (Java), since sharding is already there. You can call
`.withNumShards(...)` on FileIO, in which case this will be used:
https://github.com/apache/beam/blob/release-2.29.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L856

> A couple of high-level questions I can think of are:
>
> - What runners support sharding?
>   * There will be some work in Dataflow required to support this, but
>     I'm not sure how much.
> - What does sharding mean for streaming pipelines?
>
> A more nitty-gritty question:
>
> - How can this be achieved performantly? For example, if a shuffle is
>   required to achieve a particular sharding constraint, should we allow
>   transforms to declare that they don't modify the sharding property
>   (e.g. key preserving), which may allow a runner to avoid an additional
>   shuffle if a preceding shuffle can guarantee the sharding requirements?
>
> Where X is the shuffle that could be avoided: input -> shuffle (key
> sharding fn A) -> transform1 (key preserving) -> transform2 (key
> preserving) -> X -> fileio (key sharding fn A)
>
> On Tue, Jun 15, 2021 at 1:02 AM Jozef Vilcek <jozo.vil...@gmail.com>
> wrote:
>
>> I would like to extend FileIO with the possibility to specify a custom
>> sharding function:
>> https://issues.apache.org/jira/browse/BEAM-12493
>>
>> I have 2 use-cases for this:
>>
>> 1. I need to generate shards which are compatible with Hive bucketing,
>>    and therefore need to decide shard assignment based on data fields
>>    of the input element.
>> 2. When running e.g. on Spark, if a job encounters a failure that
>>    causes a loss of some data from previous stages, Spark recomputes
>>    the necessary tasks in the necessary stages to recover the data.
>>    Because the default shard assignment function is random, some data
>>    will end up in different shards and cause duplicates in the final
>>    output.
>>
>> Please let me know your thoughts in case you see a reason not to add
>> such an improvement.
>>
>> Thanks,
>> Jozef
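To illustrate the two use-cases, here is a minimal sketch (plain Java, not the Beam API; the class, `shardFor`, and the choice of a string bucketing key are all hypothetical) of the kind of deterministic, data-derived shard assignment the proposal would allow, in contrast to the random default:

```java
// Sketch of a deterministic sharding function: the shard is derived
// from a data field of the element rather than chosen at random, so
// recomputed elements always land in the same shard (use-case 2) and
// the assignment can follow a bucketing scheme such as Hive's
// hash-based buckets (use-case 1).
public class DeterministicSharding {

    // Assign a shard from a bucketing key taken from the element.
    // Math.floorMod keeps the result in [0, numShards) even when
    // hashCode() is negative.
    static int shardFor(String bucketKey, int numShards) {
        return Math.floorMod(bucketKey.hashCode(), numShards);
    }

    public static void main(String[] args) {
        int numShards = 4;
        // The same key always maps to the same shard, run after run.
        System.out.println(shardFor("user-42", numShards)
                == shardFor("user-42", numShards)); // true
    }
}
```

With a hook like the one proposed in BEAM-12493, a function of this shape could be plugged into FileIO alongside the existing `.withNumShards(...)` setting, instead of the random assignment used today.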