[
https://issues.apache.org/jira/browse/FLINK-19582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17226764#comment-17226764
]
Stephan Ewen commented on FLINK-19582:
--------------------------------------
This has been merged as an optional experimental feature in 1.12.0.
If the parallelism is larger than a threshold, the sort-merge shuffle
activates. The threshold can be set via
"taskmanager.network.sort-shuffle.min-parallelism" and is MAX_INT by default,
so this feature is off by default in 1.12.0.
See
https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#taskmanager-network-sort-shuffle-min-parallelism
It can be activated / evaluated by setting the parallelism threshold to a
lower value.
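For example, a minimal sketch of how the feature could be tried out (the value
128 below is only illustrative, not a recommendation): lower the threshold in
flink-conf.yaml,

    taskmanager.network.sort-shuffle.min-parallelism: 128

or set it programmatically, e.g. when spinning up a local environment for a test:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    // Result partitions with parallelism >= the configured threshold use the
    // sort-merge based blocking shuffle (blocking / batch exchanges only).
    Configuration conf = new Configuration();
    conf.setInteger("taskmanager.network.sort-shuffle.min-parallelism", 128);
    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.createLocalEnvironment(128, conf);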
> Introduce sort-merge based blocking shuffle to Flink
> ----------------------------------------------------
>
> Key: FLINK-19582
> URL: https://issues.apache.org/jira/browse/FLINK-19582
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Network
> Affects Versions: 1.12.0
> Reporter: Yingjie Cao
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.12.0
>
>
>
> *Motivation*
> Hash-based blocking shuffle and sort-merge based blocking shuffle are the two
> main blocking shuffle implementations widely adopted by existing distributed
> data processing frameworks. The hash-based implementation writes data sent to
> different reducer tasks into separate files concurrently, while the sort-merge
> based approach writes the data together into a single file and merges those
> small files into bigger ones. Compared to the sort-merge based approach, the
> hash-based approach has several weak points when it comes to running
> large-scale batch jobs:
> *1. Stability*
> For high-parallelism (tens of thousands) batch jobs, the current hash-based
> blocking shuffle implementation writes too many files concurrently, which
> puts high pressure on the file system, for example, maintenance of too much
> file metadata, high system CPU consumption, and exhaustion of inodes or file
> descriptors. All of these are potential stability issues, which we
> encountered in our production environment before we switched to the
> sort-merge based blocking shuffle.
> The sort-merge based blocking shuffle does not have this problem because, for
> one result partition, only one file is written at a time.
> *2. Performance*
> Large numbers of small shuffle files and random IO can hurt shuffle
> performance a lot, especially on HDDs (for SSDs, sequential reads are also
> important because of read-ahead and caching).
> For batch jobs processing massive data, a small amount of data per
> subpartition is common, because to reduce the job completion time we usually
> increase the job parallelism to reduce the amount of data processed per task,
> and the average amount of data per subpartition is given by:
> (the amount of data per task) / (parallelism) = (total amount of data) /
> (parallelism^2)
> which means increasing the parallelism decreases the amount of data per
> subpartition rapidly.
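(As a quick illustration, with numbers that are not from the original
description: for, say, 1 TB of total data and a parallelism of 10000, each task
processes roughly 100 MB, but each subpartition holds only about 10 KB on
average, i.e. 1 TB / 10000^2.)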
> Besides, data skew is another cause of small subpartition files. By merging
> the data of all subpartitions together into one file, more sequential reads
> can be achieved.
> *3. Resource*
> For the current hash-based implementation, each subpartition needs at least
> one buffer. For large-scale batch shuffles, the memory consumption can be
> huge. For example, we need at least 320 MB of network memory per result
> partition if the parallelism is set to 10000. Because of this huge network
> memory consumption, it is hard to configure the network memory for
> large-scale batch jobs, and sometimes the parallelism cannot be increased
> just because of insufficient network memory, which leads to a bad user
> experience.
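(Back-of-the-envelope, assuming Flink's default 32 KB network buffer size:
10000 subpartitions x 32 KB per buffer is roughly 320 MB for a single result
partition, which matches the figure above.)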
> By introducing the sort-merge based approach to Flink, we can improve Flink’s
> capability of running large-scale batch jobs.