This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 61c90e3a0 [CELEBORN-1818] Fix incorrect timeout exception when waiting
on no pending writes
61c90e3a0 is described below
commit 61c90e3a078c59eab2573a2d89254c4f27894bdb
Author: xinyuwang1 <[email protected]>
AuthorDate: Tue Jan 7 13:50:39 2025 +0800
[CELEBORN-1818] Fix incorrect timeout exception when waiting on no pending
writes
### What changes were proposed in this pull request?
Do not throw the "Wait pending actions timeout" exception when waiting for
pending writes times out.
### Why are the changes needed?
When pendingWrites is reduced to zero, the method waitOnNoPending exits its
while loop. If a new PushData/PushMergedData request arrives in the meantime,
pendingWrites is incremented and becomes larger than zero again. As a result,
the check after the loop sees a non-zero counter and waitOnNoPending throws the
"Wait pending actions timeout" exception even though the wait did not actually
time out.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
manual test
Closes #3049 from littlexyw/fix_wait_pending_writes_timeout.
Authored-by: xinyuwang1 <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../service/deploy/worker/storage/PartitionDataWriter.java | 11 ++++++-----
1 file changed, 6 insertions(+), 5 deletions(-)
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
index 8eae732a1..31a35f079 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
@@ -507,7 +507,7 @@ public abstract class PartitionDataWriter implements
DeviceObserver {
}
try {
- waitOnNoPending(numPendingWrites);
+ waitOnNoPending(numPendingWrites, false);
closed = true;
synchronized (flushLock) {
@@ -520,7 +520,7 @@ public abstract class PartitionDataWriter implements
DeviceObserver {
}
tryClose.run();
- waitOnNoPending(notifier.numPendingFlushes);
+ waitOnNoPending(notifier.numPendingFlushes, true);
} finally {
returnBuffer(false);
try {
@@ -582,7 +582,7 @@ public abstract class PartitionDataWriter implements
DeviceObserver {
if (memoryFileInfo != null) {
evictInternal();
if (isClosed()) {
- waitOnNoPending(notifier.numPendingFlushes);
+ waitOnNoPending(notifier.numPendingFlushes, true);
storageManager.notifyFileInfoCommitted(shuffleKey,
getFile().getName(), diskFileInfo);
}
}
@@ -636,7 +636,8 @@ public abstract class PartitionDataWriter implements
DeviceObserver {
}
}
- protected void waitOnNoPending(AtomicInteger counter) throws IOException {
+ protected void waitOnNoPending(AtomicInteger counter, boolean
failWhenTimeout)
+ throws IOException {
long waitTime = writerCloseTimeoutMs;
while (counter.get() > 0 && waitTime > 0) {
try {
@@ -649,7 +650,7 @@ public abstract class PartitionDataWriter implements
DeviceObserver {
}
waitTime -= WAIT_INTERVAL_MS;
}
- if (counter.get() > 0) {
+ if (counter.get() > 0 && failWhenTimeout) {
IOException ioe = new IOException("Wait pending actions timeout,
Counter: " + counter.get());
notifier.setException(ioe);
throw ioe;