[ https://issues.apache.org/jira/browse/FLINK-22379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Piotr Nowojski updated FLINK-22379:
-----------------------------------
Description:
It looks like checkpoints are still being declined by tasks that are not yet ready:
{noformat}
2021-04-21 12:59:10,192 INFO org.apache.flink.runtime.taskmanager.Task Co-Keyed-Process (5/6)#0 (c5873c9cf471a32925b54ed110250512) switched from DEPLOYING to INITIALIZING.
2021-04-21 12:59:10,193 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph Co-Keyed-Process (5/6) (c5873c9cf471a32925b54ed110250512) switched from DEPLOYING to INITIALIZING.
...
2021-04-21 12:59:10,544 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator Decline checkpoint 12 by task c5873c9cf471a32925b54ed110250512 of job b643c49a878c1728d0564f194c8e563e at a579506c-cf1b-4a9c-9964-ab51ae9a1a71 @ localhost (dataPort=38367).
org.apache.flink.util.SerializedThrowable: Checkpoint was declined (tasks not ready)
    at org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel.checkpointStarted(RecoveredInputChannel.java:263) ~[flink-runtime_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate.checkpointStarted(IndexedInputGate.java:36) ~[flink-runtime_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.streaming.runtime.io.checkpointing.WaitingForFirstBarrierUnaligned.barrierReceived(WaitingForFirstBarrierUnaligned.java:69) ~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlternatingAlignedBarrierHandlerState.barrierReceived(AbstractAlternatingAlignedBarrierHandlerState.java:65) ~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:228) ~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:180) ~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158) ~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) ~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) ~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:96) ~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:419) ~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) ~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:557) ~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:756) ~[flink-runtime_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) ~[flink-runtime_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at java.lang.Thread.run(Thread.java:834) [?:?]
{noformat}
Less than 100 ms later, the log shows:
{noformat}
2021-04-21 12:59:10,611 INFO org.apache.flink.runtime.taskmanager.Task Co-Keyed-Process (5/6)#0 (c5873c9cf471a32925b54ed110250512) switched from INITIALIZING to RUNNING.
2021-04-21 12:59:10,611 DEBUG org.apache.flink.runtime.io.network.partition.PipelinedSubpartition Co-Keyed-Process (5/6)#0 (c5873c9cf471a32925b54ed110250512): Creating read view for subpartition 2 of partition 0e0bca97cc41ba9b4eb0f54995370397#4@c5873c9cf471a32925b54ed110250512.
2021-04-21 12:59:10,612 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph Co-Keyed-Process (5/6) (c5873c9cf471a32925b54ed110250512) switched from INITIALIZING to RUNNING.
{noformat}
This happens because {{checkTasksStarted}} only checks that the input (source) tasks are ready, not that all tasks are ready. So inside
{{org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator#calculateCheckpointPlan}}, replacing
{noformat}
checkTasksStarted(result.getTasksToTrigger());
{noformat}
with
{noformat}
checkTasksStarted(result.getTasksToWaitFor());
{noformat}
should fix it.
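To make the difference concrete, here is a minimal, hypothetical Java model of the check. It is not Flink's code: {{Task}}, {{Plan}} and this boolean-returning {{checkTasksStarted}} are simplified stand-ins named after the identifiers above, and it assumes, as the description implies, that the tasks to wait for include every task that must acknowledge the checkpoint, while the tasks to trigger are only the sources.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of the readiness check; NOT Flink's implementation.
// Names mirror the identifiers in this issue for readability only.
public class CheckpointPlanSketch {

    // Local stand-in for the task states relevant to this issue.
    enum ExecutionState { CREATED, SCHEDULED, DEPLOYING, INITIALIZING, RUNNING }

    static final class Task {
        final String name;
        final ExecutionState state;

        Task(String name, ExecutionState state) {
            this.name = name;
            this.state = state;
        }
    }

    // Hypothetical plan: tasks that receive the trigger vs. tasks that must acknowledge.
    static final class Plan {
        final List<Task> tasksToTrigger;
        final List<Task> tasksToWaitFor;

        Plan(List<Task> tasksToTrigger, List<Task> tasksToWaitFor) {
            this.tasksToTrigger = tasksToTrigger;
            this.tasksToWaitFor = tasksToWaitFor;
        }
    }

    // Returns true only if every given task is already RUNNING.
    static boolean checkTasksStarted(List<Task> tasks) {
        return tasks.stream().allMatch(t -> t.state == ExecutionState.RUNNING);
    }

    public static void main(String[] args) {
        Task source = new Task("Source", ExecutionState.RUNNING);
        Task coKeyed = new Task("Co-Keyed-Process (5/6)", ExecutionState.INITIALIZING);

        // Only the source is triggered, but both tasks must acknowledge the checkpoint.
        Plan plan = new Plan(Arrays.asList(source), Arrays.asList(source, coKeyed));

        // Current behaviour: passes although Co-Keyed-Process is still INITIALIZING.
        System.out.println("tasksToTrigger started: " + checkTasksStarted(plan.tasksToTrigger));
        // Proposed behaviour: fails until every task to wait for is RUNNING.
        System.out.println("tasksToWaitFor started: " + checkTasksStarted(plan.tasksToWaitFor));
    }
}
```

In this model the trigger-only check prints {{true}} and the wait-for check prints {{false}}, i.e. only the proposed variant would hold back a checkpoint like checkpoint 12 above until the Co-Keyed-Process task is RUNNING.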
was:
Right now, when the JobStatus switches to RUNNING, the CheckpointCoordinator is allowed to trigger checkpoints, which is fine. Unfortunately, the JobStatus switches to RUNNING before the task state (ExecutionState) has even switched to SCHEDULED. This leads to several problems; one of them can be seen in this log:
{noformat}
WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Failed to trigger checkpoint for job bc943302f92d979824fbc8f4cabc5db3.)
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint triggering task Source: EventSource -> Timestamps/Watermarks (1/7) of job bc943302f92d979824fbc8f4cabc5db3 has not being executed at the moment. Aborting checkpoint. Failure reason: Not all required tasks are currently running.
    at org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.checkTasksStarted(DefaultCheckpointPlanCalculator.java:152) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.lambda$calculateCheckpointPlan$1(DefaultCheckpointPlanCalculator.java:114) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_272]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
{noformat}
To avoid this problem, it would be a good idea to introduce a new JobStatus between CREATED and RUNNING (RESTORING?). Then:
* JobStatus CREATED switches to RESTORING at the moment when it currently switches to RUNNING
* JobStatus RESTORING switches to RUNNING once all tasks have switched their state from INITIALIZING to RUNNING
It would also make sense to rename ExecutionState.INITIALIZING to RESTORING so that jobs and tasks use the same name.
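The proposed transitions can be sketched as a small state machine. This is a hypothetical Java illustration, not Flink code: the local {{JobStatus}} enum includes the suggested RESTORING value, and {{onJobStarted}}, {{onTaskStatesChanged}} and {{canTriggerCheckpoint}} are invented names standing in for wherever the scheduler and the CheckpointCoordinator would hook in.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the proposed job-status transitions; NOT Flink's JobStatus.
public class RestoringStatusSketch {

    // Local enum with the proposed RESTORING value between CREATED and RUNNING.
    enum JobStatus { CREATED, RESTORING, RUNNING }

    // Task-level states; INITIALIZING is the state proposed to be renamed RESTORING.
    enum TaskState { SCHEDULED, DEPLOYING, INITIALIZING, RUNNING }

    private JobStatus status = JobStatus.CREATED;

    // Called where CREATED -> RUNNING happens today: move to RESTORING instead.
    void onJobStarted() {
        if (status == JobStatus.CREATED) {
            status = JobStatus.RESTORING;
        }
    }

    // Called on task state changes: promote the job only once every task is RUNNING.
    void onTaskStatesChanged(List<TaskState> taskStates) {
        boolean allRunning = taskStates.stream().allMatch(s -> s == TaskState.RUNNING);
        if (status == JobStatus.RESTORING && allRunning) {
            status = JobStatus.RUNNING;
        }
    }

    // Checkpoints would only be triggered while the job is RUNNING.
    boolean canTriggerCheckpoint() {
        return status == JobStatus.RUNNING;
    }

    JobStatus status() {
        return status;
    }

    public static void main(String[] args) {
        RestoringStatusSketch job = new RestoringStatusSketch();
        job.onJobStarted();
        job.onTaskStatesChanged(Arrays.asList(TaskState.RUNNING, TaskState.INITIALIZING));
        System.out.println(job.status() + " canTrigger=" + job.canTriggerCheckpoint());
        job.onTaskStatesChanged(Arrays.asList(TaskState.RUNNING, TaskState.RUNNING));
        System.out.println(job.status() + " canTrigger=" + job.canTriggerCheckpoint());
    }
}
```

In this model, {{main}} prints {{RESTORING canTrigger=false}} while one task is still INITIALIZING and {{RUNNING canTrigger=true}} once every task is RUNNING, so gating checkpoint triggering on the job being RUNNING would then also cover non-source tasks.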
> Do not trigger checkpoint when non source tasks are INITIALIZING
> ----------------------------------------------------------------
>
> Key: FLINK-22379
> URL: https://issues.apache.org/jira/browse/FLINK-22379
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Checkpointing
> Affects Versions: 1.13.0
> Reporter: Anton Kalashnikov
> Priority: Major
> Labels: pull-request-available
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)