Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5648#discussion_r172775779 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java --- @@ -404,16 +404,25 @@ public void start() throws Exception { final JobManagerRunner jobManagerRunner = jobManagerRunners.get(jobId); if (jobManagerRunner != null) { - return jobManagerRunner.getJobManagerGateway().requestJobStatus(timeout); - } else { - final JobDetails jobDetails = archivedExecutionGraphStore.getAvailableJobDetails(jobId); - - if (jobDetails != null) { - return CompletableFuture.completedFuture(jobDetails.getStatus()); - } else { - return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId)); + try { + return jobManagerRunner.getJobManagerGateway().requestJobStatus(timeout); --- End diff -- This is an asynchronous call that isn't throwing the exception. You have to add a handler to the returned `CompletableFuture`. It also only properly resolves one of the exceptions, and IMO shouldn't catch `Exception` but the specific exceptions we want the workaround to work for as to not hide other issues. In any case, I'm not sure if adding workarounds to the Dispatcher is the right way to go. These issues revealed that some scenarios are not properly handled, and I would prefer waiting for @tillrohrmann to really fix this in the Dispatcher and related components. We can temporarily handle both exceptions in the `MiniClusterClient` by adding a *single* retry (with a short sleep) if a **specific** exception occurs.
---