[ 
https://issues.apache.org/jira/browse/FLINK-36746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reassigned FLINK-36746:
--------------------------------

    Assignee: raoraoxiong

> Found deadlock in SerializedThrowable
> -------------------------------------
>
>                 Key: FLINK-36746
>                 URL: https://issues.apache.org/jira/browse/FLINK-36746
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.15.0
>            Reporter: raoraoxiong
>            Assignee: raoraoxiong
>            Priority: Major
>
> When a job fails over, a CIRCULAR REFERENCE may occur when the job serializes 
> a throwable object, like this:
>  
>  
> {code:java}
> 2024-04-07 12:19:01,242 WARN  [49126] 
> [org.apache.flink.runtime.taskmanager.Task.transitionState(Task.java:1112)]  
> - search-ipv6 (298/480)#0 (7c9485e8359657b4da729358270805ea) switched from 
> RUNNING to FAILED with failure cause: 
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
> Lost connection to task manager '***'. This indicates that the remote task 
> manager was lost.
>     at 
> org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:156)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:273)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.exceptionCaught(DefaultChannelPipeline.java:1377)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireExceptionCaught(DefaultChannelPipeline.java:907)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.handleReadException(AbstractEpollStreamChannel.java:728)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:821)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
>     at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
>     at 
> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>     at java.lang.Thread.run(Thread.java:750)
>     Suppressed: java.lang.RuntimeException: 
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
> Lost connection to task manager '***'. This indicates that the remote task 
> manager was lost.
>         at 
> org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:319)
>         at 
> org.apache.flink.runtime.io.network.partition.consumer.BufferManager.recycle(BufferManager.java:237)
>         at 
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:182)
>         at 
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110)
>         at 
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
>         at 
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:156)
>         at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.clear(SpillingAdaptiveSpanningRecordDeserializer.java:138)
>         at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.releaseDeserializer(AbstractStreamTaskNetworkInput.java:218)
>         at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.close(AbstractStreamTaskNetworkInput.java:210)
>         at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.close(StreamTaskNetworkInput.java:117)
>         at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.close(StreamOneInputProcessor.java:88)
>         at 
> org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.close(StreamMultipleInputProcessor.java:105)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInternal(StreamTask.java:977)
>         at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:254)
>         at 
> org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:72)
>         at 
> org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:968)
>         at 
> org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:938)
>         at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:956)
>         at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:938)
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:749)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>         ... 1 more
>     Caused by: [CIRCULAR REFERENCE: 
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
> Lost connection to task manager '***'. This indicates that the remote task 
> manager was lost.]
> Caused by: 
> org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
>  readAddress(..) failed: Connection reset by peer
> {code}
>  
>  
> For this task manager, I took a Java stack dump and found that there was a JVM 
> deadlock:
>  
> {code:java}
> Found one Java-level deadlock:
> =============================
> "search-ipv6 (298/480)#0":
>   waiting to lock monitor 0x00007f4ab4c119b8 (object 0x00000002ed3d1b90, a 
> java.lang.RuntimeException),
>   which is held by "join-finder-dim (298/480)#0"
> "join-finder-dim (298/480)#0":
>   waiting to lock monitor 0x00007f49f77574d8 (object 0x00000002ed2ba668, a 
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException),
>   which is held by "search-ipv6 (298/480)#0"
> Java stack information for the threads listed 
> above:==================================================="search-ipv6 
> (298/480)#0":    at java.lang.Throwable.writeObject(Throwable.java:1008)    - 
> waiting to lock <0x00000002ed3d1b90> (a java.lang.RuntimeException)    at 
> sun.reflect.GeneratedMethodAccessor88.invoke(Unknown Source)    at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)    at 
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1154)    
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)   
>  at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)  
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)    
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)    at 
> java.util.ArrayList.writeObject(ArrayList.java:768)    at 
> sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)    at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)    at 
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1154)    
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)   
>  at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)  
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)    
> at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)   
>  at 
> java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)    
> at java.lang.Throwable.writeObject(Throwable.java:1014)    - locked 
> <0x00000002ed2ba668> (a 
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException) 
>    at sun.reflect.GeneratedMethodAccessor88.invoke(Unknown Source)    at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)    at 
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1154)    
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)   
>  at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)  
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)    
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)    at 
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632)
>     at 
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:72) 
>    at 
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93) 
>    at 
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62) 
>    at 
> org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
>     at 
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97) 
>    at 
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93) 
>    at 
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97) 
>    at 
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93) 
>    at 
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62) 
>    at 
> org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
>     at 
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97) 
>    at 
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62) 
>    at 
> org.apache.flink.runtime.taskmanager.TaskExecutionState.<init>(TaskExecutionState.java:96)
>     at 
> org.apache.flink.runtime.taskmanager.TaskExecutionState.<init>(TaskExecutionState.java:70)
>     at 
> org.apache.flink.runtime.taskmanager.Task.notifyFinalState(Task.java:1057)    
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:855)    at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)    at 
> java.lang.Thread.run(Thread.java:750)"join-finder-dim (298/480)#0":    at 
> java.lang.Throwable.writeObject(Throwable.java:1008)    - waiting to lock 
> <0x00000002ed2ba668> (a 
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException) 
>    at sun.reflect.GeneratedMethodAccessor88.invoke(Unknown Source)    at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)    at 
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1154)    
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)   
>  at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)  
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)    
> at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)   
>  at 
> java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)    
> at java.lang.Throwable.writeObject(Throwable.java:1014)    - locked 
> <0x00000002ed3d1b90> (a java.lang.RuntimeException)    at 
> sun.reflect.GeneratedMethodAccessor88.invoke(Unknown Source)    at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)    at 
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1154)    
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)   
>  at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)  
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)    
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)    at 
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632)
>     at 
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:72) 
>    at 
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62) 
>    at 
> org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
>     at 
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97) 
>    at 
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93) 
>    at 
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62) 
>    at 
> org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
>     at 
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97) 
>    at 
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93) 
>    at 
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62) 
>    at 
> org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)
>     at 
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:97) 
>    at 
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:93) 
>    at 
> org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
> Found 1 deadlock. {code}
>  
>  
> Then I wrote a simple test case to reproduce this bug:
> {code:java}
> import org.apache.flink.util.SerializedThrowable;
> import java.io.IOException;
> import java.net.InetSocketAddress;
> import java.net.SocketAddress;
> import java.util.ArrayList;
> import java.util.List;
> import java.util.concurrent.CountDownLatch;
> public class Test {
>     public static void main(String[] args) throws InterruptedException {
>         mock();
>     }
>     private static boolean mock() throws InterruptedException {
>         int threadNum = 2;
>         Throwable throwable = mockThrowable();
>         throwable.printStackTrace(System.out);
>         CountDownLatch countDownLatch = new CountDownLatch(threadNum);
>         List<Thread> list = new ArrayList<>();
>         for (int i = 0; i < threadNum; i++) {
>             String threadName = "t" + i;
>             Thread t = new Thread(() -> {
>                 try {
>                     countDownLatch.await();
>                     SerializedThrowable serializedThrowable = new 
> SerializedThrowable(throwable);
>                 } catch (InterruptedException e) {
>                     throw new RuntimeException(e);
>                 }
>             });
>             t.setName(threadName);
>             t.start();
>             countDownLatch.countDown();
>             list.add(t);
>         }
>         for (Thread thread : list) {
>             thread.join();
>         }
>         return true;
>     }
>     private static Throwable mockThrowable() {
>         SocketAddress remoteAddr = new InetSocketAddress( 80);
>         RemoteTransportException remoteTransportException = new 
> RemoteTransportException(
>             "Connection unexpectedly closed by remote task manager '"
>                 + remoteAddr
>                 + "'. "
>                 + "This might indicate that the remote task manager was 
> lost.",
>             remoteAddr, new IOException("connection reset by peer."));
>         RuntimeException runtimeException = new 
> RuntimeException(remoteTransportException);
>         remoteTransportException.addSuppressed(runtimeException);
>         return remoteTransportException;
>     }
> }{code}
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to