[ https://issues.apache.org/jira/browse/FLINK-19582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17232714#comment-17232714 ]
Yingjie Cao commented on FLINK-19582:
-------------------------------------
[[email protected]] Sorry for the late reply. I am glad that it helps.
The implementation is not complete yet. We have another Jira that focuses
mainly on optimization:
https://issues.apache.org/jira/browse/FLINK-19614. I have opened a PR for it,
which is really important for performance and stability. Unfortunately, we
do not have enough time to merge it into release-1.12, sorry about that. If I
understand correctly, you maintain a private branch of Flink. If so, you may
also be interested in the new optimization in FLINK-19614. Note that the code
is still under review and some of the logic may change. Feel free to try it
or leave comments.
By the way, we no longer need to merge the result files, because all data of
the same result partition is now written sequentially into one file. I will
update the FLIP soon.
> FLIP-148: Introduce sort-merge based blocking shuffle to Flink
> --------------------------------------------------------------
>
> Key: FLINK-19582
> URL: https://issues.apache.org/jira/browse/FLINK-19582
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Network
> Affects Versions: 1.12.0
> Reporter: Yingjie Cao
> Assignee: Yingjie Cao
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.12.0
>
>
>
> *Motivation*
> Hash-based blocking shuffle and sort-merge based blocking shuffle are the two
> main blocking shuffle implementations widely adopted by existing distributed
> data processing frameworks. The hash-based implementation writes data sent to
> different reducer tasks into separate files concurrently, while the sort-merge
> based approach writes that data together into a single file and merges those
> small files into bigger ones. Compared to the sort-merge based approach, the
> hash-based approach has several weak points when it comes to running
> large-scale batch jobs:
> *1. Stability*
> For high-parallelism (tens of thousands) batch jobs, the current hash-based
> blocking shuffle implementation writes too many files concurrently, which
> puts high pressure on the file system: for example, maintenance of too much
> file metadata, high system CPU consumption, and exhaustion of inodes or file
> descriptors. All of these are potential stability issues, which we
> encountered in our production environment before we switched to sort-merge
> based blocking shuffle.
> Sort-merge based blocking shuffle doesn’t have this problem because, for one
> result partition, only one file is written at a time.
> *2. Performance*
> Large numbers of small shuffle files and random IO can hurt shuffle
> performance a lot, especially on HDDs (on SSDs, sequential reads are also
> important because of read-ahead and caching).
> For batch jobs processing massive data, a small amount of data per
> subpartition is common: to reduce the job completion time, we usually increase
> the job parallelism to reduce the amount of data processed per task, and the
> average amount of data per subpartition is proportional to:
> (the amount of data per task) / (parallelism) = (total amount of data) /
> (parallelism^2)
> which means increasing parallelism rapidly decreases the amount of data per
> subpartition.
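
A minimal sketch of the relation quoted above, to make the quadratic effect
concrete. It assumes that data is spread evenly and that the upstream and
downstream parallelism are the same; the function name and the 1 TB total are
hypothetical and chosen only for illustration.

```python
def avg_bytes_per_subpartition(total_bytes: int, parallelism: int) -> float:
    """Average data per subpartition under an even distribution.

    Assumption (not from the issue text): producer and consumer
    parallelism are equal, so each task writes `parallelism` subpartitions.
    """
    bytes_per_task = total_bytes / parallelism  # data handled by one task
    return bytes_per_task / parallelism         # split across its subpartitions


if __name__ == "__main__":
    total = 10**12  # hypothetical 1 TB shuffle
    for p in (100, 1_000, 10_000):
        print(f"parallelism={p}: {avg_bytes_per_subpartition(total, p):.0f} bytes")
```

With these assumed numbers, a 10x increase in parallelism cuts the average
subpartition size by 100x: about 100 MB at parallelism 100, 1 MB at 1000, and
10 KB at 10000.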
> Besides, data skew is another cause of small subpartition files. By merging
> the data of all subpartitions together in one file, more sequential reads can
> be achieved.
> *3. Resource*
> In the current hash-based implementation, each subpartition needs at least one
> buffer. For large-scale batch shuffles, the memory consumption can be huge.
> For example, we need at least 320M of network memory per result partition if
> parallelism is set to 10000. Because of this huge network memory consumption,
> it is hard to configure the network memory for large-scale batch jobs, and
> sometimes parallelism cannot be increased simply because of insufficient
> network memory, which leads to a bad user experience.
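
The 320M figure above can be reproduced with a short calculation. This is a
sketch under one assumption that the issue text does not state: a network
buffer size of 32 KB. The function and constant names are hypothetical.

```python
ASSUMED_BUFFER_SIZE_BYTES = 32 * 1024  # assumption: 32 KB per network buffer


def min_network_memory_bytes(
    num_subpartitions: int,
    buffer_size_bytes: int = ASSUMED_BUFFER_SIZE_BYTES,
    buffers_per_subpartition: int = 1,
) -> int:
    """Lower bound on network memory for one hash-based result partition,
    where every subpartition holds at least one buffer."""
    return num_subpartitions * buffers_per_subpartition * buffer_size_bytes


if __name__ == "__main__":
    needed = min_network_memory_bytes(10_000)
    print(f"{needed} bytes = {needed // 1024} KB")
```

At parallelism 10000 this gives 10000 x 32 KB = 320000 KB, which matches the
"at least 320M" per result partition mentioned above. The bound grows linearly
with the number of subpartitions, which is why raising parallelism runs into
network memory limits.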
> By introducing the sort-merge based approach to Flink, we can improve Flink’s
> ability to run large-scale batch jobs.