[ 
https://issues.apache.org/jira/browse/FLINK-23938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17404877#comment-17404877
 ] 

Yun Gao commented on FLINK-23938:
---------------------------------

Fixed on master via f57fc380b8d6ceb0ad11641367759097e0e1b36a

> Do not resume channels if the barrier is received via RPC
> ---------------------------------------------------------
>
>                 Key: FLINK-23938
>                 URL: https://issues.apache.org/jira/browse/FLINK-23938
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.14.0
>            Reporter: Yun Gao
>            Assignee: Yun Gao
>            Priority: Major
>              Labels: pull-request-available
>
> Currently, when all predecessors of a task finish, they notify the JM and 
> emit EndOfPartition at the same time. If the JM receives the notification 
> first, it triggers this task directly for the next checkpoint. 
> In this case, the task fakes a barrier for the channels that have not 
> received EndOfPartition yet. This is correct under the current logic, since 
> the predecessors wait until all pending records are processed before 
> finishing.
> However, when processing these faked barriers, we should not resume the 
> corresponding channels; otherwise the upstream subpartition throws an 
> exception, since it is not blocked. 
> {code:java}
> 28074 55458 [Map -> Map (10/12)#0] WARN  org.apache.flink.runtime.taskmanager.Task [] - Map -> Map (10/12)#0 (0e7fa4cb19227c4bba52d11f031178f0) switched from RUNNING to FAILED with failure cause: java.lang.IllegalStateException: Should be blocked by checkpoint.
>       at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
>       at org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.resumeConsumption(PipelinedSubpartition.java:381)
>       at org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionView.resumeConsumption(PipelinedSubpartitionView.java:79)
>       at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.resumeConsumption(LocalInputChannel.java:283)
>       at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.resumeConsumption(SingleInputGate.java:857)
>       at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.resumeConsumption(InputGateWithMetrics.java:67)
>       at org.apache.flink.streaming.runtime.io.checkpointing.ChannelState.unblockAllChannels(ChannelState.java:76)
>       at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:70)
>       at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:61)
>       at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:240)
>       at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.checkCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:257)
>       at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:239)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerUnfinishedChannelsCheckpoint(StreamTask.java:1201)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$11(StreamTask.java:1118)
>       at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>       at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>       at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
>       at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
>       at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:818)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:745)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:784)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:727)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:786)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:572)
>       at java.lang.Thread.run(Thread.java:748)
> {code}
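
The rule described in the issue (resume only the channels that were really blocked by an in-band barrier, and skip the ones whose barrier was faked after an RPC trigger) can be illustrated with a small toy model. This is only a sketch under stated assumptions: SketchSubpartition, SketchBarrierHandler and all of their methods are hypothetical names invented for illustration. They are not Flink classes, and this is not the code from the fixing commit.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical stand-in for an upstream subpartition; not Flink's real class. */
final class SketchSubpartition {
    private boolean blockedByCheckpoint;

    /** Models the upstream side blocking itself when it emits a real barrier. */
    void emitBarrier() {
        blockedByCheckpoint = true;
    }

    /** Mirrors the precondition seen in the stack trace: resuming requires a prior block. */
    void resumeConsumption() {
        if (!blockedByCheckpoint) {
            throw new IllegalStateException("Should be blocked by checkpoint.");
        }
        blockedByCheckpoint = false;
    }

    boolean isBlocked() {
        return blockedByCheckpoint;
    }
}

/** Hypothetical aligned-barrier handler that remembers how each barrier arrived. */
final class SketchBarrierHandler {
    private final SketchSubpartition[] channels;
    private final Set<Integer> alignedChannels = new HashSet<>();
    private final Set<Integer> blockedChannels = new HashSet<>();
    private int completedCheckpoints;

    SketchBarrierHandler(SketchSubpartition[] channels) {
        this.channels = channels;
    }

    /** A real barrier arrived over the network: the upstream is blocked and must be resumed later. */
    void onBarrierFromNetwork(int channel) {
        alignedChannels.add(channel);
        blockedChannels.add(channel);
        maybeCompleteAlignment();
    }

    /** A barrier faked after an RPC trigger: the upstream was never blocked, so do not resume it. */
    void onBarrierViaRpc(int channel) {
        alignedChannels.add(channel);
        maybeCompleteAlignment();
    }

    private void maybeCompleteAlignment() {
        if (alignedChannels.size() < channels.length) {
            return;
        }
        // Resume only the channels that were blocked by a real barrier.
        for (int channel : blockedChannels) {
            channels[channel].resumeConsumption();
        }
        blockedChannels.clear();
        alignedChannels.clear();
        completedCheckpoints++;
    }

    int completedCheckpoints() {
        return completedCheckpoints;
    }
}

final class RpcBarrierSketch {
    public static void main(String[] args) {
        SketchSubpartition a = new SketchSubpartition();
        SketchSubpartition b = new SketchSubpartition();
        SketchBarrierHandler handler =
                new SketchBarrierHandler(new SketchSubpartition[] {a, b});

        a.emitBarrier();                 // channel 0 sends a real barrier and blocks
        handler.onBarrierFromNetwork(0);
        handler.onBarrierViaRpc(1);      // channel 1 only gets a faked barrier

        System.out.println("completed checkpoints: " + handler.completedCheckpoints());
    }
}
```

In this model, resuming every channel after alignment would call resumeConsumption() on the never-blocked channel 1 and raise the same IllegalStateException as in the trace above; tracking how each barrier arrived avoids that.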



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
