This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new aceee64c7 [CELEBORN-2018] Support min number of workers selected for shuffle
aceee64c7 is described below

commit aceee64c73f8feb310dc393676a7941131348a7e
Author: Sanskar Modi <[email protected]>
AuthorDate: Sun Jun 1 08:23:53 2025 -0700

    [CELEBORN-2018] Support min number of workers selected for shuffle
    
    ### What changes were proposed in this pull request?
    Support min number of workers to assign slots on for a shuffle.
    
    ### Why are the changes needed?
    
    PR https://github.com/apache/celeborn/pull/3039 updated the default value of `celeborn.master.slot.assign.extraSlots` to avoid skew in shuffle stages with fewer reducers. However, that change also affects stages with a large number of reducers, which is not ideal.
    
    We are introducing a new config `celeborn.master.slot.assign.minWorkers`, which ensures that shuffle stages with few reducers do not concentrate load on a small number of nodes.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes
    
    ### How was this patch tested?
    NA
    
    Closes #3297 from s0nskar/min_workers.
    
    Authored-by: Sanskar Modi <[email protected]>
    Signed-off-by: Wang, Fei <[email protected]>
---
 .../scala/org/apache/celeborn/common/CelebornConf.scala  | 13 +++++++++++--
 docs/configuration/master.md                             |  3 ++-
 docs/migration.md                                        |  2 +-
 .../apache/celeborn/service/deploy/master/Master.scala   | 16 +++++++++++-----
 4 files changed, 25 insertions(+), 9 deletions(-)

diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index a70381bbd..0ec7de075 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -674,6 +674,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
     get(MASTER_SLOT_ASSIGN_LOADAWARE_FETCHTIME_WEIGHT)
   def masterSlotAssignExtraSlots: Int = get(MASTER_SLOT_ASSIGN_EXTRA_SLOTS)
   def masterSlotAssignMaxWorkers: Int = get(MASTER_SLOT_ASSIGN_MAX_WORKERS)
+  def masterSlotAssignMinWorkers: Int = get(MASTER_SLOT_ASSIGN_MIN_WORKERS)
   def initialEstimatedPartitionSize: Long = get(ESTIMATED_PARTITION_SIZE_INITIAL_SIZE)
   def estimatedPartitionSizeUpdaterInitialDelay: Long =
     get(ESTIMATED_PARTITION_SIZE_UPDATE_INITIAL_DELAY)
