This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8d27e9ccad70cfd37168e1dec5bbd03386603b3d
Author: Matthias Pohl <[email protected]>
AuthorDate: Wed Dec 15 12:31:31 2021 +0100

    [FLINK-25432][runtime] Refactors JobGraphWriter interface to implement 
LocallyCleanableResource and GloballyCleanableResource
---
 .../flink/runtime/dispatcher/Dispatcher.java       |  4 +-
 .../runtime/dispatcher/SingleJobJobGraphStore.java | 10 ---
 .../runtime/jobmanager/DefaultJobGraphStore.java   | 91 ++++++++++++++++------
 .../flink/runtime/jobmanager/JobGraphWriter.java   | 28 +++----
 .../jobmanager/StandaloneJobGraphStore.java        | 10 ---
 .../runtime/jobmanager/ThrowingJobGraphWriter.java |  7 --
 .../dispatcher/DispatcherFailoverITCase.java       |  5 +-
 .../dispatcher/DispatcherResourceCleanupTest.java  |  2 +-
 .../flink/runtime/dispatcher/DispatcherTest.java   | 41 +++++++---
 .../runtime/dispatcher/NoOpJobGraphWriter.java     |  7 --
 .../runner/SessionDispatcherLeaderProcessTest.java |  5 +-
 .../jobmanager/DefaultJobGraphStoreTest.java       | 19 +++--
 .../jobmanager/StandaloneJobGraphStoreTest.java    |  5 +-
 .../jobmanager/ZooKeeperJobGraphsStoreITCase.java  | 17 ++--
 .../runtime/testutils/TestingJobGraphStore.java    | 48 ++++++------
 15 files changed, 178 insertions(+), 121 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 240ced5..e6b7e8e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -882,7 +882,7 @@ public abstract class Dispatcher extends 
