zhengchenyu commented on issue #1239: URL: https://github.com/apache/incubator-uniffle/issues/1239#issuecomment-1869223301
# 1 Default shuffle

> Note: This first chapter briefly reviews how the default shuffle works, in order to find where local disks are used before designing remote merge. If you are already familiar with this, you can skip this part. We will analyze the shuffle of MapReduce, Tez, and Spark in turn.

## 1.1 MapReduce

The map writes records to memory. When memory exceeds the threshold, the in-memory data is spilled to a disk file, with records written in order of partitionid+key. After the map has processed all records, it spills the remaining in-memory data to a disk file, then reads all spilled files and merges them in order of partitionid+key to obtain fully sorted records.

> Note: The purpose of sorting by partitionid is that when the reduce side fetches data from the map side, the reads should be as sequential as possible. For MR, Tez, and Spark, whether or not records are sorted by key, as long as they are partitioned they must be sorted by partitionid.

The reduce pulls the records of its partition, remotely or locally, from each map; such a set of records is called a MapOutput. Normally it is kept in memory; if memory exceeds the threshold, the records are written to disk. The reduce then merges the MapOutputs using min-heap k-way merge sort to obtain globally sorted records. During the merge, intermediate results may be spilled to disk when memory exceeds the threshold. In addition, if too many files have been spilled to disk, extra merge rounds are triggered.

## 1.2 Tez

Tez has two cases: (1) ordered IO and (2) unordered IO. Ordered IO works the same way as MapReduce, so it is ignored here. Unordered IO is generally used for hash joins and other situations where sorting by key is not required, and adopts a ready-to-use approach.
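Before moving on, the MapReduce flow from section 1.1 (spill runs sorted by partitionid+key, then a final k-way merge) can be sketched as follows. This is a simplified in-memory illustration, not Hadoop's actual code; records are modeled as `(partitionid, key, value)` tuples.

```python
import heapq

def spill(buffer):
    """Sort one in-memory run by (partitionid, key) before writing it out."""
    return sorted(buffer, key=lambda rec: (rec[0], rec[1]))

def final_merge(spills):
    """K-way merge of already-sorted spill runs into one sorted output."""
    return list(heapq.merge(*spills, key=lambda rec: (rec[0], rec[1])))

# Two spill runs produced by one map task.
spill_a = spill([(1, "b", 1), (0, "c", 2), (0, "a", 3)])
spill_b = spill([(1, "a", 4), (0, "b", 5)])
merged = final_merge([spill_a, spill_b])
```

The same min-heap k-way merge is what the reduce side applies to its MapOutputs.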
In Tez's unordered IO, the map writes the record directly to the file, or to the file through a cache, and the reduce side can read and use the data as-is.

## 1.3 Spark

Spark's shuffle is more complex and more flexible. Some tasks do not require sort or combine, so Spark lets users determine the shuffle logic according to their needs.

### 1.3.1 Shuffle write

Three writers are supported when writing shuffle data:

* (1) BypassMergeSortShuffleWriter
  A temporary file is generated for each partition. When a record is written, the corresponding partition is found and the record is written directly to that partition's temporary file. When all data has been processed, the temporary files are concatenated into one final file in partition order and then deleted.
* (2) UnsafeShuffleWriter
  UnsafeShuffleWriter mainly implements its logic through ShuffleExternalSorter. When a record is written, it is serialized immediately and the serialized bytes are copied into requested memory; at the same time, the record's address and partition are recorded in memory (inMemSorter). When memory reaches the threshold, a spill is performed: based on the information in inMemSorter, a partition-sorted stream of records can easily be produced and written to a file. When all records have been processed, the records still in memory are spilled to a file. Finally, all spill files are aggregated once: since each spill file is already sorted by partition, the per-partition byte ranges of all spill files can simply be copied into the final file in partition order. The resulting final file is sorted by partition.
* (3) SortShuffleWriter
  SortShuffleWriter mainly implements its logic through ExternalSorter. ExternalSorter decides whether to combine and sort based on the user's needs. When a record is written, it is inserted directly into memory.
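As a sketch of writer (1): BypassMergeSortShuffleWriter's per-partition-file approach can be illustrated as below. This is a hypothetical in-memory model (lists stand in for temporary files, and `bypass_merge_sort_write` is not a real Spark function); the real writer streams serialized bytes to disk.

```python
def bypass_merge_sort_write(records, num_partitions, partitioner):
    """One buffer per partition stands in for one temporary file per partition."""
    partition_files = [[] for _ in range(num_partitions)]
    for key, value in records:
        partition_files[partitioner(key)].append((key, value))
    # Concatenate the per-partition "files" into the final file in partition
    # order, remembering each partition's start offset for the index.
    final_file, offsets = [], []
    for part in partition_files:
        offsets.append(len(final_file))
        final_file.extend(part)
    return final_file, offsets

# Deterministic toy partitioner for illustration only.
final_file, offsets = bypass_merge_sort_write(
    [("a", 1), ("b", 2), ("c", 3)], num_partitions=2,
    partitioner=lambda key: (ord(key) - ord("a")) % 2)
```

The drawback named below follows directly from the model: the number of temporary files grows with the number of partitions.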
If combine is required, the in-memory structure is a map; otherwise it is a buffer. If the currently estimated memory usage exceeds the threshold, a spill is triggered and the records are spilled to disk. This process requires sorting, and the comparator used depends on the user's needs. If keyOrdering is set, records are sorted by key. If keyOrdering is not set but an aggregator (i.e. combine) is set, records are sorted by the hash code of the key, which ensures that equal keys are grouped together to facilitate the combine operation. If neither keyOrdering nor an aggregator is set, records are sorted by partition only. When all records have been written, the spill files are read and merged into a globally ordered file.

Comparison of the three writers:

| writer | advantages | disadvantages | scene |
| --- | --- | --- | --- |
| BypassMergeSortShuffleWriter | (1) Serializes only once. <br>(2) Uses a hashmap-like structure, so inserting data is fast. | (1) Combine and sort are not supported. <br>(2) Each partition needs its own temporary file, which can produce too many temporary files. | Suitable when the number of partitions is small (by default at most 200) and there is no combine. |
| UnsafeShuffleWriter | (1) Serializes only once. <br>(2) The number of spill files is bounded and no longer depends on the number of partitions, so more partitions can be supported. | (1) Combine and sort are not supported. <br>(2) The on-disk record order is rearranged, so supportsRelocationOfSerializedObjects is required. | Suitable when there is no combine, supportsRelocationOfSerializedObjects is true, and the number of partitions is at most 16777216. |
| SortShuffleWriter | (1) Supports combine and sort. <br>(2) Suitable for all scenarios. <br>(3) The number of spill files is bounded. | (1) Requires multiple serializations. | Suitable for all scenarios. |

### 1.3.2 Shuffle read

Currently there is only one implementation, BlockStoreShuffleReader, and it is similar to MapReduce. The reduce pulls the records of its partition, remotely or locally, from each map. Normally the data is kept in memory, but if the size of a fetched block exceeds the threshold, disk is used. Then, depending on the user's needs, the records are combined and/or sorted to form the record iterator the user requires. Combine and sort use ExternalAppendOnlyMap and ExternalSorter respectively; when memory exceeds the threshold, data is spilled to the local disk.

## 1.4 Summary

(1) About the semantics of each framework

MapReduce and Tez's ordered IO are special cases of Spark's sorted shuffle, and Tez's unordered IO is essentially a special case of Spark's unsorted shuffle. The semantics of the frameworks are therefore essentially the same, with Spark being the most general.

(2) Where are local disk files generated?

From the analysis of the three computing frameworks, the following steps use the disk:

* (1) The map may generate intermediate temporary files because memory exceeds the threshold.
* (2) The map ultimately generates disk files to serve shuffle data.
* (3) When the reduce pulls records, disk files may be generated because the threshold is exceeded.
* (4) When merging on the reduce side, temporary disk files may be generated for global sorting.

Uniffle has already solved (1) and (2). For (3), with well-tuned parameters it is difficult to generate disk files. So only (4) needs to be discussed in this article.

## 2 Task requirement analysis: What types of tasks require remote merge?
Currently, uniffle's map-side operations no longer require the disk, so this article mainly considers the reduce side. There are two situations:

* (1) For Spark's unsorted, non-aggregated shuffle and Tez's unordered IO, records are ready to use. No global aggregation or sorting is required, and only very little memory is needed. The current version of uniffle will not use the disk if memory is configured reasonably, so the existing uniffle solution suffices and this case is not discussed further.
* (2) For Spark's sorting or aggregation tasks, Tez's ordered IO, and MapReduce, global sorting or aggregation is required, memory may be insufficient, and records may be spilled to disk. This article mainly discusses this case. Following the discussion in the first section, SortShuffleWriter is the more meaningful reference, since both need to deserialize and sort.

## 3 Solution selection: Why choose the remote merge solution?

### 3.1 Situations where sort is required

#### 3.1.1 How to sort

For sorting, the map side generally produces a set of partially sorted records, here called a segment, and the reduce side obtains all segments and merges them. How should they be merged? Spark, MR, and Tez all use min-heap k-way merge sort, and we can continue with this approach.

#### 3.1.2 Where to merge

Where to merge is the more critical question, because it determines whether and where we spill. Our goal remains to avoid performing file operations on the same machine as the task.

(1) Sorting on the reduce side

To avoid spilling to disk, sorting can still be done on the reduce side. The reduce-side merger obtains only a small portion of each segment at a time and merge-sorts them, fetching more data on demand in sorted order until a globally sorted stream of records is formed.
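This on-demand merge, which keeps only the head record of each segment in a min-heap and fetches more data only from the segment whose head was just consumed, can be sketched as below. This is a hypothetical illustration: plain iterators stand in for remote segment fetches.

```python
import heapq

def lazy_merge(segments):
    """Min-heap k-way merge holding only one head record per segment.

    `segments` is a list of iterators over (key, value) pairs, each already
    sorted by key; in the real design each iterator would pull segment data
    from remote storage on demand instead of holding it all in memory.
    """
    heap = []
    for idx, seg in enumerate(segments):
        head = next(seg, None)
        if head is not None:
            heapq.heappush(heap, (head, idx))  # idx breaks ties between segments
    while heap:
        record, idx = heapq.heappop(heap)
        yield record
        nxt = next(segments[idx], None)  # refill only the consumed segment
        if nxt is not None:
            heapq.heappush(heap, (nxt, idx))

merged = list(lazy_merge([iter([("a", 1), ("c", 3)]),
                          iter([("b", 2)])]))
```

Memory stays proportional to the number of segments, not to their total size, which is why no reduce-side spill is needed in this scheme.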
<img width="440" alt="remote_merge_by_sub_segment" src="https://github.com/apache/incubator-uniffle/assets/10381583/5dcf3d69-84cd-45f0-ba74-a74d9884f609">

* Advantages: The shuffle server does not require a key comparator.
* Disadvantages: Block metadata must be maintained. The number of blocks is generally large while each block holds little data, so a large amount of block metadata has to be maintained on the reduce side. More importantly, this solution generates a large number of IO operations, so its performance is hard to guarantee.

Therefore this option is ruled out.

(2) Sorting on the shuffle server side

BufferManager and FlushManager already maintain block information in memory and on disk. We only need to provide a service that merges these blocks into globally sorted records.

* Advantage: The metadata already available on the shuffle server can be reused.
* Disadvantage: The shuffle server needs to obtain the task's comparator.

Based on the above analysis, sorting on the shuffle server side was chosen.

### 3.2 Situations where combine is required

Once records are sorted by key, equal keys can be read consecutively, so the combine operation becomes easy. How does combine operate? In general, combine operations are hard to standardize, so combine should be avoided on the shuffle server side and moved to the reduce side. Therefore, when reading records from the shuffle server, it must be guaranteed that equal keys are placed together. We can use the hash value of the key as the sort basis (note: no comparator is provided for non-sorting jobs), which ensures that equal keys end up together.

> Note: Records could instead be organized with a hashmap. That may look more efficient, but it is not: it requires all segments to be loaded into memory, which the sorting scheme does not.

# 4 Scheme design

## 4.1 Solution introduction

> A UFile represents a data set containing a group of sorted records. On the map side it corresponds directly to the existing Block. On the reduce side, the size of a UFile is determined by the cache size.

<img width="1062" alt="remote_merge_structure" src="https://github.com/apache/incubator-uniffle/assets/10381583/19bf3dcb-ec91-41e7-a5bd-f0d3cc398119">

The process is as follows:

* (1) The map side produces records and sends them to the shuffle server.
* (2) The shuffle server stores the data in the cache, or flushes it to the local file system or a remote file system through the flush manager.
* (3) A new merge manager is added to the shuffle server to manage each shuffle. Segment information is recorded per partition: a segment in the cache is called an InMemorySegment, and a segment in a file is called an OnDiskSegment. The data of an InMemorySegment comes from the bufferPool, and the data of an OnDiskSegment comes from files.
* (4) When certain conditions are met, some segments are merged according to a certain algorithm, and the merged results are written to local files or remote storage.
* (5) When the reduce side reads the data, it performs a merge sort. After the merge in the shuffle server, a partition is left with a few merged ordered files, and the files required by the reduce side are read following the idea of merge sort.

> Note: If there are only a few segments, we do not need to merge them into a new file; we can merge them in real time instead.

## 4.2 How to read?

Currently, uniffle's shuffle reader uses the blockid as the read marker, which makes it easy to verify that accurate and complete records are obtained. For remote merge, the blockid generated on the map side is no longer used.
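As described in section 3.2, for jobs that aggregate without a key ordering, equal keys can be made adjacent by sorting on the hash of the key, after which the reduce side combines them in a single streaming pass. A minimal sketch (illustrative only; `combine_sorted_by_hash` is not a real uniffle function):

```python
from itertools import groupby

def combine_sorted_by_hash(records, combine):
    """Sort records by hash of key, then combine runs of equal keys.

    Sorting secondarily by the key itself keeps distinct keys whose hashes
    collide separated, so grouping by the key remains correct.
    """
    ordered = sorted(records, key=lambda kv: (hash(kv[0]), kv[0]))
    out = []
    for key, group in groupby(ordered, key=lambda kv: kv[0]):
        values = [value for _, value in group]
        out.append((key, combine(values)))
    return out

combined = combine_sorted_by_hash([("a", 1), ("b", 2), ("a", 3)], combine=sum)
```

In the actual design the hash-ordered sort happens on the shuffle server, while `combine` stays on the reduce side, which is exactly the division of labor motivated above.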
The most accurate semantics is to read in key order, i.e. each read starts from the key after the last one read. To speed up this semantics, an index is used on the merge side so that a large number of IO operations can be skipped.

<img width="435" alt="remote_merge_ufile_format" src="https://github.com/apache/incubator-uniffle/assets/10381583/1468cc69-bf14-4c96-8651-248641847096">

When writing a file, it is divided into sections of a certain size. For each section, the value of its first key and the section's starting offset and length are recorded, and this metadata is written to the file. In this way, a large number of IO operations can be skipped when reading.

> I will explore directly recording the offset as an alternative to this solution.

# 5 Plan

Main plans:

* Unified serialization
* UFile, shuffle writer and reader
* Development of Merger and Merge Manager
* Architecture adaptation
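The section index described in section 4.2 (first key, starting offset, and length per section) can be sketched as follows. This is a hypothetical illustration: names like `build_index` are not from the actual implementation, and offsets are counted in records rather than serialized bytes.

```python
import bisect

def build_index(sorted_records, section_size):
    """Split a sorted run into sections of (first_key, offset, length)."""
    index = []
    for start in range(0, len(sorted_records), section_size):
        section = sorted_records[start:start + section_size]
        index.append((section[0][0], start, len(section)))
    return index

def seek_section(index, key):
    """Locate the one section that may contain `key`, skipping all before it."""
    first_keys = [entry[0] for entry in index]
    pos = bisect.bisect_right(first_keys, key) - 1
    return index[max(pos, 0)]

records = [("a", 1), ("b", 2), ("c", 3), ("d", 4)]
index = build_index(records, section_size=2)
```

A read that resumes from a given key only needs to scan from the section `seek_section` returns, which is how the index lets the reader skip most IO.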
