This is an automated email from the ASF dual-hosted git repository.

tangyun pushed a commit to branch release-2.2
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-2.2 by this push:
     new 4fd5bea812e [FLINK-36746][core] Fix the deadlock bug in SerializedThrowable
4fd5bea812e is described below

commit 4fd5bea812ee5726c0c4ee5e27fda320273ebb6e
Author: Yuepeng Pan <[email protected]>
AuthorDate: Mon Nov 3 23:54:45 2025 +0800

    [FLINK-36746][core] Fix the deadlock bug in SerializedThrowable
    
    Co-authored-by: raoraoxiong <[email protected]>
    Co-authored-by: Yun Tang <[email protected]>
    
    This closes #27186
---
 .../org/apache/flink/util/SerializedThrowable.java | 38 +++++++-----
 .../runtime/util/SerializedThrowableTest.java      | 68 ++++++++++++++++++++++
 2 files changed, 93 insertions(+), 13 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java b/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java
index ae1ea5e14e1..7937e2de8a6 100644
--- a/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java
+++ b/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java
@@ -68,10 +68,14 @@ public class SerializedThrowable extends Exception implements Serializable {
         if (!(exception instanceof SerializedThrowable)) {
             // serialize and memoize the original message
             byte[] serialized;
-            try {
-                serialized = InstantiationUtil.serializeObject(exception);
-            } catch (Throwable t) {
-                serialized = null;
+            // introduce the synchronization here to avoid deadlock of multi thread serializing
+            // exceptions
+            synchronized (SerializedThrowable.class) {
+                try {
+                    serialized = InstantiationUtil.serializeObject(exception);
+                } catch (Throwable t) {
+                    serialized = null;
+                }
             }
             this.serializedException = serialized;
             this.cachedException = new WeakReference<>(exception);
@@ -94,7 +98,7 @@ public class SerializedThrowable extends Exception implements Serializable {
                 }
             }
             // mimic suppressed exceptions
-            addAllSuppressed(exception.getSuppressed());
+            this.addAllSuppressed(exception.getSuppressed(), alreadySeen);
         } else {
             // copy from that serialized throwable
             SerializedThrowable other = (SerializedThrowable) exception;
@@ -104,7 +108,7 @@ public class SerializedThrowable extends Exception implements Serializable {
             this.cachedException = other.cachedException;
             this.setStackTrace(other.getStackTrace());
             this.initCause(other.getCause());
-            this.addAllSuppressed(other.getSuppressed());
+            this.addAllSuppressed(other.getSuppressed(), alreadySeen);
         }
     }
 
@@ -141,15 +145,23 @@ public class SerializedThrowable extends Exception implements Serializable {
         return fullStringifiedStackTrace;
     }
 
-    private void addAllSuppressed(Throwable[] suppressed) {
+    /**
+     * Add all suppressed exceptions to this exception.
+     *
+     * @param suppressed The suppressed exceptions to add.
+     * @param alreadySeen The set of exceptions that have already been seen.
+     */
+    private void addAllSuppressed(Throwable[] suppressed, Set<Throwable> alreadySeen) {
         for (Throwable s : suppressed) {
-            SerializedThrowable serializedThrowable;
-            if (s instanceof SerializedThrowable) {
-                serializedThrowable = (SerializedThrowable) s;
-            } else {
-                serializedThrowable = new SerializedThrowable(s);
+            if (alreadySeen.add(s)) {
+                SerializedThrowable serializedThrowable;
+                if (s instanceof SerializedThrowable) {
+                    serializedThrowable = (SerializedThrowable) s;
+                } else {
+                    serializedThrowable = new SerializedThrowable(s);
+                }
+                this.addSuppressed(serializedThrowable);
             }
-            this.addSuppressed(serializedThrowable);
         }
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java
index 2f81a91fd20..64a9340937e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java
@@ -19,12 +19,22 @@
 package org.apache.flink.runtime.util;
 
 import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
 import org.apache.flink.testutils.ClassLoaderUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.SerializedThrowable;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -179,4 +189,62 @@ class SerializedThrowableTest {
                 .isInstanceOf(SerializedThrowable.class)
                 .hasMessage("java.lang.Exception: suppressed");
     }
+
+    @Test
+    void testCyclicSuppressedThrowableSerialized() {
+        SerializedThrowable serializedThrowable = new SerializedThrowable(mockThrowable());
+        assertThat(serializedThrowable).isNotNull();
+    }
+
+    @Test
+    @Timeout(value = 5, unit = TimeUnit.SECONDS)
+    void testCyclicSuppressedThrowableConcurrentSerialized() throws InterruptedException {
+        Throwable throwable = mockThrowable();
+        int threadNum = 16;
+        CountDownLatch countDownLatch = new CountDownLatch(threadNum);
+        List<Thread> threads = new ArrayList<>();
+        for (int i = 0; i < threadNum; i++) {
+            String threadName = "thread-" + i;
+            Thread t = createThread(countDownLatch, throwable, threadName);
+            t.start();
+            countDownLatch.countDown();
+            threads.add(t);
+        }
+        for (Thread thread : threads) {
+            thread.join();
+        }
+    }
+
+    private static Thread createThread(
+            CountDownLatch countDownLatch, Throwable throwable, String threadName) {
+        Thread t =
+                new Thread(
+                        () -> {
+                            try {
+                                countDownLatch.await();
+                                SerializedThrowable serializedThrowable =
+                                        new SerializedThrowable(throwable);
+                                assertThat(serializedThrowable).isNotNull();
+                            } catch (Exception e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+        t.setName(threadName);
+        return t;
+    }
+
+    private static Throwable mockThrowable() {
+        SocketAddress remoteAddr = new InetSocketAddress(80);
+        RemoteTransportException remoteTransportException =
+                new RemoteTransportException(
+                        "Connection unexpectedly closed by remote task manager '"
+                                + remoteAddr
+                                + "'. "
+                                + "This might indicate that the remote task manager was lost.",
+                        remoteAddr,
+                        new IOException("connection reset by peer."));
+        RuntimeException runtimeException = new RuntimeException(remoteTransportException);
+        remoteTransportException.addSuppressed(runtimeException);
+        return remoteTransportException;
+    }
 }

Reply via email to