This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch branch-0.4
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.4 by this push:
     new 74102d8e3 [CELEBORN-1410] Combine multiple ShuffleBlockInfo into a 
single ShuffleBlockInfo
74102d8e3 is described below

commit 74102d8e3e214418066c146e0c0bef7e0ef46262
Author: Sanskar Modi <[email protected]>
AuthorDate: Thu May 23 21:22:45 2024 +0800

    [CELEBORN-1410] Combine multiple ShuffleBlockInfo into a single 
ShuffleBlockInfo
    
    Merging smaller `ShuffleBlockInfo` entries corresponding to the same mapId, such that 
the size of each block does not exceed `celeborn.shuffle.chunk.size`
    
    As sorted ShuffleBlocks are contiguous, we can compact multiple 
`ShuffleBlockInfo` into one as long as the size of the compacted one does not 
exceed half of `celeborn.shuffle.chunk.size`. This way we can decrease the 
number of ShuffleBlock objects.
    
    No
    
    Existing UTs
    
    Closes #2524 from s0nskar/CELEBORN-1410.
    
    Lead-authored-by: Sanskar Modi <[email protected]>
    Co-authored-by: Fu Chen <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../org/apache/celeborn/common/CelebornConf.scala  | 12 +++++++++++
 docs/configuration/worker.md                       |  1 +
 .../worker/storage/PartitionFilesSorter.java       | 23 ++++++++++++++++++----
 3 files changed, 32 insertions(+), 4 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 5c1cc0865..ea0147d5d 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -686,6 +686,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
     get(ESTIMATED_PARTITION_SIZE_MAX_SIZE).getOrElse(partitionSplitMaximumSize 
* 2)
   def minPartitionSizeToEstimate: Long = get(ESTIMATED_PARTITION_SIZE_MIN_SIZE)
   def partitionSorterSortPartitionTimeout: Long = 
get(PARTITION_SORTER_SORT_TIMEOUT)
+  def workerPartitionSorterShuffleBlockCompactionFactor: Double =
+    get(WORKER_SHUFFLE_BLOCK_COMPACTION_FACTOR)
   def partitionSorterReservedMemoryPerPartition: Long =
     get(WORKER_PARTITION_SORTER_PER_PARTITION_RESERVED_MEMORY)
   def partitionSorterThreads: Int =
@@ -2537,6 +2539,16 @@ object CelebornConf extends Logging {
       .checkValue(v => v < Int.MaxValue, "Reserved memory per partition must 
be less than 2GB.")
       .createWithDefaultString("1mb")
 
+  val WORKER_SHUFFLE_BLOCK_COMPACTION_FACTOR: ConfigEntry[Double] =
+    buildConf("celeborn.shuffle.sortPartition.block.compactionFactor")
+      .categories("worker")
+      .version("0.4.2")
+      .doc("Combine sorted shuffle blocks such that size of compacted shuffle 
block does not " +
+        s"exceed compactionFactor * ${SHUFFLE_CHUNK_SIZE.key}")
+      .doubleConf
+      .checkValue(v => v >= 0.0 && v <= 1.0, "Should be in [0.0, 1.0].")
+      .createWithDefault(0.25)
+
   val WORKER_FLUSHER_BUFFER_SIZE: ConfigEntry[Long] =
     buildConf("celeborn.worker.flusher.buffer.size")
       .categories("worker")
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 1f7f48a24..69221343b 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -24,6 +24,7 @@ license: |
 | celeborn.master.endpoints | &lt;localhost&gt;:9097 | Endpoints of master 
nodes for celeborn client to connect, allowed pattern is: 
`<host1>:<port1>[,<host2>:<port2>]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. If 
the port is omitted, 9097 will be used. | 0.2.0 |  | 
 | celeborn.master.estimatedPartitionSize.minSize | 8mb | Ignore partition size 
smaller than this configuration of partition size for estimation. | 0.3.0 | 
celeborn.shuffle.minPartitionSizeToEstimate | 
 | celeborn.shuffle.chunk.size | 8m | Max chunk size of reducer's merged 
shuffle data. For example, if a reducer's shuffle data is 128M and the data 
will need 16 fetch chunk requests to fetch. | 0.2.0 |  | 
+| celeborn.shuffle.sortPartition.block.compactionFactor | 0.25 | Combine 
sorted shuffle blocks such that size of compacted shuffle block does not exceed 
compactionFactor * celeborn.shuffle.chunk.size | 0.4.2 |  | 
 | celeborn.storage.availableTypes | HDD | Enabled storages. Available options: 
MEMORY,HDD,SSD,HDFS. Note: HDD and SSD would be treated as identical. | 0.3.0 | 
celeborn.storage.activeTypes | 
 | celeborn.storage.hdfs.dir | &lt;undefined&gt; | HDFS base directory for 
Celeborn to store shuffle data. | 0.2.0 |  | 
 | celeborn.storage.hdfs.kerberos.keytab | &lt;undefined&gt; | Kerberos keytab 
file path for HDFS storage connection. | 0.3.2 |  | 
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
index d2a6388a8..95eddcf0a 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
@@ -84,6 +84,7 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
   private final AtomicLong sortedFilesSize = new AtomicLong();
   protected final long sortTimeout;
   protected final long shuffleChunkSize;
+  protected final double compactionFactor;
   protected final long reservedMemoryPerPartition;
   private boolean gracefulShutdown;
   private long partitionSorterShutdownAwaitTime;
@@ -100,6 +101,7 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
     this.sortTimeout = conf.partitionSorterSortPartitionTimeout();
     this.shuffleChunkSize = conf.shuffleChunkSize();
     this.reservedMemoryPerPartition = 
conf.partitionSorterReservedMemoryPerPartition();
+    this.compactionFactor = 
conf.workerPartitionSorterShuffleBlockCompactionFactor();
     this.partitionSorterShutdownAwaitTime =
         conf.workerGracefulShutdownPartitionSorterCloseAwaitTimeMs();
     this.indexCacheMaxWeight = conf.partitionSorterIndexCacheMaxWeight();
@@ -623,10 +625,23 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
           for (ShuffleBlockInfo blockInfo : originShuffleBlocks) {
             long offset = blockInfo.offset;
             long length = blockInfo.length;
-            ShuffleBlockInfo sortedBlock = new ShuffleBlockInfo();
-            sortedBlock.offset = fileIndex;
-            sortedBlock.length = length;
-            sortedShuffleBlocks.add(sortedBlock);
+            // Combine multiple small `ShuffleBlockInfo` for same mapId such 
that size of compacted
+            // `ShuffleBlockInfo` does not exceed `compactionFactor` * 
`shuffleChunkSize`
+            boolean shuffleBlockCompacted = false;
+            if (!sortedShuffleBlocks.isEmpty()) {
+              ShuffleBlockInfo lastShuffleBlock =
+                  sortedShuffleBlocks.get(sortedShuffleBlocks.size() - 1);
+              if (lastShuffleBlock.length + length <= compactionFactor * 
shuffleChunkSize) {
+                lastShuffleBlock.length += length;
+                shuffleBlockCompacted = true;
+              }
+            }
+            if (!shuffleBlockCompacted) {
+              ShuffleBlockInfo sortedBlock = new ShuffleBlockInfo();
+              sortedBlock.offset = fileIndex;
+              sortedBlock.length = length;
+              sortedShuffleBlocks.add(sortedBlock);
+            }
             fileIndex += transferBlock(offset, length);
           }
           sortedBlockInfoMap.put(mapId, sortedShuffleBlocks);

Reply via email to