Hi Till,

Thanks for your reply and comments.
You are right, the proposed sort-merge based shuffle is an extension of the
existing blocking shuffle and does not change any default behavior of Flink.

As for the performance, according to our previous experience, the sort-merge
based implementation can reduce the shuffle time by 30% to as much as 90%
compared to the hash-based implementation. My PoC implementation, without any
further optimization, already reduces the shuffle time by over 10% on SSD and
over 70% on HDD for a simple 1000 * 1000 parallelism benchmark job. After
switching to the sort-merge based blocking shuffle, some of our users' jobs
can scale up to over 20000 parallelism, though this also requires some JM and
RM side optimizations. I have not tried to find the upper bound, but I guess
several tens of thousands of parallelism should be able to meet the needs of
most users.

Best,
Yingjie

Till Rohrmann <trohrm...@apache.org> wrote on Thu, Oct 15, 2020 at 3:57 PM:

> Hi Yingjie,
>
> Thanks for proposing the sort-merge based blocking shuffle. I like the
> proposal and it does not seem to change the internals of Flink. Instead it
> is an extension of existing interfaces which makes it a non-invasive
> addition.
>
> Do you have any numbers comparing the performance of the sort-merge based
> shuffle against the hash-based shuffle? To what parallelism can you scale
> up when using the sort-merge based shuffle?
>
> Cheers,
> Till
>
> On Thu, Oct 15, 2020 at 5:03 AM Yingjie Cao <kevin.ying...@gmail.com>
> wrote:
>
> > Hi devs,
> >
> > Currently, Flink adopts a hash-style blocking shuffle implementation
> > which writes the data sent to different reducer tasks into separate
> > files concurrently. Compared to the sort-merge based approach, which
> > writes that data together into a single file and merges the small files
> > into bigger ones, the hash-based approach has several weak points when
> > it comes to running large scale batch jobs:
> >
> > 1. *Stability*: For high parallelism (tens of thousands) batch jobs,
> > the current hash-based blocking shuffle implementation writes too many
> > files concurrently, which puts high pressure on the file system, for
> > example, maintenance of too many file metas and exhaustion of inodes or
> > file descriptors. All of these are potential stability issues. The
> > sort-merge based blocking shuffle does not have this problem because,
> > for one result partition, only one file is written at a time.
> > 2. *Performance*: Large amounts of small shuffle files and random IO
> > can influence shuffle performance a lot, especially for HDD (for SSD,
> > sequential read is also important because of read ahead and caching).
> > For batch jobs processing massive data, a small amount of data per
> > subpartition is common because of the high parallelism. Besides, data
> > skew is another cause of small subpartition files. By merging the data
> > of all subpartitions together into one file, more sequential reads can
> > be achieved.
> > 3. *Resource*: For the current hash-based implementation, each
> > subpartition needs at least one buffer. For large scale batch shuffles,
> > the memory consumption can be huge.
> > For example, we need at least 320M of network memory per result
> > partition if the parallelism is set to 10000. Because of the huge
> > network memory consumption, it is hard to configure the network memory
> > for large scale batch jobs, and sometimes the parallelism cannot be
> > increased just because of insufficient network memory, which leads to a
> > bad user experience.
> >
> > To improve Flink's capability of running large scale batch jobs, we
> > would like to introduce the sort-merge based blocking shuffle to Flink
> > [1]. Any feedback is appreciated.
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink
> >
> > Best,
> > Yingjie
> >
>
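For reference, a minimal sketch of the buffer and file arithmetic behind the
320M figure quoted above, assuming the default 32 KB network buffer (memory
segment) size and at least one buffer per subpartition for the hash-based
blocking shuffle, as described in the mail. The class and method names are
purely illustrative and are not part of Flink's API.

// Illustrative sketch only: buffer and file arithmetic for one result
// partition, assuming a 32 KB network buffer size and at least one buffer
// per subpartition for the hash-based blocking shuffle.
public class BlockingShuffleEstimate {

    // Assumed default network buffer (memory segment) size of 32 KB.
    private static final long BUFFER_SIZE_BYTES = 32 * 1024;

    // Minimum network memory (in bytes) one hash-based result partition
    // needs when every subpartition holds at least one buffer.
    static long minHashShuffleMemoryBytes(int parallelism) {
        return parallelism * BUFFER_SIZE_BYTES;
    }

    // Number of files one hash-based result partition writes concurrently
    // (one file per downstream subpartition).
    static int hashShuffleFileCount(int parallelism) {
        return parallelism;
    }

    public static void main(String[] args) {
        int parallelism = 10_000;
        long bytes = minHashShuffleMemoryBytes(parallelism);
        // 10000 * 32 KB = 327,680,000 bytes, roughly the 320M from the mail.
        System.out.printf("hash-based: %d concurrent files, >= %d bytes (~320 MB) of network memory%n",
                hashShuffleFileCount(parallelism), bytes);
        // The sort-merge based shuffle writes only one file per result
        // partition at a time, independent of the parallelism.
        System.out.println("sort-merge based: 1 file written at a time per result partition");
    }
}

The point of the sketch is only that, for the hash-based approach, both the
file count and the minimum network memory grow linearly with the parallelism,
while the sort-merge based approach keeps the number of concurrently written
files per result partition constant.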