dmvk commented on a change in pull request #16535:
URL: https://github.com/apache/flink/pull/16535#discussion_r683346101



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -743,31 +744,43 @@ private void registerJobManagerRunnerTerminationFuture(
 
     private CompletableFuture<Void> removeJob(JobID jobId, CleanupJobState cleanupJobState) {
         final JobManagerRunner job = checkNotNull(runningJobs.remove(jobId));
-
-        final CompletableFuture<Void> jobTerminationFuture = job.closeAsync();
-
-        return jobTerminationFuture.thenRunAsync(
-                () -> cleanUpJobData(jobId, cleanupJobState.cleanupHAData), ioExecutor);
+        return CompletableFuture.supplyAsync(
+                        () -> cleanUpJobGraph(jobId, cleanupJobState.cleanupHAData), ioExecutor)
+                .thenCompose(
+                        jobGraphRemoved -> job.closeAsync().thenApply(ignored -> jobGraphRemoved))
+                .thenAcceptAsync(
+                        jobGraphRemoved ->
+                                cleanUpRemainingJobData(
+                                        jobId, cleanupJobState.cleanupHAData, jobGraphRemoved),
+                        ioExecutor);
     }
 
-    private void cleanUpJobData(JobID jobId, boolean cleanupHA) {
-        jobManagerMetricGroup.removeJob(jobId);
-
-        boolean jobGraphRemoved = false;
+    private boolean cleanUpJobGraph(JobID jobId, boolean cleanupHA) {
         if (cleanupHA) {
             try {
                 jobGraphWriter.removeJobGraph(jobId);
-
                 // only clean up the HA blobs and ha service data for the particular job
                 // if we could remove the job from HA storage
-                jobGraphRemoved = true;
+                return true;
             } catch (Exception e) {
                 log.warn(
                         "Could not properly remove job {} from submitted job graph store.",
                         jobId,
                         e);
+                return false;
             }
+        }
+        try {
+            jobGraphWriter.releaseJobGraph(jobId);
+        } catch (Exception e) {
+            log.warn("Could not properly release job {} from submitted job graph store.", jobId, e);
+        }
+        return false;
+    }
 
+    private void cleanUpRemainingJobData(JobID jobId, boolean cleanupHA, boolean jobGraphRemoved) {

Review comment:
   👍 I think that's a reasonable trade-off for now (until FLINK-11813 is resolved). If we can't remove the job graph, we're in bigger trouble anyway :)
   
   Dispatcher tests seem to pass, so let's see whether this introduces any failures in the e2e tests.
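   A minimal, standalone Java sketch of the ordering this change introduces, for poking at it outside the Dispatcher: the job graph is cleaned up on the I/O executor first, the runner is closed next, and the `jobGraphRemoved` flag is threaded through to the remaining cleanup. The class `RemoveJobSketch`, its `failRemoval` flag, and the stand-ins `removeJobGraph`, `releaseJobGraph` and `closeRunnerAsync` are hypothetical; they only record events in a list instead of touching the real `JobGraphWriter` or `JobManagerRunner`, and where the real code logs a warning the sketch records a failure event.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical, self-contained model of the removeJob ordering in the diff.
public class RemoveJobSketch {
    // Records the order in which the cleanup steps ran (thread-safe list).
    final List<String> events = Collections.synchronizedList(new ArrayList<>());
    // Hypothetical knob: when true, the HA job graph removal throws.
    private final boolean failRemoval;

    RemoveJobSketch(boolean failRemoval) {
        this.failRemoval = failRemoval;
    }

    // Hypothetical stand-in for jobGraphWriter.removeJobGraph(jobId).
    private void removeJobGraph(String jobId) throws Exception {
        if (failRemoval) {
            throw new Exception("could not remove " + jobId);
        }
        events.add("removeJobGraph");
    }

    // Hypothetical stand-in for jobGraphWriter.releaseJobGraph(jobId).
    private void releaseJobGraph(String jobId) {
        events.add("releaseJobGraph");
    }

    // Hypothetical stand-in for JobManagerRunner#closeAsync().
    private CompletableFuture<Void> closeRunnerAsync() {
        events.add("closeRunner");
        return CompletableFuture.completedFuture(null);
    }

    // Mirrors cleanUpJobGraph: true only if the HA job graph was actually removed.
    boolean cleanUpJobGraph(String jobId, boolean cleanupHA) {
        if (cleanupHA) {
            try {
                removeJobGraph(jobId);
                return true;
            } catch (Exception e) {
                events.add("removeJobGraph failed");
                return false;
            }
        }
        releaseJobGraph(jobId);
        return false;
    }

    // Mirrors removeJob: job graph first, then close the runner, then the rest.
    CompletableFuture<Void> removeJob(String jobId, boolean cleanupHA, Executor ioExecutor) {
        return CompletableFuture.supplyAsync(() -> cleanUpJobGraph(jobId, cleanupHA), ioExecutor)
                // Close the runner, but keep the jobGraphRemoved flag for the next stage.
                .thenCompose(removed -> closeRunnerAsync().thenApply(ignored -> removed))
                .thenAcceptAsync(
                        removed -> events.add("cleanUpRemainingJobData(jobGraphRemoved=" + removed + ")"),
                        ioExecutor);
    }

    public static void main(String[] args) {
        ExecutorService io = Executors.newSingleThreadExecutor();
        try {
            RemoveJobSketch sketch = new RemoveJobSketch(false);
            sketch.removeJob("job-1", true, io).join();
            // prints [removeJobGraph, closeRunner, cleanUpRemainingJobData(jobGraphRemoved=true)]
            System.out.println(sketch.events);
        } finally {
            io.shutdown();
        }
    }
}
```

   With `failRemoval = true` and `cleanupHA = true`, the runner is still closed and the remaining cleanup still runs, just with `jobGraphRemoved=false`; that is the trade-off accepted above.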



