This is an automated email from the ASF dual-hosted git repository. feiwang pushed a commit to branch branch-0.6 in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.6 by this push: new 7b5dc530b [CELEBORN-2092] Inc COMMIT_FILES_FAIL_COUNT when TimerWriter::close timeout 7b5dc530b is described below commit 7b5dc530b155953b4b0ad1d036ce61d1d17a3e36 Author: Wang, Fei <fwan...@ebay.com> AuthorDate: Thu Jul 31 21:12:21 2025 -0700 [CELEBORN-2092] Inc COMMIT_FILES_FAIL_COUNT when TimerWriter::close timeout ### What changes were proposed in this pull request? Increment COMMIT_FILES_FAIL_COUNT in the worker Controller whenever committing a file fails with an IOException — for example, when TierWriter::close times out while waiting for pending actions. In addition, the timeout IOException message now includes the timeout value in ms, and the LifecycleManager MapperEnd failure log now states that SHUFFLE_DATA_LOST is being replied. ### Why are the changes needed? 1. COMMIT_FILES_FAIL_COUNT stays at 0 even when SHUFFLE_DATA_LOST is caused by a commit-files failure, as the following logs show. Spark executor log: ``` 25/07/30 10:10:39 WARN CelebornShuffleReader: Handle fetch exceptions for 0-0org.apache.celeborn.common.exception.CelebornIOException: Failed to load file group of shuffle 0 partition 441! Request GetReducerFileGroup(0,false,V1) return SHUFFLE_DATA_LOST for 0. ``` Spark driver log: ``` 25/07/30 10:10:38 ERROR ReducePartitionCommitHandler: Failed to handle stageEnd for 0, lost file! 25/07/30 10:10:38 ERROR ReducePartitionCommitHandler: For shuffle application_1750652300305_10219240_1-0 partition data lost: Lost partition 307-0 in worker [Host:hdc42-mcc10-01-0910-2704-064-tess0028.stratus.rno.ebay.com:RpcPort:9200:PushPort:9202:FetchPort:9201:ReplicatePort:9203] Lost partition 1289-0 in worker [Host:hdc42-mcc10-01-0910-2704-064-tess0028.stratus.rno.ebay.com:RpcPort:9200:PushPort:9202:FetchPort:9201:ReplicatePort:9203] ``` Worker log: ``` java.io.IOException: Wait pending actions timeout. at org.apache.celeborn.service.deploy.worker.storage.TierWriterBase.waitOnNoPending(TierWriter.scala:158) ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Closes #3403 from turboFei/commit_failed. 
Authored-by: Wang, Fei <fwan...@ebay.com> Signed-off-by: Wang, Fei <fwan...@ebay.com> (cherry picked from commit 604485779ccadfa29dcd7994fe949e6bf0db5255) Signed-off-by: Wang, Fei <fwan...@ebay.com> --- client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala | 2 +- .../scala/org/apache/celeborn/service/deploy/worker/Controller.scala | 1 + .../org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala index b339c75e6..be93a00c7 100644 --- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala +++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala @@ -1136,7 +1136,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends logDebug(s"Succeed $message") context.reply(MapperEndResponse(StatusCode.SUCCESS)) case false => - logError(s"Failed $message") + logError(s"Failed $message, reply ${StatusCode.SHUFFLE_DATA_LOST}.") context.reply(MapperEndResponse(StatusCode.SHUFFLE_DATA_LOST)) } } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala index f01792729..96a18a76a 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala @@ -362,6 +362,7 @@ private[deploy] class Controller( } catch { case e: IOException => logError(s"Commit file for $shuffleKey $uniqueId failed.", e) + workerSource.incCounter(WorkerSource.COMMIT_FILES_FAIL_COUNT) failedIds.add(uniqueId) } } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala index 7c3f1b966..29865ba39 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala @@ -155,7 +155,7 @@ abstract class TierWriterBase( waitTime -= WAIT_INTERVAL_MS } if (counter.get > 0 && failWhenTimeout) { - val ioe = new IOException("Wait pending actions timeout.") + val ioe = new IOException(s"Wait pending actions timeout after $writerCloseTimeoutMs ms.") notifier.setException(ioe) throw ioe }