[
https://issues.apache.org/jira/browse/FLINK-31168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17748491#comment-17748491
]
Matthias Pohl edited comment on FLINK-31168 at 7/28/23 9:20 AM:
----------------------------------------------------------------
The most recent failures seem to have been caused by unsuccessful job
recovery:
* [20230711.1 (#51165) Dispatcher #1 output in line
8688|https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=51165&view=logs&j=a596f69e-60d2-5a4b-7d39-dc69e4cdaed3&t=712ade8c-ca16-5b76-3acd-14df33bc1cb1&l=8688]
* [20230717.1 (#51299) Dispatcher #1 output in line
9918|https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=51299&view=logs&j=a596f69e-60d2-5a4b-7d39-dc69e4cdaed3&t=712ade8c-ca16-5b76-3acd-14df33bc1cb1&l=9918]
I'm raising the priority of this issue to Blocker because it could be
related to the leader election changes.
The job is no longer picked up during recovery and, therefore, cannot be saved
in the {{ExecutionGraphInfoStore}}. The job client waits for the initialization
phase to be over and then requests the JobResult, which calls
{{Dispatcher.requestJobStatus}}. {{requestJobStatus}} finds neither a
{{JobManagerRunner}} in {{Dispatcher#jobManagerRunnerRegistry}} nor an entry in
{{Dispatcher#executionGraphInfoStore}} (see
[Dispatcher#requestJobStatus|https://github.com/apache/flink/blob/ab9445aca56e7d139e8fd9bcc23e5f7e06288e66/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L902]).
Therefore, the response future completes exceptionally with a
{{FlinkJobNotFoundException}}, causing the error we're seeing in the last
two CI failures.
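A minimal Java sketch of the lookup order described above. It assumes plain {{HashMap}}s as stand-ins for {{Dispatcher#jobManagerRunnerRegistry}} and {{Dispatcher#executionGraphInfoStore}}; {{JobStatusLookupSketch}} and {{JobNotFoundException}} are hypothetical names, not Flink's actual API. It illustrates why a job that recovery never registered ends up as an exceptionally completed future, which a blocking {{get()}} then rethrows wrapped in an {{ExecutionException}}, matching the {{Caused by}} chain in the trace of the original report.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class JobStatusLookupSketch {

    enum JobStatus { INITIALIZING, RUNNING, FINISHED }

    /** Hypothetical stand-in for Flink's FlinkJobNotFoundException. */
    static class JobNotFoundException extends Exception {
        JobNotFoundException(String jobId) {
            super("Could not find Flink job (" + jobId + ")");
        }
    }

    // Hypothetical stand-ins: jobs with an active JobManagerRunner...
    final Map<String, JobStatus> runnerRegistry = new HashMap<>();
    // ...and jobs whose execution graph was archived after termination.
    final Map<String, JobStatus> executionGraphInfoStore = new HashMap<>();

    CompletableFuture<JobStatus> requestJobStatus(String jobId) {
        // 1. A job that is currently run by a JobManagerRunner answers first.
        JobStatus running = runnerRegistry.get(jobId);
        if (running != null) {
            return CompletableFuture.completedFuture(running);
        }
        // 2. Otherwise, fall back to the archived execution graph.
        JobStatus archived = executionGraphInfoStore.get(jobId);
        if (archived != null) {
            return CompletableFuture.completedFuture(archived);
        }
        // 3. Neither place knows the job (e.g. recovery never picked it up),
        //    so the returned future completes exceptionally.
        return CompletableFuture.failedFuture(new JobNotFoundException(jobId));
    }

    public static void main(String[] args) {
        JobStatusLookupSketch dispatcher = new JobStatusLookupSketch();
        dispatcher.runnerRegistry.put("job-a", JobStatus.RUNNING);
        dispatcher.executionGraphInfoStore.put("job-b", JobStatus.FINISHED);

        System.out.println(dispatcher.requestJobStatus("job-a").join());
        System.out.println(dispatcher.requestJobStatus("job-b").join());
        try {
            // A blocking client call wraps the failure in an ExecutionException.
            dispatcher.requestJobStatus("job-c").get();
        } catch (ExecutionException e) {
            System.out.println("ExecutionException caused by: " + e.getCause().getMessage());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Running {{main}} prints the statuses of the two known jobs and then the not-found message for the unknown one. The real {{Dispatcher}} serves these lookups through RPC futures rather than local maps, so this only models the control flow, not the concurrency.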
> JobManagerHAProcessFailureRecoveryITCase failed due to job not being found
> --------------------------------------------------------------------------
>
> Key: FLINK-31168
> URL: https://issues.apache.org/jira/browse/FLINK-31168
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.15.3, 1.16.1, 1.18.0
> Reporter: Matthias Pohl
> Assignee: Matthias Pohl
> Priority: Blocker
> Labels: test-stability
> Fix For: 1.18.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46342&view=logs&j=b0a398c0-685b-599c-eb57-c8c2a771138e&t=747432ad-a576-5911-1e2a-68c6bedc248a&l=12706
> We see this build failure because a job couldn't be found:
> {code}
> java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error while waiting for job to be initialized
> at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:319)
> at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1061)
> at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:958)
> at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:942)
> at org.apache.flink.test.recovery.JobManagerHAProcessFailureRecoveryITCase.testJobManagerFailure(JobManagerHAProcessFailureRecoveryITCase.java:235)
> at org.apache.flink.test.recovery.JobManagerHAProcessFailureRecoveryITCase$4.run(JobManagerHAProcessFailureRecoveryITCase.java:336)
> Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error while waiting for job to be initialized
> at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
> at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
> at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1056)
> ... 4 more
> Caused by: java.lang.RuntimeException: Error while waiting for job to be initialized
> at org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:160)
> at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.lambda$execute$2(AbstractSessionClusterExecutor.java:82)
> at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:73)
> at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
> at java.base/java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:479)
> at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
> at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
> at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
> at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
> at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
> Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.rest.util.RestClientException: [org.apache.flink.runtime.rest.NotFoundException: Job 865dcd87f4828dbeb3d93eb52e2636b1 not found
> at org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler.lambda$handleRequest$1(AbstractExecutionGraphHandler.java:99)
> at java.base/java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986)
> at java.base/java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970)
> at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
> at org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache.lambda$getExecutionGraphInternal$0(DefaultExecutionGraphCache.java:109)
> at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
> at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
> at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
> at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:252)
> at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
> at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
> at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
> at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1387)
> at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$1(ClassLoadingUtils.java:93)
> at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
> at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
> at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
> at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
> at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
> at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:45)
> at akka.dispatch.OnComplete.internal(Future.scala:299)
> at akka.dispatch.OnComplete.internal(Future.scala:297)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
> at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
> at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
> at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
> at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
> at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621)
> at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:25)
> at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
> at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
> at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
> at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
> at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
> at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
> at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
> at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
> at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
> at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
> at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
> at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
> at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
> at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
> Caused by: org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job (865dcd87f4828dbeb3d93eb52e2636b1)
> at org.apache.flink.runtime.dispatcher.Dispatcher.requestExecutionGraphInfo(Dispatcher.java:840)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:304)
> at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:302)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at akka.actor.Actor.aroundReceive(Actor.scala:537)
> at akka.actor.Actor.aroundReceive$(Actor.scala:535)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
> at akka.actor.ActorCell.invoke(ActorCell.scala:548)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
> at akka.dispatch.Mailbox.run(Mailbox.scala:231)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
> ... 5 more
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)