This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8d27e9ccad70cfd37168e1dec5bbd03386603b3d Author: Matthias Pohl <[email protected]> AuthorDate: Wed Dec 15 12:31:31 2021 +0100 [FLINK-25432][runtime] Refactors JobGraphWriter interface to implement LocallyCleanableResource and GloballyCleanableResource --- .../flink/runtime/dispatcher/Dispatcher.java | 4 +- .../runtime/dispatcher/SingleJobJobGraphStore.java | 10 --- .../runtime/jobmanager/DefaultJobGraphStore.java | 91 ++++++++++++++++------ .../flink/runtime/jobmanager/JobGraphWriter.java | 28 +++---- .../jobmanager/StandaloneJobGraphStore.java | 10 --- .../runtime/jobmanager/ThrowingJobGraphWriter.java | 7 -- .../dispatcher/DispatcherFailoverITCase.java | 5 +- .../dispatcher/DispatcherResourceCleanupTest.java | 2 +- .../flink/runtime/dispatcher/DispatcherTest.java | 41 +++++++--- .../runtime/dispatcher/NoOpJobGraphWriter.java | 7 -- .../runner/SessionDispatcherLeaderProcessTest.java | 5 +- .../jobmanager/DefaultJobGraphStoreTest.java | 19 +++-- .../jobmanager/StandaloneJobGraphStoreTest.java | 5 +- .../jobmanager/ZooKeeperJobGraphsStoreITCase.java | 17 ++-- .../runtime/testutils/TestingJobGraphStore.java | 48 ++++++------ 15 files changed, 178 insertions(+), 121 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index 240ced5..e6b7e8e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -882,7 +882,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher private boolean cleanUpJobGraph(JobID jobId, boolean cleanupHA) { if (cleanupHA) { try { - jobGraphWriter.removeJobGraph(jobId); + jobGraphWriter.globalCleanup(jobId); return true; } catch (Exception e) { log.warn( @@ -893,7 +893,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher } } 
try { - jobGraphWriter.releaseJobGraph(jobId); + jobGraphWriter.localCleanup(jobId); } catch (Exception e) { log.warn("Could not properly release job {} from submitted job graph store.", jobId, e); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SingleJobJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SingleJobJobGraphStore.java index f2c450a..b063b4b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SingleJobJobGraphStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SingleJobJobGraphStore.java @@ -65,16 +65,6 @@ public class SingleJobJobGraphStore implements JobGraphStore { } @Override - public void removeJobGraph(JobID jobId) { - // ignore - } - - @Override - public void releaseJobGraph(JobID jobId) { - // ignore - } - - @Override public Collection<JobID> getJobIds() { return Collections.singleton(jobGraph.getJobID()); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStore.java index 832f9c9..fda4964 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStore.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.persistence.StateHandleStore; import org.apache.flink.runtime.state.RetrievableStateHandle; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; +import org.apache.flink.util.function.ThrowingRunnable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +39,9 @@ import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; import static 
org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -238,42 +242,83 @@ public class DefaultJobGraphStore<R extends ResourceVersion<R>> } @Override - public void removeJobGraph(JobID jobId) throws Exception { + public CompletableFuture<Void> globalCleanupAsync(JobID jobId, Executor executor) { checkNotNull(jobId, "Job ID"); - String name = jobGraphStoreUtil.jobIDToName(jobId); - LOG.debug("Removing job graph {} from {}.", jobId, jobGraphStateHandleStore); + return runAsyncWithLockAssertRunning( + () -> { + LOG.debug("Removing job graph {} from {}.", jobId, jobGraphStateHandleStore); - synchronized (lock) { - verifyIsRunning(); - if (addedJobGraphs.contains(jobId)) { - if (jobGraphStateHandleStore.releaseAndTryRemove(name)) { - addedJobGraphs.remove(jobId); - } else { - throw new FlinkException( + if (addedJobGraphs.contains(jobId)) { + final String name = jobGraphStoreUtil.jobIDToName(jobId); + releaseAndRemoveOrThrowCompletionException(jobId, name); + + addedJobGraphs.remove(jobId); + } + + LOG.info("Removed job graph {} from {}.", jobId, jobGraphStateHandleStore); + }, + executor); + } + + @GuardedBy("lock") + private void releaseAndRemoveOrThrowCompletionException(JobID jobId, String jobName) { + boolean success; + try { + success = jobGraphStateHandleStore.releaseAndTryRemove(jobName); + } catch (Exception e) { + throw new CompletionException(e); + } + + if (!success) { + throw new CompletionException( + new FlinkException( String.format( "Could not remove job graph with job id %s from %s.", - jobId, jobGraphStateHandleStore)); - } - } + jobId, jobGraphStateHandleStore))); } - - LOG.info("Removed job graph {} from {}.", jobId, jobGraphStateHandleStore); } + /** + * Releases the locks on the specified {@link JobGraph}. + * + * <p>Releasing the locks allows that another instance can delete the job from the {@link + * JobGraphStore}. 
+ * + * @param jobId specifying the job to release the locks for + * @param executor the executor being used for the asynchronous execution of the local cleanup. + * @return The cleanup result future. + */ @Override - public void releaseJobGraph(JobID jobId) throws Exception { + public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor executor) { checkNotNull(jobId, "Job ID"); - LOG.debug("Releasing job graph {} from {}.", jobId, jobGraphStateHandleStore); + return runAsyncWithLockAssertRunning( + () -> { + LOG.debug("Releasing job graph {} from {}.", jobId, jobGraphStateHandleStore); - synchronized (lock) { - verifyIsRunning(); - jobGraphStateHandleStore.release(jobGraphStoreUtil.jobIDToName(jobId)); - addedJobGraphs.remove(jobId); - } + jobGraphStateHandleStore.release(jobGraphStoreUtil.jobIDToName(jobId)); + addedJobGraphs.remove(jobId); - LOG.info("Released job graph {} from {}.", jobId, jobGraphStateHandleStore); + LOG.info("Released job graph {} from {}.", jobId, jobGraphStateHandleStore); + }, + executor); + } + + private CompletableFuture<Void> runAsyncWithLockAssertRunning( + ThrowingRunnable<Exception> runnable, Executor executor) { + return CompletableFuture.runAsync( + () -> { + synchronized (lock) { + verifyIsRunning(); + try { + runnable.run(); + } catch (Exception e) { + throw new CompletionException(e); + } + } + }, + executor); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobGraphWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobGraphWriter.java index 23542a3..bb2c5fc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobGraphWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobGraphWriter.java @@ -19,10 +19,16 @@ package org.apache.flink.runtime.jobmanager; import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.dispatcher.cleanup.GloballyCleanableResource; +import 
org.apache.flink.runtime.dispatcher.cleanup.LocallyCleanableResource; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.util.concurrent.FutureUtils; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; /** Allows to store and remove job graphs. */ -public interface JobGraphWriter { +public interface JobGraphWriter extends LocallyCleanableResource, GloballyCleanableResource { /** * Adds the {@link JobGraph} instance. * @@ -30,17 +36,13 @@ public interface JobGraphWriter { */ void putJobGraph(JobGraph jobGraph) throws Exception; - /** Removes the {@link JobGraph} with the given {@link JobID} if it exists. */ - void removeJobGraph(JobID jobId) throws Exception; + @Override + default CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor executor) { + return FutureUtils.completedVoidFuture(); + } - /** - * Releases the locks on the specified {@link JobGraph}. - * - * <p>Releasing the locks allows that another instance can delete the job from the {@link - * JobGraphStore}. 
- * - * @param jobId specifying the job to release the locks for - * @throws Exception if the locks cannot be released - */ - void releaseJobGraph(JobID jobId) throws Exception; + @Override + default CompletableFuture<Void> globalCleanupAsync(JobID jobId, Executor executor) { + return FutureUtils.completedVoidFuture(); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneJobGraphStore.java index 656df2f..f7d8135 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneJobGraphStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneJobGraphStore.java @@ -48,16 +48,6 @@ public class StandaloneJobGraphStore implements JobGraphStore { } @Override - public void removeJobGraph(JobID jobId) { - // Nothing to do - } - - @Override - public void releaseJobGraph(JobID jobId) { - // nothing to do - } - - @Override public Collection<JobID> getJobIds() { return Collections.emptyList(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ThrowingJobGraphWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ThrowingJobGraphWriter.java index 224b7fb..8bf9aa0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ThrowingJobGraphWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ThrowingJobGraphWriter.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.jobmanager; -import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobGraph; /** {@link JobGraphWriter} implementation which does not allow to store {@link JobGraph}. 
*/ @@ -29,10 +28,4 @@ public enum ThrowingJobGraphWriter implements JobGraphWriter { public void putJobGraph(JobGraph jobGraph) { throw new UnsupportedOperationException("Cannot store job graphs."); } - - @Override - public void removeJobGraph(JobID jobId) {} - - @Override - public void releaseJobGraph(JobID jobId) {} } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java index ef6be61..0cd5d34 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java @@ -39,6 +39,7 @@ import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.runtime.testutils.TestingJobGraphStore; import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.concurrent.FutureUtils; import org.junit.After; import org.junit.Before; @@ -112,8 +113,8 @@ public class DispatcherFailoverITCase extends AbstractDispatcherTest { final Error jobGraphRemovalError = new Error("Unable to remove job graph."); final TestingJobGraphStore jobGraphStore = TestingJobGraphStore.newBuilder() - .setRemoveJobGraphConsumer( - graph -> { + .setGlobalCleanupFunction( + (ignoredJobId, ignoredExecutor) -> { throw jobGraphRemovalError; }) .build(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java index 5b84a63..4992c76 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java @@ -620,7 +620,7 @@ public class 
DispatcherResourceCleanupTest extends TestLogger { public void testHABlobsAreNotRemovedIfHAJobGraphRemovalFails() throws Exception { jobGraphWriter = TestingJobGraphStore.newBuilder() - .setRemoveJobGraphConsumer( + .setGlobalCleanupConsumer( ignored -> { throw new Exception("Failed to Remove future"); }) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java index 97bd0e1..e830a35 100755 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java @@ -88,6 +88,7 @@ import org.apache.flink.util.FlinkException; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.Preconditions; import org.apache.flink.util.TimeUtils; +import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.util.function.ThrowingRunnable; import org.assertj.core.api.Assertions; @@ -751,10 +752,16 @@ public class DispatcherTest extends AbstractDispatcherTest { // Track cleanup - job-graph final TestingJobGraphStore jobGraphStore = TestingJobGraphStore.newBuilder() - .setReleaseJobGraphConsumer( - jobId -> cleanUpEvents.add(CLEANUP_JOB_GRAPH_RELEASE)) - .setRemoveJobGraphConsumer( - jobId -> cleanUpEvents.add(CLEANUP_JOB_GRAPH_REMOVE)) + .setLocalCleanupFunction( + (jobId, executor) -> { + cleanUpEvents.add(CLEANUP_JOB_GRAPH_RELEASE); + return FutureUtils.completedVoidFuture(); + }) + .setGlobalCleanupFunction( + (jobId, executor) -> { + cleanUpEvents.add(CLEANUP_JOB_GRAPH_REMOVE); + return FutureUtils.completedVoidFuture(); + }) .build(); jobGraphStore.start(null); haServices.setJobGraphStore(jobGraphStore); @@ -909,8 +916,16 @@ public class DispatcherTest extends AbstractDispatcherTest { final TestingJobGraphStore testingJobGraphStore = TestingJobGraphStore.newBuilder() - 
.setRemoveJobGraphConsumer(removeJobGraphFuture::complete) - .setReleaseJobGraphConsumer(releaseJobGraphFuture::complete) + .setGlobalCleanupFunction( + (jobId, executor) -> { + removeJobGraphFuture.complete(jobId); + return FutureUtils.completedVoidFuture(); + }) + .setLocalCleanupFunction( + (jobId, executor) -> { + releaseJobGraphFuture.complete(jobId); + return FutureUtils.completedVoidFuture(); + }) .build(); testingJobGraphStore.start(null); @@ -1147,10 +1162,16 @@ public class DispatcherTest extends AbstractDispatcherTest { // Track cleanup - job-graph final TestingJobGraphStore jobGraphStore = TestingJobGraphStore.newBuilder() - .setReleaseJobGraphConsumer( - jobId -> cleanUpEvents.add(CLEANUP_JOB_GRAPH_RELEASE)) - .setRemoveJobGraphConsumer( - jobId -> cleanUpEvents.add(CLEANUP_JOB_GRAPH_REMOVE)) + .setLocalCleanupFunction( + (jobId, executor) -> { + cleanUpEvents.add(CLEANUP_JOB_GRAPH_RELEASE); + return FutureUtils.completedVoidFuture(); + }) + .setGlobalCleanupFunction( + (jobId, executor) -> { + cleanUpEvents.add(CLEANUP_JOB_GRAPH_REMOVE); + return FutureUtils.completedVoidFuture(); + }) .build(); jobGraphStore.start(null); haServices.setJobGraphStore(jobGraphStore); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/NoOpJobGraphWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/NoOpJobGraphWriter.java index 7a2d156..bc17d94 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/NoOpJobGraphWriter.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/NoOpJobGraphWriter.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.dispatcher; -import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.JobGraphWriter; @@ -28,10 +27,4 @@ public enum NoOpJobGraphWriter implements JobGraphWriter { @Override public void putJobGraph(JobGraph jobGraph) throws Exception {} - - @Override - public 
void removeJobGraph(JobID jobId) throws Exception {} - - @Override - public void releaseJobGraph(JobID jobId) throws Exception {} } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessTest.java index 583c771..f48ca06 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessTest.java @@ -422,6 +422,7 @@ public class SessionDispatcherLeaderProcessTest { dispatcherServiceFactory = createFactoryBasedOnGenericSupplier(() -> testingDispatcherService); + final ExecutorService executorService = Executors.newSingleThreadExecutor(); try (final SessionDispatcherLeaderProcess dispatcherLeaderProcess = createDispatcherLeaderProcess()) { dispatcherLeaderProcess.start(); @@ -430,10 +431,12 @@ public class SessionDispatcherLeaderProcessTest { dispatcherLeaderProcess.getDispatcherGateway().get(); // now remove the Job from the JobGraphStore and notify the dispatcher service - jobGraphStore.removeJobGraph(JOB_GRAPH.getJobID()); + jobGraphStore.globalCleanupAsync(JOB_GRAPH.getJobID(), executorService).join(); dispatcherLeaderProcess.onRemovedJobGraph(JOB_GRAPH.getJobID()); assertThat(terminateJobFuture.get()).isEqualTo(JOB_GRAPH.getJobID()); + } finally { + assertThat(executorService.shutdownNow()).isEmpty(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStoreTest.java index 568e937..d45a39a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStoreTest.java @@ 
-31,6 +31,7 @@ import org.apache.flink.runtime.state.RetrievableStateHandle; import org.apache.flink.util.AbstractID; import org.apache.flink.util.FlinkException; import org.apache.flink.util.TestLogger; +import org.apache.flink.util.concurrent.Executors; import org.junit.After; import org.junit.Before; @@ -190,7 +191,7 @@ public class DefaultJobGraphStoreTest extends TestLogger { } @Test - public void testRemoveJobGraph() throws Exception { + public void testGlobalCleanup() throws Exception { final CompletableFuture<JobID> removeFuture = new CompletableFuture<>(); final TestingStateHandleStore<JobGraph> stateHandleStore = builder.setAddFunction((ignore, state) -> jobGraphStorageHelper.store(state)) @@ -200,20 +201,24 @@ public class DefaultJobGraphStoreTest extends TestLogger { final JobGraphStore jobGraphStore = createAndStartJobGraphStore(stateHandleStore); jobGraphStore.putJobGraph(testingJobGraph); - jobGraphStore.removeJobGraph(testingJobGraph.getJobID()); + jobGraphStore + .globalCleanupAsync(testingJobGraph.getJobID(), Executors.directExecutor()) + .join(); final JobID actual = removeFuture.get(timeout, TimeUnit.MILLISECONDS); assertThat(actual, is(testingJobGraph.getJobID())); } @Test - public void testRemoveJobGraphWithNonExistName() throws Exception { + public void testGlobalCleanupWithNonExistName() throws Exception { final CompletableFuture<JobID> removeFuture = new CompletableFuture<>(); final TestingStateHandleStore<JobGraph> stateHandleStore = builder.setRemoveFunction(name -> removeFuture.complete(JobID.fromHexString(name))) .build(); final JobGraphStore jobGraphStore = createAndStartJobGraphStore(stateHandleStore); - jobGraphStore.removeJobGraph(testingJobGraph.getJobID()); + jobGraphStore + .globalCleanupAsync(testingJobGraph.getJobID(), Executors.directExecutor()) + .join(); try { removeFuture.get(timeout, TimeUnit.MILLISECONDS); @@ -340,13 +345,15 @@ public class DefaultJobGraphStoreTest extends TestLogger { } @Test - public void 
testReleasingJobGraphShouldReleaseHandle() throws Exception { + public void testLocalCleanupShouldReleaseHandle() throws Exception { final CompletableFuture<String> releaseFuture = new CompletableFuture<>(); final TestingStateHandleStore<JobGraph> stateHandleStore = builder.setReleaseConsumer(releaseFuture::complete).build(); final JobGraphStore jobGraphStore = createAndStartJobGraphStore(stateHandleStore); jobGraphStore.putJobGraph(testingJobGraph); - jobGraphStore.releaseJobGraph(testingJobGraph.getJobID()); + jobGraphStore + .localCleanupAsync(testingJobGraph.getJobID(), Executors.directExecutor()) + .join(); final String actual = releaseFuture.get(); assertThat(actual, is(testingJobGraph.getJobID().toString())); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneJobGraphStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneJobGraphStoreTest.java index 9f89d9f..0fdab01 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneJobGraphStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/StandaloneJobGraphStoreTest.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmanager; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; +import org.apache.flink.util.concurrent.Executors; import org.junit.Test; @@ -31,7 +32,7 @@ public class StandaloneJobGraphStoreTest { /** Tests that all operations work and don't change the state. 
*/ @Test - public void testNoOps() { + public void testNoOps() throws Exception { StandaloneJobGraphStore jobGraphs = new StandaloneJobGraphStore(); JobGraph jobGraph = JobGraphTestUtils.emptyJobGraph(); @@ -41,7 +42,7 @@ public class StandaloneJobGraphStoreTest { jobGraphs.putJobGraph(jobGraph); assertEquals(0, jobGraphs.getJobIds().size()); - jobGraphs.removeJobGraph(jobGraph.getJobID()); + jobGraphs.globalCleanupAsync(jobGraph.getJobID(), Executors.directExecutor()).join(); assertEquals(0, jobGraphs.getJobIds().size()); assertNull(jobGraphs.recoverJobGraph(new JobID())); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperJobGraphsStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperJobGraphsStoreITCase.java index 989ee91..cb3e33f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperJobGraphsStoreITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperJobGraphsStoreITCase.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.TestLogger; +import org.apache.flink.util.concurrent.Executors; import org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework; import org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.PathChildrenCache; @@ -130,7 +131,7 @@ public class ZooKeeperJobGraphsStoreITCase extends TestLogger { verifyJobGraphs(jobGraph, jobGraphs.recoverJobGraph(jobId)); // Remove - jobGraphs.removeJobGraph(jobGraph.getJobID()); + jobGraphs.globalCleanupAsync(jobGraph.getJobID(), Executors.directExecutor()).join(); // Empty state assertEquals(0, jobGraphs.getJobIds().size()); @@ -140,7 +141,7 @@ public class ZooKeeperJobGraphsStoreITCase extends TestLogger { verify(listener, 
never()).onRemovedJobGraph(any(JobID.class)); // Don't fail if called again - jobGraphs.removeJobGraph(jobGraph.getJobID()); + jobGraphs.globalCleanupAsync(jobGraph.getJobID(), Executors.directExecutor()).join(); } finally { jobGraphs.stop(); } @@ -193,7 +194,9 @@ public class ZooKeeperJobGraphsStoreITCase extends TestLogger { verifyJobGraphs(expected.get(jobGraph.getJobID()), jobGraph); - jobGraphs.removeJobGraph(jobGraph.getJobID()); + jobGraphs + .globalCleanupAsync(jobGraph.getJobID(), Executors.directExecutor()) + .join(); } // Empty state @@ -313,7 +316,9 @@ public class ZooKeeperJobGraphsStoreITCase extends TestLogger { assertThat(recoveredJobGraph, is(notNullValue())); try { - otherSubmittedJobGraphStore.removeJobGraph(recoveredJobGraph.getJobID()); + otherSubmittedJobGraphStore + .globalCleanupAsync(recoveredJobGraph.getJobID(), Executors.directExecutor()) + .join(); fail( "It should not be possible to remove the JobGraph since the first store still has a lock on it."); } catch (Exception ignored) { @@ -323,7 +328,9 @@ public class ZooKeeperJobGraphsStoreITCase extends TestLogger { submittedJobGraphStore.stop(); // now we should be able to delete the job graph - otherSubmittedJobGraphStore.removeJobGraph(recoveredJobGraph.getJobID()); + otherSubmittedJobGraphStore + .globalCleanupAsync(recoveredJobGraph.getJobID(), Executors.directExecutor()) + .join(); assertThat( otherSubmittedJobGraphStore.recoverJobGraph(recoveredJobGraph.getJobID()), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingJobGraphStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingJobGraphStore.java index e9a797f..20a4b74 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingJobGraphStore.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingJobGraphStore.java @@ -23,6 +23,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import 
org.apache.flink.runtime.jobmanager.JobGraphStore; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.util.function.BiFunctionWithException; import org.apache.flink.util.function.FunctionWithException; import org.apache.flink.util.function.ThrowingConsumer; @@ -35,6 +36,9 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.function.BiFunction; /** In-Memory implementation of {@link JobGraphStore} for testing purposes. */ public class TestingJobGraphStore implements JobGraphStore { @@ -54,9 +58,9 @@ public class TestingJobGraphStore implements JobGraphStore { private final ThrowingConsumer<JobGraph, ? extends Exception> putJobGraphConsumer; - private final ThrowingConsumer<JobID, ? extends Exception> removeJobGraphConsumer; + private final BiFunction<JobID, Executor, CompletableFuture<Void>> globalCleanupFunction; - private final ThrowingConsumer<JobID, ? extends Exception> releaseJobGraphConsumer; + private final BiFunction<JobID, Executor, CompletableFuture<Void>> localCleanupFunction; private boolean started; @@ -68,16 +72,16 @@ public class TestingJobGraphStore implements JobGraphStore { BiFunctionWithException<JobID, Map<JobID, JobGraph>, JobGraph, ? extends Exception> recoverJobGraphFunction, ThrowingConsumer<JobGraph, ? extends Exception> putJobGraphConsumer, - ThrowingConsumer<JobID, ? extends Exception> removeJobGraphConsumer, - ThrowingConsumer<JobID, ? 
extends Exception> releaseJobGraphConsumer, + BiFunction<JobID, Executor, CompletableFuture<Void>> globalCleanupFunction, + BiFunction<JobID, Executor, CompletableFuture<Void>> localCleanupFunction, Collection<JobGraph> initialJobGraphs) { this.startConsumer = startConsumer; this.stopRunnable = stopRunnable; this.jobIdsFunction = jobIdsFunction; this.recoverJobGraphFunction = recoverJobGraphFunction; this.putJobGraphConsumer = putJobGraphConsumer; - this.removeJobGraphConsumer = removeJobGraphConsumer; - this.releaseJobGraphConsumer = releaseJobGraphConsumer; + this.globalCleanupFunction = globalCleanupFunction; + this.localCleanupFunction = localCleanupFunction; for (JobGraph initialJobGraph : initialJobGraphs) { storedJobs.put(initialJobGraph.getJobID(), initialJobGraph); @@ -110,16 +114,15 @@ public class TestingJobGraphStore implements JobGraphStore { } @Override - public synchronized void removeJobGraph(JobID jobId) throws Exception { + public synchronized CompletableFuture<Void> globalCleanupAsync(JobID jobId, Executor executor) { verifyIsStarted(); - removeJobGraphConsumer.accept(jobId); - storedJobs.remove(jobId); + return globalCleanupFunction.apply(jobId, executor).thenRun(() -> storedJobs.remove(jobId)); } @Override - public synchronized void releaseJobGraph(JobID jobId) throws Exception { + public synchronized CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor executor) { verifyIsStarted(); - releaseJobGraphConsumer.accept(jobId); + return localCleanupFunction.apply(jobId, executor); } @Override @@ -156,10 +159,11 @@ public class TestingJobGraphStore implements JobGraphStore { private ThrowingConsumer<JobGraph, ? extends Exception> putJobGraphConsumer = ignored -> {}; - private ThrowingConsumer<JobID, ? 
extends Exception> removeJobGraphConsumer = ignored -> {}; + private BiFunction<JobID, Executor, CompletableFuture<Void>> globalCleanupFunction = + (ignoredJobId, ignoredExecutor) -> FutureUtils.completedVoidFuture(); - private ThrowingConsumer<JobID, ? extends Exception> releaseJobGraphConsumer = - ignored -> {}; + private BiFunction<JobID, Executor, CompletableFuture<Void>> localCleanupFunction = + (ignoredJobId, ignoredExecutor) -> FutureUtils.completedVoidFuture(); private Collection<JobGraph> initialJobGraphs = Collections.emptyList(); @@ -198,15 +202,15 @@ public class TestingJobGraphStore implements JobGraphStore { return this; } - public Builder setRemoveJobGraphConsumer( - ThrowingConsumer<JobID, ? extends Exception> removeJobGraphConsumer) { - this.removeJobGraphConsumer = removeJobGraphConsumer; + public Builder setGlobalCleanupFunction( + BiFunction<JobID, Executor, CompletableFuture<Void>> globalCleanupFunction) { + this.globalCleanupFunction = globalCleanupFunction; return this; } - public Builder setReleaseJobGraphConsumer( - ThrowingConsumer<JobID, ? extends Exception> releaseJobGraphConsumer) { - this.releaseJobGraphConsumer = releaseJobGraphConsumer; + public Builder setLocalCleanupFunction( + BiFunction<JobID, Executor, CompletableFuture<Void>> localCleanupFunction) { + this.localCleanupFunction = localCleanupFunction; return this; } @@ -228,8 +232,8 @@ public class TestingJobGraphStore implements JobGraphStore { jobIdsFunction, recoverJobGraphFunction, putJobGraphConsumer, - removeJobGraphConsumer, - releaseJobGraphConsumer, + globalCleanupFunction, + localCleanupFunction, initialJobGraphs); if (startJobGraphStore) {
