This is an automated email from the ASF dual-hosted git repository. zhouky pushed a commit to branch branch-0.3 in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
commit a1de276b87f3a74cea3bf0bea8fbafb0a0dd6457 Author: zky.zhoukeyong <[email protected]> AuthorDate: Mon Aug 7 10:13:53 2023 +0800 [CELEBORN-152] Add config to limit max workers when offering slots ### What changes were proposed in this pull request? Add a config to limit the maximum number of workers used when offering slots. The limit can be set on both the server (Master) side and the client side; Celeborn uses the smaller of the two values, and a non-positive client value is ignored in favor of the Master's value. ### Why are the changes needed? For large Celeborn clusters, users may want to limit the number of workers a shuffle can spread across, for the following reasons: 1. A single worker failure will not affect all applications. 2. A single huge shuffle will not affect all applications. 3. It's more efficient to confine a shuffle to a restricted number of workers, say 100, than to spread it across a large number of workers, say 1000, because the number of network connections used for pushing data is `number of ShuffleClients` * `number of allocated Workers`. The recommended number of Workers depends on the workload and the Worker hardware; because it can be configured per application, this approach is relatively flexible. ### Does this PR introduce _any_ user-facing change? No breaking change; it adds two new configurations, `celeborn.master.slot.assign.maxWorkers` and `celeborn.client.slot.assign.maxWorkers` (both default to 10000). ### How was this patch tested? Added integration tests (ITs); GitHub Actions (GA) CI passes. Closes #1790 from waitinfuture/152. 
Authored-by: zky.zhoukeyong <[email protected]> Signed-off-by: zky.zhoukeyong <[email protected]> --- .../apache/celeborn/client/LifecycleManager.scala | 4 +- common/src/main/proto/TransportMessages.proto | 1 + .../org/apache/celeborn/common/CelebornConf.scala | 20 ++++++++ .../common/protocol/message/ControlMessages.scala | 4 ++ docs/configuration/client.md | 1 + docs/configuration/master.md | 1 + .../celeborn/service/deploy/master/Master.scala | 14 ++++- .../spark/SlotsAssignMaxWorkersLargeTest.scala | 59 +++++++++++++++++++++ .../spark/SlotsAssignMaxWorkersSmallTest.scala | 60 ++++++++++++++++++++++ 9 files changed, 161 insertions(+), 3 deletions(-) diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala index 5dbaa95aa..dfe3d4165 100644 --- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala +++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala @@ -58,6 +58,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends private val lifecycleHost = Utils.localHostName(conf) private val shuffleExpiredCheckIntervalMs = conf.shuffleExpiredCheckIntervalMs + private val slotsAssignMaxWorkers = conf.clientSlotAssignMaxWorkers private val pushReplicateEnabled = conf.clientPushReplicateEnabled private val pushRackAwareEnabled = conf.clientReserveSlotsRackAwareEnabled private val partitionSplitThreshold = conf.shufflePartitionSplitThreshold @@ -1014,7 +1015,8 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends lifecycleHost, pushReplicateEnabled, pushRackAwareEnabled, - userIdentifier) + userIdentifier, + slotsAssignMaxWorkers) val res = requestMasterRequestSlots(req) if (res.status != StatusCode.SUCCESS) { requestMasterRequestSlots(req) diff --git a/common/src/main/proto/TransportMessages.proto b/common/src/main/proto/TransportMessages.proto index ae37c27a8..075bde8c7 
100644 --- a/common/src/main/proto/TransportMessages.proto +++ b/common/src/main/proto/TransportMessages.proto @@ -185,6 +185,7 @@ message PbRequestSlots { int32 storageType = 7; PbUserIdentifier userIdentifier = 8; bool shouldRackAware = 9; + int32 maxWorkers = 10; } message PbSlotInfo { diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 98ede7709..dd748a130 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -511,6 +511,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def masterSlotAssignLoadAwareFetchTimeWeight: Double = get(MASTER_SLOT_ASSIGN_LOADAWARE_FETCHTIME_WEIGHT) def masterSlotAssignExtraSlots: Int = get(MASTER_SLOT_ASSIGN_EXTRA_SLOTS) + def masterSlotAssignMaxWorkers: Int = get(MASTER_SLOT_ASSIGN_MAX_WORKERS) def initialEstimatedPartitionSize: Long = get(ESTIMATED_PARTITION_SIZE_INITIAL_SIZE) def estimatedPartitionSizeUpdaterInitialDelay: Long = get(ESTIMATED_PARTITION_SIZE_UPDATE_INITIAL_DELAY) @@ -770,6 +771,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def pushDataTimeoutMs: Long = get(CLIENT_PUSH_DATA_TIMEOUT) def clientPushLimitStrategy: String = get(CLIENT_PUSH_LIMIT_STRATEGY) def clientPushSlowStartInitialSleepTime: Long = get(CLIENT_PUSH_SLOW_START_INITIAL_SLEEP_TIME) + def clientSlotAssignMaxWorkers: Int = get(CLIENT_SLOT_ASSIGN_MAX_WORKERS) def clientPushSlowStartMaxSleepMills: Long = get(CLIENT_PUSH_SLOW_START_MAX_SLEEP_TIME) def clientPushLimitInFlightTimeoutMs: Long = if (clientPushReplicateEnabled) { @@ -1874,6 +1876,15 @@ object CelebornConf extends Logging { .intConf .createWithDefault(2) + val MASTER_SLOT_ASSIGN_MAX_WORKERS: ConfigEntry[Int] = + buildConf("celeborn.master.slot.assign.maxWorkers") + .categories("master") + .version("0.3.1") + .doc("Max 
workers that slots of one shuffle can be allocated on. Will choose the smaller positive one " + + s"from Master side and Client side, see `celeborn.client.slot.assign.maxWorkers`.") + .intConf + .createWithDefault(10000) + val ESTIMATED_PARTITION_SIZE_INITIAL_SIZE: ConfigEntry[Long] = buildConf("celeborn.master.estimatedPartitionSize.initialSize") .withAlternative("celeborn.shuffle.initialEstimatedPartitionSize") @@ -3347,6 +3358,15 @@ object CelebornConf extends Logging { .booleanConf .createWithDefault(false) + val CLIENT_SLOT_ASSIGN_MAX_WORKERS: ConfigEntry[Int] = + buildConf("celeborn.client.slot.assign.maxWorkers") + .categories("client") + .version("0.3.1") + .doc("Max workers that slots of one shuffle can be allocated on. Will choose the smaller positive one " + + s"from Master side and Client side, see `${CelebornConf.MASTER_SLOT_ASSIGN_MAX_WORKERS.key}`.") + .intConf + .createWithDefault(10000) + val CLIENT_CLOSE_IDLE_CONNECTIONS: ConfigEntry[Boolean] = buildConf("celeborn.client.closeIdleConnections") .categories("client") diff --git a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala index df2df24ad..9607b98c9 100644 --- a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala +++ b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala @@ -166,6 +166,7 @@ object ControlMessages extends Logging { shouldReplicate: Boolean, shouldRackAware: Boolean, userIdentifier: UserIdentifier, + maxWorkers: Int, override var requestId: String = ZERO_UUID) extends MasterRequestMessage @@ -478,6 +479,7 @@ object ControlMessages extends Logging { shouldReplicate, shouldRackAware, userIdentifier, + maxWorkers, requestId) => val payload = PbRequestSlots.newBuilder() .setApplicationId(applicationId) @@ -486,6 +488,7 @@ object ControlMessages extends Logging { .setHostname(hostname) 
.setShouldReplicate(shouldReplicate) .setShouldRackAware(shouldRackAware) + .setMaxWorkers(maxWorkers) .setRequestId(requestId) .setUserIdentifier(PbSerDeUtils.toPbUserIdentifier(userIdentifier)) .build().toByteArray @@ -826,6 +829,7 @@ object ControlMessages extends Logging { pbRequestSlots.getShouldReplicate, pbRequestSlots.getShouldRackAware, userIdentifier, + pbRequestSlots.getMaxWorkers, pbRequestSlots.getRequestId) case REQUEST_SLOTS_RESPONSE_VALUE => diff --git a/docs/configuration/client.md b/docs/configuration/client.md index 14b317598..2ffbc5e2b 100644 --- a/docs/configuration/client.md +++ b/docs/configuration/client.md @@ -89,6 +89,7 @@ license: | | celeborn.client.shuffle.partitionSplit.mode | SOFT | soft: the shuffle file size might be larger than split threshold. hard: the shuffle file size will be limited to split threshold. | 0.3.0 | | celeborn.client.shuffle.partitionSplit.threshold | 1G | Shuffle file size threshold, if file size exceeds this, trigger split. | 0.3.0 | | celeborn.client.shuffle.rangeReadFilter.enabled | false | If a spark application have skewed partition, this value can set to true to improve performance. | 0.2.0 | +| celeborn.client.slot.assign.maxWorkers | 10000 | Max workers that slots of one shuffle can be allocated on. Will choose the smaller positive one from Master side and Client side, see `celeborn.master.slot.assign.maxWorkers`. | 0.3.1 | | celeborn.client.spark.push.sort.memory.threshold | 64m | When SortBasedPusher use memory over the threshold, will trigger push data. If the pipeline push feature is enabled (`celeborn.client.spark.push.sort.pipeline.enabled=true`), the SortBasedPusher will trigger a data push when the memory usage exceeds half of the threshold(by default, 32m). | 0.3.0 | | celeborn.client.spark.push.sort.pipeline.enabled | false | Whether to enable pipelining for sort based shuffle writer. 
If true, double buffering will be used to pipeline push | 0.3.0 | | celeborn.client.spark.push.unsafeRow.fastWrite.enabled | true | This is Celeborn's optimization on UnsafeRow for Spark and it's true by default. If you have changed UnsafeRow's memory layout set this to false. | 0.2.2 | diff --git a/docs/configuration/master.md b/docs/configuration/master.md index 695b3d84a..a9bdbb031 100644 --- a/docs/configuration/master.md +++ b/docs/configuration/master.md @@ -32,6 +32,7 @@ license: | | celeborn.master.slot.assign.loadAware.fetchTimeWeight | 1.0 | Weight of average fetch time when calculating ordering in load-aware assignment strategy | 0.3.0 | | celeborn.master.slot.assign.loadAware.flushTimeWeight | 0.0 | Weight of average flush time when calculating ordering in load-aware assignment strategy | 0.3.0 | | celeborn.master.slot.assign.loadAware.numDiskGroups | 5 | This configuration is a guidance for load-aware slot allocation algorithm. This value is control how many disk groups will be created. | 0.3.0 | +| celeborn.master.slot.assign.maxWorkers | 10000 | Max workers that slots of one shuffle can be allocated on. Will choose the smaller positive one from Master side and Client side, see `celeborn.client.slot.assign.maxWorkers`. | 0.3.1 | | celeborn.master.slot.assign.policy | ROUNDROBIN | Policy for master to assign slots, Celeborn supports two types of policy: roundrobin and loadaware. Loadaware policy will be ignored when `HDFS` is enabled in `celeborn.storage.activeTypes` | 0.3.0 | | celeborn.master.userResourceConsumption.update.interval | 30s | Time length for a window about compute user resource consumption. | 0.3.0 | | celeborn.storage.activeTypes | HDD,SSD | Enabled storage levels. Available options: HDD,SSD,HDFS. 
| 0.3.0 | diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index 3c34b4ab2..865787f10 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -20,6 +20,7 @@ package org.apache.celeborn.service.deploy.master import java.io.IOException import java.net.BindException import java.util +import java.util.Collections import java.util.concurrent.{ConcurrentHashMap, ScheduledFuture, TimeUnit} import scala.collection.JavaConverters._ @@ -117,6 +118,7 @@ private[celeborn] class Master( private def diskReserveSize = conf.workerDiskReserveSize + private val slotsAssignMaxWorkers = conf.masterSlotAssignMaxWorkers private val slotsAssignLoadAwareDiskGroupNum = conf.masterSlotAssignLoadAwareDiskGroupNum private val slotsAssignLoadAwareDiskGroupGradient = conf.masterSlotAssignLoadAwareDiskGroupGradient @@ -301,7 +303,7 @@ private[celeborn] class Master( userResourceConsumption, requestId)) - case requestSlots @ RequestSlots(_, _, _, _, _, _, _, _) => + case requestSlots @ RequestSlots(_, _, _, _, _, _, _, _, _) => logTrace(s"Received RequestSlots request $requestSlots.") executeWithLeaderChecker(context, handleRequestSlots(context, requestSlots)) @@ -557,7 +559,15 @@ private[celeborn] class Master( val numReducers = requestSlots.partitionIdList.size() val shuffleKey = Utils.makeShuffleKey(requestSlots.applicationId, requestSlots.shuffleId) - val availableWorkers = workersAvailable() + var availableWorkers = workersAvailable() + Collections.shuffle(availableWorkers) + val numWorkers = Math.min( + Math.max( + if (requestSlots.shouldReplicate) 2 else 1, + if (requestSlots.maxWorkers <= 0) slotsAssignMaxWorkers + else Math.min(slotsAssignMaxWorkers, requestSlots.maxWorkers)), + availableWorkers.size()) + availableWorkers = availableWorkers.subList(0, 
numWorkers) // offer slots val slots = masterSource.sample(MasterSource.OFFER_SLOTS_TIME, s"offerSlots-${Random.nextInt()}") { diff --git a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SlotsAssignMaxWorkersLargeTest.scala b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SlotsAssignMaxWorkersLargeTest.scala new file mode 100644 index 000000000..4114e13c1 --- /dev/null +++ b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SlotsAssignMaxWorkersLargeTest.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.celeborn.tests.spark + +import org.apache.spark.SparkConf +import org.apache.spark.sql.SparkSession +import org.scalatest.BeforeAndAfterEach +import org.scalatest.funsuite.AnyFunSuite + +import org.apache.celeborn.client.ShuffleClient +import org.apache.celeborn.common.CelebornConf +import org.apache.celeborn.common.protocol.ShuffleMode + +class SlotsAssignMaxWorkersLargeTest extends AnyFunSuite + with SparkTestBase + with BeforeAndAfterEach { + + override def beforeAll(): Unit = { + logInfo("test initialized, setup Celeborn mini cluster") + val masterConf = Map( + s"${CelebornConf.CLIENT_SLOT_ASSIGN_MAX_WORKERS.key}" -> "10") + setUpMiniCluster(masterConf = masterConf, workerConf = null) + } + + override def beforeEach(): Unit = { + ShuffleClient.reset() + } + + override def afterEach(): Unit = { + System.gc() + } + + test("celeborn spark integration test - slots assign maxWorkers small") { + val sparkConf = new SparkConf() + .set(s"spark.${CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED.key}", "false") + .setAppName("celeborn-demo").setMaster("local[2]") + val ss = SparkSession.builder() + .config(updateSparkConf(sparkConf, ShuffleMode.HASH)) + .getOrCreate() + ss.sparkContext.parallelize(1 to 1000, 2) + .map { i => (i, Range(1, 1000).mkString(",")) }.groupByKey(16).collect() + ss.stop() + } +} diff --git a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SlotsAssignMaxWorkersSmallTest.scala b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SlotsAssignMaxWorkersSmallTest.scala new file mode 100644 index 000000000..20c33ae0e --- /dev/null +++ b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SlotsAssignMaxWorkersSmallTest.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.tests.spark + +import org.apache.spark.SparkConf +import org.apache.spark.sql.SparkSession +import org.scalatest.BeforeAndAfterEach +import org.scalatest.funsuite.AnyFunSuite + +import org.apache.celeborn.client.ShuffleClient +import org.apache.celeborn.common.CelebornConf +import org.apache.celeborn.common.protocol.ShuffleMode + +class SlotsAssignMaxWorkersSmallTest extends AnyFunSuite + with SparkTestBase + with BeforeAndAfterEach { + + override def beforeAll(): Unit = { + logInfo("test initialized, setup Celeborn mini cluster") + val masterConf = Map( + s"${CelebornConf.CLIENT_SLOT_ASSIGN_MAX_WORKERS.key}" -> "5") + setUpMiniCluster(masterConf = masterConf, workerConf = null) + } + + override def beforeEach(): Unit = { + ShuffleClient.reset() + } + + override def afterEach(): Unit = { + System.gc() + } + + test("celeborn spark integration test - slots assign maxWorkers small") { + val sparkConf = new SparkConf() + .set(s"spark.${CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED.key}", "true") + .set(s"spark.${CelebornConf.CLIENT_SLOT_ASSIGN_MAX_WORKERS.key}", "1") + .setAppName("celeborn-demo").setMaster("local[2]") + val ss = SparkSession.builder() + .config(updateSparkConf(sparkConf, ShuffleMode.HASH)) + .getOrCreate() + ss.sparkContext.parallelize(1 to 1000, 2) + .map { i => (i, Range(1, 1000).mkString(",")) }.groupByKey(16).collect() + ss.stop() + } +}
