This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.5 by this push:
     new 84c4a2627 [CELEBORN-1818] Fix incorrect timeout exception when waiting 
on no pending writes
84c4a2627 is described below

commit 84c4a2627e91262215df16d0e166d93a6e89e107
Author: xinyuwang1 <[email protected]>
AuthorDate: Tue Jan 7 13:50:39 2025 +0800

    [CELEBORN-1818] Fix incorrect timeout exception when waiting on no pending 
writes
    
    ### What changes were proposed in this pull request?
    Do not throw the "Wait pending actions timeout" exception when waiting for 
pending writes times out.
    
    ### Why are the changes needed?
    When pendingWrites is reduced to zero, the method waitOnNoPending exits its 
while loop. If a new PushData/PushMergedData request arrives in the meantime, 
pendingWrites is incremented and becomes larger than zero again. As a result, 
the "Wait pending actions timeout" exception is thrown in waitOnNoPending even 
though the wait did not actually time out.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    manual test
    
    Closes #3049 from littlexyw/fix_wait_pending_writes_timeout.
    
    Authored-by: xinyuwang1 <[email protected]>
    Signed-off-by: mingji <[email protected]>
    (cherry picked from commit 61c90e3a078c59eab2573a2d89254c4f27894bdb)
---
 .../service/deploy/worker/storage/PartitionDataWriter.java    | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
index 9c7ee3d16..15667a608 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
@@ -426,7 +426,7 @@ public abstract class PartitionDataWriter implements 
DeviceObserver {
     }
 
     try {
-      waitOnNoPending(numPendingWrites);
+      waitOnNoPending(numPendingWrites, false);
       closed = true;
 
       synchronized (flushLock) {
@@ -439,7 +439,7 @@ public abstract class PartitionDataWriter implements 
DeviceObserver {
       }
 
       tryClose.run();
-      waitOnNoPending(notifier.numPendingFlushes);
+      waitOnNoPending(notifier.numPendingFlushes, true);
     } finally {
       returnBuffer(false);
       try {
@@ -496,7 +496,7 @@ public abstract class PartitionDataWriter implements 
DeviceObserver {
       if (memoryFileInfo != null) {
         evictInternal();
         if (isClosed()) {
-          waitOnNoPending(notifier.numPendingFlushes);
+          waitOnNoPending(notifier.numPendingFlushes, true);
           storageManager.notifyFileInfoCommitted(shuffleKey, 
getFile().getName(), diskFileInfo);
         }
       }
@@ -551,7 +551,8 @@ public abstract class PartitionDataWriter implements 
DeviceObserver {
     }
   }
 
-  protected void waitOnNoPending(AtomicInteger counter) throws IOException {
+  protected void waitOnNoPending(AtomicInteger counter, boolean 
failWhenTimeout)
+      throws IOException {
     long waitTime = writerCloseTimeoutMs;
     while (counter.get() > 0 && waitTime > 0) {
       try {
@@ -564,7 +565,7 @@ public abstract class PartitionDataWriter implements 
DeviceObserver {
       }
       waitTime -= WAIT_INTERVAL_MS;
     }
-    if (counter.get() > 0) {
+    if (counter.get() > 0 && failWhenTimeout) {
       IOException ioe = new IOException("Wait pending actions timeout, 
Counter: " + counter.get());
       notifier.setException(ioe);
       throw ioe;

Reply via email to