This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch branch-0.4
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.4 by this push:
     new 23e33b056 [CELEBORN-1345] Add a limit to the master's estimated 
partition size
23e33b056 is described below

commit 23e33b0562a5024fd08637df66f969f2e4c574a6
Author: Shuang <[email protected]>
AuthorDate: Mon Mar 25 14:40:47 2024 +0800

    [CELEBORN-1345] Add a limit to the master's estimated partition size
---
 .../org/apache/celeborn/common/CelebornConf.scala  | 12 ++++++++-
 docs/configuration/master.md                       |  2 ++
 docs/migration.md                                  |  5 ++++
 .../master/clustermeta/AbstractMetaManager.java    |  5 +++-
 .../clustermeta/DefaultMetaSystemSuiteJ.java       | 29 ++++++++++++++++++++++
 .../ha/RatisMasterStatusSystemSuiteJ.java          | 29 +++++++++++++++++++++-
 6 files changed, 79 insertions(+), 3 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 5bbe7d2ea..261e97478 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -682,6 +682,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
     if (hasHDFSStorage) Math.max(128, get(WORKER_COMMIT_THREADS)) else 
get(WORKER_COMMIT_THREADS)
   def workerCleanThreads: Int = get(WORKER_CLEAN_THREADS)
   def workerShuffleCommitTimeout: Long = get(WORKER_SHUFFLE_COMMIT_TIMEOUT)
+  def maxPartitionSizeToEstimate: Long =
+    get(ESTIMATED_PARTITION_SIZE_MAX_SIZE).getOrElse(partitionSplitMaximumSize 
* 2)
   def minPartitionSizeToEstimate: Long = get(ESTIMATED_PARTITION_SIZE_MIN_SIZE)
   def partitionSorterSortPartitionTimeout: Long = 
get(PARTITION_SORTER_SORT_TIMEOUT)
   def partitionSorterReservedMemoryPerPartition: Long =
