This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.6 by this push:
     new 7b5dc530b [CELEBORN-2092] Increment COMMIT_FILES_FAIL_COUNT when TierWriter::close times out
7b5dc530b is described below

commit 7b5dc530b155953b4b0ad1d036ce61d1d17a3e36
Author: Wang, Fei <fwan...@ebay.com>
AuthorDate: Thu Jul 31 21:12:21 2025 -0700

    [CELEBORN-2092] Increment COMMIT_FILES_FAIL_COUNT when TierWriter::close times out
    
    ### What changes were proposed in this pull request?
    Increment COMMIT_FILES_FAIL_COUNT when TierWriter::close times out, and include the timeout value in the resulting IOException message.
    
    ### Why are the changes needed?
    
    1. COMMIT_FILES_FAIL_COUNT stays at 0 even when SHUFFLE_DATA_LOST is caused by a commit-files failure on the worker (here, a TierWriter close that timed out waiting for pending actions).
    
    Spark executor log:
    ```
    
    25/07/30 10:10:39 WARN CelebornShuffleReader: Handle fetch exceptions for 
0-0org.apache.celeborn.common.exception.CelebornIOException: Failed to load 
file group of shuffle 0 partition 441! Request GetReducerFileGroup(0,false,V1) 
return SHUFFLE_DATA_LOST for 0.
    ```
    
    Spark driver log:
    ```
    25/07/30 10:10:38 ERROR ReducePartitionCommitHandler: Failed to handle 
stageEnd for 0, lost file!
    
    25/07/30 10:10:38 ERROR ReducePartitionCommitHandler:
    For shuffle application_1750652300305_10219240_1-0 partition data lost:
    Lost partition 307-0 in worker 
[Host:hdc42-mcc10-01-0910-2704-064-tess0028.stratus.rno.ebay.com:RpcPort:9200:PushPort:9202:FetchPort:9201:ReplicatePort:9203]
    Lost partition 1289-0 in worker 
[Host:hdc42-mcc10-01-0910-2704-064-tess0028.stratus.rno.ebay.com:RpcPort:9200:PushPort:9202:FetchPort:9201:ReplicatePort:9203]
    ```
    
    Worker log:
    ```
    java.io.IOException: Wait pending actions timeout.
            at 
org.apache.celeborn.service.deploy.worker.storage.TierWriterBase.waitOnNoPending(TierWriter.scala:158)
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Closes #3403 from turboFei/commit_failed.
    
    Authored-by: Wang, Fei <fwan...@ebay.com>
    Signed-off-by: Wang, Fei <fwan...@ebay.com>
    (cherry picked from commit 604485779ccadfa29dcd7994fe949e6bf0db5255)
    Signed-off-by: Wang, Fei <fwan...@ebay.com>
---
 client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala | 2 +-
 .../scala/org/apache/celeborn/service/deploy/worker/Controller.scala    | 1 +
 .../org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala  | 2 +-
 3 files changed, 3 insertions(+), 2 deletions(-)

diff --git 
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala 
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index b339c75e6..be93a00c7 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -1136,7 +1136,7 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
           logDebug(s"Succeed $message")
           context.reply(MapperEndResponse(StatusCode.SUCCESS))
         case false =>
-          logError(s"Failed $message")
+          logError(s"Failed $message, reply ${StatusCode.SHUFFLE_DATA_LOST}.")
           context.reply(MapperEndResponse(StatusCode.SHUFFLE_DATA_LOST))
       }
     }
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
index f01792729..96a18a76a 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
@@ -362,6 +362,7 @@ private[deploy] class Controller(
               } catch {
                 case e: IOException =>
                   logError(s"Commit file for $shuffleKey $uniqueId failed.", e)
+                  workerSource.incCounter(WorkerSource.COMMIT_FILES_FAIL_COUNT)
                   failedIds.add(uniqueId)
               }
             }
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
index 7c3f1b966..29865ba39 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
@@ -155,7 +155,7 @@ abstract class TierWriterBase(
       waitTime -= WAIT_INTERVAL_MS
     }
     if (counter.get > 0 && failWhenTimeout) {
-      val ioe = new IOException("Wait pending actions timeout.")
+      val ioe = new IOException(s"Wait pending actions timeout after 
$writerCloseTimeoutMs ms.")
       notifier.setException(ioe)
       throw ioe
     }

Reply via email to