PermanentlyFencedRpcEndpoint<Dispatcher
     private boolean cleanUpJobGraph(JobID jobId, boolean cleanupHA) {
         if (cleanupHA) {
             try {
-                jobGraphWriter.removeJobGraph(jobId);
+                jobGraphWriter.globalCleanup(jobId);
                 return true;
             } catch (Exception e) {
                 log.warn(
@@ -893,7 +893,7 @@ public abstract class Dispatcher extends 
PermanentlyFencedRpcEndpoint<Dispatcher
             }
         }
         try {
-            jobGraphWriter.releaseJobGraph(jobId);
+            jobGraphWriter.localCleanup(jobId);
         } catch (Exception e) {
             log.warn("Could not properly release job {} from submitted job 
graph store.", jobId, e);
         }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SingleJobJobGraphStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SingleJobJobGraphStore.java
index f2c450a..b063b4b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SingleJobJobGraphStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SingleJobJobGraphStore.java
@@ -65,16 +65,6 @@ public class SingleJobJobGraphStore implements JobGraphStore 
{
     }
 
     @Override
-    public void removeJobGraph(JobID jobId) {
-        // ignore
-    }
-
-    @Override
-    public void releaseJobGraph(JobID jobId) {
-        // ignore
-    }
-
-    @Override
     public Collection<JobID> getJobIds() {
         return Collections.singleton(jobGraph.getJobID());
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStore.java
index 832f9c9..fda4964 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStore.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.persistence.StateHandleStore;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.function.ThrowingRunnable;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,6 +39,9 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -238,42 +242,83 @@ public class DefaultJobGraphStore<R extends 
ResourceVersion<R>>
     }
 
     @Override
-    public void removeJobGraph(JobID jobId) throws Exception {
+    public CompletableFuture<Void> globalCleanupAsync(JobID jobId, Executor 
executor) {
         checkNotNull(jobId, "Job ID");
-        String name = jobGraphStoreUtil.jobIDToName(jobId);
 
-        LOG.debug("Removing job graph {} from {}.", jobId, 
jobGraphStateHandleStore);
+        return runAsyncWithLockAssertRunning(
+                () -> {
+                    LOG.debug("Removing job graph {} from {}.", jobId, 
jobGraphStateHandleStore);
 
-        synchronized (lock) {
-            verifyIsRunning();
-            if (addedJobGraphs.contains(jobId)) {
-                if (jobGraphStateHandleStore.releaseAndTryRemove(name)) {
-                    addedJobGraphs.remove(jobId);
-                } else {
-                    throw new FlinkException(
+                    if (addedJobGraphs.contains(jobId)) {
+                        final String name = 
jobGraphStoreUtil.jobIDToName(jobId);
+                        releaseAndRemoveOrThrowCompletionException(jobId, 
name);
+
+                        addedJobGraphs.remove(jobId);
+                    }
+
+                    LOG.info("Removed job graph {} from {}.", jobId, 
jobGraphStateHandleStore);
+                },
+                executor);
+    }
+
+    @GuardedBy("lock")
+    private void releaseAndRemoveOrThrowCompletionException(JobID jobId, 
String jobName) {
+        boolean success;
+        try {
+            success = jobGraphStateHandleStore.releaseAndTryRemove(jobName);
+        } catch (Exception e) {
+            throw new CompletionException(e);
+        }
+
+        if (!success) {
+            throw new CompletionException(
+                    new FlinkException(
                             String.format(
                                     "Could not remove job graph with job id %s 
from %s.",
-                                    jobId, jobGraphStateHandleStore));
-                }
-            }
+                                    jobId, jobGraphStateHandleStore)));
         }
-
-        LOG.info("Removed job graph {} from {}.", jobId, 
jobGraphStateHandleStore);
     }
 
+    /**
+     * Releases the locks on the specified {@link JobGraph}.
+     *
+     * <p>Releasing the locks allows that another instance can delete the job 
from the {@link
+     * JobGraphStore}.
+     *
+     * @param jobId specifying the job to release the locks for
+     * @param executor the executor being used for the asynchronous execution 
of the local cleanup.
+     * @returns The cleanup result future.
+     */
     @Override
-    public void releaseJobGraph(JobID jobId) throws Exception {
+    public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor 
executor) {
         checkNotNull(jobId, "Job ID");
 
-        LOG.debug("Releasing job graph {} from {}.", jobId, 
jobGraphStateHandleStore);
+        return runAsyncWithLockAssertRunning(
+                () -> {
+                    LOG.debug("Releasing job graph {} from {}.", jobId, 
jobGraphStateHandleStore);
 
-        synchronized (lock) {
-            verifyIsRunning();
-            
jobGraphStateHandleStore.release(jobGraphStoreUtil.jobIDToName(jobId));
-            addedJobGraphs.remove(jobId);
-        }
+                    
jobGraphStateHandleStore.release(jobGraphStoreUtil.jobIDToName(jobId));
+                    addedJobGraphs.remove(jobId);
 
-        LOG.info("Released job graph {} from {}.", jobId, 
jobGraphStateHandleStore);
+                    LOG.info("Released job graph {} from {}.", jobId, 
jobGraphStateHandleStore);
+                },
+                executor);
+    }
+
+    private CompletableFuture<Void> runAsyncWithLockAssertRunning(
+            ThrowingRunnable<Exception> runnable, Executor executor) {
+        return CompletableFuture.runAsync(
+                () -> {
+                    synchronized (lock) {
+                        verifyIsRunning();
+                        try {
+                            runnable.run();
+                        } catch (Exception e) {
+                            throw new CompletionException(e);
+                        }
+                    }
+                },
+                executor);
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobGraphWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobGraphWriter.java
index 23542a3..bb2c5fc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobGraphWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobGraphWriter.java
@@ -19,10 +19,16 @@
 package org.apache.flink.runtime.jobmanager;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.dispatcher.cleanup.GloballyCleanableResource;
+import org.apache.flink.runtime.dispatcher.cleanup.LocallyCleanableResource;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /** Allows to store and remove job graphs. */
-public interface JobGraphWriter {
+public interface JobGraphWriter extends LocallyCleanableResource, 
GloballyCleanableResource {
     /**
      * Adds the {@link JobGraph} instance.
      *
@@ -30,17 +36,13 @@ public interface JobGraphWriter {
      */
     void putJobGraph(JobGraph jobGraph) throws Exception;
 
-    /** Removes the {@link JobGraph} with the given {@link JobID} if it 
exists. */
-    void removeJobGraph(JobID jobId) throws Exception;
+    @Override
+    default CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor 
executor) {
+        return FutureUtils.completedVoidFuture();
+    }
 
-    /**
-     * Releases the locks on the specified {@link JobGraph}.
-     *
-     * <p>Releasing the locks allows that another instance can delete the job 
from the {@link
-     * JobGraphStore}.
-     *
-     * @param jobId specifying the job to release the locks for
-     * @throws Exception if the locks cannot be released
-     */
-    void releaseJobGraph(JobID jobId) throws Exception;
+    @Override
+    default CompletableFuture<Void> globalCleanupAsync(JobID jobId, Executor 
executor) {
+        return FutureUtils.completedVoidFuture();
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneJobGraphStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneJobGraphStore.java
index 656df2f..f7d8135 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneJobGraphStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneJobGraphStore.java
@@ -48,16 +48,6 @@ public class StandaloneJobGraphStore implements 
JobGraphStore {
     }
 
     @Override
-    public void removeJobGraph(JobID jobId) {
-        // Nothing to do
-    }
-
-    @Override
-    public void releaseJobGraph(JobID jobId) {
-        // nothing to do
-    }
-
-    @Override
     public Collection<JobID> getJobIds() {
         return Collections.emptyList();
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ThrowingJobGraphWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ThrowingJobGraphWriter.java
index 224b7fb..8bf9aa0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ThrowingJobGraphWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ThrowingJobGraphWriter.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.jobmanager;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 
 /** {@link JobGraphWriter} implementation which does not allow to store {@link 
JobGraph}. */
@@ -29,10 +28,4 @@ public enum ThrowingJobGraphWriter implements JobGraphWriter 
{
     public void putJobGraph(JobGraph jobGraph) {
         throw new UnsupportedOperationException("Cannot store job graphs.");
     }
-
-    @Override
-    public void removeJobGraph(JobID jobId) {}
-
-    @Override
-    public void releaseJobGraph(JobID jobId) {}
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java
index ef6be61..0cd5d34 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.testutils.TestingJobGraphStore;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.concurrent.FutureUtils;
 
 import org.junit.After;
 import org.junit.Before;
@@ -112,8 +113,8 @@ public class DispatcherFailoverITCase extends 
AbstractDispatcherTest {
         final Error jobGraphRemovalError = new Error("Unable to remove job 
graph.");
         final TestingJobGraphStore jobGraphStore =
                 TestingJobGraphStore.newBuilder()
-                        .setRemoveJobGraphConsumer(
-                                graph -> {
+                        .setGlobalCleanupFunction(
+                                (ignoredJobId, ignoredExecutor) -> {
                                     throw jobGraphRemovalError;
                                 })
                         .build();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
index 5b84a63..4992c76 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
@@ -620,7 +620,7 @@ public class DispatcherResourceCleanupTest extends 
TestLogger {
     public void testHABlobsAreNotRemovedIfHAJobGraphRemovalFails() throws 
Exception {
         jobGraphWriter =
                 TestingJobGraphStore.newBuilder()
-                        .setRemoveJobGraphConsumer(
+                        .setGlobalCleanupConsumer(
                                 ignored -> {
                                     throw new Exception("Failed to Remove 
future");
                                 })
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 97bd0e1..e830a35 100755
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -88,6 +88,7 @@ import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TimeUtils;
+import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.function.ThrowingRunnable;
 
 import org.assertj.core.api.Assertions;
@@ -751,10 +752,16 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
         // Track cleanup - job-graph
         final TestingJobGraphStore jobGraphStore =
                 TestingJobGraphStore.newBuilder()
-                        .setReleaseJobGraphConsumer(
-                                jobId -> 
cleanUpEvents.add(CLEANUP_JOB_GRAPH_RELEASE))
-                        .setRemoveJobGraphConsumer(
-                                jobId -> 
cleanUpEvents.add(CLEANUP_JOB_GRAPH_REMOVE))
+                        .setLocalCleanupFunction(
+                                (jobId, executor) -> {
+                                    
cleanUpEvents.add(CLEANUP_JOB_GRAPH_RELEASE);
+                                    return FutureUtils.completedVoidFuture();
+                                })
+                        .setGlobalCleanupFunction(
+                                (jobId, executor) -> {
+                                    
cleanUpEvents.add(CLEANUP_JOB_GRAPH_REMOVE);
+                                    return FutureUtils.completedVoidFuture();
+                                })
                         .build();
         jobGraphStore.start(null);
         haServices.setJobGraphStore(jobGraphStore);
@@ -909,8 +916,16 @@ public class DispatcherTest extends AbstractDispatcherTest 
{
 
         final TestingJobGraphStore testingJobGraphStore =
                 TestingJobGraphStore.newBuilder()
-                        
.setRemoveJobGraphConsumer(removeJobGraphFuture::complete)
-                        
.setReleaseJobGraphConsumer(releaseJobGraphFuture::complete)
+                        .setGlobalCleanupFunction(
+                                (jobId, executor) -> {
+                                    removeJobGraphFuture.complete(jobId);
+                                    return FutureUtils.completedVoidFuture();
+                                })
+                        .setLocalCleanupFunction(
+                                (jobId, executor) -> {
+                                    releaseJobGraphFuture.complete(jobId);
+                                    return FutureUtils.completedVoidFuture();
+                                })
                         .build();
         testingJobGraphStore.start(null);
 
@@ -1147,10 +1162,16 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
         // Track cleanup - job-graph
         final TestingJobGraphStore jobGraphStore =
                 TestingJobGraphStore.newBuilder()
-                        .setReleaseJobGraphConsumer(
-                                jobId -> 
cleanUpEvents.add(CLEANUP_JOB_GRAPH_RELEASE))
-                        .setRemoveJobGraphConsumer(
-                                jobId -> 
cleanUpEvents.add(CLEANUP_JOB_GRAPH_REMOVE))
+                        .setLocalCleanupFunction(
+                                (jobId, executor) -> {
+                                    
cleanUpEvents.add(CLEANUP_JOB_GRAPH_RELEASE);
+                                    return FutureUtils.completedVoidFuture();
+                                })
+                        .setGlobalCleanupFunction(
+                                (jobId, executor) -> {
+                                    
cleanUpEvents.add(CLEANUP_JOB_GRAPH_REMOVE);
+                                    return FutureUtils.completedVoidFuture();
+                                })
                         .build();
         jobGraphStore.start(null);
         haServices.setJobGraphStore(jobGraphStore);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/NoOpJobGraphWriter.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/NoOpJobGraphWriter.java
index 7a2d156..bc17d94 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/NoOpJobGraphWriter.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/NoOpJobGraphWriter.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.dispatcher;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.JobGraphWriter;
 
@@ -28,10 +27,4 @@ public enum NoOpJobGraphWriter implements JobGraphWriter {
 
     @Override
     public void putJobGraph(JobGraph jobGraph) throws Exception {}
-
-    @Override
-    public void removeJobGraph(JobID jobId) throws Exception {}
-
-    @Override
-    public void releaseJobGraph(JobID jobId) throws Exception {}
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessTest.java
index 583c771..f48ca06 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessTest.java
@@ -422,6 +422,7 @@ public class SessionDispatcherLeaderProcessTest {
         dispatcherServiceFactory =
                 createFactoryBasedOnGenericSupplier(() -> 
testingDispatcherService);
 
+        final ExecutorService executorService = 
Executors.newSingleThreadExecutor();
         try (final SessionDispatcherLeaderProcess dispatcherLeaderProcess =
                 createDispatcherLeaderProcess()) {
             dispatcherLeaderProcess.start();
@@ -430,10 +431,12 @@ public class SessionDispatcherLeaderProcessTest {
             dispatcherLeaderProcess.getDispatcherGateway().get();
 
             // now remove the Job from the JobGraphStore and notify the 
dispatcher service
-            jobGraphStore.removeJobGraph(JOB_GRAPH.getJobID());
+            jobGraphStore.globalCleanupAsync(JOB_GRAPH.getJobID(), 
executorService).join();
             dispatcherLeaderProcess.onRemovedJobGraph(JOB_GRAPH.getJobID());
 
             
assertThat(terminateJobFuture.get()).isEqualTo(JOB_GRAPH.getJobID());
+        } finally {
+            assertThat(executorService.shutdownNow()).isEmpty();
         }
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStoreTest.java
index 568e937..d45a39a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStoreTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.concurrent.Executors;
 
 import org.junit.After;
 import org.junit.Before;
@@ -190,7 +191,7 @@ public class DefaultJobGraphStoreTest extends TestLogger {
     }
 
     @Test
-    public void testRemoveJobGraph() throws Exception {
+    public void testGlobalCleanup() throws Exception {
         final CompletableFuture<JobID> removeFuture = new 
CompletableFuture<>();
         final TestingStateHandleStore<JobGraph> stateHandleStore =
                 builder.setAddFunction((ignore, state) -> 
jobGraphStorageHelper.store(state))
@@ -200,20 +201,24 @@ public class DefaultJobGraphStoreTest extends TestLogger {
         final JobGraphStore jobGraphStore = 
createAndStartJobGraphStore(stateHandleStore);
 
         jobGraphStore.putJobGraph(testingJobGraph);
-        jobGraphStore.removeJobGraph(testingJobGraph.getJobID());
+        jobGraphStore
+                .globalCleanupAsync(testingJobGraph.getJobID(), 
Executors.directExecutor())
+                .join();
         final JobID actual = removeFuture.get(timeout, TimeUnit.MILLISECONDS);
         assertThat(actual, is(testingJobGraph.getJobID()));
     }
 
     @Test
-    public void testRemoveJobGraphWithNonExistName() throws Exception {
+    public void testGlobalCleanupWithNonExistName() throws Exception {
         final CompletableFuture<JobID> removeFuture = new 
CompletableFuture<>();
         final TestingStateHandleStore<JobGraph> stateHandleStore =
                 builder.setRemoveFunction(name -> 
removeFuture.complete(JobID.fromHexString(name)))
                         .build();
 
         final JobGraphStore jobGraphStore = 
createAndStartJobGraphStore(stateHandleStore);
-        jobGraphStore.removeJobGraph(testingJobGraph.getJobID());
+        jobGraphStore
+                .globalCleanupAsync(testingJobGraph.getJobID(), 
Executors.directExecutor())
+                .join();
 
         try {
             removeFuture.get(timeout, TimeUnit.MILLISECONDS);
@@ -340,13 +345,15 @@ public class DefaultJobGraphStoreTest extends TestLogger {
     }
 
     @Test
-    public void testReleasingJobGraphShouldReleaseHandle() throws Exception {
+    public void testLocalCleanupShouldReleaseHandle() throws Exception {
         final CompletableFuture<String> releaseFuture = new 
CompletableFuture<>();
         final TestingStateHandleStore<JobGraph> stateHandleStore =
                 builder.setReleaseConsumer(releaseFuture::complete).build();
         final JobGraphStore jobGraphStore = 
createAndStartJobGraphStore(stateHandleStore);
         jobGraphStore.putJobGraph(testingJobGraph);
-        jobGraphStore.releaseJobGraph(testingJobGraph.getJobID());
+        jobGraphStore
+                .localCleanupAsync(testingJobGraph.getJobID(), 
Executors.directExecutor())
+                .join();
 
         final String actual = releaseFuture.get();
         assertThat(actual, is(testingJobGraph.getJobID().toString()));
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneJobGraphStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneJobGraphStoreTest.java
index 9f89d9f..0fdab01 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneJobGraphStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneJobGraphStoreTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmanager;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.util.concurrent.Executors;
 
 import org.junit.Test;
 
@@ -31,7 +32,7 @@ public class StandaloneJobGraphStoreTest {
 
     /** Tests that all operations work and don't change the state. */
     @Test
-    public void testNoOps() {
+    public void testNoOps() throws Exception {
         StandaloneJobGraphStore jobGraphs = new StandaloneJobGraphStore();
 
         JobGraph jobGraph = JobGraphTestUtils.emptyJobGraph();
@@ -41,7 +42,7 @@ public class StandaloneJobGraphStoreTest {
         jobGraphs.putJobGraph(jobGraph);
         assertEquals(0, jobGraphs.getJobIds().size());
 
-        jobGraphs.removeJobGraph(jobGraph.getJobID());
+        jobGraphs.globalCleanupAsync(jobGraph.getJobID(), 
Executors.directExecutor()).join();
         assertEquals(0, jobGraphs.getJobIds().size());
 
         assertNull(jobGraphs.recoverJobGraph(new JobID()));
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperJobGraphsStoreITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperJobGraphsStoreITCase.java
index 989ee91..cb3e33f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperJobGraphsStoreITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperJobGraphsStoreITCase.java
@@ -32,6 +32,7 @@ import 
org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
 import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.concurrent.Executors;
 
 import 
org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework;
 import 
org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.PathChildrenCache;
@@ -130,7 +131,7 @@ public class ZooKeeperJobGraphsStoreITCase extends 
TestLogger {
             verifyJobGraphs(jobGraph, jobGraphs.recoverJobGraph(jobId));
 
             // Remove
-            jobGraphs.removeJobGraph(jobGraph.getJobID());
+            jobGraphs.globalCleanupAsync(jobGraph.getJobID(), 
Executors.directExecutor()).join();
 
             // Empty state
             assertEquals(0, jobGraphs.getJobIds().size());
@@ -140,7 +141,7 @@ public class ZooKeeperJobGraphsStoreITCase extends 
TestLogger {
             verify(listener, never()).onRemovedJobGraph(any(JobID.class));
 
             // Don't fail if called again
-            jobGraphs.removeJobGraph(jobGraph.getJobID());
+            jobGraphs.globalCleanupAsync(jobGraph.getJobID(), 
Executors.directExecutor()).join();
         } finally {
             jobGraphs.stop();
         }
@@ -193,7 +194,9 @@ public class ZooKeeperJobGraphsStoreITCase extends 
TestLogger {
 
                 verifyJobGraphs(expected.get(jobGraph.getJobID()), jobGraph);
 
-                jobGraphs.removeJobGraph(jobGraph.getJobID());
+                jobGraphs
+                        .globalCleanupAsync(jobGraph.getJobID(), 
Executors.directExecutor())
+                        .join();
             }
 
             // Empty state
@@ -313,7 +316,9 @@ public class ZooKeeperJobGraphsStoreITCase extends 
TestLogger {
         assertThat(recoveredJobGraph, is(notNullValue()));
 
         try {
-            
otherSubmittedJobGraphStore.removeJobGraph(recoveredJobGraph.getJobID());
+            otherSubmittedJobGraphStore
+                    .globalCleanupAsync(recoveredJobGraph.getJobID(), 
Executors.directExecutor())
+                    .join();
             fail(
                     "It should not be possible to remove the JobGraph since 
the first store still has a lock on it.");
         } catch (Exception ignored) {
@@ -323,7 +328,9 @@ public class ZooKeeperJobGraphsStoreITCase extends 
TestLogger {
         submittedJobGraphStore.stop();
 
         // now we should be able to delete the job graph
-        
otherSubmittedJobGraphStore.removeJobGraph(recoveredJobGraph.getJobID());
+        otherSubmittedJobGraphStore
+                .globalCleanupAsync(recoveredJobGraph.getJobID(), 
Executors.directExecutor())
+                .join();
 
         assertThat(
                 
otherSubmittedJobGraphStore.recoverJobGraph(recoveredJobGraph.getJobID()),
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingJobGraphStore.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingJobGraphStore.java
index e9a797f..20a4b74 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingJobGraphStore.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingJobGraphStore.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.JobGraphStore;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.function.BiFunctionWithException;
 import org.apache.flink.util.function.FunctionWithException;
 import org.apache.flink.util.function.ThrowingConsumer;
@@ -35,6 +36,9 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.BiFunction;
 
 /** In-Memory implementation of {@link JobGraphStore} for testing purposes. */
 public class TestingJobGraphStore implements JobGraphStore {
@@ -54,9 +58,9 @@ public class TestingJobGraphStore implements JobGraphStore {
 
     private final ThrowingConsumer<JobGraph, ? extends Exception> 
putJobGraphConsumer;
 
-    private final ThrowingConsumer<JobID, ? extends Exception> 
removeJobGraphConsumer;
+    private final BiFunction<JobID, Executor, CompletableFuture<Void>> 
globalCleanupFunction;
 
-    private final ThrowingConsumer<JobID, ? extends Exception> 
releaseJobGraphConsumer;
+    private final BiFunction<JobID, Executor, CompletableFuture<Void>> 
localCleanupFunction;
 
     private boolean started;
 
@@ -68,16 +72,16 @@ public class TestingJobGraphStore implements JobGraphStore {
             BiFunctionWithException<JobID, Map<JobID, JobGraph>, JobGraph, ? 
extends Exception>
                     recoverJobGraphFunction,
             ThrowingConsumer<JobGraph, ? extends Exception> 
putJobGraphConsumer,
-            ThrowingConsumer<JobID, ? extends Exception> 
removeJobGraphConsumer,
-            ThrowingConsumer<JobID, ? extends Exception> 
releaseJobGraphConsumer,
+            BiFunction<JobID, Executor, CompletableFuture<Void>> 
globalCleanupFunction,
+            BiFunction<JobID, Executor, CompletableFuture<Void>> 
localCleanupFunction,
             Collection<JobGraph> initialJobGraphs) {
         this.startConsumer = startConsumer;
         this.stopRunnable = stopRunnable;
         this.jobIdsFunction = jobIdsFunction;
         this.recoverJobGraphFunction = recoverJobGraphFunction;
         this.putJobGraphConsumer = putJobGraphConsumer;
-        this.removeJobGraphConsumer = removeJobGraphConsumer;
-        this.releaseJobGraphConsumer = releaseJobGraphConsumer;
+        this.globalCleanupFunction = globalCleanupFunction;
+        this.localCleanupFunction = localCleanupFunction;
 
         for (JobGraph initialJobGraph : initialJobGraphs) {
             storedJobs.put(initialJobGraph.getJobID(), initialJobGraph);
@@ -110,16 +114,15 @@ public class TestingJobGraphStore implements 
JobGraphStore {
     }
 
     @Override
-    public synchronized void removeJobGraph(JobID jobId) throws Exception {
+    public synchronized CompletableFuture<Void> globalCleanupAsync(JobID 
jobId, Executor executor) {
         verifyIsStarted();
-        removeJobGraphConsumer.accept(jobId);
-        storedJobs.remove(jobId);
+        return globalCleanupFunction.apply(jobId, executor).thenRun(() -> 
storedJobs.remove(jobId));
     }
 
     @Override
-    public synchronized void releaseJobGraph(JobID jobId) throws Exception {
+    public synchronized CompletableFuture<Void> localCleanupAsync(JobID jobId, 
Executor executor) {
         verifyIsStarted();
-        releaseJobGraphConsumer.accept(jobId);
+        return localCleanupFunction.apply(jobId, executor);
     }
 
     @Override
@@ -156,10 +159,11 @@ public class TestingJobGraphStore implements 
JobGraphStore {
 
         private ThrowingConsumer<JobGraph, ? extends Exception> 
putJobGraphConsumer = ignored -> {};
 
-        private ThrowingConsumer<JobID, ? extends Exception> 
removeJobGraphConsumer = ignored -> {};
+        private BiFunction<JobID, Executor, CompletableFuture<Void>> 
globalCleanupFunction =
+                (ignoredJobId, ignoredExecutor) -> 
FutureUtils.completedVoidFuture();
 
-        private ThrowingConsumer<JobID, ? extends Exception> 
releaseJobGraphConsumer =
-                ignored -> {};
+        private BiFunction<JobID, Executor, CompletableFuture<Void>> 
localCleanupFunction =
+                (ignoredJobId, ignoredExecutor) -> 
FutureUtils.completedVoidFuture();
 
         private Collection<JobGraph> initialJobGraphs = 
Collections.emptyList();
 
@@ -198,15 +202,15 @@ public class TestingJobGraphStore implements 
JobGraphStore {
             return this;
         }
 
-        public Builder setRemoveJobGraphConsumer(
-                ThrowingConsumer<JobID, ? extends Exception> 
removeJobGraphConsumer) {
-            this.removeJobGraphConsumer = removeJobGraphConsumer;
+        public Builder setGlobalCleanupFunction(
+                BiFunction<JobID, Executor, CompletableFuture<Void>> 
globalCleanupFunction) {
+            this.globalCleanupFunction = globalCleanupFunction;
             return this;
         }
 
-        public Builder setReleaseJobGraphConsumer(
-                ThrowingConsumer<JobID, ? extends Exception> 
releaseJobGraphConsumer) {
-            this.releaseJobGraphConsumer = releaseJobGraphConsumer;
+        public Builder setLocalCleanupFunction(
+                BiFunction<JobID, Executor, CompletableFuture<Void>> 
localCleanupFunction) {
+            this.localCleanupFunction = localCleanupFunction;
             return this;
         }
 
@@ -228,8 +232,8 @@ public class TestingJobGraphStore implements JobGraphStore {
                             jobIdsFunction,
                             recoverJobGraphFunction,
                             putJobGraphConsumer,
-                            removeJobGraphConsumer,
-                            releaseJobGraphConsumer,
+                            globalCleanupFunction,
+                            localCleanupFunction,
                             initialJobGraphs);
 
             if (startJobGraphStore) {

Reply via email to