This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 817eee96 [CELEBORN-58][REFACTOR] Aggregate reserve failed logs together (#1005)
817eee96 is described below
commit 817eee969f2a7ce5ebcee55efa3525cf6b84912d
Author: nafiy <[email protected]>
AuthorDate: Sat Nov 26 20:56:39 2022 +0800
[CELEBORN-58][REFACTOR] Aggregate reserve failed logs together (#1005)
---
.../main/scala/org/apache/celeborn/client/LifecycleManager.scala | 8 ++++++--
1 file changed, 6 insertions(+), 2 deletions(-)
diff --git
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index c28621bf..139c4503 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -1195,6 +1195,7 @@ class LifecycleManager(appId: String, val conf:
CelebornConf) extends RpcEndpoin
shuffleId: Int,
slots: WorkerResource): util.List[WorkerInfo] = {
val reserveSlotFailedWorkers = new ConcurrentHashMap[WorkerInfo,
(StatusCode, Long)]()
+ val failureInfos = new util.concurrent.CopyOnWriteArrayList[String]()
val parallelism = Math.min(Math.max(1, slots.size()),
conf.rpcMaxParallelism)
ThreadUtils.parmap(slots.asScala.to, "ReserveSlot", parallelism) {
case (workerInfo, (masterLocations, slaveLocations)) =>
@@ -1215,13 +1216,16 @@ class LifecycleManager(appId: String, val conf:
CelebornConf) extends RpcEndpoin
s"partitions buffer for ${Utils.makeShuffleKey(applicationId,
shuffleId)}" +
s" from worker ${workerInfo.readableAddress()}.")
} else {
- logError(s"[reserveSlots] Failed to" +
+ failureInfos.add(s"[reserveSlots] Failed to" +
s" reserve buffers for ${Utils.makeShuffleKey(applicationId,
shuffleId)}" +
s" from worker ${workerInfo.readableAddress()}. Reason:
${res.reason}")
reserveSlotFailedWorkers.put(workerInfo, (res.status,
System.currentTimeMillis()))
}
}
-
+ if (failureInfos.asScala.nonEmpty) {
+ logError(s"Aggregated error of reserveSlots
failure:${failureInfos.asScala.foldLeft("")(
+ (x, y) => s"$x \n $y")}")
+ }
recordWorkerFailure(reserveSlotFailedWorkers)
new
util.ArrayList[WorkerInfo](reserveSlotFailedWorkers.asScala.keys.toList.asJava)
}