This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new f7b036d4c [CELEBORN-1770] FlushNotifier should setException for all Throwables in Flusher
f7b036d4c is described below
commit f7b036d4c7c8b371941c4215c11a876171096d1d
Author: zhengtao <[email protected]>
AuthorDate: Thu Dec 12 14:23:04 2024 +0800
[CELEBORN-1770] FlushNotifier should setException for all Throwables in Flusher
### What changes were proposed in this pull request?
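Throwables that are not IOExceptions (for example, the IllegalReferenceCountException
thrown when a Netty buffer's reference count is incorrect) should also be set on
the FlushNotifier.
Adds a utility method, `ExceptionUtils.wrapThrowableToIOException`, that returns an
IOException unchanged and wraps any other Throwable in a new IOException.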
### Why are the changes needed?
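In some test scenarios where data replication is enabled and workers are
terminated at random, the Flusher throws an IllegalReferenceCountException.
Previously only IOExceptions were set on the FlushNotifier, so this exception was
logged but never reported to the notifier.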

### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing unit tests and cluster tests.
Closes #2988 from zaynt4606/clb1770-m.
Authored-by: zhengtao <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.../org/apache/celeborn/common/util/ExceptionUtils.java | 8 ++++++++
.../celeborn/service/deploy/worker/storage/Flusher.scala | 13 ++++---------
2 files changed, 12 insertions(+), 9 deletions(-)
diff --git a/common/src/main/java/org/apache/celeborn/common/util/ExceptionUtils.java b/common/src/main/java/org/apache/celeborn/common/util/ExceptionUtils.java
index dc1d8687f..8f0025177 100644
--- a/common/src/main/java/org/apache/celeborn/common/util/ExceptionUtils.java
+++ b/common/src/main/java/org/apache/celeborn/common/util/ExceptionUtils.java
@@ -44,6 +44,14 @@ public class ExceptionUtils {
}
}
+ public static IOException wrapThrowableToIOException(Throwable throwable) {
+ if (throwable instanceof IOException) {
+ return (IOException) throwable;
+ } else {
+ return new IOException(throwable.getMessage(), throwable);
+ }
+ }
+
public static String stringifyException(Throwable exception) {
if (exception == null) {
return "(null)";
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
index 977b3b72f..205ce8bcc 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
@@ -29,7 +29,7 @@ import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.meta.{DiskStatus, TimeWindow}
import org.apache.celeborn.common.metrics.source.{AbstractSource,
ThreadPoolSource}
import org.apache.celeborn.common.protocol.StorageInfo
-import org.apache.celeborn.common.util.{ThreadUtils, Utils}
+import org.apache.celeborn.common.util.{ExceptionUtils, ThreadUtils, Utils}
import org.apache.celeborn.service.deploy.worker.WorkerSource
import
org.apache.celeborn.service.deploy.worker.WorkerSource.FLUSH_WORKING_QUEUE_SIZE
import
org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController
@@ -77,14 +77,9 @@ abstract private[worker] class Flusher(
}
} catch {
case t: Throwable =>
- t match {
- case exception: IOException =>
- task.notifier.setException(exception)
- processIOException(
- exception,
- DiskStatus.READ_OR_WRITE_FAILURE)
- case _ =>
- }
+ val e = ExceptionUtils.wrapThrowableToIOException(t)
+ task.notifier.setException(e)
+ processIOException(e, DiskStatus.READ_OR_WRITE_FAILURE)
logWarning(s"Flusher-$this-thread-$index encounter
exception.", t)
}
lastBeginFlushTime.set(index, -1)