[ https://issues.apache.org/jira/browse/FLINK-8887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16417257#comment-16417257 ]
ASF GitHub Bot commented on FLINK-8887:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5767#discussion_r177728701
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---
@@ -645,6 +637,56 @@ private void registerOrphanedJobManagerTerminationFuture(CompletableFuture<Void>
 			jobManagerRunnerTerminationFuture));
 	}
+	private CompletableFuture<JobMasterGateway> getJobMasterGatewayFuture(JobID jobId) {
+		final JobManagerRunner jobManagerRunner = jobManagerRunners.get(jobId);
+
+		if (jobManagerRunner == null) {
+			return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
+		} else {
+			final CompletableFuture<JobMasterGateway> leaderGatewayFuture = jobManagerRunner.getLeaderGatewayFuture();
+			return leaderGatewayFuture.thenApplyAsync(
+				(JobMasterGateway jobMasterGateway) -> {
+					// check whether the retrieved JobMasterGateway belongs still to a running JobMaster
+					if (jobManagerRunners.containsKey(jobId)) {
+						return jobMasterGateway;
+					} else {
+						throw new CompletionException(new FlinkJobNotFoundException(jobId));
+					}
+				},
+				getMainThreadExecutor());
+		}
+	}
+
+	private <T> List<T> flattenOptionalCollection(Collection<Optional<T>> optionalCollection) {
+		return optionalCollection.stream().filter(Optional::isPresent).map(Optional::get).collect(Collectors.toList());
+	}
+
+	@Nonnull
+	private <T> List<CompletableFuture<Optional<T>>> queryJobMastersForInformation(Function<JobMasterGateway, CompletableFuture<T>> queryFunction) {
+		final int numberJobsRunning = jobManagerRunners.size();
+
+		ArrayList<CompletableFuture<Optional<T>>> optionalJobInformation = new ArrayList<>(
+			numberJobsRunning);
+
+		for (JobID jobId : jobManagerRunners.keySet()) {
+			final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);
+
+			final CompletableFuture<Optional<T>> optionalRequest = jobMasterGatewayFuture
+				.thenCompose(queryFunction::apply)
+				.handle(
+					(T value, Throwable throwable) -> {
+						if (throwable != null) {
--- End diff --
Good catch. Will change it :-)
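The helpers added in the diff combine per-job queries like this: every running JobMaster is asked asynchronously, each answer is wrapped in an `Optional`, and `flattenOptionalCollection` drops the empty ones. Below is a minimal, self-contained Java sketch of that pattern. It assumes (the diff is cut off at the `handle` body) that a failed query is mapped to `Optional.empty()` so that one job finishing mid-request does not fail the whole result. `OptionalQuerySketch`, `querySources`, `combine`, `demo` and the job ids are hypothetical; plain strings replace Flink's `JobID` and `JobMasterGateway`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

class OptionalQuerySketch {

    // Same idea as flattenOptionalCollection in the diff: keep only the present values.
    static <T> List<T> flattenOptionalCollection(Collection<Optional<T>> optionalCollection) {
        return optionalCollection.stream()
            .filter(Optional::isPresent)
            .map(Optional::get)
            .collect(Collectors.toList());
    }

    // Hypothetical stand-in for queryJobMastersForInformation: one query per source, where a
    // failed query becomes Optional.empty() instead of failing the overall request (assumption).
    static <S, T> List<CompletableFuture<Optional<T>>> querySources(
            Collection<S> sources, Function<S, CompletableFuture<T>> queryFunction) {
        final List<CompletableFuture<Optional<T>>> results = new ArrayList<>(sources.size());
        for (S source : sources) {
            final CompletableFuture<Optional<T>> optionalRequest = queryFunction.apply(source)
                .handle((T value, Throwable throwable) -> {
                    if (throwable != null) {
                        // e.g. the job finished between listing it and querying it
                        return Optional.<T>empty();
                    }
                    return Optional.ofNullable(value);
                });
            results.add(optionalRequest);
        }
        return results;
    }

    // Waits for every per-source future, then drops the empty (failed) answers.
    static <T> CompletableFuture<List<T>> combine(List<CompletableFuture<Optional<T>>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
            .thenApply(ignored -> flattenOptionalCollection(
                futures.stream().map(CompletableFuture::join).collect(Collectors.toList())));
    }

    // Hypothetical demo: the query for "job-b" fails, the other two answer.
    static List<String> demo() {
        final Function<String, CompletableFuture<String>> query = jobId -> {
            if (jobId.equals("job-b")) {
                final CompletableFuture<String> failed = new CompletableFuture<>();
                failed.completeExceptionally(new IllegalStateException(jobId + " is no longer running"));
                return failed;
            }
            return CompletableFuture.completedFuture(jobId + ":RUNNING");
        };
        return combine(querySources(Arrays.asList("job-a", "job-b", "job-c"), query)).join();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [job-a:RUNNING, job-c:RUNNING]
    }
}
```

Mapping failures to an empty `Optional` keeps a multi-job overview available even if some jobs terminate while it is being assembled; the trade-off is that those failures are silently dropped unless they are also logged.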
> ClusterClient.getJobStatus can throw FencingTokenException
> ----------------------------------------------------------
>
> Key: FLINK-8887
> URL: https://issues.apache.org/jira/browse/FLINK-8887
> Project: Flink
> Issue Type: Bug
> Components: Distributed Coordination
> Affects Versions: 1.5.0
> Reporter: Gary Yao
> Assignee: Till Rohrmann
> Priority: Blocker
> Labels: flip-6
> Fix For: 1.5.0
>
>
> *Description*
> Calling {{RestClusterClient.getJobStatus}} or
> {{MiniClusterClient.getJobStatus}} can result in a {{FencingTokenException}}.
> *Analysis*
> {{Dispatcher.requestJobStatus}} first looks up the {{JobManagerRunner}} by
> job id. If a reference is found, {{requestJobStatus}} is called on the
> respective instance; if not, the {{ArchivedExecutionGraphStore}} is queried.
> However, between the lookup and the method call, the {{JobMaster}} of the
> respective job may already have lost leadership (because the job finished)
> and set its fencing token to {{null}}.
> *Stacktrace*
> {noformat}
> Caused by: org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token mismatch: Ignoring message LocalFencedMessage(null, LocalRpcInvocation(requestJobStatus(Time))) because the fencing token null did not match the expected fencing token b8423c75bc6838244b8c93c8bd4a4f51.
> 	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:73)
> 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
> 	at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
> 	at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> 	at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> 	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> 	at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> 	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> 	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {noformat}
> {noformat}
> Caused by: org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token not set: Ignoring message LocalFencedMessage(null, LocalRpcInvocation(requestJobStatus(Time))) because the fencing token is null.
> 	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:56)
> 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
> 	at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
> 	at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> 	at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> 	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> 	at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> 	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> 	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {noformat}
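Both stack traces above come from the same kind of guard: a fenced endpoint only processes a message that carries the endpoint's current fencing token, and rejects it either when the endpoint has no token at all ("Fencing token not set") or when the tokens differ ("Fencing token mismatch"). The following minimal sketch of such a guard is modeled only on those two error messages, not on Flink's actual `FencedAkkaRpcActor` code. `FencingCheckSketch`, `grantLeadership`, `revokeLeadership` and `handleFencedMessage` are hypothetical, and `UUID` stands in for the real token type.

```java
import java.util.Objects;
import java.util.UUID;
import java.util.function.Supplier;

class FencingCheckSketch {

    // Hypothetical stand-in for org.apache.flink.runtime.rpc.exceptions.FencingTokenException.
    static final class FencingTokenException extends RuntimeException {
        FencingTokenException(String message) {
            super(message);
        }
    }

    // Current fencing token of the endpoint; null when it is not (or no longer) the leader.
    private volatile UUID fencingToken;

    void grantLeadership(UUID token) {
        fencingToken = token;
    }

    void revokeLeadership() {
        fencingToken = null;
    }

    // Executes the request only if the message carries the endpoint's current token.
    <T> T handleFencedMessage(UUID messageToken, Supplier<T> request) {
        final UUID expectedToken = fencingToken;
        if (expectedToken == null) {
            // Case of the second stack trace: the endpoint has no token (e.g. it lost leadership).
            throw new FencingTokenException(
                "Fencing token not set: Ignoring message because the fencing token is null.");
        }
        if (!Objects.equals(expectedToken, messageToken)) {
            // Case of the first stack trace: the message token (null there) differs from the expected one.
            throw new FencingTokenException("Fencing token mismatch: Ignoring message because the fencing token "
                + messageToken + " did not match the expected fencing token " + expectedToken + '.');
        }
        return request.get();
    }

    public static void main(String[] args) {
        final FencingCheckSketch jobMaster = new FencingCheckSketch();
        final UUID leaderToken = UUID.randomUUID();
        jobMaster.grantLeadership(leaderToken);

        System.out.println(jobMaster.handleFencedMessage(leaderToken, () -> "RUNNING")); // prints RUNNING

        jobMaster.revokeLeadership(); // e.g. the job finished
        try {
            jobMaster.handleFencedMessage(leaderToken, () -> "RUNNING");
        } catch (FencingTokenException e) {
            System.out.println(e.getMessage()); // prints the "Fencing token not set" message
        }
    }
}
```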
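The Analysis describes a check-then-act race: the {{JobManagerRunner}} is found, but its {{JobMaster}} loses leadership before the call arrives. `getJobMasterGatewayFuture` in the diff guards the lookup by checking `jobManagerRunners.containsKey(jobId)` again once the leader gateway future completes. Below is a hedged, self-contained sketch of that re-check, assuming the caller then answers from the archive when the job turns out to be gone, just as `requestJobStatus` does when no runner is found in the first place. All names (`JobStatusLookupSketch`, `registerRunningJob`, `finishJob`, `JobNotFoundException`, `statusAfterRace`) are hypothetical. The sketch uses synchronous callbacks and `ConcurrentHashMap`s instead of the Dispatcher's main-thread executor (`thenApplyAsync(..., getMainThreadExecutor())`); that keeps the example deterministic but is not how Flink serializes access to its registry.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;

class JobStatusLookupSketch {

    // Hypothetical stand-in for FlinkJobNotFoundException.
    static final class JobNotFoundException extends Exception {
        JobNotFoundException(String jobId) {
            super("Could not find job " + jobId);
        }
    }

    // Hypothetical stand-ins: running jobs map to a future that yields the status once the
    // leader is known (cf. jobManagerRunners); finished jobs live in an archive
    // (cf. ArchivedExecutionGraphStore).
    private final Map<String, CompletableFuture<String>> runningJobs = new ConcurrentHashMap<>();
    private final Map<String, String> archivedJobs = new ConcurrentHashMap<>();

    void registerRunningJob(String jobId, CompletableFuture<String> statusFuture) {
        runningJobs.put(jobId, statusFuture);
    }

    // Archive first, then deregister, so the job is never missing from both maps.
    void finishJob(String jobId, String finalStatus) {
        archivedJobs.put(jobId, finalStatus);
        runningJobs.remove(jobId);
    }

    CompletableFuture<String> requestJobStatus(String jobId) {
        final CompletableFuture<String> statusFuture = runningJobs.get(jobId);
        if (statusFuture == null) {
            return fromArchive(jobId);
        }
        return statusFuture
            .thenApply((String status) -> {
                // Re-check after the asynchronous step: the job may have finished meanwhile.
                if (!runningJobs.containsKey(jobId)) {
                    throw new CompletionException(new JobNotFoundException(jobId));
                }
                return status;
            })
            .handle((String status, Throwable throwable) -> {
                if (throwable == null) {
                    return status;
                }
                final Throwable cause =
                    throwable instanceof CompletionException ? throwable.getCause() : throwable;
                if (cause instanceof JobNotFoundException) {
                    // Fall back to the archive; fromArchive returns an already-completed
                    // future, so join() does not block.
                    return fromArchive(jobId).join();
                }
                throw new CompletionException(cause);
            });
    }

    private CompletableFuture<String> fromArchive(String jobId) {
        final CompletableFuture<String> result = new CompletableFuture<>();
        final String archived = archivedJobs.get(jobId);
        if (archived != null) {
            result.complete(archived);
        } else {
            result.completeExceptionally(new JobNotFoundException(jobId));
        }
        return result;
    }

    // Reproduces the race: the job finishes while its status lookup is still pending.
    static String statusAfterRace() {
        final JobStatusLookupSketch dispatcher = new JobStatusLookupSketch();
        final CompletableFuture<String> pendingStatus = new CompletableFuture<>();
        dispatcher.registerRunningJob("job-1", pendingStatus);

        final CompletableFuture<String> request = dispatcher.requestJobStatus("job-1");
        dispatcher.finishJob("job-1", "FINISHED");
        pendingStatus.complete("RUNNING"); // stale answer from the former leader

        return request.join();
    }

    public static void main(String[] args) {
        System.out.println(statusAfterRace()); // prints FINISHED
    }
}
```

Running `main` prints `FINISHED`: the stale `RUNNING` answer is discarded because the job is no longer registered when the lookup completes, and the status is taken from the archive instead.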