This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch release-1.15 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3966744a18d9f0e408474969c1f61ea43c8b41d5 Author: Matthias Pohl <[email protected]> AuthorDate: Wed Mar 16 18:09:01 2022 +0100 [FLINK-26690][runtime] Makes globalCleanupAsync call the removal even if the JobGraph has not been put into the JobGraphStore yet. This can happen if cleanup is triggered after a failover of a dirty JobResultStore entry (i.e. of a globally-terminated job). In that case, no recovery of the JobGraph happens and, therefore, no JobGraph is added to the internal addedJobGraphs collection. This required KubernetesStateHandleStore.releaseAndTryRemove to work for non-existing state as well. The ZooKeeperStateHandleStore implementation (ZooKeeperStateHandleStore.releaseAndTryRemove) is already idempotent in this matter. --- .../KubernetesStateHandleStore.java | 9 ++++++-- .../KubernetesStateHandleStoreTest.java | 27 ++++++++++++++++++++++ .../runtime/jobmanager/DefaultJobGraphStore.java | 8 +++---- .../runtime/persistence/StateHandleStore.java | 3 ++- .../jobmanager/DefaultJobGraphStoreTest.java | 27 ++++++++++++++-------- .../zookeeper/ZooKeeperStateHandleStoreTest.java | 12 ++++++++++ 6 files changed, 69 insertions(+), 17 deletions(-) diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java index cc7153d..0716b58 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java @@ -51,6 +51,7 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import 
java.util.function.Predicate; @@ -464,13 +465,14 @@ public class KubernetesStateHandleStore<T extends Serializable> * It returns the {@link RetrievableStateHandle} stored under the given state node if any. * * @param key Key to be removed from ConfigMap - * @return True if the state handle is removed successfully + * @return True if the state handle isn't listed anymore. * @throws Exception if removing the key or discarding the state failed */ @Override public boolean releaseAndTryRemove(String key) throws Exception { checkNotNull(key, "Key in ConfigMap."); final AtomicReference<RetrievableStateHandle<T>> stateHandleRefer = new AtomicReference<>(); + final AtomicBoolean stateHandleDoesNotExist = new AtomicBoolean(false); return updateConfigMap( configMap -> { final String content = configMap.getData().get(key); @@ -496,6 +498,8 @@ public class KubernetesStateHandleStore<T extends Serializable> Objects.requireNonNull(configMap.getData().remove(key)); } return Optional.of(configMap); + } else { + stateHandleDoesNotExist.set(true); } return Optional.empty(); }) @@ -516,7 +520,8 @@ public class KubernetesStateHandleStore<T extends Serializable> throw new CompletionException(e); } } - return CompletableFuture.completedFuture(updated); + return CompletableFuture.completedFuture( + stateHandleDoesNotExist.get() || updated); }) .get(); } diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java index 9896d49..51294f4 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java @@ -804,6 +804,33 @@ public class KubernetesStateHandleStoreTest extends KubernetesHighAvailabilityTe } @Test + public void 
testRemoveOfNonExistingState() throws Exception { + new Context() { + { + runTest( + () -> { + leaderCallbackGrantLeadership(); + + final KubernetesStateHandleStore< + TestingLongStateHandleHelper.LongStateHandle> + store = + new KubernetesStateHandleStore<>( + flinkKubeClient, + LEADER_CONFIGMAP_NAME, + longStateStorage, + filter, + LOCK_IDENTITY); + assertThat(store.getAllAndLock().size(), is(0)); + assertThat(store.releaseAndTryRemove(key), is(true)); + assertThat(store.getAllAndLock().size(), is(0)); + + assertThat(TestingLongStateHandleHelper.getGlobalDiscardCount(), is(0)); + }); + } + }; + } + + @Test public void testRemoveFailedShouldNotDiscardState() throws Exception { new Context() { { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStore.java index fda4964..e66a172 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStore.java @@ -249,12 +249,10 @@ public class DefaultJobGraphStore<R extends ResourceVersion<R>> () -> { LOG.debug("Removing job graph {} from {}.", jobId, jobGraphStateHandleStore); - if (addedJobGraphs.contains(jobId)) { - final String name = jobGraphStoreUtil.jobIDToName(jobId); - releaseAndRemoveOrThrowCompletionException(jobId, name); + final String name = jobGraphStoreUtil.jobIDToName(jobId); + releaseAndRemoveOrThrowCompletionException(jobId, name); - addedJobGraphs.remove(jobId); - } + addedJobGraphs.remove(jobId); LOG.info("Removed job graph {} from {}.", jobId, jobGraphStateHandleStore); }, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/persistence/StateHandleStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/persistence/StateHandleStore.java index 5cbfeac..f8e6d69 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/persistence/StateHandleStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/persistence/StateHandleStore.java @@ -121,7 +121,8 @@ public interface StateHandleStore<T extends Serializable, R extends ResourceVers * node if any. Also the state on the external storage will be discarded. * * @param name Key name in ConfigMap or child path name in ZooKeeper - * @return True if the state handle could be removed. + * @return {@code true} if the state handle is removed (also if it didn't exist in the first + * place); otherwise {@code false}. * @throws Exception if releasing, removing the handles or discarding the state failed */ boolean releaseAndTryRemove(String name) throws Exception; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStoreTest.java index d45a39a..1343e7f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStoreTest.java @@ -41,8 +41,8 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; @@ -51,6 +51,7 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.fail; /** @@ -220,14 +221,22 @@ public class DefaultJobGraphStoreTest extends TestLogger { .globalCleanupAsync(testingJobGraph.getJobID(), Executors.directExecutor()) 
.join(); - try { - removeFuture.get(timeout, TimeUnit.MILLISECONDS); - fail( - "We should get an expected timeout because we are removing a non-existed job graph."); - } catch (TimeoutException ex) { - // expected - } - assertThat(removeFuture.isDone(), is(false)); + assertThat(removeFuture.isDone(), is(true)); + } + + @Test + public void testGlobalCleanupFailsIfRemovalReturnsFalse() throws Exception { + final TestingStateHandleStore<JobGraph> stateHandleStore = + builder.setRemoveFunction(name -> false).build(); + + final JobGraphStore jobGraphStore = createAndStartJobGraphStore(stateHandleStore); + assertThrows( + ExecutionException.class, + () -> + jobGraphStore + .globalCleanupAsync( + testingJobGraph.getJobID(), Executors.directExecutor()) + .get()); } @Test diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java index dc81131..ddf6fc7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java @@ -224,6 +224,18 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger { } @Test + public void testCleanupOfNonExistingState() throws Exception { + final ZooKeeperStateHandleStore<TestingLongStateHandleHelper.LongStateHandle> testInstance = + new ZooKeeperStateHandleStore<>( + ZOOKEEPER.getClient(), new TestingLongStateHandleHelper()); + + final String pathInZooKeeper = "/testCleanupOfNonExistingState"; + + assertTrue(testInstance.releaseAndTryRemove(pathInZooKeeper)); + assertFalse(testInstance.exists(pathInZooKeeper).isExisting()); + } + + @Test public void testRepeatableCleanupWithLockOnNode() throws Exception { final CuratorFramework client = ZooKeeperUtils.useNamespaceAndEnsurePath(
