This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch test
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git

commit cb8302d97c4412491aa4c5463e2a86c05ec3177b
Author: zky.zhoukeyong <[email protected]>
AuthorDate: Fri Jun 9 19:44:16 2023 +0800

    Revert "fast fail for reserve slots"
    
    This reverts commit 4fd6e56da5e6006a8e9ed400f6e6a1f0e35425d3.
---
 .../apache/celeborn/client/LifecycleManager.scala  | 30 ++++++++++------------
 1 file changed, 13 insertions(+), 17 deletions(-)

diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 02a5e9fce..1f766fed6 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -1164,22 +1164,18 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
     val parallelism = Math.min(Math.max(1, slots.size()), conf.rpcMaxParallelism)
     ThreadUtils.parmap(slots.asScala.to, "ReserveSlot", parallelism) {
       case (workerInfo, (masterLocations, slaveLocations)) =>
-        val res = if (blacklist.contains(workerInfo)) {
-          ReserveSlotsResponse(StatusCode.RESERVE_SLOTS_FAILED, "")
-        } else {
-          requestReserveSlots(
-            workerInfo.endpoint,
-            ReserveSlots(
-              applicationId,
-              shuffleId,
-              masterLocations,
-              slaveLocations,
-              partitionSplitThreshold,
-              partitionSplitMode,
-              getPartitionType(shuffleId),
-              rangeReadFilter,
-              userIdentifier))
-        }
+        val res = requestReserveSlots(
+          workerInfo.endpoint,
+          ReserveSlots(
+            applicationId,
+            shuffleId,
+            masterLocations,
+            slaveLocations,
+            partitionSplitThreshold,
+            partitionSplitMode,
+            getPartitionType(shuffleId),
+            rangeReadFilter,
+            userIdentifier))
         if (res.status.equals(StatusCode.SUCCESS)) {
           logDebug(s"Successfully allocated " +
             s"partitions buffer for ${Utils.makeShuffleKey(applicationId, shuffleId)}" +
@@ -1189,13 +1185,13 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
             s" reserve buffers for ${Utils.makeShuffleKey(applicationId, shuffleId)}" +
             s" from worker ${workerInfo.readableAddress()}. Reason: ${res.reason}")
           reserveSlotFailedWorkers.put(workerInfo, (res.status, System.currentTimeMillis()))
-          recordWorkerFailure(reserveSlotFailedWorkers)
         }
     }
     if (failureInfos.asScala.nonEmpty) {
       logError(s"Aggregated error of reserveSlots failure:${failureInfos.asScala.foldLeft("")(
         (x, y) => s"$x \n $y")}")
     }
+    recordWorkerFailure(reserveSlotFailedWorkers)
     new util.ArrayList[WorkerInfo](reserveSlotFailedWorkers.asScala.keys.toList.asJava)
   }
 

Reply via email to