[ https://issues.apache.org/jira/browse/FLINK-22379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Roman Khachatryan resolved FLINK-22379.
---------------------------------------
    Fix Version/s: 1.14.0
       Resolution: Fixed

Merged into master as ae402bc9134201892dac59e0421e5b3834430656.

> Do not trigger checkpoint when non source tasks are INITIALIZING
> ----------------------------------------------------------------
>
>                 Key: FLINK-22379
>                 URL: https://issues.apache.org/jira/browse/FLINK-22379
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.13.0
>            Reporter: Anton Kalashnikov
>            Assignee: Anton Kalashnikov
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.14.0
>
>
> It looks like checkpoints are still being declined by tasks that are not ready:
> {noformat}
> 2021-04-21 12:59:10,192 INFO  org.apache.flink.runtime.taskmanager.Task       
>              Co-Keyed-Process (5/6)#0 (c5873c9cf471a32925b54ed110250512) 
> switched from DEPLOYING to INITIALIZING.
> 2021-04-21 12:59:10,193 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph       Co-Keyed-Process 
> (5/6) (c5873c9cf471a32925b54ed110250512) switched from DEPLOYING to 
> INITIALIZING.
> ...
> 2021-04-21 12:59:10,544 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator    Decline 
> checkpoint 12 by task c5873c9cf471a32925b54ed110250512 of job 
> b643c49a878c1728d0564f194c8e563e at a579506c-cf1b-4a9c-9964-ab51ae9a1a71 @ 
> localhost (dataPort=38367).
> org.apache.flink.util.SerializedThrowable: Checkpoint was declined (tasks not 
> ready)
>         at 
> org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel.checkpointStarted(RecoveredInputChannel.java:263)
>  ~[flink-runtime_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at 
> org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate.checkpointStarted(IndexedInputGate.java:36)
>  ~[flink-runtime_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at 
> org.apache.flink.streaming.runtime.io.checkpointing.WaitingForFirstBarrierUnaligned.barrierReceived(WaitingForFirstBarrierUnaligned.java:69)
>  ~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at 
> org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlternatingAlignedBarrierHandlerState.barrierReceived(AbstractAlternatingAlignedBarrierHandlerState.java:65)
>  ~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:228)
>  ~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at 
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:180)
>  ~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at 
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158)
>  ~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
>  ~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
>  ~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:96)
>  ~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:419)
>  ~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
>  ~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:557)
>  ~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:756) 
> ~[flink-runtime_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) 
> ~[flink-runtime_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at java.lang.Thread.run(Thread.java:834) [?:?]
> {noformat}
> 100 ms later in the log:
> {noformat}
> 2021-04-21 12:59:10,611 INFO  org.apache.flink.runtime.taskmanager.Task       
>              Co-Keyed-Process (5/6)#0 (c5873c9cf471a32925b54ed110250512) 
> switched from INITIALIZING to RUNNING.
> 2021-04-21 12:59:10,611 DEBUG 
> org.apache.flink.runtime.io.network.partition.PipelinedSubpartition 
> Co-Keyed-Process (5/6)#0 (c5873c9cf471a32925b54ed110250512): Creating read 
> view for subpartition 2 of partition 
> 0e0bca97cc41ba9b4eb0f54995370397#4@c5873c9cf471a32925b54ed110250512.
> 2021-04-21 12:59:10,612 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph       Co-Keyed-Process 
> (5/6) (c5873c9cf471a32925b54ed110250512) switched from INITIALIZING to 
> RUNNING.
> {noformat}
> This happens because {{checkTasksStarted}} only checks that the input tasks 
> (SOURCE) are ready, rather than checking that all tasks are ready. So if, inside 
> {{org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator#calculateCheckpointPlan}},
> we replace
> {noformat}
> checkTasksStarted(result.getTasksToTrigger());
> {noformat}
> by
> {noformat}
> checkTasksStarted(result.getTasksToWaitFor());
> {noformat}
> it should work. 
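
The difference between the two calls can be illustrated with a small standalone model. This is only a sketch under stated assumptions: CheckpointPlanSketch, TaskInfo, State and allStarted are hypothetical names, not Flink classes, and the model assumes that only sources are triggered while every task must acknowledge the checkpoint. The real checkTasksStarted may signal a failed check differently (for example by throwing) instead of returning a boolean.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CheckpointPlanSketch {

    /** Simplified task states; the names mirror those in the log above. */
    public enum State { DEPLOYING, INITIALIZING, RUNNING }

    /** Hypothetical stand-in for a task execution; not a Flink class. */
    public static final class TaskInfo {
        final String name;
        final boolean isSource;
        final State state;

        public TaskInfo(String name, boolean isSource, State state) {
            this.name = name;
            this.isSource = isSource;
            this.state = state;
        }
    }

    /** Assumption of this model: only source tasks are triggered. */
    public static List<TaskInfo> tasksToTrigger(List<TaskInfo> all) {
        List<TaskInfo> sources = new ArrayList<>();
        for (TaskInfo t : all) {
            if (t.isSource) {
                sources.add(t);
            }
        }
        return sources;
    }

    /** Assumption of this model: every task must acknowledge the checkpoint. */
    public static List<TaskInfo> tasksToWaitFor(List<TaskInfo> all) {
        return new ArrayList<>(all);
    }

    /** Simplified analogue of checkTasksStarted: true only if every given task is RUNNING. */
    public static boolean allStarted(List<TaskInfo> tasks) {
        for (TaskInfo t : tasks) {
            if (t.state != State.RUNNING) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // The situation from the log: the source runs, a downstream task is still INITIALIZING.
        List<TaskInfo> job = Arrays.asList(
                new TaskInfo("Source", true, State.RUNNING),
                new TaskInfo("Co-Keyed-Process (5/6)", false, State.INITIALIZING));

        // Checking only the tasks to trigger misses the downstream task that is not ready.
        System.out.println("check on tasksToTrigger passes: " + allStarted(tasksToTrigger(job)));
        // Checking the tasks to wait for catches it before a checkpoint is triggered.
        System.out.println("check on tasksToWaitFor passes: " + allStarted(tasksToWaitFor(job)));
    }
}
```

In this model the first check passes even though the downstream task would decline the barrier, while the second check fails up front, which is the behaviour the proposed change aims for.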



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
