This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 8516df4be [CELEBORN-1151] Request slots when register shuffle should
filter the workers excluded by application
8516df4be is described below
commit 8516df4beb37627a4a312ae3906a3ba860ad4037
Author: wangshengjie <[email protected]>
AuthorDate: Tue Dec 12 10:02:18 2023 +0800
[CELEBORN-1151] Request slots when register shuffle should filter the
workers excluded by application
### What changes were proposed in this pull request?
When request slots, filter workers excluded by application
### Why are the changes needed?
If a worker is alive but cannot provide service, register shuffle will remove the
worker from the application client exclude list, and the next shuffle may reserve slots
on this worker; this will cause the application to revive unexpectedly.
### Does this PR introduce _any_ user-facing change?
Yes, request slots will filter workers excluded by application
### How was this patch tested?
UT.
Closes #2131 from wangshengjie123/fix-request-slots-blacklist.
Authored-by: wangshengjie <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../apache/celeborn/client/LifecycleManager.scala | 17 +++-
.../celeborn/client/WorkerStatusTracker.scala | 2 +-
common/src/main/proto/TransportMessages.proto | 1 +
.../org/apache/celeborn/common/CelebornConf.scala | 11 +++
.../common/protocol/message/ControlMessages.scala | 6 ++
docs/configuration/client.md | 1 +
.../celeborn/service/deploy/master/Master.scala | 10 +-
.../tests/client/LifecycleManagerSuit.scala | 108 +++++++++++++++++++++
8 files changed, 151 insertions(+), 5 deletions(-)
diff --git
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index fa6face39..9454ae2c0 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -95,6 +95,8 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
private val rpcCacheConcurrencyLevel = conf.clientRpcCacheConcurrencyLevel
private val rpcCacheExpireTime = conf.clientRpcCacheExpireTime
+ private val excludedWorkersFilter =
conf.registerShuffleFilterExcludedWorkerEnabled
+
private val registerShuffleResponseRpcCache: Cache[Int, ByteBuffer] =
CacheBuilder.newBuilder()
.concurrencyLevel(rpcCacheConcurrencyLevel)
.expireAfterAccess(rpcCacheExpireTime, TimeUnit.MILLISECONDS)
@@ -495,6 +497,10 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
return
case StatusCode.SUCCESS =>
logInfo(s"OfferSlots for $shuffleId Success!Slots Info:
${res.workerResource}")
+ case StatusCode.WORKER_EXCLUDED =>
+ logInfo(s"OfferSlots for $shuffleId failed due to all workers be
excluded!")
+
replyRegisterShuffle(RegisterShuffleResponse(StatusCode.WORKER_EXCLUDED,
Array.empty))
+ return
case _ => // won't happen
throw new UnsupportedOperationException()
}
@@ -1244,9 +1250,15 @@ class LifecycleManager(val appUniqueId: String, val
conf: CelebornConf) extends
}
}
- private def requestMasterRequestSlotsWithRetry(
+ def requestMasterRequestSlotsWithRetry(
shuffleId: Int,
ids: util.ArrayList[Integer]): RequestSlotsResponse = {
+ val excludedWorkerSet =
+ if (excludedWorkersFilter) {
+ workerStatusTracker.excludedWorkers.asScala.keys.toSet
+ } else {
+ Set.empty[WorkerInfo]
+ }
val req =
RequestSlots(
appUniqueId,
@@ -1257,7 +1269,8 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
pushRackAwareEnabled,
userIdentifier,
slotsAssignMaxWorkers,
- availableStorageTypes)
+ availableStorageTypes,
+ excludedWorkerSet)
val res = requestMasterRequestSlots(req)
if (res.status != StatusCode.SUCCESS) {
requestMasterRequestSlots(req)
diff --git
a/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
b/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
index ef570e787..d2f602424 100644
--- a/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
@@ -17,7 +17,7 @@
package org.apache.celeborn.client
-import java.util.{HashSet => JHashSet, List => JList, Set => JSet}
+import java.util.{HashSet => JHashSet, Set => JSet}
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
diff --git a/common/src/main/proto/TransportMessages.proto
b/common/src/main/proto/TransportMessages.proto
index b33271e0e..4fd2bbc6b 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -215,6 +215,7 @@ message PbRequestSlots {
bool shouldRackAware = 9;
int32 maxWorkers = 10;
int32 availableStorageTypes = 11;
+ repeated PbWorkerInfo excludedWorkerSet = 12;
}
message PbSlotInfo {
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 0960529e6..c24e84524 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -880,6 +880,9 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
def readLocalShuffleThreads: Int = get(READ_LOCAL_SHUFFLE_THREADS)
def readStreamCreatorPoolThreads: Int = get(READ_STREAM_CREATOR_POOL_THREADS)
+ def registerShuffleFilterExcludedWorkerEnabled: Boolean =
+ get(REGISTER_SHUFFLE_FILTER_EXCLUDED_WORKER_ENABLED)
+
// //////////////////////////////////////////////////////
// Worker //
// //////////////////////////////////////////////////////
@@ -4198,4 +4201,12 @@ object CelebornConf extends Logging {
.doc("Interval for refreshing the corresponding dynamic config
periodically.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("120s")
+
+ val REGISTER_SHUFFLE_FILTER_EXCLUDED_WORKER_ENABLED: ConfigEntry[Boolean] =
+ buildConf("celeborn.client.shuffle.register.filterExcludedWorker.enabled")
+ .categories("client")
+ .version("0.4.0")
+ .doc("Whether to filter excluded worker when register shuffle.")
+ .booleanConf
+ .createWithDefault(false)
}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
index 63fae0775..394edb795 100644
---
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
@@ -171,6 +171,7 @@ object ControlMessages extends Logging {
userIdentifier: UserIdentifier,
maxWorkers: Int,
availableStorageTypes: Int,
+ excludedWorkerSet: Set[WorkerInfo] = Set.empty,
override var requestId: String = ZERO_UUID)
extends MasterRequestMessage
@@ -555,6 +556,7 @@ object ControlMessages extends Logging {
userIdentifier,
maxWorkers,
availableStorageTypes,
+ excludedWorkerSet,
requestId) =>
val payload = PbRequestSlots.newBuilder()
.setApplicationId(applicationId)
@@ -567,6 +569,7 @@ object ControlMessages extends Logging {
.setRequestId(requestId)
.setAvailableStorageTypes(availableStorageTypes)
.setUserIdentifier(PbSerDeUtils.toPbUserIdentifier(userIdentifier))
+
.addAllExcludedWorkerSet(excludedWorkerSet.map(PbSerDeUtils.toPbWorkerInfo(_,
true)).asJava)
.build().toByteArray
new TransportMessage(MessageType.REQUEST_SLOTS, payload)
@@ -930,6 +933,8 @@ object ControlMessages extends Logging {
case REQUEST_SLOTS_VALUE =>
val pbRequestSlots = PbRequestSlots.parseFrom(message.getPayload)
val userIdentifier =
PbSerDeUtils.fromPbUserIdentifier(pbRequestSlots.getUserIdentifier)
+ val excludedWorkerInfoSet =
+
pbRequestSlots.getExcludedWorkerSetList.asScala.map(PbSerDeUtils.fromPbWorkerInfo).toSet
RequestSlots(
pbRequestSlots.getApplicationId,
pbRequestSlots.getShuffleId,
@@ -940,6 +945,7 @@ object ControlMessages extends Logging {
userIdentifier,
pbRequestSlots.getMaxWorkers,
pbRequestSlots.getAvailableStorageTypes,
+ excludedWorkerInfoSet,
pbRequestSlots.getRequestId)
case REQUEST_SLOTS_RESPONSE_VALUE =>
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index c46103e98..dfab73579 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -98,6 +98,7 @@ license: |
| celeborn.client.shuffle.partitionSplit.mode | SOFT | soft: the shuffle file
size might be larger than split threshold. hard: the shuffle file size will be
limited to split threshold. | 0.3.0 |
| celeborn.client.shuffle.partitionSplit.threshold | 1G | Shuffle file size
threshold, if file size exceeds this, trigger split. | 0.3.0 |
| celeborn.client.shuffle.rangeReadFilter.enabled | false | If a spark
application have skewed partition, this value can set to true to improve
performance. | 0.2.0 |
+| celeborn.client.shuffle.register.filterExcludedWorker.enabled | false |
Whether to filter excluded worker when register shuffle. | 0.4.0 |
| celeborn.client.slot.assign.maxWorkers | 10000 | Max workers that slots of
one shuffle can be allocated on. Will choose the smaller positive one from
Master side and Client side, see `celeborn.master.slot.assign.maxWorkers`. |
0.3.1 |
| celeborn.client.spark.fetch.throwsFetchFailure | false | client throws
FetchFailedException instead of CelebornIOException | 0.4.0 |
| celeborn.client.spark.push.sort.memory.threshold | 64m | When
SortBasedPusher use memory over the threshold, will trigger push data. If the
pipeline push feature is enabled
(`celeborn.client.spark.push.sort.pipeline.enabled=true`), the SortBasedPusher
will trigger a data push when the memory usage exceeds half of the threshold(by
default, 32m). | 0.3.0 |
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 85e7a2da9..2235d8cfc 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -357,7 +357,7 @@ private[celeborn] class Master(
// keep it for compatible reason
context.reply(ReleaseSlotsResponse(StatusCode.SUCCESS))
- case requestSlots @ RequestSlots(_, _, _, _, _, _, _, _, _, _) =>
+ case requestSlots @ RequestSlots(_, _, _, _, _, _, _, _, _, _, _) =>
logTrace(s"Received RequestSlots request $requestSlots.")
executeWithLeaderChecker(context, handleRequestSlots(context,
requestSlots))
@@ -664,8 +664,14 @@ private[celeborn] class Master(
val numReducers = requestSlots.partitionIdList.size()
val shuffleKey = Utils.makeShuffleKey(requestSlots.applicationId,
requestSlots.shuffleId)
- val availableWorkers = workersAvailable()
+ val availableWorkers = workersAvailable(requestSlots.excludedWorkerSet)
val numAvailableWorkers = availableWorkers.size()
+
+ if (numAvailableWorkers == 0) {
+ logError(s"Offer slots for $shuffleKey failed due to all workers are
excluded!")
+ context.reply(RequestSlotsResponse(StatusCode.WORKER_EXCLUDED, new
WorkerResource()))
+ }
+
val numWorkers = Math.min(
Math.max(
if (requestSlots.shouldReplicate) 2 else 1,
diff --git
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerSuit.scala
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerSuit.scala
new file mode 100644
index 000000000..ac99d12fd
--- /dev/null
+++
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerSuit.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.tests.client
+
+import java.util
+
+import org.apache.celeborn.client.{LifecycleManager, WithShuffleClientSuite}
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.protocol.message.StatusCode
+import org.apache.celeborn.service.deploy.MiniClusterFeature
+
+class LifecycleManagerSuit extends WithShuffleClientSuite with
MiniClusterFeature {
+ private val masterPort = 19097
+
+ celebornConf.set(CelebornConf.MASTER_ENDPOINTS.key, s"localhost:$masterPort")
+ .set(CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED.key, "true")
+ .set(CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE.key, "256K")
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ val masterConf = Map(
+ "celeborn.master.host" -> "localhost",
+ "celeborn.master.port" -> masterPort.toString)
+ val workerConf = Map(
+ "celeborn.master.endpoints" -> s"localhost:$masterPort")
+ setUpMiniCluster(masterConf, workerConf)
+ }
+
+ test("CELEBORN-1151: test request slots with client blacklist worker with
filter enabled") {
+
celebornConf.set(CelebornConf.REGISTER_SHUFFLE_FILTER_EXCLUDED_WORKER_ENABLED,
true)
+ val lifecycleManager: LifecycleManager = new LifecycleManager(APP,
celebornConf)
+
+ val arrayList = new util.ArrayList[Integer]()
+ (0 to 10).foreach(i => {
+ arrayList.add(i)
+ })
+
+ // test request slots without worker excluded
+ val headWorkerInfo = workerInfos.keySet.head.workerInfo
+ val res1 = lifecycleManager.requestMasterRequestSlotsWithRetry(0,
arrayList)
+ .workerResource.keySet()
+ assert(res1.contains(headWorkerInfo))
+
+ // test request slots with 1 worker excluded, result should not contains
the excluded worker
+ val commitFilesFailedWorkers = new LifecycleManager.ShuffleFailedWorkers()
+ commitFilesFailedWorkers.put(
+ workerInfos.keySet.head.workerInfo,
+ (StatusCode.PUSH_DATA_TIMEOUT_PRIMARY, System.currentTimeMillis()))
+
lifecycleManager.workerStatusTracker.recordWorkerFailure(commitFilesFailedWorkers)
+ val res2 = lifecycleManager.requestMasterRequestSlotsWithRetry(1,
arrayList)
+ .workerResource.keySet()
+ assert(!res2.contains(headWorkerInfo))
+
+ // test request slots with all workers excluded, response should be
WORKER_EXCLUDED
+ workerInfos.keySet.foreach(worker =>
+ commitFilesFailedWorkers.put(
+ worker.workerInfo,
+ (StatusCode.PUSH_DATA_TIMEOUT_PRIMARY, System.currentTimeMillis())))
+
lifecycleManager.workerStatusTracker.recordWorkerFailure(commitFilesFailedWorkers)
+ val status = lifecycleManager.requestMasterRequestSlotsWithRetry(2,
arrayList).status
+ assert(status == StatusCode.WORKER_EXCLUDED)
+
+ lifecycleManager.stop()
+ }
+
+ test("CELEBORN-1151: test request slots with client blacklist worker with
filter not enabled") {
+
celebornConf.set(CelebornConf.REGISTER_SHUFFLE_FILTER_EXCLUDED_WORKER_ENABLED,
false)
+ val lifecycleManager: LifecycleManager = new LifecycleManager(APP,
celebornConf)
+
+ val arrayList = new util.ArrayList[Integer]()
+ (0 to 10).foreach(i => {
+ arrayList.add(i)
+ })
+
+ // test request slots with all workers excluded, response should not
excluded any worker
+ val commitFilesFailedWorkers = new LifecycleManager.ShuffleFailedWorkers()
+ workerInfos.keySet.foreach(worker =>
+ commitFilesFailedWorkers.put(
+ worker.workerInfo,
+ (StatusCode.PUSH_DATA_TIMEOUT_PRIMARY, System.currentTimeMillis())))
+
lifecycleManager.workerStatusTracker.recordWorkerFailure(commitFilesFailedWorkers)
+ val res = lifecycleManager.requestMasterRequestSlotsWithRetry(0, arrayList)
+ .workerResource.keySet()
+ assert(res.size() == workerInfos.size)
+ assert(res.contains(workerInfos.keySet.head.workerInfo))
+ lifecycleManager.stop()
+ }
+
+ override def afterAll(): Unit = {
+ logInfo("all test complete , stop celeborn mini cluster")
+ shutdownMiniCluster()
+ }
+}