[ https://issues.apache.org/jira/browse/FLINK-19582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17226764#comment-17226764 ]

Stephan Ewen commented on FLINK-19582:
--------------------------------------

This has been merged as an optional experimental feature in 1.12.0.

If the parallelism is larger than a threshold, the sort-merge shuffle is 
activated. The threshold can be set via 
"taskmanager.network.sort-shuffle.min-parallelism" and defaults to MAX_INT, 
so the feature is off by default in 1.12.0.
See 
https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#taskmanager-network-sort-shuffle-min-parallelism

It can be activated and evaluated by setting the parallelism threshold to a 
lower value, for example as sketched below.
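As a minimal sketch (not part of the original comment), the threshold could be 
lowered in flink-conf.yaml or programmatically. The config key is the one 
documented above; the value 128 is an arbitrary example, and the local 
environment is only there to keep the snippet self-contained:

    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.configuration.Configuration;

    public class EnableSortMergeShuffle {
        public static void main(String[] args) throws Exception {
            // Equivalent flink-conf.yaml entry:
            //   taskmanager.network.sort-shuffle.min-parallelism: 128
            Configuration conf = new Configuration();
            // Use sort-merge shuffle for result partitions with parallelism >= 128
            // (the default MAX_INT keeps the feature disabled).
            conf.setInteger("taskmanager.network.sort-shuffle.min-parallelism", 128);

            ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);
            // ... build and execute the batch job as usual ...
        }
    }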

> Introduce sort-merge based blocking shuffle to Flink
> ----------------------------------------------------
>
>                 Key: FLINK-19582
>                 URL: https://issues.apache.org/jira/browse/FLINK-19582
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Network
>    Affects Versions: 1.12.0
>            Reporter: Yingjie Cao
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.12.0
>
>
>  
> *Motivation*
> Hash-based blocking shuffle and sort-merge based blocking shuffle are the two 
> main blocking shuffle implementations widely adopted by existing distributed 
> data processing frameworks. The hash-based implementation writes the data sent 
> to different reducer tasks into separate files concurrently, while the 
> sort-merge based approach writes the data together into a single file, merging 
> small files into bigger ones. Compared to the sort-merge based approach, the 
> hash-based approach has several weak points when it comes to running large 
> scale batch jobs:
> *1. Stability*
> For high-parallelism (tens of thousands) batch jobs, the current hash-based 
> blocking shuffle implementation writes too many files concurrently, which puts 
> high pressure on the file system: maintenance of too much file metadata, high 
> system CPU consumption, and exhaustion of inodes or file descriptors. All of 
> these are potential stability issues which we encountered in our production 
> environment before we switched to sort-merge based blocking shuffle.
> Sort-merge based blocking shuffle does not have this problem because, for one 
> result partition, only one file is written at a time.
> *2. Performance*
> Large numbers of small shuffle files and random IO can hurt shuffle 
> performance a lot, especially on HDD (for SSD, sequential read is also 
> important because of read-ahead and caching).
> For batch jobs processing massive data, a small amount of data per 
> subpartition is common, because to reduce the job completion time we usually 
> increase the job parallelism, which reduces the amount of data processed per 
> task, and the average amount of data per subpartition is
> (amount of data per task) / (parallelism) = (total amount of data) / 
> (parallelism^2),
> which means increasing the parallelism decreases the amount of data per 
> subpartition quadratically (see the first sketch after this description).
> Besides, data skew is another cause of small subpartition files. By merging 
> the data of all subpartitions together into one file, more sequential reads 
> can be achieved.
> *3. Resource*
> For the current hash-based implementation, each subpartition needs at least 
> one buffer. For large scale batch shuffles, the memory consumption can be 
> huge. For example, we need at least 320M of network memory per result 
> partition if the parallelism is set to 10000 (one 32 KB buffer per 
> subpartition; see the second sketch after this description). Because of this 
> huge network memory consumption, it is hard to configure the network memory 
> for large scale batch jobs, and sometimes the parallelism cannot be increased 
> simply because of insufficient network memory, which leads to a bad user 
> experience.
> By introducing the sort-merge based approach to Flink, we can improve Flink’s 
> capability of running large scale batch jobs.
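A back-of-the-envelope illustration of the (parallelism^2) relationship from the 
performance section above; the 1 TiB total and the parallelism values are 
illustrative, not taken from the issue:

    public class SubpartitionDataSize {
        public static void main(String[] args) {
            long totalBytes = 1L << 40; // 1 TiB of shuffle data between two stages (illustrative)
            for (int parallelism : new int[] {1_000, 5_000, 10_000}) {
                // data per task = total / parallelism
                // data per subpartition = data per task / parallelism = total / parallelism^2
                long perTask = totalBytes / parallelism;
                long perSubpartition = perTask / parallelism;
                System.out.printf("parallelism=%d: %d MiB per task, %d KiB per subpartition%n",
                        parallelism, perTask >> 20, perSubpartition >> 10);
            }
        }
    }

At parallelism 10000, each subpartition holds only around 10 KiB, which is why 
the hash-based shuffle ends up with many tiny files and mostly random IO.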
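Similarly, a quick check of the 320M figure from the resource section; the only 
assumption is Flink's default 32 KB network buffer (memory segment) size:

    public class HashShuffleBufferMemory {
        public static void main(String[] args) {
            int subpartitions = 10_000;     // one subpartition per downstream task
            long bufferBytes = 32 * 1024L;  // default network buffer size (32 KB)
            // The hash-based blocking shuffle needs at least one buffer per subpartition.
            long minNetworkMemory = subpartitions * bufferBytes;
            System.out.println(minNetworkMemory / (1024 * 1024) + " MiB"); // ~312 MiB, i.e. the ~320M above
        }
    }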



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
