Rui Fan created FLINK-38804:
-------------------------------
Summary: An incorrect shutdown sequence causes the TaskManager to shut down
Key: FLINK-38804
URL: https://issues.apache.org/jira/browse/FLINK-38804
Project: Flink
Issue Type: Bug
Components: Runtime / Checkpointing
Reporter: Rui Fan
Assignee: Rui Fan
h3. The existing logic
When a task encounters any exception, {{postFailureCleanUpRegistry.close();}}
performs some cleanup.
[https://github.com/apache/flink/blob/7ba97419d200adef672b67c52c2b5a79f277cfda/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java#L801]
The TaskManager shuts down if {{postFailureCleanUpRegistry.close();}} throws any
exception. This works as expected: it avoids potential bugs and resource leaks.
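The escalation described above can be sketched roughly as follows. This is a simplified model, not the code in {{Task.java}}: {{FatalCleanupSketch}}, {{FatalErrorHandler}}, {{runPostFailureCleanup}} and {{escalates}} are hypothetical names used only for illustration.

```java
import java.util.concurrent.atomic.AtomicReference;

public class FatalCleanupSketch {

    /** Hypothetical callback; in this model it stands for "shut the TaskManager down". */
    interface FatalErrorHandler {
        void onFatalError(Throwable error);
    }

    /** Runs the post-failure cleanup and escalates if the cleanup itself fails. */
    static void runPostFailureCleanup(AutoCloseable cleanupRegistry, FatalErrorHandler handler) {
        try {
            cleanupRegistry.close();
        } catch (Throwable t) {
            // A failure inside the exception handler cannot be handled locally.
            handler.onFatalError(t);
        }
    }

    /** Returns true if closing the given registry triggered the fatal error handler. */
    static boolean escalates(AutoCloseable cleanupRegistry) {
        AtomicReference<Throwable> fatal = new AtomicReference<>();
        runPostFailureCleanup(cleanupRegistry, fatal::set);
        return fatal.get() != null;
    }

    public static void main(String[] args) {
        // Clean cleanup: no escalation.
        System.out.println(escalates(() -> {}));
        // Cleanup that throws: escalated as a fatal error.
        System.out.println(escalates(() -> {
            throw new IllegalStateException("not running");
        }));
    }
}
```

In this model, any exception that escapes the cleanup is treated as fatal, which is why an exception that only arises from the close order still takes the whole TaskManager down.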
h3. Root cause and solution
The following unexpected exception occurred within
{{postFailureCleanUpRegistry.close();}}.
Cause: The shutdown sequence is incorrect. {{RemoteInputChannel}} depends on
{{ChannelStatePersister}} (which uses
{{ChannelStateWriteRequestExecutorImpl}}) to snapshot buffers. However,
{{ChannelStateWriteRequestExecutorImpl}} is closed prematurely while
{{RemoteInputChannel}} is still active. If a new buffer arrives, the active
{{RemoteInputChannel}} attempts to use the already-closed executor, which
results in the failure.
Solution: Ensure that {{RemoteInputChannel}} (or remote components) is closed
before {{ChannelStatePersister}} and {{ChannelStateWriteRequestExecutorImpl}}.
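The ordering problem can be illustrated with a small single-threaded model. This is only a sketch under assumptions: {{WriteExecutor}} and {{InputChannel}} are hypothetical stand-ins for {{ChannelStateWriteRequestExecutorImpl}} and for {{RemoteInputChannel}} together with its {{ChannelStatePersister}}. They are not the Flink classes. The sketch also assumes that a closed channel simply drops late buffers, and it replaces the real race between the task thread and the network thread with a fixed call order.

```java
import java.util.ArrayList;
import java.util.List;

public class ShutdownOrderSketch {

    /** Hypothetical stand-in for ChannelStateWriteRequestExecutorImpl. */
    static final class WriteExecutor implements AutoCloseable {
        private boolean running = true;
        private final List<String> submitted = new ArrayList<>();

        void submit(String buffer) {
            if (!running) {
                // Same message as the innermost exception in the reported stack.
                throw new IllegalStateException("not running");
            }
            submitted.add(buffer);
        }

        @Override
        public void close() {
            running = false;
        }
    }

    /** Hypothetical stand-in for RemoteInputChannel plus its ChannelStatePersister. */
    static final class InputChannel implements AutoCloseable {
        private final WriteExecutor executor;
        private boolean released = false;

        InputChannel(WriteExecutor executor) {
            this.executor = executor;
        }

        void onBuffer(String buffer) {
            if (released) {
                return; // assumption of this sketch: a closed channel drops late buffers
            }
            executor.submit(buffer); // persist the in-flight buffer
        }

        @Override
        public void close() {
            released = true;
        }
    }

    /** Returns true if a buffer arriving during shutdown hits the closed executor. */
    static boolean lateBufferFails(boolean closeChannelFirst) {
        WriteExecutor executor = new WriteExecutor();
        InputChannel channel = new InputChannel(executor);
        if (closeChannelFirst) {
            channel.close(); // proposed order: channel before executor
        }
        executor.close();
        try {
            channel.onBuffer("late-buffer"); // buffer delivered after the executor was closed
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("executor closed first: fails = " + lateBufferFails(false));
        System.out.println("channel closed first:  fails = " + lateBufferFails(true));
    }
}
```

In this model only the executor-first order fails; once the channel is closed before the executor, the late buffer never reaches the executor.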
Core stack:
{code:java}
Caused by: java.lang.IllegalStateException: not running
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.ensureRunning(ChannelStateWriteRequestExecutorImpl.java:349) ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.submitInternal(ChannelStateWriteRequestExecutorImpl.java:340) ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.submit(ChannelStateWriteRequestExecutorImpl.java:250) ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.enqueue(ChannelStateWriterImpl.java:284) ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.addInputData(ChannelStateWriterImpl.java:194) ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
    at org.apache.flink.runtime.io.network.partition.consumer.ChannelStatePersister.maybePersist(ChannelStatePersister.java:112) ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
    at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.onBuffer(RemoteInputChannel.java:623) ~
{code}
Full stack:
{code:java}
241672 [7c150f0de176c52cc390e6d10dd12c8d] Task [failing-map (1/3)#0] ERROR FATAL - exception in exception handler of task failing-map (1/3)#0 (e6367521ffc2df19786f2f8c034c0262_b8c789ec3a44294cb45da029ffe0e6fd_0_0).
java.lang.RuntimeException: java.io.IOException: java.lang.RuntimeException: unable to send request to worker
    at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321) ~[flink-core-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
    at org.apache.flink.runtime.io.network.partition.consumer.BufferManager.recycle(BufferManager.java:237) ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
    at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:189) ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:111) ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?]
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:101) ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?]
    at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:161) ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.clear(SpillingAdaptiveSpanningRecordDeserializer.java:140) ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.releaseDeserializer(AbstractStreamTaskNetworkInput.java:328) ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.close(AbstractStreamTaskNetworkInput.java:320) ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.close(StreamTaskNetworkInput.java:142) ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.close(StreamOneInputProcessor.java:88) ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInternal(StreamTask.java:1110) ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
    at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:257) ~[flink-core-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
    at org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:83) ~[flink-core-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
    at org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127) ~[flink-core-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:1101) ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
    at org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$2(Task.java:958) ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:973) ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
    at org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$3(Task.java:958) ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
    at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:257) ~[flink-core-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
    at org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:83) ~[flink-core-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
    at org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127) ~[flink-core-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:794) [flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:569) [flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
    at java.base/java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.io.IOException: java.lang.RuntimeException: unable to send request to worker
    at org.apache.flink.runtime.io.network.partition.consumer.InputChannel.checkError(InputChannel.java:275) ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
    at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.checkPartitionRequestQueueInitialized(RemoteInputChannel.java:883) ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
    at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.notifyCreditAvailable(RemoteInputChannel.java:375) ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
    at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:430) ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
    at org.apache.flink.runtime.io.network.partition.consumer.BufferManager.recycle(BufferManager.java:235) ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
    ... 23 more
Caused by: java.lang.RuntimeException: unable to send request to worker
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.enqueue(ChannelStateWriterImpl.java:287) ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.addInputData(ChannelStateWriterImpl.java:194) ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
    at org.apache.flink.runtime.io.network.partition.consumer.ChannelStatePersister.maybePersist(ChannelStatePersister.java:112) ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
    at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.onBuffer(RemoteInputChannel.java:623) ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
    at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.decodeBufferOrEvent(CreditBasedPartitionRequestClientHandler.java:404) ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
    at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.decodeMsg(CreditBasedPartitionRequestClientHandler.java:297) ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
    at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelRead(CreditBasedPartitionRequestClientHandler.java:197) ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?]
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?]
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?]
    at org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelRead(NettyMessageClientDecoderDelegate.java:112) ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?]
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?]
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?]
    at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?]
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?]
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?]
    at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?]
    at org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800) ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?]
    at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:509) ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?]
    at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407) ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?]
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?]
    at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?]
    ... 1 more
Caused by: java.lang.IllegalStateException: not running
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.ensureRunning(ChannelStateWriteRequestExecutorImpl.java:349) ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.submitInternal(ChannelStateWriteRequestExecutorImpl.java:340) ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.submit(ChannelStateWriteRequestExecutorImpl.java:250) ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.enqueue(ChannelStateWriterImpl.java:284) ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.addInputData(ChannelStateWriterImpl.java:194) ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
    at org.apache.flink.runtime.io.network.partition.consumer.ChannelStatePersister.maybePersist(ChannelStatePersister.java:112) ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
    at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.onBuffer(RemoteInputChannel.java:623) ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
    at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.decodeBufferOrEvent(CreditBasedPartitionRequestClientHandler.java:404) ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
    at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.decodeMsg(CreditBasedPartitionRequestClientHandler.java:297) ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
    at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelRead(CreditBasedPartitionRequestClientHandler.java:197) ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?]
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?]
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?]
    at org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelRead(NettyMessageClientDecoderDelegate.java:112) ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?]
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?]
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?]
    at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?]
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?]
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?]
    at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?]
    at org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800) ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?]
    at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:509) ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?]
    at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407) ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?]
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?]
    at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?]
    ... 1 more
{code}