GitHub user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5648#discussion_r172775779
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---
    @@ -404,16 +404,25 @@ public void start() throws Exception {
                final JobManagerRunner jobManagerRunner = jobManagerRunners.get(jobId);
     
                if (jobManagerRunner != null) {
    -                   return jobManagerRunner.getJobManagerGateway().requestJobStatus(timeout);
    -           } else {
    -                   final JobDetails jobDetails = archivedExecutionGraphStore.getAvailableJobDetails(jobId);
    -
    -                   if (jobDetails != null) {
    -                           return CompletableFuture.completedFuture(jobDetails.getStatus());
    -                   } else {
    -                           return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
    +                   try {
    +                           return jobManagerRunner.getJobManagerGateway().requestJobStatus(timeout);
    --- End diff --
    
    This is an asynchronous call, so it does not throw the exception itself;
    the failure is delivered through the returned `CompletableFuture`, and you
    have to add a handler to that future. The change also only properly resolves
    one of the exceptions, and IMO it shouldn't catch `Exception` but only the
    specific exceptions we want the workaround to cover, so as not to hide other
    issues.
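
A minimal sketch of the difference, not the actual Dispatcher code: `requestJobStatus(boolean)` and `JobNotFoundException` below are hypothetical stand-ins for the gateway call and `FlinkJobNotFoundException`, and the fallback value is made up for illustration. It shows that a `try`/`catch` around the call never sees the failure, while a handler on the future does, and that the handler maps only the one specific exception.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class AsyncHandlerSketch {

    /** Hypothetical stand-in for FlinkJobNotFoundException. */
    public static class JobNotFoundException extends Exception {
        public JobNotFoundException(String message) {
            super(message);
        }
    }

    /** Hypothetical stand-in for an asynchronous gateway call: failures end up inside the future. */
    public static CompletableFuture<String> requestJobStatus(boolean fail) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (fail) {
            future.completeExceptionally(new JobNotFoundException("job not found"));
        } else {
            future.complete("RUNNING");
        }
        return future;
    }

    /** try/catch around the call: returns true only if the catch block ran (it does not). */
    public static boolean caughtByTryCatch() {
        try {
            requestJobStatus(true);
            return false;
        } catch (Exception e) {
            return true;
        }
    }

    /** Handler on the future: maps only the specific exception to a fallback, rethrows the rest. */
    public static CompletableFuture<String> statusWithFallback(boolean fail, String fallback) {
        return requestJobStatus(fail).handle((status, throwable) -> {
            if (throwable == null) {
                return status;
            }
            // Later stages may see the failure wrapped in a CompletionException.
            Throwable cause = (throwable instanceof CompletionException && throwable.getCause() != null)
                    ? throwable.getCause()
                    : throwable;
            if (cause instanceof JobNotFoundException) {
                return fallback;
            }
            throw new CompletionException(cause);
        });
    }

    public static void main(String[] args) {
        System.out.println(caughtByTryCatch());                          // false
        System.out.println(statusWithFallback(true, "UNKNOWN").join());  // UNKNOWN
        System.out.println(statusWithFallback(false, "UNKNOWN").join()); // RUNNING
    }
}
```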
    
    In any case, I'm not sure if adding workarounds to the Dispatcher is the 
right way to go. These issues revealed that some scenarios are not properly 
handled, and I would prefer waiting for @tillrohrmann to really fix this in the 
Dispatcher and related components.
    
    We can temporarily handle both exceptions in the `MiniClusterClient` by 
adding a *single* retry (with a short sleep) if a **specific** exception occurs.
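
Roughly what I have in mind for the client side, as a sketch only: `callWithSingleRetry` and `TransientLookupException` are hypothetical names, not existing `MiniClusterClient` API, and the sleep duration is an arbitrary placeholder. The point is that exactly one retry happens, and only for the one exception type; anything else propagates immediately.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

public class SingleRetrySketch {

    /** Hypothetical marker for the one failure the workaround is meant to cover. */
    public static class TransientLookupException extends Exception {
        public TransientLookupException(String message) {
            super(message);
        }
    }

    /**
     * Runs the operation; if it fails with the specific exception, sleeps briefly
     * and tries exactly once more. Any other exception is not caught here.
     */
    public static <T> T callWithSingleRetry(Callable<T> operation, long sleepMillis) throws Exception {
        try {
            return operation.call();
        } catch (TransientLookupException e) {
            Thread.sleep(sleepMillis);
            return operation.call(); // a second failure propagates to the caller
        }
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger attempts = new AtomicInteger();
        String status = callWithSingleRetry(() -> {
            if (attempts.incrementAndGet() == 1) {
                throw new TransientLookupException("job not visible yet");
            }
            return "RUNNING";
        }, 50L);
        System.out.println(status + " after " + attempts.get() + " attempts");
    }
}
```

Catching only the specific type keeps unrelated failures visible, which is the reason for not catching `Exception` in the first place.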

