Github user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/5767#discussion_r177684059
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
---
@@ -645,6 +637,56 @@ private void
registerOrphanedJobManagerTerminationFuture(CompletableFuture<Void>
jobManagerRunnerTerminationFuture));
}
+ private CompletableFuture<JobMasterGateway>
getJobMasterGatewayFuture(JobID jobId) {
+ final JobManagerRunner jobManagerRunner =
jobManagerRunners.get(jobId);
+
+ if (jobManagerRunner == null) {
+ return FutureUtils.completedExceptionally(new
FlinkJobNotFoundException(jobId));
+ } else {
+ final CompletableFuture<JobMasterGateway>
leaderGatewayFuture = jobManagerRunner.getLeaderGatewayFuture();
+ return leaderGatewayFuture.thenApplyAsync(
+ (JobMasterGateway jobMasterGateway) -> {
+ // check whether the retrieved
JobMasterGateway belongs still to a running JobMaster
+ if
(jobManagerRunners.containsKey(jobId)) {
+ return jobMasterGateway;
+ } else {
+ throw new
CompletionException(new FlinkJobNotFoundException(jobId));
+ }
+ },
+ getMainThreadExecutor());
+ }
+ }
+
+ private <T> List<T> flattenOptionalCollection(Collection<Optional<T>>
optionalCollection) {
+ return
optionalCollection.stream().filter(Optional::isPresent).map(Optional::get).collect(Collectors.toList());
+ }
+
+ @Nonnull
+ private <T> List<CompletableFuture<Optional<T>>>
queryJobMastersForInformation(Function<JobMasterGateway, CompletableFuture<T>>
queryFunction) {
+ final int numberJobsRunning = jobManagerRunners.size();
+
+ ArrayList<CompletableFuture<Optional<T>>>
optionalJobInformation = new ArrayList<>(
+ numberJobsRunning);
+
+ for (JobID jobId : jobManagerRunners.keySet()) {
+ final CompletableFuture<JobMasterGateway>
jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);
+
+ final CompletableFuture<Optional<T>> optionalRequest =
jobMasterGatewayFuture
+ .thenCompose(queryFunction::apply)
+ .handle(
+ (T value, Throwable throwable) -> {
+ if (throwable != null) {
--- End diff --
The `.handle(...)` callback above is equivalent to `.handle((T value, Throwable throwable) ->
Optional.ofNullable(value));`, because `value` is always `null` when `throwable` is non-null.
---