This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 9497d557e [CELEBORN-1345] Add a limit to the master's estimated partition size
9497d557e is described below

commit 9497d557e6a8898db6a1801a1020497a3a736d5b
Author: lvshuang.xjs <[email protected]>
AuthorDate: Mon Mar 25 14:40:47 2024 +0800

    [CELEBORN-1345] Add a limit to the master's estimated partition size
    
    ### What changes were proposed in this pull request?
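    Currently, the Celeborn master calculates the estimatedPartitionSize based on
    the file info committed by applications. This estimate is then used to
    allocate slots across all workers. However, the raw average may be too large
    or too small for Celeborn. For example, if an application commits a single
    1TB file to only one worker, using that partition size could leave all other
    workers with no available slots, or with only very small slots. To improve
    this, the estimated partition size is now clamped to a configurable range:
    values below `celeborn.master.estimatedPartitionSize.minSize` are raised to
    it, and values above the new `celeborn.master.estimatedPartitionSize.maxSize`
    are lowered to it. If `maxSize` is not set, it defaults to
    `celeborn.worker.shuffle.partitionSplit.max * 2`.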
    
    ### Why are the changes needed?
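    An unbounded estimated partition size (e.g. one skewed by a single huge
    committed file) can distort slot allocation across all workers; bounding it
    keeps the estimate within a sane range.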
    
    ### Does this PR introduce _any_ user-facing change?
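    No API change. However, a new master configuration,
    `celeborn.master.estimatedPartitionSize.maxSize`, is added, and the estimated
    partition size used for computing worker slots is now constrained between
    `celeborn.master.estimatedPartitionSize.minSize` and `maxSize`
    (see docs/migration.md).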
    
    ### How was this patch tested?
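    Unit tests: extended `testHandleUpdatePartitionSize` in
    `DefaultMetaSystemSuiteJ` and `RatisMasterStatusSystemSuiteJ` to cover the
    default, max-clamped, in-range, and min-clamped estimates.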
    
    Closes #2412 from RexXiong/CELEBORN-1345.
    
    Lead-authored-by: lvshuang.xjs <[email protected]>
    Co-authored-by: Shuang <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 .../org/apache/celeborn/common/CelebornConf.scala  | 12 ++++++++-
 docs/configuration/master.md                       |  2 ++
 docs/migration.md                                  |  5 ++++
 .../master/clustermeta/AbstractMetaManager.java    |  5 +++-
 .../clustermeta/DefaultMetaSystemSuiteJ.java       | 29 ++++++++++++++++++++++
 .../ha/RatisMasterStatusSystemSuiteJ.java          | 29 +++++++++++++++++++++-
 6 files changed, 79 insertions(+), 3 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 9a7751a75..90248088a 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -699,6 +699,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
     if (hasHDFSStorage) Math.max(128, get(WORKER_COMMIT_THREADS)) else 
get(WORKER_COMMIT_THREADS)
   def workerCleanThreads: Int = get(WORKER_CLEAN_THREADS)
   def workerShuffleCommitTimeout: Long = get(WORKER_SHUFFLE_COMMIT_TIMEOUT)
+  def maxPartitionSizeToEstimate: Long =
+    get(ESTIMATED_PARTITION_SIZE_MAX_SIZE).getOrElse(partitionSplitMaximumSize 
* 2)
   def minPartitionSizeToEstimate: Long = get(ESTIMATED_PARTITION_SIZE_MIN_SIZE)
   def workerPartitionSorterSortPartitionTimeout: Long = 
get(WORKER_PARTITION_SORTER_SORT_TIMEOUT)
   def workerPartitionSorterPrefetchEnabled: Boolean =
@@ -2219,10 +2221,18 @@ object CelebornConf extends Logging {
       .bytesConf(ByteUnit.BYTE)
       .createWithDefaultString("64mb")
 
+  val ESTIMATED_PARTITION_SIZE_MAX_SIZE: OptionalConfigEntry[Long] =
+    buildConf("celeborn.master.estimatedPartitionSize.maxSize")
+      .categories("master")
+      .doc("Max partition size for estimation. Default value should be 
celeborn.worker.shuffle.partitionSplit.max * 2.")
+      .version("0.4.1")
+      .bytesConf(ByteUnit.BYTE)
+      .createOptional
+
   val ESTIMATED_PARTITION_SIZE_MIN_SIZE: ConfigEntry[Long] =
     buildConf("celeborn.master.estimatedPartitionSize.minSize")
       .withAlternative("celeborn.shuffle.minPartitionSizeToEstimate")
-      .categories("worker")
+      .categories("master", "worker")
       .doc(
         "Ignore partition size smaller than this configuration of partition 
size for estimation.")
       .version("0.3.0")
diff --git a/docs/configuration/master.md b/docs/configuration/master.md
index e1efa174d..7a504441d 100644
--- a/docs/configuration/master.md
+++ b/docs/configuration/master.md
@@ -34,6 +34,8 @@ license: |
 | celeborn.dynamicConfig.store.fs.path | &lt;undefined&gt; | false | The path 
of dynamic config file for fs store backend. The file format should be yaml. 
The default path is `${CELEBORN_CONF_DIR}/dynamicConfig.yaml`. | 0.5.0 |  | 
 | celeborn.internal.port.enabled | false | false | Whether to create a 
internal port on Masters/Workers for inter-Masters/Workers communication. This 
is beneficial when SASL authentication is enforced for all interactions between 
clients and Celeborn Services, but the services can exchange messages without 
being subject to SASL authentication. | 0.5.0 |  | 
 | celeborn.master.estimatedPartitionSize.initialSize | 64mb | false | Initial 
partition size for estimation, it will change according to runtime stats. | 
0.3.0 | celeborn.shuffle.initialEstimatedPartitionSize | 
+| celeborn.master.estimatedPartitionSize.maxSize | &lt;undefined&gt; | false | 
Max partition size for estimation. Default value should be 
celeborn.worker.shuffle.partitionSplit.max * 2. | 0.4.1 |  | 
+| celeborn.master.estimatedPartitionSize.minSize | 8mb | false | Ignore 
partition size smaller than this configuration of partition size for 
estimation. | 0.3.0 | celeborn.shuffle.minPartitionSizeToEstimate | 
 | celeborn.master.estimatedPartitionSize.update.initialDelay | 5min | false | 
Initial delay time before start updating partition size for estimation. | 0.3.0 
| celeborn.shuffle.estimatedPartitionSize.update.initialDelay | 
 | celeborn.master.estimatedPartitionSize.update.interval | 10min | false | 
Interval of updating partition size for estimation. | 0.3.0 | 
celeborn.shuffle.estimatedPartitionSize.update.interval | 
 | celeborn.master.hdfs.expireDirs.timeout | 1h | false | The timeout for a 
expire dirs to be deleted on HDFS. | 0.3.0 |  | 
diff --git a/docs/migration.md b/docs/migration.md
index eabd881e3..0c6214da2 100644
--- a/docs/migration.md
+++ b/docs/migration.md
@@ -38,6 +38,11 @@ license: |
 
 - Since 0.5.0, Celeborn client removes configuration 
`celeborn.client.push.splitPartition.threads`.
 
+## Upgrading from 0.4.0 to 0.4.1
+
+- Since 0.4.1, Celeborn master adds a limit to the estimated partition size 
used for computing worker slots. 
+  This size is now constrained within the range specified by 
`celeborn.master.estimatedPartitionSize.minSize` and 
`celeborn.master.estimatedPartitionSize.maxSize`.
+
 ## Upgrading from 0.3 to 0.4
 
 - Since 0.4.0, Celeborn won't be compatible with Celeborn client that versions 
below 0.3.0.
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index b566ed0aa..c1cd37b28 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -445,7 +445,10 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
         Utils.bytesToString(tmpTotalWritten),
         tmpFileCount);
     if (tmpFileCount != 0) {
-      estimatedPartitionSize = tmpTotalWritten / tmpFileCount;
+      estimatedPartitionSize =
+          Math.max(
+              conf.minPartitionSizeToEstimate(),
+              Math.min(tmpTotalWritten / tmpFileCount, 
conf.maxPartitionSizeToEstimate()));
     } else {
       estimatedPartitionSize = initialEstimatedPartitionSize;
     }
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
index 081c1b39e..64fae0b9b 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
@@ -27,6 +27,7 @@ import java.util.*;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -720,7 +721,35 @@ public class DefaultMetaSystemSuiteJ {
 
   @Test
   public void testHandleUpdatePartitionSize() {
+    statusSystem.partitionTotalWritten.reset();
+    statusSystem.partitionTotalFileCount.reset();
+
+    // Default size
+    statusSystem.handleUpdatePartitionSize();
+    Assert.assertEquals(statusSystem.estimatedPartitionSize, 
conf.initialEstimatedPartitionSize());
+
+    Long dummy = 1235L;
+    statusSystem.handleAppHeartbeat(APPID1, 10000000000l, 1, dummy, 
getNewReqeustId());
+    String appId2 = "app02";
+    statusSystem.handleAppHeartbeat(appId2, 1, 1, dummy, getNewReqeustId());
+
+    // Max size
+    statusSystem.handleUpdatePartitionSize();
+    Assert.assertEquals(statusSystem.estimatedPartitionSize, 
conf.maxPartitionSizeToEstimate());
+
+    statusSystem.handleAppHeartbeat(APPID1, 1000000000l, 1, dummy, 
getNewReqeustId());
+    statusSystem.handleAppHeartbeat(appId2, 1, 1, dummy, getNewReqeustId());
+
+    // Size between minEstimateSize -> maxEstimateSize
+    statusSystem.handleUpdatePartitionSize();
+    Assert.assertEquals(statusSystem.estimatedPartitionSize, 500000000);
+
+    statusSystem.handleAppHeartbeat(APPID1, 1000l, 1, dummy, 
getNewReqeustId());
+    statusSystem.handleAppHeartbeat(appId2, 1000l, 1, dummy, 
getNewReqeustId());
+
+    // Min size
     statusSystem.handleUpdatePartitionSize();
+    Assert.assertEquals(statusSystem.estimatedPartitionSize, 
conf.minPartitionSizeToEstimate());
   }
 
   @Test
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
index 772845ec0..d7cf715a0 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
@@ -1121,8 +1121,35 @@ public class RatisMasterStatusSystemSuiteJ {
     AbstractMetaManager statusSystem = pickLeaderStatusSystem();
     Assert.assertNotNull(statusSystem);
 
+    CelebornConf conf = new CelebornConf();
+    statusSystem.partitionTotalWritten.reset();
+    statusSystem.partitionTotalFileCount.reset();
+
     statusSystem.handleUpdatePartitionSize();
-    Thread.sleep(3000L);
+    Assert.assertEquals(statusSystem.estimatedPartitionSize, 
conf.initialEstimatedPartitionSize());
+
+    Long dummy = 1235L;
+    statusSystem.handleAppHeartbeat(APPID1, 10000000000l, 1, dummy, 
getNewReqeustId());
+    String appId2 = "app02";
+    statusSystem.handleAppHeartbeat(appId2, 1, 1, dummy, getNewReqeustId());
+
+    // Max size
+    statusSystem.handleUpdatePartitionSize();
+    Assert.assertEquals(statusSystem.estimatedPartitionSize, 
conf.maxPartitionSizeToEstimate());
+
+    statusSystem.handleAppHeartbeat(APPID1, 1000000000l, 1, dummy, 
getNewReqeustId());
+    statusSystem.handleAppHeartbeat(appId2, 1, 1, dummy, getNewReqeustId());
+
+    // Size between minEstimateSize -> maxEstimateSize
+    statusSystem.handleUpdatePartitionSize();
+    Assert.assertEquals(statusSystem.estimatedPartitionSize, 500000000);
+
+    statusSystem.handleAppHeartbeat(APPID1, 1000l, 1, dummy, 
getNewReqeustId());
+    statusSystem.handleAppHeartbeat(appId2, 1000l, 1, dummy, 
getNewReqeustId());
+
+    // Min size
+    statusSystem.handleUpdatePartitionSize();
+    Assert.assertEquals(statusSystem.estimatedPartitionSize, 
conf.minPartitionSizeToEstimate());
   }
 
   @AfterClass

Reply via email to