zhengchenyu commented on issue #1239:
URL: 
https://github.com/apache/incubator-uniffle/issues/1239#issuecomment-1869223301

   # 1 Default shuffle
   
   > Note: The first chapter briefly introduces the principle of the default shuffle, in order to find where local disks are used before designing remote merge. If you are already familiar with this, you can skip this part.
   
   We will analyze the shuffle of MapReduce, Tez, and Spark in turn.
   
   ## 1.1 MapReduce
   
   The Map writes records to memory. When memory usage exceeds the threshold, the in-memory data is spilled to a disk file, with the Records written in order of partitionId + key. After the Map has processed all records, it spills the remaining in-memory data to a disk file as well, then reads all spilled files and merges them in order of partitionId + key to obtain the sorted Records.
   
   > Note: The purpose of sorting by partitionId is that when the Reduce side fetches data from the Map side, it can read as sequentially as possible. For MR, Tez, and Spark, regardless of whether the records are sorted by key, as long as the data is partitioned it must be sorted by partitionId.
   
   The Reduce side pulls the records of its partition from each Map, remotely or locally; each fetched chunk is called a MapOutput. Normally these are kept in memory, but if memory usage exceeds the threshold they are written to disk. The Reduce side then merges the MapOutputs with a min-heap k-way merge sort to obtain globally sorted records. During the merge, intermediate results may be spilled to disk when memory exceeds the threshold, and if too many files have been spilled, additional merges are triggered.
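   The min-heap k-way merge used by all three frameworks can be sketched as follows. This is a simplified illustration over integer records, not the actual MapReduce `Merger` implementation, which operates on serialized key/value segments:

```java
import java.util.*;

// Simplified sketch of a min-heap k-way merge over pre-sorted segments.
public class KWayMerge {
    public static List<Integer> merge(List<List<Integer>> segments) {
        // Each heap entry is a cursor {segmentIndex, position}, ordered by
        // the value that cursor currently points at.
        PriorityQueue<int[]> heap = new PriorityQueue<>(
            Comparator.comparingInt((int[] c) -> segments.get(c[0]).get(c[1])));
        for (int s = 0; s < segments.size(); s++) {
            if (!segments.get(s).isEmpty()) heap.add(new int[]{s, 0});
        }
        List<Integer> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] cur = heap.poll();
            List<Integer> seg = segments.get(cur[0]);
            out.add(seg.get(cur[1]));
            // Advance the cursor within the same segment, if possible.
            if (cur[1] + 1 < seg.size()) heap.add(new int[]{cur[0], cur[1] + 1});
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(merge(List.of(
            List.of(1, 4, 7), List.of(2, 5), List.of(3, 6))));
        // prints [1, 2, 3, 4, 5, 6, 7]
    }
}
```

   Each poll costs O(log k) for k segments, which is why all three frameworks favor this approach for merging many spilled runs.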
   
   ## 1.2 Tez
   
   There are two cases in Tez: (1) ordered IO and (2) unordered IO.
   
   Ordered IO works the same way as in MapReduce, so it is not discussed further here.
   
   Unordered IO is generally used for hash joins and other situations where sorting by key is not required. Unordered IO adopts a ready-to-use approach: the Map writes Records directly to a file (possibly through a cache), and the Reduce side can consume the data as soon as it reads it.
   
   ## 1.3 Spark
   
   Spark's shuffle is more complex and more flexible. Some tasks do not require sort or combine, so the shuffle logic is chosen according to the task's needs.
   
   ### 1.3.1 Shuffle write operation
   When writing shuffle data, three writers are supported:
   
   * (1) BypassMergeSortShuffleWriter
   
   A temporary file is generated for each partition. When a Record is written, the corresponding partition is found and the Record is written directly into that partition's temporary file. Once all data has been processed, the temporary files are concatenated into a single final file in partition order, and the temporary files are deleted.
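   The concatenation step can be sketched as follows. This is a simplified illustration of the behavior described above; the real BypassMergeSortShuffleWriter additionally records each partition's length in an index file:

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

// Sketch: concatenate per-partition temp files into one output file,
// in partition order, then delete the temp files.
public class BypassConcatSketch {
    public static void concat(List<Path> partitionFiles, Path out) throws IOException {
        try (OutputStream os = Files.newOutputStream(out)) {
            for (Path p : partitionFiles) {
                Files.copy(p, os);   // append this partition's bytes
                Files.delete(p);     // temp file is no longer needed
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("bypass");
        Path p0 = Files.writeString(dir.resolve("p0"), "partition0;");
        Path p1 = Files.writeString(dir.resolve("p1"), "partition1;");
        Path out = dir.resolve("shuffle.data");
        concat(List.of(p0, p1), out);
        System.out.println(Files.readString(out)); // partition0;partition1;
    }
}
```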
   
   * (2) UnsafeShuffleWriter
   
   UnsafeShuffleWriter implements its logic mainly through ShuffleExternalSorter. When a Record is written, it is serialized immediately and the serialized bytes are copied into the allocated memory. At the same time, the record's address and partition are recorded in memory (in the inMemSorter).
   
   When memory reaches the threshold, a spill is performed. Based on the information in memory (the inMemSorter), the Records sorted by partition can be obtained cheaply and written to a file.
   
   When all Records have been processed, the Records still in memory are spilled to a file. Finally, all spilled files are merged once. Since each spilled file is already sorted by partition, the per-partition byte ranges of all spilled files can simply be copied into the final file in partition order. The resulting final file is sorted by partition.
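   A rough sketch of why this sorting is cheap: Spark packs the partition id into the upper 24 bits of a single long (hence the 16777216-partition limit), with the record's location in the remaining bits, so sorting the longs groups records by partition without touching the serialized bytes. The class below is a simplified illustration, not Spark's actual `PackedRecordPointer` layout:

```java
// Simplified sketch: pack a partition id (24 bits) and a record address
// (40 bits) into one long. Sorting these longs sorts by partition first,
// because the partition occupies the most significant bits.
public class PackedPointerSketch {
    static final int PARTITION_BITS = 24;
    static final int ADDRESS_BITS = 64 - PARTITION_BITS;
    static final long ADDRESS_MASK = (1L << ADDRESS_BITS) - 1;

    public static long pack(int partitionId, long address) {
        return ((long) partitionId << ADDRESS_BITS) | (address & ADDRESS_MASK);
    }

    public static int partitionOf(long packed) {
        return (int) (packed >>> ADDRESS_BITS);
    }

    public static long addressOf(long packed) {
        return packed & ADDRESS_MASK;
    }

    public static void main(String[] args) {
        long p = pack(12345, 11259375L);
        System.out.println(partitionOf(p) + " / " + addressOf(p)); // 12345 / 11259375
    }
}
```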
   
   * (3) SortShuffleWriter
   
   SortShuffleWriter implements its logic mainly through ExternalSorter. ExternalSorter decides whether to combine and sort based on the user's needs.
   
   When a Record is written, it is inserted directly into memory. If combine is required, the in-memory structure is a map; otherwise it is a buffer.
   
   If the estimated memory usage exceeds the threshold, a spill is triggered and the Records are spilled to disk. This process requires sorting, and the comparator depends on the user's needs. If keyOrdering is set, Records are sorted by key. If keyOrdering is not set but an aggregator (i.e. combine) is set, keys are sorted by their hash code, which ensures that equal keys are grouped together to facilitate the combine operation. If neither keyOrdering nor an aggregator is set, Records are sorted by partition only.
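   The comparator selection above can be sketched as follows. This is a simplified illustration; the method and parameter names are hypothetical, not Spark's actual API:

```java
import java.util.Comparator;

// Sketch of how the spill comparator depends on the user's needs.
public class SpillComparatorSketch {
    public static <K> Comparator<K> chooseKeyComparator(
            Comparator<K> keyOrdering, boolean hasAggregator) {
        if (keyOrdering != null) {
            return keyOrdering;   // keyOrdering set: fully sort by key
        } else if (hasAggregator) {
            // Aggregator set but no ordering: sort by the key's hash code,
            // which groups equal keys together -- all that combine needs.
            return Comparator.comparingInt((K k) -> k == null ? 0 : k.hashCode());
        }
        return null;              // neither set: order by partition only
    }

    public static void main(String[] args) {
        Comparator<String> byHash = chooseKeyComparator(null, true);
        System.out.println(byHash.compare("shuffle", "shuffle")); // 0: grouped together
    }
}
```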
   
   When all Records have been written, the spill files are read and merged into a globally ordered file.
   
   Comparison of the three writers:
   
   | writer | advantages | disadvantages | scenario |
   | --- |---|---|---|
   | BypassMergeSortShuffleWriter | (1) Serializes only once. <br>(2) Hashmap-like structure makes inserting data fast. | (1) Combine and sort are not supported. <br>(2) Each partition needs its own temporary file, which can produce too many temporary files. | Suitable when the number of partitions is small (by default, at most 200) and there is no combine. |
   | UnsafeShuffleWriter | (1) Serializes only once. <br>(2) The number of spill files is bounded and no longer depends on the number of partitions, so larger partition counts are supported. | (1) Combine and sort are not supported. <br>(2) The original write order of Records is disrupted, so the serializer must report supportsRelocationOfSerializedObjects. | Suitable when there is no combine, supportsRelocationOfSerializedObjects is true, and the number of partitions is at most 16777216. |
   | SortShuffleWriter | (1) Supports combine and sort. <br> (2) Suitable for all scenarios. <br> (3) The number of spill files is bounded. | (1) Records must be serialized multiple times. | Suitable for all scenarios. |
   
   ### 1.3.2 Shuffle read
   
   Currently there is only one implementation, BlockStoreShuffleReader, which is similar to MapReduce.
   The reduce side pulls the records of the corresponding partition from each map, remotely or locally. Normally the data is kept in memory, but if the block to be fetched exceeds the threshold, disk is used.
   Then, depending on the user's needs, the records are combined and/or sorted to form the record iterator the user requires.
   Combine and sort use ExternalAppendOnlyMap and ExternalSorter respectively; when memory exceeds the threshold, the data is spilled to the local disk.
   
   ## 1.4 Summary
   
   (1) About the semantics of each framework
   
   MapReduce and Tez's ordered IO are special cases of Spark's sorted shuffle, while Tez's unordered IO is essentially a special case of Spark's unsorted shuffle. The semantics of the frameworks are essentially the same, with Spark being the most general.
   
   (2) Where are local disk files generated?
   
   After analyzing the three computing frameworks, we know that the following processes use disk:
   * (1) The Map side may generate intermediate temporary files when memory exceeds the threshold.
   * (2) The Map side eventually generates disk files to serve shuffle data.
   * (3) When the Reduce side pulls records, disk files may be generated when the threshold is exceeded.
   * (4) When merging on the Reduce side, temporary disk files may be generated for global sorting.
   
   Uniffle has in fact already solved (1) and (2). For (3), if the parameters are tuned properly, it is difficult to generate disk files. So only (4) needs to be discussed in this article.
   
   # 2 Task requirement analysis: What types of tasks require remote merge?
   
   Currently, uniffle's map side no longer requires local disk. This article mainly considers the reduce side, which falls into the following cases:
   * (1) For Spark's non-sorted, non-aggregated shuffle and Tez's unordered IO, Records are ready to use as-is. No global aggregation or sorting is required, and only very little memory is needed. The current version of uniffle will not use disk if memory is configured reasonably, so the existing uniffle solution suffices and this case is not discussed further.
   * (2) For Spark's sorting or aggregation tasks, Tez's ordered IO, and MapReduce, global sorting or aggregation is required, memory may be insufficient, and Records may be spilled to disk. This article mainly discusses this case. Following the discussion in the first section, SortShuffleWriter is the most relevant reference, because both it and remote merge need to deserialize and sort.
   
   
   # 3 Solution selection: Why choose the remote merge solution?
   
   ## 3.1 Situations where sort is required
   
   ### 3.1.1 How to sort
   
   For sorting, the Map side generally sorts its output to obtain a set of partially sorted runs of records, here called segments, and the Reduce side then obtains all segments and merges them.
   
   So how do we merge? Spark, MR, and Tez all use min-heap k-way merge sort, and we can continue this approach.
   
   ### 3.1.2 Where to merge
   Where the merge happens is critical, because it determines whether and where we spill. Again, our goal is to avoid performing file operations on the same machine as the task.
   
   (1) Sorting on the Reduce side
   
   To avoid spilling to disk, sorting can still be done on the Reduce side. There, the Merger fetches only a small portion of each Segment at a time and merge-sorts them, fetching further data on demand in sorted order until globally sorted Records are produced.
   
   <img width="440" alt="remote_merge_by_sub_segment" src="https://github.com/apache/incubator-uniffle/assets/10381583/5dcf3d69-84cd-45f0-ba74-a74d9884f609">
   
   * Advantage: The shuffle server does not require a key comparator.
   * Disadvantage: Block metadata needs to be maintained on the Reduce side.
   
   Generally, the number of blocks is large while the amount of data per block is small, so a large amount of block metadata would need to be maintained on the Reduce side. More importantly, this solution generates a large number of IO operations, and its performance is difficult to guarantee. Therefore this option is ruled out.
   
   (2) Sorting on the shuffle server side
   
   BufferManager and FlushManager maintain block information in memory and on disk. We only need to provide a service that merges these blocks and produces globally sorted Records.
   
   * Advantage: The metadata already available on the shuffle server can be reused.
   * Disadvantage: The shuffle server needs to obtain the task's comparator.
   
   Based on the above analysis, we decided to sort on the shuffle server side.
   
   ## 3.2 Handling situations that require combine
   
   After the Records are sorted by key, equal keys can be read sequentially, so the combine operation is easy to complete.
   
   How does combine work? In general, combine operations are difficult to standardize, so we should avoid doing combine on the shuffle server and move it to the reduce side. Therefore, when reading Records from the shuffle server, it must be guaranteed that equal keys are placed together. We can use the hash value of the key as the sort basis (Note: no comparator is provided for non-sorting operations), which ensures that equal keys end up together.
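   Assuming the shuffle server returns records with equal keys adjacent (and ignoring hash collisions between distinct keys for simplicity), the reduce-side combine becomes a single streaming pass that only needs one key's state in memory at a time. The sketch below uses hypothetical names and sums integer values:

```java
import java.util.*;

// Sketch: one-pass combine over a stream in which equal keys are adjacent.
public class StreamingCombineSketch {
    public static LinkedHashMap<String, Integer> combineSortedRun(
            List<Map.Entry<String, Integer>> records) {
        LinkedHashMap<String, Integer> out = new LinkedHashMap<>();
        String curKey = null;
        int curSum = 0;
        for (Map.Entry<String, Integer> r : records) {
            if (!r.getKey().equals(curKey)) {
                if (curKey != null) out.put(curKey, curSum);  // emit finished key
                curKey = r.getKey();
                curSum = 0;
            }
            curSum += r.getValue();
        }
        if (curKey != null) out.put(curKey, curSum);          // emit last key
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> sorted = List.of(
            Map.entry("a", 1), Map.entry("a", 2), Map.entry("b", 3));
        System.out.println(combineSortedRun(sorted)); // {a=3, b=3}
    }
}
```

   Unlike a hashmap-based combine, this pass never needs a whole segment in memory, which is exactly why the sorting scheme is preferable.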
   
   > Note: In fact, records could still be organized using a hashmap. That may seem more efficient, but it is not: it requires all segments to be loaded into memory, which the sorting scheme does not.
   
   # 4 Scheme Design
   ## 4.1 Solution introduction
   
   > A UFile represents a data set containing a set of sorted Records. On the map side it corresponds directly to an existing Block; on the reduce side, the size of a UFile is determined by the cache size.
   
   <img width="1062" alt="remote_merge_structure" src="https://github.com/apache/incubator-uniffle/assets/10381583/19bf3dcb-ec91-41e7-a5bd-f0d3cc398119">
   
   The process is as follows:
   * (1) The map side produces records and sends them to the shuffle server.
   * (2) The shuffle server stores the data in the cache, or flushes it to the local file system or a remote file system through the flush manager.
   * (3) A new merge manager is added to the shuffle server to manage each shuffle. Segment information is recorded under each partition: a segment in the cache is called an InMemorySegment and a segment in a file is called an OnDiskSegment. The data of an InMemorySegment comes from the bufferPool, and the data of an OnDiskSegment comes from files.
   * (4) When certain conditions are met, some segments are merged according to a certain algorithm, and the merged results are written to local files or remote storage.
   * (5) Reading data on the reduce side is a merge-sort process. After the merge operation in the shuffle server, each partition is left with a few merged ordered files, and the files required by the reduce side are read following the idea of merge sort.
   
   > Note: If there are only a few segments, we do not need to merge them into a new file; we can merge them in real time instead.
   
   ## 4.2 How to read?
   
   Currently, Uniffle's shuffle reader uses the blockId as the read mark, which makes it easy to verify that accurate and complete records have been obtained. For remote merge, we no longer use the blockIds generated on the map side. The most accurate semantics is to read in key order, that is, the next read should start from the key after the last one read.
   
   To implement these semantics efficiently, an index is used on the merge side so that a large number of IO operations can be skipped.
   
   <img width="435" alt="remote_merge_ufile_format" src="https://github.com/apache/incubator-uniffle/assets/10381583/1468cc69-bf14-4c96-8651-248641847096">
   
   When writing a file, it is divided into sections of a certain size. For each section, the value of its first key and the section's starting offset and length are recorded, and this metadata is written to the file. In this way, a large number of IO operations can be skipped when reading.
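   The index lookup can be sketched as follows. This is a simplified illustration with integer keys; the field and method names are hypothetical:

```java
import java.util.List;

// Sketch: binary-search the section index for the section in which a
// read starting at `target` should begin, skipping all earlier sections.
public class SectionIndexSketch {
    // Each index entry: first key of the section, plus its file offset/length.
    public record Section(int firstKey, long offset, long length) {}

    public static int locate(List<Section> index, int target) {
        int lo = 0, hi = index.size() - 1, ans = 0;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (index.get(mid).firstKey() <= target) {
                ans = mid;     // last section starting at or before target, so far
                lo = mid + 1;
            } else {
                hi = mid - 1;
            }
        }
        return ans;
    }

    public static void main(String[] args) {
        List<Section> idx = List.of(
            new Section(0, 0L, 1024L),
            new Section(100, 1024L, 1024L),
            new Section(200, 2048L, 1024L));
        System.out.println(locate(idx, 150)); // 1 -> seek directly to offset 1024
    }
}
```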
   
   > I will explore directly recording offsets as an alternative to this solution.
   
   # 5 Plan
   
   Main plans:
   
   * Unified serialization
   * UFile, shuffle writer and reader
   * Development of the Merger and Merge Manager
   * Architecture adaptation
   
   
   
   

