infoverload commented on a change in pull request #476:
URL: https://github.com/apache/flink-web/pull/476#discussion_r735705089



##########
File path: _posts/2021-10-15-sort-shuffle-part2.md
##########
@@ -0,0 +1,154 @@
+---
+layout: post
+title: "Sort-Based Blocking Shuffle Implementation in Flink - Part Two"
+date: 2021-10-15 00:00:00
+authors:
+- Yingjie Cao:
+  name: "Yingjie Cao (Kevin)"
+- Daisy Tsang:
+  name: "Daisy Tsang"
+excerpt: Flink has implemented the sort-based blocking shuffle (FLIP-148) for 
batch data processing. In this blog post, we will take a close look at the 
design & implementation details and see what we can gain from it.
+---
+
+Part two of this blog post describes the [design considerations](#design-considerations) and [implementation details](#implementation-details) in depth to provide more insight, and lists several [potential improvements](#future-improvements) that could be implemented in the future.
+
+{% toc %}
+
+# Abstract
+
+Like the sort-merge shuffle implemented by other distributed data processing frameworks, the whole sort-based shuffle process in Flink consists of several important stages: collecting data in memory, sorting the collected data in memory, spilling the sorted data to files, and reading the shuffle data back from these spilled files. However, Flink’s implementation has some core differences, including the multiple-data-region file structure, the removal of the file merge phase, and IO scheduling. The following sections describe the core design considerations and implementation details of the sort-based blocking shuffle in Flink.
+
+# Design Considerations
+
+There are several core objectives we want to achieve for the new sort-based blocking shuffle implemented in Flink:
+
+## Produce Fewer (Small) Files
+
+As discussed above, the hash-based blocking shuffle produces too many small files for large-scale batch jobs, and producing fewer files helps to improve both stability and performance. The sort-merge approach has been widely adopted to solve this problem: data is first written to an in-memory buffer, and once the buffer is full, it is sorted and spilled into a file. This reduces the number of output files to (total data size) / (in-memory buffer size). By then merging the spilled files together, the number of files can be reduced further, and the larger data blocks provide better sequential reads.
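+
+To illustrate the generic sort-spill cycle described above, here is a minimal, hypothetical Java sketch (not Flink's actual classes): records accumulate in a fixed-size buffer, the full buffer is sorted, and each full buffer is spilled as one sequentially written file. For instance, with 100 GiB of output data and a 512 MiB sort buffer, roughly 100 GiB / 512 MiB = 200 spill files are produced before any merging.
+
+```java
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+
+// Hypothetical sketch of the classic sort-spill cycle, not Flink's code.
+class SortSpillWriter {
+    private final ByteBuffer buffer; // fixed-size in-memory sort buffer
+    private int spillCount = 0;
+
+    SortSpillWriter(int bufferBytes) {
+        this.buffer = ByteBuffer.allocate(bufferBytes);
+    }
+
+    void write(byte[] record) throws IOException {
+        if (record.length > buffer.remaining()) {
+            spill(); // buffer full: sort it and flush it to a new file
+        }
+        buffer.put(record);
+    }
+
+    private void spill() throws IOException {
+        sortByPartition(buffer); // placeholder for the in-memory sort
+        buffer.flip();
+        Path file = Path.of("spill-" + spillCount++ + ".data");
+        try (FileChannel ch = FileChannel.open(file,
+                StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
+            while (buffer.hasRemaining()) {
+                ch.write(buffer); // one sequential write per spill file
+            }
+        }
+        buffer.clear();
+    }
+
+    private void sortByPartition(ByteBuffer buf) {
+        // Hypothetical: reorder records so data of the same downstream
+        // partition becomes contiguous before spilling.
+    }
+}
+```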
+
+Flink’s sort-based blocking shuffle adopts similar logic. A core difference is that data spilling always appends data to the same file, so only one file is spilled per result partition, which means fewer files are produced.
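+
+In contrast to the multi-file spilling sketched above, the append-to-one-file variant can be sketched as follows (hypothetical code, not Flink's internals): every sorted buffer of a result partition is appended to one shared file, so no merge pass is needed afterwards.
+
+```java
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+
+// Hypothetical sketch: one file per result partition, appended in place.
+class SingleFileSpiller {
+    private final FileChannel channel; // one file for the whole result partition
+
+    SingleFileSpiller(Path partitionFile) throws IOException {
+        this.channel = FileChannel.open(partitionFile,
+                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
+    }
+
+    // Each sorted buffer is appended to the same file as a new data region;
+    // no separate merge phase is required.
+    void spill(ByteBuffer sortedBuffer) throws IOException {
+        while (sortedBuffer.hasRemaining()) {
+            channel.write(sortedBuffer);
+        }
+    }
+}
+```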
+
+## Open Fewer Files Concurrently
+
+The hash-based implementation opens all partition files when writing and reading data, which consumes resources like file descriptors and native memory. Exhausting file descriptors leads to stability issues like "too many open files" errors.
+
+By always writing/reading only one file per result partition and sharing the same open file channel among all concurrent data reads from the downstream consumer tasks, Flink’s sort-based blocking shuffle implementation greatly reduces the number of concurrently opened files.
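+
+To illustrate the sharing idea (a hypothetical sketch, not Flink's internal reader): a single `FileChannel` can serve many concurrent consumers, because positional reads do not modify the channel's position and file channels are safe for concurrent use.
+
+```java
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+
+// Hypothetical sketch: N downstream consumers share 1 open file descriptor.
+class SharedPartitionFileReader {
+    private final FileChannel channel; // one open channel, shared by all readers
+
+    SharedPartitionFileReader(Path partitionFile) throws IOException {
+        this.channel = FileChannel.open(partitionFile, StandardOpenOption.READ);
+    }
+
+    // A positional read does not touch the channel's own position, so many
+    // consumers can read different regions of the same file concurrently.
+    int readRegion(ByteBuffer target, long fileOffset) throws IOException {
+        return channel.read(target, fileOffset);
+    }
+}
+```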
+
+## Create More Sequential Disk IO
+
+Although the hash-based implementation writes and reads each output file sequentially, the large number of files processed concurrently makes the aggregate disk access pattern effectively random. Reducing the number of files therefore also leads to more sequential IO.
+
+In addition to producing larger files, Flink implements several other optimizations. In the data writing phase, small output data is merged into larger batches and written through the writev system call, which achieves more sequential write IO. In the data reading phase, more sequential read IO is achieved by IO scheduling: in short, Flink tries to always read data in file offset order, which maximizes sequential reads. Please refer to the IO scheduling section for more information.
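+
+On the JVM, this writev-style batching corresponds to gathering writes. The sketch below (hypothetical, not Flink's actual writer) flushes many small buffers to disk in a single call; `FileChannel` implements `GatheringByteChannel`, which maps to writev-like behavior on most platforms.
+
+```java
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+
+// Hypothetical sketch of batched, gathering writes.
+class BatchedWriter {
+    static void writeBatch(Path file, ByteBuffer[] smallBuffers) throws IOException {
+        try (FileChannel ch = FileChannel.open(file,
+                StandardOpenOption.CREATE, StandardOpenOption.APPEND)) {
+            long remaining = 0;
+            for (ByteBuffer b : smallBuffers) {
+                remaining += b.remaining();
+            }
+            while (remaining > 0) {
+                // One gathering call writes many small buffers sequentially.
+                remaining -= ch.write(smallBuffers);
+            }
+        }
+    }
+}
+```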
+
+## Have Less Disk IO Amplification
+
+The sort-merge approach can reduce the number of files and produce larger data blocks by merging the spilled data files together. One downside of this approach is that the merging writes and reads the same data multiple times; for example, a single merge pass already writes and reads every byte of shuffle data twice. Theoretically, it may also take up more storage space than the total size of the shuffle data, because the spilled files and the merged files coexist until merging finishes.
+
+Flink’s implementation eliminates the data merging phase by spilling all data of one result partition together into one file. As a result, both the total amount of disk IO and the required storage space are reduced. Although the data blocks are not combined into larger ones without the merge phase, Flink can still achieve good sequential reading and high disk IO throughput with the IO scheduling technique, as the benchmark results from the [first part](/2021/10/15/sort-shuffle-part1#benchmark-results) show.
+
+## Decouple Memory Consumption from Parallelism
+
+Similar to the sort-merge implementation in other distributed data processing systems, Flink’s implementation uses a fixed-size (configurable) in-memory buffer for data sorting. The buffer does not need to be enlarged when the task parallelism changes, though increasing its size may lead to better performance for large-scale batch jobs.
+
+**Note:** This only decouples the memory consumption from the parallelism at 
the data producer side. On the data consumer side, there is an improvement 
which works for both streaming and batch jobs (see 
[FLINK-16428](https://issues.apache.org/jira/browse/FLINK-16428)).
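+
+For reference, the relevant memory sizes are set through Flink configuration options. The sketch below uses keys from the Flink documentation around the 1.14 release; treat the exact keys and values as assumptions to verify against your Flink version.
+
+```java
+import org.apache.flink.configuration.Configuration;
+
+public class SortShuffleConfigExample {
+    public static void main(String[] args) {
+        Configuration conf = new Configuration();
+        // Assumed keys (verify against your Flink version):
+        // use the sort-based blocking shuffle for all parallelism levels.
+        conf.setString("taskmanager.network.sort-shuffle.min-parallelism", "1");
+        // Producer side: network buffers used for the in-memory sort buffer.
+        conf.setString("taskmanager.network.sort-shuffle.min-buffers", "2048");
+        // Consumer side: size of the batch shuffle read memory.
+        conf.setString("taskmanager.memory.framework.off-heap.batch-shuffle.size", "128m");
+        System.out.println(conf);
+    }
+}
+```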
+
+# Implementation Details

Review comment:
   ```suggestion
   # Implementation details
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

