tillrohrmann commented on a change in pull request #13864: URL: https://github.com/apache/flink/pull/13864#discussion_r518043106
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/persistence/StateHandleStore.java ########## @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.persistence; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.RetrievableStateHandle; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Collection; +import java.util.List; + +/** + * Class which stores state via the provided {@link RetrievableStateStorageHelper} and writes the + * returned state handle to distributed coordination system(e.g. Zookeeper, Kubernetes, etc.). + * + * <p>To avoid concurrent modification issues, the implementation needs to ensure that only the leader could + * update the state store. + * + * @param <T> Type of state + */ +public interface StateHandleStore<T extends Serializable> { + + /** + * Persist the state to distributed storage(e.g. S3, HDFS, etc.). And then creates a state handle, stores it in + * the distributed coordination system(e.g. ZooKeeper, Kubernetes, etc.). 
+ * + * @param name Key name in ConfigMap or child path name in ZooKeeper + * @param state State to be added + * + * @throws AlreadyExistException if the name already exists + * @throws Exception if persisting state or writing state handle failed + */ + RetrievableStateHandle<T> add(String name, T state) throws Exception; + + /** + * Replaces a state handle in the distributed coordination system and discards the old state handle. + * + * @param name Key name in ConfigMap or child path name in ZooKeeper + * @param resourceVersion resource version of previous storage object. If the resource version does not match, the + * replace operation will fail. Since there is an unexpected update operation snuck in. + * @param state State to be replace with + * + * @throws NotExistException if the name does not exist + * @throws Exception if persisting state or writing state handle failed + */ + void replace(String name, ResourceVersion resourceVersion, T state) throws Exception; + + /** + * Returns resource version of state handle with specific name on the underlying storage. + * + * @param name Key name in ConfigMap or child path name in ZooKeeper + * + * @return current resource version on the underlying storage. + * + * @throws Exception if the check existence operation failed + */ + ResourceVersion exists(String name) throws Exception; + + /** + * Gets the {@link RetrievableStateHandle} stored with the given name. + * + * @param name Key name in ConfigMap or child path name in ZooKeeper + * + * @return The retrieved state handle + * + * @throws IOException if the method failed to deserialize the stored state handle + * @throws NotExistException when the name does not exist + * @throws Exception if get state handle failed + */ + RetrievableStateHandle<T> get(String name) throws Exception; + + /** + * Gets all available state handles from the storage. + * + * @return All retrieved state handles. 
+ * + * @throws Exception if get state handle operation failed + */ + List<Tuple2<RetrievableStateHandle<T>, String>> getAll() throws Exception; + + /** + * Return a list of all valid name for state handles. + * + * @return List of valid state handle name. The name is key name in ConfigMap or child path name in ZooKeeper. + * + * @throws Exception if get handle operation failed + */ + Collection<String> getAllHandles() throws Exception; + + /** + * Remove the state handle and discard the state with given name. + * + * @param name Key name in ConfigMap or child path name in ZooKeeper + * + * @return True if the state handle could be removed. + * + * @throws Exception if removing the handles or discarding the state failed + */ + boolean removeHandleAndDiscardState(String name) throws Exception; + + /** + * Remove all the states. Not only the state handles in the distributed coordination system + * will be removed, but also the real state data on the distributed storage will be discarded. + * + * @throws Exception if removing the names or discarding the state failed + */ + void removeAllHandlesAndDiscardState() throws Exception; + + /** + * Only remove all the state handle pointers on Kubernetes or ZooKeeper. + * + * @throws Exception if removing the handles failed + */ + void removeAllHandles() throws Exception; + + /** + * Release the lock on the specific state handle so that it could deleted other {@link StateHandleStore}. If no + * lock exists or the underlying storage does not support, nothing will happen. + * + * @throws Exception if releasing the lock + */ + void releaseHandle(String name) throws Exception; + + /** + * Release all the locks on corresponding state handle so that it could deleted other {@link StateHandleStore}. + * If no lock exists or the underlying storage does not support, nothing will happen. 
+ * + * @throws Exception if releasing the locks + */ + void releaseAllHandles() throws Exception; Review comment: Given that we have the release calls, would it make sense to add `andLock` to the names of the methods which will add a lock (e.g. `getAndLock`, `addAndLock`, `getAllAndLock`)? This could help users see where a release call is necessary because a lock was created. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperJobGraphStoreUtil.java ########## @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.util.ZooKeeperUtils; + +/** + * {@link JobGraphStoreUtil} implementation for ZooKeeper. + * Review comment: This empty Javadoc line (the trailing ` *`) can be removed. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStoreTest.java ########## @@ -0,0 +1,357 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.core.testutils.FlinkMatchers; +import org.apache.flink.runtime.checkpoint.TestingRetrievableStateStorageHelper; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.persistence.IntegerResourceVersion; +import org.apache.flink.runtime.persistence.ResourceVersion; +import org.apache.flink.runtime.persistence.StateHandleStore; +import org.apache.flink.runtime.persistence.TestingStateHandleStore; +import org.apache.flink.runtime.state.RetrievableStateHandle; +import org.apache.flink.util.AbstractID; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertThat; 
+import static org.junit.Assert.fail; + +/** + * Tests for {@link DefaultJobGraphStore} with {@link TestingJobGraphStoreWatcher}, {@link TestingStateHandleStore}, + * and {@link TestingJobGraphListener}. + */ +public class DefaultJobGraphStoreTest extends TestLogger { + + private final JobGraph testingJobGraph = new JobGraph(); + private final long timeout = 100L; + + private TestingStateHandleStore.Builder<JobGraph> builder; + private TestingRetrievableStateStorageHelper<JobGraph> jobGraphStorageHelper; + private TestingJobGraphStoreWatcher testingJobGraphStoreWatcher; + private TestingJobGraphListener testingJobGraphListener; + + @Before + public void setup() { + builder = TestingStateHandleStore.builder(); + testingJobGraphStoreWatcher = new TestingJobGraphStoreWatcher(); + testingJobGraphListener = new TestingJobGraphListener(); + jobGraphStorageHelper = new TestingRetrievableStateStorageHelper<>(); + } + + @After + public void teardown() { + if (testingJobGraphStoreWatcher != null) { + testingJobGraphStoreWatcher.stop(); + } + } + + @Test + public void testRecoverJobGraph() throws Exception { + final RetrievableStateHandle<JobGraph> stateHandle = jobGraphStorageHelper.store(testingJobGraph); + final TestingStateHandleStore<JobGraph> stateHandleStore = builder + .setGetFunction(ignore -> stateHandle) + .build(); + + final JobGraphStore jobGraphStore = createAndStartJobGraphStore(stateHandleStore); + + final JobGraph recoveredJobGraph = jobGraphStore.recoverJobGraph(testingJobGraph.getJobID()); + assertThat(recoveredJobGraph, is(notNullValue())); + assertThat(recoveredJobGraph.getJobID(), is(testingJobGraph.getJobID())); + } + + @Test + public void testRecoverJobGraphWhenNotExist() throws Exception { + final TestingStateHandleStore<JobGraph> stateHandleStore = builder + .setGetFunction(ignore -> { + throw new StateHandleStore.NotExistException("Not exist exception."); + }) + .build(); + + final JobGraphStore jobGraphStore = 
createAndStartJobGraphStore(stateHandleStore); + + final JobGraph recoveredJobGraph = jobGraphStore.recoverJobGraph(testingJobGraph.getJobID()); + assertThat(recoveredJobGraph, is(nullValue())); + } + + @Test + public void testRecoverJobGraphFailedShouldReleaseHandle() throws Exception { + final CompletableFuture<String> releaseFuture = new CompletableFuture<>(); + final FlinkException testException = new FlinkException("Test exception."); + final TestingStateHandleStore<JobGraph> stateHandleStore = builder + .setGetFunction(ignore -> { + throw testException; + }) + .setReleaseConsumer(releaseFuture::complete) + .build(); + + final JobGraphStore jobGraphStore = createAndStartJobGraphStore(stateHandleStore); + + try { + jobGraphStore.recoverJobGraph(testingJobGraph.getJobID()); + fail("recoverJobGraph should fail when there is exception in getting the state handle."); + } catch (Exception ex) { + assertThat(ex, FlinkMatchers.containsCause(testException)); + String actual = releaseFuture.get(timeout, TimeUnit.MILLISECONDS); + assertThat(actual, is(testingJobGraph.getJobID().toString())); + } + } + + @Test + public void testPutJobGraphWhenNotExist() throws Exception { + final CompletableFuture<JobGraph> addFuture = new CompletableFuture<>(); + final TestingStateHandleStore<JobGraph> stateHandleStore = builder + .setExistsFunction(ignore -> IntegerResourceVersion.notExist()) + .setAddFunction((ignore, state) -> { + addFuture.complete(state); + return jobGraphStorageHelper.store(state); + }) + .build(); + + final JobGraphStore jobGraphStore = createAndStartJobGraphStore(stateHandleStore); + jobGraphStore.putJobGraph(testingJobGraph); + + final JobGraph actual = addFuture.get(timeout, TimeUnit.MILLISECONDS); + assertThat(actual.getJobID(), is(testingJobGraph.getJobID())); + } + + @Test + public void testPutJobGraphWhenAlreadyExist() throws Exception { + final CompletableFuture<Tuple3<String, ResourceVersion, JobGraph>> replaceFuture = new CompletableFuture<>(); + final 
int resourceVersion = 100; + final AtomicBoolean alreadyExist = new AtomicBoolean(false); + final TestingStateHandleStore<JobGraph> stateHandleStore = builder + .setExistsFunction(ignore -> { + if (alreadyExist.get()) { + return IntegerResourceVersion.valueOf(resourceVersion); + } else { + alreadyExist.set(true); + return IntegerResourceVersion.notExist(); + } + }) + .setAddFunction((ignore, state) -> jobGraphStorageHelper.store(state)) + .setReplaceConsumer(replaceFuture::complete) + .build(); + + final JobGraphStore jobGraphStore = createAndStartJobGraphStore(stateHandleStore); + jobGraphStore.putJobGraph(testingJobGraph); + // Replace + jobGraphStore.putJobGraph(testingJobGraph); + + final Tuple3<String, ResourceVersion, JobGraph> actual = replaceFuture.get(timeout, TimeUnit.MILLISECONDS); + assertThat(actual.f0, is(testingJobGraph.getJobID().toString())); + assertThat(actual.f1, is(IntegerResourceVersion.valueOf(resourceVersion))); + assertThat(actual.f2.getJobID(), is(testingJobGraph.getJobID())); + } + + @Test + public void testRemoveJobGraph() throws Exception { + final CompletableFuture<JobID> removeFuture = new CompletableFuture<>(); + final TestingStateHandleStore<JobGraph> stateHandleStore = builder + .setAddFunction((ignore, state) -> jobGraphStorageHelper.store(state)) + .setRemoveFunction(name -> removeFuture.complete(JobID.fromHexString(name))) + .build(); + + final JobGraphStore jobGraphStore = createAndStartJobGraphStore(stateHandleStore); + + jobGraphStore.putJobGraph(testingJobGraph); + jobGraphStore.removeJobGraph(testingJobGraph.getJobID()); + final JobID actual = removeFuture.get(timeout, TimeUnit.MILLISECONDS); + assertThat(actual, is(testingJobGraph.getJobID())); + } + + @Test + public void testRemoveJobGraphWithNonExistName() throws Exception { + final CompletableFuture<JobID> removeFuture = new CompletableFuture<>(); + final TestingStateHandleStore<JobGraph> stateHandleStore = builder + .setRemoveFunction(name -> 
removeFuture.complete(JobID.fromHexString(name))) + .build(); + + final JobGraphStore jobGraphStore = createAndStartJobGraphStore(stateHandleStore); + jobGraphStore.removeJobGraph(testingJobGraph.getJobID()); + + try { + removeFuture.get(timeout, TimeUnit.MILLISECONDS); + fail("We should get an expected timeout because we are removing a non-existed job graph."); + } catch (TimeoutException ex) { + // expected + } + assertThat(removeFuture.isDone(), is(false)); + } + + @Test + public void testGetJobIds() throws Exception { + final List<JobID> existingJobIds = Arrays.asList(new JobID(0, 0), new JobID(0, 1)); + final TestingStateHandleStore<JobGraph> stateHandleStore = builder + .setGetAllHandlesSupplier( + () -> existingJobIds.stream().map(AbstractID::toString).collect(Collectors.toList())) + .build(); + + final JobGraphStore jobGraphStore = createAndStartJobGraphStore(stateHandleStore); + final Collection<JobID> jobIds = jobGraphStore.getJobIds(); + assertThat(jobIds, contains(existingJobIds.toArray())); + } + + @Test + public void testOnAddedJobGraphShouldNotProcessKnownJobGraphs() throws Exception { + final TestingStateHandleStore<JobGraph> stateHandleStore = builder + .setAddFunction((ignore, state) -> jobGraphStorageHelper.store(state)) + .build(); + final JobGraphStore jobGraphStore = createAndStartJobGraphStore(stateHandleStore); + jobGraphStore.putJobGraph(testingJobGraph); + + testingJobGraphStoreWatcher.addJobGraph(testingJobGraph.getJobID()); + assertThat(testingJobGraphListener.getAddedJobGraphs().size(), is(0)); + } + + @Test + public void testOnAddedJobGraphShouldOnlyProcessUnknownJobGraphs() throws Exception { + final RetrievableStateHandle<JobGraph> stateHandle = jobGraphStorageHelper.store(testingJobGraph); + final TestingStateHandleStore<JobGraph> stateHandleStore = builder + .setGetFunction(ignore -> stateHandle) + .setAddFunction((ignore, state) -> jobGraphStorageHelper.store(state)) + .build(); + final JobGraphStore jobGraphStore = 
createAndStartJobGraphStore(stateHandleStore); + jobGraphStore.recoverJobGraph(testingJobGraph.getJobID()); + + // Known recovered job + testingJobGraphStoreWatcher.addJobGraph(testingJobGraph.getJobID()); + // Unknown job + final JobID unknownJobId = JobID.generate(); + testingJobGraphStoreWatcher.addJobGraph(unknownJobId); + assertThat(testingJobGraphListener.getAddedJobGraphs().size(), is(1)); + assertThat(testingJobGraphListener.getAddedJobGraphs(), contains(unknownJobId)); + } + + @Test + public void testOnRemovedJobGraphShouldOnlyProcessKnownJobGraphs() throws Exception { + final TestingStateHandleStore<JobGraph> stateHandleStore = builder + .setAddFunction((ignore, state) -> jobGraphStorageHelper.store(state)) + .build(); + final JobGraphStore jobGraphStore = createAndStartJobGraphStore(stateHandleStore); + jobGraphStore.putJobGraph(testingJobGraph); + + // Unknown job + testingJobGraphStoreWatcher.removeJobGraph(JobID.generate()); + // Known job + testingJobGraphStoreWatcher.removeJobGraph(testingJobGraph.getJobID()); + assertThat(testingJobGraphListener.getRemovedJobGraphs().size(), is(1)); + assertThat(testingJobGraphListener.getRemovedJobGraphs(), contains(testingJobGraph.getJobID())); + } + + @Test + public void testOnRemovedJobGraphShouldNotProcessUnknownJobGraphs() throws Exception { + final TestingStateHandleStore<JobGraph> stateHandleStore = builder + .setAddFunction((ignore, state) -> jobGraphStorageHelper.store(state)) + .build(); + createAndStartJobGraphStore(stateHandleStore); + + testingJobGraphStoreWatcher.removeJobGraph(testingJobGraph.getJobID()); + assertThat(testingJobGraphListener.getRemovedJobGraphs().size(), is(0)); + } + + @Test + public void testOnAddedJobGraphIsIgnoredAfterBeingStop() throws Exception { + final TestingStateHandleStore<JobGraph> stateHandleStore = builder + .setAddFunction((ignore, state) -> jobGraphStorageHelper.store(state)) + .build(); + final JobGraphStore jobGraphStore = 
createAndStartJobGraphStore(stateHandleStore); + jobGraphStore.stop(); + + testingJobGraphStoreWatcher.addJobGraph(testingJobGraph.getJobID()); + assertThat(testingJobGraphListener.getAddedJobGraphs().size(), is(0)); + } + + @Test + public void testOnRemovedJobGraphIsIgnoredAfterBeingStop() throws Exception { + final TestingStateHandleStore<JobGraph> stateHandleStore = builder + .setAddFunction((ignore, state) -> jobGraphStorageHelper.store(state)) + .build(); + final JobGraphStore jobGraphStore = createAndStartJobGraphStore(stateHandleStore); + jobGraphStore.putJobGraph(testingJobGraph); + jobGraphStore.stop(); + + testingJobGraphStoreWatcher.removeJobGraph(testingJobGraph.getJobID()); + assertThat(testingJobGraphListener.getRemovedJobGraphs().size(), is(0)); + } + + @Test + public void testStopJobGraphShouldCallReleaseAllHandles() throws Exception { Review comment: nit: `testStoppingJobGraphStoreShouldReleaseAllHandles` ########## File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreITCase.java ########## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes.highavailability; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.KubernetesResource; +import org.apache.flink.kubernetes.configuration.KubernetesLeaderElectionConfiguration; +import org.apache.flink.kubernetes.kubeclient.DefaultKubeClientFactory; +import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; +import org.apache.flink.kubernetes.kubeclient.KubeClientFactory; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector; +import org.apache.flink.kubernetes.kubeclient.resources.TestingLeaderCallbackHandler; +import org.apache.flink.runtime.persistence.TestingLongStateHandleHelper; + +import org.junit.ClassRule; +import org.junit.Test; + +import java.util.UUID; + +import static org.apache.flink.kubernetes.highavailability.KubernetesHighAvailabilityTestBase.LEADER_CONFIGMAP_NAME; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.Assert.assertThat; + +/** + * IT Tests for the {@link KubernetesStateHandleStore}. We expect only the leader could update the state store. + * The standby JobManagers update operations should not be issued. This is a "check-leadership-and-update" behavior + * test. It is a very basic requirement for {@link org.apache.flink.runtime.jobmanager.JobGraphStore} and + * {@link org.apache.flink.runtime.checkpoint.CompletedCheckpointStore} implementation for Kubernetes. + */ +public class KubernetesStateHandleStoreITCase { Review comment: `extends TestLogger` is missing. 
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java ########## @@ -474,6 +488,8 @@ protected String getLockPath(String rootPath) { client.create().withMode(CreateMode.EPHEMERAL).forPath(getLockPath(path)); } catch (KeeperException.NodeExistsException ignored) { // we have already created the lock + } catch (KeeperException.NoNodeException ex) { Review comment: ah ok, this makes sense. Thanks for the clarification. ########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesJobGraphStoreUtil.java ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.highavailability; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.kubernetes.utils.Constants; +import org.apache.flink.runtime.jobmanager.JobGraphStoreUtil; + +import static org.apache.flink.kubernetes.utils.Constants.JOB_GRAPH_STORE_KEY_PREFIX; + +/** + * {@link JobGraphStoreUtil} implementation for Kubernetes. + * + */ +public class KubernetesJobGraphStoreUtil implements JobGraphStoreUtil { + + /** + * Convert a key in ConfigMap to {@link JobID}. 
The key is stored with prefix + * {@link Constants#JOB_GRAPH_STORE_KEY_PREFIX}. + * + * @param key job graph key in ConfigMap. + * + * @return the parsed {@link JobID}. + */ + public JobID nameToJobID(String key) { + return JobID.fromHexString(key.replace(JOB_GRAPH_STORE_KEY_PREFIX, "")); Review comment: `key.substring(JOB_GRAPH_STORE_KEY_PREFIX.length())` might be a bit faster. It also strips only the leading prefix, whereas `replace` removes every occurrence of the prefix in the key (this assumes the key always starts with the prefix). ########## File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java ########## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.kubernetes.highavailability; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.core.testutils.FlinkMatchers; +import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector; +import org.apache.flink.runtime.persistence.StateHandleStore; +import org.apache.flink.runtime.persistence.StringResourceVersion; +import org.apache.flink.runtime.persistence.TestingLongStateHandleHelper; +import org.apache.flink.runtime.persistence.TestingLongStateHandleHelper.LongRetrievableStateHandle; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.function.FunctionUtils; + +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.function.Predicate; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +/** + * Tests for {@link KubernetesStateHandleStore} operations. 
+ */ +public class KubernetesStateHandleStoreTest extends KubernetesHighAvailabilityTestBase { + + private static final String PREFIX = "test-prefix-"; + private final String key = PREFIX + JobID.generate(); + private final Predicate<String> filter = k -> k.startsWith(PREFIX); + private final Long state = 12345L; + + private TestingLongStateHandleHelper longStateStorage; + + @Before + public void setup() { + super.setup(); + longStateStorage = new TestingLongStateHandleHelper(); + } + + @Test + public void testAdd() throws Exception { + new Context() {{ + runTest( + () -> { + leaderCallbackGrantLeadership(); + + final KubernetesStateHandleStore<Long> store = new KubernetesStateHandleStore<>( + flinkKubeClient, LEADER_CONFIGMAP_NAME, longStateStorage, filter, LOCK_IDENTITY); + store.add(key, state); + assertThat(store.getAll().size(), is(1)); + assertThat(store.get(key).retrieveState(), is(state)); + }); + }}; + } + + @Test + public void testAddAlreadyExistingKey() throws Exception { + new Context() {{ + runTest( + () -> { + leaderCallbackGrantLeadership(); + + getLeaderConfigMap().getData().put(key, "existing data"); + + final KubernetesStateHandleStore<Long> store = new KubernetesStateHandleStore<>( + flinkKubeClient, LEADER_CONFIGMAP_NAME, longStateStorage, filter, LOCK_IDENTITY); + + try { + store.add(key, state); + fail("Exception should be thrown."); + } catch (StateHandleStore.AlreadyExistException ex) { + final String msg = String.format( + "%s already exists in ConfigMap %s", key, LEADER_CONFIGMAP_NAME); + assertThat(ex, FlinkMatchers.containsMessage(msg)); + } + assertThat(longStateStorage.getStateHandles().size(), is(1)); + assertThat(longStateStorage.getStateHandles().get(0).getNumberOfDiscardCalls(), is(1)); + }); + }}; + } + + @Test + public void testAddFailedWhenConfigMapNotExistAndDiscardState() throws Exception { + new Context() {{ + runTest( + () -> { + final KubernetesStateHandleStore<Long> store = new KubernetesStateHandleStore<>( + 
flinkKubeClient, LEADER_CONFIGMAP_NAME, longStateStorage, filter, LOCK_IDENTITY); + + try { + store.add(key, state); + fail("Exception should be thrown."); + } catch (Exception ex) { + final String msg = String.format("ConfigMap %s does not exist.", LEADER_CONFIGMAP_NAME); + assertThat(ex, FlinkMatchers.containsMessage(msg)); + } + assertThat(longStateStorage.getStateHandles().size(), is(1)); + assertThat(longStateStorage.getStateHandles().get(0).getNumberOfDiscardCalls(), is(1)); + }); + }}; + } + + @Test + public void testReplace() throws Exception { + new Context() {{ + runTest( + () -> { + leaderCallbackGrantLeadership(); + + final KubernetesStateHandleStore<Long> store = new KubernetesStateHandleStore<>( + flinkKubeClient, LEADER_CONFIGMAP_NAME, longStateStorage, filter, LOCK_IDENTITY); + + store.add(key, state); + + final Long newState = 23456L; + final StringResourceVersion resourceVersion = store.exists(key); + store.replace(key, resourceVersion, newState); + + assertThat(store.getAll().size(), is(1)); + assertThat(store.get(key).retrieveState(), is(newState)); + }); + }}; + } + + @Test + public void testReplaceWithKeyNotExist() throws Exception { + new Context() {{ + runTest( + () -> { + leaderCallbackGrantLeadership(); + + final KubernetesStateHandleStore<Long> store = new KubernetesStateHandleStore<>( + flinkKubeClient, LEADER_CONFIGMAP_NAME, longStateStorage, filter, LOCK_IDENTITY); + final Long newState = 23456L; + + try { + assertThat(store.exists(key), is(StringResourceVersion.notExist())); + store.replace(key, StringResourceVersion.notExist(), newState); + fail("Exception should be thrown."); + } catch (StateHandleStore.NotExistException e) { + final String msg = String.format( + "Could not find %s in ConfigMap %s", key, LEADER_CONFIGMAP_NAME); + assertThat(e, FlinkMatchers.containsMessage(msg)); + } + assertThat(store.getAll().size(), is(0)); + }); + }}; + } + + @Test + public void testReplaceWithNoLeadershipAndDiscardState() throws Exception { + 
new Context() {{ + runTest( + () -> { + leaderCallbackGrantLeadership(); + + final KubernetesStateHandleStore<Long> store = new KubernetesStateHandleStore<>( + flinkKubeClient, LEADER_CONFIGMAP_NAME, longStateStorage, filter, LOCK_IDENTITY); + final Long newState = 23456L; + + store.add(key, state); + // Lost leadership + getLeaderCallback().notLeader(); + electionEventHandler.waitForRevokeLeader(TIMEOUT); + getLeaderConfigMap().getAnnotations().remove(KubernetesLeaderElector.LEADER_ANNOTATION_KEY); + + final StringResourceVersion resourceVersion = store.exists(key); + store.replace(key, resourceVersion, newState); + assertThat(store.getAll().size(), is(1)); + // The state do not change + assertThat(store.get(key).retrieveState(), is(state)); + + assertThat(longStateStorage.getStateHandles().size(), is(2)); + assertThat(longStateStorage.getStateHandles().get(0).getNumberOfDiscardCalls(), is(0)); + assertThat(longStateStorage.getStateHandles().get(1).getNumberOfDiscardCalls(), is(1)); + }); + }}; + } + + @Test + public void testReplaceFailedAndDiscardState() throws Exception { + final FlinkRuntimeException updateException = new FlinkRuntimeException("Failed to update"); + new Context() {{ + runTest( + () -> { + leaderCallbackGrantLeadership(); + + final KubernetesStateHandleStore<Long> store = new KubernetesStateHandleStore<>( + flinkKubeClient, LEADER_CONFIGMAP_NAME, longStateStorage, filter, LOCK_IDENTITY); + store.add(key, state); + + final FlinkKubeClient anotherFlinkKubeClient = createFlinkKubeClientBuilder() + .setCheckAndUpdateConfigMapFunction((configMapName, function) -> { + throw updateException; + }).build(); + final KubernetesStateHandleStore<Long> anotherStore = new KubernetesStateHandleStore<>( + anotherFlinkKubeClient, LEADER_CONFIGMAP_NAME, longStateStorage, filter, LOCK_IDENTITY); + + final Long newState = 23456L; + final StringResourceVersion resourceVersion = anotherStore.exists(key); + assertThat(resourceVersion.isNotExist(), is(false)); + try { 
+ anotherStore.replace(key, resourceVersion, newState); Review comment: `fail` is missing. ########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesJobGraphStoreWatcher.java ########## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes.highavailability; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch; +import org.apache.flink.kubernetes.utils.KubernetesUtils; +import org.apache.flink.runtime.jobmanager.JobGraphStore; +import org.apache.flink.runtime.jobmanager.JobGraphStoreWatcher; +import org.apache.flink.runtime.persistence.StringResourceVersion; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.kubernetes.utils.Constants.JOB_GRAPH_STORE_KEY_PREFIX; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * {@link JobGraphStoreWatcher} implementation for Kubernetes. It watch the Dispatcher leader ConfigMap and call the + * {@link JobGraphStore.JobGraphListener} based on the received event. + */ +public class KubernetesJobGraphStoreWatcher implements JobGraphStoreWatcher { Review comment: Given that we only modify the stored JobGraphs when we are the leader, is it really necessary to additionally watch the config map for JobGraph store changes? Maybe it can be a no-op implementation. ########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java ########## @@ -0,0 +1,455 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.highavailability; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesException; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector; +import org.apache.flink.runtime.persistence.ResourceVersion; +import org.apache.flink.runtime.persistence.RetrievableStateStorageHelper; +import org.apache.flink.runtime.persistence.StateHandleStore; +import org.apache.flink.runtime.persistence.StringResourceVersion; +import org.apache.flink.runtime.state.RetrievableStateHandle; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.InstantiationUtil; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Base64; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletionException; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import static 
org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Class which stores state via the provided {@link RetrievableStateStorageHelper} and writes the + * returned state handle to ConfigMap. + * + * <p>Added state is persisted via {@link RetrievableStateHandle RetrievableStateHandles}, + * which in turn are written to ConfigMap. This level of indirection is necessary to keep the + * amount of data in ConfigMap small. ConfigMap is build for data less than 1MB whereas + * state can grow to multiple MBs and GBs. + * + * <p>This is a very different implementation with {@link org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore}. + * Benefit from the {@link FlinkKubeClient#checkAndUpdateConfigMap} transactional operation, we could guarantee that + * only the leader could update the store. Then we will completely get rid of the lock-and-release in Zookeeper + * implementation. + * + * @param <T> Type of state + */ +public class KubernetesStateHandleStore<T extends Serializable> implements StateHandleStore<T> { + + private static final Logger LOG = LoggerFactory.getLogger(KubernetesStateHandleStore.class); + + private final FlinkKubeClient kubeClient; + + private final String configMapName; + + private final RetrievableStateStorageHelper<T> storage; + + private final Predicate<String> filter; + + private final String lockIdentity; + + /** + * Creates a {@link KubernetesStateHandleStore}. + * + * @param kubeClient The Kubernetes client. + * @param storage To persist the actual state and whose returned state handle is then written to ConfigMap + * @param configMapName ConfigMap to store the state handle store pointer + * @param filter filter to get the expected keys for state handle + * @param lockIdentity lock identity of current HA service + */ + public KubernetesStateHandleStore( + FlinkKubeClient kubeClient, + String configMapName, + RetrievableStateStorageHelper<T> storage, + Predicate<String> filter, Review comment: maybe call `configMapKeyFilter` or so. 
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org
