[ 
https://issues.apache.org/jira/browse/BEAM-13667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17481374#comment-17481374
 ] 

Sandeep Kathula commented on BEAM-13667:
----------------------------------------

[~lcwik] [~kenn] 

As of now I don't see any way to distribute the load across all the Flink task 
managers when using FileIO to write in Parquet format. The default 
RandomShardingFunction is causing OOMs in the task managers that end up writing 
most of the files. Is there any better way to distribute the load when writing 
with FileIO on the Flink runner, other than providing our own sharding function?

I have seen the conversation in 
https://issues.apache.org/jira/browse/BEAM-12493, and there [~chamikara] also 
mentions that this is a good use case for providing our own sharding function. 
Is there any alternative solution other than using our own sharding function?
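
Just to make the question concrete, below is a rough sketch of the kind of 
sharding function we would plug in if it were reachable from FileIO. It is only 
a sketch: ElementHashShardingFunction is a made-up name, and it assumes the 
ShardingFunction/ShardedKey signatures that WriteFiles uses internally.

{code:java}
import java.util.Objects;
import org.apache.beam.sdk.io.ShardingFunction;
import org.apache.beam.sdk.values.ShardedKey;

/**
 * Sketch of a custom sharding function: derive the shard from the element's own
 * hash and give every (destination, shard) pair its own key, instead of reusing
 * one hashed-destination key for all shards as RandomShardingFunction does.
 */
public class ElementHashShardingFunction<UserT, DestinationT>
    implements ShardingFunction<UserT, DestinationT> {

  @Override
  public ShardedKey<Integer> assignShardKey(
      DestinationT destination, UserT element, int shardCount) throws Exception {
    // Spread elements by their own hash rather than picking a random shard.
    int shardNumber = Math.floorMod(Objects.hashCode(element), shardCount);
    // Vary the key per shard (and per destination) so the resulting ShardedKeys
    // are less likely to collapse onto a few Flink task slots.
    int key = Objects.hash(destination, shardNumber);
    return ShardedKey.of(key, shardNumber);
  }
}
{code}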

> Users cannot provide their own sharding function when using FileIO
> ------------------------------------------------------------------
>
>                 Key: BEAM-13667
>                 URL: https://issues.apache.org/jira/browse/BEAM-13667
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-core
>    Affects Versions: 2.35.0
>            Reporter: Sandeep Kathula
>            Assignee: Sandeep Kathula
>            Priority: P2
>
> Beam uses RandomShardingFunction 
> ([https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L834])
>  by default for sharding when using FileIO.write().
>  
> RandomShardingFunction doesn't work well with the Flink runner. It assigns the 
> same key (hashDestination(destination, destinationCoder)) to every element and 
> varies only the shard number 
> ([https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L863]).
> This causes different ShardedKeys to land on the same task slot, so files are 
> not distributed evenly across task slots. E.g. 2 files are written by task 
> slot 1, 3 files by task slot 2, and 0 files by the other task slots.
>  
> As a result, we are seeing Out of Memory errors in the pods that write more files.
>  
> At 
> [https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L695-L698]
> there is an option to provide a different sharding function, but there is no 
> way for the user to specify one when using the FileIO class.
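> 
> For illustration, the kind of hook this ticket asks for might look like the 
> snippet below. withShardingFunction() on FileIO.Write is hypothetical here (it 
> does not exist as of 2.35.0), and MyShardingFunction stands for any 
> user-provided ShardingFunction; the rest of the chain uses existing FileIO and 
> ParquetIO APIs.
> {code:java}
> // Hypothetical: FileIO.Write#withShardingFunction would be a pass-through to
> // the existing WriteFiles hook; it is not available in 2.35.0.
> records.apply(
>     FileIO.<GenericRecord>write()
>         .via(ParquetIO.sink(schema))      // writing Parquet, as in our use case
>         .to(outputPath)
>         .withNumShards(numShards)
>         .withShardingFunction(new MyShardingFunction())); // proposed API
> {code}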



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
