This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch branch-0.4
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.4 by this push:
     new 21772361f [CELEBORN-1558] Fix the incorrect decrement of pendingWrites in handlePushMergeData
21772361f is described below

commit 21772361fdda495329062001286ecc4fa0af0339
Author: Shuang <[email protected]>
AuthorDate: Mon Aug 12 15:18:17 2024 +0800

    [CELEBORN-1558] Fix the incorrect decrement of pendingWrites in handlePushMergeData
    
    1. Fix the incorrect decrement of pendingWrites for FileWriter
    2. Improve some logs about hardSplit/ExceptionLogs
    
    Multiple file writers write data in handlePushMergeData. If the previous
    FileWriter has already been closed, the next decrementPendingWrites call
    uses an incorrect FileWriter, which causes a timeout during commitFiles.
    
    > java.io.IOException: Wait pending actions timeout, counter 1
    at org.apache.celeborn.service.deploy.worker.storage.PartitionDataWriter.waitOnNoPending(PartitionDataWriter.java)
    
    No
    
    Pass GA & manual test
    
    Closes #2677 from RexXiong/CELEBORN-1558.
    
    Authored-by: Shuang <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
    (cherry picked from commit 6b24f5d9bd4f5693493e25ac5abb64bef4cea8d2)
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../org/apache/celeborn/service/deploy/worker/PushDataHandler.scala     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 2fa35285d..b9d231929 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -1267,8 +1267,8 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
         var index = 0
         var fileWriter: FileWriter = null
         while (index < fileWriters.length) {
+          fileWriter = fileWriters(index)
           if (!writePromise.isCompleted) {
-            fileWriter = fileWriters(index)
             val offset = body.readerIndex() + batchOffsets(index)
             val length =
               if (index == fileWriters.length - 1) {

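The failure mode described in the commit message can be illustrated with a minimal, self-contained sketch. It is not the Celeborn code: SketchWriter, PendingWritesSketch, the completed flag (standing in for writePromise.isCompleted) and the assignBeforeCheck switch are all hypothetical names. It also assumes that decrementPendingWrites is called once per loop iteration after the conditional block, which the truncated hunk above does not show.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a file writer; only the pending-write counter is modeled.
final class SketchWriter(val closed: Boolean) {
  val pending = new AtomicInteger(1) // one pending write registered per writer
  def decrementPendingWrites(): Unit = pending.decrementAndGet()
}

object PendingWritesSketch {
  // First writer is already closed, second writer is still open.
  def fresh(): Array[SketchWriter] =
    Array(new SketchWriter(closed = true), new SketchWriter(closed = false))

  // Runs the loop and returns the remaining pending count of each writer.
  def run(writers: Array[SketchWriter], assignBeforeCheck: Boolean): List[Int] = {
    var completed = false // stands in for writePromise.isCompleted
    var index = 0
    var current: SketchWriter = null
    while (index < writers.length) {
      if (assignBeforeCheck) current = writers(index) // placement after the patch
      if (!completed) {
        if (!assignBeforeCheck) current = writers(index) // placement before the patch
        if (current.closed) completed = true // a closed writer completes the promise early
      }
      // Assumed: one decrement per iteration, on whatever `current` points at.
      current.decrementPendingWrites()
      index += 1
    }
    writers.map(_.pending.get()).toList
  }

  def main(args: Array[String]): Unit = {
    println(run(fresh(), assignBeforeCheck = false)) // first writer decremented twice, second never
    println(run(fresh(), assignBeforeCheck = true))  // each writer decremented exactly once
  }
}
```

With the assignment inside the conditional, the second writer keeps a pending count of 1 in this sketch, which is the kind of leftover counter that a wait-for-no-pending check would time out on. Moving the assignment ahead of the conditional makes every iteration decrement its own writer.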