This is an automated email from the ASF dual-hosted git repository. zhouky pushed a commit to branch test in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
commit cb8302d97c4412491aa4c5463e2a86c05ec3177b
Author: zky.zhoukeyong <[email protected]>
AuthorDate: Fri Jun 9 19:44:16 2023 +0800

    Revert "fast fail for reserve slots"

    This reverts commit 4fd6e56da5e6006a8e9ed400f6e6a1f0e35425d3.
---
 .../apache/celeborn/client/LifecycleManager.scala | 30 ++++++++++------------
 1 file changed, 13 insertions(+), 17 deletions(-)

diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 02a5e9fce..1f766fed6 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -1164,22 +1164,18 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
     val parallelism = Math.min(Math.max(1, slots.size()), conf.rpcMaxParallelism)
     ThreadUtils.parmap(slots.asScala.to, "ReserveSlot", parallelism) {
       case (workerInfo, (masterLocations, slaveLocations)) =>
-        val res = if (blacklist.contains(workerInfo)) {
-          ReserveSlotsResponse(StatusCode.RESERVE_SLOTS_FAILED, "")
-        } else {
-          requestReserveSlots(
-            workerInfo.endpoint,
-            ReserveSlots(
-              applicationId,
-              shuffleId,
-              masterLocations,
-              slaveLocations,
-              partitionSplitThreshold,
-              partitionSplitMode,
-              getPartitionType(shuffleId),
-              rangeReadFilter,
-              userIdentifier))
-        }
+        val res = requestReserveSlots(
+          workerInfo.endpoint,
+          ReserveSlots(
+            applicationId,
+            shuffleId,
+            masterLocations,
+            slaveLocations,
+            partitionSplitThreshold,
+            partitionSplitMode,
+            getPartitionType(shuffleId),
+            rangeReadFilter,
+            userIdentifier))
         if (res.status.equals(StatusCode.SUCCESS)) {
           logDebug(s"Successfully allocated " +
             s"partitions buffer for ${Utils.makeShuffleKey(applicationId, shuffleId)}" +
@@ -1189,13 +1185,13 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
             s" reserve buffers for ${Utils.makeShuffleKey(applicationId, shuffleId)}" +
             s" from worker ${workerInfo.readableAddress()}. Reason: ${res.reason}")
           reserveSlotFailedWorkers.put(workerInfo, (res.status, System.currentTimeMillis()))
-          recordWorkerFailure(reserveSlotFailedWorkers)
         }
     }
     if (failureInfos.asScala.nonEmpty) {
       logError(s"Aggregated error of reserveSlots failure:${failureInfos.asScala.foldLeft("")(
           (x, y) => s"$x \n $y")}")
     }
+    recordWorkerFailure(reserveSlotFailedWorkers)
     new util.ArrayList[WorkerInfo](reserveSlotFailedWorkers.asScala.keys.toList.asJava)
   }
