This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch branch-0.4
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.4 by this push:
new bd015ba70 [CELEBORN-1770] FlushNotifier should setException for all
Throwables in Flusher
bd015ba70 is described below
commit bd015ba702f9fdab3edd03507f19cbe1c3f6e140
Author: zhengtao <[email protected]>
AuthorDate: Thu Dec 12 14:23:04 2024 +0800
[CELEBORN-1770] FlushNotifier should setException for all Throwables in
Flusher
Non-IOException throwables (for example, the IllegalReferenceCountException that
Netty throws when a buffer's reference count is incorrect) should also be set in
FlushNotifier.
Adds ExceptionUtils.wrapThrowableToIOException, a utility that converts
non-IOException throwables into IOExceptions.
In some test scenarios where data replication is enabled and workers are
randomly terminated, an IllegalReferenceCountException is thrown; previously it
was only logged and never reported to FlushNotifier.

User-facing change: No.
Testing: existing unit tests and a cluster test.
Closes #2988 from zaynt4606/clb1770-m.
Authored-by: zhengtao <[email protected]>
Signed-off-by: Shuang <[email protected]>
(cherry picked from commit f7b036d4c7c8b371941c4215c11a876171096d1d)
Signed-off-by: Shuang <[email protected]>
---
.../org/apache/celeborn/common/util/ExceptionUtils.java | 8 ++++++++
.../celeborn/service/deploy/worker/storage/Flusher.scala | 13 +++++--------
2 files changed, 13 insertions(+), 8 deletions(-)
diff --git a/common/src/main/java/org/apache/celeborn/common/util/ExceptionUtils.java b/common/src/main/java/org/apache/celeborn/common/util/ExceptionUtils.java
index ae6c0422a..2a72e78fc 100644
--- a/common/src/main/java/org/apache/celeborn/common/util/ExceptionUtils.java
+++ b/common/src/main/java/org/apache/celeborn/common/util/ExceptionUtils.java
@@ -44,6 +44,14 @@ public class ExceptionUtils {
}
}
+ public static IOException wrapThrowableToIOException(Throwable throwable) {
+ if (throwable instanceof IOException) {
+ return (IOException) throwable;
+ } else {
+ return new IOException(throwable.getMessage(), throwable);
+ }
+ }
+
public static String stringifyException(Throwable exception) {
if (exception == null) {
return "(null)";
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
index ffee36f0c..5b0089753 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
@@ -31,7 +31,7 @@ import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.meta.{DiskStatus, TimeWindow}
import org.apache.celeborn.common.metrics.source.{AbstractSource,
ThreadPoolSource}
import org.apache.celeborn.common.protocol.StorageInfo
-import org.apache.celeborn.common.util.{ThreadUtils, Utils}
+import org.apache.celeborn.common.util.{ExceptionUtils, ThreadUtils, Utils}
import org.apache.celeborn.service.deploy.worker.WorkerSource
import
org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController
import org.apache.celeborn.service.deploy.worker.memory.MemoryManager
@@ -77,13 +77,10 @@ abstract private[worker] class Flusher(
}
} catch {
case t: Throwable =>
- if (t.isInstanceOf[IOException]) {
- task.notifier.setException(t.asInstanceOf[IOException])
- processIOException(
- t.asInstanceOf[IOException],
- DiskStatus.READ_OR_WRITE_FAILURE)
- }
- logWarning(s"Flusher-${this}-thread-${index} encounter
exception.", t)
+ val e = ExceptionUtils.wrapThrowableToIOException(t)
+ task.notifier.setException(e)
+ processIOException(e, DiskStatus.READ_OR_WRITE_FAILURE)
+ logWarning(s"Flusher-$this-thread-$index encounter
exception.", t)
}
lastBeginFlushTime.set(index, -1)
}