infoverload commented on a change in pull request #476:
URL: https://github.com/apache/flink-web/pull/476#discussion_r735704835
##########
File path: _posts/2021-10-15-sort-shuffle-part2.md
##########
@@ -0,0 +1,154 @@
+---
+layout: post
+title: "Sort-Based Blocking Shuffle Implementation in Flink - Part Two"
+date: 2021-10-15 00:00:00
+authors:
+- Yingjie Cao:
+ name: "Yingjie Cao (Kevin)"
+- Daisy Tsang:
+ name: "Daisy Tsang"
+excerpt: Flink has implemented the sort-based blocking shuffle (FLIP-148) for
batch data processing. In this blog post, we will take a close look at the
design & implementation details and see what we can gain from it.
+---
+
+Part two of this blog post describes the [design considerations](#design-considerations) and [implementation details](#implementation-details) of Flink’s sort-based blocking shuffle in depth and lists several [potential improvements](#future-improvements) that could be made in the future.
+
+{% toc %}
+
+# Abstract
+
+Like the sort-merge shuffle implemented by other distributed data processing frameworks, the whole sort-based shuffle process in Flink consists of several important stages: collecting data in memory, sorting the collected data, spilling the sorted data to files, and reading the shuffle data back from these spilled files. However, Flink’s implementation has some core differences, including the multiple-data-region file structure, the removal of the file merge phase, and IO scheduling. The following sections describe the core design considerations and implementation details of the sort-based blocking shuffle in Flink.
+
+# Design Considerations
+
+There are several core objectives we want to achieve for the new sort-based blocking shuffle implemented in Flink:
+
+## Produce Fewer (Small) Files
+
+As discussed above, the hash-based blocking shuffle would produce too many small files for large-scale batch jobs. Producing fewer files helps to improve both stability and performance. The sort-merge approach has been widely adopted to solve this problem: by first collecting data in an in-memory buffer and then sorting and spilling it into a file once the buffer is full, the number of output files is reduced to (total data size) / (in-memory buffer size). By then merging the produced files together, the number of files can be reduced further, and the larger data blocks provide better sequential reads.
+
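As a back-of-the-envelope illustration of this file-count arithmetic (the sizes and parallelism below are hypothetical, not taken from the Flink benchmarks):

```python
# Hypothetical sizes, chosen only to illustrate the arithmetic above.
total_data_size = 10 * 1024**3   # 10 GiB of shuffle data per producer task
in_memory_buffer = 64 * 1024**2  # 64 MiB in-memory sort buffer
consumer_parallelism = 1000      # number of downstream consumer tasks

# Hash-based shuffle: one file per downstream consumer task.
hash_files = consumer_parallelism

# Sort-merge: one spill file each time the in-memory buffer fills up.
sort_spill_files = total_data_size // in_memory_buffer

print(hash_files, sort_spill_files)  # 1000 160
```

Merging the 160 spill files would shrink the count further, at the cost of rewriting the data, which is the IO amplification trade-off discussed below.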
+Flink’s sort-based blocking shuffle adopts similar logic. A core difference is that data spilling always appends to the same file, so only one file is spilled per result partition, which means fewer files are produced.
+
+## Open Fewer Files Concurrently
+
+The hash-based implementation opens all partition files when writing and reading data, which consumes resources like file descriptors and native memory. Exhausting the available file descriptors leads to stability issues such as the "too many open files" error.
+
+By always writing/reading only one file per data result partition and sharing
the same opened file channel among all the concurrent data reads from the
downstream consumer tasks, Flink’s sort-based blocking shuffle implementation
can greatly reduce the number of concurrently opened files.
+
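The idea of sharing a single opened file among all concurrent readers can be sketched in Python (a simplified stand-in for Flink’s shared file channel; the fixed-size region layout and the thread pool are assumptions made for illustration):

```python
import os
import tempfile
from concurrent.futures import ThreadPoolExecutor

# One file holding the data of several "consumers", laid out as
# fixed-size regions (a made-up layout for illustration).
region_size = 8
data = b"".join(bytes([i]) * region_size for i in range(4))

fd_tmp, path = tempfile.mkstemp()
os.write(fd_tmp, data)
os.close(fd_tmp)

# Open the file ONCE; every reader shares the same descriptor and
# uses positioned reads (os.pread), so no per-reader file handle
# or shared seek state is needed.
fd = os.open(path, os.O_RDONLY)

def read_region(consumer_id):
    offset = consumer_id * region_size
    return os.pread(fd, region_size, offset)

with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(read_region, range(4)))

os.close(fd)
os.remove(path)
```

Because `os.pread` carries the offset with every call, the concurrent readers never race on a shared file position, which is what makes one descriptor per result partition sufficient.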
+## Create More Sequential Disk IO
+
+Although the hash-based implementation writes and reads each output file sequentially, the large number of files being written and read concurrently causes random IO on the disk, which means that reducing the number of files also yields more sequential IO.
+
+In addition to producing larger files, Flink implements some further optimizations. In the data writing phase, small output buffers are merged into larger batches and written through the writev system call, which achieves more sequential writing IO. In the data reading phase, more sequential reading IO is achieved by IO scheduling: in short, Flink always tries to read data in file offset order, which maximizes sequential reads. Please refer to the IO scheduling section for more information.
+
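Python exposes the same gathering write via `os.writev`, so the batching idea can be sketched as follows (the buffer contents and the temporary file are made up for illustration):

```python
import os
import tempfile

# Several small output buffers that would otherwise each need
# their own write() call.
buffers = [b"block-0;", b"block-1;", b"block-2;"]

fd, path = tempfile.mkstemp()

# writev gathers all buffers into a single system call, turning
# many small writes into one larger sequential write.
written = os.writev(fd, buffers)
os.close(fd)

with open(path, "rb") as f:
    content = f.read()
os.remove(path)

print(written)  # 24
```

One syscall per batch also cuts the per-write overhead (user/kernel crossings), which matters when the individual buffers are small.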
+## Have Less Disk IO Amplification
+
+The sort-merge approach can reduce the number of files and produce larger data blocks by merging the spilled data files together. One downside of this approach is that it writes and reads the same data multiple times because of the merging and, theoretically, it may also take up more storage space than the total size of the shuffle data.
+
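A small back-of-the-envelope calculation makes the amplification concrete (hypothetical numbers; the model assumes each merge pass rewrites the full data set once):

```python
# Hypothetical numbers for illustration.
shuffle_data_gb = 100
merge_passes = 2  # e.g. two rounds of merging spilled files

# Classic sort-merge: the initial spill plus every merge pass
# writes (and later reads back) the full data set again.
merge_write_gb = shuffle_data_gb * (1 + merge_passes)

# Flink's implementation has no merge phase: data is written once.
flink_write_gb = shuffle_data_gb

print(merge_write_gb, flink_write_gb)  # 300 100
```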
+Flink’s implementation eliminates the data merging phase by spilling all data of one result partition together into a single file. As a result, both the total amount of disk IO and the required storage space are reduced. Although the data blocks are not merged into larger ones without the merging phase, Flink can still achieve good sequential reads and high disk IO throughput with the IO scheduling technique, as shown by the benchmark results in the [first part](/2021/10/15/sort-shuffle-part1#benchmark-results).
+
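The core idea of serving pending reads in file offset order can be sketched as follows (a simplified model with a made-up region layout; Flink’s actual IO scheduler is part of its internal shuffle components):

```python
import os
import tempfile

# One spilled file holding the data regions of three consumers,
# as (offset, length) pairs -- a made-up layout for illustration.
regions = {"consumer-0": (0, 4), "consumer-1": (4, 4), "consumer-2": (8, 4)}

fd, path = tempfile.mkstemp()
os.write(fd, b"AAAABBBBCCCC")

# Read requests arrive in arbitrary consumer order ...
pending = [("consumer-2", *regions["consumer-2"]),
           ("consumer-0", *regions["consumer-0"]),
           ("consumer-1", *regions["consumer-1"])]

# ... but are served sorted by file offset, so the disk sees one
# forward sequential scan instead of back-and-forth seeks.
served = []
for consumer, offset, length in sorted(pending, key=lambda r: r[1]):
    served.append((consumer, os.pread(fd, length, offset)))

os.close(fd)
os.remove(path)
```

Serving in offset order trades a little latency for individual consumers against much better aggregate disk throughput, which is the right trade for batch workloads.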
+## Decouple Memory Consumption from Parallelism
Review comment:
```suggestion
## Decouple memory consumption from parallelism
```