This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch branch-0.4
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.4 by this push:
     new bd015ba70 [CELEBORN-1770] FlushNotifier should setException for all Throwables in Flusher
bd015ba70 is described below

commit bd015ba702f9fdab3edd03507f19cbe1c3f6e140
Author: zhengtao <[email protected]>
AuthorDate: Thu Dec 12 14:23:04 2024 +0800

    [CELEBORN-1770] FlushNotifier should setException for all Throwables in Flusher
    
    Non-IOExceptions (for example, Netty's IllegalReferenceCountException, which is
    thrown when a buffer's reference count is incorrect) should also be set on the
    FlushNotifier.
    Adds ExceptionUtils.wrapThrowableToIOException to convert non-IOExceptions into IOExceptions.
    
    In some test scenarios where data replication is enabled and workers are
    randomly terminated, a flush task can throw an IllegalReferenceCountException.
    Previously such an exception was only logged and was never reported to the
    FlushNotifier.
    
![image](https://github.com/user-attachments/assets/6503204a-9594-4a7e-850c-c411e0b07117)
    
    User-facing change: No.
    
    Tested with existing unit tests and a cluster test.
    
    Closes #2988 from zaynt4606/clb1770-m.
    
    Authored-by: zhengtao <[email protected]>
    Signed-off-by: Shuang <[email protected]>
    (cherry picked from commit f7b036d4c7c8b371941c4215c11a876171096d1d)
    Signed-off-by: Shuang <[email protected]>
---
 .../org/apache/celeborn/common/util/ExceptionUtils.java     |  8 ++++++++
 .../celeborn/service/deploy/worker/storage/Flusher.scala    | 13 +++++--------
 2 files changed, 13 insertions(+), 8 deletions(-)

diff --git 
a/common/src/main/java/org/apache/celeborn/common/util/ExceptionUtils.java 
b/common/src/main/java/org/apache/celeborn/common/util/ExceptionUtils.java
index ae6c0422a..2a72e78fc 100644
--- a/common/src/main/java/org/apache/celeborn/common/util/ExceptionUtils.java
+++ b/common/src/main/java/org/apache/celeborn/common/util/ExceptionUtils.java
@@ -44,6 +44,14 @@ public class ExceptionUtils {
     }
   }
 
+  public static IOException wrapThrowableToIOException(Throwable throwable) {
+    if (throwable instanceof IOException) {
+      return (IOException) throwable;
+    } else {
+      return new IOException(throwable.getMessage(), throwable);
+    }
+  }
+
   public static String stringifyException(Throwable exception) {
     if (exception == null) {
       return "(null)";
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
index ffee36f0c..5b0089753 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
@@ -31,7 +31,7 @@ import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.meta.{DiskStatus, TimeWindow}
 import org.apache.celeborn.common.metrics.source.{AbstractSource, 
ThreadPoolSource}
 import org.apache.celeborn.common.protocol.StorageInfo
-import org.apache.celeborn.common.util.{ThreadUtils, Utils}
+import org.apache.celeborn.common.util.{ExceptionUtils, ThreadUtils, Utils}
 import org.apache.celeborn.service.deploy.worker.WorkerSource
 import 
org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController
 import org.apache.celeborn.service.deploy.worker.memory.MemoryManager
@@ -77,13 +77,10 @@ abstract private[worker] class Flusher(
                   }
                 } catch {
                   case t: Throwable =>
-                    if (t.isInstanceOf[IOException]) {
-                      task.notifier.setException(t.asInstanceOf[IOException])
-                      processIOException(
-                        t.asInstanceOf[IOException],
-                        DiskStatus.READ_OR_WRITE_FAILURE)
-                    }
-                    logWarning(s"Flusher-${this}-thread-${index} encounter 
exception.", t)
+                    val e = ExceptionUtils.wrapThrowableToIOException(t)
+                    task.notifier.setException(e)
+                    processIOException(e, DiskStatus.READ_OR_WRITE_FAILURE)
+                    logWarning(s"Flusher-$this-thread-$index encounter 
exception.", t)
                 }
                 lastBeginFlushTime.set(index, -1)
               }

Reply via email to