This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 061febe46 [CELEBORN-807] Adjust shutdown worker logs in
LifecycleManager
061febe46 is described below
commit 061febe46f41450f5c4aa69a1cfcd01033a79818
Author: onebox-li <[email protected]>
AuthorDate: Wed Jul 19 11:38:54 2023 +0800
[CELEBORN-807] Adjust shutdown worker logs in LifecycleManager
### What changes were proposed in this pull request?
In a long-running Celeborn cluster, there are usually some shutdown workers.
Whether the task is new or old, and even if the worker was never assigned to
it, the following message is always logged, which is rather noisy.
ERROR CommitManager: Worker xx shutdown, commit all it's partition location.
### Why are the changes needed?
Ditto
### Does this PR introduce _any_ user-facing change?
Yes, the shutdown-worker log messages emitted by LifecycleManager change.
### How was this patch tested?
Manually tested.
Closes #1730 from onebox-li/adjust-log.
Authored-by: onebox-li <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../org/apache/celeborn/client/CommitManager.scala | 39 ++++++++++---------
.../celeborn/client/WorkerStatusTracker.scala | 44 +++++++++++++++-------
2 files changed, 51 insertions(+), 32 deletions(-)
diff --git
a/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala
b/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala
index 3fd9f3cf6..316e9ba70 100644
--- a/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala
@@ -250,6 +250,10 @@ class CommitManager(appUniqueId: String, val conf:
CelebornConf, lifecycleManage
getCommitHandler(shuffleId).isStageEnd(shuffleId)
}
+ def isStageEndOrInProcess(shuffleId: Int): Boolean = {
+ getCommitHandler(shuffleId).isStageEndOrInProcess(shuffleId)
+ }
+
def setStageEnd(shuffleId: Int): Unit = {
getCommitHandler(shuffleId).setStageEnd(shuffleId)
}
@@ -300,27 +304,26 @@ class CommitManager(appUniqueId: String, val conf:
CelebornConf, lifecycleManage
override def notifyChangedWorkersStatus(workersStatus: WorkersStatus):
Unit = {
if (workersStatus.shutdownWorkers != null) {
- workersStatus.shutdownWorkers.asScala.foreach { case shuttingWorker =>
- logError(s"Worker ${shuttingWorker.toUniqueId()} shutdown, " +
- "commit all it's partition location.")
- lifecycleManager.shuffleAllocatedWorkers.asScala.foreach {
- case (shuffleId, workerToPartitionLocationInfos) =>
+ lifecycleManager.shuffleAllocatedWorkers.asScala.foreach {
+ case (shuffleId, workerToPartitionLocationInfos) =>
+ if (!isStageEndOrInProcess(shuffleId)) {
val shuffleCommittedInfo = committedPartitionInfo.get(shuffleId)
- val partitionLocationInfo =
workerToPartitionLocationInfos.get(shuttingWorker)
- if (partitionLocationInfo != null) {
- partitionLocationInfo.getPrimaryPartitions().asScala.foreach {
partitionLocation =>
- shuffleCommittedInfo.synchronized {
-
shuffleCommittedInfo.unhandledPartitionLocations.add(partitionLocation)
- }
- }
-
- partitionLocationInfo.getReplicaPartitions().asScala.foreach {
partitionLocation =>
- shuffleCommittedInfo.synchronized {
-
shuffleCommittedInfo.unhandledPartitionLocations.add(partitionLocation)
- }
+ val needCommitPartitionLocations = new
util.HashSet[PartitionLocation]()
+
+ workersStatus.shutdownWorkers.asScala.foreach { worker =>
+ val partitionLocationInfos =
workerToPartitionLocationInfos.get(worker)
+ if (partitionLocationInfos != null) {
+ logWarning(s"Worker ${worker.toUniqueId()} shutdown, " +
+ s"commit all it's partition locations for shuffle
$shuffleId.")
+
needCommitPartitionLocations.addAll(partitionLocationInfos.getPrimaryPartitions())
+
needCommitPartitionLocations.addAll(partitionLocationInfos.getReplicaPartitions())
}
}
- }
+ shuffleCommittedInfo.synchronized {
+ shuffleCommittedInfo.unhandledPartitionLocations.addAll(
+ needCommitPartitionLocations)
+ }
+ }
}
}
}
diff --git
a/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
b/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
index 305f1e0cb..1a83d2a8f 100644
--- a/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
@@ -152,9 +152,10 @@ class WorkerStatusTracker(
def handleHeartbeatResponse(res: HeartbeatFromApplicationResponse): Unit = {
if (res.statusCode == StatusCode.SUCCESS) {
- logInfo(s"Received Worker status from Primary, excluded workers:
${res.excludedWorkers} " +
+ logDebug(s"Received Worker status from Primary, excluded workers:
${res.excludedWorkers} " +
s"unknown workers: ${res.unknownWorkers}, shutdown workers:
${res.shuttingWorkers}")
val current = System.currentTimeMillis()
+ var statusChanged = false
excludedWorkers.asScala.foreach {
case (workerInfo: WorkerInfo, (statusCode, registerTime)) =>
@@ -174,20 +175,28 @@ class WorkerStatusTracker(
!res.shuttingWorkers.contains(workerInfo) &&
!res.unknownWorkers.contains(workerInfo)) {
excludedWorkers.remove(workerInfo)
+ statusChanged = true
}
}
}
-
- if (!res.excludedWorkers.isEmpty) {
-
excludedWorkers.putAll(res.excludedWorkers.asScala.filterNot(excludedWorkers.containsKey)
- .map(_ -> (StatusCode.WORKER_EXCLUDED -> current)).toMap.asJava)
+ for (worker <- res.excludedWorkers.asScala) {
+ if (!excludedWorkers.containsKey(worker)) {
+ excludedWorkers.put(worker, (StatusCode.WORKER_EXCLUDED, current))
+ statusChanged = true
+ }
}
-
- shuttingWorkers.retainAll(res.shuttingWorkers)
- shuttingWorkers.addAll(res.shuttingWorkers)
+ for (worker <- res.unknownWorkers.asScala) {
+ if (!excludedWorkers.containsKey(worker)) {
+ excludedWorkers.put(worker, (StatusCode.WORKER_UNKNOWN, current))
+ statusChanged = true
+ }
+ }
+ val retainResult = shuttingWorkers.retainAll(res.shuttingWorkers)
+ val addResult = shuttingWorkers.addAll(res.shuttingWorkers)
+ statusChanged = statusChanged || retainResult || addResult
+ // Always trigger commit files for shutting down workers from
HeartbeatFromApplicationResponse
+ // See details in CELEBORN-696
if (!res.unknownWorkers.isEmpty || !res.shuttingWorkers.isEmpty) {
-
excludedWorkers.putAll(res.unknownWorkers.asScala.filterNot(excludedWorkers.containsKey)
- .map(_ -> (StatusCode.WORKER_UNKNOWN -> current)).toMap.asJava)
val workerStatus = new WorkersStatus(res.unknownWorkers,
res.shuttingWorkers)
workerStatusListeners.asScala.foreach { listener =>
try {
@@ -198,10 +207,17 @@ class WorkerStatusTracker(
}
}
}
-
- logInfo(
- s"Current excluded workers $excludedWorkers, Current shuttingDown
${shuttingWorkers.asScala.map(
- _.readableAddress()).mkString("\n")}")
+ if (statusChanged) {
+ logWarning("Worker status changed from application heartbeat response")
+ logInfo(
+ s"""
+ |Current excluded workers:
+ |${excludedWorkers.asScala.mkString("\n")}
+ |
+ |Current shutting down workers:
+
|${shuttingWorkers.asScala.map(_.readableAddress()).mkString("\n")}
+ |""".stripMargin)
+ }
}
}
}