This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new f527b22b4 [CELEBORN-1410] Combine multiple ShuffleBlockInfo into a 
single ShuffleBlockInfo
f527b22b4 is described below

commit f527b22b4d579b5ccd37a5de29df391d7e42e0ac
Author: Sanskar Modi <[email protected]>
AuthorDate: Thu May 23 21:22:45 2024 +0800

    [CELEBORN-1410] Combine multiple ShuffleBlockInfo into a single 
ShuffleBlockInfo
    
    ### What changes were proposed in this pull request?
    
    Merge smaller `ShuffleBlockInfo` entries that belong to the same mapId, such that 
the size of each merged block does not exceed 
`celeborn.shuffle.sortPartition.block.compactionFactor` * `celeborn.shuffle.chunk.size`.
    
    ### Why are the changes needed?
    As sorted ShuffleBlocks are contiguous, we can compact multiple 
`ShuffleBlockInfo` into one as long as the size of the compacted one does not 
exceed a configurable fraction (`compactionFactor`, default 0.25) of 
`celeborn.shuffle.chunk.size`. This way we can decrease the number of 
ShuffleBlock objects.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Existing UTs
    
    Closes #2524 from s0nskar/CELEBORN-1410.
    
    Lead-authored-by: Sanskar Modi <[email protected]>
    Co-authored-by: Fu Chen <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../org/apache/celeborn/common/CelebornConf.scala  | 12 +++++++++++
 docs/configuration/worker.md                       |  1 +
 .../worker/storage/PartitionFilesSorter.java       | 23 ++++++++++++++++++----
 3 files changed, 32 insertions(+), 4 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 62cb30b4d..68fa938b5 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -811,6 +811,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
   def workerPartitionSorterSortPartitionTimeout: Long = 
get(WORKER_PARTITION_SORTER_SORT_TIMEOUT)
   def workerPartitionSorterPrefetchEnabled: Boolean =
     get(WORKER_PARTITION_SORTER_PREFETCH_ENABLED)
+  def workerPartitionSorterShuffleBlockCompactionFactor: Double =
+    get(WORKER_SHUFFLE_BLOCK_COMPACTION_FACTOR)
   def workerPartitionSorterReservedMemoryPerPartition: Long =
     get(WORKER_PARTITION_SORTER_RESERVED_MEMORY_PER_PARTITION)
   def workerPartitionSorterThreads: Int =
