Sandeep Kathula created BEAM-13667:
--------------------------------------
Summary: Users cannot provide their own sharding function when
using FileIO
Key: BEAM-13667
URL: https://issues.apache.org/jira/browse/BEAM-13667
Project: Beam
Issue Type: Improvement
Components: sdk-java-core
Affects Versions: 2.35.0
Reporter: Sandeep Kathula
Beam uses RandomShardingFunction
([https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L834])
by default for sharding when using FileIO.write().
RandomShardingFuncction doesn’t work well with Flink Runner. Its assigning same
key (hashDestination(destination, destinationCoder)) along with the shard
number.
([https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L863])
This is causing different ShardedKeys going to same task slot. As a result,
there is no equal distribution of files written from different task slots.
E.g. 2 files are written by task slot 1, 3 files written by task slot 2, 0
files written by other task slots.
As a result, we are seeing Out of Memory from the pods writing more files.
At
[https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L695-L698]
There is an option to give a different sharding function. But there is no way
for the user to mention different sharding function when using FileIO class.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)