[ https://issues.apache.org/jira/browse/BEAM-13667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17476467#comment-17476467 ]

Sandeep Kathula commented on BEAM-13667:
----------------------------------------

[~lcwik] Yes, this looks like a duplicate of
https://issues.apache.org/jira/browse/BEAM-12493. But the issue we are facing
with the Flink runner is genuine. The fix I would like to make, adding
withShardingFunction(), was already proposed, but it was rejected because the
Hive partitioning use case can be handled another way, with writeDynamic().
For the Flink runner, however, this option is very useful: it would allow
files to be distributed evenly across all task slots, and it could also help
users avoid out-of-memory (OOM) errors. Please advise.
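To illustrate what a user-supplied sharding function could do for the Flink case, here is a minimal, hypothetical sketch in plain Java (no Beam or Flink dependency). It keeps shard numbers in 0..numShards-1 but, for each shard, searches for a key "salt" so that the combined key lands on task slot shard % parallelism. The key-to-slot model (slotOf, mix) and the key hash (keyHash) are simplified assumptions, not Flink's actual key-group assignment or Beam's ShardedKey encoding; a real implementation would have to mirror the runner's partitioning exactly.

```java
import java.util.Arrays;

// Hypothetical sketch of a "slot-aware" sharding strategy; not a Beam or Flink API.
public class SlotAwareShardingSketch {

    // ASSUMPTION: simplified key-to-slot model (scramble the key hash, then take it
    // modulo the parallelism). Flink's real key-group assignment differs.
    static int slotOf(int keyHash, int parallelism) {
        return Math.floorMod(mix(keyHash), parallelism);
    }

    // Invertible integer mixer (xor-shifts and odd multipliers).
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    // ASSUMPTION: stand-in for the hash of a (key, shardNumber) pair.
    static int keyHash(int key, int shardNumber) {
        return 31 * key + shardNumber;
    }

    // For each shard, search for a salt so that (salt, shard) lands on slot shard % parallelism.
    // Shard numbers stay in 0..numShards-1; only the key component changes.
    static int[] saltPerShard(int destinationHash, int numShards, int parallelism) {
        int[] salts = new int[numShards];
        for (int shard = 0; shard < numShards; shard++) {
            int target = shard % parallelism;
            int salt = destinationHash;
            // Terminates: 31 * salt + shard takes every int value as salt wraps,
            // and mix is a bijection, so every slot is reachable.
            while (slotOf(keyHash(salt, shard), parallelism) != target) {
                salt++;
            }
            salts[shard] = salt;
        }
        return salts;
    }

    // Count how many shard keys end up on each slot.
    static int[] filesPerSlot(int destinationHash, int numShards, int parallelism) {
        int[] salts = saltPerShard(destinationHash, numShards, parallelism);
        int[] counts = new int[parallelism];
        for (int shard = 0; shard < numShards; shard++) {
            counts[slotOf(keyHash(salts[shard], shard), parallelism)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // 10 shards over 5 slots under the simplified model.
        System.out.println(Arrays.toString(filesPerSlot(42, 10, 5))); // prints [2, 2, 2, 2, 2]
    }
}
```

Salting the key rather than renumbering the shards is deliberate: the shard number likely feeds into output file names (for example the default -SSSSS-of-NNNNN shard template), so it should stay within 0..numShards-1.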

> Users cannot provide their own sharding function when using FileIO
> ------------------------------------------------------------------
>
>                 Key: BEAM-13667
>                 URL: https://issues.apache.org/jira/browse/BEAM-13667
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-core
>    Affects Versions: 2.35.0
>            Reporter: Sandeep Kathula
>            Assignee: Sandeep Kathula
>            Priority: P2
>
> Beam uses RandomShardingFunction 
> ([https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L834])
>  by default for sharding when using FileIO.write().
>  
> RandomShardingFunction doesn’t work well with the Flink runner. It assigns the
> same key (hashDestination(destination, destinationCoder)) together with the
> shard number.
> ([https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L863])
> This causes different ShardedKeys to go to the same task slot. As a result,
> files are not distributed evenly across the task slots.
> E.g. 2 files are written by task slot 1, 3 files by task slot 2, and 0
> files by the other task slots.
>  
> As a result, we are seeing out-of-memory errors in the pods that write more files.
>  
> At
> [https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L695-L698]
> there is an option to provide a different sharding function, but there is no way
> for the user to specify a different sharding function when using the FileIO class.
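The uneven distribution described above can be reproduced with a small plain-Java simulation (no Beam or Flink dependency). It follows the description that every key for one destination shares the destination hash and differs only in the shard number, and it assumes a simplified hash partitioner: slotOf, mix and keyHash are hypothetical stand-ins, not Flink's key-group assignment or Beam's ShardedKey coder. If the partitioner behaved like a uniform random assignment, 5 shard keys would land one per slot on 5 slots with probability only 5!/5^5 = 120/3125, about 3.8%, so most destinations end up with some slots holding several keys and others none.

```java
import java.util.Arrays;

// Simplified simulation of the default key pattern described in the issue.
public class DefaultShardingSimulation {

    // ASSUMPTION: simplified hash partitioning onto task slots, not Flink's
    // actual key-group assignment.
    static int slotOf(int keyHash, int parallelism) {
        return Math.floorMod(mix(keyHash), parallelism);
    }

    // Integer mixer (xor-shifts and odd multipliers) to scramble the key hash.
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    // ASSUMPTION: stand-in for hashing ShardedKey(destinationHash, shardNumber).
    static int keyHash(int destinationHash, int shardNumber) {
        return 31 * destinationHash + shardNumber;
    }

    // Every key for one destination shares destinationHash; only the shard number differs.
    static int[] keysPerSlot(int destinationHash, int numShards, int parallelism) {
        int[] counts = new int[parallelism];
        for (int shard = 0; shard < numShards; shard++) {
            counts[slotOf(keyHash(destinationHash, shard), parallelism)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int numShards = 5;
        int parallelism = 5;
        int trials = 1000;
        int balanced = 0;
        for (int dest = 0; dest < trials; dest++) {
            int[] counts = keysPerSlot(dest, numShards, parallelism);
            // "Balanced" means every slot received exactly one shard key.
            if (Arrays.stream(counts).allMatch(n -> n == 1)) {
                balanced++;
            }
        }
        System.out.println("perfectly balanced destinations: " + balanced + " / " + trials);
        System.out.println("example distribution: "
                + Arrays.toString(keysPerSlot(0, numShards, parallelism)));
    }
}
```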



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
