This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new a1b8e6897 [CELEBORN-807] Adjust shutdown worker logs in 
LifecycleManager
a1b8e6897 is described below

commit a1b8e68970f1b9a3a82c58d06e0df875856e1fc1
Author: onebox-li <[email protected]>
AuthorDate: Wed Jul 19 11:38:54 2023 +0800

    [CELEBORN-807] Adjust shutdown worker logs in LifecycleManager
    
    ### What changes were proposed in this pull request?
    In a long-running Celeborn cluster, there are some shutdown workers. Whether 
it is a new task or an old task, even if the worker is not assigned, it will 
always log the message below, which seems a little noisy.
    ERROR CommitManager: Worker xx shutdown, commit all it's partition location.
    
    ### Why are the changes needed?
    Ditto
    
    ### Does this PR introduce _any_ user-facing change?
    The shutdown worker log messages in LifecycleManager are changed.
    
    ### How was this patch tested?
    manually test
    
    Closes #1730 from onebox-li/adjust-log.
    
    Authored-by: onebox-li <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
    (cherry picked from commit 061febe46f41450f5c4aa69a1cfcd01033a79818)
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../org/apache/celeborn/client/CommitManager.scala | 39 ++++++++++---------
 .../celeborn/client/WorkerStatusTracker.scala      | 44 +++++++++++++++-------
 2 files changed, 51 insertions(+), 32 deletions(-)

diff --git 
a/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala 
b/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala
index 3fd9f3cf6..316e9ba70 100644
--- a/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala
@@ -250,6 +250,10 @@ class CommitManager(appUniqueId: String, val conf: 
CelebornConf, lifecycleManage
     getCommitHandler(shuffleId).isStageEnd(shuffleId)
   }
 
+  def isStageEndOrInProcess(shuffleId: Int): Boolean = {
+    getCommitHandler(shuffleId).isStageEndOrInProcess(shuffleId)
+  }
+
   def setStageEnd(shuffleId: Int): Unit = {
     getCommitHandler(shuffleId).setStageEnd(shuffleId)
   }
