This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch test
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git

commit 4fd6e56da5e6006a8e9ed400f6e6a1f0e35425d3
Author: zky.zhoukeyong <[email protected]>
AuthorDate: Wed Jun 7 22:42:26 2023 +0800

    fast fail for reserve slots
---
 .../apache/celeborn/client/LifecycleManager.scala  | 30 ++++++++++++----------
 1 file changed, 17 insertions(+), 13 deletions(-)

diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 1f766fed6..02a5e9fce 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -1164,18 +1164,22 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
     val parallelism = Math.min(Math.max(1, slots.size()), 
conf.rpcMaxParallelism)
     ThreadUtils.parmap(slots.asScala.to, "ReserveSlot", parallelism) {
       case (workerInfo, (masterLocations, slaveLocations)) =>
-        val res = requestReserveSlots(
-          workerInfo.endpoint,
-          ReserveSlots(
-            applicationId,
-            shuffleId,
-            masterLocations,
-            slaveLocations,
-            partitionSplitThreshold,
-            partitionSplitMode,
-            getPartitionType(shuffleId),
-            rangeReadFilter,
-            userIdentifier))
+        val res = if (blacklist.contains(workerInfo)) {
+          ReserveSlotsResponse(StatusCode.RESERVE_SLOTS_FAILED, "")
+        } else {
+          requestReserveSlots(
+            workerInfo.endpoint,
+            ReserveSlots(
+              applicationId,
+              shuffleId,
+              masterLocations,
+              slaveLocations,
+              partitionSplitThreshold,
+              partitionSplitMode,
+              getPartitionType(shuffleId),
+              rangeReadFilter,
+              userIdentifier))
+        }
         if (res.status.equals(StatusCode.SUCCESS)) {
           logDebug(s"Successfully allocated " +
             s"partitions buffer for ${Utils.makeShuffleKey(applicationId, 
shuffleId)}" +
@@ -1185,13 +1189,13 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
             s" reserve buffers for ${Utils.makeShuffleKey(applicationId, 
shuffleId)}" +
             s" from worker ${workerInfo.readableAddress()}. Reason: 
${res.reason}")
           reserveSlotFailedWorkers.put(workerInfo, (res.status, 
System.currentTimeMillis()))
+          recordWorkerFailure(reserveSlotFailedWorkers)
         }
     }
     if (failureInfos.asScala.nonEmpty) {
       logError(s"Aggregated error of reserveSlots 
failure:${failureInfos.asScala.foldLeft("")(
         (x, y) => s"$x \n $y")}")
     }
-    recordWorkerFailure(reserveSlotFailedWorkers)
     new 
util.ArrayList[WorkerInfo](reserveSlotFailedWorkers.asScala.keys.toList.asJava)
   }
 

Reply via email to