[ https://issues.apache.org/jira/browse/FLINK-36746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
xiongraorao updated FLINK-36746: -------------------------------- Description: When a job fails over, a CIRCULAR REFERENCE may occur when the job serializes a throwable object like this {code:java} 2024-04-07 12:19:01,242 WARN [49126] [org.apache.flink.runtime.taskmanager.Task.transitionState(Task.java:1112)] - search-ipv6 (298/480)#0 (7c9485e8359657b4da729358270805ea) switched from RUNNING to FAILED with failure cause: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Lost connection to task manager '***'. This indicates that the remote task manager was lost. at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:156) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:273) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.exceptionCaught(DefaultChannelPipeline.java:1377) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireExceptionCaught(DefaultChannelPipeline.java:907) at org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.handleReadException(AbstractEpollStreamChannel.java:728) at 
org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:821) at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480) at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at java.lang.Thread.run(Thread.java:750) Suppressed: java.lang.RuntimeException: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Lost connection to task manager '***'. This indicates that the remote task manager was lost. at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:319) at org.apache.flink.runtime.io.network.partition.consumer.BufferManager.recycle(BufferManager.java:237) at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:182) at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110) at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100) at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:156) at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.clear(SpillingAdaptiveSpanningRecordDeserializer.java:138) at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.releaseDeserializer(AbstractStreamTaskNetworkInput.java:218) at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.close(AbstractStreamTaskNetworkInput.java:210) at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.close(StreamTaskNetworkInput.java:117) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.close(StreamOneInputProcessor.java:88) at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.close(StreamMultipleInputProcessor.java:105) at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInternal(StreamTask.java:977) at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:254) at org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:72) at org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127) at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:968) at org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:938) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:956) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:938) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:749) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) ... 1 more Caused by: [CIRCULAR REFERENCE: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Lost connection to task manager '***'. This indicates that the remote task manager was lost.] Caused by: org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException: readAddress(..) 
failed: Connection reset by peer {code} For this task manager, I took a Java stack dump and found a JVM deadlock: {code:java} Found one Java-level deadlock:============================="search-ipv6 (298/480)#0": waiting to lock monitor 0x00007f4ab4c119b8 (object 0x00000002ed3d1b90, a java.lang.RuntimeException), which is held by "join-finder-dim (298/480)#0""join-finder-dim (298/480)#0": waiting to lock monitor 0x00007f49f77574d8 (object 0x00000002ed2ba668, a org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException), which is held by "search-ipv6 (298/480)#0" Java stack information for the threads listed above:==================================================="search-ipv6 (298/480)#0": at java.lang.Throwable.writeObject(Throwable.java:1008) - waiting to lock <0x00000002ed3d1b90> (a java.lang.RuntimeException) at sun.reflect.GeneratedMethodAccessor88.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1154) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at java.util.ArrayList.writeObject(ArrayList.java:768) at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1154) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441) at java.lang.Throwable.writeObject(Throwable.java:1014) - locked <0x00000002ed2ba668> (a org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException) at sun.reflect.GeneratedMethodAccessor88.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1154) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632) at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:72) at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93) at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62) at org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150) at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97) at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93) at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97) at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93) at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62) at org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150) at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97) at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62) at 
org.apache.flink.runtime.taskmanager.TaskExecutionState.<init>(TaskExecutionState.java:96) at org.apache.flink.runtime.taskmanager.TaskExecutionState.<init>(TaskExecutionState.java:70) at org.apache.flink.runtime.taskmanager.Task.notifyFinalState(Task.java:1057) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:855) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) at java.lang.Thread.run(Thread.java:750)"join-finder-dim (298/480)#0": at java.lang.Throwable.writeObject(Throwable.java:1008) - waiting to lock <0x00000002ed2ba668> (a org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException) at sun.reflect.GeneratedMethodAccessor88.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1154) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441) at java.lang.Throwable.writeObject(Throwable.java:1014) - locked <0x00000002ed3d1b90> (a java.lang.RuntimeException) at sun.reflect.GeneratedMethodAccessor88.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1154) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at 
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632) at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:72) at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62) at org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150) at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97) at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93) at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62) at org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150) at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97) at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93) at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62) at org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150) at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97) at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93) at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62) Found 1 deadlock. 
{code} then I made a simple test case to reproduce this bug: {code:java} import org.apache.flink.util.SerializedThrowable; import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; public class Test { public static void main(String[] args) throws InterruptedException { mock(); } private static boolean mock() throws InterruptedException { int threadNum = 2; Throwable throwable = mockThrowable(); throwable.printStackTrace(System.out); CountDownLatch countDownLatch = new CountDownLatch(threadNum); List<Thread> list = new ArrayList<>(); for (int i = 0; i < threadNum; i++) { String threadName = "t" + i; Thread t = new Thread(() -> { try { countDownLatch.await(); SerializedThrowable serializedThrowable = new SerializedThrowable(throwable); } catch (InterruptedException e) { throw new RuntimeException(e); } }); t.setName(threadName); t.start(); countDownLatch.countDown(); list.add(t); } for (Thread thread : list) { thread.join(); } return true; } private static Throwable mockThrowable() { SocketAddress remoteAddr = new InetSocketAddress( 80); RemoteTransportException remoteTransportException = new RemoteTransportException( "Connection unexpectedly closed by remote task manager '" + remoteAddr + "'. 
" + "This might indicate that the remote task manager was lost.", remoteAddr, new IOException("connection reset by peer.")); RuntimeException runtimeException = new RuntimeException(remoteTransportException); remoteTransportException.addSuppressed(runtimeException); return remoteTransportException; } }{code} was: When a job failover, it may occurr CIRCULAR REFERENCE when job serialized throwable object like this {code:java} 2024-04-07 12:19:01,242 WARN [49126] [org.apache.flink.runtime.taskmanager.Task.transitionState(Task.java:1112)] - search-ipv6 (298/480)#0 (7c9485e8359657b4da729358270805ea) switched from RUNNING to FAILED with failure cause: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Lost connection to task manager '***'. This indicates that the remote task manager was lost. at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:156) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:273) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.exceptionCaught(DefaultChannelPipeline.java:1377) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireExceptionCaught(DefaultChannelPipeline.java:907) at 
org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.handleReadException(AbstractEpollStreamChannel.java:728) at org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:821) at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480) at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at java.lang.Thread.run(Thread.java:750) Suppressed: java.lang.RuntimeException: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Lost connection to task manager '***'. This indicates that the remote task manager was lost. 
at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:319) at org.apache.flink.runtime.io.network.partition.consumer.BufferManager.recycle(BufferManager.java:237) at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:182) at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110) at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100) at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:156) at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.clear(SpillingAdaptiveSpanningRecordDeserializer.java:138) at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.releaseDeserializer(AbstractStreamTaskNetworkInput.java:218) at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.close(AbstractStreamTaskNetworkInput.java:210) at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.close(StreamTaskNetworkInput.java:117) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.close(StreamOneInputProcessor.java:88) at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.close(StreamMultipleInputProcessor.java:105) at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInternal(StreamTask.java:977) at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:254) at org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:72) at org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127) at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:968) at org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:938) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:956) at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:938) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:749) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) ... 1 more Caused by: [CIRCULAR REFERENCE: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Lost connection to task manager '***'. This indicates that the remote task manager was lost.] Caused by: org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer {code} for this task manager, i took a java stack dump, found there was a jvm deadlock: {code:java} Found one Java-level deadlock:============================="search-ipv6 (298/480)#0": waiting to lock monitor 0x00007f4ab4c119b8 (object 0x00000002ed3d1b90, a java.lang.RuntimeException), which is held by "join-finder-dim (298/480)#0""join-finder-dim (298/480)#0": waiting to lock monitor 0x00007f49f77574d8 (object 0x00000002ed2ba668, a org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException), which is held by "search-ipv6 (298/480)#0" Java stack information for the threads listed above:==================================================="search-ipv6 (298/480)#0": at java.lang.Throwable.writeObject(Throwable.java:1008) - waiting to lock <0x00000002ed3d1b90> (a java.lang.RuntimeException) at sun.reflect.GeneratedMethodAccessor88.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1154) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at 
java.util.ArrayList.writeObject(ArrayList.java:768) at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1154) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441) at java.lang.Throwable.writeObject(Throwable.java:1014) - locked <0x00000002ed2ba668> (a org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException) at sun.reflect.GeneratedMethodAccessor88.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1154) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632) at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:72) at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93) at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62) at org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150) at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97) at 
org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93) at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97) at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93) at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62) at org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150) at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97) at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62) at org.apache.flink.runtime.taskmanager.TaskExecutionState.<init>(TaskExecutionState.java:96) at org.apache.flink.runtime.taskmanager.TaskExecutionState.<init>(TaskExecutionState.java:70) at org.apache.flink.runtime.taskmanager.Task.notifyFinalState(Task.java:1057) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:855) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) at java.lang.Thread.run(Thread.java:750)"join-finder-dim (298/480)#0": at java.lang.Throwable.writeObject(Throwable.java:1008) - waiting to lock <0x00000002ed2ba668> (a org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException) at sun.reflect.GeneratedMethodAccessor88.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1154) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441) at java.lang.Throwable.writeObject(Throwable.java:1014) - locked <0x00000002ed3d1b90> (a 
java.lang.RuntimeException) at sun.reflect.GeneratedMethodAccessor88.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1154) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632) at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:72) at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62) at org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150) at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97) at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93) at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62) at org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150) at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97) at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93) at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62) at org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150) at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97) at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93) at org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62) Found 1 deadlock. 
{code} then I made a simple test case to reproduce this bug: {code:java} // code placeholder import org.apache.flink.util.SerializedThrowable; import java.io.IOException;import java.net.InetSocketAddress;import java.net.SocketAddress;import java.util.ArrayList;import java.util.List;import java.util.concurrent.CountDownLatch; public class Test { public static void main(String[] args) throws InterruptedException { mock(); } private static boolean mock() throws InterruptedException { int threadNum = 2; Throwable throwable = mockThrowable(); throwable.printStackTrace(System.out); CountDownLatch countDownLatch = new CountDownLatch(threadNum); List<Thread> list = new ArrayList<>(); for (int i = 0; i < threadNum; i++) { String threadName = "t" + i; Thread t = new Thread(() -> { try { countDownLatch.await(); SerializedThrowable serializedThrowable = new SerializedThrowable(throwable); } catch (InterruptedException e) { throw new RuntimeException(e); } }); t.setName(threadName); t.start(); countDownLatch.countDown(); list.add(t); } for (Thread thread : list) { thread.join(); } return true; } private static Throwable mockThrowable() { SocketAddress remoteAddr = new InetSocketAddress( 80); RemoteTransportException remoteTransportException = new RemoteTransportException( "Connection unexpectedly closed by remote task manager '" + remoteAddr + "'. 
" + "This might indicate that the remote task manager was lost.", remoteAddr, new IOException("connection reset by peer.")); RuntimeException runtimeException = new RuntimeException(remoteTransportException); remoteTransportException.addSuppressed(runtimeException); return remoteTransportException; }} {code} > Found deadlock in SerializedThrowable > ------------------------------------- > > Key: FLINK-36746 > URL: https://issues.apache.org/jira/browse/FLINK-36746 > Project: Flink > Issue Type: Bug > Components: Runtime / Task > Affects Versions: 1.15.0 > Reporter: xiongraorao > Priority: Major > > When a job failover, it may occurr CIRCULAR REFERENCE when job serialized > throwable object like this > > > {code:java} > 2024-04-07 12:19:01,242 WARN [49126] > [org.apache.flink.runtime.taskmanager.Task.transitionState(Task.java:1112)] > - search-ipv6 (298/480)#0 (7c9485e8359657b4da729358270805ea) switched from > RUNNING to FAILED with failure cause: > org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: > Lost connection to task manager '***'. This indicates that the remote task > manager was lost. 
> at > org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:156) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:273) > at > org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.exceptionCaught(DefaultChannelPipeline.java:1377) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281) > at > org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireExceptionCaught(DefaultChannelPipeline.java:907) > at > org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.handleReadException(AbstractEpollStreamChannel.java:728) > at > org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:821) > at > org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480) > at > org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) > at > org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) > at java.lang.Thread.run(Thread.java:750) > Suppressed: 
java.lang.RuntimeException: > org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: > Lost connection to task manager '***'. This indicates that the remote task > manager was lost. > at > org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:319) > at > org.apache.flink.runtime.io.network.partition.consumer.BufferManager.recycle(BufferManager.java:237) > at > org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:182) > at > org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110) > at > org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100) > at > org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:156) > at > org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.clear(SpillingAdaptiveSpanningRecordDeserializer.java:138) > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.releaseDeserializer(AbstractStreamTaskNetworkInput.java:218) > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.close(AbstractStreamTaskNetworkInput.java:210) > at > org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.close(StreamTaskNetworkInput.java:117) > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.close(StreamOneInputProcessor.java:88) > at > org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.close(StreamMultipleInputProcessor.java:105) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInternal(StreamTask.java:977) > at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:254) > at > org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:72) > at > org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127) > at > 
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:968) > at > org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:938) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:956) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:938) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:749) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) > ... 1 more > Caused by: [CIRCULAR REFERENCE: > org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: > Lost connection to task manager '***'. This indicates that the remote task > manager was lost.] > Caused by: > org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException: > readAddress(..) failed: Connection reset by peer > {code} > > > For this task manager, I took a Java stack dump and found that there was a JVM > deadlock: > > {code:java} > Found one Java-level deadlock: ============================= "search-ipv6 > (298/480)#0": waiting to lock monitor 0x00007f4ab4c119b8 (object > 0x00000002ed3d1b90, a java.lang.RuntimeException), which is held by > "join-finder-dim (298/480)#0" "join-finder-dim (298/480)#0": waiting to lock > monitor 0x00007f49f77574d8 (object 0x00000002ed2ba668, a > org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException), > which is held by "search-ipv6 (298/480)#0" > Java stack information for the threads listed > above: =================================================== "search-ipv6 > (298/480)#0": at java.lang.Throwable.writeObject(Throwable.java:1008) - > waiting to lock <0x00000002ed3d1b90> (a java.lang.RuntimeException) at > sun.reflect.GeneratedMethodAccessor88.invoke(Unknown Source) at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) at > java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1154) > at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at > java.util.ArrayList.writeObject(ArrayList.java:768) at > sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source) at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) at > java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1154) > at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) > at > java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441) > at java.lang.Throwable.writeObject(Throwable.java:1014) - locked > <0x00000002ed2ba668> (a > org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException) > at sun.reflect.GeneratedMethodAccessor88.invoke(Unknown Source) at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) at > java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1154) > at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at > org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632) > at > org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:72) 
> at > org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93) > at > org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62) > at > org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150) > at > org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97) > at > org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93) > at > org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97) > at > org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93) > at > org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62) > at > org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150) > at > org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97) > at > org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62) > at > org.apache.flink.runtime.taskmanager.TaskExecutionState.<init>(TaskExecutionState.java:96) > at > org.apache.flink.runtime.taskmanager.TaskExecutionState.<init>(TaskExecutionState.java:70) > at > org.apache.flink.runtime.taskmanager.Task.notifyFinalState(Task.java:1057) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:855) at > org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) at > java.lang.Thread.run(Thread.java:750)"join-finder-dim (298/480)#0": at > java.lang.Throwable.writeObject(Throwable.java:1008) - waiting to lock > <0x00000002ed2ba668> (a > org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException) > at sun.reflect.GeneratedMethodAccessor88.invoke(Unknown Source) at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) at > java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1154) > at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) > at > 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) > at > java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441) > at java.lang.Throwable.writeObject(Throwable.java:1014) - locked > <0x00000002ed3d1b90> (a java.lang.RuntimeException) at > sun.reflect.GeneratedMethodAccessor88.invoke(Unknown Source) at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) at > java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1154) > at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at > org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632) > at > org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:72) > at > org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62) > at > org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150) > at > org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97) > at > org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93) > at > org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62) > at > org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150) > at > org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97) > at > org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93) > at > org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62) > 
at > org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150) > at > org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97) > at > org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93) > at > org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62) > Found 1 deadlock. {code} > > > then I made a simple test case to reproduce this bug: > {code:java} > import org.apache.flink.util.SerializedThrowable; > import java.io.IOException; > import java.net.InetSocketAddress; > import java.net.SocketAddress; > import java.util.ArrayList; > import java.util.List; > import java.util.concurrent.CountDownLatch; > public class Test { > public static void main(String[] args) throws InterruptedException { > mock(); > } > private static boolean mock() throws InterruptedException { > int threadNum = 2; > Throwable throwable = mockThrowable(); > throwable.printStackTrace(System.out); > CountDownLatch countDownLatch = new CountDownLatch(threadNum); > List<Thread> list = new ArrayList<>(); > for (int i = 0; i < threadNum; i++) { > String threadName = "t" + i; > Thread t = new Thread(() -> { > try { > countDownLatch.await(); > SerializedThrowable serializedThrowable = new > SerializedThrowable(throwable); > } catch (InterruptedException e) { > throw new RuntimeException(e); > } > }); > t.setName(threadName); > t.start(); > countDownLatch.countDown(); > list.add(t); > } > for (Thread thread : list) { > thread.join(); > } > return true; > } > private static Throwable mockThrowable() { > SocketAddress remoteAddr = new InetSocketAddress( 80); > RemoteTransportException remoteTransportException = new > RemoteTransportException( > "Connection unexpectedly closed by remote task manager '" > + remoteAddr > + "'. 
" > + "This might indicate that the remote task manager was > lost.", > remoteAddr, new IOException("connection reset by peer.")); > RuntimeException runtimeException = new > RuntimeException(remoteTransportException); > remoteTransportException.addSuppressed(runtimeException); > return remoteTransportException; > } > }{code} > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)