Anton Kalashnikov created FLINK-22379:
-----------------------------------------
Summary: Introduce a new JobStatus to avoid premature checkpoint triggering
Key: FLINK-22379
URL: https://issues.apache.org/jira/browse/FLINK-22379
Project: Flink
Issue Type: Improvement
Components: Runtime / Checkpointing
Reporter: Anton Kalashnikov
Currently, once the JobStatus switches to RUNNING, the CheckpointCoordinator is allowed to trigger checkpoints, which is fine in itself. Unfortunately, the JobStatus switches to RUNNING before the task state (ExecutionState) has even reached SCHEDULED. This leads to several problems, one of which is visible in the following log:
{noformat}
WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Failed to trigger checkpoint for job bc943302f92d979824fbc8f4cabc5db3.)
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint triggering task Source: EventSource -> Timestamps/Watermarks (1/7) of job bc943302f92d979824fbc8f4cabc5db3 has not being executed at the moment. Aborting checkpoint. Failure reason: Not all required tasks are currently running.
	at org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.checkTasksStarted(DefaultCheckpointPlanCalculator.java:152) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.lambda$calculateCheckpointPlan$1(DefaultCheckpointPlanCalculator.java:114) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_272]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at akka.actor.Actor$class.aroundReceive(Actor.scala:517) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
{noformat}
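The failure above can be illustrated with a deliberately simplified model. This is not Flink's code: both enums are trimmed to the states this issue mentions, and `canTriggerCheckpoint`, `checkTasksStarted` and `tryTrigger` are hypothetical stand-ins for the coordinator's job-status gate and the task check in DefaultCheckpointPlanCalculator. The point it shows is that a job-level RUNNING status does not imply that the tasks are running.

```java
import java.util.List;

public class PrematureCheckpointDemo {

    // Trimmed-down job states (simplified; the real JobStatus enum has more values).
    enum JobStatus { CREATED, RUNNING }

    // Trimmed-down task states, listed in lifecycle order (simplified).
    enum ExecutionState { CREATED, SCHEDULED, DEPLOYING, INITIALIZING, RUNNING }

    // Hypothetical gate: today the trigger decision only looks at the job status.
    static boolean canTriggerCheckpoint(JobStatus jobStatus) {
        return jobStatus == JobStatus.RUNNING;
    }

    // Hypothetical stand-in for the task check that aborts the checkpoint.
    static void checkTasksStarted(List<ExecutionState> tasks) {
        for (ExecutionState state : tasks) {
            if (state != ExecutionState.RUNNING) {
                throw new IllegalStateException(
                        "Not all required tasks are currently running.");
            }
        }
    }

    // Describes what happens when a checkpoint trigger is attempted.
    static String tryTrigger(JobStatus jobStatus, List<ExecutionState> tasks) {
        if (!canTriggerCheckpoint(jobStatus)) {
            return "skipped: job not running";
        }
        try {
            checkTasksStarted(tasks);
            return "triggered";
        } catch (IllegalStateException e) {
            return "aborted: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        // The job has already switched to RUNNING, but its tasks are still CREATED.
        List<ExecutionState> tasks =
                List.of(ExecutionState.CREATED, ExecutionState.CREATED);
        // Prints "aborted: Not all required tasks are currently running."
        System.out.println(tryTrigger(JobStatus.RUNNING, tasks));
    }
}
```

The gate passes because it only inspects the job status, so the abort happens one level deeper, exactly where the WARN entry in the log originates.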
To avoid this problem, we could introduce a new JobStatus between CREATED and RUNNING (possibly named RESTORING). Then:
* The JobStatus switches from CREATED to RESTORING at the point where it currently switches from CREATED to RUNNING.
* The JobStatus switches from RESTORING to RUNNING once all tasks have switched their state from INITIALIZING to RUNNING.
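The two transitions proposed above can be sketched as a small state machine. Everything here is a hypothetical illustration under stated assumptions: the RESTORING job status does not exist yet, the enums are trimmed to the states the proposal mentions, and `JobStatusTracker`, `onScheduled` and `onTaskStateChange` are invented names, not Flink APIs.

```java
import java.util.Arrays;

public class JobStatusTracker {

    // Proposed job states: RESTORING sits between CREATED and RUNNING (hypothetical).
    enum JobStatus { CREATED, RESTORING, RUNNING }

    // Trimmed-down task states, in lifecycle order (simplified).
    enum ExecutionState { CREATED, SCHEDULED, DEPLOYING, INITIALIZING, RUNNING }

    private JobStatus jobStatus = JobStatus.CREATED;
    private final ExecutionState[] tasks;

    JobStatusTracker(int parallelism) {
        tasks = new ExecutionState[parallelism];
        Arrays.fill(tasks, ExecutionState.CREATED);
    }

    // Hypothetical hook at the point where the job currently goes CREATED -> RUNNING.
    void onScheduled() {
        if (jobStatus == JobStatus.CREATED) {
            jobStatus = JobStatus.RESTORING;
        }
    }

    // Hypothetical hook for task state changes; the job becomes RUNNING
    // only once every task has reached RUNNING.
    void onTaskStateChange(int taskIndex, ExecutionState newState) {
        tasks[taskIndex] = newState;
        boolean allRunning =
                Arrays.stream(tasks).allMatch(s -> s == ExecutionState.RUNNING);
        if (jobStatus == JobStatus.RESTORING && allRunning) {
            jobStatus = JobStatus.RUNNING;
        }
    }

    // The checkpoint gate is unchanged: it still opens on job status RUNNING.
    boolean canTriggerCheckpoint() {
        return jobStatus == JobStatus.RUNNING;
    }

    JobStatus jobStatus() {
        return jobStatus;
    }

    public static void main(String[] args) {
        JobStatusTracker tracker = new JobStatusTracker(2);
        tracker.onScheduled();
        System.out.println(tracker.jobStatus());            // RESTORING
        tracker.onTaskStateChange(0, ExecutionState.RUNNING);
        System.out.println(tracker.canTriggerCheckpoint()); // false
        tracker.onTaskStateChange(1, ExecutionState.RUNNING);
        System.out.println(tracker.jobStatus());            // RUNNING
        System.out.println(tracker.canTriggerCheckpoint()); // true
    }
}
```

In this sketch the checkpoint gate itself stays as simple as before; only the moment at which the job reaches RUNNING moves later, so a trigger can no longer race ahead of task startup.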
It would also make sense to rename ExecutionState.INITIALIZING to RESTORING so that the job and task states share the same name.