[ 
https://issues.apache.org/jira/browse/FLINK-19582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17228950#comment-17228950
 ] 

Husky Zeng commented on FLINK-19582:
------------------------------------

Hi Yingjie,
Thanks for your contribution, it's very useful for my project! I am trying
to merge this feature from master into my project branch, so I would like to
know whether you have finished all the work for it.
It seems that “Step #2: Implement File Merge and Other Optimizations” has
not been merged to master yet.

Looking forward to your reply!
[~kevin.cyj]


> Introduce sort-merge based blocking shuffle to Flink
> ----------------------------------------------------
>
>                 Key: FLINK-19582
>                 URL: https://issues.apache.org/jira/browse/FLINK-19582
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Network
>    Affects Versions: 1.12.0
>            Reporter: Yingjie Cao
>            Assignee: Yingjie Cao
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.12.0
>
>
>  
> *Motivation*
> Hash-based blocking shuffle and sort-merge based blocking shuffle are the
> two main blocking shuffle implementations widely adopted by existing
> distributed data processing frameworks. The hash-based implementation writes
> data sent to different reducer tasks into separate files concurrently, while
> the sort-merge based approach writes the data of all subpartitions together
> into a single file and merges small files into bigger ones. Compared to the
> sort-merge based approach, the hash-based approach has several weak points
> when it comes to running large scale batch jobs:
> *1. Stability*
> For high parallelism (tens of thousands) batch jobs, the current hash-based
> blocking shuffle implementation writes too many files concurrently, which
> puts high pressure on the file system: for example, maintenance of too many
> file metas, high system CPU consumption, and exhaustion of inodes or file
> descriptors. All of these are potential stability issues, which we
> encountered in our production environment before we switched to sort-merge
> based blocking shuffle.
> Sort-merge based blocking shuffle does not have this problem because, for
> one result partition, only one file is written at a time.
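> As a rough illustration of the file counts involved (the concrete numbers
> here are assumed, not taken from this issue): with both producer and
> consumer parallelism at 10000, the hash-based shuffle creates
> 10000 * 10000 = 100,000,000 subpartition files across the cluster, while the
> sort-merge based shuffle writes on the order of 10000 files, one per result
> partition (before any merging).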
> *2. Performance*
> Large numbers of small shuffle files and random IO can hurt shuffle
> performance a lot, especially on HDD (on SSD, sequential read is also
> important because of read-ahead and caching).
> For batch jobs processing massive data, a small amount of data per
> subpartition is common, because to reduce the job completion time we usually
> increase the job parallelism, which reduces the amount of data processed per
> task; the average amount of data per subpartition is then:
> (the amount of data per task) / (parallelism) = (total amount of data) /
> (parallelism^2)
> which means increasing the parallelism rapidly decreases the amount of data
> per subpartition.
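> As a hypothetical worked example of the formula above: shuffling 1 TB of
> data with a parallelism of 1000 gives 1 TB / 1000^2 = 1 MB per subpartition
> on average; doubling the parallelism to 2000 shrinks it fourfold, to
> 1 TB / 2000^2 = 0.25 MB.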
> Besides, data skew is another cause of small subpartition files. By merging
> the data of all subpartitions together into one file, more sequential reads
> can be achieved.
> *3. Resource*
> With the current hash-based implementation, each subpartition needs at least
> one buffer. For large scale batch shuffles, the memory consumption can be
> huge. For example, we need at least 320M of network memory per result
> partition if the parallelism is set to 10000. Because of this huge network
> memory consumption, it is hard to configure the network memory for large
> scale batch jobs, and sometimes the parallelism cannot be increased simply
> because of insufficient network memory, which leads to a bad user
> experience.
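> The 320M figure follows if one assumes Flink's default network buffer
> (memory segment) size of 32 KB and one buffer per subpartition:
> 10000 subpartitions * 32 KB = 320000 KB, i.e. roughly 320 MB per result
> partition.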
> By introducing the sort-merge based approach to Flink, we can improve Flink’s 
> capability of running large scale batch jobs.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
