This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 6ea1ee2ec [CELEBORN-152] Add config to limit max workers when offering
slots
6ea1ee2ec is described below
commit 6ea1ee2ec424c1011eef80a5d74c54cc1464f471
Author: zky.zhoukeyong <[email protected]>
AuthorDate: Mon Aug 7 10:13:53 2023 +0800
[CELEBORN-152] Add config to limit max workers when offering slots
### What changes were proposed in this pull request?
Add a config to limit the maximum number of workers used when offering slots. The
config can be set on both the server side and the client side; Celeborn will
choose the smaller positive value between the client and master configs.
### Why are the changes needed?
For large Celeborn clusters, users may want to limit the number of workers
that a shuffle can spread across, for the following reasons:
1. One worker failure will not affect all applications
2. One huge shuffle will not affect all applications
3. It's more efficient to limit a shuffle to a restricted number of
workers, say 100, than to spread it across a large number of workers,
say 1000, because the number of network connections
when pushing data is `number of ShuffleClients` * `number of allocated
Workers`
The recommended number of Workers depends on the workload and Worker
hardware; since it can be configured per application, it's relatively flexible.
### Does this PR introduce _any_ user-facing change?
No, added a new configuration.
### How was this patch tested?
Added ITs and passes GA.
Closes #1790 from waitinfuture/152.
Authored-by: zky.zhoukeyong <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../apache/celeborn/client/LifecycleManager.scala | 4 +-
common/src/main/proto/TransportMessages.proto | 1 +
.../org/apache/celeborn/common/CelebornConf.scala | 20 ++++++++
.../common/protocol/message/ControlMessages.scala | 4 ++
docs/configuration/client.md | 1 +
docs/configuration/master.md | 1 +
.../celeborn/service/deploy/master/Master.scala | 14 ++++-
.../spark/SlotsAssignMaxWorkersLargeTest.scala | 59 +++++++++++++++++++++
.../spark/SlotsAssignMaxWorkersSmallTest.scala | 60 ++++++++++++++++++++++
9 files changed, 161 insertions(+), 3 deletions(-)
diff --git
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 5dbaa95aa..dfe3d4165 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -58,6 +58,7 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
private val lifecycleHost = Utils.localHostName(conf)
private val shuffleExpiredCheckIntervalMs =
conf.shuffleExpiredCheckIntervalMs
+ private val slotsAssignMaxWorkers = conf.clientSlotAssignMaxWorkers
private val pushReplicateEnabled = conf.clientPushReplicateEnabled
private val pushRackAwareEnabled = conf.clientReserveSlotsRackAwareEnabled
private val partitionSplitThreshold = conf.shufflePartitionSplitThreshold
@@ -1014,7 +1015,8 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
lifecycleHost,
pushReplicateEnabled,
pushRackAwareEnabled,
- userIdentifier)
+ userIdentifier,
+ slotsAssignMaxWorkers)
val res = requestMasterRequestSlots(req)
if (res.status != StatusCode.SUCCESS) {
requestMasterRequestSlots(req)
diff --git a/common/src/main/proto/TransportMessages.proto
b/common/src/main/proto/TransportMessages.proto
index 172dc1a91..b59ab2c2a 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -185,6 +185,7 @@ message PbRequestSlots {
int32 storageType = 7;
PbUserIdentifier userIdentifier = 8;
bool shouldRackAware = 9;
+ int32 maxWorkers = 10;
}
message PbSlotInfo {
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 381c9da27..afa8276b0 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -511,6 +511,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
def masterSlotAssignLoadAwareFetchTimeWeight: Double =
get(MASTER_SLOT_ASSIGN_LOADAWARE_FETCHTIME_WEIGHT)
def masterSlotAssignExtraSlots: Int = get(MASTER_SLOT_ASSIGN_EXTRA_SLOTS)
+ def masterSlotAssignMaxWorkers: Int = get(MASTER_SLOT_ASSIGN_MAX_WORKERS)
def initialEstimatedPartitionSize: Long =
get(ESTIMATED_PARTITION_SIZE_INITIAL_SIZE)
def estimatedPartitionSizeUpdaterInitialDelay: Long =
get(ESTIMATED_PARTITION_SIZE_UPDATE_INITIAL_DELAY)
@@ -770,6 +771,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
def pushDataTimeoutMs: Long = get(CLIENT_PUSH_DATA_TIMEOUT)
def clientPushLimitStrategy: String = get(CLIENT_PUSH_LIMIT_STRATEGY)
def clientPushSlowStartInitialSleepTime: Long =
get(CLIENT_PUSH_SLOW_START_INITIAL_SLEEP_TIME)
+ def clientSlotAssignMaxWorkers: Int = get(CLIENT_SLOT_ASSIGN_MAX_WORKERS)
def clientPushSlowStartMaxSleepMills: Long =
get(CLIENT_PUSH_SLOW_START_MAX_SLEEP_TIME)
def clientPushLimitInFlightTimeoutMs: Long =
if (clientPushReplicateEnabled) {
@@ -1874,6 +1876,15 @@ object CelebornConf extends Logging {
.intConf
.createWithDefault(2)
+ val MASTER_SLOT_ASSIGN_MAX_WORKERS: ConfigEntry[Int] =
+ buildConf("celeborn.master.slot.assign.maxWorkers")
+ .categories("master")
+ .version("0.3.1")
+ .doc("Max workers that slots of one shuffle can be allocated on. Will
choose the smaller positive one " +
+ s"from Master side and Client side, see
`celeborn.client.slot.assign.maxWorkers`.")
+ .intConf
+ .createWithDefault(10000)
+
val ESTIMATED_PARTITION_SIZE_INITIAL_SIZE: ConfigEntry[Long] =
buildConf("celeborn.master.estimatedPartitionSize.initialSize")
.withAlternative("celeborn.shuffle.initialEstimatedPartitionSize")
@@ -3347,6 +3358,15 @@ object CelebornConf extends Logging {
.booleanConf
.createWithDefault(false)
+ val CLIENT_SLOT_ASSIGN_MAX_WORKERS: ConfigEntry[Int] =
+ buildConf("celeborn.client.slot.assign.maxWorkers")
+ .categories("client")
+ .version("0.3.1")
+ .doc("Max workers that slots of one shuffle can be allocated on. Will
choose the smaller positive one " +
+ s"from Master side and Client side, see
`${CelebornConf.MASTER_SLOT_ASSIGN_MAX_WORKERS.key}`.")
+ .intConf
+ .createWithDefault(10000)
+
val CLIENT_CLOSE_IDLE_CONNECTIONS: ConfigEntry[Boolean] =
buildConf("celeborn.client.closeIdleConnections")
.categories("client")
diff --git
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
index faeb02b17..c9c6a2f49 100644
---
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
@@ -166,6 +166,7 @@ object ControlMessages extends Logging {
shouldReplicate: Boolean,
shouldRackAware: Boolean,
userIdentifier: UserIdentifier,
+ maxWorkers: Int,
override var requestId: String = ZERO_UUID)
extends MasterRequestMessage
@@ -471,6 +472,7 @@ object ControlMessages extends Logging {
shouldReplicate,
shouldRackAware,
userIdentifier,
+ maxWorkers,
requestId) =>
val payload = PbRequestSlots.newBuilder()
.setApplicationId(applicationId)
@@ -479,6 +481,7 @@ object ControlMessages extends Logging {
.setHostname(hostname)
.setShouldReplicate(shouldReplicate)
.setShouldRackAware(shouldRackAware)
+ .setMaxWorkers(maxWorkers)
.setRequestId(requestId)
.setUserIdentifier(PbSerDeUtils.toPbUserIdentifier(userIdentifier))
.build().toByteArray
@@ -800,6 +803,7 @@ object ControlMessages extends Logging {
pbRequestSlots.getShouldReplicate,
pbRequestSlots.getShouldRackAware,
userIdentifier,
+ pbRequestSlots.getMaxWorkers,
pbRequestSlots.getRequestId)
case REQUEST_SLOTS_RESPONSE_VALUE =>
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 9f7a96656..bedc97ad5 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -89,6 +89,7 @@ license: |
| celeborn.client.shuffle.partitionSplit.mode | SOFT | soft: the shuffle file
size might be larger than split threshold. hard: the shuffle file size will be
limited to split threshold. | 0.3.0 |
| celeborn.client.shuffle.partitionSplit.threshold | 1G | Shuffle file size
threshold, if file size exceeds this, trigger split. | 0.3.0 |
| celeborn.client.shuffle.rangeReadFilter.enabled | false | If a spark
application have skewed partition, this value can set to true to improve
performance. | 0.2.0 |
+| celeborn.client.slot.assign.maxWorkers | 10000 | Max workers that slots of
one shuffle can be allocated on. Will choose the smaller positive one from
Master side and Client side, see `celeborn.master.slot.assign.maxWorkers`. |
0.3.1 |
| celeborn.client.spark.push.sort.memory.threshold | 64m | When
SortBasedPusher use memory over the threshold, will trigger push data. If the
pipeline push feature is enabled
(`celeborn.client.spark.push.sort.pipeline.enabled=true`), the SortBasedPusher
will trigger a data push when the memory usage exceeds half of the threshold(by
default, 32m). | 0.3.0 |
| celeborn.client.spark.push.sort.pipeline.enabled | false | Whether to enable
pipelining for sort based shuffle writer. If true, double buffering will be
used to pipeline push | 0.3.0 |
| celeborn.client.spark.push.unsafeRow.fastWrite.enabled | true | This is
Celeborn's optimization on UnsafeRow for Spark and it's true by default. If you
have changed UnsafeRow's memory layout set this to false. | 0.2.2 |
diff --git a/docs/configuration/master.md b/docs/configuration/master.md
index 695b3d84a..a9bdbb031 100644
--- a/docs/configuration/master.md
+++ b/docs/configuration/master.md
@@ -32,6 +32,7 @@ license: |
| celeborn.master.slot.assign.loadAware.fetchTimeWeight | 1.0 | Weight of
average fetch time when calculating ordering in load-aware assignment strategy
| 0.3.0 |
| celeborn.master.slot.assign.loadAware.flushTimeWeight | 0.0 | Weight of
average flush time when calculating ordering in load-aware assignment strategy
| 0.3.0 |
| celeborn.master.slot.assign.loadAware.numDiskGroups | 5 | This configuration
is a guidance for load-aware slot allocation algorithm. This value is control
how many disk groups will be created. | 0.3.0 |
+| celeborn.master.slot.assign.maxWorkers | 10000 | Max workers that slots of
one shuffle can be allocated on. Will choose the smaller positive one from
Master side and Client side, see `celeborn.client.slot.assign.maxWorkers`. |
0.3.1 |
| celeborn.master.slot.assign.policy | ROUNDROBIN | Policy for master to
assign slots, Celeborn supports two types of policy: roundrobin and loadaware.
Loadaware policy will be ignored when `HDFS` is enabled in
`celeborn.storage.activeTypes` | 0.3.0 |
| celeborn.master.userResourceConsumption.update.interval | 30s | Time length
for a window about compute user resource consumption. | 0.3.0 |
| celeborn.storage.activeTypes | HDD,SSD | Enabled storage levels. Available
options: HDD,SSD,HDFS. | 0.3.0 |
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 04d98a217..d5ce62827 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -20,6 +20,7 @@ package org.apache.celeborn.service.deploy.master
import java.io.IOException
import java.net.BindException
import java.util
+import java.util.Collections
import java.util.concurrent.{ConcurrentHashMap, ScheduledFuture, TimeUnit}
import scala.collection.JavaConverters._
@@ -117,6 +118,7 @@ private[celeborn] class Master(
private def diskReserveSize = conf.workerDiskReserveSize
+ private val slotsAssignMaxWorkers = conf.masterSlotAssignMaxWorkers
private val slotsAssignLoadAwareDiskGroupNum =
conf.masterSlotAssignLoadAwareDiskGroupNum
private val slotsAssignLoadAwareDiskGroupGradient =
conf.masterSlotAssignLoadAwareDiskGroupGradient
@@ -301,7 +303,7 @@ private[celeborn] class Master(
userResourceConsumption,
requestId))
- case requestSlots @ RequestSlots(_, _, _, _, _, _, _, _) =>
+ case requestSlots @ RequestSlots(_, _, _, _, _, _, _, _, _) =>
logTrace(s"Received RequestSlots request $requestSlots.")
executeWithLeaderChecker(context, handleRequestSlots(context,
requestSlots))
@@ -554,7 +556,15 @@ private[celeborn] class Master(
val numReducers = requestSlots.partitionIdList.size()
val shuffleKey = Utils.makeShuffleKey(requestSlots.applicationId,
requestSlots.shuffleId)
- val availableWorkers = workersAvailable()
+ var availableWorkers = workersAvailable()
+ Collections.shuffle(availableWorkers)
+ val numWorkers = Math.min(
+ Math.max(
+ if (requestSlots.shouldReplicate) 2 else 1,
+ if (requestSlots.maxWorkers <= 0) slotsAssignMaxWorkers
+ else Math.min(slotsAssignMaxWorkers, requestSlots.maxWorkers)),
+ availableWorkers.size())
+ availableWorkers = availableWorkers.subList(0, numWorkers)
// offer slots
val slots =
masterSource.sample(MasterSource.OFFER_SLOTS_TIME,
s"offerSlots-${Random.nextInt()}") {
diff --git
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SlotsAssignMaxWorkersLargeTest.scala
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SlotsAssignMaxWorkersLargeTest.scala
new file mode 100644
index 000000000..4114e13c1
--- /dev/null
+++
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SlotsAssignMaxWorkersLargeTest.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.tests.spark
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.funsuite.AnyFunSuite
+
+import org.apache.celeborn.client.ShuffleClient
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.protocol.ShuffleMode
+
+class SlotsAssignMaxWorkersLargeTest extends AnyFunSuite
+ with SparkTestBase
+ with BeforeAndAfterEach {
+
+ override def beforeAll(): Unit = {
+ logInfo("test initialized, setup Celeborn mini cluster")
+ val masterConf = Map(
+ s"${CelebornConf.CLIENT_SLOT_ASSIGN_MAX_WORKERS.key}" -> "10")
+ setUpMiniCluster(masterConf = masterConf, workerConf = null)
+ }
+
+ override def beforeEach(): Unit = {
+ ShuffleClient.reset()
+ }
+
+ override def afterEach(): Unit = {
+ System.gc()
+ }
+
+ test("celeborn spark integration test - slots assign maxWorkers small") {
+ val sparkConf = new SparkConf()
+ .set(s"spark.${CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED.key}", "false")
+ .setAppName("celeborn-demo").setMaster("local[2]")
+ val ss = SparkSession.builder()
+ .config(updateSparkConf(sparkConf, ShuffleMode.HASH))
+ .getOrCreate()
+ ss.sparkContext.parallelize(1 to 1000, 2)
+ .map { i => (i, Range(1, 1000).mkString(",")) }.groupByKey(16).collect()
+ ss.stop()
+ }
+}
diff --git
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SlotsAssignMaxWorkersSmallTest.scala
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SlotsAssignMaxWorkersSmallTest.scala
new file mode 100644
index 000000000..20c33ae0e
--- /dev/null
+++
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SlotsAssignMaxWorkersSmallTest.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.tests.spark
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.funsuite.AnyFunSuite
+
+import org.apache.celeborn.client.ShuffleClient
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.protocol.ShuffleMode
+
+class SlotsAssignMaxWorkersSmallTest extends AnyFunSuite
+ with SparkTestBase
+ with BeforeAndAfterEach {
+
+ override def beforeAll(): Unit = {
+ logInfo("test initialized, setup Celeborn mini cluster")
+ val masterConf = Map(
+ s"${CelebornConf.CLIENT_SLOT_ASSIGN_MAX_WORKERS.key}" -> "5")
+ setUpMiniCluster(masterConf = masterConf, workerConf = null)
+ }
+
+ override def beforeEach(): Unit = {
+ ShuffleClient.reset()
+ }
+
+ override def afterEach(): Unit = {
+ System.gc()
+ }
+
+ test("celeborn spark integration test - slots assign maxWorkers small") {
+ val sparkConf = new SparkConf()
+ .set(s"spark.${CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED.key}", "true")
+ .set(s"spark.${CelebornConf.CLIENT_SLOT_ASSIGN_MAX_WORKERS.key}", "1")
+ .setAppName("celeborn-demo").setMaster("local[2]")
+ val ss = SparkSession.builder()
+ .config(updateSparkConf(sparkConf, ShuffleMode.HASH))
+ .getOrCreate()
+ ss.sparkContext.parallelize(1 to 1000, 2)
+ .map { i => (i, Range(1, 1000).mkString(",")) }.groupByKey(16).collect()
+ ss.stop()
+ }
+}