infoverload commented on a change in pull request #476:
URL: https://github.com/apache/flink-web/pull/476#discussion_r735705844
##########
File path: _posts/2021-10-15-sort-shuffle-part2.md
##########
@@ -0,0 +1,154 @@
+---
+layout: post
+title: "Sort-Based Blocking Shuffle Implementation in Flink - Part Two"
+date: 2021-10-15 00:00:00
+authors:
+- Yingjie Cao:
+ name: "Yingjie Cao (Kevin)"
+- Daisy Tsang:
+ name: "Daisy Tsang"
+excerpt: Flink has implemented the sort-based blocking shuffle (FLIP-148) for
batch data processing. In this blog post, we will take a close look at the
design & implementation details and see what we can gain from it.
+---
+
+The part two of this blog post will describe the [design
considerations](#design-considerations) &
[implementations](#implementation-details) in detail which can provide more
insights and list several [potential improvements](#future-improvements) that
can be implemented in the future.
+
+{% toc %}
+
+# Abstract
+
+Like sort-merge shuffle implemented by other distributed data processing
frameworks, the whole sort-based shuffle process in Flink consists of several
important stages, including collecting data in memory, sorting the collected
data in memory, spilling the sorted data to files, and reading the shuffle data
from these spilled files. However, Flink’s implementation has some core
differences, including the multiple data region file structure, the removal of
file merge, and IO scheduling. The following sections describe some core design
considerations and implementations of the sort-based blocking shuffle in Flink.
+
+# Design Considerations
+
+There are several core objectives we want to achieve for the new sort-based
blocking shuffle to be implemented Flink:
+
+## Produce Fewer (Small) Files
+
+As discussed above, the hash-based blocking shuffle would produce too many
small files for large-scale batch jobs. Producing fewer files can help to
improve both stability and performance. The sort-merge approach has been widely
adopted to solve this problem. By first writing to the in-memory buffer and
then sorting and spilling the data into a file after the in-memory buffer is
full, the number of output files can be reduced, which becomes (total data
size) / (in-memory buffer size). Then by merging the produced files together,
the number of files can be further reduced and larger data blocks can provide
better sequential reads.
+
+Flink’s sort-based blocking shuffle adopts a similar logic. A core difference
is that data spilling will always append data to the same file so only one file
will be spilled for each output, which means fewer files are produced.
+
+## Open Fewer Files Concurrently
+
+The hash-based implementation will open all partition files when writing and
reading data which will consume resources like file descriptors and native
memory. Exhaustion of file descriptors will lead to stability issues like "too
many open files".
+
+By always writing/reading only one file per data result partition and sharing
the same opened file channel among all the concurrent data reads from the
downstream consumer tasks, Flink’s sort-based blocking shuffle implementation
can greatly reduce the number of concurrently opened files.
+
+## Create More Sequential Disk IO
+
+Although the hash-based implementation writes and reads each output file
sequentially, the large amount of writing and reading can cause random IO
because of the large number of files being processed concurrently, which means
that reducing the number of files can also achieve more sequential IO.
+
+In addition to producing larger files, there are some other optimizations
implemented by Flink. In the data writing phase, by merging small output data
together into larger batches and writing through the writev system call, more
writing sequential IO can be achieved. In the data reading phase, more
sequential data reading IO is achieved by IO scheduling. In short, Flink tries
to always read data in file offset order which maximizes sequential reads.
Please refer to the IO scheduling section for more information.
+
+## Have Less Disk IO Amplification
+
+The sort-merge approach can reduce the number of files and produce larger data
blocks by merging the spilled data files together. One down side of this
approach is that it writes and reads the same data multiple times because of
the data merging and, theoretically, it may also take up more storage space
than the total size of shuffle data.
+
+Flink’s implementation eliminates the data merging phase by spilling all data
of one data result partition together into one file. As a result, the total
amount of disk IO can be reduced, as well as the storage space. Though without
the data merging, the data blocks are not merged into larger ones. With the IO
scheduling technique, Flink can still achieve good sequential reading and high
disk IO throughput. The benchmark results from the [first
part](/2021/10/15/sort-shuffle-part1#benchmark-results) shows that.
+
+## Decouple Memory Consumption from Parallelism
+
+Similar to the sort-merge implementation in other distributed data processing
systems, Flink’s implementation uses a piece of fixed size (configurable)
in-memory buffer for data sorting and the buffer does not necessarily need to
be extended after the task parallelism is changed, though increasing the size
may lead to better performance for large-scale batch jobs.
+
+**Note:** This only decouples the memory consumption from the parallelism at
the data producer side. On the data consumer side, there is an improvement
which works for both streaming and batch jobs (see
[FLINK-16428](https://issues.apache.org/jira/browse/FLINK-16428)).
+
+# Implementation Details
+
+Here are several core components and algorithms implemented in Flink’s
sort-based blocking shuffle:
+
+## In-Memory Sort
+
+In the sort-spill phase, data records are serialized to the in-memory sort
buffer first. When the sort buffer is full or all output has been finished, the
data in the sort buffer will be copied and spilled into the target data file in
the specific order. The following is the sort buffer interface in Flink:
+
+```java
+public interface SortBuffer {
+
+ /** Appends data of the specified channel to this SortBuffer. */
+ boolean append(ByteBuffer source, int targetChannel, Buffer.DataType
dataType) throws IOException;
+
+ /** Copies data in this SortBuffer to the target MemorySegment. */
+ BufferWithChannel copyIntoSegment(MemorySegment target);
+
+ long numRecords();
+
+ long numBytes();
+
+ boolean hasRemaining();
+
+ void finish();
+
+ boolean isFinished();
+
+ void release();
+
+ boolean isReleased();
+}
+```
+
+Currently, Flink does not need to sort records by key on the data producer
side, so the default implementation of sort buffer only sorts data by
subpartition index, which is achieved by binary bucket sort. More specifically,
each data record will be serialized and attached a 16 bytes binary header.
Among the 16 bytes, 4 bytes is for the record length, 4 bytes is for the data
type (event or data buffer) and 8 bytes is for pointers to the next records
belonging to the same subpartition to be consumed by the same downstream data
consumer. When reading data from the sort buffer, all records of the same
subpartition will be copied one by one following the pointer in the record
header, which guarantees that for each subpartition, the order of record
reading/spilling is the same order as when the record is emitted by the
producer task. The following picture shows the internal structure of the
in-memory binary sort buffer:
+
+<center>
+<img src="{{site.baseurl}}/img/blog/2021-10-15-sort-shuffle/1.jpg"
width="70%"/>
+</center>
+
+## Storage Structure
+
+The data of each blocking result partition is stored as a physical data file
on the disk. The data file consists of multiple data regions, one data spilling
produces one data region. In each data region, the data is clustered by the
subpartition ID (index) and each subpartition is corresponding to one data
consumer.
+
+The following picture shows the structure of a simple data file. This data
file has three data regions (R1, R2, R3) and three consumers (C1, C2, C3). Data
blocks B1.1, B2.1 and B3.1 will be consumed by C1, data blocks B1.2, B2.2 and
B3.2 will be consumed by C2, and data blocks B1.3, B2.3 and B3.3 will be
consumed by C3.
+
+<center>
+<img src="{{site.baseurl}}/img/blog/2021-10-15-sort-shuffle/2.jpg"
width="60%"/>
+</center>
+
+In addition to the data file, for each result partition, there is also an
index file which contains pointers to the data file. The index file has the
same number of regions as the data file. In each region, there are n (equals to
the number of subpartitions) index entries. Each index entry consists of two
parts: one is the file offset of the target data in the data file, the other is
the data size. To reduce the disk IO caused by index data file access, Flink
caches the index data using unmanaged heap memory if the index data file size
is less than 4M. The following picture illustrates the relationship between
index file and data file:
+
+<center>
+<img src="{{site.baseurl}}/img/blog/2021-10-15-sort-shuffle/4.jpg"
width="60%"/>
+</center>
+
+## IO Scheduling
Review comment:
```suggestion
## IO scheduling
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]