This is an automated email from the ASF dual-hosted git repository.
tangyun pushed a commit to branch release-2.2
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-2.2 by this push:
new 4fd5bea812e [FLINK-36746][core] Fix the deadlock bug in SerializedThrowable
4fd5bea812e is described below
commit 4fd5bea812ee5726c0c4ee5e27fda320273ebb6e
Author: Yuepeng Pan <[email protected]>
AuthorDate: Mon Nov 3 23:54:45 2025 +0800
[FLINK-36746][core] Fix the deadlock bug in SerializedThrowable
Co-authored-by: raoraoxiong <[email protected]>
Co-authored-by: Yun Tang <[email protected]>
This closes #27186
---
.../org/apache/flink/util/SerializedThrowable.java | 38 +++++++-----
.../runtime/util/SerializedThrowableTest.java | 68 ++++++++++++++++++++++
2 files changed, 93 insertions(+), 13 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java b/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java
index ae1ea5e14e1..7937e2de8a6 100644
--- a/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java
+++ b/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java
@@ -68,10 +68,14 @@ public class SerializedThrowable extends Exception implements Serializable {
if (!(exception instanceof SerializedThrowable)) {
// serialize and memoize the original message
byte[] serialized;
- try {
- serialized = InstantiationUtil.serializeObject(exception);
- } catch (Throwable t) {
- serialized = null;
+ // introduce the synchronization here to avoid deadlock of multi thread serializing
+ // exceptions
+ synchronized (SerializedThrowable.class) {
+ try {
+ serialized = InstantiationUtil.serializeObject(exception);
+ } catch (Throwable t) {
+ serialized = null;
+ }
}
this.serializedException = serialized;
this.cachedException = new WeakReference<>(exception);
@@ -94,7 +98,7 @@ public class SerializedThrowable extends Exception implements Serializable {
}
}
// mimic suppressed exceptions
- addAllSuppressed(exception.getSuppressed());
+ this.addAllSuppressed(exception.getSuppressed(), alreadySeen);
} else {
// copy from that serialized throwable
SerializedThrowable other = (SerializedThrowable) exception;
@@ -104,7 +108,7 @@ public class SerializedThrowable extends Exception implements Serializable {
this.cachedException = other.cachedException;
this.setStackTrace(other.getStackTrace());
this.initCause(other.getCause());
- this.addAllSuppressed(other.getSuppressed());
+ this.addAllSuppressed(other.getSuppressed(), alreadySeen);
}
}
@@ -141,15 +145,23 @@ public class SerializedThrowable extends Exception implements Serializable {
return fullStringifiedStackTrace;
}
- private void addAllSuppressed(Throwable[] suppressed) {
+ /**
+ * Add all suppressed exceptions to this exception.
+ *
+ * @param suppressed The suppressed exceptions to add.
+ * @param alreadySeen The set of exceptions that have already been seen.
+ */
+ private void addAllSuppressed(Throwable[] suppressed, Set<Throwable> alreadySeen) {
for (Throwable s : suppressed) {
- SerializedThrowable serializedThrowable;
- if (s instanceof SerializedThrowable) {
- serializedThrowable = (SerializedThrowable) s;
- } else {
- serializedThrowable = new SerializedThrowable(s);
+ if (alreadySeen.add(s)) {
+ SerializedThrowable serializedThrowable;
+ if (s instanceof SerializedThrowable) {
+ serializedThrowable = (SerializedThrowable) s;
+ } else {
+ serializedThrowable = new SerializedThrowable(s);
+ }
+ this.addSuppressed(serializedThrowable);
}
- this.addSuppressed(serializedThrowable);
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java
index 2f81a91fd20..64a9340937e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java
@@ -19,12 +19,22 @@
package org.apache.flink.runtime.util;
import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
import org.apache.flink.testutils.ClassLoaderUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.SerializedThrowable;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -179,4 +189,62 @@ class SerializedThrowableTest {
.isInstanceOf(SerializedThrowable.class)
.hasMessage("java.lang.Exception: suppressed");
}
+
+ @Test
+ void testCyclicSuppressedThrowableSerialized() {
+ SerializedThrowable serializedThrowable = new SerializedThrowable(mockThrowable());
+ assertThat(serializedThrowable).isNotNull();
+ }
+
+ @Test
+ @Timeout(value = 5, unit = TimeUnit.SECONDS)
+ void testCyclicSuppressedThrowableConcurrentSerialized() throws InterruptedException {
+ Throwable throwable = mockThrowable();
+ int threadNum = 16;
+ CountDownLatch countDownLatch = new CountDownLatch(threadNum);
+ List<Thread> threads = new ArrayList<>();
+ for (int i = 0; i < threadNum; i++) {
+ String threadName = "thread-" + i;
+ Thread t = createThread(countDownLatch, throwable, threadName);
+ t.start();
+ countDownLatch.countDown();
+ threads.add(t);
+ }
+ for (Thread thread : threads) {
+ thread.join();
+ }
+ }
+
+ private static Thread createThread(
+ CountDownLatch countDownLatch, Throwable throwable, String threadName) {
+ Thread t =
+ new Thread(
+ () -> {
+ try {
+ countDownLatch.await();
+ SerializedThrowable serializedThrowable =
+ new SerializedThrowable(throwable);
+ assertThat(serializedThrowable).isNotNull();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ t.setName(threadName);
+ return t;
+ }
+
+ private static Throwable mockThrowable() {
+ SocketAddress remoteAddr = new InetSocketAddress(80);
+ RemoteTransportException remoteTransportException =
+ new RemoteTransportException(
+ "Connection unexpectedly closed by remote task manager '"
+ + remoteAddr
+ + "'. "
+ + "This might indicate that the remote task manager was lost.",
+ remoteAddr,
+ new IOException("connection reset by peer."));
+ RuntimeException runtimeException = new RuntimeException(remoteTransportException);
+ remoteTransportException.addSuppressed(runtimeException);
+ return remoteTransportException;
+ }
}