This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new ba3cfe604 [CELEBORN-2018] Support min number of workers selected for
shuffle
ba3cfe604 is described below
commit ba3cfe604eaa0d238eb1c1d1c6ab60bf5f9afa83
Author: Sanskar Modi <[email protected]>
AuthorDate: Thu May 29 11:27:14 2025 +0800
[CELEBORN-2018] Support min number of workers selected for shuffle
### What changes were proposed in this pull request?
Support min number of workers to assign slots on for a shuffle.
### Why are the changes needed?
PR https://github.com/apache/celeborn/pull/3039 updated the default value
of `celeborn.master.slot.assign.extraSlots` to avoid skew in shuffle stages with
a small number of reducers. However, it also affects stages with a large
number of reducers, which is not ideal.
We are introducing a new config, `celeborn.master.slot.assign.minWorkers`,
which will ensure that shuffle stages with a small number of reducers do not
cause load imbalance on a few nodes.
### Does this PR introduce _any_ user-facing change?
Yes
### How was this patch tested?
NA
Closes #3297 from s0nskar/min_workers.
Authored-by: Sanskar Modi <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
(cherry picked from commit 12e91640f68749cabbb4844e2ab7c8b5f254235f)
Signed-off-by: Wang, Fei <[email protected]>
---
.../scala/org/apache/celeborn/common/CelebornConf.scala | 13 +++++++++++--
docs/configuration/master.md | 3 ++-
docs/migration.md | 2 +-
.../apache/celeborn/service/deploy/master/Master.scala | 16 +++++++++++-----
4 files changed, 25 insertions(+), 9 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index a70381bbd..0ec7de075 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -674,6 +674,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
get(MASTER_SLOT_ASSIGN_LOADAWARE_FETCHTIME_WEIGHT)
def masterSlotAssignExtraSlots: Int = get(MASTER_SLOT_ASSIGN_EXTRA_SLOTS)
def masterSlotAssignMaxWorkers: Int = get(MASTER_SLOT_ASSIGN_MAX_WORKERS)
+ def masterSlotAssignMinWorkers: Int = get(MASTER_SLOT_ASSIGN_MIN_WORKERS)
def initialEstimatedPartitionSize: Long =
get(ESTIMATED_PARTITION_SIZE_INITIAL_SIZE)
def estimatedPartitionSizeUpdaterInitialDelay: Long =
get(ESTIMATED_PARTITION_SIZE_UPDATE_INITIAL_DELAY)
@@ -2974,9 +2975,9 @@ object CelebornConf extends Logging {
.withAlternative("celeborn.slots.assign.extraSlots")
.categories("master")
.version("0.3.0")
- .doc("Extra slots number when master assign slots.")
+ .doc("Extra slots number when master assign slots. Provided enough
workers are available.")
.intConf
- .createWithDefault(100)
+ .createWithDefault(2)
val MASTER_SLOT_ASSIGN_MAX_WORKERS: ConfigEntry[Int] =
buildConf("celeborn.master.slot.assign.maxWorkers")
@@ -2987,6 +2988,14 @@ object CelebornConf extends Logging {
.intConf
.createWithDefault(10000)
+ val MASTER_SLOT_ASSIGN_MIN_WORKERS: ConfigEntry[Int] =
+ buildConf("celeborn.master.slot.assign.minWorkers")
+ .categories("master")
+ .version("0.6.0")
+ .doc("Min workers that slots of one shuffle should be allocated on.
Provided enough workers are available.")
+ .intConf
+ .createWithDefault(100)
+
val ESTIMATED_PARTITION_SIZE_INITIAL_SIZE: ConfigEntry[Long] =
buildConf("celeborn.master.estimatedPartitionSize.initialSize")
.withAlternative("celeborn.shuffle.initialEstimatedPartitionSize")
diff --git a/docs/configuration/master.md b/docs/configuration/master.md
index add7c136e..f24d89e52 100644
--- a/docs/configuration/master.md
+++ b/docs/configuration/master.md
@@ -71,12 +71,13 @@ license: |
| celeborn.master.port | 9097 | false | Port for master to bind. | 0.2.0 | |
| celeborn.master.rackResolver.refresh.interval | 30s | false | Interval for
refreshing the node rack information periodically. | 0.5.0 | |
| celeborn.master.send.applicationMeta.threads | 8 | false | Number of threads
used by the Master to send ApplicationMeta to Workers. | 0.5.0 | |
-| celeborn.master.slot.assign.extraSlots | 100 | false | Extra slots number
when master assign slots. | 0.3.0 | celeborn.slots.assign.extraSlots |
+| celeborn.master.slot.assign.extraSlots | 2 | false | Extra slots number when
master assign slots. Provided enough workers are available. | 0.3.0 |
celeborn.slots.assign.extraSlots |
| celeborn.master.slot.assign.loadAware.diskGroupGradient | 0.1 | false | This
value means how many more workload will be placed into a faster disk group than
a slower group. | 0.3.0 | celeborn.slots.assign.loadAware.diskGroupGradient |
| celeborn.master.slot.assign.loadAware.fetchTimeWeight | 1.0 | false | Weight
of average fetch time when calculating ordering in load-aware assignment
strategy | 0.3.0 | celeborn.slots.assign.loadAware.fetchTimeWeight |
| celeborn.master.slot.assign.loadAware.flushTimeWeight | 0.0 | false | Weight
of average flush time when calculating ordering in load-aware assignment
strategy | 0.3.0 | celeborn.slots.assign.loadAware.flushTimeWeight |
| celeborn.master.slot.assign.loadAware.numDiskGroups | 5 | false | This
configuration is a guidance for load-aware slot allocation algorithm. This
value is control how many disk groups will be created. | 0.3.0 |
celeborn.slots.assign.loadAware.numDiskGroups |
| celeborn.master.slot.assign.maxWorkers | 10000 | false | Max workers that
slots of one shuffle can be allocated on. Will choose the smaller positive one
from Master side and Client side, see `celeborn.client.slot.assign.maxWorkers`.
| 0.3.1 | |
+| celeborn.master.slot.assign.minWorkers | 100 | false | Min workers that
slots of one shuffle should be allocated on. Provided enough workers are
available. | 0.6.0 | |
| celeborn.master.slot.assign.policy | ROUNDROBIN | false | Policy for master
to assign slots, Celeborn supports two types of policy: roundrobin and
loadaware. Loadaware policy will be ignored when `HDFS` is enabled in
`celeborn.storage.availableTypes` | 0.3.0 | celeborn.slots.assign.policy |
| celeborn.master.userResourceConsumption.metrics.enabled | false | false |
Whether to enable resource consumption metrics. | 0.6.0 | |
| celeborn.master.userResourceConsumption.update.interval | 30s | false | Time
length for a window about compute user resource consumption. | 0.3.0 | |
diff --git a/docs/migration.md b/docs/migration.md
index 34b7fe7c4..b3105e3e1 100644
--- a/docs/migration.md
+++ b/docs/migration.md
@@ -33,7 +33,7 @@ license: |
- Since 0.6.0, Celeborn modified `celeborn.master.hdfs.expireDirs.timeout` to
`celeborn.master.dfs.expireDirs.timeout`. Please use
`celeborn.master.dfs.expireDirs.timeout` if you want to set the timeout for
expired dirs to be deleted.
-- Since 0.6.0, Celeborn changed the default value of
`celeborn.master.slot.assign.extraSlots` from `2` to `100`, which means
Celeborn will involve more workers in offering slots.
+- Since 0.6.0, Celeborn introduced `celeborn.master.slot.assign.minWorkers`
with a default value of `100`, which means Celeborn will involve more workers in
offering slots when the number of reducers is small.
- Since 0.6.0, Celeborn deprecate
`celeborn.worker.congestionControl.low.watermark`. Please use
`celeborn.worker.congestionControl.diskBuffer.low.watermark` instead.
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 8316f1706..dbd277849 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -201,6 +201,8 @@ private[celeborn] class Master(
private val tagsManager = new TagsManager(Option(configService))
private val slotsAssignMaxWorkers = conf.masterSlotAssignMaxWorkers
+ private val slotsAssignMinWorkers = conf.masterSlotAssignMinWorkers
+ private val slotsAssignExtraSlots = conf.masterSlotAssignExtraSlots
private val slotsAssignLoadAwareDiskGroupNum =
conf.masterSlotAssignLoadAwareDiskGroupNum
private val slotsAssignLoadAwareDiskGroupGradient =
conf.masterSlotAssignLoadAwareDiskGroupGradient
@@ -974,11 +976,14 @@ private[celeborn] class Master(
.asScala.map { case (worker, slots) => worker.toUniqueId -> slots
}.asJava,
requestSlots.requestId)
- logInfo(s"Offer slots successfully for $numReducers reducers of
$shuffleKey" +
- s" on ${slots.size()} workers.")
-
+ var offerSlotsMsg = s"Successfully offered slots for $numReducers reducers
of $shuffleKey" +
+ s" on ${slots.size()} workers"
val workersNotSelected =
availableWorkers.asScala.filter(!slots.containsKey(_))
- val offerSlotsExtraSize = Math.min(conf.masterSlotAssignExtraSlots,
workersNotSelected.size)
+ val offerSlotsExtraSize = Math.min(
+ Math.max(
+ slotsAssignExtraSlots,
+ slots.size() - slotsAssignMinWorkers),
+ workersNotSelected.size)
if (offerSlotsExtraSize > 0) {
var index = Random.nextInt(workersNotSelected.size)
(1 to offerSlotsExtraSize).foreach(_ => {
@@ -987,8 +992,9 @@ private[celeborn] class Master(
(new util.ArrayList[PartitionLocation](), new
util.ArrayList[PartitionLocation]()))
index = (index + 1) % workersNotSelected.size
})
- logInfo(s"Offered extra $offerSlotsExtraSize slots for $shuffleKey")
+ offerSlotsMsg += s", offered $offerSlotsExtraSize extra slots"
}
+ logInfo(offerSlotsMsg + ".")
ShuffleAuditLogger.audit(
shuffleKey,