[
https://issues.apache.org/jira/browse/BEAM-13667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17539861#comment-17539861
]
Tapan Upadhyay commented on BEAM-13667:
---------------------------------------
[~kenn] [~lcwik] This feature would be really helpful for anyone writing
files, because it helps distribute load evenly across all task managers. We
made this change in our local Beam version and can now process 20k tps and
write to S3, whereas without even sharding it was failing at 5-8k tps (with
the same resources allocated).
> Users cannot provide their own sharding function when using FileIO
> ------------------------------------------------------------------
>
> Key: BEAM-13667
> URL: https://issues.apache.org/jira/browse/BEAM-13667
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-core
> Affects Versions: 2.35.0
> Reporter: Sandeep Kathula
> Priority: P3
>
> Beam uses RandomShardingFunction
> ([https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L834])
> by default for sharding when using FileIO.write().
>
> RandomShardingFunction doesn’t work well with the Flink Runner. It assigns
> the same key (hashDestination(destination, destinationCoder)) along with the
> shard number.
> ([https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L863])
> This causes different ShardedKeys to go to the same task slot. As a result,
> the files written are not distributed equally across task slots.
> E.g. 2 files are written by task slot 1, 3 files by task slot 2, and 0
> files by the other task slots.
>
> Consequently, the pods that write more files run out of memory.
>
> At
> [https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L695-L698]
> there is an option to supply a different sharding function, but there is no
> way for the user to specify one when using the FileIO class.
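
The skew described in the issue can be illustrated with a minimal,
self-contained Java sketch. It does not use Beam or Flink: `mix`,
`slotForHash`, and the modulo routing are hypothetical stand-ins for however
a runner maps a key to a task slot, not Flink's actual key-group logic. The
sketch compares routing shard keys by hash with a deterministic round-robin
assignment, which is the kind of even spread a user-supplied sharding
function could aim for.

```java
import java.util.Arrays;

class ShardSkewSketch {
    // Hypothetical bit mixer standing in for a runner's internal key hashing.
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    // Assumption: a key is routed to a slot by its mixed hash modulo the slot count.
    static int slotForHash(int keyHash, int slots) {
        return Math.floorMod(mix(keyHash), slots);
    }

    // Files per slot when every shard key combines one destination hash with a shard number.
    static int[] hashedCounts(int destinationHash, int shards, int slots) {
        int[] counts = new int[slots];
        for (int shard = 0; shard < shards; shard++) {
            int keyHash = 31 * destinationHash + shard;
            counts[slotForHash(keyHash, slots)]++;
        }
        return counts;
    }

    // Files per slot when shard i is pinned to slot i % slots.
    static int[] roundRobinCounts(int shards, int slots) {
        int[] counts = new int[slots];
        for (int shard = 0; shard < shards; shard++) {
            counts[shard % slots]++;
        }
        return counts;
    }

    // Total number of files across all slots.
    static int total(int[] counts) {
        int sum = 0;
        for (int c : counts) {
            sum += c;
        }
        return sum;
    }

    // Difference between the busiest and the idlest slot; 0 means perfectly even.
    static int spread(int[] counts) {
        int min = Integer.MAX_VALUE;
        int max = Integer.MIN_VALUE;
        for (int c : counts) {
            min = Math.min(min, c);
            max = Math.max(max, c);
        }
        return max - min;
    }

    public static void main(String[] args) {
        System.out.println("hashed:      " + Arrays.toString(hashedCounts(12345, 5, 5)));
        System.out.println("round robin: " + Arrays.toString(roundRobinCounts(5, 5)));
    }
}
```

With only a handful of shard keys, hash routing behaves like throwing a few
balls into bins, so some slots typically receive several files and others
none. The round-robin variant never differs by more than one file between
slots.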
--
This message was sent by Atlassian Jira
(v8.20.7#820007)