This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch branch-0.4
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.4 by this push:
new 23e33b056 [CELEBORN-1345] Add a limit to the master's estimated
partition size
23e33b056 is described below
commit 23e33b0562a5024fd08637df66f969f2e4c574a6
Author: Shuang <[email protected]>
AuthorDate: Mon Mar 25 14:40:47 2024 +0800
[CELEBORN-1345] Add a limit to the master's estimated partition size
---
.../org/apache/celeborn/common/CelebornConf.scala | 12 ++++++++-
docs/configuration/master.md | 2 ++
docs/migration.md | 5 ++++
.../master/clustermeta/AbstractMetaManager.java | 5 +++-
.../clustermeta/DefaultMetaSystemSuiteJ.java | 29 ++++++++++++++++++++++
.../ha/RatisMasterStatusSystemSuiteJ.java | 29 +++++++++++++++++++++-
6 files changed, 79 insertions(+), 3 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 5bbe7d2ea..261e97478 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -682,6 +682,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
if (hasHDFSStorage) Math.max(128, get(WORKER_COMMIT_THREADS)) else
get(WORKER_COMMIT_THREADS)
def workerCleanThreads: Int = get(WORKER_CLEAN_THREADS)
def workerShuffleCommitTimeout: Long = get(WORKER_SHUFFLE_COMMIT_TIMEOUT)
+ def maxPartitionSizeToEstimate: Long =
+ get(ESTIMATED_PARTITION_SIZE_MAX_SIZE).getOrElse(partitionSplitMaximumSize
* 2)
def minPartitionSizeToEstimate: Long = get(ESTIMATED_PARTITION_SIZE_MIN_SIZE)
def partitionSorterSortPartitionTimeout: Long =
get(PARTITION_SORTER_SORT_TIMEOUT)
def partitionSorterReservedMemoryPerPartition: Long =
@@ -2140,10 +2142,18 @@ object CelebornConf extends Logging {
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("64mb")
+ val ESTIMATED_PARTITION_SIZE_MAX_SIZE: OptionalConfigEntry[Long] =
+ buildConf("celeborn.master.estimatedPartitionSize.maxSize")
+ .categories("master")
+ .doc("Max partition size for estimation. Default value should be
celeborn.worker.shuffle.partitionSplit.max * 2.")
+ .version("0.4.1")
+ .bytesConf(ByteUnit.BYTE)
+ .createOptional
+
val ESTIMATED_PARTITION_SIZE_MIN_SIZE: ConfigEntry[Long] =
buildConf("celeborn.master.estimatedPartitionSize.minSize")
.withAlternative("celeborn.shuffle.minPartitionSizeToEstimate")
- .categories("worker")
+ .categories("master", "worker")
.doc(
"Ignore partition size smaller than this configuration of partition
size for estimation.")
.version("0.3.0")
diff --git a/docs/configuration/master.md b/docs/configuration/master.md
index c80d324c3..94773a978 100644
--- a/docs/configuration/master.md
+++ b/docs/configuration/master.md
@@ -22,6 +22,8 @@ license: |
| celeborn.dynamicConfig.refresh.interval | 120s | Interval for refreshing the
corresponding dynamic config periodically. | 0.4.0 | |
| celeborn.dynamicConfig.store.backend | NONE | Store backend for dynamic
config. Available options: NONE, FS. Note: NONE means disabling dynamic config
store. | 0.4.0 | |
| celeborn.master.estimatedPartitionSize.initialSize | 64mb | Initial
partition size for estimation, it will change according to runtime stats. |
0.3.0 | celeborn.shuffle.initialEstimatedPartitionSize |
+| celeborn.master.estimatedPartitionSize.maxSize | <undefined> |
Max partition size for estimation. Default value should be
celeborn.worker.shuffle.partitionSplit.max * 2. | 0.4.1 | |
+| celeborn.master.estimatedPartitionSize.minSize | 8mb | Ignore
partition size smaller than this configuration of partition size for
estimation. | 0.3.0 | celeborn.shuffle.minPartitionSizeToEstimate |
| celeborn.master.estimatedPartitionSize.update.initialDelay | 5min | Initial
delay time before start updating partition size for estimation. | 0.3.0 |
celeborn.shuffle.estimatedPartitionSize.update.initialDelay |
| celeborn.master.estimatedPartitionSize.update.interval | 10min | Interval of
updating partition size for estimation. | 0.3.0 |
celeborn.shuffle.estimatedPartitionSize.update.interval |
| celeborn.master.hdfs.expireDirs.timeout | 1h | The timeout for a expire dirs
to be deleted on HDFS. | 0.3.0 | |
diff --git a/docs/migration.md b/docs/migration.md
index ae8715a76..6128357e7 100644
--- a/docs/migration.md
+++ b/docs/migration.md
@@ -21,6 +21,11 @@ license: |
# Migration Guide
+## Upgrading from 0.4.0 to 0.4.1
+
+- Since 0.4.1, Celeborn master clamps the estimated partition size that it
uses to compute worker slots.
+ The estimate is now kept within the range between
`celeborn.master.estimatedPartitionSize.minSize` and
`celeborn.master.estimatedPartitionSize.maxSize` (which defaults to `celeborn.worker.shuffle.partitionSplit.max * 2` when unset).
+
## Upgrading from 0.3 to 0.4
- Since 0.4.0, Celeborn won't be compatible with Celeborn client that versions
below 0.3.0.
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index 3ddd39232..aeb74b50d 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -359,7 +359,10 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
Utils.bytesToString(tmpTotalWritten),
tmpFileCount);
if (tmpFileCount != 0) {
- estimatedPartitionSize = tmpTotalWritten / tmpFileCount;
+ estimatedPartitionSize =
+ Math.max(
+ conf.minPartitionSizeToEstimate(),
+ Math.min(tmpTotalWritten / tmpFileCount,
conf.maxPartitionSizeToEstimate()));
} else {
estimatedPartitionSize = initialEstimatedPartitionSize;
}
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
index 83f5f7232..05c52230b 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
@@ -27,6 +27,7 @@ import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -642,6 +643,34 @@ public class DefaultMetaSystemSuiteJ {
@Test
public void testHandleUpdatePartitionSize() {
+ statusSystem.partitionTotalWritten.reset();
+ statusSystem.partitionTotalFileCount.reset();
+
+ // Default size
+ statusSystem.handleUpdatePartitionSize();
+ Assert.assertEquals(statusSystem.estimatedPartitionSize,
conf.initialEstimatedPartitionSize());
+
+ Long dummy = 1235L;
+ statusSystem.handleAppHeartbeat(APPID1, 10000000000l, 1, dummy,
getNewReqeustId());
+ String appId2 = "app02";
+ statusSystem.handleAppHeartbeat(appId2, 1, 1, dummy, getNewReqeustId());
+
+ // Max size
+ statusSystem.handleUpdatePartitionSize();
+ Assert.assertEquals(statusSystem.estimatedPartitionSize,
conf.maxPartitionSizeToEstimate());
+
+ statusSystem.handleAppHeartbeat(APPID1, 1000000000l, 1, dummy,
getNewReqeustId());
+ statusSystem.handleAppHeartbeat(appId2, 1, 1, dummy, getNewReqeustId());
+
+ // Size between minEstimateSize -> maxEstimateSize
+ statusSystem.handleUpdatePartitionSize();
+ Assert.assertEquals(statusSystem.estimatedPartitionSize, 500000000);
+
+ statusSystem.handleAppHeartbeat(APPID1, 1000l, 1, dummy,
getNewReqeustId());
+ statusSystem.handleAppHeartbeat(appId2, 1000l, 1, dummy,
getNewReqeustId());
+
+ // Min size
statusSystem.handleUpdatePartitionSize();
+ Assert.assertEquals(statusSystem.estimatedPartitionSize,
conf.minPartitionSizeToEstimate());
}
}
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
index b66099e1d..f16d27bdf 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
@@ -1018,8 +1018,35 @@ public class RatisMasterStatusSystemSuiteJ {
AbstractMetaManager statusSystem = pickLeaderStatusSystem();
Assert.assertNotNull(statusSystem);
+ CelebornConf conf = new CelebornConf();
+ statusSystem.partitionTotalWritten.reset();
+ statusSystem.partitionTotalFileCount.reset();
+
statusSystem.handleUpdatePartitionSize();
- Thread.sleep(3000L);
+ Assert.assertEquals(statusSystem.estimatedPartitionSize,
conf.initialEstimatedPartitionSize());
+
+ Long dummy = 1235L;
+ statusSystem.handleAppHeartbeat(APPID1, 10000000000l, 1, dummy,
getNewReqeustId());
+ String appId2 = "app02";
+ statusSystem.handleAppHeartbeat(appId2, 1, 1, dummy, getNewReqeustId());
+
+ // Max size
+ statusSystem.handleUpdatePartitionSize();
+ Assert.assertEquals(statusSystem.estimatedPartitionSize,
conf.maxPartitionSizeToEstimate());
+
+ statusSystem.handleAppHeartbeat(APPID1, 1000000000l, 1, dummy,
getNewReqeustId());
+ statusSystem.handleAppHeartbeat(appId2, 1, 1, dummy, getNewReqeustId());
+
+ // Size between minEstimateSize -> maxEstimateSize
+ statusSystem.handleUpdatePartitionSize();
+ Assert.assertEquals(statusSystem.estimatedPartitionSize, 500000000);
+
+ statusSystem.handleAppHeartbeat(APPID1, 1000l, 1, dummy,
getNewReqeustId());
+ statusSystem.handleAppHeartbeat(appId2, 1000l, 1, dummy,
getNewReqeustId());
+
+ // Min size
+ statusSystem.handleUpdatePartitionSize();
+ Assert.assertEquals(statusSystem.estimatedPartitionSize,
conf.minPartitionSizeToEstimate());
}
@AfterClass