This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 8516df4be [CELEBORN-1151] Request slots when register shuffle should 
filter the workers excluded by application
8516df4be is described below

commit 8516df4beb37627a4a312ae3906a3ba860ad4037
Author: wangshengjie <[email protected]>
AuthorDate: Tue Dec 12 10:02:18 2023 +0800

    [CELEBORN-1151] Request slots when register shuffle should filter the 
workers excluded by application
    
    ### What changes were proposed in this pull request?
    When requesting slots, filter out the workers excluded by the application
    
    ### Why are the changes needed?
    If a worker is alive but cannot provide service, register shuffle will 
remove the worker from the application client's exclude list, and the next 
shuffle may reserve slots on this worker; this will cause the application to 
revive unexpectedly.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, request slots will filter workers excluded by application
    
    ### How was this patch tested?
    UT.
    
    Closes #2131 from wangshengjie123/fix-request-slots-blacklist.
    
    Authored-by: wangshengjie <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../apache/celeborn/client/LifecycleManager.scala  |  17 +++-
 .../celeborn/client/WorkerStatusTracker.scala      |   2 +-
 common/src/main/proto/TransportMessages.proto      |   1 +
 .../org/apache/celeborn/common/CelebornConf.scala  |  11 +++
 .../common/protocol/message/ControlMessages.scala  |   6 ++
 docs/configuration/client.md                       |   1 +
 .../celeborn/service/deploy/master/Master.scala    |  10 +-
 .../tests/client/LifecycleManagerSuit.scala        | 108 +++++++++++++++++++++
 8 files changed, 151 insertions(+), 5 deletions(-)

diff --git 
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala 
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index fa6face39..9454ae2c0 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -95,6 +95,8 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
   private val rpcCacheConcurrencyLevel = conf.clientRpcCacheConcurrencyLevel
   private val rpcCacheExpireTime = conf.clientRpcCacheExpireTime
 
+  private val excludedWorkersFilter = 
conf.registerShuffleFilterExcludedWorkerEnabled
+
   private val registerShuffleResponseRpcCache: Cache[Int, ByteBuffer] = 
CacheBuilder.newBuilder()
     .concurrencyLevel(rpcCacheConcurrencyLevel)
     .expireAfterAccess(rpcCacheExpireTime, TimeUnit.MILLISECONDS)
@@ -495,6 +497,10 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
         return
       case StatusCode.SUCCESS =>
         logInfo(s"OfferSlots for $shuffleId Success!Slots Info: 
${res.workerResource}")
+      case StatusCode.WORKER_EXCLUDED =>
+        logInfo(s"OfferSlots for $shuffleId failed due to all workers be 
excluded!")
+        
replyRegisterShuffle(RegisterShuffleResponse(StatusCode.WORKER_EXCLUDED, 
Array.empty))
+        return
       case _ => // won't happen
         throw new UnsupportedOperationException()
     }
@@ -1244,9 +1250,15 @@ class LifecycleManager(val appUniqueId: String, val 
conf: CelebornConf) extends
     }
   }
 
