Thanks for launching the discussion and the respective FLIP, Yingjie! In general, I am +1 for this proposal, since the sort-merge approach has already been widely adopted in other batch-based projects, like MR, Spark, etc. And it indeed has performance benefits in some scenarios, as mentioned in the FLIP.
I only have some thoughts on the `Public Interfaces` section, since it concerns how users understand and use the feature in practice. The newly introduced classes can be further reviewed in the follow-up PR, since they do not refactor any existing interfaces ATM.

1. taskmanager.network.sort-merge-blocking-shuffle.max-files-per-partition: the default value should be `1`, I guess? It is better to give a sensible default so that most users do not need to care about this option in practice.

2. taskmanager.network.sort-merge-blocking-shuffle.buffers-per-partition: how about making the default equal to the number of required buffers in the LocalBufferPool, as it is now for result partitions? Then it is transparent to users that no extra memory resource is needed, no matter whether the hash-based or the sort-merge-based way is used. For the tuned setting, it is better to give some hints guiding users on how to adjust it for better performance based on the relevant factors.

3. taskmanager.network.sort-merge-blocking-shuffle.min-parallelism: I guess it is not easy to determine a proper threshold for the switch between the hash-based and sort-merge-based implementations. How much data a subpartition takes (broadcast or not) and whether it suits the hash-based way are not completely decided by the parallelism, and users might be confused about how to tune it in practice. I would prefer a simple boolean option for easy use, with a default value of false in the MVP. Then upgrading to the new version brings no effects for users by default, and the sort-merge option can be enabled to try out if users are willing in the desired scenarios. A rough sketch of the resulting configuration follows below.
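To make the suggestions above concrete, here is a minimal sketch of what this could look like in flink-conf.yaml. The `...enabled` key is a hypothetical name for the suggested boolean switch and the buffer count is only a placeholder; the other keys are the ones proposed in the FLIP:

    # Hypothetical boolean switch (suggested above instead of min-parallelism);
    # default false in the MVP, so upgrades change nothing unless users opt in.
    taskmanager.network.sort-merge-blocking-shuffle.enabled: true

    # Suggested default of 1, so most users never need to touch it.
    taskmanager.network.sort-merge-blocking-shuffle.max-files-per-partition: 1

    # Placeholder value; the suggested default would follow the number of
    # required buffers in the LocalBufferPool, as for result partitions today,
    # so no extra memory is needed compared to the hash-based shuffle.
    taskmanager.network.sort-merge-blocking-shuffle.buffers-per-partition: 512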
Best,
Zhijiang

------------------------------------------------------------------
From: Till Rohrmann <trohrm...@apache.org>
Send Time: Fri, Oct 16, 2020, 15:42
To: dev <dev@flink.apache.org>
Subject: Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink

Thanks for sharing the preliminary numbers with us Yingjie. The numbers
look quite impressive :-)

Cheers,
Till

On Thu, Oct 15, 2020 at 5:25 PM Yingjie Cao <kevin.ying...@gmail.com> wrote:

> Hi Till,
>
> Thanks for your reply and comments.
>
> You are right, the proposed sort-merge based shuffle is an extension of
> the existing blocking shuffle and does not change any default behavior
> of Flink.
>
> As for the performance, according to our previous experience, the
> sort-merge based implementation can reduce the shuffle time by 30% to
> even 90% compared to the hash-based implementation. My PoC
> implementation, without any further optimization, can already reduce the
> shuffle time by over 10% on SSD and over 70% on HDD for a simple
> 1000 * 1000 parallelism benchmark job.
>
> After switching to the sort-merge based blocking shuffle, some of our
> users' jobs can scale up to over 20000 parallelism, though this needs
> some JM and RM side optimization. I haven't tried to find where the
> upper bound is, but I guess several tens of thousands should be able to
> meet the needs of most users.
>
> Best,
> Yingjie
>
> Till Rohrmann <trohrm...@apache.org> wrote on Thu, Oct 15, 2020 at 3:57 PM:
>
> > Hi Yingjie,
> >
> > thanks for proposing the sort-merge based blocking shuffle. I like the
> > proposal and it does not seem to change the internals of Flink. Instead
> > it is an extension of existing interfaces which makes it a
> > non-invasive addition.
> >
> > Do you have any numbers comparing the performance of the sort-merge
> > based shuffle against the hash-based shuffle? To what parallelism can
> > you scale up when using the sort-merge based shuffle?
> >
> > Cheers,
> > Till
> >
> > On Thu, Oct 15, 2020 at 5:03 AM Yingjie Cao <kevin.ying...@gmail.com>
> > wrote:
> >
> > > Hi devs,
> > >
> > > Currently, Flink adopts a hash-style blocking shuffle implementation
> > > which writes data sent to different reducer tasks into separate files
> > > concurrently. Compared to the sort-merge based approach, which writes
> > > the data together into a single file and merges the small files into
> > > bigger ones, the hash-based approach has several weak points when it
> > > comes to running large scale batch jobs:
> > >
> > > 1. *Stability*: For high parallelism (tens of thousands) batch jobs,
> > > the current hash-based blocking shuffle implementation writes too
> > > many files concurrently, which puts high pressure on the file system,
> > > for example, the maintenance of too many file metas and the
> > > exhaustion of inodes or file descriptors. All of these can be
> > > potential stability issues. Sort-merge based blocking shuffle does
> > > not have this problem, because for one result partition, only one
> > > file is written at a time.
> > > 2. *Performance*: Large amounts of small shuffle files and random IO
> > > can influence shuffle performance a lot, especially for HDD (for SSD,
> > > sequential read is also important because of read-ahead and caching).
> > > For batch jobs processing massive data, a small amount of data per
> > > subpartition is common because of the high parallelism. Besides, data
> > > skew is another cause of small subpartition files. By merging the
> > > data of all subpartitions together into one file, more sequential
> > > reads can be achieved.
> > > 3. *Resource*: For the current hash-based implementation, each
> > > subpartition needs at least one buffer. For large scale batch
> > > shuffles, the memory consumption can be huge. For example, we need at
> > > least 320M of network memory per result partition if the parallelism
> > > is set to 10000, and because of the huge network memory consumption,
> > > it is hard to configure the network memory for large scale batch
> > > jobs; sometimes the parallelism can not be increased just because of
> > > insufficient network memory, which leads to a bad user experience.
> > >
> > > To improve Flink's capability of running large scale batch jobs, we
> > > would like to introduce sort-merge based blocking shuffle to
> > > Flink[1]. Any feedback is appreciated.
> > >
> > > [1]
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink
> > >
> > > Best,
> > > Yingjie
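As a side note on the *Resource* point in the proposal above, the 320M figure follows from simple arithmetic, assuming Flink's default 32 KB network buffer size and at least one buffer per subpartition. A minimal sketch (the class name is just for illustration):

    // Back-of-the-envelope check of the minimum network memory the
    // hash-based blocking shuffle needs for a single result partition.
    public class NetworkMemoryEstimate {
        public static void main(String[] args) {
            long parallelism = 10_000;  // subpartitions per result partition
            long bufferSizeKb = 32;     // default network buffer size (32 KB)
            long minNetworkKb = parallelism * bufferSizeKb;
            // 10,000 buffers * 32 KB = 320,000 KB, i.e. the ~320M per
            // result partition mentioned in the proposal
            System.out.println("min network memory: ~" + (minNetworkKb / 1000) + " MB");
        }
    }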