@@ -2961,6 +2963,16 @@ object CelebornConf extends Logging {
       .booleanConf
       .createWithDefault(true)
 
+  val WORKER_SHUFFLE_BLOCK_COMPACTION_FACTOR: ConfigEntry[Double] =
+    buildConf("celeborn.shuffle.sortPartition.block.compactionFactor")
+      .categories("worker")
+      .version("0.4.2")
+      .doc("Combine sorted shuffle blocks such that size of compacted shuffle 
block does not " +
+        s"exceed compactionFactor * ${SHUFFLE_CHUNK_SIZE.key}")
+      .doubleConf
+      .checkValue(v => v >= 0.0 && v <= 1.0, "Should be in [0.0, 1.0].")
+      .createWithDefault(0.25)
+
   val WORKER_FLUSHER_BUFFER_SIZE: ConfigEntry[Long] =
     buildConf("celeborn.worker.flusher.buffer.size")
       .categories("worker")
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 42d4ec6ed..ee6756131 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -39,6 +39,7 @@ license: |
 | celeborn.master.internal.endpoints | &lt;localhost&gt;:8097 | false | 
Endpoints of master nodes just for celeborn workers to connect, allowed pattern 
is: `<host1>:<port1>[,<host2>:<port2>]*`, e.g. `clb1:8097,clb2:8097,clb3:8097`. 
If the port is omitted, 8097 will be used. | 0.5.0 |  | 
 | celeborn.redaction.regex | (?i)secret|password|token|access[.]key | false | 
Regex to decide which Celeborn configuration properties and environment 
variables in master and worker environments contain sensitive information. When 
this regex matches a property key or value, the value is redacted from the 
logging. | 0.5.0 |  | 
 | celeborn.shuffle.chunk.size | 8m | false | Max chunk size of reducer's 
merged shuffle data. For example, if a reducer's shuffle data is 128M and the 
data will need 16 fetch chunk requests to fetch. | 0.2.0 |  | 
+| celeborn.shuffle.sortPartition.block.compactionFactor | 0.25 | false | 
Combine sorted shuffle blocks such that size of compacted shuffle block does 
not exceed compactionFactor * celeborn.shuffle.chunk.size | 0.4.2 |  | 
 | celeborn.storage.availableTypes | HDD | false | Enabled storages. Available 
options: MEMORY,HDD,SSD,HDFS. Note: HDD and SSD would be treated as identical. 
| 0.3.0 | celeborn.storage.activeTypes | 
 | celeborn.storage.hdfs.dir | &lt;undefined&gt; | false | HDFS base directory 
for Celeborn to store shuffle data. | 0.2.0 |  | 
 | celeborn.storage.hdfs.kerberos.keytab | &lt;undefined&gt; | false | Kerberos 
keytab file path for HDFS storage connection. | 0.3.2 |  | 
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
index 481ea6c96..151e1df4c 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
@@ -86,6 +86,7 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
   private final AtomicLong sortedFilesSize = new AtomicLong();
   protected final long sortTimeout;
   protected final long shuffleChunkSize;
+  protected final double compactionFactor;
   protected final boolean prefetchEnabled;
   protected final long reservedMemoryPerPartition;
   private final long partitionSorterShutdownAwaitTime;
@@ -100,6 +101,7 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
       MemoryManager memoryManager, CelebornConf conf, AbstractSource source) {
     this.sortTimeout = conf.workerPartitionSorterSortPartitionTimeout();
     this.shuffleChunkSize = conf.shuffleChunkSize();
+    this.compactionFactor = 
conf.workerPartitionSorterShuffleBlockCompactionFactor();
     this.prefetchEnabled = conf.workerPartitionSorterPrefetchEnabled();
     this.reservedMemoryPerPartition = 
conf.workerPartitionSorterReservedMemoryPerPartition();
     this.partitionSorterShutdownAwaitTime =
@@ -719,10 +721,23 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
           for (ShuffleBlockInfo blockInfo : originShuffleBlocks) {
             long offset = blockInfo.offset;
             long length = blockInfo.length;
-            ShuffleBlockInfo sortedBlock = new ShuffleBlockInfo();
-            sortedBlock.offset = fileIndex;
-            sortedBlock.length = length;
-            sortedShuffleBlocks.add(sortedBlock);
+            // Combine multiple small `ShuffleBlockInfo` for same mapId such 
that size of compacted
+            // `ShuffleBlockInfo` does not exceed `compactionFactor` * 
`shuffleChunkSize`
+            boolean shuffleBlockCompacted = false;
+            if (!sortedShuffleBlocks.isEmpty()) {
+              ShuffleBlockInfo lastShuffleBlock =
+                  sortedShuffleBlocks.get(sortedShuffleBlocks.size() - 1);
+              if (lastShuffleBlock.length + length <= compactionFactor * 
shuffleChunkSize) {
+                lastShuffleBlock.length += length;
+                shuffleBlockCompacted = true;
+              }
+            }
+            if (!shuffleBlockCompacted) {
+              ShuffleBlockInfo sortedBlock = new ShuffleBlockInfo();
+              sortedBlock.offset = fileIndex;
+              sortedBlock.length = length;
+              sortedShuffleBlocks.add(sortedBlock);
+            }
             fileIndex += transferBlock(offset, length);
           }
           sortedBlockInfoMap.put(mapId, sortedShuffleBlocks);

Reply via email to