@@ -300,27 +304,26 @@ class CommitManager(appUniqueId: String, val conf: 
CelebornConf, lifecycleManage
 
     override def notifyChangedWorkersStatus(workersStatus: WorkersStatus): 
Unit = {
       if (workersStatus.shutdownWorkers != null) {
-        workersStatus.shutdownWorkers.asScala.foreach { case shuttingWorker =>
-          logError(s"Worker ${shuttingWorker.toUniqueId()} shutdown, " +
-            "commit all it's partition location.")
-          lifecycleManager.shuffleAllocatedWorkers.asScala.foreach {
-            case (shuffleId, workerToPartitionLocationInfos) =>
+        lifecycleManager.shuffleAllocatedWorkers.asScala.foreach {
+          case (shuffleId, workerToPartitionLocationInfos) =>
+            if (!isStageEndOrInProcess(shuffleId)) {
               val shuffleCommittedInfo = committedPartitionInfo.get(shuffleId)
-              val partitionLocationInfo = 
workerToPartitionLocationInfos.get(shuttingWorker)
-              if (partitionLocationInfo != null) {
-                partitionLocationInfo.getPrimaryPartitions().asScala.foreach { 
partitionLocation =>
-                  shuffleCommittedInfo.synchronized {
-                    
shuffleCommittedInfo.unhandledPartitionLocations.add(partitionLocation)
-                  }
-                }
-
-                partitionLocationInfo.getReplicaPartitions().asScala.foreach { 
partitionLocation =>
-                  shuffleCommittedInfo.synchronized {
-                    
shuffleCommittedInfo.unhandledPartitionLocations.add(partitionLocation)
-                  }
+              val needCommitPartitionLocations = new 
util.HashSet[PartitionLocation]()
+
+              workersStatus.shutdownWorkers.asScala.foreach { worker =>
+                val partitionLocationInfos = 
workerToPartitionLocationInfos.get(worker)
+                if (partitionLocationInfos != null) {
+                  logWarning(s"Worker ${worker.toUniqueId()} shutdown, " +
+                    s"commit all it's partition locations for shuffle 
$shuffleId.")
+                  
needCommitPartitionLocations.addAll(partitionLocationInfos.getPrimaryPartitions())
+                  
needCommitPartitionLocations.addAll(partitionLocationInfos.getReplicaPartitions())
                 }
               }
-          }
+              shuffleCommittedInfo.synchronized {
+                shuffleCommittedInfo.unhandledPartitionLocations.addAll(
+                  needCommitPartitionLocations)
+              }
+            }
         }
       }
     }
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala 
b/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
index 305f1e0cb..1a83d2a8f 100644
--- a/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
@@ -152,9 +152,10 @@ class WorkerStatusTracker(
 
   def handleHeartbeatResponse(res: HeartbeatFromApplicationResponse): Unit = {
     if (res.statusCode == StatusCode.SUCCESS) {
-      logInfo(s"Received Worker status from Primary, excluded workers: 
${res.excludedWorkers} " +
+      logDebug(s"Received Worker status from Primary, excluded workers: 
${res.excludedWorkers} " +
         s"unknown workers: ${res.unknownWorkers}, shutdown workers: 
${res.shuttingWorkers}")
       val current = System.currentTimeMillis()
+      var statusChanged = false
 
       excludedWorkers.asScala.foreach {
         case (workerInfo: WorkerInfo, (statusCode, registerTime)) =>
@@ -174,20 +175,28 @@ class WorkerStatusTracker(
                 !res.shuttingWorkers.contains(workerInfo) &&
                 !res.unknownWorkers.contains(workerInfo)) {
                 excludedWorkers.remove(workerInfo)
+                statusChanged = true
               }
           }
       }
-
-      if (!res.excludedWorkers.isEmpty) {
-        
excludedWorkers.putAll(res.excludedWorkers.asScala.filterNot(excludedWorkers.containsKey)
-          .map(_ -> (StatusCode.WORKER_EXCLUDED -> current)).toMap.asJava)
+      for (worker <- res.excludedWorkers.asScala) {
+        if (!excludedWorkers.containsKey(worker)) {
+          excludedWorkers.put(worker, (StatusCode.WORKER_EXCLUDED, current))
+          statusChanged = true
+        }
       }
-
-      shuttingWorkers.retainAll(res.shuttingWorkers)
-      shuttingWorkers.addAll(res.shuttingWorkers)
+      for (worker <- res.unknownWorkers.asScala) {
+        if (!excludedWorkers.containsKey(worker)) {
+          excludedWorkers.put(worker, (StatusCode.WORKER_UNKNOWN, current))
+          statusChanged = true
+        }
+      }
+      val retainResult = shuttingWorkers.retainAll(res.shuttingWorkers)
+      val addResult = shuttingWorkers.addAll(res.shuttingWorkers)
+      statusChanged = statusChanged || retainResult || addResult
+      // Always trigger commit files for shutting down workers from 
HeartbeatFromApplicationResponse
+      // See details in CELEBORN-696
       if (!res.unknownWorkers.isEmpty || !res.shuttingWorkers.isEmpty) {
-        
excludedWorkers.putAll(res.unknownWorkers.asScala.filterNot(excludedWorkers.containsKey)
-          .map(_ -> (StatusCode.WORKER_UNKNOWN -> current)).toMap.asJava)
         val workerStatus = new WorkersStatus(res.unknownWorkers, 
res.shuttingWorkers)
         workerStatusListeners.asScala.foreach { listener =>
           try {
@@ -198,10 +207,17 @@ class WorkerStatusTracker(
           }
         }
       }
-
-      logInfo(
-        s"Current excluded workers $excludedWorkers, Current shuttingDown 
${shuttingWorkers.asScala.map(
-          _.readableAddress()).mkString("\n")}")
+      if (statusChanged) {
+        logWarning("Worker status changed from application heartbeat response")
+        logInfo(
+          s"""
+             |Current excluded workers:
+             |${excludedWorkers.asScala.mkString("\n")}
+             |
+             |Current shutting down workers:
+             
|${shuttingWorkers.asScala.map(_.readableAddress()).mkString("\n")}
+             |""".stripMargin)
+      }
     }
   }
 }

Reply via email to