-  private def requestMasterRequestSlotsWithRetry(
+  def requestMasterRequestSlotsWithRetry(
       shuffleId: Int,
       ids: util.ArrayList[Integer]): RequestSlotsResponse = {
+    val excludedWorkerSet =
+      if (excludedWorkersFilter) {
+        workerStatusTracker.excludedWorkers.asScala.keys.toSet
+      } else {
+        Set.empty[WorkerInfo]
+      }
     val req =
       RequestSlots(
         appUniqueId,
@@ -1257,7 +1269,8 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
         pushRackAwareEnabled,
         userIdentifier,
         slotsAssignMaxWorkers,
-        availableStorageTypes)
+        availableStorageTypes,
+        excludedWorkerSet)
     val res = requestMasterRequestSlots(req)
     if (res.status != StatusCode.SUCCESS) {
       requestMasterRequestSlots(req)
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala 
b/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
index ef570e787..d2f602424 100644
--- a/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
@@ -17,7 +17,7 @@
 
 package org.apache.celeborn.client
 
-import java.util.{HashSet => JHashSet, List => JList, Set => JSet}
+import java.util.{HashSet => JHashSet, Set => JSet}
 import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.JavaConverters._
diff --git a/common/src/main/proto/TransportMessages.proto 
b/common/src/main/proto/TransportMessages.proto
index b33271e0e..4fd2bbc6b 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -215,6 +215,7 @@ message PbRequestSlots {
   bool shouldRackAware = 9;
   int32 maxWorkers = 10;
   int32 availableStorageTypes = 11;
+  repeated PbWorkerInfo excludedWorkerSet = 12;
 }
 
 message PbSlotInfo {
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 0960529e6..c24e84524 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -880,6 +880,9 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
   def readLocalShuffleThreads: Int = get(READ_LOCAL_SHUFFLE_THREADS)
   def readStreamCreatorPoolThreads: Int = get(READ_STREAM_CREATOR_POOL_THREADS)
 
+  def registerShuffleFilterExcludedWorkerEnabled: Boolean =
+    get(REGISTER_SHUFFLE_FILTER_EXCLUDED_WORKER_ENABLED)
+
   // //////////////////////////////////////////////////////
   //                       Worker                        //
   // //////////////////////////////////////////////////////
@@ -4198,4 +4201,12 @@ object CelebornConf extends Logging {
       .doc("Interval for refreshing the corresponding dynamic config 
periodically.")
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("120s")
+
+  val REGISTER_SHUFFLE_FILTER_EXCLUDED_WORKER_ENABLED: ConfigEntry[Boolean] =
+    buildConf("celeborn.client.shuffle.register.filterExcludedWorker.enabled")
+      .categories("client")
+      .version("0.4.0")
+      .doc("Whether to filter excluded worker when register shuffle.")
+      .booleanConf
+      .createWithDefault(false)
 }
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
 
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
index 63fae0775..394edb795 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
@@ -171,6 +171,7 @@ object ControlMessages extends Logging {
       userIdentifier: UserIdentifier,
       maxWorkers: Int,
       availableStorageTypes: Int,
+      excludedWorkerSet: Set[WorkerInfo] = Set.empty,
       override var requestId: String = ZERO_UUID)
     extends MasterRequestMessage
 
@@ -555,6 +556,7 @@ object ControlMessages extends Logging {
           userIdentifier,
           maxWorkers,
           availableStorageTypes,
+          excludedWorkerSet,
           requestId) =>
       val payload = PbRequestSlots.newBuilder()
         .setApplicationId(applicationId)
@@ -567,6 +569,7 @@ object ControlMessages extends Logging {
         .setRequestId(requestId)
         .setAvailableStorageTypes(availableStorageTypes)
         .setUserIdentifier(PbSerDeUtils.toPbUserIdentifier(userIdentifier))
+        
.addAllExcludedWorkerSet(excludedWorkerSet.map(PbSerDeUtils.toPbWorkerInfo(_, 
true)).asJava)
         .build().toByteArray
       new TransportMessage(MessageType.REQUEST_SLOTS, payload)
 
@@ -930,6 +933,8 @@ object ControlMessages extends Logging {
       case REQUEST_SLOTS_VALUE =>
         val pbRequestSlots = PbRequestSlots.parseFrom(message.getPayload)
         val userIdentifier = 
PbSerDeUtils.fromPbUserIdentifier(pbRequestSlots.getUserIdentifier)
+        val excludedWorkerInfoSet =
+          
pbRequestSlots.getExcludedWorkerSetList.asScala.map(PbSerDeUtils.fromPbWorkerInfo).toSet
         RequestSlots(
           pbRequestSlots.getApplicationId,
           pbRequestSlots.getShuffleId,
@@ -940,6 +945,7 @@ object ControlMessages extends Logging {
           userIdentifier,
           pbRequestSlots.getMaxWorkers,
           pbRequestSlots.getAvailableStorageTypes,
+          excludedWorkerInfoSet,
           pbRequestSlots.getRequestId)
 
       case REQUEST_SLOTS_RESPONSE_VALUE =>
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index c46103e98..dfab73579 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -98,6 +98,7 @@ license: |
 | celeborn.client.shuffle.partitionSplit.mode | SOFT | soft: the shuffle file 
size might be larger than split threshold. hard: the shuffle file size will be 
limited to split threshold. | 0.3.0 | 
 | celeborn.client.shuffle.partitionSplit.threshold | 1G | Shuffle file size 
threshold, if file size exceeds this, trigger split. | 0.3.0 | 
 | celeborn.client.shuffle.rangeReadFilter.enabled | false | If a spark 
application have skewed partition, this value can set to true to improve 
performance. | 0.2.0 | 
+| celeborn.client.shuffle.register.filterExcludedWorker.enabled | false | 
Whether to filter excluded worker when register shuffle. | 0.4.0 | 
 | celeborn.client.slot.assign.maxWorkers | 10000 | Max workers that slots of 
one shuffle can be allocated on. Will choose the smaller positive one from 
Master side and Client side, see `celeborn.master.slot.assign.maxWorkers`. | 
0.3.1 | 
 | celeborn.client.spark.fetch.throwsFetchFailure | false | client throws 
FetchFailedException instead of CelebornIOException | 0.4.0 | 
 | celeborn.client.spark.push.sort.memory.threshold | 64m | When 
SortBasedPusher use memory over the threshold, will trigger push data. If the 
pipeline push feature is enabled 
(`celeborn.client.spark.push.sort.pipeline.enabled=true`), the SortBasedPusher 
will trigger a data push when the memory usage exceeds half of the threshold(by 
default, 32m). | 0.3.0 | 
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 85e7a2da9..2235d8cfc 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -357,7 +357,7 @@ private[celeborn] class Master(
       // keep it for compatible reason
       context.reply(ReleaseSlotsResponse(StatusCode.SUCCESS))
 
-    case requestSlots @ RequestSlots(_, _, _, _, _, _, _, _, _, _) =>
+    case requestSlots @ RequestSlots(_, _, _, _, _, _, _, _, _, _, _) =>
       logTrace(s"Received RequestSlots request $requestSlots.")
       executeWithLeaderChecker(context, handleRequestSlots(context, 
requestSlots))
 
@@ -664,8 +664,14 @@ private[celeborn] class Master(
     val numReducers = requestSlots.partitionIdList.size()
     val shuffleKey = Utils.makeShuffleKey(requestSlots.applicationId, 
requestSlots.shuffleId)
 
-    val availableWorkers = workersAvailable()
+    val availableWorkers = workersAvailable(requestSlots.excludedWorkerSet)
     val numAvailableWorkers = availableWorkers.size()
+
+    if (numAvailableWorkers == 0) {
+      logError(s"Offer slots for $shuffleKey failed due to all workers are 
excluded!")
+      context.reply(RequestSlotsResponse(StatusCode.WORKER_EXCLUDED, new 
WorkerResource()))
+    }
+
     val numWorkers = Math.min(
       Math.max(
         if (requestSlots.shouldReplicate) 2 else 1,
diff --git 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerSuit.scala
 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerSuit.scala
new file mode 100644
index 000000000..ac99d12fd
--- /dev/null
+++ 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerSuit.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.tests.client
+
+import java.util
+
+import org.apache.celeborn.client.{LifecycleManager, WithShuffleClientSuite}
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.protocol.message.StatusCode
+import org.apache.celeborn.service.deploy.MiniClusterFeature
+
+class LifecycleManagerSuit extends WithShuffleClientSuite with 
MiniClusterFeature {
+  private val masterPort = 19097
+
+  celebornConf.set(CelebornConf.MASTER_ENDPOINTS.key, s"localhost:$masterPort")
+    .set(CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED.key, "true")
+    .set(CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE.key, "256K")
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    val masterConf = Map(
+      "celeborn.master.host" -> "localhost",
+      "celeborn.master.port" -> masterPort.toString)
+    val workerConf = Map(
+      "celeborn.master.endpoints" -> s"localhost:$masterPort")
+    setUpMiniCluster(masterConf, workerConf)
+  }
+
+  test("CELEBORN-1151: test request slots with client blacklist worker with 
filter enabled") {
+    
celebornConf.set(CelebornConf.REGISTER_SHUFFLE_FILTER_EXCLUDED_WORKER_ENABLED, 
true)
+    val lifecycleManager: LifecycleManager = new LifecycleManager(APP, 
celebornConf)
+
+    val arrayList = new util.ArrayList[Integer]()
+    (0 to 10).foreach(i => {
+      arrayList.add(i)
+    })
+
+    // test request slots without worker excluded
+    val headWorkerInfo = workerInfos.keySet.head.workerInfo
+    val res1 = lifecycleManager.requestMasterRequestSlotsWithRetry(0, 
arrayList)
+      .workerResource.keySet()
+    assert(res1.contains(headWorkerInfo))
+
+    // test request slots with 1 worker excluded, result should not contains 
the excluded worker
+    val commitFilesFailedWorkers = new LifecycleManager.ShuffleFailedWorkers()
+    commitFilesFailedWorkers.put(
+      workerInfos.keySet.head.workerInfo,
+      (StatusCode.PUSH_DATA_TIMEOUT_PRIMARY, System.currentTimeMillis()))
+    
lifecycleManager.workerStatusTracker.recordWorkerFailure(commitFilesFailedWorkers)
+    val res2 = lifecycleManager.requestMasterRequestSlotsWithRetry(1, 
arrayList)
+      .workerResource.keySet()
+    assert(!res2.contains(headWorkerInfo))
+
+    // test request slots with all workers excluded, response should be 
WORKER_EXCLUDED
+    workerInfos.keySet.foreach(worker =>
+      commitFilesFailedWorkers.put(
+        worker.workerInfo,
+        (StatusCode.PUSH_DATA_TIMEOUT_PRIMARY, System.currentTimeMillis())))
+    
lifecycleManager.workerStatusTracker.recordWorkerFailure(commitFilesFailedWorkers)
+    val status = lifecycleManager.requestMasterRequestSlotsWithRetry(2, 
arrayList).status
+    assert(status == StatusCode.WORKER_EXCLUDED)
+
+    lifecycleManager.stop()
+  }
+
+  test("CELEBORN-1151: test request slots with client blacklist worker with 
filter not enabled") {
+    
celebornConf.set(CelebornConf.REGISTER_SHUFFLE_FILTER_EXCLUDED_WORKER_ENABLED, 
false)
+    val lifecycleManager: LifecycleManager = new LifecycleManager(APP, 
celebornConf)
+
+    val arrayList = new util.ArrayList[Integer]()
+    (0 to 10).foreach(i => {
+      arrayList.add(i)
+    })
+
+    // test request slots with all workers excluded, response should not 
excluded any worker
+    val commitFilesFailedWorkers = new LifecycleManager.ShuffleFailedWorkers()
+    workerInfos.keySet.foreach(worker =>
+      commitFilesFailedWorkers.put(
+        worker.workerInfo,
+        (StatusCode.PUSH_DATA_TIMEOUT_PRIMARY, System.currentTimeMillis())))
+    
lifecycleManager.workerStatusTracker.recordWorkerFailure(commitFilesFailedWorkers)
+    val res = lifecycleManager.requestMasterRequestSlotsWithRetry(0, arrayList)
+      .workerResource.keySet()
+    assert(res.size() == workerInfos.size)
+    assert(res.contains(workerInfos.keySet.head.workerInfo))
+    lifecycleManager.stop()
+  }
+
+  override def afterAll(): Unit = {
+    logInfo("all test complete , stop celeborn mini cluster")
+    shutdownMiniCluster()
+  }
+}

Reply via email to