Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5168#discussion_r157489280

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---
    @@ -357,6 +362,28 @@ public void start() throws Exception {
             return CompletableFuture.completedFuture(jobManagerServices.blobServer.getPort());
         }
    +    @Override
    +    public CompletableFuture<Either<Throwable, SerializedJobExecutionResult>> getJobExecutionResult(
    +            final JobID jobId,
    +            final Time timeout) {
    +        final Either<Throwable, SerializedJobExecutionResult> jobExecutionResult =
    +            jobExecutionResultCache.get(jobId);
    +        if (jobExecutionResult == null) {
    +            return FutureUtils.completedExceptionally(new JobExecutionResultNotFoundException(jobId));
    +        } else {
    +            return CompletableFuture.completedFuture(jobExecutionResult);
    +        }
    +    }
    +
    +    @Override
    +    public CompletableFuture<Boolean> isJobExecutionResultPresent(final JobID jobId, final Time timeout) {
    +        final boolean jobExecutionResultPresent = jobExecutionResultCache.contains(jobId);
    +        if (!jobManagerRunners.containsKey(jobId) && !jobExecutionResultPresent) {
    +            return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
    +        }
    +        return CompletableFuture.completedFuture(jobExecutionResultPresent);
    --- End diff --

    But this would never return a future containing `false`.
---