[
https://issues.apache.org/jira/browse/FLINK-36746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yun Tang updated FLINK-36746:
-----------------------------
Fix Version/s: 2.3.0
> Found deadlock in SerializedThrowable
> -------------------------------------
>
> Key: FLINK-36746
> URL: https://issues.apache.org/jira/browse/FLINK-36746
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Task
> Affects Versions: 1.15.0
> Reporter: raoraoxiong
> Assignee: RocMarshal
> Priority: Major
> Labels: pull-request-available
> Fix For: 2.2.0, 2.3.0
>
>
> When a job fails over, a CIRCULAR REFERENCE may occur when the job serializes
> a throwable object, like this:
>
>
> {code:java}
> 2024-04-07 12:19:01,242 WARN [49126]
> [org.apache.flink.runtime.taskmanager.Task.transitionState(Task.java:1112)]
> - search-ipv6 (298/480)#0 (7c9485e8359657b4da729358270805ea) switched from
> RUNNING to FAILED with failure cause:
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
> Lost connection to task manager '***'. This indicates that the remote task
> manager was lost.
> at
> org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:156)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:273)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.exceptionCaught(DefaultChannelPipeline.java:1377)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireExceptionCaught(DefaultChannelPipeline.java:907)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.handleReadException(AbstractEpollStreamChannel.java:728)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:821)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
> at
> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> at java.lang.Thread.run(Thread.java:750)
> Suppressed: java.lang.RuntimeException:
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
> Lost connection to task manager '***'. This indicates that the remote task
> manager was lost.
> at
> org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:319)
> at
> org.apache.flink.runtime.io.network.partition.consumer.BufferManager.recycle(BufferManager.java:237)
> at
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:182)
> at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110)
> at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
> at
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:156)
> at
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.clear(SpillingAdaptiveSpanningRecordDeserializer.java:138)
> at
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.releaseDeserializer(AbstractStreamTaskNetworkInput.java:218)
> at
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.close(AbstractStreamTaskNetworkInput.java:210)
> at
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.close(StreamTaskNetworkInput.java:117)
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.close(StreamOneInputProcessor.java:88)
> at
> org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.close(StreamMultipleInputProcessor.java:105)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInternal(StreamTask.java:977)
> at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:254)
> at
> org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:72)
> at
> org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:968)
> at
> org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:938)
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:956)
> at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:938)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:749)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> ... 1 more
> Caused by: [CIRCULAR REFERENCE:
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
> Lost connection to task manager '***'. This indicates that the remote task
> manager was lost.]
> Caused by:
> org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
> readAddress(..) failed: Connection reset by peer
> {code}
>
>
> For this task manager, I took a Java stack dump and found a JVM-level
> deadlock:
>
> {code:java}
> Found one Java-level deadlock:
> =============================
> "search-ipv6 (298/480)#0":
>   waiting to lock monitor 0x00007f4ab4c119b8 (object 0x00000002ed3d1b90, a java.lang.RuntimeException),
>   which is held by "join-finder-dim (298/480)#0"
> "join-finder-dim (298/480)#0":
>   waiting to lock monitor 0x00007f49f77574d8 (object 0x00000002ed2ba668, a org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException),
>   which is held by "search-ipv6 (298/480)#0"
> Java stack information for the threads listed above:
> ===================================================
> "search-ipv6 (298/480)#0":
>
> at java.lang.Throwable.writeObject(Throwable.java:1008)
> - waiting to lock <0x00000002ed3d1b90> (a java.lang.RuntimeException)
> at sun.reflect.GeneratedMethodAccessor88.invoke(Unknown Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1154)
> at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> at java.util.ArrayList.writeObject(ArrayList.java:768)
> at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1154)
> at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>
> at
> java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
> at java.lang.Throwable.writeObject(Throwable.java:1014)
> - locked <0x00000002ed2ba668> (a org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException)
> at sun.reflect.GeneratedMethodAccessor88.invoke(Unknown Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1154)
> at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at
> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> at
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632)
>
> at
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:72)
>
> at
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
>
> at
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
>
> at
> org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
>
> at
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
>
> at
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
>
> at
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
>
> at
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
>
> at
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
>
> at
> org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
>
> at
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
>
> at
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
>
> at
> org.apache.flink.runtime.taskmanager.TaskExecutionState.<init>(TaskExecutionState.java:96)
>
> at
> org.apache.flink.runtime.taskmanager.TaskExecutionState.<init>(TaskExecutionState.java:70)
>
> at
> org.apache.flink.runtime.taskmanager.Task.notifyFinalState(Task.java:1057)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:855)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> at java.lang.Thread.run(Thread.java:750)
> "join-finder-dim (298/480)#0":
> at java.lang.Throwable.writeObject(Throwable.java:1008)
> - waiting to lock <0x00000002ed2ba668> (a
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException)
> at sun.reflect.GeneratedMethodAccessor88.invoke(Unknown
> Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1154)
> at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>
> at
> java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
> at java.lang.Throwable.writeObject(Throwable.java:1014)
> - locked <0x00000002ed3d1b90> (a java.lang.RuntimeException)
> at sun.reflect.GeneratedMethodAccessor88.invoke(Unknown
> Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1154)
> at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at
> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> at
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632)
>
> at
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:72)
>
> at
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
>
> at
> org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
>
> at
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
>
> at
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
>
> at
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
>
> at
> org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
>
> at
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
>
> at
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
>
> at
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
>
> at
> org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
>
> at
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97)
>
> at
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93)
>
> at
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> Found 1 deadlock.
> {code}
>
>
> I then wrote a simple test case to reproduce this bug:
> {code:java}
> import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
> import org.apache.flink.util.SerializedThrowable;
>
> import java.io.IOException;
> import java.net.InetSocketAddress;
> import java.net.SocketAddress;
> import java.util.ArrayList;
> import java.util.List;
> import java.util.concurrent.CountDownLatch;
>
> public class Test {
>     public static void main(String[] args) throws InterruptedException {
>         mock();
>     }
>
>     private static boolean mock() throws InterruptedException {
>         int threadNum = 2;
>         Throwable throwable = mockThrowable();
>         throwable.printStackTrace(System.out);
>         CountDownLatch countDownLatch = new CountDownLatch(threadNum);
>         List<Thread> list = new ArrayList<>();
>         for (int i = 0; i < threadNum; i++) {
>             String threadName = "t" + i;
>             Thread t = new Thread(() -> {
>                 try {
>                     // Wait until every thread has been started, then serialize concurrently.
>                     countDownLatch.await();
>                     SerializedThrowable serializedThrowable = new SerializedThrowable(throwable);
>                 } catch (InterruptedException e) {
>                     throw new RuntimeException(e);
>                 }
>             });
>             t.setName(threadName);
>             t.start();
>             countDownLatch.countDown();
>             list.add(t);
>         }
>         // If the deadlock occurs, these joins never return.
>         for (Thread thread : list) {
>             thread.join();
>         }
>         return true;
>     }
>
>     private static Throwable mockThrowable() {
>         SocketAddress remoteAddr = new InetSocketAddress(80);
>         RemoteTransportException remoteTransportException = new RemoteTransportException(
>                 "Connection unexpectedly closed by remote task manager '"
>                         + remoteAddr
>                         + "'. "
>                         + "This might indicate that the remote task manager was lost.",
>                 remoteAddr, new IOException("connection reset by peer."));
>         // The suppressed exception's cause is the exception itself: a circular reference.
>         RuntimeException runtimeException = new RuntimeException(remoteTransportException);
>         remoteTransportException.addSuppressed(runtimeException);
>         return remoteTransportException;
>     }
> }
> {code}
>
>
>
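The thread dump in the description suggests a classic lock-ordering deadlock: each thread holds the monitor of one throwable inside the synchronized Throwable.writeObject and then tries to enter the same method on the other throwable. The following is a minimal, Flink-free sketch of that pattern, not the actual Flink or JDK code. The names Node, write, writeNested and LockOrderDeadlockSketch are hypothetical, and the CyclicBarrier is only there to force the bad interleaving every time. It uses ThreadMXBean.findMonitorDeadlockedThreads() to report the deadlock instead of hanging on join().

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/** Hypothetical stand-in for a throwable whose serialization hook is synchronized. */
class Node {
    Node other;

    // Same shape as the dump: lock "this", then descend into the referenced object.
    synchronized void write(CyclicBarrier bothLocksHeld) {
        try {
            // Wait until both threads hold their own monitor (forces the interleaving).
            bothLocksHeld.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            throw new IllegalStateException(e);
        }
        other.writeNested(); // needs the monitor held by the other thread
    }

    synchronized void writeNested() {
        // Nothing to do; acquiring the monitor is the point.
    }
}

class LockOrderDeadlockSketch {
    /** Returns the number of threads the JVM reports as monitor-deadlocked (0 on timeout). */
    static int deadlockedThreadCount() {
        Node a = new Node();
        Node b = new Node();
        a.other = b; // circular reference, like cause <-> suppressed in the report
        b.other = a;

        CyclicBarrier barrier = new CyclicBarrier(2);
        Thread t0 = new Thread(() -> a.write(barrier), "t0");
        Thread t1 = new Thread(() -> b.write(barrier), "t1");
        // Daemon threads so the JVM can still exit although they never finish.
        t0.setDaemon(true);
        t1.setDaemon(true);
        t0.start();
        t1.start();

        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        long deadline = System.nanoTime() + 10_000_000_000L; // poll for up to 10 seconds
        while (System.nanoTime() < deadline) {
            long[] ids = threads.findMonitorDeadlockedThreads(); // null if no deadlock
            if (ids != null) {
                return ids.length;
            }
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return 0;
            }
        }
        return 0;
    }

    public static void main(String[] args) {
        System.out.println("deadlocked threads: " + deadlockedThreadCount());
    }
}
```

The same polling loop could replace the join() calls in the reproducer above, so that a test fails fast when the deadlock occurs rather than blocking forever.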
--
This message was sent by Atlassian Jira
(v8.20.10#820010)