@@ -2974,9 +2975,9 @@ object CelebornConf extends Logging {
       .withAlternative("celeborn.slots.assign.extraSlots")
       .categories("master")
       .version("0.3.0")
-      .doc("Extra slots number when master assign slots.")
+      .doc("Extra slots number when master assign slots. Provided enough workers are available.")
       .intConf
-      .createWithDefault(100)
+      .createWithDefault(2)
 
   val MASTER_SLOT_ASSIGN_MAX_WORKERS: ConfigEntry[Int] =
     buildConf("celeborn.master.slot.assign.maxWorkers")
@@ -2987,6 +2988,14 @@ object CelebornConf extends Logging {
       .intConf
       .createWithDefault(10000)
 
+  val MASTER_SLOT_ASSIGN_MIN_WORKERS: ConfigEntry[Int] =
+    buildConf("celeborn.master.slot.assign.minWorkers")
+      .categories("master")
+      .version("0.6.0")
+      .doc("Min workers that slots of one shuffle should be allocated on. Provided enough workers are available.")
+      .intConf
+      .createWithDefault(100)
+
   val ESTIMATED_PARTITION_SIZE_INITIAL_SIZE: ConfigEntry[Long] =
     buildConf("celeborn.master.estimatedPartitionSize.initialSize")
       .withAlternative("celeborn.shuffle.initialEstimatedPartitionSize")
diff --git a/docs/configuration/master.md b/docs/configuration/master.md
index add7c136e..f24d89e52 100644
--- a/docs/configuration/master.md
+++ b/docs/configuration/master.md
@@ -71,12 +71,13 @@ license: |
 | celeborn.master.port | 9097 | false | Port for master to bind. | 0.2.0 |  | 
 | celeborn.master.rackResolver.refresh.interval | 30s | false | Interval for refreshing the node rack information periodically. | 0.5.0 |  | 
 | celeborn.master.send.applicationMeta.threads | 8 | false | Number of threads used by the Master to send ApplicationMeta to Workers. | 0.5.0 |  | 
-| celeborn.master.slot.assign.extraSlots | 100 | false | Extra slots number when master assign slots. | 0.3.0 | celeborn.slots.assign.extraSlots | 
+| celeborn.master.slot.assign.extraSlots | 2 | false | Extra slots number when master assign slots. Provided enough workers are available. | 0.3.0 | celeborn.slots.assign.extraSlots | 
 | celeborn.master.slot.assign.loadAware.diskGroupGradient | 0.1 | false | This value means how many more workload will be placed into a faster disk group than a slower group. | 0.3.0 | celeborn.slots.assign.loadAware.diskGroupGradient | 
 | celeborn.master.slot.assign.loadAware.fetchTimeWeight | 1.0 | false | Weight of average fetch time when calculating ordering in load-aware assignment strategy | 0.3.0 | celeborn.slots.assign.loadAware.fetchTimeWeight | 
 | celeborn.master.slot.assign.loadAware.flushTimeWeight | 0.0 | false | Weight of average flush time when calculating ordering in load-aware assignment strategy | 0.3.0 | celeborn.slots.assign.loadAware.flushTimeWeight | 
 | celeborn.master.slot.assign.loadAware.numDiskGroups | 5 | false | This configuration is a guidance for load-aware slot allocation algorithm. This value is control how many disk groups will be created. | 0.3.0 | celeborn.slots.assign.loadAware.numDiskGroups | 
 | celeborn.master.slot.assign.maxWorkers | 10000 | false | Max workers that slots of one shuffle can be allocated on. Will choose the smaller positive one from Master side and Client side, see `celeborn.client.slot.assign.maxWorkers`. | 0.3.1 |  | 
+| celeborn.master.slot.assign.minWorkers | 100 | false | Min workers that slots of one shuffle should be allocated on. Provided enough workers are available. | 0.6.0 |  | 
 | celeborn.master.slot.assign.policy | ROUNDROBIN | false | Policy for master to assign slots, Celeborn supports two types of policy: roundrobin and loadaware. Loadaware policy will be ignored when `HDFS` is enabled in `celeborn.storage.availableTypes` | 0.3.0 | celeborn.slots.assign.policy | 
 | celeborn.master.userResourceConsumption.metrics.enabled | false | false | Whether to enable resource consumption metrics. | 0.6.0 |  | 
 | celeborn.master.userResourceConsumption.update.interval | 30s | false | Time length for a window about compute user resource consumption. | 0.3.0 |  | 
diff --git a/docs/migration.md b/docs/migration.md
index 34b7fe7c4..b3105e3e1 100644
--- a/docs/migration.md
+++ b/docs/migration.md
@@ -33,7 +33,7 @@ license: |
 
 - Since 0.6.0, Celeborn modified `celeborn.master.hdfs.expireDirs.timeout` to `celeborn.master.dfs.expireDirs.timeout`. Please use `cceleborn.master.dfs.expireDirs.timeout` if you want to set timeout for an expired dirs to be deleted.
 
-- Since 0.6.0, Celeborn changed the default value of `celeborn.master.slot.assign.extraSlots` from `2` to `100`, which means Celeborn will involve more workers in offering slots.
+- Since 0.6.0, Celeborn introduced `celeborn.master.slot.assign.minWorkers` with a default value of `100`, which means Celeborn will involve more workers in offering slots when the number of reducers is small.
 
 - Since 0.6.0, Celeborn deprecate `celeborn.worker.congestionControl.low.watermark`. Please use `celeborn.worker.congestionControl.diskBuffer.low.watermark` instead.
 
diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 8316f1706..a91996986 100644
--- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -201,6 +201,8 @@ private[celeborn] class Master(
   private val tagsManager = new TagsManager(Option(configService))
 
   private val slotsAssignMaxWorkers = conf.masterSlotAssignMaxWorkers
+  private val slotsAssignMinWorkers = conf.masterSlotAssignMinWorkers
+  private val slotsAssignExtraSlots = conf.masterSlotAssignExtraSlots
   private val slotsAssignLoadAwareDiskGroupNum = conf.masterSlotAssignLoadAwareDiskGroupNum
   private val slotsAssignLoadAwareDiskGroupGradient =
     conf.masterSlotAssignLoadAwareDiskGroupGradient
@@ -974,11 +976,14 @@ private[celeborn] class Master(
         .asScala.map { case (worker, slots) => worker.toUniqueId -> slots }.asJava,
       requestSlots.requestId)
 
-    logInfo(s"Offer slots successfully for $numReducers reducers of $shuffleKey" +
-      s" on ${slots.size()} workers.")
-
+    var offerSlotsMsg = s"Successfully offered slots for $numReducers reducers of $shuffleKey" +
+      s" on ${slots.size()} workers"
     val workersNotSelected = availableWorkers.asScala.filter(!slots.containsKey(_))
-    val offerSlotsExtraSize = Math.min(conf.masterSlotAssignExtraSlots, workersNotSelected.size)
+    val offerSlotsExtraSize = Math.min(
+      Math.max(
+        slotsAssignExtraSlots,
+        slotsAssignMinWorkers - slots.size()),
+      workersNotSelected.size)
     if (offerSlotsExtraSize > 0) {
       var index = Random.nextInt(workersNotSelected.size)
       (1 to offerSlotsExtraSize).foreach(_ => {
@@ -987,8 +992,9 @@ private[celeborn] class Master(
           (new util.ArrayList[PartitionLocation](), new util.ArrayList[PartitionLocation]()))
         index = (index + 1) % workersNotSelected.size
       })
-      logInfo(s"Offered extra $offerSlotsExtraSize slots for $shuffleKey")
+      offerSlotsMsg += s", offered $offerSlotsExtraSize extra slots"
     }
+    logInfo(offerSlotsMsg + ".")
 
     ShuffleAuditLogger.audit(
       shuffleKey,

Reply via email to