This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 33ba0e02f [CELEBORN-1775] Improve some logs
33ba0e02f is described below

commit 33ba0e02f56bfa032c02d1e41c52573c79661b1b
Author: onebox-li <[email protected]>
AuthorDate: Mon Dec 16 16:24:18 2024 +0800

    [CELEBORN-1775] Improve some logs
    
    ### What changes were proposed in this pull request?
    Improve some logs, mainly covering the commit-result check and the wait for 
partition locations to become empty during worker graceful shutdown.
    
    ### Why are the changes needed?
    Ditto.
    
    ### Does this PR introduce _any_ user-facing change?
    Some logs changed.
    
    ### How was this patch tested?
    Manual test.
    
    Closes #2995 from onebox-li/improve-logs.
    
    Authored-by: onebox-li <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../celeborn/client/commit/CommitHandler.scala     | 26 +++++++++++++---------
 .../network/client/TransportClientFactory.java     |  2 +-
 .../common/meta/WorkerPartitionLocationInfo.scala  | 16 +++++++++++--
 .../celeborn/common/rpc/netty/Dispatcher.scala     |  2 +-
 .../deploy/worker/memory/MemoryManager.java        |  4 ++--
 .../service/deploy/worker/Controller.scala         |  2 +-
 .../deploy/worker/storage/StorageManager.scala     |  8 +++++--
 7 files changed, 40 insertions(+), 20 deletions(-)