@@ -2140,10 +2142,18 @@ object CelebornConf extends Logging {
       .bytesConf(ByteUnit.BYTE)
       .createWithDefaultString("64mb")
 
+  val ESTIMATED_PARTITION_SIZE_MAX_SIZE: OptionalConfigEntry[Long] =
+    buildConf("celeborn.master.estimatedPartitionSize.maxSize")
+      .categories("master")
+      .doc("Max partition size for estimation. Default value should be 
celeborn.worker.shuffle.partitionSplit.max * 2.")
+      .version("0.4.1")
+      .bytesConf(ByteUnit.BYTE)
+      .createOptional
+
   val ESTIMATED_PARTITION_SIZE_MIN_SIZE: ConfigEntry[Long] =
     buildConf("celeborn.master.estimatedPartitionSize.minSize")
       .withAlternative("celeborn.shuffle.minPartitionSizeToEstimate")
-      .categories("worker")
+      .categories("master", "worker")
       .doc(
         "Ignore partition size smaller than this configuration of partition 
size for estimation.")
       .version("0.3.0")
diff --git a/docs/configuration/master.md b/docs/configuration/master.md
index c80d324c3..94773a978 100644
--- a/docs/configuration/master.md
+++ b/docs/configuration/master.md
@@ -22,6 +22,8 @@ license: |
 | celeborn.dynamicConfig.refresh.interval | 120s | Interval for refreshing the 
corresponding dynamic config periodically. | 0.4.0 |  | 
 | celeborn.dynamicConfig.store.backend | NONE | Store backend for dynamic 
config. Available options: NONE, FS. Note: NONE means disabling dynamic config 
store. | 0.4.0 |  | 
 | celeborn.master.estimatedPartitionSize.initialSize | 64mb | Initial 
partition size for estimation, it will change according to runtime stats. | 
0.3.0 | celeborn.shuffle.initialEstimatedPartitionSize | 
+| celeborn.master.estimatedPartitionSize.maxSize | &lt;undefined&gt; | 
Max partition size for estimation. Default value should be 
celeborn.worker.shuffle.partitionSplit.max * 2. | 0.4.1 |  | 
+| celeborn.master.estimatedPartitionSize.minSize | 8mb | Ignore 
partition size smaller than this configuration of partition size for 
estimation. | 0.3.0 | celeborn.shuffle.minPartitionSizeToEstimate | 
 | celeborn.master.estimatedPartitionSize.update.initialDelay | 5min | Initial 
delay time before start updating partition size for estimation. | 0.3.0 | 
celeborn.shuffle.estimatedPartitionSize.update.initialDelay | 
 | celeborn.master.estimatedPartitionSize.update.interval | 10min | Interval of 
updating partition size for estimation. | 0.3.0 | 
celeborn.shuffle.estimatedPartitionSize.update.interval | 
 | celeborn.master.hdfs.expireDirs.timeout | 1h | The timeout for a expire dirs 
to be deleted on HDFS. | 0.3.0 |  | 
diff --git a/docs/migration.md b/docs/migration.md
index ae8715a76..6128357e7 100644
--- a/docs/migration.md
+++ b/docs/migration.md
@@ -21,6 +21,11 @@ license: |
 
 # Migration Guide
 
+## Upgrading from 0.4.0 to 0.4.1
+
+- Since 0.4.1, Celeborn master adds a limit to the estimated partition size 
used for computing worker slots. 
+  This size is now constrained within the range specified by 
`celeborn.master.estimatedPartitionSize.minSize` and 
`celeborn.master.estimatedPartitionSize.maxSize`.
+
 ## Upgrading from 0.3 to 0.4
 
 - Since 0.4.0, Celeborn won't be compatible with Celeborn clients whose 
versions are below 0.3.0.
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index 3ddd39232..aeb74b50d 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -359,7 +359,10 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
         Utils.bytesToString(tmpTotalWritten),
         tmpFileCount);
     if (tmpFileCount != 0) {
-      estimatedPartitionSize = tmpTotalWritten / tmpFileCount;
+      estimatedPartitionSize =
+          Math.max(
+              conf.minPartitionSizeToEstimate(),
+              Math.min(tmpTotalWritten / tmpFileCount, 
conf.maxPartitionSizeToEstimate()));
     } else {
       estimatedPartitionSize = initialEstimatedPartitionSize;
     }
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
index 83f5f7232..05c52230b 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
@@ -27,6 +27,7 @@ import java.util.*;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -642,6 +643,34 @@ public class DefaultMetaSystemSuiteJ {
 
   @Test
   public void testHandleUpdatePartitionSize() {
+    statusSystem.partitionTotalWritten.reset();
+    statusSystem.partitionTotalFileCount.reset();
+
+    // Default size
+    statusSystem.handleUpdatePartitionSize();
+    Assert.assertEquals(statusSystem.estimatedPartitionSize, 
conf.initialEstimatedPartitionSize());
+
+    Long dummy = 1235L;
+    statusSystem.handleAppHeartbeat(APPID1, 10000000000l, 1, dummy, 
getNewReqeustId());
+    String appId2 = "app02";
+    statusSystem.handleAppHeartbeat(appId2, 1, 1, dummy, getNewReqeustId());
+
+    // Max size
+    statusSystem.handleUpdatePartitionSize();
+    Assert.assertEquals(statusSystem.estimatedPartitionSize, 
conf.maxPartitionSizeToEstimate());
+
+    statusSystem.handleAppHeartbeat(APPID1, 1000000000l, 1, dummy, 
getNewReqeustId());
+    statusSystem.handleAppHeartbeat(appId2, 1, 1, dummy, getNewReqeustId());
+
+    // Size between minEstimateSize -> maxEstimateSize
+    statusSystem.handleUpdatePartitionSize();
+    Assert.assertEquals(statusSystem.estimatedPartitionSize, 500000000);
+
+    statusSystem.handleAppHeartbeat(APPID1, 1000l, 1, dummy, 
getNewReqeustId());
+    statusSystem.handleAppHeartbeat(appId2, 1000l, 1, dummy, 
getNewReqeustId());
+
+    // Min size
     statusSystem.handleUpdatePartitionSize();
+    Assert.assertEquals(statusSystem.estimatedPartitionSize, 
conf.minPartitionSizeToEstimate());
   }
 }
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
index b66099e1d..f16d27bdf 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
@@ -1018,8 +1018,35 @@ public class RatisMasterStatusSystemSuiteJ {
     AbstractMetaManager statusSystem = pickLeaderStatusSystem();
     Assert.assertNotNull(statusSystem);
 
+    CelebornConf conf = new CelebornConf();
+    statusSystem.partitionTotalWritten.reset();
+    statusSystem.partitionTotalFileCount.reset();
+
     statusSystem.handleUpdatePartitionSize();
-    Thread.sleep(3000L);
+    Assert.assertEquals(statusSystem.estimatedPartitionSize, 
conf.initialEstimatedPartitionSize());
+
+    Long dummy = 1235L;
+    statusSystem.handleAppHeartbeat(APPID1, 10000000000l, 1, dummy, 
getNewReqeustId());
+    String appId2 = "app02";
+    statusSystem.handleAppHeartbeat(appId2, 1, 1, dummy, getNewReqeustId());
+
+    // Max size
+    statusSystem.handleUpdatePartitionSize();
+    Assert.assertEquals(statusSystem.estimatedPartitionSize, 
conf.maxPartitionSizeToEstimate());
+
+    statusSystem.handleAppHeartbeat(APPID1, 1000000000l, 1, dummy, 
getNewReqeustId());
+    statusSystem.handleAppHeartbeat(appId2, 1, 1, dummy, getNewReqeustId());
+
+    // Size between minEstimateSize -> maxEstimateSize
+    statusSystem.handleUpdatePartitionSize();
+    Assert.assertEquals(statusSystem.estimatedPartitionSize, 500000000);
+
+    statusSystem.handleAppHeartbeat(APPID1, 1000l, 1, dummy, 
getNewReqeustId());
+    statusSystem.handleAppHeartbeat(appId2, 1000l, 1, dummy, 
getNewReqeustId());
+
+    // Min size
+    statusSystem.handleUpdatePartitionSize();
+    Assert.assertEquals(statusSystem.estimatedPartitionSize, 
conf.minPartitionSizeToEstimate());
   }
 
   @AfterClass

Reply via email to