This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new f7b036d4c [CELEBORN-1770] FlushNotifier should setException for all 
Throwables in Flusher
f7b036d4c is described below

commit f7b036d4c7c8b371941c4215c11a876171096d1d
Author: zhengtao <[email protected]>
AuthorDate: Thu Dec 12 14:23:04 2024 +0800

    [CELEBORN-1770] FlushNotifier should setException for all Throwables in 
Flusher
    
    ### What changes were proposed in this pull request?
    Non-IOException throwables (for example, the IllegalReferenceCountException
    that Netty throws when a buffer's reference count is incorrect) should also
    be passed to FlushNotifier.setException.
    Adds ExceptionUtils.wrapThrowableToIOException, which converts non-IOException throwables into IOExceptions.
    
    ### Why are the changes needed?
    In some test scenarios where data replication is enabled and workers are
    terminated randomly, an IllegalReferenceCountException is thrown. The Flusher
    caught it but only logged a warning, without calling setException on the
    task's FlushNotifier.
    
![image](https://github.com/user-attachments/assets/6503204a-9594-4a7e-850c-c411e0b07117)
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Existing UT & cluster test.
    
    Closes #2988 from zaynt4606/clb1770-m.
    
    Authored-by: zhengtao <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../org/apache/celeborn/common/util/ExceptionUtils.java     |  8 ++++++++
 .../celeborn/service/deploy/worker/storage/Flusher.scala    | 13 ++++---------
 2 files changed, 12 insertions(+), 9 deletions(-)

diff --git 
a/common/src/main/java/org/apache/celeborn/common/util/ExceptionUtils.java 
b/common/src/main/java/org/apache/celeborn/common/util/ExceptionUtils.java
index dc1d8687f..8f0025177 100644
--- a/common/src/main/java/org/apache/celeborn/common/util/ExceptionUtils.java
+++ b/common/src/main/java/org/apache/celeborn/common/util/ExceptionUtils.java
@@ -44,6 +44,14 @@ public class ExceptionUtils {
     }
   }
 
+  public static IOException wrapThrowableToIOException(Throwable throwable) {
+    if (throwable instanceof IOException) {
+      return (IOException) throwable;
+    } else {
+      return new IOException(throwable.getMessage(), throwable);
+    }
+  }
+
   public static String stringifyException(Throwable exception) {
     if (exception == null) {
       return "(null)";
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
index 977b3b72f..205ce8bcc 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
@@ -29,7 +29,7 @@ import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.meta.{DiskStatus, TimeWindow}
 import org.apache.celeborn.common.metrics.source.{AbstractSource, 
ThreadPoolSource}
 import org.apache.celeborn.common.protocol.StorageInfo
-import org.apache.celeborn.common.util.{ThreadUtils, Utils}
+import org.apache.celeborn.common.util.{ExceptionUtils, ThreadUtils, Utils}
 import org.apache.celeborn.service.deploy.worker.WorkerSource
 import 
org.apache.celeborn.service.deploy.worker.WorkerSource.FLUSH_WORKING_QUEUE_SIZE
 import 
org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController
@@ -77,14 +77,9 @@ abstract private[worker] class Flusher(
                   }
                 } catch {
                   case t: Throwable =>
-                    t match {
-                      case exception: IOException =>
-                        task.notifier.setException(exception)
-                        processIOException(
-                          exception,
-                          DiskStatus.READ_OR_WRITE_FAILURE)
-                      case _ =>
-                    }
+                    val e = ExceptionUtils.wrapThrowableToIOException(t)
+                    task.notifier.setException(e)
+                    processIOException(e, DiskStatus.READ_OR_WRITE_FAILURE)
                     logWarning(s"Flusher-$this-thread-$index encounter 
exception.", t)
                 }
                 lastBeginFlushTime.set(index, -1)

Reply via email to