dmvk commented on a change in pull request #18536:
URL: https://github.com/apache/flink/pull/18536#discussion_r796361359



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/GloballyCleanableResource.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.cleanup;
+
+import org.apache.flink.api.common.JobID;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+
+/**
+ * {@code GloballyCleanableResource} is supposed to be used by any class that 
provides artifacts for
+ * a given job that can be cleaned up globally.
+ *
+ * @see LocallyCleanableResource
+ */
+@FunctionalInterface
+public interface GloballyCleanableResource {
+
+    void globalCleanup(JobID jobId) throws Throwable;

Review comment:
       We should use a stricter checked exception here, e.g. `IOException` 
sounds reasonable; same for the `Local...`
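
A minimal sketch of what the narrower signature could look like. The interface name with the `Sketch` suffix and the `String` job id are assumptions made so the snippet compiles on its own; it illustrates the idea, it is not the final API.

```java
import java.io.IOException;

// Sketch of the interface with a narrower checked exception.
// The "Sketch" suffix and the String job id are assumptions made for this example.
@FunctionalInterface
interface GloballyCleanableResourceSketch {
    void globalCleanup(String jobId) throws IOException;
}

class NarrowExceptionDemo {
    // Callers only have to handle IOException instead of every Throwable.
    static String cleanup(GloballyCleanableResourceSketch resource, String jobId) {
        try {
            resource.globalCleanup(jobId);
            return "cleaned " + jobId;
        } catch (IOException e) {
            return "failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(cleanup(jobId -> {}, "job-1"));
        System.out.println(cleanup(jobId -> { throw new IOException("disk full"); }, "job-2"));
    }
}
```

With `throws Throwable`, every caller would be forced into a catch-all block; a narrower type keeps the existing `throws IOException` signatures elsewhere unchanged.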

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/GloballyCleanableResource.java
##########
@@ -0,0 +1,49 @@
+
+package org.apache.flink.runtime.dispatcher.cleanup;
+
+import org.apache.flink.api.common.JobID;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+
+/**
+ * {@code GloballyCleanableResource} is supposed to be used by any class that 
provides artifacts for
+ * a given job that can be cleaned up globally.
+ *
+ * @see LocallyCleanableResource
+ */
+@FunctionalInterface
+public interface GloballyCleanableResource {

Review comment:
       nit: it might be useful to elaborate on the difference between 
local and global cleanup in the javadocs (same with the `Local...` one)
   
   Plus, the methods are missing javadocs.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java
##########
@@ -185,15 +185,15 @@ public boolean get(JobID jobId, BlobKey blobKey, File 
localFile) throws IOExcept
 
         private final Queue<? super CloseOperations> closeOperations;
         private final RunnableWithException internalCleanupRunnable;
-        private final Consumer<JobID> internalJobCleanupConsumer;
+        private final ThrowingConsumer<JobID, Exception> 
internalJobCleanupConsumer;

Review comment:
       signature -> `ThrowingConsumer<JobID, IOException>`

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
##########
@@ -286,7 +291,12 @@ public void closeAndCleanupAllData() throws Exception {
     }
 
     @Override
-    public void cleanupJobData(JobID jobID) {
-        Optional.ofNullable(jobCleanupFuture).ifPresent(f -> 
f.complete(jobID));
+    public void globalCleanup(JobID jobID) throws Exception {
+        Optional.ofNullable(globalCleanupFuture).ifPresent(f -> 
f.complete(jobID));

Review comment:
       Why don't we use a simple null check here? The `Optional` doesn't really 
add anything in terms of readability or performance (same for local cleanup).
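
A minimal sketch of the plain null check, assuming the future is an optional test hook that may be left unset. The class name and the `String` job id are hypothetical stand-ins so the snippet is self-contained.

```java
import java.util.concurrent.CompletableFuture;

class NullCheckDemo {
    // May be null when a test does not care about the cleanup signal.
    private final CompletableFuture<String> globalCleanupFuture;

    NullCheckDemo(CompletableFuture<String> globalCleanupFuture) {
        this.globalCleanupFuture = globalCleanupFuture;
    }

    void globalCleanup(String jobId) {
        // Plain null check instead of Optional.ofNullable(...).ifPresent(...).
        if (globalCleanupFuture != null) {
            globalCleanupFuture.complete(jobId);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> future = new CompletableFuture<>();
        new NullCheckDemo(future).globalCleanup("job-1");
        System.out.println(future.getNow("not completed"));

        // No future configured: the call is a no-op.
        new NullCheckDemo(null).globalCleanup("job-2");
    }
}
```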

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java
##########
@@ -109,12 +114,16 @@ public void 
testRecoverFromCheckpointAfterJobGraphRemovalOfTerminatedJobFailed()
         final JobID jobId = jobGraph.getJobID();
 
         // Construct job graph store.
-        final Error jobGraphRemovalError = new Error("Unable to remove job 
graph.");
+        final Error temporaryError = new Error("Unable to remove job graph.");
+        final AtomicReference<? extends Error> temporaryErrorRef =

Review comment:
       Why an `Error` instead of an exception?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java
##########
@@ -109,12 +114,16 @@ public void 
testRecoverFromCheckpointAfterJobGraphRemovalOfTerminatedJobFailed()
         final JobID jobId = jobGraph.getJobID();
 
         // Construct job graph store.
-        final Error jobGraphRemovalError = new Error("Unable to remove job 
graph.");
+        final Error temporaryError = new Error("Unable to remove job graph.");
+        final AtomicReference<? extends Error> temporaryErrorRef =
+                new AtomicReference<>(temporaryError);
         final TestingJobGraphStore jobGraphStore =
                 TestingJobGraphStore.newBuilder()
-                        .setRemoveJobGraphConsumer(
+                        .setGlobalCleanupConsumer(
                                 graph -> {
-                                    throw jobGraphRemovalError;
+                                    if (temporaryErrorRef.get() != null) {
+                                        throw 
temporaryErrorRef.getAndSet(null);
+                                    }

Review comment:
       ```suggestion
                                       final Error error = temporaryErrorRef.getAndSet(null);
                                       if (error != null) {
                                           throw error;
                                       }
   ```
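
A self-contained sketch of the pattern in the suggestion above. Reading and clearing the reference in a single `getAndSet(null)` call avoids the gap between `get() != null` and the later `getAndSet(null)`: with two concurrent callers, both could pass the check and the slower one would end up executing `throw null`. The class and helper names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicReference;

class ThrowOnceDemo {
    private final AtomicReference<Error> temporaryErrorRef =
            new AtomicReference<>(new Error("Unable to remove job graph."));

    // Reads and clears the reference in one atomic step, then throws if something was there.
    void globalCleanup() {
        final Error error = temporaryErrorRef.getAndSet(null);
        if (error != null) {
            throw error;
        }
    }

    // Returns true if the cleanup call threw.
    static boolean fails(ThrowOnceDemo demo) {
        try {
            demo.globalCleanup();
            return false;
        } catch (Error e) {
            return true;
        }
    }

    public static void main(String[] args) {
        ThrowOnceDemo demo = new ThrowOnceDemo();
        System.out.println(fails(demo)); // first call throws
        System.out.println(fails(demo)); // second call succeeds
    }
}
```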

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobGraphWriter.java
##########
@@ -42,5 +41,9 @@
      * @param jobId specifying the job to release the locks for
      * @throws Exception if the locks cannot be released
      */

Review comment:
       should we remove the javadocs?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
##########
@@ -861,61 +869,73 @@ public boolean deletePermanent(JobID jobId, 
PermanentBlobKey key) {
     }
 
     /**
-     * Removes all BLOBs from local and HA store belonging to the given job ID.
+     * Deletes locally stored artifacts for the job represented by the given 
{@link JobID}. This
+     * doesn't touch the job's entry in the {@link BlobStore} to enable 
recovering.
      *
-     * @param jobId ID of the job this blob belongs to
-     * @param cleanupBlobStoreFiles True if the corresponding blob store files 
shall be cleaned up
-     *     as well. Otherwise false.
-     * @return <tt>true</tt> if the job directory is successfully deleted or 
non-existing;
-     *     <tt>false</tt> otherwise
+     * @param jobId The {@code JobID} of the job that is subject to cleanup.
+     * @throws IOException if the cleanup failed.
      */
-    public boolean cleanupJob(JobID jobId, boolean cleanupBlobStoreFiles) {
+    @Override
+    public void localCleanup(JobID jobId) throws IOException {
         checkNotNull(jobId);
 
         final File jobDir =
                 new File(
                         BlobUtils.getStorageLocationPath(
                                 storageDir.deref().getAbsolutePath(), jobId));
 
+        FileUtils.deleteDirectory(jobDir);
+
+        // NOTE on why blobExpiryTimes are not cleaned up:
+        //       Instead of going through blobExpiryTimes, keep lingering 
entries - they
+        //       will be cleaned up by the timer task which tolerates 
non-existing files
+        //       If inserted again with the same IDs (via put()), the TTL will 
be updated
+        //       again.
+    }
+
+    /**
+     * Removes all BLOBs from local and HA store belonging to the given {@link 
JobID}.
+     *
+     * @param jobId ID of the job this blob belongs to
+     * @throws Exception if the cleanup fails.
+     */
+    @Override
+    public void globalCleanup(JobID jobId) throws Exception {
+        checkNotNull(jobId);
+
         readWriteLock.writeLock().lock();
 
         try {
-            // delete locally
-            boolean deletedLocally = false;
-            try {
-                FileUtils.deleteDirectory(jobDir);
-
-                // NOTE on why blobExpiryTimes are not cleaned up:
-                //       Instead of going through blobExpiryTimes, keep 
lingering entries - they
-                //       will be cleaned up by the timer task which tolerates 
non-existing files
-                //       If inserted again with the same IDs (via put()), the 
TTL will be updated
-                //       again.
+            Exception exception = null;
 
-                deletedLocally = true;
+            try {
+                localCleanup(jobId);
             } catch (IOException e) {
-                LOG.warn(
-                        "Failed to locally delete BLOB storage directory at "
-                                + jobDir.getAbsolutePath(),
-                        e);
+                exception = e;
             }
 
-            // delete in HA blob store files
-            final boolean deletedHA = !cleanupBlobStoreFiles || 
blobStore.deleteAll(jobId);
+            if (!blobStore.deleteAll(jobId)) {
+                exception =
+                        ExceptionUtils.firstOrSuppressed(
+                                new FlinkException(
+                                        "Error while cleaning up the BlobStore 
for job " + jobId),
+                                exception);
+            }
 
-            return deletedLocally && deletedHA;
+            ExceptionUtils.tryRethrowException(exception);
         } finally {
             readWriteLock.writeLock().unlock();
         }
     }
 
-    public void retainJobs(Collection<JobID> jobsToRetain) throws IOException {
+    public void retainJobs(Collection<JobID> jobsToRetain) throws Exception {

Review comment:
       I don't think we should change the signature here. See the comment on 
`GloballyCleanableResource`

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
##########
@@ -266,21 +266,22 @@ private void testDeleteBlobFails(@Nullable final JobID 
jobId, BlobKey.BlobType b
     }
 
     @Test
-    public void testJobCleanup() throws IOException {

Review comment:
       again, we should get rid of all these signature changes, they're not 
needed

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
##########
@@ -33,7 +35,8 @@
  * <p>Contains extra logic for adding jobs with tasks, and removing jobs when 
they do not contain
  * tasks any more
  */
-public class JobManagerMetricGroup extends 
ComponentMetricGroup<JobManagerMetricGroup> {
+public class JobManagerMetricGroup extends 
ComponentMetricGroup<JobManagerMetricGroup>
+        implements LocallyCleanableResource, GloballyCleanableResource {

Review comment:
       This makes me think: JobManagerMetricGroup is not really globally 
cleanable -> it doesn't have any globally persistent state, so it shouldn't 
implement `GloballyCleanableResource` 🤔 
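
A rough sketch of the split, under the assumption that a component holding only in-process state should expose local cleanup alone. All type names below are simplified, hypothetical stand-ins and not the Flink classes.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified stand-ins for the two interfaces under review.
interface LocallyCleanableSketch {
    void localCleanup(String jobId);
}

interface GloballyCleanableSketch {
    void globalCleanup(String jobId);
}

// Only in-memory state, so only the local interface is implemented.
class MetricGroupSketch implements LocallyCleanableSketch {
    private final Set<String> jobs = new HashSet<>();

    void addJob(String jobId) {
        jobs.add(jobId);
    }

    boolean hasJob(String jobId) {
        return jobs.contains(jobId);
    }

    @Override
    public void localCleanup(String jobId) {
        jobs.remove(jobId);
    }

    public static void main(String[] args) {
        MetricGroupSketch group = new MetricGroupSketch();
        group.addJob("job-1");
        group.localCleanup("job-1");
        System.out.println(group.hasJob("job-1"));
    }
}
```

A cleaner that collects global resources then cannot accidentally register such a component, because the type does not fit.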

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
##########
@@ -861,61 +869,73 @@ public boolean deletePermanent(JobID jobId, 
PermanentBlobKey key) {
     }
 
     /**
-     * Removes all BLOBs from local and HA store belonging to the given job ID.
+     * Deletes locally stored artifacts for the job represented by the given 
{@link JobID}. This
+     * doesn't touch the job's entry in the {@link BlobStore} to enable 
recovering.
      *
-     * @param jobId ID of the job this blob belongs to
-     * @param cleanupBlobStoreFiles True if the corresponding blob store files 
shall be cleaned up
-     *     as well. Otherwise false.
-     * @return <tt>true</tt> if the job directory is successfully deleted or 
non-existing;
-     *     <tt>false</tt> otherwise
+     * @param jobId The {@code JobID} of the job that is subject to cleanup.
+     * @throws IOException if the cleanup failed.
      */
-    public boolean cleanupJob(JobID jobId, boolean cleanupBlobStoreFiles) {
+    @Override
+    public void localCleanup(JobID jobId) throws IOException {
         checkNotNull(jobId);
 
         final File jobDir =
                 new File(
                         BlobUtils.getStorageLocationPath(
                                 storageDir.deref().getAbsolutePath(), jobId));
 
+        FileUtils.deleteDirectory(jobDir);
+
+        // NOTE on why blobExpiryTimes are not cleaned up:
+        //       Instead of going through blobExpiryTimes, keep lingering 
entries - they
+        //       will be cleaned up by the timer task which tolerates 
non-existing files
+        //       If inserted again with the same IDs (via put()), the TTL will 
be updated
+        //       again.

Review comment:
       nit: 
   
   ```suggestion
           // NOTE on why blobExpiryTimes are not cleaned up: Instead of going through blobExpiryTimes,
           // keep lingering entries - they will be cleaned up by the timer task which tolerates
           // non-existing files. If inserted again with the same IDs (via put()), the TTL will be
           // updated again.
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -162,34 +162,98 @@ public Dispatcher(
             DispatcherBootstrapFactory dispatcherBootstrapFactory,
             DispatcherServices dispatcherServices)
             throws Exception {
+        this(
+                rpcService,
+                fencingToken,
+                recoveredJobs,
+                recoveredDirtyJobs,
+                dispatcherBootstrapFactory,
+                dispatcherServices,
+                new JobManagerRunnerRegistry(16));
+    }
+
+    private Dispatcher(

Review comment:
       I don't see how this change improves testability, as we're only 
introducing new private constructors.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobManagerRunnerRegistry.java
##########
@@ -0,0 +1,129 @@
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.dispatcher.cleanup.GloballyCleanableResource;
+import org.apache.flink.runtime.dispatcher.cleanup.LocallyCleanableResource;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/** {@code JobManagerRunner} collects running jobs represented by {@link 
JobManagerRunner}. */
+public class JobManagerRunnerRegistry

Review comment:
       Same as with Metrics, `JobManagerRunnerRegistry` should really only 
implement the `LocallyCleanableResource` interface as it doesn't have any 
global resources.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -943,17 +945,16 @@ private void cleanUpHighAvailabilityJobData(JobID jobId) {
     private void terminateRunningJobs() {
         log.info("Stopping all currently running jobs of dispatcher {}.", 
getAddress());
 
-        final HashSet<JobID> jobsToRemove = new 
HashSet<>(runningJobs.keySet());
+        final Set<JobID> jobsToRemove = 
jobManagerRunnerRegistry.getRunningJobIds();

Review comment:
       This doesn't share the semantics of the original code, as we no 
longer make a copy of the set. Is that an issue? If not, we may want to inline 
this into the for loop.
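
To illustrate the concern: whether `getRunningJobIds()` returns a live view or a copy is not visible in this hunk, so the sketch below assumes the worst case (a live `keySet()` view of a `HashMap`). The class, the map contents, and the method names are hypothetical.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class KeySetCopyDemo {
    static Map<String, String> newRegistry() {
        Map<String, String> runningJobs = new HashMap<>();
        runningJobs.put("job-1", "runner-1");
        runningJobs.put("job-2", "runner-2");
        runningJobs.put("job-3", "runner-3");
        return runningJobs;
    }

    // Iterates over a snapshot, so removing from the map while looping is safe.
    static int removeAllViaCopy(Map<String, String> runningJobs) {
        Set<String> jobsToRemove = new HashSet<>(runningJobs.keySet());
        int removed = 0;
        for (String jobId : jobsToRemove) {
            runningJobs.remove(jobId);
            removed++;
        }
        return removed;
    }

    // Iterates over the live key view; removing from the map mid-loop fails fast.
    static boolean removeAllViaLiveViewFails(Map<String, String> runningJobs) {
        try {
            for (String jobId : runningJobs.keySet()) {
                runningJobs.remove(jobId);
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(removeAllViaCopy(newRegistry()));
        System.out.println(removeAllViaLiveViewFails(newRegistry()));
    }
}
```

If the registry already hands out a copy, or the loop never mutates the registry, the difference does not matter and inlining is fine.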

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleaner.java
##########
@@ -0,0 +1,74 @@
+
+package org.apache.flink.runtime.dispatcher.cleanup;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+
+/** {@code DefaultResourceCleaner} is the default implementation of {@link 
ResourceCleaner}. */
+public class DefaultResourceCleaner implements ResourceCleaner {
+
+    private final Collection<BiFunction<JobID, Executor, 
CompletableFuture<Void>>>
+            prioritizedOrderedJobRelatedCleanups = new ArrayList<>();
+    private final Collection<BiFunction<JobID, Executor, 
CompletableFuture<Void>>>
+            jobRelatedCleanups = new ArrayList<>();
+    private final Executor cleanupExecutor;
+
+    DefaultResourceCleaner(Executor cleanupExecutor) {

Review comment:
       It would be good to also inject the `ComponentMainThreadExecutor` here 
and assert that `cleanupAsync` is executed in the main thread, as some 
implementations rely on it (e.g. `JobManagerRunnerRegistry`).
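
A rough sketch of such a guard. It does not use Flink's `ComponentMainThreadExecutor`; the class below is a hypothetical stand-in that simply remembers the expected thread and rejects calls from any other one.

```java
import java.util.concurrent.CompletableFuture;

class MainThreadCheckedCleaner {
    // The thread that is allowed to trigger cleanups (stand-in for the main thread).
    private final Thread mainThread;

    MainThreadCheckedCleaner(Thread mainThread) {
        this.mainThread = mainThread;
    }

    private void assertRunningInMainThread() {
        if (Thread.currentThread() != mainThread) {
            throw new IllegalStateException("cleanupAsync must be called from the main thread");
        }
    }

    CompletableFuture<Void> cleanupAsync(String jobId) {
        assertRunningInMainThread();
        // The real cleanup work is omitted; this sketch only demonstrates the guard.
        return CompletableFuture.completedFuture(null);
    }

    public static void main(String[] args) {
        MainThreadCheckedCleaner cleaner = new MainThreadCheckedCleaner(Thread.currentThread());
        System.out.println(cleaner.cleanupAsync("job-1").isDone());
    }
}
```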

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -162,34 +162,98 @@ public Dispatcher(
             DispatcherBootstrapFactory dispatcherBootstrapFactory,
             DispatcherServices dispatcherServices)
             throws Exception {
+        this(
+                rpcService,
+                fencingToken,
+                recoveredJobs,
+                recoveredDirtyJobs,
+                dispatcherBootstrapFactory,
+                dispatcherServices,
+                new JobManagerRunnerRegistry(16));
+    }
+
+    private Dispatcher(
+            RpcService rpcService,
+            DispatcherId fencingToken,
+            Collection<JobGraph> recoveredJobs,
+            Collection<JobResult> globallyTerminatedJobs,
+            DispatcherBootstrapFactory dispatcherBootstrapFactory,
+            DispatcherServices dispatcherServices,
+            JobManagerRunnerRegistry jobManagerRunnerRegistry)
+            throws Exception {
+        this(
+                rpcService,
+                fencingToken,
+                recoveredJobs,
+                globallyTerminatedJobs,
+                dispatcherServices.getConfiguration(),
+                dispatcherServices.getHighAvailabilityServices(),
+                dispatcherServices.getResourceManagerGatewayRetriever(),
+                dispatcherServices.getHeartbeatServices(),
+                dispatcherServices.getBlobServer(),
+                dispatcherServices.getFatalErrorHandler(),
+                dispatcherServices.getJobGraphWriter(),
+                dispatcherServices.getJobResultStore(),
+                dispatcherServices.getJobManagerMetricGroup(),
+                dispatcherServices.getMetricQueryServiceAddress(),
+                dispatcherServices.getIoExecutor(),
+                dispatcherServices.getHistoryServerArchivist(),
+                dispatcherServices.getArchivedExecutionGraphStore(),
+                dispatcherServices.getJobManagerRunnerFactory(),
+                dispatcherBootstrapFactory,
+                dispatcherServices.getOperationCaches(),
+                jobManagerRunnerRegistry);
+    }
+
+    private Dispatcher(
+            RpcService rpcService,
+            DispatcherId fencingToken,
+            Collection<JobGraph> recoveredJobs,
+            Collection<JobResult> recoveredDirtyJobs,
+            Configuration configuration,
+            HighAvailabilityServices highAvailabilityServices,
+            GatewayRetriever<ResourceManagerGateway> 
resourceManagerGatewayRetriever,
+            HeartbeatServices heartbeatServices,
+            BlobServer blobServer,
+            FatalErrorHandler fatalErrorHandler,
+            JobGraphWriter jobGraphWriter,
+            JobResultStore jobResultStore,
+            JobManagerMetricGroup jobManagerMetricGroup,
+            @Nullable String metricServiceQueryAddress,
+            Executor ioExecutor,
+            HistoryServerArchivist historyServerArchivist,
+            ExecutionGraphInfoStore executionGraphInfoStore,
+            JobManagerRunnerFactory jobManagerRunnerFactory,
+            DispatcherBootstrapFactory dispatcherBootstrapFactory,
+            DispatcherOperationCaches dispatcherOperationCaches,
+            JobManagerRunnerRegistry jobManagerRunnerRegistry)
+            throws Exception {
         super(rpcService, RpcServiceUtils.createRandomName(DISPATCHER_NAME), 
fencingToken);
-        checkNotNull(dispatcherServices);
         assertRecoveredJobsAndDirtyJobResults(recoveredJobs, 
recoveredDirtyJobs);
 
-        this.configuration = dispatcherServices.getConfiguration();
-        this.highAvailabilityServices = 
dispatcherServices.getHighAvailabilityServices();
-        this.resourceManagerGatewayRetriever =
-                dispatcherServices.getResourceManagerGatewayRetriever();
-        this.heartbeatServices = dispatcherServices.getHeartbeatServices();
-        this.blobServer = dispatcherServices.getBlobServer();
-        this.fatalErrorHandler = dispatcherServices.getFatalErrorHandler();
-        this.jobGraphWriter = dispatcherServices.getJobGraphWriter();
-        this.jobResultStore = dispatcherServices.getJobResultStore();
-        this.jobManagerMetricGroup = 
dispatcherServices.getJobManagerMetricGroup();
-        this.metricServiceQueryAddress = 
dispatcherServices.getMetricQueryServiceAddress();
-        this.ioExecutor = dispatcherServices.getIoExecutor();
+        this.configuration = checkNotNull(configuration);
+        this.highAvailabilityServices = checkNotNull(highAvailabilityServices);
+        this.resourceManagerGatewayRetriever = 
checkNotNull(resourceManagerGatewayRetriever);
+        this.heartbeatServices = checkNotNull(heartbeatServices);
+        this.blobServer = checkNotNull(blobServer);
+        this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
+        this.jobGraphWriter = checkNotNull(jobGraphWriter);
+        this.jobResultStore = checkNotNull(jobResultStore);
+        this.jobManagerMetricGroup = checkNotNull(jobManagerMetricGroup);
+        this.metricServiceQueryAddress = metricServiceQueryAddress;
+        this.ioExecutor = checkNotNull(ioExecutor);

Review comment:
       I'm not really a big fan of these safeguards.
   
   - They're hard to keep in sync when new code is added
   - We should be able to enforce this at compile time simply by checking 
annotations (e.g. https://github.com/uber/NullAway)
   - They add noise
   
   What was the reason for adding these checks?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java
##########
@@ -169,28 +177,30 @@ public void 
testRecoverFromCheckpointAfterJobGraphRemovalOfTerminatedJobFailed()
         // This will clear internal state of election service, so a new 
contender can register.
         leaderElectionService.stop();
 
+        assertThat(
+                "The JobGraph is still stored in the JobGraphStore.",
+                haServices.getJobGraphStore().getJobIds(),
+                CoreMatchers.is(Collections.singleton(jobId)));
+        assertThat(
+                "The JobResultStore has this job marked as dirty.",
+                haServices.getJobResultStore().getDirtyResults().stream()
+                        .map(JobResult::getJobId)
+                        .collect(Collectors.toSet()),
+                CoreMatchers.is(Collections.singleton(jobId)));
+
         // Run a second dispatcher, that restores our finished job.
         final Dispatcher secondDispatcher = createRecoveredDispatcher(null);
         toTerminate.add(secondDispatcher);
-        final DispatcherGateway secondDispatcherGateway =
-                secondDispatcher.getSelfGateway(DispatcherGateway.class);
+
+        // new Dispatcher becomes new leader
         leaderElectionService.isLeader(UUID.randomUUID());
 
-        // Now make sure that restored job started from checkpoint.
-        final JobMasterGateway secondJobMasterGateway =
-                connectToLeadingJobMaster(leaderElectionService).get();
-        try (final JobMasterTester tester =
-                new JobMasterTester(rpcService, jobId, 
secondJobMasterGateway)) {
-            final CompletableFuture<List<TaskDeploymentDescriptor>> 
descriptorsFuture =
-                    tester.deployVertices(2);
-            awaitStatus(secondDispatcherGateway, jobId, JobStatus.RUNNING);
-            final Optional<JobManagerTaskRestore> maybeRestore =
-                    descriptorsFuture.get().stream()
-                            .map(TaskDeploymentDescriptor::getTaskRestore)
-                            .filter(Objects::nonNull)
-                            .findAny();
-            assertTrue("Job has recovered from checkpoint.", 
maybeRestore.isPresent());
-        }
+        assertThrows(

Review comment:
       should we simply poll the job result from the second dispatcher here?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
##########
@@ -619,34 +618,6 @@ public boolean hasCleanJobResultEntry(JobID jobId) throws 
IOException {
         }
     }
 
-    @Test
-    public void testHABlobsAreNotRemovedIfHAJobGraphRemovalFails() throws 
Exception {

Review comment:
       why is this test no longer relevant?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobGraphWriter.java
##########
@@ -42,5 +41,9 @@
      * @param jobId specifying the job to release the locks for
      * @throws Exception if the locks cannot be released
      */

Review comment:
       or maybe move it to the implementation

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactoryTest.java
##########
@@ -0,0 +1,420 @@
+
+package org.apache.flink.runtime.dispatcher.cleanup;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.BlobStore;
+import org.apache.flink.runtime.blob.TestingBlobStoreBuilder;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.dispatcher.JobManagerRunnerRegistry;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.JobResultStore;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.JobGraphStore;
+import org.apache.flink.runtime.jobmanager.JobGraphWriter;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
+import org.apache.flink.util.concurrent.Executors;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeoutException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * {@code DispatcherResourceCleanerFactoryTest} verifies that the resources 
are properly cleaned up
+ * for both, the {@link GloballyCleanableResource} and {@link 
LocallyCleanableResource} of the
+ * {@link org.apache.flink.runtime.dispatcher.Dispatcher}.
+ */
+public class DispatcherResourceCleanerFactoryTest {
+
+    private static final JobID JOB_ID = new JobID();
+
+    private CleanableJobManagerRegistry jobManagerRunnerRegistry;
+    private CleanableJobGraphWriter jobGraphWriter;
+    private CleanableBlobServer blobServer;
+    private CleanableHighAvailabilityServices highAvailabilityServices;
+    private JobManagerMetricGroup jobManagerMetricGroup;
+
+    private DispatcherResourceCleanerFactory testInstance;
+
+    @BeforeEach
+    public void setup() throws IOException {
+        jobManagerRunnerRegistry = new CleanableJobManagerRegistry();
+        jobGraphWriter = new CleanableJobGraphWriter();
+        blobServer = new CleanableBlobServer();
+        highAvailabilityServices = new CleanableHighAvailabilityServices();
+
+        MetricRegistry metricRegistry = TestingMetricRegistry.builder().build();
+        jobManagerMetricGroup =
+                JobManagerMetricGroup.createJobManagerMetricGroup(
+                        metricRegistry, "ignored hostname");
+        jobManagerMetricGroup.addJob(JOB_ID, "ignored job name");
+
+        testInstance =
+                new DispatcherResourceCleanerFactory(
+                        Executors.directExecutor(),
+                        jobManagerRunnerRegistry,
+                        jobGraphWriter,
+                        blobServer,
+                        highAvailabilityServices,
+                        jobManagerMetricGroup);
+    }
+
+    @Test
+    public void testLocalResourceCleaning() {
+        assertGlobalCleanupNotTriggered();
+        assertLocalCleanupNotTriggered();
+        assertJobManagerMetricGroupNotCleaned();
+
+        final CompletableFuture<Void> cleanupResultFuture =
+                testInstance.createLocalResourceCleaner().cleanupAsync(JOB_ID);
+
+        assertGlobalCleanupNotTriggered();
+        assertLocalCleanupTriggeredWaitingForJobManagerRunnerRegistry();
+        assertJobManagerMetricGroupNotCleaned();
+
+        assertThat(cleanupResultFuture).isNotCompleted();
+
+        jobManagerRunnerRegistry.completeLocalCleanup();
+
+        assertGlobalCleanupNotTriggered();
+        assertLocalCleanupTriggered();
+        assertJobManagerMetricGroupCleaned();
+
+        assertThat(cleanupResultFuture).isCompleted();
+    }
+
+    @Test
+    public void testGlobalResourceCleaning()
+            throws ExecutionException, InterruptedException, TimeoutException {
+        assertGlobalCleanupNotTriggered();
+        assertLocalCleanupNotTriggered();
+        assertJobManagerMetricGroupNotCleaned();
+
+        final CompletableFuture<Void> cleanupResultFuture =
+                testInstance.createGlobalResourceCleaner().cleanupAsync(JOB_ID);
+
+        assertGlobalCleanupTriggeredWaitingForJobManagerRunnerRegistry();
+        assertLocalCleanupNotTriggered();
+        assertJobManagerMetricGroupNotCleaned();
+
+        assertThat(cleanupResultFuture).isNotCompleted();
+
+        jobManagerRunnerRegistry.completeGlobalCleanup();
+
+        assertGlobalCleanupTriggered();
+        assertLocalCleanupNotTriggered();
+        assertJobManagerMetricGroupCleaned();
+
+        assertThat(cleanupResultFuture).isCompleted();
+    }
+
+    private void assertLocalCleanupNotTriggered() {
+        assertThat(jobManagerRunnerRegistry.getLocalCleanupFuture()).isNotDone();
+        assertThat(jobGraphWriter.getLocalCleanupFuture()).isNotDone();
+        assertThat(blobServer.getLocalCleanupFuture()).isNotDone();
+        assertThat(highAvailabilityServices.getLocalCleanupFuture()).isNotDone();
+    }
+
+    private void assertLocalCleanupTriggeredWaitingForJobManagerRunnerRegistry() {
+        assertThat(jobManagerRunnerRegistry.getLocalCleanupFuture()).isDone();
+
+        // the JobManagerRunnerRegistry needs to be cleaned up first
+        assertThat(jobGraphWriter.getLocalCleanupFuture()).isNotDone();
+        assertThat(blobServer.getLocalCleanupFuture()).isNotDone();
+        assertThat(highAvailabilityServices.getLocalCleanupFuture()).isNotDone();
+    }
+
+    private void assertGlobalCleanupNotTriggered() {
+        assertThat(jobManagerRunnerRegistry.getGlobalCleanupFuture()).isNotDone();
+        assertThat(jobGraphWriter.getGlobalCleanupFuture()).isNotDone();
+        assertThat(blobServer.getGlobalCleanupFuture()).isNotDone();
+        assertThat(highAvailabilityServices.getGlobalCleanupFuture()).isNotDone();
+    }
+
+    private void assertGlobalCleanupTriggeredWaitingForJobManagerRunnerRegistry() {
+        assertThat(jobManagerRunnerRegistry.getGlobalCleanupFuture()).isDone();
+
+        // the JobManagerRunnerRegistry needs to be cleaned up first
+        assertThat(jobGraphWriter.getGlobalCleanupFuture()).isNotDone();
+        assertThat(blobServer.getGlobalCleanupFuture()).isNotDone();
+        assertThat(highAvailabilityServices.getGlobalCleanupFuture()).isNotDone();
+    }
+
+    private void assertJobManagerMetricGroupNotCleaned() {
+        assertThat(jobManagerMetricGroup.numRegisteredJobMetricGroups()).isEqualTo(1);
+    }
+
+    private void assertLocalCleanupTriggered() {
+        assertThat(jobManagerRunnerRegistry.getLocalCleanupFuture()).isCompleted();
+        assertThat(jobGraphWriter.getLocalCleanupFuture()).isCompleted();
+        assertThat(blobServer.getLocalCleanupFuture()).isCompleted();
+        assertThat(highAvailabilityServices.getLocalCleanupFuture()).isCompleted();
+    }
+
+    private void assertGlobalCleanupTriggered() {
+        assertThat(jobManagerRunnerRegistry.getGlobalCleanupFuture()).isCompleted();
+        assertThat(jobGraphWriter.getGlobalCleanupFuture()).isCompleted();
+        assertThat(blobServer.getGlobalCleanupFuture()).isCompleted();
+        assertThat(highAvailabilityServices.getGlobalCleanupFuture()).isCompleted();
+    }
+
+    private void assertJobManagerMetricGroupCleaned() {
+        assertThat(jobManagerMetricGroup.numRegisteredJobMetricGroups()).isEqualTo(0);
+    }
+
+    private static class AbstractTestingCleanableResource
+            implements LocallyCleanableResource, GloballyCleanableResource {
+
+        private final CompletableFuture<JobID> localCleanupFuture = new CompletableFuture<>();
+        private final CompletableFuture<JobID> globalCleanupFuture = new CompletableFuture<>();
+
+        @Override
+        public void globalCleanup(JobID jobId) {
+            throw new UnsupportedOperationException("Synchronous globalCleanup is not supported.");
+        }
+
+        @Override
+        public void localCleanup(JobID jobId) {
+            throw new UnsupportedOperationException("Synchronous localCleanup is not supported.");
+        }
+
+        @Override
+        public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor ignoredExecutor) {
+            localCleanupFuture.complete(jobId);
+
+            return FutureUtils.completedVoidFuture();
+        }
+
+        @Override
+        public CompletableFuture<Void> globalCleanupAsync(JobID jobId, Executor ignoredExecutor) {
+            globalCleanupFuture.complete(jobId);
+
+            return FutureUtils.completedVoidFuture();
+        }
+
+        public CompletableFuture<JobID> getLocalCleanupFuture() {
+            return localCleanupFuture;
+        }
+
+        public CompletableFuture<JobID> getGlobalCleanupFuture() {
+            return globalCleanupFuture;
+        }
+    }
+
+    private static class CleanableJobGraphWriter extends AbstractTestingCleanableResource
+            implements JobGraphWriter {
+
+        @Override
+        public void putJobGraph(JobGraph jobGraph) {
+            throw new UnsupportedOperationException("putJobGraph operation not supported.");
+        }
+    }
+
+    private static class CleanableHighAvailabilityServices extends AbstractTestingCleanableResource

Review comment:
       Why can't we reuse `TestingHighAvailabilityServices` here? The same
applies to the other classes in this test.
   
   The reasoning behind this is the very same as with using `mockito`: if you
decide to change something in the interface, you have to go through all
implementations one by one.
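
   A minimal sketch of the idea behind this comment, not the actual Flink test utilities: one shared, builder-configured test double lives in a single place, so an interface change is absorbed there instead of in every ad-hoc subclass. All names below (`Store`, `TestingStore`, `withGlobalCleanupFunction`) are hypothetical, and `String` stands in for `JobID`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class SharedTestingImplSketch {

    /** Hypothetical production interface; String stands in for JobID. */
    interface Store {
        CompletableFuture<Void> globalCleanupAsync(String jobId);

        void put(String jobId);
    }

    /** One shared, configurable test double, maintained in a single place. */
    static final class TestingStore implements Store {
        private final Function<String, CompletableFuture<Void>> globalCleanupFunction;

        private TestingStore(Function<String, CompletableFuture<Void>> globalCleanupFunction) {
            this.globalCleanupFunction = globalCleanupFunction;
        }

        @Override
        public CompletableFuture<Void> globalCleanupAsync(String jobId) {
            // Delegate to whatever behavior the individual test configured.
            return globalCleanupFunction.apply(jobId);
        }

        @Override
        public void put(String jobId) {
            // No-op by default; tests that care could get their own hook.
        }

        static Builder builder() {
            return new Builder();
        }

        static final class Builder {
            // Default behavior: cleanup succeeds immediately.
            private Function<String, CompletableFuture<Void>> globalCleanupFunction =
                    ignored -> CompletableFuture.completedFuture(null);

            Builder withGlobalCleanupFunction(Function<String, CompletableFuture<Void>> function) {
                this.globalCleanupFunction = function;
                return this;
            }

            TestingStore build() {
                return new TestingStore(globalCleanupFunction);
            }
        }
    }

    public static void main(String[] args) {
        // The test records which job was cleaned without subclassing Store itself.
        CompletableFuture<String> cleanedJob = new CompletableFuture<>();
        Store store =
                TestingStore.builder()
                        .withGlobalCleanupFunction(
                                jobId -> {
                                    cleanedJob.complete(jobId);
                                    return CompletableFuture.completedFuture(null);
                                })
                        .build();

        store.globalCleanupAsync("job-1").join();
        System.out.println(cleanedJob.join());
    }
}
```

   If a method is later added to `Store`, only `TestingStore` has to change; tests that configure it through the builder keep compiling.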

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/GloballyCleanableResource.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.cleanup;
+
+import org.apache.flink.api.common.JobID;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+
+/**
+ * {@code GloballyCleanableResource} is supposed to be used by any class that provides artifacts for
+ * a given job that can be cleaned up globally.
+ *
+ * @see LocallyCleanableResource
+ */
+@FunctionalInterface
+public interface GloballyCleanableResource {

Review comment:
       Also, we should add some notes about the thread-safety guarantees (the
sync method needs to be thread-safe).
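
   A rough sketch of what such a note could look like, assuming the async variant simply runs the sync method on the given executor (the imports in the diff suggest this, but the method body is not shown here). The interface name `CleanableResource`, the class `InMemoryArtifacts`, and the use of `String` instead of `JobID` are hypothetical stand-ins.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

public class ThreadSafeCleanupSketch {

    /** Hypothetical stand-in for the interface under review; String replaces JobID. */
    @FunctionalInterface
    interface CleanableResource {

        /**
         * Cleans up the artifacts of the given job.
         *
         * <p>Thread safety: implementations must tolerate concurrent calls, because the
         * async variant may run this method on an arbitrary executor thread.
         */
        void cleanup(String jobId) throws Exception;

        /** Assumed default: run the sync cleanup on the executor, wrapping failures. */
        default CompletableFuture<Void> cleanupAsync(String jobId, Executor executor) {
            return CompletableFuture.runAsync(
                    () -> {
                        try {
                            cleanup(jobId);
                        } catch (Exception e) {
                            throw new CompletionException("Cleanup failed for job " + jobId, e);
                        }
                    },
                    executor);
        }
    }

    /** Example resource that keeps its state in a concurrent set. */
    static final class InMemoryArtifacts implements CleanableResource {
        private final Set<String> jobsWithArtifacts = ConcurrentHashMap.newKeySet();

        void register(String jobId) {
            jobsWithArtifacts.add(jobId);
        }

        boolean hasArtifacts(String jobId) {
            return jobsWithArtifacts.contains(jobId);
        }

        @Override
        public void cleanup(String jobId) {
            // Removal from a concurrent set is safe from any thread.
            jobsWithArtifacts.remove(jobId);
        }
    }

    public static void main(String[] args) {
        InMemoryArtifacts artifacts = new InMemoryArtifacts();
        artifacts.register("job-1");

        // Runnable::run acts as a direct executor that runs the task on the calling thread.
        artifacts.cleanupAsync("job-1", Runnable::run).join();
        System.out.println(artifacts.hasArtifacts("job-1"));
    }
}
```

   Documenting the requirement on the sync method keeps the contract in one place, since implementers typically override only that method and inherit the async wrapper.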




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


