[
https://issues.apache.org/jira/browse/FLINK-38804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Rui Fan updated FLINK-38804:
----------------------------
Description:
h3. The existing logic
When task encounters any exception, {{postFailureCleanUpRegistry.close();}} do
some cleanups.
[https://github.com/apache/flink/blob/7ba97419d200adef672b67c52c2b5a79f277cfda/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java#L801]
TaskManager will shutdown if {{postFailureCleanUpRegistry.close();}} meets any
expetion, it works as expected to avoid poential bugs or resource leaks.
h3. Root cause and solution
The following unexpected exception occurred within
{{{}postFailureCleanUpRegistry.close();{}}}.
Cause: The shutdown sequence is incorrect. {{RemoteInputChannel}} depends on
{{ChannelStatePersister}} (which uses
{{{}ChannelStateWriteRequestExecutorImpl{}}}) to snapshot buffers. However,
{{ChannelStateWriteRequestExecutorImpl}} is being closed prematurely while
{{RemoteInputChannel}} is still active. If a new buffer arrives, the active
{{RemoteInputChannel}} attempts to use the already-closed executor, resulting
in the failure.
Solution: Ensure the RemoteInputChannel (or remote components) is closed before
ChannelStatePersister and ChannelStateWriteRequestExecutorImpl.
h3. Core stack:
{code:java}
Caused by: java.lang.IllegalStateException: not running
at
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.ensureRunning(ChannelStateWriteRequestExecutorImpl.java:349)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.submitInternal(ChannelStateWriteRequestExecutorImpl.java:340)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.submit(ChannelStateWriteRequestExecutorImpl.java:250)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.enqueue(ChannelStateWriterImpl.java:284)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.addInputData(ChannelStateWriterImpl.java:194)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.runtime.io.network.partition.consumer.ChannelStatePersister.maybePersist(ChannelStatePersister.java:112)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.onBuffer(RemoteInputChannel.java:623)
~{code}
Full stack:
{code:java}
241672 [7c150f0de176c52cc390e6d10dd12c8d] Task [failing-map (1/3)#0] ERROR
FATAL - exception in exception handler of task failing-map (1/3)#0
(e6367521ffc2df19786f2f8c034c0262_b8c789ec3a44294cb45da029ffe0e6fd_0_0).
java.lang.RuntimeException: java.io.IOException: java.lang.RuntimeException:
unable to send request to worker
at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
~[flink-core-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.runtime.io.network.partition.consumer.BufferManager.recycle(BufferManager.java:237)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:189)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:111)
~[flink-shaded-netty-4.1.100.Final-20.0.jar:?]
at
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:101)
~[flink-shaded-netty-4.1.100.Final-20.0.jar:?]
at
org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:161)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.clear(SpillingAdaptiveSpanningRecordDeserializer.java:140)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.releaseDeserializer(AbstractStreamTaskNetworkInput.java:328)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.close(AbstractStreamTaskNetworkInput.java:320)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.close(StreamTaskNetworkInput.java:142)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.close(StreamOneInputProcessor.java:88)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInternal(StreamTask.java:1110)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:257)
~[flink-core-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:83)
~[flink-core-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127)
~[flink-core-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:1101)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$2(Task.java:958)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:973)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$3(Task.java:958)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:257)
~[flink-core-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:83)
~[flink-core-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127)
~[flink-core-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:794)
[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:569)
[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at java.base/java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.io.IOException: java.lang.RuntimeException: unable to send
request to worker
at
org.apache.flink.runtime.io.network.partition.consumer.InputChannel.checkError(InputChannel.java:275)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.checkPart{code}
was:
h3. The existing logic
When task encounters any exception, {{postFailureCleanUpRegistry.close();}} do
some cleanups.
[https://github.com/apache/flink/blob/7ba97419d200adef672b67c52c2b5a79f277cfda/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java#L801]
TaskManager will shutdown if {{postFailureCleanUpRegistry.close();}} meets any
expetion, it works as expected to avoid poential bugs or resource leaks.
h3. Root cause and solution
The following unexpected exception occurred within
{{{}postFailureCleanUpRegistry.close();{}}}.
Cause: The shutdown sequence is incorrect. {{RemoteInputChannel}} depends on
{{ChannelStatePersister}} (which uses
{{{}ChannelStateWriteRequestExecutorImpl{}}}) to snapshot buffers. However,
{{ChannelStateWriteRequestExecutorImpl}} is being closed prematurely while
{{RemoteInputChannel}} is still active. If a new buffer arrives, the active
{{RemoteInputChannel}} attempts to use the already-closed executor, resulting
in the failure.
Solution: Ensure the RemoteInputChannel (or remote components) is closed before
ChannelStatePersister and ChannelStateWriteRequestExecutorImpl.
h3. Core stack:
{code:java}
Caused by: java.lang.IllegalStateException: not running at
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.ensureRunning(ChannelStateWriteRequestExecutorImpl.java:349)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.submitInternal(ChannelStateWriteRequestExecutorImpl.java:340)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.submit(ChannelStateWriteRequestExecutorImpl.java:250)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.enqueue(ChannelStateWriterImpl.java:284)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.addInputData(ChannelStateWriterImpl.java:194)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.runtime.io.network.partition.consumer.ChannelStatePersister.maybePersist(ChannelStatePersister.java:112)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.onBuffer(RemoteInputChannel.java:623)
~{code}
Full stack:
{code:java}
241672 [7c150f0de176c52cc390e6d10dd12c8d] Task [failing-map (1/3)#0] ERROR
FATAL - exception in exception handler of task failing-map (1/3)#0
(e6367521ffc2df19786f2f8c034c0262_b8c789ec3a44294cb45da029ffe0e6fd_0_0).
java.lang.RuntimeException: java.io.IOException: java.lang.RuntimeException:
unable to send request to worker at
org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
~[flink-core-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.runtime.io.network.partition.consumer.BufferManager.recycle(BufferManager.java:237)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:189)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:111)
~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:101)
~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at
org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:161)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.clear(SpillingAdaptiveSpanningRecordDeserializer.java:140)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.releaseDeserializer(AbstractStreamTaskNetworkInput.java:328)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.close(AbstractStreamTaskNetworkInput.java:320)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.close(StreamTaskNetworkInput.java:142)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.close(StreamOneInputProcessor.java:88)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInternal(StreamTask.java:1110)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:257)
~[flink-core-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:83)
~[flink-core-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127)
~[flink-core-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:1101)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$2(Task.java:958)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:973)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$3(Task.java:958)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:257)
~[flink-core-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:83)
~[flink-core-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127)
~[flink-core-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:794)
[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:569)
[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at java.base/java.lang.Thread.run(Thread.java:829) [?:?] Caused by:
java.io.IOException: java.lang.RuntimeException: unable to send request to
worker at
org.apache.flink.runtime.io.network.partition.consumer.InputChannel.checkError(InputChannel.java:275)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.checkPartitionRequestQueueInitialized(RemoteInputChannel.java:883)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.notifyCreditAvailable(RemoteInputChannel.java:375)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:430)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.runtime.io.network.partition.consumer.BufferManager.recycle(BufferManager.java:235)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
... 23 more Caused by: java.lang.RuntimeException: unable to send request to
worker at
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.enqueue(ChannelStateWriterImpl.java:287)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.addInputData(ChannelStateWriterImpl.java:194)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.runtime.io.network.partition.consumer.ChannelStatePersister.maybePersist(ChannelStatePersister.java:112)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.onBuffer(RemoteInputChannel.java:623)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.decodeBufferOrEvent(CreditBasedPartitionRequestClientHandler.java:404)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.decodeMsg(CreditBasedPartitionRequestClientHandler.java:297)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelRead(CreditBasedPartitionRequestClientHandler.java:197)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at
org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelRead(NettyMessageClientDecoderDelegate.java:112)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at
org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800)
~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at
org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:509)
~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at
org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407)
~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] ... 1 more Caused by:
java.lang.IllegalStateException: not running at
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.ensureRunning(ChannelStateWriteRequestExecutorImpl.java:349)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.submitInternal(ChannelStateWriteRequestExecutorImpl.java:340)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.submit(ChannelStateWriteRequestExecutorImpl.java:250)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.enqueue(ChannelStateWriterImpl.java:284)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.addInputData(ChannelStateWriterImpl.java:194)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.runtime.io.network.partition.consumer.ChannelStatePersister.maybePersist(ChannelStatePersister.java:112)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.onBuffer(RemoteInputChannel.java:623)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.decodeBufferOrEvent(CreditBasedPartitionRequestClientHandler.java:404)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.decodeMsg(CreditBasedPartitionRequestClientHandler.java:297)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelRead(CreditBasedPartitionRequestClientHandler.java:197)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at
org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelRead(NettyMessageClientDecoderDelegate.java:112)
~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at
org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800)
~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at
org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:509)
~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at
org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407)
~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] at
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
~[flink-shaded-netty-4.1.100.Final-20.0.jar:?] ... 1 more{code}
> The shutdown sequence is incorrect causes TaskManager is shutting down
> ----------------------------------------------------------------------
>
> Key: FLINK-38804
> URL: https://issues.apache.org/jira/browse/FLINK-38804
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Reporter: Rui Fan
> Assignee: Rui Fan
> Priority: Major
>
> h3. The existing logic
> When task encounters any exception, {{postFailureCleanUpRegistry.close();}}
> do some cleanups.
> [https://github.com/apache/flink/blob/7ba97419d200adef672b67c52c2b5a79f277cfda/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java#L801]
> TaskManager will shutdown if {{postFailureCleanUpRegistry.close();}} meets
> any expetion, it works as expected to avoid poential bugs or resource leaks.
> h3. Root cause and solution
> The following unexpected exception occurred within
> {{{}postFailureCleanUpRegistry.close();{}}}.
> Cause: The shutdown sequence is incorrect. {{RemoteInputChannel}} depends on
> {{ChannelStatePersister}} (which uses
> {{{}ChannelStateWriteRequestExecutorImpl{}}}) to snapshot buffers. However,
> {{ChannelStateWriteRequestExecutorImpl}} is being closed prematurely while
> {{RemoteInputChannel}} is still active. If a new buffer arrives, the active
> {{RemoteInputChannel}} attempts to use the already-closed executor, resulting
> in the failure.
> Solution: Ensure the RemoteInputChannel (or remote components) is closed
> before ChannelStatePersister and ChannelStateWriteRequestExecutorImpl.
>
> h3. Core stack:
>
> {code:java}
> Caused by: java.lang.IllegalStateException: not running
> at
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.ensureRunning(ChannelStateWriteRequestExecutorImpl.java:349)
>
> ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
> at
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.submitInternal(ChannelStateWriteRequestExecutorImpl.java:340)
>
> ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
> at
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.submit(ChannelStateWriteRequestExecutorImpl.java:250)
>
> ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
> at
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.enqueue(ChannelStateWriterImpl.java:284)
>
> ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
> at
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.addInputData(ChannelStateWriterImpl.java:194)
>
> ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
> at
> org.apache.flink.runtime.io.network.partition.consumer.ChannelStatePersister.maybePersist(ChannelStatePersister.java:112)
>
> ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
> at
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.onBuffer(RemoteInputChannel.java:623)
> ~{code}
>
> Full stack:
> {code:java}
> 241672 [7c150f0de176c52cc390e6d10dd12c8d] Task [failing-map (1/3)#0] ERROR
> FATAL - exception in exception handler of task failing-map (1/3)#0
> (e6367521ffc2df19786f2f8c034c0262_b8c789ec3a44294cb45da029ffe0e6fd_0_0).
> java.lang.RuntimeException: java.io.IOException: java.lang.RuntimeException:
> unable to send request to worker
> at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
> ~[flink-core-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
> at
> org.apache.flink.runtime.io.network.partition.consumer.BufferManager.recycle(BufferManager.java:237)
>
> ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
> at
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:189)
>
> ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
> at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:111)
> ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?]
> at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:101)
> ~[flink-shaded-netty-4.1.100.Final-20.0.jar:?]
> at
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:161)
>
> ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
> at
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.clear(SpillingAdaptiveSpanningRecordDeserializer.java:140)
>
> ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
> at
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.releaseDeserializer(AbstractStreamTaskNetworkInput.java:328)
>
> ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
> at
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.close(AbstractStreamTaskNetworkInput.java:320)
>
> ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
> at
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.close(StreamTaskNetworkInput.java:142)
>
> ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.close(StreamOneInputProcessor.java:88)
>
> ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInternal(StreamTask.java:1110)
>
> ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
> at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:257)
> ~[flink-core-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
> at
> org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:83)
>
> ~[flink-core-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
> at
> org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127)
>
> ~[flink-core-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:1101)
>
> ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
> at
> org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$2(Task.java:958)
>
> ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:973)
>
> ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
> at
> org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$3(Task.java:958)
>
> ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
> at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:257)
> ~[flink-core-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
> at
> org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:83)
>
> ~[flink-core-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
> at
> org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127)
>
> ~[flink-core-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:794)
> [flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:569)
> [flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
> at java.base/java.lang.Thread.run(Thread.java:829) [?:?]
> Caused by: java.io.IOException: java.lang.RuntimeException: unable to send
> request to worker
> at
> org.apache.flink.runtime.io.network.partition.consumer.InputChannel.checkError(InputChannel.java:275)
>
> ~[flink-runtime-os-troubleshooting-85b41a8ff19.jar:os-troubleshooting-85b41a8ff19]
> at
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.checkPart{code}
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)