This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git

commit a1de276b87f3a74cea3bf0bea8fbafb0a0dd6457
Author: zky.zhoukeyong <[email protected]>
AuthorDate: Mon Aug 7 10:13:53 2023 +0800

    [CELEBORN-152] Add config to limit max workers when offering slots
    
    ### What changes were proposed in this pull request?
    Add config to limit max workers when offering slots. The config can be set
    on both the server side and the client side. Celeborn will choose the
    smaller positive value from the client and master configs.
    
    ### Why are the changes needed?
    For large Celeborn clusters, users may want to limit the number of workers 
that
    a shuffle can spread, reasons are:
    
    1. One worker failure will not affect all applications
    2. One huge shuffle will not affect all applications
    3. It's more efficient to limit a shuffle to a restricted number of
       workers, say 100, than to spread it across a large number of workers,
       say 1000, because the number of network connections when pushing data
       is `number of ShuffleClient` * `number of allocated Workers`
    
    The recommended number of Workers should depend on workload and Worker 
hardware,
    and this can be configured per application, so it's relatively flexible.
    
    ### Does this PR introduce _any_ user-facing change?
    No, except that a new configuration was added.
    
    ### How was this patch tested?
    Added ITs and passed GA.
    
    Closes #1790 from waitinfuture/152.
    
    Authored-by: zky.zhoukeyong <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../apache/celeborn/client/LifecycleManager.scala  |  4 +-
 common/src/main/proto/TransportMessages.proto      |  1 +
 .../org/apache/celeborn/common/CelebornConf.scala  | 20 ++++++++
 .../common/protocol/message/ControlMessages.scala  |  4 ++
 docs/configuration/client.md                       |  1 +
 docs/configuration/master.md                       |  1 +
 .../celeborn/service/deploy/master/Master.scala    | 14 ++++-
 .../spark/SlotsAssignMaxWorkersLargeTest.scala     | 59 +++++++++++++++++++++
 .../spark/SlotsAssignMaxWorkersSmallTest.scala     | 60 ++++++++++++++++++++++
 9 files changed, 161 insertions(+), 3 deletions(-)

diff --git 
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala 
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 5dbaa95aa..dfe3d4165 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -58,6 +58,7 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
   private val lifecycleHost = Utils.localHostName(conf)
 
   private val shuffleExpiredCheckIntervalMs = 
conf.shuffleExpiredCheckIntervalMs
+  private val slotsAssignMaxWorkers = conf.clientSlotAssignMaxWorkers
   private val pushReplicateEnabled = conf.clientPushReplicateEnabled
   private val pushRackAwareEnabled = conf.clientReserveSlotsRackAwareEnabled
   private val partitionSplitThreshold = conf.shufflePartitionSplitThreshold
@@ -1014,7 +1015,8 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
         lifecycleHost,
         pushReplicateEnabled,
         pushRackAwareEnabled,
