[ https://issues.apache.org/jira/browse/FLINK-22379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Piotr Nowojski updated FLINK-22379:
-----------------------------------
    Description: 
It looks like checkpoints are still being declined by tasks that are not yet ready:

{noformat}
2021-04-21 12:59:10,192 INFO  org.apache.flink.runtime.taskmanager.Task         
           Co-Keyed-Process (5/6)#0 (c5873c9cf471a32925b54ed110250512) switched 
from DEPLOYING to INITIALIZING.
2021-04-21 12:59:10,193 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph       Co-Keyed-Process 
(5/6) (c5873c9cf471a32925b54ed110250512) switched from DEPLOYING to 
INITIALIZING.
...
2021-04-21 12:59:10,544 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator    Decline checkpoint 
12 by task c5873c9cf471a32925b54ed110250512 of job 
b643c49a878c1728d0564f194c8e563e at a579506c-cf1b-4a9c-9964-ab51ae9a1a71 @ 
localhost (dataPort=38367).
org.apache.flink.util.SerializedThrowable: Checkpoint was declined (tasks not 
ready)
        at 
org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel.checkpointStarted(RecoveredInputChannel.java:263)
 ~[flink-runtime_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at 
org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate.checkpointStarted(IndexedInputGate.java:36)
 ~[flink-runtime_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at 
org.apache.flink.streaming.runtime.io.checkpointing.WaitingForFirstBarrierUnaligned.barrierReceived(WaitingForFirstBarrierUnaligned.java:69)
 ~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at 
org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlternatingAlignedBarrierHandlerState.barrierReceived(AbstractAlternatingAlignedBarrierHandlerState.java:65)
 ~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at 
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:228)
 ~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:180)
 ~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158)
 ~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
 ~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
 ~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:96)
 ~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:419)
 ~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
 ~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:557)
 ~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:756) 
~[flink-runtime_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) 
~[flink-runtime_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
        at java.lang.Thread.run(Thread.java:834) [?:?]
{noformat}
Less than 100 ms later in the log, the same task switches to RUNNING:

{noformat}
2021-04-21 12:59:10,611 INFO  org.apache.flink.runtime.taskmanager.Task         
           Co-Keyed-Process (5/6)#0 (c5873c9cf471a32925b54ed110250512) switched 
from INITIALIZING to RUNNING.
2021-04-21 12:59:10,611 DEBUG 
org.apache.flink.runtime.io.network.partition.PipelinedSubpartition 
Co-Keyed-Process (5/6)#0 (c5873c9cf471a32925b54ed110250512): Creating read view 
for subpartition 2 of partition 
0e0bca97cc41ba9b4eb0f54995370397#4@c5873c9cf471a32925b54ed110250512.
2021-04-21 12:59:10,612 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph       Co-Keyed-Process 
(5/6) (c5873c9cf471a32925b54ed110250512) switched from INITIALIZING to RUNNING.
{noformat}

This happens because {{checkTasksStarted}} only verifies that the tasks to 
trigger (the sources) are ready, rather than all tasks. So if, inside 
{{org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator#calculateCheckpointPlan}},
we replace
{noformat}
checkTasksStarted(result.getTasksToTrigger());
{noformat}
with
{noformat}
checkTasksStarted(result.getTasksToWaitFor());
{noformat}
it should work. 
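
The effect of checking one task list rather than the other can be illustrated with a minimal, self-contained sketch. It does not use Flink's classes: {{CheckpointPlanSketch}}, {{TaskInfo}}, {{State}} and {{firstNotRunning}} are hypothetical names introduced only for this example, and the two lists merely imitate what {{getTasksToTrigger()}} and {{getTasksToWaitFor()}} are assumed to contain (the sources only, versus every task expected to take part in the checkpoint).

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

/** Simplified, hypothetical model of the readiness check; these are not Flink's classes. */
final class CheckpointPlanSketch {

    /** Subset of task states that matters for this example. */
    enum State { DEPLOYING, INITIALIZING, RUNNING }

    /** Hypothetical stand-in for a task execution: a name plus its current state. */
    static final class TaskInfo {
        final String name;
        final State state;

        TaskInfo(String name, State state) {
            this.name = name;
            this.state = state;
        }
    }

    /** Returns the first task that is not RUNNING, or empty if every task is ready. */
    static Optional<TaskInfo> firstNotRunning(List<TaskInfo> tasks) {
        return tasks.stream().filter(t -> t.state != State.RUNNING).findFirst();
    }

    public static void main(String[] args) {
        TaskInfo source = new TaskInfo("Source", State.RUNNING);
        TaskInfo process = new TaskInfo("Co-Keyed-Process (5/6)", State.INITIALIZING);

        // Assumed to mirror getTasksToTrigger(): only the source tasks.
        List<TaskInfo> tasksToTrigger = Collections.singletonList(source);
        // Assumed to mirror getTasksToWaitFor(): every task taking part in the checkpoint.
        List<TaskInfo> tasksToWaitFor = Arrays.asList(source, process);

        // Checking only the sources misses the task that is still INITIALIZING.
        System.out.println("sources-only check passes: "
                + !firstNotRunning(tasksToTrigger).isPresent());
        // Checking all tasks finds it before the checkpoint is triggered.
        System.out.println("all-tasks check blocked by: "
                + firstNotRunning(tasksToWaitFor).map(t -> t.name).orElse("none"));
    }
}
```

In this model the sources-only check lets the checkpoint start while a downstream task is still INITIALIZING, which corresponds to the decline seen in the log above; the all-tasks check rejects it before any barrier is sent.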


  was:
Right now, once JobStatus switches to RUNNING, CheckpointCoordinator is allowed 
to trigger checkpoints, which is fine. Unfortunately, JobStatus switches to 
RUNNING before the task state (ExecutionState) has even switched to SCHEDULED. 
This leads to several problems, one of which is visible in the log:

{noformat}
      WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - 
Failed to trigger checkpoint for job bc943302f92d979824fbc8f4cabc5db3.)
        org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint 
triggering task Source: EventSource -> Timestamps/Watermarks (1/7) of job 
bc943302f92d979824fbc8f4cabc5db3 has not being executed at the moment. Aborting 
checkpoint. Failure reason: Not all required tasks are currently running.
                at 
org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.checkTasksStarted(DefaultCheckpointPlanCalculator.java:152)
 ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
                at 
org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.lambda$calculateCheckpointPlan$1(DefaultCheckpointPlanCalculator.java:114)
 ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
                at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
 ~[?:1.8.0_272]
                at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
 ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
                at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
 ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
                at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
 ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
                at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
 ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
                at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) 
~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
                at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) 
~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
                at 
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) 
~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
                at 
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) 
~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
                at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) 
~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
                at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
                at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
                at akka.actor.Actor$class.aroundReceive(Actor.scala:517) 
~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
                at 
akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) 
~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
                at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
                at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
                at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
                at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
                at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
                at 
akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
                at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
                at 
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
                at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
{noformat}

To avoid this problem, it would be a good idea to introduce a new JobStatus 
between CREATED and RUNNING (RESTORING?). Then:
 * JobStatus switches from CREATED to RESTORING at the point where it currently 
switches from CREATED to RUNNING
 * JobStatus switches from RESTORING to RUNNING once all tasks have switched 
their states from INITIALIZING to RUNNING

It would also make sense to rename ExecutionState.INITIALIZING to RESTORING so 
that the job and its tasks use the same name.
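
The proposed status transitions can be sketched as a small state machine. This is only an illustration under stated assumptions: {{RestoringStatusSketch}} and its methods are hypothetical, the enums contain just the values needed here, and nothing below reflects how Flink's ExecutionGraph actually tracks job or task state.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of the proposed CREATED -> RESTORING -> RUNNING job status flow. */
final class RestoringStatusSketch {

    /** Only the job statuses relevant to the proposal; RESTORING is the proposed new one. */
    enum JobStatus { CREATED, RESTORING, RUNNING }

    /** Only the task states relevant to the proposal. */
    enum TaskState { INITIALIZING, RUNNING }

    private JobStatus jobStatus = JobStatus.CREATED;
    private final Map<String, TaskState> tasks = new HashMap<>();

    RestoringStatusSketch(String... taskNames) {
        for (String name : taskNames) {
            tasks.put(name, TaskState.INITIALIZING);
        }
    }

    /** CREATED switches to RESTORING where it would previously have switched to RUNNING. */
    void start() {
        if (jobStatus == JobStatus.CREATED) {
            jobStatus = JobStatus.RESTORING;
        }
    }

    /** Records a task reaching RUNNING; the job follows once every task has done so. */
    void taskSwitchedToRunning(String name) {
        if (!tasks.containsKey(name)) {
            throw new IllegalArgumentException("Unknown task: " + name);
        }
        tasks.put(name, TaskState.RUNNING);
        boolean allRunning = tasks.values().stream().allMatch(s -> s == TaskState.RUNNING);
        if (jobStatus == JobStatus.RESTORING && allRunning) {
            jobStatus = JobStatus.RUNNING;
        }
    }

    JobStatus status() {
        return jobStatus;
    }

    /** Checkpoints would only be triggered once the job itself is RUNNING. */
    boolean mayTriggerCheckpoint() {
        return jobStatus == JobStatus.RUNNING;
    }

    public static void main(String[] args) {
        RestoringStatusSketch job = new RestoringStatusSketch("Source", "Co-Keyed-Process");
        job.start();
        job.taskSwitchedToRunning("Source");
        System.out.println(job.status() + ", checkpoint allowed: " + job.mayTriggerCheckpoint());
        job.taskSwitchedToRunning("Co-Keyed-Process");
        System.out.println(job.status() + ", checkpoint allowed: " + job.mayTriggerCheckpoint());
    }
}
```

With this gating, a checkpoint trigger that depends on JobStatus RUNNING cannot fire while any task is still restoring its state.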


> Do not trigger checkpoint when non source tasks are INITIALIZING
> ----------------------------------------------------------------
>
>                 Key: FLINK-22379
>                 URL: https://issues.apache.org/jira/browse/FLINK-22379
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.13.0
>            Reporter: Anton Kalashnikov
>            Priority: Major
>              Labels: pull-request-available



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
