Rui Fan created FLINK-38804:
-------------------------------

             Summary: The shutdown sequence is incorrect causes TaskManager is 
shutting down
                 Key: FLINK-38804
                 URL: https://issues.apache.org/jira/browse/FLINK-38804
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Checkpointing
            Reporter: Rui Fan
            Assignee: Rui Fan


h3. The existing logic

When task encounters any exception, {{postFailureCleanUpRegistry.close();}} do 
some cleanups.

[https://github.com/apache/flink/blob/7ba97419d200adef672b67c52c2b5a79f277cfda/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java#L801]

TaskManager will shutdown if {{postFailureCleanUpRegistry.close();}} meets any 
expetion, it works as expected to avoid poential bugs or resource leaks.
h3. Root cause and solution

The following unexpected exception occurred within 
{{{}postFailureCleanUpRegistry.close();{}}}.

Cause: The shutdown sequence is incorrect. {{RemoteInputChannel}} depends on 
{{ChannelStatePersister}} (which uses 
{{{}ChannelStateWriteRequestExecutorImpl{}}}) to snapshot buffers. However, 
{{ChannelStateWriteRequestExecutorImpl}} is being closed prematurely while 
{{RemoteInputChannel}} is still active. If a new buffer arrives, the active 
{{RemoteInputChannel}} attempts to use the already-closed executor, resulting 
in the failure.

Solution: Ensure the RemoteInputChannel (or remote components) is closed before 
ChannelStatePersister and ChannelStateWriteRequestExecutorImpl.

 

Core stack:

 

{{}}
{code:java}
Caused by: java.lang.IllegalStateException: not running at 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.ensureRunning(ChannelStateWriteRequestExecutorImpl.java:349)
 
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
 at 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.submitInternal(ChannelStateWriteRequestExecutorImpl.java:340)
 
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
 at 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.submit(ChannelStateWriteRequestExecutorImpl.java:250)
 
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
 at 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.enqueue(ChannelStateWriterImpl.java:284)
 
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
 at 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.addInputData(ChannelStateWriterImpl.java:194)
 
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
 at 
org.apache.flink.runtime.io.network.partition.consumer.ChannelStatePersister.maybePersist(ChannelStatePersister.java:112)
 
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
 at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.onBuffer(RemoteInputChannel.java:623)
 ~{code}
{{}}

{{{}{}}}Full stack:

{{}}
{code:java}
241672 [7c150f0de176c52cc390e6d10dd12c8d] Task [failing-map (1/3)#0] ERROR 
FATAL - exception in exception handler of task failing-map (1/3)#0 
(e6367521ffc2df19786f2f8c034c0262_b8c789ec3a44294cb45da029ffe0e6fd_0_0). 
java.lang.RuntimeException: java.io.IOException: java.lang.RuntimeException: 
unable to send request to worker at 
org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321) 
~[flink-core-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19] 
at 
org.apache.flink.runtime.io.network.partition.consumer.BufferManager.recycle(BufferManager.java:237)
 
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
 at 
org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:189)
 
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
 at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:111)
 ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:101)
 ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at 
org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:161)
 
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
 at 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.clear(SpillingAdaptiveSpanningRecordDeserializer.java:140)
 
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
 at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.releaseDeserializer(AbstractStreamTaskNetworkInput.java:328)
 
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
 at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.close(AbstractStreamTaskNetworkInput.java:320)
 
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
 at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.close(StreamTaskNetworkInput.java:142)
 
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
 at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.close(StreamOneInputProcessor.java:88)
 
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInternal(StreamTask.java:1110)
 
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
 at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:257) 
~[flink-core-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19] 
at 
org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:83)
 
~[flink-core-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19] 
at 
org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127)
 
~[flink-core-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19] 
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:1101)
 
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
 at 
org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$2(Task.java:958)
 
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
 at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:973)
 
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
 at 
org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$3(Task.java:958)
 
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
 at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:257) 
~[flink-core-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19] 
at 
org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:83)
 
~[flink-core-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19] 
at 
org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127)
 
~[flink-core-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19] 
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:794) 
[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:569) 
[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
 at java.base/java.lang.Thread.run(Thread.java:829) [?:?] Caused by: 
java.io.IOException: java.lang.RuntimeException: unable to send request to 
worker at 
org.apache.flink.runtime.io.network.partition.consumer.InputChannel.checkError(InputChannel.java:275)
 
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
 at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.checkPartitionRequestQueueInitialized(RemoteInputChannel.java:883)
 
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
 at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.notifyCreditAvailable(RemoteInputChannel.java:375)
 
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
 at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:430)
 
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
 at 
org.apache.flink.runtime.io.network.partition.consumer.BufferManager.recycle(BufferManager.java:235)
 
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
 ... 23 more Caused by: java.lang.RuntimeException: unable to send request to 
worker at 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.enqueue(ChannelStateWriterImpl.java:287)
 
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
 at 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.addInputData(ChannelStateWriterImpl.java:194)
 
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
 at 
org.apache.flink.runtime.io.network.partition.consumer.ChannelStatePersister.maybePersist(ChannelStatePersister.java:112)
 
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
 at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.onBuffer(RemoteInputChannel.java:623)
 
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
 at 
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.decodeBufferOrEvent(CreditBasedPartitionRequestClientHandler.java:404)
 
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
 at 
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.decodeMsg(CreditBasedPartitionRequestClientHandler.java:297)
 
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
 at 
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelRead(CreditBasedPartitionRequestClientHandler.java:197)
 
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
 at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
 ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
 ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at 
org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelRead(NettyMessageClientDecoderDelegate.java:112)
 
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
 at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
 ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
 ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
 ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
 ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
 ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at 
org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800)
 ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at 
org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:509)
 ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at 
org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407)
 ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
 ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at 
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
 ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] ... 1 more Caused by: 
java.lang.IllegalStateException: not running at 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.ensureRunning(ChannelStateWriteRequestExecutorImpl.java:349)
 
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
 at 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.submitInternal(ChannelStateWriteRequestExecutorImpl.java:340)
 
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
 at 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.submit(ChannelStateWriteRequestExecutorImpl.java:250)
 
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
 at 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.enqueue(ChannelStateWriterImpl.java:284)
 
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
 at 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.addInputData(ChannelStateWriterImpl.java:194)
 
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
 at 
org.apache.flink.runtime.io.network.partition.consumer.ChannelStatePersister.maybePersist(ChannelStatePersister.java:112)
 
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
 at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.onBuffer(RemoteInputChannel.java:623)
 
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
 at 
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.decodeBufferOrEvent(CreditBasedPartitionRequestClientHandler.java:404)
 
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
 at 
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.decodeMsg(CreditBasedPartitionRequestClientHandler.java:297)
 
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
 at 
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelRead(CreditBasedPartitionRequestClientHandler.java:197)
 
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
 at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
 ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
 ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at 
org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelRead(NettyMessageClientDecoderDelegate.java:112)
 
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
 at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
 ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
 ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
 ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
 ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
 ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at 
org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800)
 ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at 
org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:509)
 ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at 
org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407)
 ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
 ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at 
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
 ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] ... 1 more{code}
{{}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to