Yun Gao created FLINK-23938:
-------------------------------
Summary: Do not resume channels if the barrier is received via RPC
Key: FLINK-23938
URL: https://issues.apache.org/jira/browse/FLINK-23938
Project: Flink
Issue Type: Sub-task
Components: Runtime / Checkpointing
Affects Versions: 1.14.0
Reporter: Yun Gao
Currently, when all predecessors of a task have finished, each of them notifies
the JM and emits EndOfPartition at the same time. If the JM receives the
notification first, it triggers the next checkpoint on this task directly via
RPC.
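The JM-side decision can be sketched as follows: a running task is triggered directly once every predecessor has reported FINISHED, exactly like a source. This is a minimal sketch assuming a simplified job graph given as a map from task name to its predecessors; `TriggerPlanSketch` and `tasksToTrigger` are hypothetical names, not Flink's actual checkpoint plan API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of how the JM picks the tasks it triggers via RPC.
// Not Flink's real API: the graph is just "task name -> names of its predecessors".
final class TriggerPlanSketch {

    private TriggerPlanSketch() {}

    /**
     * Returns the still-running tasks whose predecessors have all reported FINISHED
     * to the JM. Sources (no predecessors) trivially qualify. These tasks receive the
     * checkpoint trigger over RPC instead of waiting for barriers from upstream.
     */
    static List<String> tasksToTrigger(Map<String, List<String>> predecessors, Set<String> finished) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : predecessors.entrySet()) {
            String task = entry.getKey();
            if (finished.contains(task)) {
                continue; // finished tasks are never triggered
            }
            if (finished.containsAll(entry.getValue())) {
                result.add(task); // all inputs finished: trigger it like a source
            }
        }
        return result;
    }
}
```

For a `Source -> Map -> Sink` chain where the JM already knows `Source` finished, only `Map` is triggered, even if `Map` has not yet seen EndOfPartition on its input.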
In this case, the task fakes a barrier for the channels that have not received
EndOfPartition yet. This is correct under the current logic, since a predecessor
waits until all of its pending records have been processed before it finishes.
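On the task side, the RPC-triggered path amounts to selecting the channels that still lack EndOfPartition and treating each of them as if a barrier had arrived. A minimal sketch, assuming an input channel reduced to a single "EndOfPartition received" flag; `FakeBarrierSketch` and its members are hypothetical names, not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the task-side reaction to an RPC-triggered checkpoint.
// Names are illustrative only and do not correspond to Flink's actual classes.
final class FakeBarrierSketch {

    private FakeBarrierSketch() {}

    /** Simplified input channel: tracks only whether EndOfPartition has arrived. */
    static final class InputChannel {
        final int index;
        final boolean endOfPartitionReceived;

        InputChannel(int index, boolean endOfPartitionReceived) {
            this.index = index;
            this.endOfPartitionReceived = endOfPartitionReceived;
        }
    }

    /**
     * Every predecessor is already known to be finished, and a predecessor only finishes
     * after its pending records were processed, so a channel still waiting for
     * EndOfPartition carries no further records. The task can therefore act as if a
     * barrier had arrived on each such channel.
     */
    static List<Integer> channelsNeedingFakeBarrier(List<InputChannel> channels) {
        List<Integer> result = new ArrayList<>();
        for (InputChannel channel : channels) {
            if (!channel.endOfPartitionReceived) {
                result.add(channel.index);
            }
        }
        return result;
    }
}
```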
However, when processing these faked barriers, the task should not resume
consumption on the corresponding channels; otherwise the upstream subpartition
throws an exception because it is not blocked:
{code:java}
28074 55458 [Map -> Map (10/12)#0] WARN org.apache.flink.runtime.taskmanager.Task [] - Map -> Map (10/12)#0 (0e7fa4cb19227c4bba52d11f031178f0) switched from RUNNING to FAILED with failure cause: java.lang.IllegalStateException: Should be blocked by checkpoint.
	at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
	at org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.resumeConsumption(PipelinedSubpartition.java:381)
	at org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionView.resumeConsumption(PipelinedSubpartitionView.java:79)
	at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.resumeConsumption(LocalInputChannel.java:283)
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.resumeConsumption(SingleInputGate.java:857)
	at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.resumeConsumption(InputGateWithMetrics.java:67)
	at org.apache.flink.streaming.runtime.io.checkpointing.ChannelState.unblockAllChannels(ChannelState.java:76)
	at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:70)
	at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:61)
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:240)
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.checkCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:257)
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:239)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerUnfinishedChannelsCheckpoint(StreamTask.java:1201)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$11(StreamTask.java:1118)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
	at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:818)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:745)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:784)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:727)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:786)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:572)
	at java.lang.Thread.run(Thread.java:748)
{code}
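One way to express the proposed behavior is to have each channel remember whether its barrier really arrived over the network (which blocks the upstream subpartition under aligned checkpoints) or was faked because the checkpoint was triggered via RPC, and to resume only the former once alignment completes. A minimal sketch under that assumption; all class, field, and method names are hypothetical, and only the exception message is taken from the log in this issue.

```java
import java.util.List;

// Hypothetical model of the fix; names are illustrative, not Flink's actual API.
final class ResumeOnlyBlockedChannelsSketch {

    private ResumeOnlyBlockedChannelsSketch() {}

    /** Stand-in for an upstream subpartition that blocks itself after emitting an aligned barrier. */
    static final class UpstreamSubpartition {
        private boolean blocked;

        void emitAlignedBarrier() {
            blocked = true; // upstream stops sending until the consumer resumes it
        }

        void resumeConsumption() {
            // Same precondition message as in the stack trace of this issue.
            if (!blocked) {
                throw new IllegalStateException("Should be blocked by checkpoint.");
            }
            blocked = false;
        }

        boolean isBlocked() {
            return blocked;
        }
    }

    /** Consumer-side channel that remembers how its barrier arrived. */
    static final class Channel {
        final UpstreamSubpartition upstream;
        boolean barrierFromNetwork; // true only if a real barrier came through this channel

        Channel(UpstreamSubpartition upstream) {
            this.upstream = upstream;
        }

        void receiveNetworkBarrier() {
            upstream.emitAlignedBarrier();
            barrierFromNetwork = true;
        }
    }

    /**
     * Called once all channels are aligned. Channels whose barrier was faked for an
     * RPC-triggered checkpoint are skipped, since their upstream was never blocked.
     * Returns the number of channels that were resumed.
     */
    static int unblockChannels(List<Channel> channels) {
        int resumed = 0;
        for (Channel channel : channels) {
            if (channel.barrierFromNetwork) {
                channel.upstream.resumeConsumption();
                channel.barrierFromNetwork = false;
                resumed++;
            }
        }
        return resumed;
    }
}
```

Calling `resumeConsumption()` on every channel's upstream instead, as `unblockAllChannels` does in the trace, would throw for any channel whose barrier was faked.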
--
This message was sent by Atlassian Jira
(v8.3.4#803005)