This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 082f0dd8c [CELEBORN-1775][FOLLOWUP] Improve logging around commit files
082f0dd8c is described below

commit 082f0dd8c5dc93f57d9234a5742690c29610ac88
Author: Sanskar Modi <[email protected]>
AuthorDate: Wed May 21 16:37:38 2025 -0700

    [CELEBORN-1775][FOLLOWUP] Improve logging around commit files
    
    ### What changes were proposed in this pull request?
    Minor logging improvement around commit files: include the shuffleKey in the commitFiles-related log messages.
    
    ### Why are the changes needed?
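    Including the shuffleKey in these log messages makes it possible to tell which shuffle a commitFiles failure, retry, or timeout belongs to.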
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, the text of some log messages changes.
    
    ### How was this patch tested?
    Not applicable; the change only modifies log messages.
    
    Closes #3270 from s0nskar/CELEBORN-1775.
    
    Authored-by: Sanskar Modi <[email protected]>
    Signed-off-by: Wang, Fei <[email protected]>
---
 .../apache/celeborn/client/commit/CommitHandler.scala   | 17 +++++++++--------
 .../celeborn/service/deploy/worker/Controller.scala     | 11 ++++++-----
 2 files changed, 15 insertions(+), 13 deletions(-)

diff --git 
a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala 
b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
index c6f535d33..87fb3564c 100644
--- 
a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
+++ 
b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
@@ -330,6 +330,7 @@ abstract class CommitHandler(
       while (iter.hasNext) {
         val status = iter.next()
         val worker = status.workerInfo
+        val shuffleKey = Utils.makeShuffleKey(appUniqueId, shuffleId)
         if (status.future.isCompleted) {
           status.future.value.get match {
             case scala.util.Success(res) =>
@@ -337,10 +338,10 @@ abstract class CommitHandler(
                 case StatusCode.SUCCESS | StatusCode.PARTIAL_SUCCESS | 
StatusCode.SHUFFLE_NOT_REGISTERED | StatusCode.REQUEST_FAILED | 
StatusCode.WORKER_EXCLUDED | StatusCode.COMMIT_FILE_EXCEPTION =>
                   if (res.status == StatusCode.SUCCESS) {
                     logDebug(s"Request commitFiles return ${res.status} for " +
-                      s"${Utils.makeShuffleKey(appUniqueId, shuffleId)} from 
worker ${worker.readableAddress()}")
+                      s"$shuffleKey from worker ${worker.readableAddress()}")
                   } else {
                     logWarning(s"Request commitFiles return ${res.status} for 
" +
-                      s"${Utils.makeShuffleKey(appUniqueId, shuffleId)} from 
worker ${worker.readableAddress()}")
+                      s"$shuffleKey from worker ${worker.readableAddress()}")
                     if (res.status != StatusCode.WORKER_EXCLUDED) {
                       commitFilesFailedWorkers.put(worker, (res.status, 
System.currentTimeMillis()))
                     }
@@ -350,12 +351,12 @@ abstract class CommitHandler(
                 case StatusCode.COMMIT_FILES_MOCK_FAILURE =>
                   if (status.retriedTimes < maxRetries) {
                     logError(s"Request commitFiles return ${res.status} for " +
-                      s"${Utils.makeShuffleKey(appUniqueId, shuffleId)} for 
${status.retriedTimes}/$maxRetries, will retry")
+                      s"$shuffleKey for ${status.retriedTimes}/$maxRetries, 
will retry")
                     retryCommitFiles(status, currentTime)
                   } else {
                     logError(
                       s"Request commitFiles return 
${StatusCode.COMMIT_FILES_MOCK_FAILURE} for " +
-                        s"${Utils.makeShuffleKey(appUniqueId, shuffleId)} for 
${status.retriedTimes}/$maxRetries, will not retry")
+                        s"$shuffleKey for ${status.retriedTimes}/$maxRetries, 
will not retry")
                     val res = createFailResponse(status)
                     processResponse(res, worker)
                     iter.remove()
@@ -367,13 +368,13 @@ abstract class CommitHandler(
             case scala.util.Failure(e) =>
               if (status.retriedTimes < maxRetries) {
                 logError(
-                  s"Ask worker(${worker.readableAddress()}) CommitFiles for 
$shuffleId failed" +
+                  s"Ask worker(${worker.readableAddress()}) CommitFiles for 
$shuffleKey failed" +
                     s" (attempt ${status.retriedTimes}/$maxRetries), will 
retry.",
                   e)
                 retryCommitFiles(status, currentTime)
               } else {
                 logError(
-                  s"Ask worker(${worker.readableAddress()}) CommitFiles for 
$shuffleId failed" +
+                  s"Ask worker(${worker.readableAddress()}) CommitFiles for 
$shuffleKey failed" +
                     s" (attempt ${status.retriedTimes}/$maxRetries), will not 
retry.",
                   e)
                 val res = createFailResponse(status)
@@ -384,12 +385,12 @@ abstract class CommitHandler(
         } else if (currentTime - status.startTime > timeout) {
           if (status.retriedTimes < maxRetries) {
             logError(
-              s"Ask worker(${worker.readableAddress()}) CommitFiles for 
$shuffleId failed because of Timeout" +
+              s"Ask worker(${worker.readableAddress()}) CommitFiles for 
$shuffleKey failed because of Timeout" +
                 s" (attempt ${status.retriedTimes}/$maxRetries), will retry.")
             retryCommitFiles(status, currentTime)
           } else {
             logError(
-              s"Ask worker(${worker.readableAddress()}) CommitFiles for 
$shuffleId failed because of Timeout" +
+              s"Ask worker(${worker.readableAddress()}) CommitFiles for 
$shuffleKey failed because of Timeout" +
                 s" (attempt ${status.retriedTimes}/$maxRetries), will not 
retry.")
           }
         }
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
index 52650cb6c..a6870c836 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
@@ -620,19 +620,20 @@ private[deploy] class Controller(
         new BiFunction[Void, Throwable, Unit] {
           override def apply(v: Void, t: Throwable): Unit = {
             if (null != t) {
+              val errMsg = s"Exception while handling commitFiles for 
shuffleId: $shuffleKey"
               t match {
                 case _: CancellationException =>
-                  logWarning("While handling commitFiles, canceled.")
+                  logWarning(s"$errMsg, operation was cancelled.")
                 case ee: ExecutionException =>
-                  logError("While handling commitFiles, ExecutionException 
raised.", ee)
+                  logError(s"$errMsg, ExecutionException was raised.", ee)
                 case ie: InterruptedException =>
-                  logWarning("While handling commitFiles, interrupted.")
+                  logWarning(s"$errMsg, operation was interrupted.")
                   Thread.currentThread().interrupt()
                   throw ie
                 case _: TimeoutException =>
-                  logWarning(s"While handling commitFiles, timeout after 
$shuffleCommitTimeout ms.")
+                  logWarning(s"$errMsg, operation timed out after 
$shuffleCommitTimeout ms.")
                 case throwable: Throwable =>
-                  logError("While handling commitFiles, exception occurs.", 
throwable)
+                  logError(s"$errMsg, an unexpected exception occurred.", 
throwable)
               }
               commitInfo.synchronized {
                 commitInfo.response = CommitFilesResponse(

Reply via email to