Yun Gao created FLINK-23938:
-------------------------------

             Summary: Do not resume channels if the barrier is received via RPC
                 Key: FLINK-23938
                 URL: https://issues.apache.org/jira/browse/FLINK-23938
             Project: Flink
          Issue Type: Sub-task
          Components: Runtime / Checkpointing
    Affects Versions: 1.14.0
            Reporter: Yun Gao


Currently, when all predecessors of a task have finished, they notify the JM 
and emit EndOfPartition at the same time. If the JM receives the notification 
first, it triggers this task directly for the next checkpoint.

In this case, the task fakes a barrier for the channels that have not received 
EndOfPartition yet. This is correct under the current logic, since the 
predecessors wait until all pending records are processed before they finish.
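To make the fake-barrier condition concrete, here is a minimal sketch. It is a hypothetical, simplified model, not Flink's actual code: {{FakeBarrierSketch}}, {{Channel}} and {{channelsToFakeBarrierFor}} are made-up names. In the real runtime, the corresponding step shows up as {{StreamTask.triggerUnfinishedChannelsCheckpoint}} in the stack trace below.

{code:java}
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified model (not Flink's actual classes) of a task that the
 * JM triggers directly via RPC after all of its predecessors have finished.
 */
public class FakeBarrierSketch {

    /** Hypothetical stand-in for an input channel. */
    static final class Channel {
        final int index;
        final boolean endOfPartitionReceived;

        Channel(int index, boolean endOfPartitionReceived) {
            this.index = index;
            this.endOfPartitionReceived = endOfPartitionReceived;
        }
    }

    /**
     * Returns the indices of the channels that get a faked barrier: every
     * channel that has not yet delivered EndOfPartition.
     */
    static List<Integer> channelsToFakeBarrierFor(List<Channel> channels) {
        List<Integer> result = new ArrayList<>();
        for (Channel channel : channels) {
            if (!channel.endOfPartitionReceived) {
                result.add(channel.index);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Channel> channels = List.of(
                new Channel(0, true),   // EndOfPartition already consumed
                new Channel(1, false),  // EndOfPartition still in flight
                new Channel(2, false));
        System.out.println(channelsToFakeBarrierFor(channels)); // [1, 2]
    }
}
{code}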

However, when processing these faked barriers, we should not resume the 
corresponding channels; otherwise the upstream subpartition throws an exception, 
because it is not blocked by the checkpoint.


{code:java}
28074 55458 [Map -> Map (10/12)#0] WARN  org.apache.flink.runtime.taskmanager.Task [] - Map -> Map (10/12)#0 (0e7fa4cb19227c4bba52d11f031178f0) switched from RUNNING to FAILED with failure cause: java.lang.IllegalStateException: Should be blocked by checkpoint.
        at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
        at org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.resumeConsumption(PipelinedSubpartition.java:381)
        at org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionView.resumeConsumption(PipelinedSubpartitionView.java:79)
        at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.resumeConsumption(LocalInputChannel.java:283)
        at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.resumeConsumption(SingleInputGate.java:857)
        at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.resumeConsumption(InputGateWithMetrics.java:67)
        at org.apache.flink.streaming.runtime.io.checkpointing.ChannelState.unblockAllChannels(ChannelState.java:76)
        at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:70)
        at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:61)
        at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:240)
        at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.checkCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:257)
        at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:239)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerUnfinishedChannelsCheckpoint(StreamTask.java:1201)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$11(StreamTask.java:1118)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
        at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:818)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:745)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:784)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:727)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:786)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:572)
        at java.lang.Thread.run(Thread.java:748)
{code}
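The proposed behavior (not resuming channels whose barrier was faked for an RPC-triggered checkpoint) can be sketched as follows, under stated assumptions. This is a hypothetical, simplified model, not Flink's {{SingleCheckpointBarrierHandler}} or {{ChannelState}} code: it assumes each barrier carries a flag saying whether it was faked via RPC, records which channels were actually blocked by an in-band barrier, and resumes only those once aligned. {{ResumeOnlyBlockedChannelsSketch}}, {{Subpartition}} and {{barrierReceived(channel, viaRpc)}} are made-up names; {{Subpartition.resumeConsumption}} only mimics the precondition that fails in {{PipelinedSubpartition.resumeConsumption}} above.

{code:java}
import java.util.BitSet;

/**
 * Hypothetical, simplified model (not Flink's actual code): channels whose barrier
 * was faked for an RPC-triggered checkpoint were never blocked upstream, so they
 * must not be resumed after alignment.
 */
public class ResumeOnlyBlockedChannelsSketch {

    /** Stand-in for the upstream subpartition and its precondition. */
    static final class Subpartition {
        private boolean blockedByCheckpoint;

        void blockForCheckpoint() {
            blockedByCheckpoint = true;
        }

        void resumeConsumption() {
            // Mirrors the precondition that fails in the reported stack trace.
            if (!blockedByCheckpoint) {
                throw new IllegalStateException("Should be blocked by checkpoint.");
            }
            blockedByCheckpoint = false;
        }
    }

    private final Subpartition[] upstream;
    private final BitSet blockedChannels;
    private final BitSet barrierSeen;

    ResumeOnlyBlockedChannelsSketch(int numChannels) {
        upstream = new Subpartition[numChannels];
        for (int i = 0; i < numChannels; i++) {
            upstream[i] = new Subpartition();
        }
        blockedChannels = new BitSet(numChannels);
        barrierSeen = new BitSet(numChannels);
    }

    /**
     * Records a barrier for one channel. An in-band barrier means the upstream
     * subpartition blocked itself after emitting it (modeled by blockForCheckpoint);
     * a barrier faked via RPC blocks nothing. Returns true once all channels have
     * seen the barrier and the blocked ones were resumed.
     */
    boolean barrierReceived(int channel, boolean viaRpc) {
        if (!viaRpc) {
            upstream[channel].blockForCheckpoint();
            blockedChannels.set(channel);
        }
        barrierSeen.set(channel);
        if (barrierSeen.cardinality() < upstream.length) {
            return false;
        }
        // Aligned: resume only the channels that were actually blocked.
        for (int i = blockedChannels.nextSetBit(0); i >= 0; i = blockedChannels.nextSetBit(i + 1)) {
            upstream[i].resumeConsumption();
        }
        blockedChannels.clear();
        barrierSeen.clear();
        return true;
    }

    public static void main(String[] args) {
        ResumeOnlyBlockedChannelsSketch handler = new ResumeOnlyBlockedChannelsSketch(2);
        handler.barrierReceived(0, false); // in-band barrier: channel 0 is blocked
        // Faked barrier for channel 1: only channel 0 is resumed, so nothing throws.
        System.out.println(handler.barrierReceived(1, true)); // true
    }
}
{code}

If the loop resumed every channel instead of only those in {{blockedChannels}} (as the name {{ChannelState.unblockAllChannels}} in the stack trace suggests happens today), the second call would throw the same {{IllegalStateException}} as in the report.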




--
This message was sent by Atlassian Jira
(v8.3.4#803005)