This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 9497d557e [CELEBORN-1345] Add a limit to the master's estimated
partition size
9497d557e is described below
commit 9497d557e6a8898db6a1801a1020497a3a736d5b
Author: lvshuang.xjs <[email protected]>
AuthorDate: Mon Mar 25 14:40:47 2024 +0800
[CELEBORN-1345] Add a limit to the master's estimated partition size
### What changes were proposed in this pull request?
Currently, the Celeborn master calculates the estimatedPartitionSize based
on the fileInfo committed by the application. This estimate is then used to
allocate slots across all workers. However, this partition size may be too
large or too small for Celeborn. For example, if an application commits a
single file of 1TB to only one worker, using that partition size could result
in all other workers having no available slots or only very small slots. To
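improve this, it would be better to limit the estimated partition size to the
range between `celeborn.master.estimatedPartitionSize.minSize` and
`celeborn.master.estimatedPartitionSize.maxSize`.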
### Why are the changes needed?
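As the title says: without a limit, an unusually large or small committed
partition size can skew slot allocation across workers.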
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit tests (DefaultMetaSystemSuiteJ and RatisMasterStatusSystemSuiteJ).
Closes #2412 from RexXiong/CELEBORN-1345.
Lead-authored-by: lvshuang.xjs <[email protected]>
Co-authored-by: Shuang <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../org/apache/celeborn/common/CelebornConf.scala | 12 ++++++++-
docs/configuration/master.md | 2 ++
docs/migration.md | 5 ++++
.../master/clustermeta/AbstractMetaManager.java | 5 +++-
.../clustermeta/DefaultMetaSystemSuiteJ.java | 29 ++++++++++++++++++++++
.../ha/RatisMasterStatusSystemSuiteJ.java | 29 +++++++++++++++++++++-
6 files changed, 79 insertions(+), 3 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 9a7751a75..90248088a 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -699,6 +699,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
if (hasHDFSStorage) Math.max(128, get(WORKER_COMMIT_THREADS)) else
get(WORKER_COMMIT_THREADS)
def workerCleanThreads: Int = get(WORKER_CLEAN_THREADS)
def workerShuffleCommitTimeout: Long = get(WORKER_SHUFFLE_COMMIT_TIMEOUT)
+ def maxPartitionSizeToEstimate: Long =
+ get(ESTIMATED_PARTITION_SIZE_MAX_SIZE).getOrElse(partitionSplitMaximumSize
* 2)
def minPartitionSizeToEstimate: Long = get(ESTIMATED_PARTITION_SIZE_MIN_SIZE)
def workerPartitionSorterSortPartitionTimeout: Long =
get(WORKER_PARTITION_SORTER_SORT_TIMEOUT)
def workerPartitionSorterPrefetchEnabled: Boolean =
@@ -2219,10 +2221,18 @@ object CelebornConf extends Logging {
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("64mb")
+ val ESTIMATED_PARTITION_SIZE_MAX_SIZE: OptionalConfigEntry[Long] =
+ buildConf("celeborn.master.estimatedPartitionSize.maxSize")
+ .categories("master")
+ .doc("Max partition size for estimation. Default value should be
celeborn.worker.shuffle.partitionSplit.max * 2.")
+ .version("0.4.1")
+ .bytesConf(ByteUnit.BYTE)
+ .createOptional
+
val ESTIMATED_PARTITION_SIZE_MIN_SIZE: ConfigEntry[Long] =
buildConf("celeborn.master.estimatedPartitionSize.minSize")
.withAlternative("celeborn.shuffle.minPartitionSizeToEstimate")
- .categories("worker")
+ .categories("master", "worker")
.doc(
"Ignore partition size smaller than this configuration of partition
size for estimation.")
.version("0.3.0")
diff --git a/docs/configuration/master.md b/docs/configuration/master.md
index e1efa174d..7a504441d 100644
--- a/docs/configuration/master.md
+++ b/docs/configuration/master.md
@@ -34,6 +34,8 @@ license: |
| celeborn.dynamicConfig.store.fs.path | <undefined> | false | The path
of dynamic config file for fs store backend. The file format should be yaml.
The default path is `${CELEBORN_CONF_DIR}/dynamicConfig.yaml`. | 0.5.0 | |
| celeborn.internal.port.enabled | false | false | Whether to create a
internal port on Masters/Workers for inter-Masters/Workers communication. This
is beneficial when SASL authentication is enforced for all interactions between
clients and Celeborn Services, but the services can exchange messages without
being subject to SASL authentication. | 0.5.0 | |
| celeborn.master.estimatedPartitionSize.initialSize | 64mb | false | Initial
partition size for estimation, it will change according to runtime stats. |
0.3.0 | celeborn.shuffle.initialEstimatedPartitionSize |
+| celeborn.master.estimatedPartitionSize.maxSize | <undefined> | false |
Max partition size for estimation. Default value should be
celeborn.worker.shuffle.partitionSplit.max * 2. | 0.4.1 | |
+| celeborn.master.estimatedPartitionSize.minSize | 8mb | false | Ignore
partition size smaller than this configuration of partition size for
estimation. | 0.3.0 | celeborn.shuffle.minPartitionSizeToEstimate |
| celeborn.master.estimatedPartitionSize.update.initialDelay | 5min | false |
Initial delay time before start updating partition size for estimation. | 0.3.0
| celeborn.shuffle.estimatedPartitionSize.update.initialDelay |
| celeborn.master.estimatedPartitionSize.update.interval | 10min | false |
Interval of updating partition size for estimation. | 0.3.0 |
celeborn.shuffle.estimatedPartitionSize.update.interval |
| celeborn.master.hdfs.expireDirs.timeout | 1h | false | The timeout for a
expire dirs to be deleted on HDFS. | 0.3.0 | |
diff --git a/docs/migration.md b/docs/migration.md
index eabd881e3..0c6214da2 100644
--- a/docs/migration.md
+++ b/docs/migration.md
@@ -38,6 +38,11 @@ license: |
- Since 0.5.0, Celeborn client removes configuration
`celeborn.client.push.splitPartition.threads`.
+## Upgrading from 0.4.0 to 0.4.1
+
+- Since 0.4.1, Celeborn master adds a limit to the estimated partition size
used for computing worker slots.
+ This size is now constrained within the range specified by
`celeborn.master.estimatedPartitionSize.minSize` and
`celeborn.master.estimatedPartitionSize.maxSize`.
+
## Upgrading from 0.3 to 0.4
- Since 0.4.0, Celeborn won't be compatible with Celeborn client that versions
below 0.3.0.
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index b566ed0aa..c1cd37b28 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -445,7 +445,10 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
Utils.bytesToString(tmpTotalWritten),
tmpFileCount);
if (tmpFileCount != 0) {
- estimatedPartitionSize = tmpTotalWritten / tmpFileCount;
+ estimatedPartitionSize =
+ Math.max(
+ conf.minPartitionSizeToEstimate(),
+ Math.min(tmpTotalWritten / tmpFileCount,
conf.maxPartitionSizeToEstimate()));
} else {
estimatedPartitionSize = initialEstimatedPartitionSize;
}
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
index 081c1b39e..64fae0b9b 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
@@ -27,6 +27,7 @@ import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -720,7 +721,35 @@ public class DefaultMetaSystemSuiteJ {
@Test
public void testHandleUpdatePartitionSize() {
+ statusSystem.partitionTotalWritten.reset();
+ statusSystem.partitionTotalFileCount.reset();
+
+ // Default size
+ statusSystem.handleUpdatePartitionSize();
+ Assert.assertEquals(statusSystem.estimatedPartitionSize,
conf.initialEstimatedPartitionSize());
+
+ Long dummy = 1235L;
+ statusSystem.handleAppHeartbeat(APPID1, 10000000000l, 1, dummy,
getNewReqeustId());
+ String appId2 = "app02";
+ statusSystem.handleAppHeartbeat(appId2, 1, 1, dummy, getNewReqeustId());
+
+ // Max size
+ statusSystem.handleUpdatePartitionSize();
+ Assert.assertEquals(statusSystem.estimatedPartitionSize,
conf.maxPartitionSizeToEstimate());
+
+ statusSystem.handleAppHeartbeat(APPID1, 1000000000l, 1, dummy,
getNewReqeustId());
+ statusSystem.handleAppHeartbeat(appId2, 1, 1, dummy, getNewReqeustId());
+
+ // Size between minEstimateSize -> maxEstimateSize
+ statusSystem.handleUpdatePartitionSize();
+ Assert.assertEquals(statusSystem.estimatedPartitionSize, 500000000);
+
+ statusSystem.handleAppHeartbeat(APPID1, 1000l, 1, dummy,
getNewReqeustId());
+ statusSystem.handleAppHeartbeat(appId2, 1000l, 1, dummy,
getNewReqeustId());
+
+ // Min size
statusSystem.handleUpdatePartitionSize();
+ Assert.assertEquals(statusSystem.estimatedPartitionSize,
conf.minPartitionSizeToEstimate());
}
@Test
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
index 772845ec0..d7cf715a0 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
@@ -1121,8 +1121,35 @@ public class RatisMasterStatusSystemSuiteJ {
AbstractMetaManager statusSystem = pickLeaderStatusSystem();
Assert.assertNotNull(statusSystem);
+ CelebornConf conf = new CelebornConf();
+ statusSystem.partitionTotalWritten.reset();
+ statusSystem.partitionTotalFileCount.reset();
+
statusSystem.handleUpdatePartitionSize();
- Thread.sleep(3000L);
+ Assert.assertEquals(statusSystem.estimatedPartitionSize,
conf.initialEstimatedPartitionSize());
+
+ Long dummy = 1235L;
+ statusSystem.handleAppHeartbeat(APPID1, 10000000000l, 1, dummy,
getNewReqeustId());
+ String appId2 = "app02";
+ statusSystem.handleAppHeartbeat(appId2, 1, 1, dummy, getNewReqeustId());
+
+ // Max size
+ statusSystem.handleUpdatePartitionSize();
+ Assert.assertEquals(statusSystem.estimatedPartitionSize,
conf.maxPartitionSizeToEstimate());
+
+ statusSystem.handleAppHeartbeat(APPID1, 1000000000l, 1, dummy,
getNewReqeustId());
+ statusSystem.handleAppHeartbeat(appId2, 1, 1, dummy, getNewReqeustId());
+
+ // Size between minEstimateSize -> maxEstimateSize
+ statusSystem.handleUpdatePartitionSize();
+ Assert.assertEquals(statusSystem.estimatedPartitionSize, 500000000);
+
+ statusSystem.handleAppHeartbeat(APPID1, 1000l, 1, dummy,
getNewReqeustId());
+ statusSystem.handleAppHeartbeat(appId2, 1000l, 1, dummy,
getNewReqeustId());
+
+ // Min size
+ statusSystem.handleUpdatePartitionSize();
+ Assert.assertEquals(statusSystem.estimatedPartitionSize,
conf.minPartitionSizeToEstimate());
}
@AfterClass