[
https://issues.apache.org/jira/browse/FLINK-20065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17230099#comment-17230099
]
Till Rohrmann commented on FLINK-20065:
---------------------------------------
I did a bit of investigation and I think I have found the problem. First of all,
the change introduced by FLINK-20033 is that {{JobMaster.jobStatusChanged}}
is now called synchronously when the {{ExecutionGraph}}'s job status changes.
Before, we sent an asynchronous message which was executed in the
{{JobMaster's}} main thread once the state transition operation in the
{{ExecutionGraph}} had completely finished.
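A toy model may make the ordering change easier to see. The sketch below is not Flink code: it assumes the {{JobMaster}} main thread can be modelled as a FIFO mailbox of {{Runnable}}s, and the class and event names are hypothetical. With asynchronous notification the listener runs after the whole transition (including completing the {{terminationFuture}}); with synchronous notification it runs in the middle of it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical model: the JobMaster main thread as a single-threaded FIFO mailbox. */
public class ListenerOrderingSketch {

    /** Runs one simulated state transition and returns the order of observed events. */
    public static List<String> run(boolean synchronousNotification) {
        Queue<Runnable> mainThreadMailbox = new ArrayDeque<>();
        List<String> events = new ArrayList<>();

        // Stand-in for JobMaster.jobStatusChanged.
        Runnable jobStatusChanged = () -> events.add("jobStatusChanged");

        // The ExecutionGraph state transition executes as one main-thread operation.
        mainThreadMailbox.add(() -> {
            events.add("status -> FINISHED");
            if (synchronousNotification) {
                jobStatusChanged.run();                  // after FLINK-20033: called inline
            } else {
                mainThreadMailbox.add(jobStatusChanged); // before: delivered as a later message
            }
            events.add("terminationFuture completed");
        });

        // Drain the mailbox one message at a time, like an actor would.
        while (!mainThreadMailbox.isEmpty()) {
            mainThreadMailbox.poll().run();
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println("async: " + run(false));
        System.out.println("sync:  " + run(true));
    }
}
```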
Here is the problem this change introduced: when calling stop-with-savepoint,
we only return the savepoint path after the {{ExecutionGraph}} has
reached a terminal state. Moreover, we want to restart the checkpoint
coordinator if the savepoint has failed. Since the savepoint future can be
completed from a thread other than the main thread, we have to splice it
back into the main thread by calling {{handleAsync}} with the main thread executor:
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java#L921.
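The splicing pattern can be sketched with plain {{CompletableFuture}}s, assuming two single-threaded executors stand in for the main thread and for whichever thread completes the savepoint. The thread names, the savepoint path and the {{stopWithSavepoint}} helper are hypothetical; the linked {{SchedulerBase}} code remains the authoritative version.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

public class HandleAsyncSketch {

    /** Returns "<outcome> on <thread>", where <thread> ran the handleAsync callback. */
    public static String stopWithSavepoint(boolean savepointFails) throws InterruptedException {
        ExecutorService mainThread = Executors.newSingleThreadExecutor(r -> new Thread(r, "main-thread"));
        ExecutorService ioThread = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        AtomicReference<String> handlerThread = new AtomicReference<>();
        try {
            // The savepoint future is completed on a thread other than the main thread.
            CompletableFuture<String> savepoint = CompletableFuture.supplyAsync(() -> {
                if (savepointFails) {
                    throw new IllegalStateException("savepoint failed");
                }
                return "file:///tmp/savepoint-1";
            }, ioThread);

            // Splice the result back into the main thread before touching scheduler state.
            CompletableFuture<String> path = savepoint.handleAsync((p, failure) -> {
                handlerThread.set(Thread.currentThread().getName());
                if (failure != null) {
                    // This is where the checkpoint coordinator would be restarted.
                    throw new CompletionException(failure);
                }
                return p;
            }, mainThread);

            try {
                return path.get() + " on " + handlerThread.get();
            } catch (ExecutionException e) {
                return "failed on " + handlerThread.get();
            }
        } finally {
            mainThread.shutdown();
            ioThread.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(stopWithSavepoint(false)); // file:///tmp/savepoint-1 on main-thread
        System.out.println(stopWithSavepoint(true));  // failed on main-thread
    }
}
```

The important property is that the callback is only executed if the main-thread executor actually gets to run it, which is exactly what breaks in the scenario described next.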
Before the {{ExecutionGraph's}} {{terminationFuture}} is completed, we notify
the {{JobStatusListener}} about the job status change. Since the listener
now reacts to this notification directly (change introduced with FLINK-20033),
{{JobMaster.jobStatusChanged}} is called immediately. In this method, we
trigger {{OnCompletionActions.jobReachedGloballyTerminalState}} using the
{{scheduledExecutorService}}. This callback triggers the shutdown of the
{{JobMaster}}. Depending on how fast the {{scheduledExecutorService}} executes
the callback, the {{JobMaster}} can receive the stop message
before the {{RunAsync}} message responsible for running the
{{handleAsync}} callback. As a consequence, the {{handleAsync}} callback is never
run and, hence, the savepoint path is never returned.
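The race can be reproduced deterministically in a toy model, assuming the {{JobMaster}} behaves like an actor that processes its mailbox in FIFO order and drops the remaining messages once it is stopped. The class name and the {{shutdownMessageArrivesFirst}} parameter are hypothetical; the parameter stands in for the timing of the {{scheduledExecutorService}}.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical model of the stop message overtaking the RunAsync(handleAsync) message. */
public class StopRaceSketch {

    public static CompletableFuture<String> simulate(boolean shutdownMessageArrivesFirst) {
        Queue<Runnable> mailbox = new ArrayDeque<>();
        CompletableFuture<String> savepointPathForClient = new CompletableFuture<>();
        boolean[] stopped = {false};

        // Shutdown triggered via OnCompletionActions.jobReachedGloballyTerminalState.
        Runnable stopMessage = () -> stopped[0] = true;
        // The RunAsync message that would execute the handleAsync callback.
        Runnable runAsyncHandle = () -> savepointPathForClient.complete("file:///tmp/savepoint-1");

        // Both messages stem from the same state transition; only their arrival order differs.
        if (shutdownMessageArrivesFirst) {
            mailbox.add(stopMessage);
            mailbox.add(runAsyncHandle);
        } else {
            mailbox.add(runAsyncHandle);
            mailbox.add(stopMessage);
        }

        // A stopped actor does not process any further messages.
        while (!mailbox.isEmpty() && !stopped[0]) {
            mailbox.poll().run();
        }
        return savepointPathForClient;
    }

    public static void main(String[] args) {
        System.out.println(simulate(false).getNow("<never returned>")); // file:///tmp/savepoint-1
        System.out.println(simulate(true).getNow("<never returned>"));  // <never returned>
    }
}
```

In the second case the client-side future is never completed, which matches the {{AskTimeoutException}} observed in the test.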
I think the underlying problem is that there is no notion of tasks which need
to be executed before shutting down. One idea could be to make the
{{Scheduler}} responsible for tracking ongoing operations and to only shut down
after they have either been cancelled or completed. A slightly less
invasive solution could be to not let the savepoint result depend on the
{{handleAsync}} block. That way we could return the savepoint path as soon as
it is ready.
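The first idea could look roughly like the following sketch. {{OngoingOperationsTracker}} is a hypothetical helper, not an existing Flink class: the scheduler would register each in-flight operation (such as the future returned by {{handleAsync}}) and only complete its shutdown once all of them have finished. Cancelling pending operations instead of waiting for them would be an equally valid variant.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical helper: shutdown waits until every tracked operation has finished. */
public class OngoingOperationsTracker {

    private final List<CompletableFuture<?>> ongoing = new ArrayList<>();
    private boolean closing;

    /** Registers an operation; once closing has started, new operations are rejected. */
    public synchronized <T> CompletableFuture<T> track(CompletableFuture<T> operation) {
        if (closing) {
            CompletableFuture<T> rejected = new CompletableFuture<>();
            rejected.completeExceptionally(new IllegalStateException("shutting down"));
            return rejected;
        }
        ongoing.add(operation);
        return operation;
    }

    /** Completes once all tracked operations have completed, successfully or not. */
    public synchronized CompletableFuture<Void> closeAsync() {
        closing = true;
        CompletableFuture<?>[] pending = ongoing.toArray(new CompletableFuture<?>[0]);
        // allOf may complete exceptionally; handle() maps that to null so shutdown proceeds.
        return CompletableFuture.allOf(pending).handle((ignored, failure) -> null);
    }

    public static void main(String[] args) {
        OngoingOperationsTracker tracker = new OngoingOperationsTracker();
        CompletableFuture<String> savepoint = tracker.track(new CompletableFuture<String>());
        CompletableFuture<Void> shutdown = tracker.closeAsync();
        System.out.println(shutdown.isDone()); // false: the savepoint is still in flight
        savepoint.complete("file:///tmp/savepoint-1");
        System.out.println(shutdown.isDone()); // true
    }
}
```

Rejecting new operations once closing has started keeps the set of futures to wait for bounded. The less invasive alternative would avoid such a tracker altogether by completing the client-facing future directly from the savepoint future.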
For the time being, I would suggest reverting FLINK-20033 for the {{1.12.0}}
release and fixing it properly in the next release.
> UnalignedCheckpointCompatibilityITCase.test failed with AskTimeoutException
> ---------------------------------------------------------------------------
>
> Key: FLINK-20065
> URL: https://issues.apache.org/jira/browse/FLINK-20065
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Affects Versions: 1.12.0, 1.11.3
> Reporter: Dian Fu
> Assignee: Arvid Heise
> Priority: Blocker
> Labels: test-stability
> Fix For: 1.12.0, 1.11.3
>
> Attachments: debug.log, thread.dump
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=9362&view=logs&j=5c8e7682-d68f-54d1-16a2-a09310218a49&t=45cc9205-bdb7-5b54-63cd-89fdc0983323
> {code}
> 2020-11-09T22:19:47.2714024Z [ERROR] test[type: SAVEPOINT, startAligned:
> true](org.apache.flink.test.checkpointing.UnalignedCheckpointCompatibilityITCase)
> Time elapsed: 1.293 s <<< ERROR!
> 2020-11-09T22:19:47.2715260Z java.util.concurrent.ExecutionException:
> java.util.concurrent.TimeoutException: Invocation of public default
> java.util.concurrent.CompletableFuture
> org.apache.flink.runtime.webmonitor.RestfulGateway.stopWithSavepoint(org.apache.flink.api.common.JobID,java.lang.String,boolean,org.apache.flink.api.common.time.Time)
> timed out.
> 2020-11-09T22:19:47.2716743Z at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> 2020-11-09T22:19:47.2718213Z at
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> 2020-11-09T22:19:47.2719166Z at
> org.apache.flink.test.checkpointing.UnalignedCheckpointCompatibilityITCase.runAndTakeSavepoint(UnalignedCheckpointCompatibilityITCase.java:113)
> 2020-11-09T22:19:47.2720278Z at
> org.apache.flink.test.checkpointing.UnalignedCheckpointCompatibilityITCase.test(UnalignedCheckpointCompatibilityITCase.java:97)
> 2020-11-09T22:19:47.2721126Z at
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2020-11-09T22:19:47.2721771Z at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2020-11-09T22:19:47.2722773Z at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2020-11-09T22:19:47.2723479Z at
> java.lang.reflect.Method.invoke(Method.java:498)
> 2020-11-09T22:19:47.2724187Z at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 2020-11-09T22:19:47.2725026Z at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2020-11-09T22:19:47.2725817Z at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 2020-11-09T22:19:47.2726595Z at
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2020-11-09T22:19:47.2727515Z at
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> 2020-11-09T22:19:47.2728192Z at
> org.junit.rules.RunRules.evaluate(RunRules.java:20)
> 2020-11-09T22:19:47.2744089Z at
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> 2020-11-09T22:19:47.2744907Z at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> 2020-11-09T22:19:47.2745573Z at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> 2020-11-09T22:19:47.2746037Z at
> org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> 2020-11-09T22:19:47.2746445Z at
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> 2020-11-09T22:19:47.2746868Z at
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> 2020-11-09T22:19:47.2747443Z at
> org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> 2020-11-09T22:19:47.2747876Z at
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> 2020-11-09T22:19:47.2748297Z at
> org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> 2020-11-09T22:19:47.2748694Z at
> org.junit.runners.Suite.runChild(Suite.java:128)
> 2020-11-09T22:19:47.2749054Z at
> org.junit.runners.Suite.runChild(Suite.java:27)
> 2020-11-09T22:19:47.2749414Z at
> org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> 2020-11-09T22:19:47.2749819Z at
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> 2020-11-09T22:19:47.2750373Z at
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> 2020-11-09T22:19:47.2750923Z at
> org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> 2020-11-09T22:19:47.2751555Z at
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> 2020-11-09T22:19:47.2752148Z at
> org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> 2020-11-09T22:19:47.2752938Z at
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
> 2020-11-09T22:19:47.3085383Z at
> org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
> 2020-11-09T22:19:47.3086377Z at
> org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
> 2020-11-09T22:19:47.3087146Z at
> org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
> 2020-11-09T22:19:47.3088051Z at
> org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
> 2020-11-09T22:19:47.3088815Z at
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
> 2020-11-09T22:19:47.3089472Z at
> org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
> 2020-11-09T22:19:47.3090109Z at
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
> 2020-11-09T22:19:47.3091501Z Caused by:
> java.util.concurrent.TimeoutException: Invocation of public default
> java.util.concurrent.CompletableFuture
> org.apache.flink.runtime.webmonitor.RestfulGateway.stopWithSavepoint(org.apache.flink.api.common.JobID,java.lang.String,boolean,org.apache.flink.api.common.time.Time)
> timed out.
> 2020-11-09T22:19:47.3092730Z at
> com.sun.proxy.$Proxy33.stopWithSavepoint(Unknown Source)
> 2020-11-09T22:19:47.3093348Z at
> org.apache.flink.runtime.minicluster.MiniCluster.lambda$stopWithSavepoint$9(MiniCluster.java:599)
> 2020-11-09T22:19:47.3094052Z at
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
> 2020-11-09T22:19:47.3094682Z at
> java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)
> 2020-11-09T22:19:47.3095359Z at
> java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996)
> 2020-11-09T22:19:47.3096070Z at
> org.apache.flink.runtime.minicluster.MiniCluster.runDispatcherCommand(MiniCluster.java:621)
> 2020-11-09T22:19:47.3096926Z at
> org.apache.flink.runtime.minicluster.MiniCluster.stopWithSavepoint(MiniCluster.java:599)
> 2020-11-09T22:19:47.3097996Z at
> org.apache.flink.client.program.PerJobMiniClusterFactory$PerJobMiniClusterJobClient.stopWithSavepoint(PerJobMiniClusterFactory.java:169)
> 2020-11-09T22:19:47.3098985Z at
> org.apache.flink.test.checkpointing.UnalignedCheckpointCompatibilityITCase.runAndTakeSavepoint(UnalignedCheckpointCompatibilityITCase.java:112)
> 2020-11-09T22:19:47.3099640Z ... 36 more
> 2020-11-09T22:19:47.3101706Z Caused by: akka.pattern.AskTimeoutException: Ask
> timed out on [Actor[akka://flink/user/rpc/dispatcher_18#-978053777]] after
> [10000 ms]. Message of type
> [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason
> for `AskTimeoutException` is that the recipient actor didn't send a reply.
> 2020-11-09T22:19:47.3103053Z at
> akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
> 2020-11-09T22:19:47.3103640Z at
> akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
> 2020-11-09T22:19:47.3104114Z at
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:648)
> 2020-11-09T22:19:47.3104521Z at
> akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
> 2020-11-09T22:19:47.3104994Z at
> akka.actor.LightArrayRevolverScheduler$TaskHolder.run(LightArrayRevolverScheduler.scala:337)
> 2020-11-09T22:19:47.3105546Z at
> akka.actor.LightArrayRevolverScheduler$$anonfun$close$1.apply(LightArrayRevolverScheduler.scala:141)
> 2020-11-09T22:19:47.3106140Z at
> akka.actor.LightArrayRevolverScheduler$$anonfun$close$1.apply(LightArrayRevolverScheduler.scala:140)
> 2020-11-09T22:19:47.3106646Z at
> scala.collection.Iterator$class.foreach(Iterator.scala:891)
> 2020-11-09T22:19:47.3107072Z at
> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> 2020-11-09T22:19:47.3107770Z at
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> 2020-11-09T22:19:47.3108203Z at
> scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> 2020-11-09T22:19:47.3108659Z at
> akka.actor.LightArrayRevolverScheduler.close(LightArrayRevolverScheduler.scala:139)
> 2020-11-09T22:19:47.3109107Z at
> akka.actor.ActorSystemImpl.stopScheduler(ActorSystem.scala:937)
> 2020-11-09T22:19:47.3109575Z at
> akka.actor.ActorSystemImpl$$anonfun$liftedTree2$1$1.apply$mcV$sp(ActorSystem.scala:872)
> 2020-11-09T22:19:47.3110253Z at
> akka.actor.ActorSystemImpl$$anonfun$liftedTree2$1$1.apply(ActorSystem.scala:872)
> 2020-11-09T22:19:47.3110724Z at
> akka.actor.ActorSystemImpl$$anonfun$liftedTree2$1$1.apply(ActorSystem.scala:872)
> 2020-11-09T22:19:47.3111176Z at
> akka.actor.ActorSystemImpl$$anon$3.run(ActorSystem.scala:892)
> 2020-11-09T22:19:47.3111666Z at
> akka.actor.ActorSystemImpl$TerminationCallbacks$$anonfun$addRec$1$1.applyOrElse(ActorSystem.scala:1068)
> 2020-11-09T22:19:47.3112220Z at
> akka.actor.ActorSystemImpl$TerminationCallbacks$$anonfun$addRec$1$1.applyOrElse(ActorSystem.scala:1068)
> 2020-11-09T22:19:47.3112970Z at
> scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
> 2020-11-09T22:19:47.3113403Z at
> scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
> 2020-11-09T22:19:47.3113915Z at
> scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
> 2020-11-09T22:19:47.3114373Z at
> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
> 2020-11-09T22:19:47.3114909Z at
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
> 2020-11-09T22:19:47.3115439Z at
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
> 2020-11-09T22:19:47.3115961Z at
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
> 2020-11-09T22:19:47.3116507Z at
> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
> 2020-11-09T22:19:47.3116953Z at
> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
> 2020-11-09T22:19:47.3117501Z at
> akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> 2020-11-09T22:19:47.3117989Z at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> 2020-11-09T22:19:47.3118536Z at
> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 2020-11-09T22:19:47.3118985Z at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 2020-11-09T22:19:47.3119440Z at
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 2020-11-09T22:19:47.3119872Z at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)