Yun Gao created FLINK-23938:
-------------------------------

             Summary: Do not resume channels if the barrier is received via RPC
                 Key: FLINK-23938
                 URL: https://issues.apache.org/jira/browse/FLINK-23938
             Project: Flink
          Issue Type: Sub-task
          Components: Runtime / Checkpointing
    Affects Versions: 1.14.0
            Reporter: Yun Gao

Currently, when all predecessors of a task have finished, they notify the JM and emit EndOfPartition at the same time. If the JM receives the notification first, it triggers this task directly for the next checkpoint. In that case, the task fakes a barrier for each channel that has not yet received EndOfPartition. This is correct under the current logic, because the predecessors wait until all pending records have been processed before they finish.

However, when processing these faked barriers, we should not resume the corresponding channels. Otherwise the upstream subpartition throws an exception, because it was never blocked.

{code:java}
28074 55458 [Map -> Map (10/12)#0] WARN org.apache.flink.runtime.taskmanager.Task [] - Map -> Map (10/12)#0 (0e7fa4cb19227c4bba52d11f031178f0) switched from RUNNING to FAILED with failure cause: java.lang.IllegalStateException: Should be blocked by checkpoint.
	at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
	at org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.resumeConsumption(PipelinedSubpartition.java:381)
	at org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionView.resumeConsumption(PipelinedSubpartitionView.java:79)
	at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.resumeConsumption(LocalInputChannel.java:283)
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.resumeConsumption(SingleInputGate.java:857)
	at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.resumeConsumption(InputGateWithMetrics.java:67)
	at org.apache.flink.streaming.runtime.io.checkpointing.ChannelState.unblockAllChannels(ChannelState.java:76)
	at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:70)
	at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:61)
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:240)
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.checkCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:257)
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:239)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerUnfinishedChannelsCheckpoint(StreamTask.java:1201)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$11(StreamTask.java:1118)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
	at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:818)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:745)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:784)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:727)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:786)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:572)
	at java.lang.Thread.run(Thread.java:748)
{code}
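
The failure mode described in this issue can be reproduced in a toy model. The sketch below is not Flink code: `ToySubpartition`, `ToyBarrierHandler`, and all of their methods are hypothetical stand-ins that only mimic the "Should be blocked by checkpoint." precondition seen in the stack trace. It assumes that one possible remedy is for the handler to remember which channels delivered an in-band barrier and to resume only those; the actual fix in Flink may look different.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical stand-in for an upstream subpartition (not Flink's PipelinedSubpartition). */
class ToySubpartition {
    private boolean blockedByCheckpoint;

    /** The upstream emitted a real barrier and therefore blocks itself. */
    void blockForBarrier() {
        blockedByCheckpoint = true;
    }

    /** Mimics the precondition that fails in the stack trace. */
    void resumeConsumption() {
        if (!blockedByCheckpoint) {
            throw new IllegalStateException("Should be blocked by checkpoint.");
        }
        blockedByCheckpoint = false;
    }

    boolean isBlocked() {
        return blockedByCheckpoint;
    }
}

/** Hypothetical barrier handler that tracks which channels saw an in-band barrier. */
class ToyBarrierHandler {
    private final List<ToySubpartition> upstreams;
    private final Set<Integer> blockedChannels = new TreeSet<>();

    ToyBarrierHandler(List<ToySubpartition> upstreams) {
        this.upstreams = upstreams;
    }

    /** Barrier arrived through the channel, so the upstream side is blocked. */
    void onNetworkBarrier(int channel) {
        blockedChannels.add(channel);
    }

    /** Barrier was faked after an RPC trigger; the upstream was never blocked. */
    void onRpcBarrier(int channel) {
        // Deliberately not recorded as blocked.
    }

    /** Problematic variant: resumes every channel regardless of how the barrier arrived. */
    void resumeAllChannels() {
        for (ToySubpartition upstream : upstreams) {
            upstream.resumeConsumption();
        }
    }

    /** Assumed remedy: resume only channels that were really blocked. */
    int resumeBlockedChannelsOnly() {
        int resumed = 0;
        for (int channel : blockedChannels) {
            upstreams.get(channel).resumeConsumption();
            resumed++;
        }
        blockedChannels.clear();
        return resumed;
    }
}

class ResumeSketch {
    /** Channel 0 delivers a real barrier; channel 1 gets a faked one via RPC. */
    static ToyBarrierHandler scenario() {
        List<ToySubpartition> upstreams = new ArrayList<>();
        upstreams.add(new ToySubpartition());
        upstreams.add(new ToySubpartition());
        upstreams.get(0).blockForBarrier();
        ToyBarrierHandler handler = new ToyBarrierHandler(upstreams);
        handler.onNetworkBarrier(0);
        handler.onRpcBarrier(1);
        return handler;
    }

    public static void main(String[] args) {
        try {
            scenario().resumeAllChannels();
        } catch (IllegalStateException e) {
            System.out.println("resumeAllChannels failed: " + e.getMessage());
        }
        System.out.println("resumed " + scenario().resumeBlockedChannelsOnly() + " channel(s)");
    }
}
```

In this model, resuming all channels fails on the channel whose barrier was faked, while resuming only the recorded channels leaves that upstream untouched.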