[ https://issues.apache.org/jira/browse/FLINK-8887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16417115#comment-16417115 ]
ASF GitHub Bot commented on FLINK-8887:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5767#discussion_r177684059

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---
    @@ -645,6 +637,56 @@ private void registerOrphanedJobManagerTerminationFuture(CompletableFuture<Void>
                     jobManagerRunnerTerminationFuture));
         }

    +    private CompletableFuture<JobMasterGateway> getJobMasterGatewayFuture(JobID jobId) {
    +        final JobManagerRunner jobManagerRunner = jobManagerRunners.get(jobId);
    +
    +        if (jobManagerRunner == null) {
    +            return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
    +        } else {
    +            final CompletableFuture<JobMasterGateway> leaderGatewayFuture = jobManagerRunner.getLeaderGatewayFuture();
    +            return leaderGatewayFuture.thenApplyAsync(
    +                (JobMasterGateway jobMasterGateway) -> {
    +                    // check whether the retrieved JobMasterGateway belongs still to a running JobMaster
    +                    if (jobManagerRunners.containsKey(jobId)) {
    +                        return jobMasterGateway;
    +                    } else {
    +                        throw new CompletionException(new FlinkJobNotFoundException(jobId));
    +                    }
    +                },
    +                getMainThreadExecutor());
    +        }
    +    }
    +
    +    private <T> List<T> flattenOptionalCollection(Collection<Optional<T>> optionalCollection) {
    +        return optionalCollection.stream().filter(Optional::isPresent).map(Optional::get).collect(Collectors.toList());
    +    }
    +
    +    @Nonnull
    +    private <T> List<CompletableFuture<Optional<T>>> queryJobMastersForInformation(Function<JobMasterGateway, CompletableFuture<T>> queryFunction) {
    +        final int numberJobsRunning = jobManagerRunners.size();
    +
    +        ArrayList<CompletableFuture<Optional<T>>> optionalJobInformation = new ArrayList<>(
    +            numberJobsRunning);
    +
    +        for (JobID jobId : jobManagerRunners.keySet()) {
    +            final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);
    +
    +            final CompletableFuture<Optional<T>> optionalRequest = jobMasterGatewayFuture
    +                .thenCompose(queryFunction::apply)
    +                .handle(
    +                    (T value, Throwable throwable) -> {
    +                        if (throwable != null) {
    --- End diff --

    This is equivalent to `.handle((T value, Throwable throwable) -> Optional.ofNullable(value));`

> ClusterClient.getJobStatus can throw FencingTokenException
> ----------------------------------------------------------
>
>                 Key: FLINK-8887
>                 URL: https://issues.apache.org/jira/browse/FLINK-8887
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Coordination
>    Affects Versions: 1.5.0
>            Reporter: Gary Yao
>            Assignee: Till Rohrmann
>            Priority: Blocker
>              Labels: flip-6
>             Fix For: 1.5.0
>
>
> *Description*
> Calling {{RestClusterClient.getJobStatus}} or
> {{MiniClusterClient.getJobStatus}} can result in a {{FencingTokenException}}.
> *Analysis*
> {{Dispatcher.requestJobStatus}} first looks the {{JobManagerRunner}} up by
> job id. If a reference is found, {{requestJobStatus}} is called on the
> respective instance. If not, the {{ArchivedExecutionGraphStore}} is queried.
> However, between the lookup and the method call, the {{JobMaster}} of the
> respective job may have lost leadership already (job finished), and has set
> the fencing token to {{null}}.
> *Stacktrace*
> {noformat}
> Caused by: org.apache.flink.runtime.rpc.exceptions.FencingTokenException:
> Fencing token mismatch: Ignoring message LocalFencedMessage(null,
> LocalRpcInvocation(requestJobStatus(Time))) because the fencing token null
> did not match the expected fencing token b8423c75bc6838244b8c93c8bd4a4f51.
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:73)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
>     at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {noformat}
> {noformat}
> Caused by: org.apache.flink.runtime.rpc.exceptions.FencingTokenException:
> Fencing token not set: Ignoring message LocalFencedMessage(null,
> LocalRpcInvocation(requestJobStatus(Time))) because the fencing token is null.
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:56)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
>     at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
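
The review comment in this message says the `handle` callback can be collapsed to `Optional.ofNullable(value)`. The quoted diff is cut off inside that callback, so the sketch below assumes the elided body returns `Optional.empty()` on failure and `Optional.of(value)` otherwise. That shape is a guess, not something the diff confirms. The class and method names (`HandleEquivalence`, `verbose`, `compact`) are hypothetical and exist only for this illustration.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

class HandleEquivalence {

    // Assumed shape of the truncated handler (hypothetical reconstruction):
    // empty on failure, wrapped value on success.
    static <T> CompletableFuture<Optional<T>> verbose(CompletableFuture<T> future) {
        return future.handle((T value, Throwable throwable) -> {
            if (throwable != null) {
                return Optional.<T>empty();
            } else {
                return Optional.of(value);
            }
        });
    }

    // Form suggested in the review comment. On failure, handle() passes
    // value == null, so ofNullable yields an empty Optional.
    static <T> CompletableFuture<Optional<T>> compact(CompletableFuture<T> future) {
        return future.handle((T value, Throwable throwable) -> Optional.ofNullable(value));
    }

    public static void main(String[] args) {
        CompletableFuture<String> ok = CompletableFuture.completedFuture("RUNNING");

        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("job not found"));

        // Both handlers agree for a successful and for a failed future.
        System.out.println(verbose(ok).join().equals(compact(ok).join()));
        System.out.println(verbose(failed).join().equals(compact(failed).join()));
    }
}
```

Under that assumption the two forms differ in one case only: a future that completes normally with `null`. `Optional.of(null)` throws a `NullPointerException`, while `Optional.ofNullable(null)` returns an empty `Optional`.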