This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.6 by this push:
new dfe9d8e74 [CELEBORN-2176] Fix uncaught exception in DataPusher
dfe9d8e74 is described below
commit dfe9d8e7434ca4cd9536b1ffbf8d0a3d0dbbab2e
Author: lijianfu03 <[email protected]>
AuthorDate: Fri Oct 17 17:53:27 2025 +0800
[CELEBORN-2176] Fix uncaught exception in DataPusher
### What changes were proposed in this pull request?
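If DataPusher's push thread throws an exception other than CelebornIOException
or IOException, do not just log it in the ThreadExceptionHandler; instead, wrap
it in an IOException and set it on the exception reference so that the failure
is reported instead of silently losing data. The commit also logs, at INFO
level, when ReducePartitionCommitHandler skips a mapper attempt because another
attempt of the same mapper has already completed.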
### Why are the changes needed?
As described in the JIRA ticket, such an exception stops the push thread and
leads to data loss.
A job that hit this issue:
<img width="600" height="400" alt="image"
src="https://github.com/user-attachments/assets/6eedbf35-4ef7-4785-ae2b-7714ed86b93b"
/>
and a normal job in which no exception was thrown while the push thread was working:
<img width="600" height="400" alt="image"
src="https://github.com/user-attachments/assets/c818da8b-ab58-4bf2-85c0-a7e4da958398"
/>
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
By a new unit test, DataPushQueueSuiteJ#testDataPusherUnCaughtException.
Closes #3507 from buska88/celeborn-2176.
Authored-by: lijianfu03 <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
(cherry picked from commit 29070da17a586354b68bbfea3654edcb518f829e)
Signed-off-by: SteNicholas <[email protected]>
---
.../apache/celeborn/client/write/DataPusher.java | 3 ++
.../commit/ReducePartitionCommitHandler.scala | 5 +++
.../celeborn/client/write/DataPushQueueSuiteJ.java | 40 ++++++++++++++++++++++
3 files changed, 48 insertions(+)
diff --git a/client/src/main/java/org/apache/celeborn/client/write/DataPusher.java b/client/src/main/java/org/apache/celeborn/client/write/DataPusher.java
index bc02570da..73af67b07 100644
--- a/client/src/main/java/org/apache/celeborn/client/write/DataPusher.java
+++ b/client/src/main/java/org/apache/celeborn/client/write/DataPusher.java
@@ -139,6 +139,9 @@ public class DataPusher {
} catch (InterruptedException e) {
logger.error("DataPusher push thread interrupted while
pushing data.");
break;
+ } catch (Throwable e) {
+ logger.error("Unexpected exception occurs.", e);
+ exceptionRef.set(new IOException(e));
}
}
}
diff --git a/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala b/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala
index 6d8ee3c2a..41f8e7ab0 100644
--- a/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala
@@ -324,6 +324,11 @@ class ReducePartitionCommitHandler(
}
}
}
+ if (!mapperAttemptFinishedSuccess) {
+ val another = shuffleMapperAttempts.get(shuffleId)(mapId)
+ logInfo(
+ s"Skip mapper $mapId (attempt $attemptId) for shuffle $shuffleId,
another attempt $another already completed")
+ }
(mapperAttemptFinishedSuccess, allMapperFinished)
}
diff --git a/client/src/test/java/org/apache/celeborn/client/write/DataPushQueueSuiteJ.java b/client/src/test/java/org/apache/celeborn/client/write/DataPushQueueSuiteJ.java
index 7081b79d0..841e04eee 100644
--- a/client/src/test/java/org/apache/celeborn/client/write/DataPushQueueSuiteJ.java
+++ b/client/src/test/java/org/apache/celeborn/client/write/DataPushQueueSuiteJ.java
@@ -138,6 +138,46 @@ public class DataPushQueueSuiteJ {
client.shutdown();
}
+  @Test
+  public void testDataPusherUnCaughtException() throws Exception {
+    int shuffleId = 0;
+    int mapId = 0;
+    int attemptId = 0;
+    int numMappers = 10;
+    CelebornConf conf = new CelebornConf();
+    final File tempFile = new File(tempDir, UUID.randomUUID().toString());
+    DummyShuffleClient client = new DummyShuffleClient(conf, tempFile);
+    LongAdder[] mapStatusLengths = new LongAdder[numPartitions];
+    for (int i = 0; i < numPartitions; i++) {
+      mapStatusLengths[i] = new LongAdder();
+    }
+    DataPusher dataPusher =
+        new DataPusher(
+            shuffleId,
+            mapId,
+            attemptId,
+            0,
+            numMappers,
+            numPartitions,
+            conf,
+            client,
+            null,
+            integer -> {},
+            mapStatusLengths) {
+          @Override
+          protected void pushData(PushTask task) throws IOException {
+            throw new OutOfMemoryError(); // a non-IOException failure in the push thread
+          }
+        };
+    dataPusher.addTask(0, new byte[10], 0);
+    try {
+      IOException e = Assert.assertThrows(IOException.class, dataPusher::waitOnTermination);
+      Assert.assertTrue(e.getCause() instanceof OutOfMemoryError);
+    } finally {
+      client.shutdown();
+    }
+  }
+
public static byte[] intToBytes(int value) {
byte[] src = new byte[4];
src[0] = (byte) (value & 0xFF);