tillrohrmann commented on a change in pull request #13864:
URL: https://github.com/apache/flink/pull/13864#discussion_r518043106



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/persistence/StateHandleStore.java
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.persistence;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Class which stores state via the provided {@link RetrievableStateStorageHelper} and writes the
+ * returned state handle to a distributed coordination system (e.g. ZooKeeper, Kubernetes, etc.).
+ *
+ * <p>To avoid concurrent modification issues, the implementation needs to ensure that only the
+ * leader can update the state store.
+ *
+ * @param <T> Type of state
+ */
+public interface StateHandleStore<T extends Serializable> {
+
+       /**
+        * Persists the state to distributed storage (e.g. S3, HDFS, etc.), creates a state handle and
+        * stores it in the distributed coordination system (e.g. ZooKeeper, Kubernetes, etc.).
+        *
+        * @param name Key name in ConfigMap or child path name in ZooKeeper
+        * @param state State to be added
+        *
+        * @throws AlreadyExistException if the name already exists
+        * @throws Exception if persisting the state or writing the state handle failed
+        */
+       RetrievableStateHandle<T> add(String name, T state) throws Exception;
+
+       /**
+        * Replaces a state handle in the distributed coordination system and discards the old state handle.
+        *
+        * @param name Key name in ConfigMap or child path name in ZooKeeper
+        * @param resourceVersion resource version of the previous storage object. If the resource version
+        * does not match, the replace operation fails because an unexpected update operation snuck in.
+        * @param state State to replace the old one with
+        *
+        * @throws NotExistException if the name does not exist
+        * @throws Exception if persisting the state or writing the state handle failed
+        */
+       void replace(String name, ResourceVersion resourceVersion, T state) throws Exception;
+
+       /**
+        * Returns the resource version of the state handle with the given name on the underlying storage.
+        *
+        * @param name Key name in ConfigMap or child path name in ZooKeeper
+        *
+        * @return current resource version on the underlying storage.
+        *
+        * @throws Exception if the existence check failed
+        */
+       ResourceVersion exists(String name) throws Exception;
+
+       /**
+        * Gets the {@link RetrievableStateHandle} stored with the given name.
+        *
+        * @param name Key name in ConfigMap or child path name in ZooKeeper
+        *
+        * @return The retrieved state handle
+        *
+        * @throws IOException if the method failed to deserialize the stored state handle
+        * @throws NotExistException when the name does not exist
+        * @throws Exception if getting the state handle failed
+        */
+       RetrievableStateHandle<T> get(String name) throws Exception;
+
+       /**
+        * Gets all available state handles from the storage.
+        *
+        * @return All retrieved state handles.
+        *
+        * @throws Exception if getting the state handles failed
+        */
+       List<Tuple2<RetrievableStateHandle<T>, String>> getAll() throws Exception;
+
+       /**
+        * Returns a list of all valid names for state handles.
+        *
+        * @return List of valid state handle names. Each name is the key name in ConfigMap or the child
+        * path name in ZooKeeper.
+        *
+        * @throws Exception if getting the handle names failed
+        */
+       Collection<String> getAllHandles() throws Exception;
+
+       /**
+        * Removes the state handle with the given name and discards its state.
+        *
+        * @param name Key name in ConfigMap or child path name in ZooKeeper
+        *
+        * @return True if the state handle could be removed.
+        *
+        * @throws Exception if removing the handle or discarding the state failed
+        */
+       boolean removeHandleAndDiscardState(String name) throws Exception;
+
+       /**
+        * Removes all the states. Not only are the state handles removed from the distributed
+        * coordination system, but the actual state data on the distributed storage is discarded as well.
+        *
+        * @throws Exception if removing the names or discarding the state failed
+        */
+       void removeAllHandlesAndDiscardState() throws Exception;
+
+       /**
+        * Only removes the state handle pointers on Kubernetes or ZooKeeper. The underlying state is
+        * not discarded.
+        *
+        * @throws Exception if removing the handles failed
+        */
+       void removeAllHandles() throws Exception;
+
+       /**
+        * Releases the lock on the specific state handle so that it can be deleted by another
+        * {@link StateHandleStore}. If no lock exists or the underlying storage does not support locking,
+        * nothing will happen.
+        *
+        * @param name Key name in ConfigMap or child path name in ZooKeeper
+        *
+        * @throws Exception if releasing the lock failed
+        */
+       void releaseHandle(String name) throws Exception;
+
+       /**
+        * Releases all the locks on the corresponding state handles so that they can be deleted by
+        * another {@link StateHandleStore}. If no lock exists or the underlying storage does not support
+        * locking, nothing will happen.
+        *
+        * @throws Exception if releasing the locks failed
+        */
+       void releaseAllHandles() throws Exception;

Review comment:
       Given that we have the release calls, would it make sense to add `AndLock` to the methods which acquire a lock (e.g. `getAndLock`, `addAndLock`, `getAllAndLock`)? This could help users see where a release call is necessary because a lock was created.
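       As a rough illustration of the idea (the `AndLock` names below are only this suggestion, not what the PR currently contains):

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.RetrievableStateHandle;

import java.io.Serializable;
import java.util.List;

public interface StateHandleStore<T extends Serializable> {

    // Persists the state, writes the handle and locks it for this store instance.
    RetrievableStateHandle<T> addAndLock(String name, T state) throws Exception;

    // Retrieves the handle and locks it; must be paired with releaseHandle(name).
    RetrievableStateHandle<T> getAndLock(String name) throws Exception;

    // Retrieves and locks all handles; must be paired with releaseAllHandles().
    List<Tuple2<RetrievableStateHandle<T>, String>> getAllAndLock() throws Exception;

    // The release calls stay as currently proposed.
    void releaseHandle(String name) throws Exception;

    void releaseAllHandles() throws Exception;
}
```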

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperJobGraphStoreUtil.java
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+
+/**
+ * {@link JobGraphStoreUtil} implementation for ZooKeeper.
+ *

Review comment:
       line can be removed
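       i.e. the Javadoc would simply become:

```java
/**
 * {@link JobGraphStoreUtil} implementation for ZooKeeper.
 */
```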

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStoreTest.java
##########
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.testutils.FlinkMatchers;
+import 
org.apache.flink.runtime.checkpoint.TestingRetrievableStateStorageHelper;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.persistence.IntegerResourceVersion;
+import org.apache.flink.runtime.persistence.ResourceVersion;
+import org.apache.flink.runtime.persistence.StateHandleStore;
+import org.apache.flink.runtime.persistence.TestingStateHandleStore;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link DefaultJobGraphStore} with {@link TestingJobGraphStoreWatcher}, {@link TestingStateHandleStore},
+ * and {@link TestingJobGraphListener}.
+ */
+public class DefaultJobGraphStoreTest extends TestLogger {
+
+       private final JobGraph testingJobGraph = new JobGraph();
+       private final long timeout = 100L;
+
+       private TestingStateHandleStore.Builder<JobGraph> builder;
+       private TestingRetrievableStateStorageHelper<JobGraph> 
jobGraphStorageHelper;
+       private TestingJobGraphStoreWatcher testingJobGraphStoreWatcher;
+       private TestingJobGraphListener testingJobGraphListener;
+
+       @Before
+       public void setup() {
+               builder = TestingStateHandleStore.builder();
+               testingJobGraphStoreWatcher = new TestingJobGraphStoreWatcher();
+               testingJobGraphListener = new TestingJobGraphListener();
+               jobGraphStorageHelper = new 
TestingRetrievableStateStorageHelper<>();
+       }
+
+       @After
+       public void teardown() {
+               if (testingJobGraphStoreWatcher != null) {
+                       testingJobGraphStoreWatcher.stop();
+               }
+       }
+
+       @Test
+       public void testRecoverJobGraph() throws Exception {
+               final RetrievableStateHandle<JobGraph> stateHandle = 
jobGraphStorageHelper.store(testingJobGraph);
+               final TestingStateHandleStore<JobGraph> stateHandleStore = 
builder
+                       .setGetFunction(ignore -> stateHandle)
+                       .build();
+
+               final JobGraphStore jobGraphStore = 
createAndStartJobGraphStore(stateHandleStore);
+
+               final JobGraph recoveredJobGraph = 
jobGraphStore.recoverJobGraph(testingJobGraph.getJobID());
+               assertThat(recoveredJobGraph, is(notNullValue()));
+               assertThat(recoveredJobGraph.getJobID(), 
is(testingJobGraph.getJobID()));
+       }
+
+       @Test
+       public void testRecoverJobGraphWhenNotExist() throws Exception {
+               final TestingStateHandleStore<JobGraph> stateHandleStore = 
builder
+                       .setGetFunction(ignore -> {
+                               throw new 
StateHandleStore.NotExistException("Not exist exception.");
+                       })
+                       .build();
+
+               final JobGraphStore jobGraphStore = 
createAndStartJobGraphStore(stateHandleStore);
+
+               final JobGraph recoveredJobGraph = 
jobGraphStore.recoverJobGraph(testingJobGraph.getJobID());
+               assertThat(recoveredJobGraph, is(nullValue()));
+       }
+
+       @Test
+       public void testRecoverJobGraphFailedShouldReleaseHandle() throws 
Exception {
+               final CompletableFuture<String> releaseFuture = new 
CompletableFuture<>();
+               final FlinkException testException = new FlinkException("Test 
exception.");
+               final TestingStateHandleStore<JobGraph> stateHandleStore = 
builder
+                       .setGetFunction(ignore -> {
+                               throw testException;
+                       })
+                       .setReleaseConsumer(releaseFuture::complete)
+                       .build();
+
+               final JobGraphStore jobGraphStore = 
createAndStartJobGraphStore(stateHandleStore);
+
+               try {
+                       
jobGraphStore.recoverJobGraph(testingJobGraph.getJobID());
+                       fail("recoverJobGraph should fail when there is 
exception in getting the state handle.");
+               } catch (Exception ex) {
+                       assertThat(ex, 
FlinkMatchers.containsCause(testException));
+                       String actual = releaseFuture.get(timeout, 
TimeUnit.MILLISECONDS);
+                       assertThat(actual, 
is(testingJobGraph.getJobID().toString()));
+               }
+       }
+
+       @Test
+       public void testPutJobGraphWhenNotExist() throws Exception {
+               final CompletableFuture<JobGraph> addFuture = new 
CompletableFuture<>();
+               final TestingStateHandleStore<JobGraph> stateHandleStore = 
builder
+                       .setExistsFunction(ignore -> 
IntegerResourceVersion.notExist())
+                       .setAddFunction((ignore, state) -> {
+                               addFuture.complete(state);
+                               return jobGraphStorageHelper.store(state);
+                       })
+                       .build();
+
+               final JobGraphStore jobGraphStore = 
createAndStartJobGraphStore(stateHandleStore);
+               jobGraphStore.putJobGraph(testingJobGraph);
+
+               final JobGraph actual = addFuture.get(timeout, 
TimeUnit.MILLISECONDS);
+               assertThat(actual.getJobID(), is(testingJobGraph.getJobID()));
+       }
+
+       @Test
+       public void testPutJobGraphWhenAlreadyExist() throws Exception {
+               final CompletableFuture<Tuple3<String, ResourceVersion, 
JobGraph>> replaceFuture = new CompletableFuture<>();
+               final int resourceVersion = 100;
+               final AtomicBoolean alreadyExist = new AtomicBoolean(false);
+               final TestingStateHandleStore<JobGraph> stateHandleStore = 
builder
+                       .setExistsFunction(ignore -> {
+                               if (alreadyExist.get()) {
+                                       return 
IntegerResourceVersion.valueOf(resourceVersion);
+                               } else {
+                                       alreadyExist.set(true);
+                                       return 
IntegerResourceVersion.notExist();
+                               }
+                       })
+                       .setAddFunction((ignore, state) -> 
jobGraphStorageHelper.store(state))
+                       .setReplaceConsumer(replaceFuture::complete)
+                       .build();
+
+               final JobGraphStore jobGraphStore = 
createAndStartJobGraphStore(stateHandleStore);
+               jobGraphStore.putJobGraph(testingJobGraph);
+               // Replace
+               jobGraphStore.putJobGraph(testingJobGraph);
+
+               final Tuple3<String, ResourceVersion, JobGraph> actual = 
replaceFuture.get(timeout, TimeUnit.MILLISECONDS);
+               assertThat(actual.f0, 
is(testingJobGraph.getJobID().toString()));
+               assertThat(actual.f1, 
is(IntegerResourceVersion.valueOf(resourceVersion)));
+               assertThat(actual.f2.getJobID(), 
is(testingJobGraph.getJobID()));
+       }
+
+       @Test
+       public void testRemoveJobGraph() throws Exception {
+               final CompletableFuture<JobID> removeFuture = new 
CompletableFuture<>();
+               final TestingStateHandleStore<JobGraph> stateHandleStore = 
builder
+                       .setAddFunction((ignore, state) -> 
jobGraphStorageHelper.store(state))
+                       .setRemoveFunction(name -> 
removeFuture.complete(JobID.fromHexString(name)))
+                       .build();
+
+               final JobGraphStore jobGraphStore = 
createAndStartJobGraphStore(stateHandleStore);
+
+               jobGraphStore.putJobGraph(testingJobGraph);
+               jobGraphStore.removeJobGraph(testingJobGraph.getJobID());
+               final JobID actual = removeFuture.get(timeout, 
TimeUnit.MILLISECONDS);
+               assertThat(actual, is(testingJobGraph.getJobID()));
+       }
+
+       @Test
+       public void testRemoveJobGraphWithNonExistName() throws Exception {
+               final CompletableFuture<JobID> removeFuture = new 
CompletableFuture<>();
+               final TestingStateHandleStore<JobGraph> stateHandleStore = 
builder
+                       .setRemoveFunction(name -> 
removeFuture.complete(JobID.fromHexString(name)))
+                       .build();
+
+               final JobGraphStore jobGraphStore = 
createAndStartJobGraphStore(stateHandleStore);
+               jobGraphStore.removeJobGraph(testingJobGraph.getJobID());
+
+               try {
+                       removeFuture.get(timeout, TimeUnit.MILLISECONDS);
+                       fail("We should get an expected timeout because we are 
removing a non-existed job graph.");
+               } catch (TimeoutException ex) {
+                       // expected
+               }
+               assertThat(removeFuture.isDone(), is(false));
+       }
+
+       @Test
+       public void testGetJobIds() throws Exception {
+               final List<JobID> existingJobIds = Arrays.asList(new JobID(0, 
0), new JobID(0, 1));
+               final TestingStateHandleStore<JobGraph> stateHandleStore = 
builder
+                       .setGetAllHandlesSupplier(
+                               () -> 
existingJobIds.stream().map(AbstractID::toString).collect(Collectors.toList()))
+                       .build();
+
+               final JobGraphStore jobGraphStore = 
createAndStartJobGraphStore(stateHandleStore);
+               final Collection<JobID> jobIds = jobGraphStore.getJobIds();
+               assertThat(jobIds, contains(existingJobIds.toArray()));
+       }
+
+       @Test
+       public void testOnAddedJobGraphShouldNotProcessKnownJobGraphs() throws 
Exception {
+               final TestingStateHandleStore<JobGraph> stateHandleStore = 
builder
+                       .setAddFunction((ignore, state) -> 
jobGraphStorageHelper.store(state))
+                       .build();
+               final JobGraphStore jobGraphStore = 
createAndStartJobGraphStore(stateHandleStore);
+               jobGraphStore.putJobGraph(testingJobGraph);
+
+               
testingJobGraphStoreWatcher.addJobGraph(testingJobGraph.getJobID());
+               assertThat(testingJobGraphListener.getAddedJobGraphs().size(), 
is(0));
+       }
+
+       @Test
+       public void testOnAddedJobGraphShouldOnlyProcessUnknownJobGraphs() 
throws Exception {
+               final RetrievableStateHandle<JobGraph> stateHandle = 
jobGraphStorageHelper.store(testingJobGraph);
+               final TestingStateHandleStore<JobGraph> stateHandleStore = 
builder
+                       .setGetFunction(ignore -> stateHandle)
+                       .setAddFunction((ignore, state) -> 
jobGraphStorageHelper.store(state))
+                       .build();
+               final JobGraphStore jobGraphStore = 
createAndStartJobGraphStore(stateHandleStore);
+               jobGraphStore.recoverJobGraph(testingJobGraph.getJobID());
+
+               // Known recovered job
+               
testingJobGraphStoreWatcher.addJobGraph(testingJobGraph.getJobID());
+               // Unknown job
+               final JobID unknownJobId = JobID.generate();
+               testingJobGraphStoreWatcher.addJobGraph(unknownJobId);
+               assertThat(testingJobGraphListener.getAddedJobGraphs().size(), 
is(1));
+               assertThat(testingJobGraphListener.getAddedJobGraphs(), 
contains(unknownJobId));
+       }
+
+       @Test
+       public void testOnRemovedJobGraphShouldOnlyProcessKnownJobGraphs() 
throws Exception {
+               final TestingStateHandleStore<JobGraph> stateHandleStore = 
builder
+                       .setAddFunction((ignore, state) -> 
jobGraphStorageHelper.store(state))
+                       .build();
+               final JobGraphStore jobGraphStore = 
createAndStartJobGraphStore(stateHandleStore);
+               jobGraphStore.putJobGraph(testingJobGraph);
+
+               // Unknown job
+               testingJobGraphStoreWatcher.removeJobGraph(JobID.generate());
+               // Known job
+               
testingJobGraphStoreWatcher.removeJobGraph(testingJobGraph.getJobID());
+               
assertThat(testingJobGraphListener.getRemovedJobGraphs().size(), is(1));
+               assertThat(testingJobGraphListener.getRemovedJobGraphs(), 
contains(testingJobGraph.getJobID()));
+       }
+
+       @Test
+       public void testOnRemovedJobGraphShouldNotProcessUnknownJobGraphs() 
throws Exception {
+               final TestingStateHandleStore<JobGraph> stateHandleStore = 
builder
+                       .setAddFunction((ignore, state) -> 
jobGraphStorageHelper.store(state))
+                       .build();
+               createAndStartJobGraphStore(stateHandleStore);
+
+               
testingJobGraphStoreWatcher.removeJobGraph(testingJobGraph.getJobID());
+               
assertThat(testingJobGraphListener.getRemovedJobGraphs().size(), is(0));
+       }
+
+       @Test
+       public void testOnAddedJobGraphIsIgnoredAfterBeingStop() throws 
Exception {
+               final TestingStateHandleStore<JobGraph> stateHandleStore = 
builder
+                       .setAddFunction((ignore, state) -> 
jobGraphStorageHelper.store(state))
+                       .build();
+               final JobGraphStore jobGraphStore = 
createAndStartJobGraphStore(stateHandleStore);
+               jobGraphStore.stop();
+
+               
testingJobGraphStoreWatcher.addJobGraph(testingJobGraph.getJobID());
+               assertThat(testingJobGraphListener.getAddedJobGraphs().size(), 
is(0));
+       }
+
+       @Test
+       public void testOnRemovedJobGraphIsIgnoredAfterBeingStop() throws 
Exception {
+               final TestingStateHandleStore<JobGraph> stateHandleStore = 
builder
+                       .setAddFunction((ignore, state) -> 
jobGraphStorageHelper.store(state))
+                       .build();
+               final JobGraphStore jobGraphStore = 
createAndStartJobGraphStore(stateHandleStore);
+               jobGraphStore.putJobGraph(testingJobGraph);
+               jobGraphStore.stop();
+
+               
testingJobGraphStoreWatcher.removeJobGraph(testingJobGraph.getJobID());
+               
assertThat(testingJobGraphListener.getRemovedJobGraphs().size(), is(0));
+       }
+
+       @Test
+       public void testStopJobGraphShouldCallReleaseAllHandles() throws 
Exception {

Review comment:
       nit: `testStoppingJobGraphStoreShouldReleaseAllHandles`

##########
File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreITCase.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.KubernetesResource;
+import 
org.apache.flink.kubernetes.configuration.KubernetesLeaderElectionConfiguration;
+import org.apache.flink.kubernetes.kubeclient.DefaultKubeClientFactory;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.KubeClientFactory;
+import 
org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
+import 
org.apache.flink.kubernetes.kubeclient.resources.TestingLeaderCallbackHandler;
+import org.apache.flink.runtime.persistence.TestingLongStateHandleHelper;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.UUID;
+
+import static 
org.apache.flink.kubernetes.highavailability.KubernetesHighAvailabilityTestBase.LEADER_CONFIGMAP_NAME;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
+
+/**
+ * IT tests for the {@link KubernetesStateHandleStore}. We expect that only the leader can update the
+ * state store; update operations from standby JobManagers should not be issued. This tests the
+ * "check-leadership-and-update" behavior, which is a basic requirement for the
+ * {@link org.apache.flink.runtime.jobmanager.JobGraphStore} and
+ * {@link org.apache.flink.runtime.checkpoint.CompletedCheckpointStore} implementations for Kubernetes.
+ */
+public class KubernetesStateHandleStoreITCase {

Review comment:
       `extends TestLogger` is missing.
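       A minimal sketch of the fix (assuming the usual `org.apache.flink.util.TestLogger` base class used by the other tests in this PR):

```java
import org.apache.flink.util.TestLogger;

/** IT tests for the {@link KubernetesStateHandleStore}. */
public class KubernetesStateHandleStoreITCase extends TestLogger {
    // existing test methods stay unchanged
}
```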

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -474,6 +488,8 @@ protected String getLockPath(String rootPath) {
                                
client.create().withMode(CreateMode.EPHEMERAL).forPath(getLockPath(path));
                        } catch (KeeperException.NodeExistsException ignored) {
                                // we have already created the lock
+                       } catch (KeeperException.NoNodeException ex) {

Review comment:
       ah ok, this makes sense. Thanks for the clarification.

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesJobGraphStoreUtil.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.runtime.jobmanager.JobGraphStoreUtil;
+
+import static 
org.apache.flink.kubernetes.utils.Constants.JOB_GRAPH_STORE_KEY_PREFIX;
+
+/**
+ * {@link JobGraphStoreUtil} implementation for Kubernetes.
+ *
+ */
+public class KubernetesJobGraphStoreUtil implements JobGraphStoreUtil {
+
+       /**
+        * Converts a key in the ConfigMap to a {@link JobID}. The key is stored with the prefix
+        * {@link Constants#JOB_GRAPH_STORE_KEY_PREFIX}.
+        *
+        * @param key job graph key in ConfigMap.
+        *
+        * @return the parsed {@link JobID}.
+        */
+       public JobID nameToJobID(String key) {
+               return 
JobID.fromHexString(key.replace(JOB_GRAPH_STORE_KEY_PREFIX, ""));

Review comment:
       `key.substring(JOB_GRAPH_STORE_KEY_PREFIX.length())` might be a bit faster.
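       A sketch of the suggested variant (behavior is the same as long as every key actually starts with the prefix, which the store's key filter should guarantee):

```java
public JobID nameToJobID(String key) {
    // Strip the fixed prefix instead of doing a full search-and-replace.
    return JobID.fromHexString(key.substring(JOB_GRAPH_STORE_KEY_PREFIX.length()));
}
```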

##########
File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java
##########
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.testutils.FlinkMatchers;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import 
org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
+import org.apache.flink.runtime.persistence.StateHandleStore;
+import org.apache.flink.runtime.persistence.StringResourceVersion;
+import org.apache.flink.runtime.persistence.TestingLongStateHandleHelper;
+import 
org.apache.flink.runtime.persistence.TestingLongStateHandleHelper.LongRetrievableStateHandle;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.function.FunctionUtils;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.function.Predicate;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link KubernetesStateHandleStore} operations.
+ */
+public class KubernetesStateHandleStoreTest extends 
KubernetesHighAvailabilityTestBase {
+
+       private static final String PREFIX = "test-prefix-";
+       private final String key = PREFIX + JobID.generate();
+       private final Predicate<String> filter = k -> k.startsWith(PREFIX);
+       private final Long state = 12345L;
+
+       private TestingLongStateHandleHelper longStateStorage;
+
+       @Before
+       public void setup() {
+               super.setup();
+               longStateStorage = new TestingLongStateHandleHelper();
+       }
+
+       @Test
+       public void testAdd() throws Exception {
+               new Context() {{
+                       runTest(
+                               () -> {
+                                       leaderCallbackGrantLeadership();
+
+                                       final KubernetesStateHandleStore<Long> 
store = new KubernetesStateHandleStore<>(
+                                               flinkKubeClient, 
LEADER_CONFIGMAP_NAME, longStateStorage, filter, LOCK_IDENTITY);
+                                       store.add(key, state);
+                                       assertThat(store.getAll().size(), 
is(1));
+                                       
assertThat(store.get(key).retrieveState(), is(state));
+                               });
+               }};
+       }
+
+       @Test
+       public void testAddAlreadyExistingKey() throws Exception {
+               new Context() {{
+                       runTest(
+                               () -> {
+                                       leaderCallbackGrantLeadership();
+
+                                       getLeaderConfigMap().getData().put(key, 
"existing data");
+
+                                       final KubernetesStateHandleStore<Long> 
store = new KubernetesStateHandleStore<>(
+                                               flinkKubeClient, 
LEADER_CONFIGMAP_NAME, longStateStorage, filter, LOCK_IDENTITY);
+
+                                       try {
+                                               store.add(key, state);
+                                               fail("Exception should be 
thrown.");
+                                       } catch 
(StateHandleStore.AlreadyExistException ex) {
+                                               final String msg = 
String.format(
+                                                       "%s already exists in 
ConfigMap %s", key, LEADER_CONFIGMAP_NAME);
+                                               assertThat(ex, 
FlinkMatchers.containsMessage(msg));
+                                       }
+                                       
assertThat(longStateStorage.getStateHandles().size(), is(1));
+                                       
assertThat(longStateStorage.getStateHandles().get(0).getNumberOfDiscardCalls(), 
is(1));
+                               });
+               }};
+       }
+
+       @Test
+       public void testAddFailedWhenConfigMapNotExistAndDiscardState() throws 
Exception {
+               new Context() {{
+                       runTest(
+                               () -> {
+                                       final KubernetesStateHandleStore<Long> 
store = new KubernetesStateHandleStore<>(
+                                               flinkKubeClient, 
LEADER_CONFIGMAP_NAME, longStateStorage, filter, LOCK_IDENTITY);
+
+                                       try {
+                                               store.add(key, state);
+                                               fail("Exception should be 
thrown.");
+                                       } catch (Exception ex) {
+                                               final String msg = 
String.format("ConfigMap %s does not exist.", LEADER_CONFIGMAP_NAME);
+                                               assertThat(ex, 
FlinkMatchers.containsMessage(msg));
+                                       }
+                                       
assertThat(longStateStorage.getStateHandles().size(), is(1));
+                                       
assertThat(longStateStorage.getStateHandles().get(0).getNumberOfDiscardCalls(), 
is(1));
+                               });
+               }};
+       }
+
+       @Test
+       public void testReplace() throws Exception {
+               new Context() {{
+                       runTest(
+                               () -> {
+                                       leaderCallbackGrantLeadership();
+
+                                       final KubernetesStateHandleStore<Long> 
store = new KubernetesStateHandleStore<>(
+                                               flinkKubeClient, 
LEADER_CONFIGMAP_NAME, longStateStorage, filter, LOCK_IDENTITY);
+
+                                       store.add(key, state);
+
+                                       final Long newState = 23456L;
+                                       final StringResourceVersion 
resourceVersion = store.exists(key);
+                                       store.replace(key, resourceVersion, 
newState);
+
+                                       assertThat(store.getAll().size(), 
is(1));
+                                       
assertThat(store.get(key).retrieveState(), is(newState));
+                               });
+               }};
+       }
+
+       @Test
+       public void testReplaceWithKeyNotExist() throws Exception {
+               new Context() {{
+                       runTest(
+                               () -> {
+                                       leaderCallbackGrantLeadership();
+
+                                       final KubernetesStateHandleStore<Long> 
store = new KubernetesStateHandleStore<>(
+                                               flinkKubeClient, 
LEADER_CONFIGMAP_NAME, longStateStorage, filter, LOCK_IDENTITY);
+                                       final Long newState = 23456L;
+
+                                       try {
+                                               assertThat(store.exists(key), 
is(StringResourceVersion.notExist()));
+                                               store.replace(key, 
StringResourceVersion.notExist(), newState);
+                                               fail("Exception should be 
thrown.");
+                                       } catch 
(StateHandleStore.NotExistException e) {
+                                               final String msg = 
String.format(
+                                                       "Could not find %s in 
ConfigMap %s", key, LEADER_CONFIGMAP_NAME);
+                                               assertThat(e, 
FlinkMatchers.containsMessage(msg));
+                                       }
+                                       assertThat(store.getAll().size(), 
is(0));
+                               });
+               }};
+       }
+
+       @Test
+       public void testReplaceWithNoLeadershipAndDiscardState() throws 
Exception {
+               new Context() {{
+                       runTest(
+                               () -> {
+                                       leaderCallbackGrantLeadership();
+
+                                       final KubernetesStateHandleStore<Long> 
store = new KubernetesStateHandleStore<>(
+                                               flinkKubeClient, 
LEADER_CONFIGMAP_NAME, longStateStorage, filter, LOCK_IDENTITY);
+                                       final Long newState = 23456L;
+
+                                       store.add(key, state);
+                                       // Lost leadership
+                                       getLeaderCallback().notLeader();
+                                       
electionEventHandler.waitForRevokeLeader(TIMEOUT);
+                                       
getLeaderConfigMap().getAnnotations().remove(KubernetesLeaderElector.LEADER_ANNOTATION_KEY);
+
+                                       final StringResourceVersion 
resourceVersion = store.exists(key);
+                                       store.replace(key, resourceVersion, 
newState);
+                                       assertThat(store.getAll().size(), 
is(1));
+                                       // The state do not change
+                                       
assertThat(store.get(key).retrieveState(), is(state));
+
+                                       
assertThat(longStateStorage.getStateHandles().size(), is(2));
+                                       
assertThat(longStateStorage.getStateHandles().get(0).getNumberOfDiscardCalls(), 
is(0));
+                                       
assertThat(longStateStorage.getStateHandles().get(1).getNumberOfDiscardCalls(), 
is(1));
+                               });
+               }};
+       }
+
+       @Test
+       public void testReplaceFailedAndDiscardState() throws Exception {
+               final FlinkRuntimeException updateException = new 
FlinkRuntimeException("Failed to update");
+               new Context() {{
+                       runTest(
+                               () -> {
+                                       leaderCallbackGrantLeadership();
+
+                                       final KubernetesStateHandleStore<Long> 
store = new KubernetesStateHandleStore<>(
+                                               flinkKubeClient, 
LEADER_CONFIGMAP_NAME, longStateStorage, filter, LOCK_IDENTITY);
+                                       store.add(key, state);
+
+                                       final FlinkKubeClient 
anotherFlinkKubeClient = createFlinkKubeClientBuilder()
+                                               
.setCheckAndUpdateConfigMapFunction((configMapName, function) -> {
+                                                       throw updateException;
+                                               }).build();
+                                       final KubernetesStateHandleStore<Long> 
anotherStore = new KubernetesStateHandleStore<>(
+                                               anotherFlinkKubeClient, 
LEADER_CONFIGMAP_NAME, longStateStorage, filter, LOCK_IDENTITY);
+
+                                       final Long newState = 23456L;
+                                       final StringResourceVersion 
resourceVersion = anotherStore.exists(key);
+                                       
assertThat(resourceVersion.isNotExist(), is(false));
+                                       try {
+                                               anotherStore.replace(key, 
resourceVersion, newState);

Review comment:
       `fail` is missing.
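       For example, the assertion could look like this (the catch block is outside the quoted hunk, so the exact matcher is an assumption):

```java
try {
    anotherStore.replace(key, resourceVersion, newState);
    fail("replace() should have failed because checkAndUpdateConfigMap throws.");
} catch (Exception ex) {
    assertThat(ex, FlinkMatchers.containsCause(updateException));
}
```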

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesJobGraphStoreWatcher.java
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.runtime.jobmanager.JobGraphStore;
+import org.apache.flink.runtime.jobmanager.JobGraphStoreWatcher;
+import org.apache.flink.runtime.persistence.StringResourceVersion;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.kubernetes.utils.Constants.JOB_GRAPH_STORE_KEY_PREFIX;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * {@link JobGraphStoreWatcher} implementation for Kubernetes. It watches the Dispatcher leader
+ * ConfigMap and calls the {@link JobGraphStore.JobGraphListener} based on the received events.
+ */
+public class KubernetesJobGraphStoreWatcher implements JobGraphStoreWatcher {

Review comment:
       Given that we only modify the stored JobGraphs when we are the leader, is it really necessary to additionally watch the config map for JobGraph store changes? Maybe it can be a no-op implementation.
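       If we go this route, a no-op variant could look roughly like this (the method set is assumed from how the watcher is used elsewhere in this PR, i.e. `start`/`stop`):

```java
import org.apache.flink.runtime.jobmanager.JobGraphStore;
import org.apache.flink.runtime.jobmanager.JobGraphStoreWatcher;

/** A watcher that never reports ConfigMap changes; only the leader mutates the JobGraph store anyway. */
public enum NoOpJobGraphStoreWatcher implements JobGraphStoreWatcher {
    INSTANCE;

    @Override
    public void start(JobGraphStore.JobGraphListener jobGraphListener) {
        // nothing to watch
    }

    @Override
    public void stop() {
        // nothing to stop
    }
}
```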

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesException;
+import 
org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
+import org.apache.flink.runtime.persistence.ResourceVersion;
+import org.apache.flink.runtime.persistence.RetrievableStateStorageHelper;
+import org.apache.flink.runtime.persistence.StateHandleStore;
+import org.apache.flink.runtime.persistence.StringResourceVersion;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Class which stores state via the provided {@link RetrievableStateStorageHelper} and writes the
+ * returned state handle to ConfigMap.
+ *
+ * <p>Added state is persisted via {@link RetrievableStateHandle RetrievableStateHandles},
+ * which in turn are written to ConfigMap. This level of indirection is necessary to keep the
+ * amount of data in ConfigMap small. ConfigMap is built for data of less than 1 MB whereas
+ * state can grow to multiple MBs and GBs.
+ *
+ * <p>This implementation is very different from {@link org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore}.
+ * Thanks to the transactional {@link FlinkKubeClient#checkAndUpdateConfigMap} operation, we can
+ * guarantee that only the leader updates the store and can completely get rid of the
+ * lock-and-release mechanism of the ZooKeeper implementation.
+ *
+ * @param <T> Type of state
+ */
+public class KubernetesStateHandleStore<T extends Serializable> implements 
StateHandleStore<T> {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(KubernetesStateHandleStore.class);
+
+       private final FlinkKubeClient kubeClient;
+
+       private final String configMapName;
+
+       private final RetrievableStateStorageHelper<T> storage;
+
+       private final Predicate<String> filter;
+
+       private final String lockIdentity;
+
+       /**
+        * Creates a {@link KubernetesStateHandleStore}.
+        *
+        * @param kubeClient The Kubernetes client.
+        * @param configMapName ConfigMap to store the state handle pointers
+        * @param storage Storage helper that persists the actual state and whose returned state handle
+        * is then written to the ConfigMap
+        * @param filter filter to get the expected keys for the state handles
+        * @param lockIdentity lock identity of the current HA service
+        */
+       public KubernetesStateHandleStore(
+                       FlinkKubeClient kubeClient,
+                       String configMapName,
+                       RetrievableStateStorageHelper<T> storage,
+                       Predicate<String> filter,

Review comment:
       maybe call `configMapKeyFilter` or so.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

