infoverload commented on a change in pull request #476:
URL: https://github.com/apache/flink-web/pull/476#discussion_r735683887



##########
File path: _posts/2021-10-15-sort-shuffle-part1.md
##########
@@ -0,0 +1,224 @@
+---
+layout: post
+title: "Sort-Based Blocking Shuffle Implementation in Flink - Part One"
+date: 2021-10-15 00:00:00
+authors:
+- Yingjie Cao:
+  name: "Yingjie Cao (Kevin)"
+- Daisy Tsang:
+  name: "Daisy Tsang"
+excerpt: Flink has implemented the sort-based blocking shuffle (FLIP-148) for 
batch data processing. In this blog post, we will take a close look at the 
design & implementation details and see what we can gain from it.
+---
+
+Part one of this blog post explains the 
[motivation](#why-did-we-introduce-the-sort-based-implementation) behind the 
sort-based blocking shuffle and presents [benchmark 
results](#benchmark-results) showing how you can benefit from the new blocking 
shuffle implementation. The [operations & performance 
tuning](#operations--tuning) section provides guidelines for using this new 
feature.
+
+{% toc %}
+
+# Introduction
+
+Data shuffling is an important stage in batch processing applications and 
describes how data is sent from one operator to the next. In this phase, the 
output of the upstream operator is spilled to persistent storage such as disk, 
and the downstream operator then reads the corresponding data and processes it. 
Blocking shuffle means that intermediate results from operator A are not sent 
to operator B until operator A has completely finished.
+
+Hash-based and sort-based blocking shuffles are the two main blocking shuffle 
implementations widely adopted by existing distributed data processing 
frameworks:
+
+1. **Hash-Based Approach:** The core idea behind the hash-based approach is to 
write data consumed by different consumer tasks to different files and each 
file can then serve as a natural boundary for the partitioned data.
+2. **Sort-Based Approach:** The core idea behind the sort-based approach is to 
write all the produced data together first and then leverage sorting to cluster 
data belonging to different data partitions or even keys.
+
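As a toy sketch (not Flink code; records are assumed to be simple 
`(partition, payload)` pairs and "files" are plain Python lists), the two 
approaches can be contrasted like this:

```python
# Toy illustration of the two blocking-shuffle strategies described above.
# This is a conceptual sketch, not how Flink actually writes shuffle data.

def hash_based_shuffle(records, num_consumers):
    """One output file per consumer task: each record is routed to the
    file of its consumer as it is produced."""
    files = [[] for _ in range(num_consumers)]
    for partition, payload in records:
        files[partition % num_consumers].append(payload)
    return files

def sort_based_shuffle(records, num_consumers):
    """All produced data goes into a single file, sorted by consumer so
    each consumer's data forms one contiguous, sequentially readable region."""
    return sorted(records, key=lambda r: r[0] % num_consumers)

records = [(2, "a"), (0, "b"), (1, "c"), (2, "d"), (0, "e")]
print(hash_based_shuffle(records, 3))  # one list ("file") per consumer
print(sort_based_shuffle(records, 3))  # one list, clustered by consumer
```

Note how the hash-based sketch needs one open "file" per consumer, while the 
sort-based sketch always produces a single output regardless of parallelism.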
+The sort-based blocking shuffle was introduced in Flink 1.12 and further 
optimized for both stability and performance and made production-ready in 
1.13. We hope you enjoy the improvements, and any feedback is highly 
appreciated.
+
+# Why did we introduce the sort-based implementation?
+
+The hash-based blocking shuffle has been supported in Flink for a long time. 
However, compared to the sort-based approach, it can have several weaknesses:
+
+1. **Stability:** For batch jobs with high parallelism (tens of thousands of 
subtasks), the hash-based approach opens many files concurrently while writing 
or reading data, which can put high pressure on the file system (e.g. 
maintaining too much file metadata, exhausting inodes or file descriptors). 
We have encountered many stability issues when running large-scale batch jobs 
with the hash-based blocking shuffle.
+2. **Performance:** For large-scale batch jobs, the hash-based approach can 
produce too many small files: for each data shuffle (or connection), the number 
of output files is (producer parallelism) * (consumer parallelism) and the 
average size of each file is (shuffle data size) / (number of files). The 
random IO caused by writing and reading these fragmented files can hurt 
shuffle performance significantly, especially on spinning disks. See the 
[benchmark results](#benchmark-results) section for more information.
+
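With some illustrative numbers (assumed for this sketch, not taken from any 
benchmark), the file-count formulas above can be made concrete:

```python
# Illustrative numbers only: applying the formulas from the paragraph above
# to show why the hash-based approach produces many tiny files.
producer_parallelism = 10_000
consumer_parallelism = 10_000
shuffle_data_size_gb = 1_000  # assume 1 TB of shuffle data for this sketch

# (producer parallelism) * (consumer parallelism) files per shuffle
num_files = producer_parallelism * consumer_parallelism

# (shuffle data size) / (number of files) average size per file, in KiB
avg_file_size_kb = shuffle_data_size_gb * 1024 * 1024 / num_files

print(num_files)         # a hundred million files for a single shuffle
print(avg_file_size_kb)  # only about 10 KiB per file: random-IO-heavy
```

Even at this modest data volume, each file is only around 10 KiB, which is why 
reading and writing them causes heavy random IO on spinning disks.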
+By introducing the sort-based blocking shuffle implementation, fewer data 
files are created and opened, and more reads are sequential. As a result, 
better stability and performance can be achieved.
+
+Moreover, the sort-based implementation can save network buffers for 
large-scale batch jobs. For the hash-based implementation, the network buffers 
needed for each output result partition are proportional to the consumers’ 
parallelism. For the sort-based implementation, the network memory consumption 
can be decoupled from the parallelism, which means that a fixed size of network 
memory can satisfy requests for all result partitions, though more network 
memory may lead to better performance.
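A rough back-of-the-envelope model (all pool sizes here are assumed numbers 
for illustration, not Flink defaults) shows how the two approaches scale with 
consumer parallelism:

```python
# Rough model of network buffer demand per result partition.
# Buffer size and pool sizes are assumptions for this sketch, not Flink defaults.
buffer_size_kb = 32
consumer_parallelism = 10_000

# Hash-based: buffers proportional to consumer parallelism
# (modeled here as at least one buffer per downstream consumer).
hash_buffers_per_partition = consumer_parallelism

# Sort-based: a fixed-size pool (assumed 512 buffers here) serves the
# result partition regardless of how many consumers there are.
sort_buffers_per_partition = 512

hash_mib = hash_buffers_per_partition * buffer_size_kb / 1024
sort_mib = sort_buffers_per_partition * buffer_size_kb / 1024
print(hash_mib, "MiB")  # grows linearly with consumer parallelism
print(sort_mib, "MiB")  # stays constant as parallelism grows
```

Under this model the hash-based demand grows linearly with parallelism while 
the sort-based demand is a constant, which is the decoupling described above.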
+
+# Benchmark Results
+
+## Stability
+
+Aside from the problem of consuming too many file descriptors and inodes 
mentioned in the above section, the hash-based blocking shuffle also has a 
known issue of creating too many files, which blocks the TaskExecutor’s main 
thread ([FLINK-21201](https://issues.apache.org/jira/browse/FLINK-21201)). In 
addition, some large-scale jobs like q78 and q80 of the TPC-DS benchmark failed 
to run on the hash-based blocking shuffle in our tests because of a 
“connection reset by peer” exception, similar to the issue reported in 
[FLINK-19925](https://issues.apache.org/jira/browse/FLINK-19925) (reading 
shuffle data by Netty threads can influence network stability).
+
+## Performance

Review comment:
       ```suggestion
   # Benchmark results on stability and performance 
   
   Aside from the problem of consuming too many file descriptors and inodes 
mentioned in the above section, the hash-based blocking shuffle also has a 
known issue of creating too many files, which blocks the TaskExecutor’s main 
thread ([FLINK-21201](https://issues.apache.org/jira/browse/FLINK-21201)). In 
addition, some large-scale jobs like q78 and q80 of the TPC-DS benchmark failed 
to run on the hash-based blocking shuffle in our tests because of a 
“connection reset by peer” exception, similar to the issue reported in 
[FLINK-19925](https://issues.apache.org/jira/browse/FLINK-19925) (reading 
shuffle data by Netty threads can influence network stability).
   
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

