[ https://issues.apache.org/jira/browse/FLINK-23938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17404877#comment-17404877 ]
Yun Gao commented on FLINK-23938:
---------------------------------
Fixed on master via f57fc380b8d6ceb0ad11641367759097e0e1b36a
> Do not resume channels if the barrier is received via RPC
> ---------------------------------------------------------
>
> Key: FLINK-23938
> URL: https://issues.apache.org/jira/browse/FLINK-23938
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Checkpointing
> Affects Versions: 1.14.0
> Reporter: Yun Gao
> Assignee: Yun Gao
> Priority: Major
> Labels: pull-request-available
>
> Currently, when all predecessors of a task have finished, they notify the JM
> and emit EndOfPartition at the same time. If the JM receives the notification
> first, it triggers this task directly for the next checkpoint.
> In this case, the task fakes a barrier for each channel that has not yet
> received EndOfPartition. This is correct under the current logic, since the
> predecessors wait until all pending records have been processed before they
> finish.
> However, when processing these barriers we should not resume the
> corresponding channels; otherwise the upstream subpartition throws an
> exception, since it is not blocked.
> {code:java}
> 28074 55458 [Map -> Map (10/12)#0] WARN org.apache.flink.runtime.taskmanager.Task [] - Map -> Map (10/12)#0 (0e7fa4cb19227c4bba52d11f031178f0) switched from RUNNING to FAILED with failure cause: java.lang.IllegalStateException: Should be blocked by checkpoint.
> at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
> at org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.resumeConsumption(PipelinedSubpartition.java:381)
> at org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionView.resumeConsumption(PipelinedSubpartitionView.java:79)
> at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.resumeConsumption(LocalInputChannel.java:283)
> at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.resumeConsumption(SingleInputGate.java:857)
> at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.resumeConsumption(InputGateWithMetrics.java:67)
> at org.apache.flink.streaming.runtime.io.checkpointing.ChannelState.unblockAllChannels(ChannelState.java:76)
> at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:70)
> at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:61)
> at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:240)
> at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.checkCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:257)
> at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:239)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerUnfinishedChannelsCheckpoint(StreamTask.java:1201)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$11(StreamTask.java:1118)
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:818)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:745)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:784)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:727)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:786)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:572)
> at java.lang.Thread.run(Thread.java:748)
> {code}
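The failure mode in the issue description can be illustrated with a minimal sketch. It is not the change from the referenced commit: `SketchSubpartition` and `SketchChannelState` are hypothetical stand-ins for Flink's subpartition and `ChannelState`, and the method names `onBarrierFromNetwork` and `onBarrierFromRpc` are assumptions made for illustration only. The idea shown is that only channels blocked by a barrier that actually arrived over the channel are remembered and later resumed, so a barrier faked for an RPC trigger never causes a resume call on a subpartition that was never blocked.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical stand-in for an upstream subpartition (not Flink's PipelinedSubpartition). */
class SketchSubpartition {
    private boolean blockedByCheckpoint;

    /** Called when the subpartition has emitted a real barrier and stops sending data. */
    void blockForCheckpoint() {
        blockedByCheckpoint = true;
    }

    /** Mirrors the precondition seen in the stack trace: resuming requires a prior block. */
    void resumeConsumption() {
        if (!blockedByCheckpoint) {
            throw new IllegalStateException("Should be blocked by checkpoint.");
        }
        blockedByCheckpoint = false;
    }

    boolean isBlocked() {
        return blockedByCheckpoint;
    }
}

/** Hypothetical downstream bookkeeping (an assumption, loosely modelled on ChannelState). */
class SketchChannelState {
    private final SketchSubpartition[] upstream;
    // Only channels whose barrier arrived over the network are recorded here.
    private final Set<Integer> blockedChannels = new HashSet<>();

    SketchChannelState(SketchSubpartition[] upstream) {
        this.upstream = upstream;
    }

    /** Barrier received in-band: the upstream side blocked itself, so remember the channel. */
    void onBarrierFromNetwork(int channel) {
        upstream[channel].blockForCheckpoint();
        blockedChannels.add(channel);
    }

    /** Barrier faked for an RPC-triggered checkpoint: upstream is not blocked, record nothing. */
    void onBarrierFromRpc(int channel) {
        // Intentionally empty: this channel must not be resumed later.
    }

    /** Resume only the channels that were really blocked, then forget them. */
    void unblockAllChannels() {
        for (int channel : blockedChannels) {
            upstream[channel].resumeConsumption();
        }
        blockedChannels.clear();
    }
}
```

In this sketch, a task that receives a real barrier on channel 0 and a faked one on channel 1 resumes only channel 0 when the checkpoint is triggered. Resuming channel 1 as well would hit the same `IllegalStateException` shown in the trace.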
--
This message was sent by Atlassian Jira
(v8.3.4#803005)