diff --git 
a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala 
b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
index 9cade38cc..637a5c640 100644
--- 
a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
+++ 
b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
@@ -316,10 +316,15 @@ abstract class CommitHandler(
             case scala.util.Success(res) =>
               res.status match {
                 case StatusCode.SUCCESS | StatusCode.PARTIAL_SUCCESS | 
StatusCode.SHUFFLE_NOT_REGISTERED | StatusCode.REQUEST_FAILED | 
StatusCode.WORKER_EXCLUDED | StatusCode.COMMIT_FILE_EXCEPTION =>
-                  logInfo(s"Request commitFiles return ${res.status} for " +
-                    s"${Utils.makeShuffleKey(appUniqueId, shuffleId)}")
-                  if (res.status != StatusCode.SUCCESS && res.status != 
StatusCode.WORKER_EXCLUDED) {
-                    commitFilesFailedWorkers.put(worker, (res.status, 
System.currentTimeMillis()))
+                  if (res.status == StatusCode.SUCCESS) {
+                    logDebug(s"Request commitFiles return ${res.status} for " +
+                      s"${Utils.makeShuffleKey(appUniqueId, shuffleId)} from 
worker ${worker.readableAddress()}")
+                  } else {
+                    logWarning(s"Request commitFiles return ${res.status} for 
" +
+                      s"${Utils.makeShuffleKey(appUniqueId, shuffleId)} from 
worker ${worker.readableAddress()}")
+                    if (res.status != StatusCode.WORKER_EXCLUDED) {
+                      commitFilesFailedWorkers.put(worker, (res.status, 
System.currentTimeMillis()))
+                    }
                   }
                   processResponse(res, worker)
                   iter.remove()
@@ -333,7 +338,7 @@ abstract class CommitHandler(
                       s"Request commitFiles return 
${StatusCode.COMMIT_FILES_MOCK_FAILURE} for " +
                         s"${Utils.makeShuffleKey(appUniqueId, shuffleId)} for 
${status.retriedTimes}/$maxRetries, will not retry")
                     val res = createFailResponse(status)
-                    processResponse(res, status.workerInfo)
+                    processResponse(res, worker)
                     iter.remove()
                   }
                 case _ =>
@@ -341,16 +346,15 @@ abstract class CommitHandler(
               }
 
             case scala.util.Failure(e) =>
-              val worker = status.workerInfo
               if (status.retriedTimes < maxRetries) {
                 logError(
-                  s"Ask worker($worker) CommitFiles for $shuffleId failed" +
+                  s"Ask worker(${worker.readableAddress()}) CommitFiles for 
$shuffleId failed" +
                     s" (attempt ${status.retriedTimes}/$maxRetries), will 
retry.",
                   e)
                 retryCommitFiles(status, currentTime)
               } else {
                 logError(
-                  s"Ask worker($worker) CommitFiles for $shuffleId failed" +
+                  s"Ask worker(${worker.readableAddress()}) CommitFiles for 
$shuffleId failed" +
                     s" (attempt ${status.retriedTimes}/$maxRetries), will not 
retry.",
                   e)
                 val res = createFailResponse(status)
@@ -361,12 +365,12 @@ abstract class CommitHandler(
         } else if (currentTime - status.startTime > timeout) {
           if (status.retriedTimes < maxRetries) {
             logError(
-              s"Ask worker($worker) CommitFiles for $shuffleId failed because 
of Timeout" +
+              s"Ask worker(${worker.readableAddress()}) CommitFiles for 
$shuffleId failed because of Timeout" +
                 s" (attempt ${status.retriedTimes}/$maxRetries), will retry.")
             retryCommitFiles(status, currentTime)
           } else {
             logError(
-              s"Ask worker($worker) CommitFiles for $shuffleId failed because 
of Timeout" +
+              s"Ask worker(${worker.readableAddress()}) CommitFiles for 
$shuffleId failed because of Timeout" +
                 s" (attempt ${status.retriedTimes}/$maxRetries), will not 
retry.")
           }
         }
@@ -382,7 +386,7 @@ abstract class CommitHandler(
     while (iter.hasNext) {
       val status = iter.next()
       logError(
-        s"Ask worker(${status.workerInfo}) CommitFiles for $shuffleId timed 
out")
+        s"Ask worker(${status.workerInfo.readableAddress()}) CommitFiles for 
$shuffleId timed out")
       val res = createFailResponse(status)
       processResponse(res, status.workerInfo)
       iter.remove()
diff --git 
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
 
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
index 7b87be40b..647ef280b 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
@@ -112,7 +112,7 @@ public class TransportClientFactory implements Closeable {
 
     IOMode ioMode = IOMode.valueOf(conf.ioMode());
     this.socketChannelClass = NettyUtils.getClientChannelClass(ioMode);
-    logger.info("mode " + ioMode + " threads " + conf.clientThreads());
+    logger.info("Module {} mode {} threads {}", conf.getModuleName(), ioMode, 
conf.clientThreads());
     this.workerGroup =
         NettyUtils.createEventLoop(ioMode, conf.clientThreads(), 
conf.getModuleName() + "-client");
     this.pooledAllocator =
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerPartitionLocationInfo.scala
 
b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerPartitionLocationInfo.scala
index 96ff20971..373f36565 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerPartitionLocationInfo.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerPartitionLocationInfo.scala
@@ -181,10 +181,22 @@ class WorkerPartitionLocationInfo extends Logging {
   }
 
   def toStringSimplified: String = {
+    val primaryLocationStr = new StringBuilder
+    for ((shuffleKey, locations) <- primaryPartitionLocations.asScala) {
+      if (!locations.isEmpty) {
+        primaryLocationStr.append(s"($shuffleKey: ${locations.keySet()}) ")
+      }
+    }
+    val replicaLocationStr = new StringBuilder
+    for ((shuffleKey, locations) <- replicaPartitionLocations.asScala) {
+      if (!locations.isEmpty) {
+        replicaLocationStr.append(s"($shuffleKey: ${locations.keySet()}) ")
+      }
+    }
     s"""
        | Partition Location Info:
-       | primary: 
${primaryPartitionLocations.values().asScala.map(_.keySet().asScala)}
-       | replica: 
${replicaPartitionLocations.values().asScala.map(_.keySet().asScala)}
+       | primary: $primaryLocationStr
+       | replica: $replicaLocationStr
        |""".stripMargin
   }
 
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Dispatcher.scala 
b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Dispatcher.scala
index 5b7320726..c93203c36 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Dispatcher.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Dispatcher.scala
@@ -217,7 +217,7 @@ private[celeborn] class Dispatcher(nettyEnv: NettyRpcEnv, 
rpcSource: RpcSource)
     val numThreads = 
nettyEnv.celebornConf.rpcDispatcherNumThreads(availableCores, role)
 
     val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, 
"celeborn-dispatcher")
-    logInfo(s"Dispatcher numThreads: $numThreads")
+    logInfo(s"Celeborn dispatcher numThreads: $numThreads")
     for (i <- 0 until numThreads) {
       pool.execute(new MessageLoop)
     }
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
index 0b7adcb91..db368aa21 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
@@ -272,8 +272,8 @@ public class MemoryManager {
             + "max direct memory: {}, pause push memory: {}, "
             + "pause replication memory: {},  "
             + "read buffer memory limit: {} target: {}, "
-            + "memory shuffle storage limit: {},"
-            + "resume memory ratio: {},",
+            + "memory shuffle storage limit: {}, "
+            + "resume memory ratio: {}",
         Utils.bytesToString(maxDirectMemory),
         Utils.bytesToString(pausePushDataThreshold),
         Utils.bytesToString(pauseReplicateThreshold),
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
index 0b0c53c1f..0481b274e 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
@@ -542,7 +542,7 @@ private[deploy] class Controller(
               s"${emptyFilePrimaryIds.size()} empty primary partitions 
${emptyFilePrimaryIds.asScala.mkString(",")}, " +
               s"${failedPrimaryIds.size()} failed primary partitions, " +
               s"${committedReplicaIds.size()} committed replica partitions, " +
-              s"${emptyFileReplicaIds.size()} empty replica partitions 
${emptyFileReplicaIds.asScala.mkString(",")} , " +
+              s"${emptyFileReplicaIds.size()} empty replica partitions 
${emptyFileReplicaIds.asScala.mkString(",")}, " +
               s"${failedReplicaIds.size()} failed replica partitions.")
           CommitFilesResponse(
             StatusCode.SUCCESS,
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 00c332ce0..c2f1c66a1 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -1049,8 +1049,12 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
         if (diskInfo != null && diskInfo.status.equals(DiskStatus.HEALTHY)) {
           diskInfo.dirs
         } else {
-          logDebug(s"Disk unavailable for $suggestedMountPoint, return all 
healthy" +
-            s" working dirs. diskInfo $diskInfo")
+          if (suggestedMountPoint.isEmpty) {
+            logDebug(s"Location suggestedMountPoint is not set, return all 
healthy working dirs.")
+          } else {
+            logInfo(s"Disk(${diskInfo.mountPoint}) unavailable for 
$suggestedMountPoint, return all healthy" +
+              s" working dirs.")
+          }
           healthyWorkingDirs()
         }
       if (dirs.isEmpty && hdfsFlusher.isEmpty && s3Flusher.isEmpty) {

Reply via email to