-        userIdentifier)
+        userIdentifier,
+        slotsAssignMaxWorkers)
     val res = requestMasterRequestSlots(req)
     if (res.status != StatusCode.SUCCESS) {
       requestMasterRequestSlots(req)
diff --git a/common/src/main/proto/TransportMessages.proto 
b/common/src/main/proto/TransportMessages.proto
index ae37c27a8..075bde8c7 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -185,6 +185,7 @@ message PbRequestSlots {
   int32 storageType = 7;
   PbUserIdentifier userIdentifier = 8;
   bool shouldRackAware = 9;
+  int32 maxWorkers = 10;
 }
 
 message PbSlotInfo {
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 98ede7709..dd748a130 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -511,6 +511,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
   def masterSlotAssignLoadAwareFetchTimeWeight: Double =
     get(MASTER_SLOT_ASSIGN_LOADAWARE_FETCHTIME_WEIGHT)
   def masterSlotAssignExtraSlots: Int = get(MASTER_SLOT_ASSIGN_EXTRA_SLOTS)
+  def masterSlotAssignMaxWorkers: Int = get(MASTER_SLOT_ASSIGN_MAX_WORKERS)
   def initialEstimatedPartitionSize: Long = 
get(ESTIMATED_PARTITION_SIZE_INITIAL_SIZE)
   def estimatedPartitionSizeUpdaterInitialDelay: Long =
     get(ESTIMATED_PARTITION_SIZE_UPDATE_INITIAL_DELAY)
@@ -770,6 +771,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
   def pushDataTimeoutMs: Long = get(CLIENT_PUSH_DATA_TIMEOUT)
   def clientPushLimitStrategy: String = get(CLIENT_PUSH_LIMIT_STRATEGY)
   def clientPushSlowStartInitialSleepTime: Long = 
get(CLIENT_PUSH_SLOW_START_INITIAL_SLEEP_TIME)
+  def clientSlotAssignMaxWorkers: Int = get(CLIENT_SLOT_ASSIGN_MAX_WORKERS)
   def clientPushSlowStartMaxSleepMills: Long = 
get(CLIENT_PUSH_SLOW_START_MAX_SLEEP_TIME)
   def clientPushLimitInFlightTimeoutMs: Long =
     if (clientPushReplicateEnabled) {
@@ -1874,6 +1876,15 @@ object CelebornConf extends Logging {
       .intConf
       .createWithDefault(2)
 
+  val MASTER_SLOT_ASSIGN_MAX_WORKERS: ConfigEntry[Int] =
+    buildConf("celeborn.master.slot.assign.maxWorkers")
+      .categories("master")
+      .version("0.3.1")
+      .doc("Max workers that slots of one shuffle can be allocated on. Will 
choose the smaller positive one " +
+        s"from Master side and Client side, see 
`celeborn.client.slot.assign.maxWorkers`.")
+      .intConf
+      .createWithDefault(10000)
+
   val ESTIMATED_PARTITION_SIZE_INITIAL_SIZE: ConfigEntry[Long] =
     buildConf("celeborn.master.estimatedPartitionSize.initialSize")
       .withAlternative("celeborn.shuffle.initialEstimatedPartitionSize")
@@ -3347,6 +3358,15 @@ object CelebornConf extends Logging {
       .booleanConf
       .createWithDefault(false)
 
+  val CLIENT_SLOT_ASSIGN_MAX_WORKERS: ConfigEntry[Int] =
+    buildConf("celeborn.client.slot.assign.maxWorkers")
+      .categories("client")
+      .version("0.3.1")
+      .doc("Max workers that slots of one shuffle can be allocated on. Will 
choose the smaller positive one " +
+        s"from Master side and Client side, see 
`${CelebornConf.MASTER_SLOT_ASSIGN_MAX_WORKERS.key}`.")
+      .intConf
+      .createWithDefault(10000)
+
   val CLIENT_CLOSE_IDLE_CONNECTIONS: ConfigEntry[Boolean] =
     buildConf("celeborn.client.closeIdleConnections")
       .categories("client")
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
 
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
index df2df24ad..9607b98c9 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
@@ -166,6 +166,7 @@ object ControlMessages extends Logging {
       shouldReplicate: Boolean,
       shouldRackAware: Boolean,
       userIdentifier: UserIdentifier,
+      maxWorkers: Int,
       override var requestId: String = ZERO_UUID)
     extends MasterRequestMessage
 
@@ -478,6 +479,7 @@ object ControlMessages extends Logging {
           shouldReplicate,
           shouldRackAware,
           userIdentifier,
+          maxWorkers,
           requestId) =>
       val payload = PbRequestSlots.newBuilder()
         .setApplicationId(applicationId)
@@ -486,6 +488,7 @@ object ControlMessages extends Logging {
         .setHostname(hostname)
         .setShouldReplicate(shouldReplicate)
         .setShouldRackAware(shouldRackAware)
+        .setMaxWorkers(maxWorkers)
         .setRequestId(requestId)
         .setUserIdentifier(PbSerDeUtils.toPbUserIdentifier(userIdentifier))
         .build().toByteArray
@@ -826,6 +829,7 @@ object ControlMessages extends Logging {
           pbRequestSlots.getShouldReplicate,
           pbRequestSlots.getShouldRackAware,
           userIdentifier,
+          pbRequestSlots.getMaxWorkers,
           pbRequestSlots.getRequestId)
 
       case REQUEST_SLOTS_RESPONSE_VALUE =>
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 14b317598..2ffbc5e2b 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -89,6 +89,7 @@ license: |
 | celeborn.client.shuffle.partitionSplit.mode | SOFT | soft: the shuffle file 
size might be larger than split threshold. hard: the shuffle file size will be 
limited to split threshold. | 0.3.0 | 
 | celeborn.client.shuffle.partitionSplit.threshold | 1G | Shuffle file size 
threshold, if file size exceeds this, trigger split. | 0.3.0 | 
 | celeborn.client.shuffle.rangeReadFilter.enabled | false | If a spark 
application have skewed partition, this value can set to true to improve 
performance. | 0.2.0 | 
+| celeborn.client.slot.assign.maxWorkers | 10000 | Max workers that slots of 
one shuffle can be allocated on. Will choose the smaller positive one from 
Master side and Client side, see `celeborn.master.slot.assign.maxWorkers`. | 
0.3.1 | 
 | celeborn.client.spark.push.sort.memory.threshold | 64m | When 
SortBasedPusher use memory over the threshold, will trigger push data. If the 
pipeline push feature is enabled 
(`celeborn.client.spark.push.sort.pipeline.enabled=true`), the SortBasedPusher 
will trigger a data push when the memory usage exceeds half of the threshold(by 
default, 32m). | 0.3.0 | 
 | celeborn.client.spark.push.sort.pipeline.enabled | false | Whether to enable 
pipelining for sort based shuffle writer. If true, double buffering will be 
used to pipeline push | 0.3.0 | 
 | celeborn.client.spark.push.unsafeRow.fastWrite.enabled | true | This is 
Celeborn's optimization on UnsafeRow for Spark and it's true by default. If you 
have changed UnsafeRow's memory layout set this to false. | 0.2.2 | 
diff --git a/docs/configuration/master.md b/docs/configuration/master.md
index 695b3d84a..a9bdbb031 100644
--- a/docs/configuration/master.md
+++ b/docs/configuration/master.md
@@ -32,6 +32,7 @@ license: |
 | celeborn.master.slot.assign.loadAware.fetchTimeWeight | 1.0 | Weight of 
average fetch time when calculating ordering in load-aware assignment strategy 
| 0.3.0 | 
 | celeborn.master.slot.assign.loadAware.flushTimeWeight | 0.0 | Weight of 
average flush time when calculating ordering in load-aware assignment strategy 
| 0.3.0 | 
 | celeborn.master.slot.assign.loadAware.numDiskGroups | 5 | This configuration 
is a guidance for load-aware slot allocation algorithm. This value is control 
how many disk groups will be created. | 0.3.0 | 
+| celeborn.master.slot.assign.maxWorkers | 10000 | Max workers that slots of 
one shuffle can be allocated on. Will choose the smaller positive one from 
Master side and Client side, see `celeborn.client.slot.assign.maxWorkers`. | 
0.3.1 | 
 | celeborn.master.slot.assign.policy | ROUNDROBIN | Policy for master to 
assign slots, Celeborn supports two types of policy: roundrobin and loadaware. 
Loadaware policy will be ignored when `HDFS` is enabled in 
`celeborn.storage.activeTypes` | 0.3.0 | 
 | celeborn.master.userResourceConsumption.update.interval | 30s | Time length 
for a window about compute user resource consumption. | 0.3.0 | 
 | celeborn.storage.activeTypes | HDD,SSD | Enabled storage levels. Available 
options: HDD,SSD,HDFS.  | 0.3.0 | 
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 3c34b4ab2..865787f10 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -20,6 +20,7 @@ package org.apache.celeborn.service.deploy.master
 import java.io.IOException
 import java.net.BindException
 import java.util
+import java.util.Collections
 import java.util.concurrent.{ConcurrentHashMap, ScheduledFuture, TimeUnit}
 
 import scala.collection.JavaConverters._
@@ -117,6 +118,7 @@ private[celeborn] class Master(
 
   private def diskReserveSize = conf.workerDiskReserveSize
 
+  private val slotsAssignMaxWorkers = conf.masterSlotAssignMaxWorkers
   private val slotsAssignLoadAwareDiskGroupNum = 
conf.masterSlotAssignLoadAwareDiskGroupNum
   private val slotsAssignLoadAwareDiskGroupGradient =
     conf.masterSlotAssignLoadAwareDiskGroupGradient
@@ -301,7 +303,7 @@ private[celeborn] class Master(
           userResourceConsumption,
           requestId))
 
-    case requestSlots @ RequestSlots(_, _, _, _, _, _, _, _) =>
+    case requestSlots @ RequestSlots(_, _, _, _, _, _, _, _, _) =>
       logTrace(s"Received RequestSlots request $requestSlots.")
       executeWithLeaderChecker(context, handleRequestSlots(context, 
requestSlots))
 
@@ -557,7 +559,15 @@ private[celeborn] class Master(
     val numReducers = requestSlots.partitionIdList.size()
     val shuffleKey = Utils.makeShuffleKey(requestSlots.applicationId, 
requestSlots.shuffleId)
 
-    val availableWorkers = workersAvailable()
+    var availableWorkers = workersAvailable()
+    Collections.shuffle(availableWorkers)
+    val numWorkers = Math.min(
+      Math.max(
+        if (requestSlots.shouldReplicate) 2 else 1,
+        if (requestSlots.maxWorkers <= 0) slotsAssignMaxWorkers
+        else Math.min(slotsAssignMaxWorkers, requestSlots.maxWorkers)),
+      availableWorkers.size())
+    availableWorkers = availableWorkers.subList(0, numWorkers)
     // offer slots
     val slots =
       masterSource.sample(MasterSource.OFFER_SLOTS_TIME, 
s"offerSlots-${Random.nextInt()}") {
diff --git 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SlotsAssignMaxWorkersLargeTest.scala
 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SlotsAssignMaxWorkersLargeTest.scala
new file mode 100644
index 000000000..4114e13c1
--- /dev/null
+++ 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SlotsAssignMaxWorkersLargeTest.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.tests.spark
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.funsuite.AnyFunSuite
+
+import org.apache.celeborn.client.ShuffleClient
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.protocol.ShuffleMode
+
+class SlotsAssignMaxWorkersLargeTest extends AnyFunSuite
+  with SparkTestBase
+  with BeforeAndAfterEach {
+
+  override def beforeAll(): Unit = {
+    logInfo("test initialized, setup Celeborn mini cluster")
+    val masterConf = Map(
+      s"${CelebornConf.CLIENT_SLOT_ASSIGN_MAX_WORKERS.key}" -> "10")
+    setUpMiniCluster(masterConf = masterConf, workerConf = null)
+  }
+
+  override def beforeEach(): Unit = {
+    ShuffleClient.reset()
+  }
+
+  override def afterEach(): Unit = {
+    System.gc()
+  }
+
+  test("celeborn spark integration test - slots assign maxWorkers large") {
+    val sparkConf = new SparkConf()
+      .set(s"spark.${CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED.key}", "false")
+      .setAppName("celeborn-demo").setMaster("local[2]")
+    val ss = SparkSession.builder()
+      .config(updateSparkConf(sparkConf, ShuffleMode.HASH))
+      .getOrCreate()
+    ss.sparkContext.parallelize(1 to 1000, 2)
+      .map { i => (i, Range(1, 1000).mkString(",")) }.groupByKey(16).collect()
+    ss.stop()
+  }
+}
diff --git 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SlotsAssignMaxWorkersSmallTest.scala
 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SlotsAssignMaxWorkersSmallTest.scala
new file mode 100644
index 000000000..20c33ae0e
--- /dev/null
+++ 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SlotsAssignMaxWorkersSmallTest.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.tests.spark
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.funsuite.AnyFunSuite
+
+import org.apache.celeborn.client.ShuffleClient
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.protocol.ShuffleMode
+
+class SlotsAssignMaxWorkersSmallTest extends AnyFunSuite
+  with SparkTestBase
+  with BeforeAndAfterEach {
+
+  override def beforeAll(): Unit = {
+    logInfo("test initialized, setup Celeborn mini cluster")
+    val masterConf = Map(
+      s"${CelebornConf.CLIENT_SLOT_ASSIGN_MAX_WORKERS.key}" -> "5")
+    setUpMiniCluster(masterConf = masterConf, workerConf = null)
+  }
+
+  override def beforeEach(): Unit = {
+    ShuffleClient.reset()
+  }
+
+  override def afterEach(): Unit = {
+    System.gc()
+  }
+
+  test("celeborn spark integration test - slots assign maxWorkers small") {
+    val sparkConf = new SparkConf()
+      .set(s"spark.${CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED.key}", "true")
+      .set(s"spark.${CelebornConf.CLIENT_SLOT_ASSIGN_MAX_WORKERS.key}", "1")
+      .setAppName("celeborn-demo").setMaster("local[2]")
+    val ss = SparkSession.builder()
+      .config(updateSparkConf(sparkConf, ShuffleMode.HASH))
+      .getOrCreate()
+    ss.sparkContext.parallelize(1 to 1000, 2)
+      .map { i => (i, Range(1, 1000).mkString(",")) }.groupByKey(16).collect()
+    ss.stop()
+  }
+}

Reply via email to