This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.5 by this push:
     new 7ae1fcc1c [CELEBORN-1558] Fix the incorrect decrement of pendingWrites 
in handlePushMergeData
7ae1fcc1c is described below

commit 7ae1fcc1c49e89bd17ceea908a9df6a8faac77ef
Author: Shuang <[email protected]>
AuthorDate: Mon Aug 12 15:18:17 2024 +0800

    [CELEBORN-1558] Fix the incorrect decrement of pendingWrites in 
handlePushMergeData
    
    ### What changes were proposed in this pull request?
    
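    1. Fix the incorrect decrement of pendingWrites on the wrong FileWriter
    2. Improve logging: log hard splits at INFO level (previously TRACE),
       include the file length in "FileWriter is already closed" warnings,
       and include the pending counter in the "Wait pending actions timeout"
       exception message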
    
    ### Why are the changes needed?
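    handlePushMergedData writes data through multiple file writers. If the
    write promise has already completed (for example, because a previous
    FileWriter was already closed), the loop stopped advancing its
    `fileWriter` variable. The subsequent decrementPendingWrites call was
    then applied to the wrong FileWriter. As a result, the correct writer's
    pendingWrites counter never dropped back to zero, which caused a timeout
    during commitFiles.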
    
    > java.io.IOException: Wait pending actions timeout, counter 1
    at 
org.apache.celeborn.service.deploy.worker.storage.PartitionDataWriter.waitOnNoPending(PartitionDataWriter.java)
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Passed GA (GitHub Actions) CI and manual testing.
    
    Closes #2677 from RexXiong/CELEBORN-1558.
    
    Authored-by: Shuang <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
    (cherry picked from commit 6b24f5d9bd4f5693493e25ac5abb64bef4cea8d2)
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../deploy/worker/storage/PartitionDataWriter.java       |  2 +-
 .../celeborn/service/deploy/worker/PushDataHandler.scala | 16 +++++++++++-----
 2 files changed, 12 insertions(+), 6 deletions(-)

diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
index f40f8ee66..b8cf6300f 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
@@ -553,7 +553,7 @@ public abstract class PartitionDataWriter implements 
DeviceObserver {
       waitTime -= WAIT_INTERVAL_MS;
     }
     if (counter.get() > 0) {
-      IOException ioe = new IOException("Wait pending actions timeout.");
+      IOException ioe = new IOException("Wait pending actions timeout, 
Counter: " + counter.get());
       notifier.setException(ioe);
       throw ioe;
     }
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 0fe387f58..895e029f4 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -265,8 +265,10 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
     fileWriter.incrementPendingWrites()
 
     if (fileWriter.isClosed) {
+      val diskFileInfo = fileWriter.getDiskFileInfo
       logWarning(
-        s"[handlePushData] FileWriter is already closed! File path 
${fileWriter.getDiskFileInfo.getFilePath}")
+        s"[handlePushData] FileWriter is already closed! File path 
${diskFileInfo.getFilePath} " +
+          s"length ${diskFileInfo.getFileLength}")
       callbackWithTimer.onFailure(new CelebornIOException("File already 
closed!"))
       fileWriter.decrementPendingWrites()
       return
@@ -536,8 +538,10 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
 
     val closedFileWriter = fileWriters.find(_.isClosed)
     if (closedFileWriter.isDefined) {
+      val diskFileInfo = closedFileWriter.get.getDiskFileInfo
       logWarning(
-        s"[handlePushMergedData] FileWriter is already closed! File path 
${closedFileWriter.get.getDiskFileInfo.getFilePath}")
+        s"[handlePushMergedData] FileWriter is already closed! File path 
${diskFileInfo.getFilePath} " +
+          s"length ${diskFileInfo.getFileLength}")
       callbackWithTimer.onFailure(new CelebornIOException("File already 
closed!"))
       fileWriters.foreach(_.decrementPendingWrites())
       return
@@ -812,8 +816,10 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
     fileWriter.incrementPendingWrites()
 
     if (fileWriter.isClosed) {
+      val diskFileInfo = fileWriter.getDiskFileInfo
       logWarning(
-        s"[handleMapPartitionPushData] FileWriter is already closed! File path 
${fileWriter.getDiskFileInfo.getFilePath}")
+        s"[handleMapPartitionPushData] FileWriter is already closed! File path 
${diskFileInfo.getFilePath} " +
+          s"length ${diskFileInfo.getFileLength}")
       callback.onFailure(new CelebornIOException("File already closed!"))
       fileWriter.decrementPendingWrites()
       return
@@ -1223,7 +1229,7 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
         } else {
           workerSource.incCounter(WorkerSource.WRITE_DATA_HARD_SPLIT_COUNT)
           
callback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
-          logTrace(
+          logInfo(
             s"""
                |CheckDiskFullAndSplit hardSplit
                |diskFull:$diskFull,
@@ -1287,8 +1293,8 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
         var index = 0
         var fileWriter: PartitionDataWriter = null
         while (index < fileWriters.length) {
+          fileWriter = fileWriters(index)
           if (!writePromise.isCompleted) {
-            fileWriter = fileWriters(index)
             val offset = body.readerIndex() + batchOffsets(index)
             val length =
               if (index == fileWriters.length - 1) {

Reply via email to