[ 
https://issues.apache.org/jira/browse/FLINK-27257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17538539#comment-17538539
 ] 

Nicholas Jiang commented on FLINK-27257:
----------------------------------------

I have offline discussed with [~wangyang0918]. It's hard to improve the 
isJobRunning implementation because the returned JobStatusMessages of 
ClusterClient#listJobs() don't contain the tasksPerState of the JobDetail, 
which cause that there is no way to judge whether all the ExecutionState of 
tasks are running in the job and for batch tasks how to judge whether the job 
is really running.

The current idea is that if the error is found to be Not all required tasks are 
currently running, then continue to trigger savepoint in the next 
reconciliation until it is successfully triggered.

[~gyfora][~wangyang0918] WDYT?

> Flink kubernetes operator triggers savepoint failed because of not all tasks 
> running
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-27257
>                 URL: https://issues.apache.org/jira/browse/FLINK-27257
>             Project: Flink
>          Issue Type: Bug
>          Components: Kubernetes Operator
>            Reporter: Yang Wang
>            Assignee: Nicholas Jiang
>            Priority: Major
>             Fix For: kubernetes-operator-1.1.0
>
>
> {code:java}
> 2022-04-15 02:38:56,551 o.a.f.k.o.s.FlinkService       [INFO 
> ][default/flink-example-statemachine] Fetching savepoint result with 
> triggerId: 182d7f176496856d7b33fe2f3767da18
> 2022-04-15 02:38:56,690 o.a.f.k.o.s.FlinkService       
> [ERROR][default/flink-example-statemachine] Savepoint error
> org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint 
> triggering task Source: Custom Source (1/2) of job 
> 00000000000000000000000000000000 is not being executed at the moment. 
> Aborting checkpoint. Failure reason: Not all required tasks are currently 
> running.
>     at 
> org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.checkTasksStarted(DefaultCheckpointPlanCalculator.java:143)
>     at 
> org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.lambda$calculateCheckpointPlan$1(DefaultCheckpointPlanCalculator.java:105)
>     at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
>     at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:455)
>     at 
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
>     at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:455)
>     at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
>     at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
>     at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>     at akka.actor.Actor.aroundReceive(Actor.scala:537)
>     at akka.actor.Actor.aroundReceive$(Actor.scala:535)
>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:548)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:231)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
>     at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>     at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
>     at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
>     at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> 2022-04-15 02:38:56,693 o.a.f.k.o.o.SavepointObserver  
> [ERROR][default/flink-example-statemachine] Checkpoint triggering task 
> Source: Custom Source (1/2) of job 00000000000000000000000000000000 is not 
> being executed at the moment. Aborting checkpoint. Failure reason: Not all 
> required tasks are currently running. {code}
> How to reproduce?
> Update arbitrary fields(e.g. parallelism) along with 
> {{{}savepointTriggerNonce{}}}.
>  
> The root cause might be the running state return by 
> {{ClusterClient#listJobs()}} does not mean all the tasks are running.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to