tillrohrmann commented on a change in pull request #13864:
URL: https://github.com/apache/flink/pull/13864#discussion_r517380834
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/statehandle/RetrievableStateStorageHelper.java
##########
@@ -16,9 +16,10 @@
* limitations under the License.
*/
-package org.apache.flink.runtime.zookeeper;
+package org.apache.flink.runtime.statehandle;
Review comment:
Maybe call the package `persistence` instead of `statehandle`.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
##########
@@ -684,22 +685,22 @@ public void testReleaseAll() throws Exception {
assertEquals(0, stat.getNumChildren());
}
- zkStore.releaseAndTryRemoveAll();
+ zkStore.removeAll();
Stat stat = ZOOKEEPER.getClient().checkExists().forPath("/");
assertEquals(0, stat.getNumChildren());
}
@Test
- public void testDeleteAllShouldRemoveAllPaths() throws Exception {
+ public void testReleaseAndTryRemoveAllShouldRemoveAllPaths() throws
Exception {
Review comment:
`testRemoveAllShouldRemoveAllPaths`
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesException;
+import
org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.runtime.statehandle.RetrievableStateStorageHelper;
+import org.apache.flink.runtime.statehandle.StateHandleStore;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Class which stores state via the provided {@link
RetrievableStateStorageHelper} and writes the
+ * returned state handle to ConfigMap.
+ *
+ * <p>Added state is persisted via {@link RetrievableStateHandle
RetrievableStateHandles},
+ * which in turn are written to ConfigMap. This level of indirection is
necessary to keep the
+ * amount of data in ConfigMap small. ConfigMap is build for data less than
1MB whereas
+ * state can grow to multiple MBs and GBs.
+ *
+ * <p>This is a very different implementation with {@link
org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore}.
+ * Benefit from the {@link FlinkKubeClient#checkAndUpdateConfigMap}
transactional operation, we could guarantee that
+ * only the leader could update the store. Then we will completely get rid of
the lock-and-release in Zookeeper
+ * implementation.
+ *
+ * @param <T> Type of state
+ */
+public class KubernetesStateHandleStore<T extends Serializable> implements
StateHandleStore<T> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(KubernetesStateHandleStore.class);
+
+ private final FlinkKubeClient kubeClient;
+
+ private final String configMapName;
+
+ private final RetrievableStateStorageHelper<T> storage;
+
+ private final Predicate<String> filter;
+
+ private final String lockIdentity;
+
+ private final Supplier<Exception> configMapNotExistSupplier;
+
+ /**
+ * Creates a {@link KubernetesStateHandleStore}.
+ *
+ * @param kubeClient The Kubernetes client.
+ * @param storage To persist the actual state and whose returned
state handle is then written to ConfigMap
+ * @param configMapName ConfigMap to store the state handle store
pointer
+ * @param filter filter to get the expected keys for state handle
+ * @param lockIdentity lock identity of current HA service
+ */
+ public KubernetesStateHandleStore(
+ FlinkKubeClient kubeClient,
+ String configMapName,
+ RetrievableStateStorageHelper<T> storage,
+ Predicate<String> filter,
+ String lockIdentity) {
+
+ this.kubeClient = checkNotNull(kubeClient, "Kubernetes client");
+ this.storage = checkNotNull(storage, "State storage");
+ this.configMapName = checkNotNull(configMapName, "ConfigMap
name");
+ this.filter = checkNotNull(filter);
+ this.lockIdentity = checkNotNull(lockIdentity, "Lock identity
of current HA service");
+
+ this.configMapNotExistSupplier = () -> new
KubernetesException("ConfigMap " + configMapName + " not exists.");
+ }
+
+ /**
+ * Creates a state handle, stores it in ConfigMap. We could guarantee
that only the leader could update the
+ * ConfigMap. Since “Get(check the leader)-and-Update(write back to the
ConfigMap)” is a
+ * transactional operation.
+ *
+ * @param key Key in ConfigMap
+ * @param state State to be added
+ *
+ * @throws AlreadyExistException if the name already exists
+ * @throws Exception if persisting state or writing state handle failed
+ */
+ @Override
+ public RetrievableStateHandle<T> add(String key, T state) throws
Exception {
+ checkNotNull(key, "Key in ConfigMap.");
+ checkNotNull(state, "State.");
+
+ final RetrievableStateHandle<T> storeHandle =
storage.store(state);
+
+ boolean success = false;
+
+ try {
+ final byte[] serializedStoreHandle =
InstantiationUtil.serializeObject(storeHandle);
+ success = kubeClient.checkAndUpdateConfigMap(
+ configMapName,
+ c -> {
+ if
(KubernetesLeaderElector.hasLeadership(c, lockIdentity)) {
+ if
(!c.getData().containsKey(key)) {
+ c.getData().put(key,
Base64.getEncoder().encodeToString(serializedStoreHandle));
+ return Optional.of(c);
+ } else {
+ throw new
CompletionException(new StateHandleStore.AlreadyExistException(
+ key + " already
exists in ConfigMap " + configMapName));
+ }
+ }
+ return Optional.empty();
+ }).get();
+ return storeHandle;
+ } catch (Exception ex) {
+
ExceptionUtils.rethrowException(ExceptionUtils.stripCompletionException(ex));
+ // Never reached
+ return null;
+ } finally {
+ if (!success) {
+ // Cleanup the state handle if it was not
written to ConfigMap.
+ if (storeHandle != null) {
+ storeHandle.discardState();
+ }
+ }
+ }
+ }
+
+ /**
+ * Replaces a state handle in ConfigMap and discards the old state
handle. Wo do not lock resource
+ * version and then replace in Kubernetes. Since the ConfigMap is
periodically updated by leader, the
+ * resource version changes very fast. We use a "check-existence and
update" transactional operation instead.
+ *
+ * @param key Key in ConfigMap
+ * @param resourceVersion resource version when checking existence via
{@link #exists}.
+ * @param state State to be added
+ *
+ * @throws NotExistException if the name does not exist
+ * @throws Exception if persisting state or writing state handle failed
+ */
+ @Override
+ public void replace(String key, String resourceVersion, T state) throws
Exception {
+ checkNotNull(key, "Key in ConfigMap.");
+ checkNotNull(state, "State.");
+
+ final RetrievableStateHandle<T> oldStateHandle = get(key);
+
+ final RetrievableStateHandle<T> newStateHandle =
storage.store(state);
+
+ boolean success = false;
+
+ try {
+ final byte[] serializedStoreHandle =
InstantiationUtil.serializeObject(newStateHandle);
+ success = kubeClient.checkAndUpdateConfigMap(
+ configMapName,
+ c -> {
+ if
(KubernetesLeaderElector.hasLeadership(c, lockIdentity)) {
+ // Check the existence
+ if
(c.getData().containsKey(key)) {
+ c.getData().put(key,
Base64.getEncoder().encodeToString(serializedStoreHandle));
+ } else {
+ throw new
CompletionException(new StateHandleStore.NotExistException(
+ "Could not find
" + key + " in ConfigMap " + configMapName));
+ }
+ return Optional.of(c);
+ }
+ return Optional.empty();
+ }).get();
+ } catch (Exception ex) {
+
ExceptionUtils.rethrowException(ExceptionUtils.stripCompletionException(ex));
+ } finally {
+ if (success) {
+ oldStateHandle.discardState();
+ } else {
+ newStateHandle.discardState();
+ }
+ }
+ }
+
+ /**
+ * Returns the resource version of the ConfigMap or {@link
StateHandleStore#NON_EXIST_RESOURCE_VERSION} if the key
+ * does not exist.
+ *
+ * @param key Key in ConfigMap
+ *
+ * @return resource version or {@link #NON_EXIST_RESOURCE_VERSION} when
not exists.
+ *
+ * @throws Exception if the check existence operation failed
+ */
+ @Override
+ public String exists(String key) throws Exception {
+ checkNotNull(key, "Key in ConfigMap.");
+
+ return kubeClient.getConfigMap(configMapName)
+ .map(configMap -> {
+ if (configMap.getData().containsKey(key)) {
+ return configMap.getResourceVersion();
+ }
+ return
StateHandleStore.NON_EXIST_RESOURCE_VERSION;
+ })
+ .orElseThrow(configMapNotExistSupplier);
+ }
+
+ /**
+ * Gets the {@link RetrievableStateHandle} stored in the given
ConfigMap.
+ *
+ * @param key Key in ConfigMap
+ *
+ * @return The retrieved state handle from the specified ConfigMap and
key
+ *
+ * @throws IOException if the method failed to deserialize the stored
state handle
+ * @throws NotExistException when the name does not exist
+ * @throws Exception if get state handle from ConfigMap failed
+ */
+ @Override
+ public RetrievableStateHandle<T> get(String key) throws Exception {
+ checkNotNull(key, "Key in ConfigMap.");
+
+ final Optional<KubernetesConfigMap> optional =
kubeClient.getConfigMap(configMapName);
+ if (optional.isPresent()) {
+ final KubernetesConfigMap configMap = optional.get();
+ if (configMap.getData().containsKey(key)) {
+ return
deserializeObject(configMap.getData().get(key));
+ } else {
+ throw new StateHandleStore.NotExistException(
+ "Could not find " + key + " in
ConfigMap " + configMapName);
+ }
+ } else {
+ throw configMapNotExistSupplier.get();
+ }
+ }
+
+ /**
+ * Gets all available state handles sorted by key from Kubernetes.
+ *
+ * @return All state handles from ConfigMap.
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ public List<Tuple2<RetrievableStateHandle<T>, String>> getAll() {
+
+ return kubeClient.getConfigMap(configMapName)
+ .map(
+ configMap -> {
+ final
List<Tuple2<RetrievableStateHandle<T>, String>> stateHandles = new
ArrayList<>();
+ configMap.getData().entrySet().stream()
+ .filter(entry ->
filter.test(entry.getKey()))
+
.sorted(Comparator.comparing(Map.Entry::getKey))
+ .forEach(entry -> {
+ try {
+
stateHandles.add(new Tuple2(deserializeObject(entry.getValue()),
entry.getKey()));
+ } catch (Exception e) {
+
LOG.warn("ConfigMap {} contained corrupted data. Ignoring the key {}.",
+
configMapName, entry.getKey());
+ }
+ });
+ return stateHandles;
+ })
+ .orElse(Collections.emptyList());
+ }
+
+ /**
+ * Return a list of all valid keys for state handles.
+ *
+ * @return List of valid state handle keys in Kubernetes ConfigMap
+ *
+ * @throws Exception if get state handle names from ConfigMap failed.
+ */
+ @Override
+ public Collection<String> getAllNames() throws Exception {
+
+ return kubeClient.getConfigMap(configMapName)
+ .map(configMap -> configMap.getData().keySet().stream()
+ .sorted(Comparator.comparing(e -> e))
+ .filter(filter)
+ .collect(Collectors.toList()))
+ .orElseThrow(configMapNotExistSupplier);
+ }
+
+ /**
+ * Remove the key in state config map. As well as the state on external
storage will be removed.
+ * It returns the {@link RetrievableStateHandle} stored under the given
state node if any.
+ *
+ * @param key Key to be removed from ConfigMap
+ *
+ * @return True if the state handle is removed successfully
+ *
+ * @throws Exception if removing the key or discarding the state failed
+ */
+ @Override
+ public boolean remove(String key) throws Exception {
+ checkNotNull(key, "Key in ConfigMap.");
+ final AtomicReference<RetrievableStateHandle<T>>
stateHandleRefer = new AtomicReference<>();
+
+ try {
+ return kubeClient.checkAndUpdateConfigMap(
+ configMapName,
+ configMap -> {
+ if
(KubernetesLeaderElector.hasLeadership(configMap, lockIdentity)) {
+ final String content =
configMap.getData().remove(key);
+ if (content != null) {
+ try {
+
stateHandleRefer.set(deserializeObject(content));
+ } catch (Exception e) {
+ LOG.warn("Could
not retrieve the state handle of {} from ConfigMap {}.",
+ key,
configMapName, e);
+ }
+ }
+ return Optional.of(configMap);
+ }
+ return Optional.empty();
+ })
+ .whenComplete((succeed, ignore) -> {
+ if (succeed) {
+ if (stateHandleRefer.get() !=
null) {
+ try {
+
stateHandleRefer.get().discardState();
+ } catch (Exception e) {
+ throw new
CompletionException(e);
+ }
+ }
+ }
+ }).get();
+ } catch (Exception ex) {
+
ExceptionUtils.rethrowException(ExceptionUtils.stripCompletionException(ex));
Review comment:
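`ExceptionUtils.stripExecutionException`: `CompletableFuture#get()` wraps failures in an `ExecutionException`, which `stripCompletionException` does not unwrap.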
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/statehandle/TestingLongStateHandleHelper.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.statehandle;
+
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Testing implementation for {@link RetrievableStateStorageHelper} and {@link
RetrievableStateHandle}
+ * with type {@link Long}.
+ */
+public class TestingLongStateHandleHelper {
+
+ /**
+ * Testing {@link RetrievableStateStorageHelper} implementation with
{@link Long}.
+ */
+ public static class LongStateStorage implements
RetrievableStateStorageHelper<Long> {
Review comment:
I'd also be fine if the topmost class directly implemented
`RetrievableStateStorageHelper`.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStoreTest.java
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple3;
+import
org.apache.flink.runtime.checkpoint.TestingRetrievableStateStorageHelper;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.runtime.statehandle.StateHandleStore;
+import org.apache.flink.runtime.statehandle.TestingStateHandleStore;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+import static
org.apache.flink.runtime.statehandle.StateHandleStore.NON_EXIST_RESOURCE_VERSION;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link DefaultJobGraphStore} with {@link
TestingJobGraphStoreWatcher}, {@link TestingStateHandleStore}
+ * and {@link TestingJobGraphListener}.
+ */
+public class DefaultJobGraphStoreTest extends TestLogger {
+
+ private final JobGraph testingJobGraph = new JobGraph();
+ private final long timeout = 3 * 1000;
+
+ private TestingStateHandleStore.Builder<JobGraph> builder;
+ private TestingRetrievableStateStorageHelper<JobGraph>
jobGraphStorageHelper;
+ private TestingJobGraphStoreWatcher testingJobGraphStoreWatcher = new
TestingJobGraphStoreWatcher();
+ private TestingJobGraphListener testingJobGraphListener = new
TestingJobGraphListener();
+
+ @Before
+ public void setup() {
+ builder = TestingStateHandleStore.builder();
+ jobGraphStorageHelper = new
TestingRetrievableStateStorageHelper<>();
+ }
+
+ @After
+ public void teardown() {
+ if (testingJobGraphStoreWatcher != null) {
+ testingJobGraphStoreWatcher.stop();
+ }
+ }
+
+ @Test
+ public void testRecoverJobGraph() throws Exception {
+ final RetrievableStateHandle<JobGraph> stateHandle =
jobGraphStorageHelper.store(testingJobGraph);
+ final TestingStateHandleStore<JobGraph> stateHandleStore =
builder
+ .setGetFunction(ignore -> stateHandle)
+ .build();
+
+ final JobGraphStore jobGraphStore =
createAndStartJobGraphStore(stateHandleStore);
+
+ final JobGraph recoveredJobGraph =
jobGraphStore.recoverJobGraph(testingJobGraph.getJobID());
+ assertThat(recoveredJobGraph, is(notNullValue()));
+ assertThat(recoveredJobGraph.getJobID(),
is(testingJobGraph.getJobID()));
+
+ jobGraphStore.stop();
+ }
+
+ @Test
+ public void testRecoverJobGraphWhenNotExist() throws Exception {
+ final TestingStateHandleStore<JobGraph> stateHandleStore =
builder
+ .setGetFunction(ignore -> {
+ throw new
StateHandleStore.NotExistException("Not exist exception.");
+ })
+ .build();
+
+ final JobGraphStore jobGraphStore =
createAndStartJobGraphStore(stateHandleStore);
+
+ final JobGraph recoveredJobGraph =
jobGraphStore.recoverJobGraph(testingJobGraph.getJobID());
+ assertThat(recoveredJobGraph, is(nullValue()));
+ }
+
+ @Test
+ public void testRecoverJobGraphExceptionForwarding() throws Exception {
+ final TestingStateHandleStore<JobGraph> stateHandleStore =
builder
+ .setGetFunction(ignore -> {
+ throw new FlinkException("Other exception.");
+ })
+ .build();
+
+ final JobGraphStore jobGraphStore =
createAndStartJobGraphStore(stateHandleStore);
+
+ final String expectedErrorMsg = "Could not retrieve the
submitted job graph state handle " +
+ "for " + testingJobGraph.getJobID() + " from the
submitted job graph store";
+ try {
+
jobGraphStore.recoverJobGraph(testingJobGraph.getJobID());
+ fail("Exception should be thrown");
+ } catch (Exception ex) {
+ assertThat(ex.getMessage(),
containsString(expectedErrorMsg));
Review comment:
Here we could use `FlinkMatchers.containsMessage()`.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -328,7 +334,8 @@ public int exists(String pathInZooKeeper) throws Exception {
* @return True if the state handle could be released
* @throws Exception If the ZooKeeper operation or discarding the state
handle fails
*/
- public boolean releaseAndTryRemove(String pathInZooKeeper) throws
Exception {
+ @Override
+ public boolean remove(String pathInZooKeeper) throws Exception {
Review comment:
Maybe call it `removeHandleAndDiscardState`?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -474,6 +488,8 @@ protected String getLockPath(String rootPath) {
client.create().withMode(CreateMode.EPHEMERAL).forPath(getLockPath(path));
} catch (KeeperException.NodeExistsException ignored) {
// we have already created the lock
+ } catch (KeeperException.NoNodeException ex) {
Review comment:
When would a `NoNodeException` be thrown when creating a new ZNode?
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesException;
+import
org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.runtime.statehandle.RetrievableStateStorageHelper;
+import org.apache.flink.runtime.statehandle.StateHandleStore;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Class which stores state via the provided {@link
RetrievableStateStorageHelper} and writes the
+ * returned state handle to ConfigMap.
+ *
+ * <p>Added state is persisted via {@link RetrievableStateHandle
RetrievableStateHandles},
+ * which in turn are written to ConfigMap. This level of indirection is
necessary to keep the
+ * amount of data in ConfigMap small. ConfigMap is build for data less than
1MB whereas
+ * state can grow to multiple MBs and GBs.
+ *
+ * <p>This is a very different implementation with {@link
org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore}.
+ * Benefit from the {@link FlinkKubeClient#checkAndUpdateConfigMap}
transactional operation, we could guarantee that
+ * only the leader could update the store. Then we will completely get rid of
the lock-and-release in Zookeeper
+ * implementation.
+ *
+ * @param <T> Type of state
+ */
+public class KubernetesStateHandleStore<T extends Serializable> implements
StateHandleStore<T> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(KubernetesStateHandleStore.class);
+
+ private final FlinkKubeClient kubeClient;
+
+ private final String configMapName;
+
+ private final RetrievableStateStorageHelper<T> storage;
+
+ private final Predicate<String> filter;
+
+ private final String lockIdentity;
+
+ private final Supplier<Exception> configMapNotExistSupplier;
+
+ /**
+ * Creates a {@link KubernetesStateHandleStore}.
+ *
+ * @param kubeClient The Kubernetes client.
Review comment:
Just for the future, you don't have to align the JavaDoc parameter
descriptions.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStore.java
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.runtime.statehandle.StateHandleStore;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Default implementation for {@link JobGraphStore}. Combined with different
{@link StateHandleStore}, we could persist
+ * the job graphs to various distributed storage. Also combined with different
{@link JobGraphStoreWatcher}, we could
+ * get all the changes on the job graph store and do the response.
+ *
+ * <p>We have to make some variables {@link #lock}, {@link #addedJobGraphs},
and method {@link #verifyIsRunning()}
+ * as protected so that they could be used in the {@link
ZooKeeperJobGraphStore}.
+ */
+public class DefaultJobGraphStore implements JobGraphStore,
JobGraphStore.JobGraphListener {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(DefaultJobGraphStore.class);
+
+ /** Lock to synchronize with the {@link JobGraphListener}. */
+ protected final Object lock = new Object();
+
+ /** The set of IDs of all added job graphs. */
+ @GuardedBy("lock")
+ protected final Set<JobID> addedJobGraphs = new HashSet<>();
+
+ /** Submitted job graphs handle store. */
+ private final StateHandleStore<JobGraph> jobGraphStateHandleStore;
+
+ private final JobGraphStoreWatcher jobGraphStoreWatcher;
+
+ /** The job graph store description. Usually it is the storage name
+ * (e.g. ZooKeeper path or Kubernetes ConfigMap name). */
+ private final String jobGraphStoreDescription;
+
+ private final Function<String, JobID> nameToJobIDConverter;
+
+ private final Function<JobID, String> jobIDToNameConverter;
Review comment:
Maybe we could introduce a `BijectiveFunction` interface that maps in
both directions, i.e. `x -> y` and `y -> x`.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -429,12 +437,18 @@ public void releaseAll() throws Exception {
*
* @throws Exception ZK errors
*/
- public void deleteChildren() throws Exception {
+ @Override
+ public void removeAllNames() throws Exception {
Review comment:
Maybe `removeAllHandles`
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -365,14 +372,15 @@ public boolean releaseAndTryRemove(String
pathInZooKeeper) throws Exception {
*
* @throws Exception if the delete operation fails
*/
- public void releaseAndTryRemoveAll() throws Exception {
- Collection<String> children = getAllPaths();
+ @Override
+ public void removeAll() throws Exception {
Review comment:
Same here possibly: `removeAllHandlesAndDiscardState`
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesException;
+import
org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.runtime.statehandle.RetrievableStateStorageHelper;
+import org.apache.flink.runtime.statehandle.StateHandleStore;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Class which stores state via the provided {@link
RetrievableStateStorageHelper} and writes the
+ * returned state handle to ConfigMap.
+ *
+ * <p>Added state is persisted via {@link RetrievableStateHandle
RetrievableStateHandles},
+ * which in turn are written to ConfigMap. This level of indirection is
necessary to keep the
+ * amount of data in ConfigMap small. ConfigMap is build for data less than
1MB whereas
+ * state can grow to multiple MBs and GBs.
+ *
+ * <p>This is a very different implementation with {@link
org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore}.
+ * Benefit from the {@link FlinkKubeClient#checkAndUpdateConfigMap}
transactional operation, we could guarantee that
+ * only the leader could update the store. Then we will completely get rid of
the lock-and-release in Zookeeper
+ * implementation.
+ *
+ * @param <T> Type of state
+ */
+public class KubernetesStateHandleStore<T extends Serializable> implements
StateHandleStore<T> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(KubernetesStateHandleStore.class);
+
+ private final FlinkKubeClient kubeClient;
+
+ private final String configMapName;
+
+ private final RetrievableStateStorageHelper<T> storage;
+
+ private final Predicate<String> filter;
+
+ private final String lockIdentity;
+
+ private final Supplier<Exception> configMapNotExistSupplier;
+
+ /**
+ * Creates a {@link KubernetesStateHandleStore}.
+ *
+ * @param kubeClient The Kubernetes client.
+ * @param storage To persist the actual state and whose returned
state handle is then written to ConfigMap
+ * @param configMapName ConfigMap to store the state handle store
pointer
+ * @param filter filter to get the expected keys for state handle
+ * @param lockIdentity lock identity of current HA service
+ */
+ public KubernetesStateHandleStore(
+ FlinkKubeClient kubeClient,
+ String configMapName,
+ RetrievableStateStorageHelper<T> storage,
+ Predicate<String> filter,
+ String lockIdentity) {
+
+ this.kubeClient = checkNotNull(kubeClient, "Kubernetes client");
+ this.storage = checkNotNull(storage, "State storage");
+ this.configMapName = checkNotNull(configMapName, "ConfigMap
name");
+ this.filter = checkNotNull(filter);
+ this.lockIdentity = checkNotNull(lockIdentity, "Lock identity
of current HA service");
+
+ this.configMapNotExistSupplier = () -> new
KubernetesException("ConfigMap " + configMapName + " not exists.");
Review comment:
nit: " does not exist."
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesException;
+import
org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.runtime.statehandle.RetrievableStateStorageHelper;
+import org.apache.flink.runtime.statehandle.StateHandleStore;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Class which stores state via the provided {@link
RetrievableStateStorageHelper} and writes the
+ * returned state handle to ConfigMap.
+ *
+ * <p>Added state is persisted via {@link RetrievableStateHandle
RetrievableStateHandles},
+ * which in turn are written to ConfigMap. This level of indirection is
necessary to keep the
+ * amount of data in ConfigMap small. ConfigMap is build for data less than
1MB whereas
+ * state can grow to multiple MBs and GBs.
+ *
+ * <p>This is a very different implementation with {@link
org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore}.
+ * Benefit from the {@link FlinkKubeClient#checkAndUpdateConfigMap}
transactional operation, we could guarantee that
+ * only the leader could update the store. Then we will completely get rid of
the lock-and-release in Zookeeper
+ * implementation.
+ *
+ * @param <T> Type of state
+ */
+public class KubernetesStateHandleStore<T extends Serializable> implements
StateHandleStore<T> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(KubernetesStateHandleStore.class);
+
+ private final FlinkKubeClient kubeClient;
+
+ private final String configMapName;
+
+ private final RetrievableStateStorageHelper<T> storage;
+
+ private final Predicate<String> filter;
+
+ private final String lockIdentity;
+
+ private final Supplier<Exception> configMapNotExistSupplier;
+
+ /**
+ * Creates a {@link KubernetesStateHandleStore}.
+ *
+ * @param kubeClient The Kubernetes client.
+ * @param storage To persist the actual state and whose returned
state handle is then written to ConfigMap
+ * @param configMapName ConfigMap to store the state handle store
pointer
+ * @param filter filter to get the expected keys for state handle
+ * @param lockIdentity lock identity of current HA service
+ */
+ public KubernetesStateHandleStore(
+ FlinkKubeClient kubeClient,
+ String configMapName,
+ RetrievableStateStorageHelper<T> storage,
+ Predicate<String> filter,
+ String lockIdentity) {
+
+ this.kubeClient = checkNotNull(kubeClient, "Kubernetes client");
+ this.storage = checkNotNull(storage, "State storage");
+ this.configMapName = checkNotNull(configMapName, "ConfigMap
name");
+ this.filter = checkNotNull(filter);
+ this.lockIdentity = checkNotNull(lockIdentity, "Lock identity
of current HA service");
+
+ this.configMapNotExistSupplier = () -> new
KubernetesException("ConfigMap " + configMapName + " not exists.");
+ }
+
+ /**
+ * Creates a state handle, stores it in ConfigMap. We could guarantee
that only the leader could update the
+ * ConfigMap. Since “Get(check the leader)-and-Update(write back to the
ConfigMap)” is a
+ * transactional operation.
+ *
+ * @param key Key in ConfigMap
+ * @param state State to be added
+ *
+ * @throws AlreadyExistException if the name already exists
+ * @throws Exception if persisting state or writing state handle failed
+ */
+ @Override
+ public RetrievableStateHandle<T> add(String key, T state) throws
Exception {
+ checkNotNull(key, "Key in ConfigMap.");
+ checkNotNull(state, "State.");
+
+ final RetrievableStateHandle<T> storeHandle =
storage.store(state);
+
+ boolean success = false;
+
+ try {
+ final byte[] serializedStoreHandle =
InstantiationUtil.serializeObject(storeHandle);
+ success = kubeClient.checkAndUpdateConfigMap(
+ configMapName,
+ c -> {
+ if
(KubernetesLeaderElector.hasLeadership(c, lockIdentity)) {
+ if
(!c.getData().containsKey(key)) {
+ c.getData().put(key,
Base64.getEncoder().encodeToString(serializedStoreHandle));
+ return Optional.of(c);
+ } else {
+ throw new
CompletionException(new StateHandleStore.AlreadyExistException(
+ key + " already
exists in ConfigMap " + configMapName));
+ }
+ }
+ return Optional.empty();
+ }).get();
+ return storeHandle;
+ } catch (Exception ex) {
+
ExceptionUtils.rethrowException(ExceptionUtils.stripCompletionException(ex));
+ // Never reached
+ return null;
+ } finally {
+ if (!success) {
+ // Cleanup the state handle if it was not
written to ConfigMap.
+ if (storeHandle != null) {
+ storeHandle.discardState();
+ }
+ }
+ }
+ }
+
+ /**
+ * Replaces a state handle in ConfigMap and discards the old state
handle. Wo do not lock resource
+ * version and then replace in Kubernetes. Since the ConfigMap is
periodically updated by leader, the
+ * resource version changes very fast. We use a "check-existence and
update" transactional operation instead.
+ *
+ * @param key Key in ConfigMap
+ * @param resourceVersion resource version when checking existence via
{@link #exists}.
+ * @param state State to be added
+ *
+ * @throws NotExistException if the name does not exist
+ * @throws Exception if persisting state or writing state handle failed
+ */
+ @Override
+ public void replace(String key, String resourceVersion, T state) throws
Exception {
+ checkNotNull(key, "Key in ConfigMap.");
+ checkNotNull(state, "State.");
+
+ final RetrievableStateHandle<T> oldStateHandle = get(key);
+
+ final RetrievableStateHandle<T> newStateHandle =
storage.store(state);
+
+ boolean success = false;
+
+ try {
+ final byte[] serializedStoreHandle =
InstantiationUtil.serializeObject(newStateHandle);
+ success = kubeClient.checkAndUpdateConfigMap(
+ configMapName,
+ c -> {
+ if
(KubernetesLeaderElector.hasLeadership(c, lockIdentity)) {
+ // Check the existence
+ if
(c.getData().containsKey(key)) {
+ c.getData().put(key,
Base64.getEncoder().encodeToString(serializedStoreHandle));
Review comment:
We could factor `Base64.getEncoder().encodeToString` out into a helper
method because it is used multiple times.
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesException;
+import
org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.runtime.statehandle.RetrievableStateStorageHelper;
+import org.apache.flink.runtime.statehandle.StateHandleStore;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Class which stores state via the provided {@link
RetrievableStateStorageHelper} and writes the
+ * returned state handle to ConfigMap.
+ *
+ * <p>Added state is persisted via {@link RetrievableStateHandle
RetrievableStateHandles},
+ * which in turn are written to ConfigMap. This level of indirection is
necessary to keep the
+ * amount of data in ConfigMap small. ConfigMap is build for data less than
1MB whereas
+ * state can grow to multiple MBs and GBs.
+ *
+ * <p>This is a very different implementation with {@link
org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore}.
+ * Benefit from the {@link FlinkKubeClient#checkAndUpdateConfigMap}
transactional operation, we could guarantee that
+ * only the leader could update the store. Then we will completely get rid of
the lock-and-release in Zookeeper
+ * implementation.
+ *
+ * @param <T> Type of state
+ */
+public class KubernetesStateHandleStore<T extends Serializable> implements
StateHandleStore<T> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(KubernetesStateHandleStore.class);
+
+ private final FlinkKubeClient kubeClient;
+
+ private final String configMapName;
+
+ private final RetrievableStateStorageHelper<T> storage;
+
+ private final Predicate<String> filter;
+
+ private final String lockIdentity;
+
+ private final Supplier<Exception> configMapNotExistSupplier;
+
+ /**
+ * Creates a {@link KubernetesStateHandleStore}.
+ *
+ * @param kubeClient The Kubernetes client.
+ * @param storage To persist the actual state and whose returned
state handle is then written to ConfigMap
+ * @param configMapName ConfigMap to store the state handle store
pointer
+ * @param filter filter to get the expected keys for state handle
+ * @param lockIdentity lock identity of current HA service
+ */
+ public KubernetesStateHandleStore(
+ FlinkKubeClient kubeClient,
+ String configMapName,
+ RetrievableStateStorageHelper<T> storage,
+ Predicate<String> filter,
+ String lockIdentity) {
+
+ this.kubeClient = checkNotNull(kubeClient, "Kubernetes client");
+ this.storage = checkNotNull(storage, "State storage");
+ this.configMapName = checkNotNull(configMapName, "ConfigMap
name");
+ this.filter = checkNotNull(filter);
+ this.lockIdentity = checkNotNull(lockIdentity, "Lock identity
of current HA service");
+
+ this.configMapNotExistSupplier = () -> new
KubernetesException("ConfigMap " + configMapName + " not exists.");
+ }
+
+ /**
+ * Creates a state handle, stores it in ConfigMap. We could guarantee
that only the leader could update the
+ * ConfigMap. Since “Get(check the leader)-and-Update(write back to the
ConfigMap)” is a
+ * transactional operation.
+ *
+ * @param key Key in ConfigMap
+ * @param state State to be added
+ *
+ * @throws AlreadyExistException if the name already exists
+ * @throws Exception if persisting state or writing state handle failed
+ */
+ @Override
+ public RetrievableStateHandle<T> add(String key, T state) throws
Exception {
+ checkNotNull(key, "Key in ConfigMap.");
+ checkNotNull(state, "State.");
+
+ final RetrievableStateHandle<T> storeHandle =
storage.store(state);
+
+ boolean success = false;
+
+ try {
+ final byte[] serializedStoreHandle =
InstantiationUtil.serializeObject(storeHandle);
+ success = kubeClient.checkAndUpdateConfigMap(
+ configMapName,
+ c -> {
+ if
(KubernetesLeaderElector.hasLeadership(c, lockIdentity)) {
+ if
(!c.getData().containsKey(key)) {
+ c.getData().put(key,
Base64.getEncoder().encodeToString(serializedStoreHandle));
+ return Optional.of(c);
+ } else {
+ throw new
CompletionException(new StateHandleStore.AlreadyExistException(
+ key + " already
exists in ConfigMap " + configMapName));
+ }
+ }
+ return Optional.empty();
+ }).get();
+ return storeHandle;
+ } catch (Exception ex) {
+
ExceptionUtils.rethrowException(ExceptionUtils.stripCompletionException(ex));
Review comment:
I think here we need `ExceptionUtils.stripExecutionException()`.
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesException;
+import
org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.runtime.statehandle.RetrievableStateStorageHelper;
+import org.apache.flink.runtime.statehandle.StateHandleStore;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Class which stores state via the provided {@link
RetrievableStateStorageHelper} and writes the
+ * returned state handle to ConfigMap.
+ *
+ * <p>Added state is persisted via {@link RetrievableStateHandle
RetrievableStateHandles},
+ * which in turn are written to ConfigMap. This level of indirection is
necessary to keep the
+ * amount of data in ConfigMap small. ConfigMap is build for data less than
1MB whereas
+ * state can grow to multiple MBs and GBs.
+ *
+ * <p>This is a very different implementation with {@link
org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore}.
+ * Benefit from the {@link FlinkKubeClient#checkAndUpdateConfigMap}
transactional operation, we could guarantee that
+ * only the leader could update the store. Then we will completely get rid of
the lock-and-release in Zookeeper
+ * implementation.
+ *
+ * @param <T> Type of state
+ */
+public class KubernetesStateHandleStore<T extends Serializable> implements
StateHandleStore<T> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(KubernetesStateHandleStore.class);
+
+ private final FlinkKubeClient kubeClient;
+
+ private final String configMapName;
+
+ private final RetrievableStateStorageHelper<T> storage;
+
+ private final Predicate<String> filter;
+
+ private final String lockIdentity;
+
+ private final Supplier<Exception> configMapNotExistSupplier;
+
+ /**
+ * Creates a {@link KubernetesStateHandleStore}.
+ *
+ * @param kubeClient The Kubernetes client.
+ * @param storage To persist the actual state and whose returned
state handle is then written to ConfigMap
+ * @param configMapName ConfigMap to store the state handle store
pointer
+ * @param filter filter to get the expected keys for state handle
+ * @param lockIdentity lock identity of current HA service
+ */
+ public KubernetesStateHandleStore(
+ FlinkKubeClient kubeClient,
+ String configMapName,
+ RetrievableStateStorageHelper<T> storage,
+ Predicate<String> filter,
+ String lockIdentity) {
+
+ this.kubeClient = checkNotNull(kubeClient, "Kubernetes client");
+ this.storage = checkNotNull(storage, "State storage");
+ this.configMapName = checkNotNull(configMapName, "ConfigMap
name");
+ this.filter = checkNotNull(filter);
+ this.lockIdentity = checkNotNull(lockIdentity, "Lock identity
of current HA service");
+
+ this.configMapNotExistSupplier = () -> new
KubernetesException("ConfigMap " + configMapName + " not exists.");
+ }
+
+ /**
+ * Creates a state handle, stores it in ConfigMap. We could guarantee
that only the leader could update the
+ * ConfigMap. Since “Get(check the leader)-and-Update(write back to the
ConfigMap)” is a
+ * transactional operation.
+ *
+ * @param key Key in ConfigMap
+ * @param state State to be added
+ *
+ * @throws AlreadyExistException if the name already exists
+ * @throws Exception if persisting state or writing state handle failed
+ */
+ @Override
+ public RetrievableStateHandle<T> add(String key, T state) throws
Exception {
+ checkNotNull(key, "Key in ConfigMap.");
+ checkNotNull(state, "State.");
+
+ final RetrievableStateHandle<T> storeHandle =
storage.store(state);
+
+ boolean success = false;
+
+ try {
+ final byte[] serializedStoreHandle =
InstantiationUtil.serializeObject(storeHandle);
+ success = kubeClient.checkAndUpdateConfigMap(
+ configMapName,
+ c -> {
+ if
(KubernetesLeaderElector.hasLeadership(c, lockIdentity)) {
+ if
(!c.getData().containsKey(key)) {
+ c.getData().put(key,
Base64.getEncoder().encodeToString(serializedStoreHandle));
+ return Optional.of(c);
+ } else {
+ throw new
CompletionException(new StateHandleStore.AlreadyExistException(
+ key + " already
exists in ConfigMap " + configMapName));
+ }
+ }
+ return Optional.empty();
+ }).get();
+ return storeHandle;
+ } catch (Exception ex) {
+
ExceptionUtils.rethrowException(ExceptionUtils.stripCompletionException(ex));
+ // Never reached
+ return null;
+ } finally {
+ if (!success) {
+ // Cleanup the state handle if it was not
written to ConfigMap.
+ if (storeHandle != null) {
+ storeHandle.discardState();
+ }
+ }
+ }
+ }
+
+ /**
+ * Replaces a state handle in ConfigMap and discards the old state
handle. Wo do not lock resource
+ * version and then replace in Kubernetes. Since the ConfigMap is
periodically updated by leader, the
+ * resource version changes very fast. We use a "check-existence and
update" transactional operation instead.
+ *
+ * @param key Key in ConfigMap
+ * @param resourceVersion resource version when checking existence via
{@link #exists}.
+ * @param state State to be added
+ *
+ * @throws NotExistException if the name does not exist
+ * @throws Exception if persisting state or writing state handle failed
+ */
+ @Override
+ public void replace(String key, String resourceVersion, T state) throws
Exception {
+ checkNotNull(key, "Key in ConfigMap.");
+ checkNotNull(state, "State.");
+
+ final RetrievableStateHandle<T> oldStateHandle = get(key);
+
+ final RetrievableStateHandle<T> newStateHandle =
storage.store(state);
+
+ boolean success = false;
+
+ try {
+ final byte[] serializedStoreHandle =
InstantiationUtil.serializeObject(newStateHandle);
+ success = kubeClient.checkAndUpdateConfigMap(
+ configMapName,
+ c -> {
+ if
(KubernetesLeaderElector.hasLeadership(c, lockIdentity)) {
+ // Check the existence
+ if
(c.getData().containsKey(key)) {
+ c.getData().put(key,
Base64.getEncoder().encodeToString(serializedStoreHandle));
+ } else {
+ throw new
CompletionException(new StateHandleStore.NotExistException(
+ "Could not find
" + key + " in ConfigMap " + configMapName));
+ }
+ return Optional.of(c);
+ }
+ return Optional.empty();
+ }).get();
+ } catch (Exception ex) {
+
ExceptionUtils.rethrowException(ExceptionUtils.stripCompletionException(ex));
Review comment:
`ExceptionUtils.stripExecutionException` would be better.
##########
File path:
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java
##########
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.api.common.JobID;
+import
org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
+import org.apache.flink.runtime.statehandle.StateHandleStore;
+import
org.apache.flink.runtime.statehandle.TestingLongStateHandleHelper.LongRetrievableStateHandle;
+import
org.apache.flink.runtime.statehandle.TestingLongStateHandleHelper.LongStateStorage;
+import org.apache.flink.util.function.FunctionUtils;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.function.Predicate;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link KubernetesStateHandleStore} operations.
+ */
+public class KubernetesStateHandleStoreTest extends
KubernetesHighAvailabilityTestBase {
+
+ private static final String PREFIX = "test-prefix-";
+ private final String key = PREFIX + JobID.generate();
+ private final Predicate<String> filter = k -> k.startsWith(PREFIX);
+ private final Long state = 12345L;
+
+ private LongStateStorage longStateStorage;
+
+ @Before
+ public void setup() {
+ super.setup();
+ longStateStorage = new LongStateStorage();
+ }
+
+ @Test
+ public void testAdd() throws Exception {
+ new Context() {{
+ runTest(
+ () -> {
+ leaderCallbackGrantLeadership();
+
+ final KubernetesStateHandleStore<Long>
store = new KubernetesStateHandleStore<>(
+ flinkKubeClient,
LEADER_CONFIGMAP_NAME, longStateStorage, filter, LOCK_IDENTITY);
+ store.add(key, state);
+ assertThat(store.getAll().size(),
is(1));
+
assertThat(store.get(key).retrieveState(), is(state));
+ });
+ }};
+ }
+
+ @Test
+ public void testAddAlreadyExistingKey() throws Exception {
+ new Context() {{
+ runTest(
+ () -> {
+ leaderCallbackGrantLeadership();
+
+ final KubernetesStateHandleStore<Long>
store = new KubernetesStateHandleStore<>(
+ flinkKubeClient,
LEADER_CONFIGMAP_NAME, longStateStorage, filter, LOCK_IDENTITY);
+ store.add(key, state);
+
+ try {
+ store.add(key, state);
+ fail("Exception should be
thrown.");
+ } catch
(StateHandleStore.AlreadyExistException ex) {
+ final String msg =
String.format(
+ "%s already exists in
ConfigMap %s", key, LEADER_CONFIGMAP_NAME);
+ assertThat(ex.getMessage(),
containsString(msg));
+ }
+
assertThat(longStateStorage.getStateHandles().size(), is(2));
Review comment:
It is a bit confusing that `longStateStorage` contains two state handles
even though the `add` operation failed.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/statehandle/TestingLongStateHandleHelper.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.statehandle;
+
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Testing implementation for {@link RetrievableStateStorageHelper} and {@link
RetrievableStateHandle}
+ * with type {@link Long}.
+ */
+public class TestingLongStateHandleHelper {
+
+ /**
+ * Testing {@link RetrievableStateStorageHelper} implementation with
{@link Long}.
+ */
+ public static class LongStateStorage implements
RetrievableStateStorageHelper<Long> {
+
+ private final List<LongRetrievableStateHandle> stateHandles =
new ArrayList<>();
+
+ @Override
+ public RetrievableStateHandle<Long> store(Long state) {
+ LongRetrievableStateHandle stateHandle = new
LongRetrievableStateHandle(state);
+ stateHandles.add(stateHandle);
+
+ return stateHandle;
+ }
+
+ public List<LongRetrievableStateHandle> getStateHandles() {
+ return stateHandles;
+ }
+ }
+
+ /**
+ * Testing {@link RetrievableStateStorageHelper} implementation with
{@link Long}.
+ */
+ public static class LongRetrievableStateHandle implements
RetrievableStateHandle<Long> {
+
+ private static final long serialVersionUID =
-3555329254423838912L;
+
+ private static int numberOfGlobalDiscardCalls = 0;
+
+ private final Long state;
+
+ private int numberOfDiscardCalls = 0;
+
+ public LongRetrievableStateHandle(Long state) {
+ this.state = state;
+ }
+
+ @Override
+ public Long retrieveState() {
+ return state;
+ }
+
+ @Override
+ public void discardState() {
+ numberOfGlobalDiscardCalls++;
+ numberOfDiscardCalls++;
+ }
Review comment:
I would assume that `discardState` can be called on multiple
`LongRetrievableStateHandle` instances concurrently. The static field
`numberOfGlobalDiscardCalls` is a plain `int`, and `++` is not atomic, so
concurrent calls could lose updates. An `AtomicInteger` would avoid this.
##########
File path:
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java
##########
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.api.common.JobID;
+import
org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
+import org.apache.flink.runtime.statehandle.StateHandleStore;
+import
org.apache.flink.runtime.statehandle.TestingLongStateHandleHelper.LongRetrievableStateHandle;
+import
org.apache.flink.runtime.statehandle.TestingLongStateHandleHelper.LongStateStorage;
+import org.apache.flink.util.function.FunctionUtils;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.function.Predicate;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link KubernetesStateHandleStore} operations.
+ */
+public class KubernetesStateHandleStoreTest extends
KubernetesHighAvailabilityTestBase {
+
+ private static final String PREFIX = "test-prefix-";
+ private final String key = PREFIX + JobID.generate();
+ private final Predicate<String> filter = k -> k.startsWith(PREFIX);
+ private final Long state = 12345L;
+
+ private LongStateStorage longStateStorage;
+
+ @Before
+ public void setup() {
+ super.setup();
+ longStateStorage = new LongStateStorage();
+ }
+
+ @Test
+ public void testAdd() throws Exception {
+ new Context() {{
+ runTest(
+ () -> {
+ leaderCallbackGrantLeadership();
+
+ final KubernetesStateHandleStore<Long>
store = new KubernetesStateHandleStore<>(
+ flinkKubeClient,
LEADER_CONFIGMAP_NAME, longStateStorage, filter, LOCK_IDENTITY);
+ store.add(key, state);
+ assertThat(store.getAll().size(),
is(1));
+
assertThat(store.get(key).retrieveState(), is(state));
+ });
+ }};
+ }
+
+ @Test
+ public void testAddAlreadyExistingKey() throws Exception {
+ new Context() {{
+ runTest(
+ () -> {
+ leaderCallbackGrantLeadership();
+
+ final KubernetesStateHandleStore<Long>
store = new KubernetesStateHandleStore<>(
+ flinkKubeClient,
LEADER_CONFIGMAP_NAME, longStateStorage, filter, LOCK_IDENTITY);
+ store.add(key, state);
+
+ try {
+ store.add(key, state);
+ fail("Exception should be
thrown.");
+ } catch
(StateHandleStore.AlreadyExistException ex) {
+ final String msg =
String.format(
+ "%s already exists in
ConfigMap %s", key, LEADER_CONFIGMAP_NAME);
+ assertThat(ex.getMessage(),
containsString(msg));
Review comment:
We could use `FlinkMatchers.containsMessage` here instead of matching on `ex.getMessage()` manually.
##########
File path:
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreITCase.java
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.KubernetesResource;
+import
org.apache.flink.kubernetes.configuration.KubernetesLeaderElectionConfiguration;
+import org.apache.flink.kubernetes.kubeclient.DefaultKubeClientFactory;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.KubeClientFactory;
+import
org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
+import
org.apache.flink.kubernetes.kubeclient.resources.TestingLeaderCallbackHandler;
+import
org.apache.flink.runtime.statehandle.TestingLongStateHandleHelper.LongStateStorage;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import static
org.apache.flink.kubernetes.highavailability.KubernetesHighAvailabilityTestBase.LEADER_CONFIGMAP_NAME;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
+
+/**
+ * IT Tests for the {@link KubernetesStateHandleStore}. We expect only the
leader could update the state store.
+ * The standby JobManagers update operations should not be issued. This is a
"check-leadership-and-update" behavior
+ * test. It is a very basic requirement for {@link
org.apache.flink.runtime.jobmanager.JobGraphStore} and
+ * {@link org.apache.flink.runtime.checkpoint.CompletedCheckpointStore}
implementation for Kubernetes.
+ */
+public class KubernetesStateHandleStoreITCase {
+
+ @ClassRule
+ public static KubernetesResource kubernetesResource = new
KubernetesResource();
+
+ private final KubeClientFactory kubeClientFactory = new
DefaultKubeClientFactory();
+
+ private static final long TIMEOUT = 120L * 1000L;
+
+ private static final String KEY = "state-handle-test";
+
+ @Test
+ public void testMultipleKubernetesLeaderElectors() throws Exception {
+ final Configuration configuration =
kubernetesResource.getConfiguration();
+ final ExecutorService executorService =
kubernetesResource.getExecutorService();
Review comment:
I guess this is no longer accurate based on the latest FLINK-19542 PR.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -156,7 +158,7 @@ public ZooKeeperStateHandleStore(
return storeHandle;
}
catch (KeeperException.NodeExistsException e) {
- throw new ConcurrentModificationException("ZooKeeper
unexpectedly modified", e);
+ throw new
StateHandleStore.AlreadyExistException(e.getMessage());
Review comment:
Shall we set `e` as the cause of the thrown exception? Currently only
`e.getMessage()` is passed on, so the original stack trace is lost.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperJobGraphStore.java
##########
@@ -63,35 +45,16 @@
* <p>The root path is watched to detect concurrent modifications in corner
situations where
* multiple instances operate concurrently. The job manager acts as a {@link
JobGraphListener}
* to react to such situations.
+ *
+ * <p>NOTICE: The only reason we still have this class is that we need to
release the lock. If we completely get
+ * rid of current lock-and-lease to avoid concurrent modification, like
Kubernetes, then this class could be
+ * directly removed.
*/
-public class ZooKeeperJobGraphStore implements JobGraphStore {
-
- private static final Logger LOG =
LoggerFactory.getLogger(ZooKeeperJobGraphStore.class);
-
- /** Lock to synchronize with the {@link JobGraphListener}. */
- private final Object cacheLock = new Object();
-
- /** The set of IDs of all added job graphs. */
- private final Set<JobID> addedJobGraphs = new HashSet<>();
+public class ZooKeeperJobGraphStore extends DefaultJobGraphStore {
Review comment:
Alternatively, one could make the `release` call part of the
`StateHandleStore` interface and let the `DefaultJobGraphStore` call it. The
K8s implementation would then simply provide a no-op implementation of this
method. That might make the design a bit less complex, at the cost of watering
down the interface.
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesException;
+import
org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.runtime.statehandle.RetrievableStateStorageHelper;
+import org.apache.flink.runtime.statehandle.StateHandleStore;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Class which stores state via the provided {@link
RetrievableStateStorageHelper} and writes the
+ * returned state handle to ConfigMap.
+ *
+ * <p>Added state is persisted via {@link RetrievableStateHandle
RetrievableStateHandles},
+ * which in turn are written to ConfigMap. This level of indirection is
necessary to keep the
+ * amount of data in ConfigMap small. ConfigMap is build for data less than
1MB whereas
+ * state can grow to multiple MBs and GBs.
+ *
+ * <p>This is a very different implementation with {@link
org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore}.
+ * Benefit from the {@link FlinkKubeClient#checkAndUpdateConfigMap}
transactional operation, we could guarantee that
+ * only the leader could update the store. Then we will completely get rid of
the lock-and-release in Zookeeper
+ * implementation.
+ *
+ * @param <T> Type of state
+ */
+public class KubernetesStateHandleStore<T extends Serializable> implements
StateHandleStore<T> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(KubernetesStateHandleStore.class);
+
+ private final FlinkKubeClient kubeClient;
+
+ private final String configMapName;
+
+ private final RetrievableStateStorageHelper<T> storage;
+
+ private final Predicate<String> filter;
+
+ private final String lockIdentity;
+
+ private final Supplier<Exception> configMapNotExistSupplier;
+
+ /**
+ * Creates a {@link KubernetesStateHandleStore}.
+ *
+ * @param kubeClient The Kubernetes client.
+ * @param storage To persist the actual state and whose returned
state handle is then written to ConfigMap
+ * @param configMapName ConfigMap to store the state handle store
pointer
+ * @param filter filter to get the expected keys for state handle
+ * @param lockIdentity lock identity of current HA service
+ */
+ public KubernetesStateHandleStore(
+ FlinkKubeClient kubeClient,
+ String configMapName,
+ RetrievableStateStorageHelper<T> storage,
+ Predicate<String> filter,
+ String lockIdentity) {
+
+ this.kubeClient = checkNotNull(kubeClient, "Kubernetes client");
+ this.storage = checkNotNull(storage, "State storage");
+ this.configMapName = checkNotNull(configMapName, "ConfigMap
name");
+ this.filter = checkNotNull(filter);
+ this.lockIdentity = checkNotNull(lockIdentity, "Lock identity
of current HA service");
+
+ this.configMapNotExistSupplier = () -> new
KubernetesException("ConfigMap " + configMapName + " not exists.");
+ }
+
+ /**
+ * Creates a state handle, stores it in ConfigMap. We could guarantee
that only the leader could update the
+ * ConfigMap. Since “Get(check the leader)-and-Update(write back to the
ConfigMap)” is a
+ * transactional operation.
+ *
+ * @param key Key in ConfigMap
+ * @param state State to be added
+ *
+ * @throws AlreadyExistException if the name already exists
+ * @throws Exception if persisting state or writing state handle failed
+ */
+ @Override
+ public RetrievableStateHandle<T> add(String key, T state) throws
Exception {
+ checkNotNull(key, "Key in ConfigMap.");
+ checkNotNull(state, "State.");
+
+ final RetrievableStateHandle<T> storeHandle =
storage.store(state);
+
+ boolean success = false;
+
+ try {
+ final byte[] serializedStoreHandle =
InstantiationUtil.serializeObject(storeHandle);
+ success = kubeClient.checkAndUpdateConfigMap(
+ configMapName,
+ c -> {
+ if
(KubernetesLeaderElector.hasLeadership(c, lockIdentity)) {
+ if
(!c.getData().containsKey(key)) {
+ c.getData().put(key,
Base64.getEncoder().encodeToString(serializedStoreHandle));
+ return Optional.of(c);
+ } else {
+ throw new
CompletionException(new StateHandleStore.AlreadyExistException(
+ key + " already
exists in ConfigMap " + configMapName));
+ }
+ }
+ return Optional.empty();
+ }).get();
+ return storeHandle;
+ } catch (Exception ex) {
+
ExceptionUtils.rethrowException(ExceptionUtils.stripCompletionException(ex));
+ // Never reached
+ return null;
+ } finally {
+ if (!success) {
+ // Cleanup the state handle if it was not
written to ConfigMap.
+ if (storeHandle != null) {
+ storeHandle.discardState();
+ }
+ }
+ }
+ }
+
+ /**
+ * Replaces a state handle in ConfigMap and discards the old state
handle. Wo do not lock resource
+ * version and then replace in Kubernetes. Since the ConfigMap is
periodically updated by leader, the
+ * resource version changes very fast. We use a "check-existence and
update" transactional operation instead.
+ *
+ * @param key Key in ConfigMap
+ * @param resourceVersion resource version when checking existence via
{@link #exists}.
+ * @param state State to be added
+ *
+ * @throws NotExistException if the name does not exist
+ * @throws Exception if persisting state or writing state handle failed
+ */
+ @Override
+ public void replace(String key, String resourceVersion, T state) throws
Exception {
+ checkNotNull(key, "Key in ConfigMap.");
+ checkNotNull(state, "State.");
+
+ final RetrievableStateHandle<T> oldStateHandle = get(key);
+
+ final RetrievableStateHandle<T> newStateHandle =
storage.store(state);
+
+ boolean success = false;
+
+ try {
+ final byte[] serializedStoreHandle =
InstantiationUtil.serializeObject(newStateHandle);
+ success = kubeClient.checkAndUpdateConfigMap(
+ configMapName,
+ c -> {
+ if
(KubernetesLeaderElector.hasLeadership(c, lockIdentity)) {
+ // Check the existence
+ if
(c.getData().containsKey(key)) {
+ c.getData().put(key,
Base64.getEncoder().encodeToString(serializedStoreHandle));
+ } else {
+ throw new
CompletionException(new StateHandleStore.NotExistException(
+ "Could not find
" + key + " in ConfigMap " + configMapName));
+ }
+ return Optional.of(c);
+ }
+ return Optional.empty();
+ }).get();
+ } catch (Exception ex) {
+
ExceptionUtils.rethrowException(ExceptionUtils.stripCompletionException(ex));
+ } finally {
+ if (success) {
+ oldStateHandle.discardState();
+ } else {
+ newStateHandle.discardState();
+ }
+ }
+ }
+
+ /**
+ * Returns the resource version of the ConfigMap or {@link
StateHandleStore#NON_EXIST_RESOURCE_VERSION} if the key
+ * does not exist.
+ *
+ * @param key Key in ConfigMap
+ *
+ * @return resource version or {@link #NON_EXIST_RESOURCE_VERSION} when
not exists.
+ *
+ * @throws Exception if the check existence operation failed
+ */
+ @Override
+ public String exists(String key) throws Exception {
+ checkNotNull(key, "Key in ConfigMap.");
+
+ return kubeClient.getConfigMap(configMapName)
+ .map(configMap -> {
+ if (configMap.getData().containsKey(key)) {
+ return configMap.getResourceVersion();
+ }
+ return
StateHandleStore.NON_EXIST_RESOURCE_VERSION;
+ })
+ .orElseThrow(configMapNotExistSupplier);
+ }
+
+ /**
+ * Gets the {@link RetrievableStateHandle} stored in the given
ConfigMap.
+ *
+ * @param key Key in ConfigMap
+ *
+ * @return The retrieved state handle from the specified ConfigMap and
key
+ *
+ * @throws IOException if the method failed to deserialize the stored
state handle
+ * @throws NotExistException when the name does not exist
+ * @throws Exception if get state handle from ConfigMap failed
+ */
+ @Override
+ public RetrievableStateHandle<T> get(String key) throws Exception {
+ checkNotNull(key, "Key in ConfigMap.");
+
+ final Optional<KubernetesConfigMap> optional =
kubeClient.getConfigMap(configMapName);
+ if (optional.isPresent()) {
+ final KubernetesConfigMap configMap = optional.get();
+ if (configMap.getData().containsKey(key)) {
+ return
deserializeObject(configMap.getData().get(key));
+ } else {
+ throw new StateHandleStore.NotExistException(
+ "Could not find " + key + " in
ConfigMap " + configMapName);
+ }
+ } else {
+ throw configMapNotExistSupplier.get();
+ }
+ }
+
+ /**
+ * Gets all available state handles sorted by key from Kubernetes.
+ *
+ * @return All state handles from ConfigMap.
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ public List<Tuple2<RetrievableStateHandle<T>, String>> getAll() {
+
+ return kubeClient.getConfigMap(configMapName)
+ .map(
+ configMap -> {
+ final
List<Tuple2<RetrievableStateHandle<T>, String>> stateHandles = new
ArrayList<>();
+ configMap.getData().entrySet().stream()
+ .filter(entry ->
filter.test(entry.getKey()))
+
.sorted(Comparator.comparing(Map.Entry::getKey))
+ .forEach(entry -> {
+ try {
+
stateHandles.add(new Tuple2(deserializeObject(entry.getValue()),
entry.getKey()));
+ } catch (Exception e) {
+
LOG.warn("ConfigMap {} contained corrupted data. Ignoring the key {}.",
+
configMapName, entry.getKey());
+ }
+ });
+ return stateHandles;
+ })
+ .orElse(Collections.emptyList());
+ }
+
+ /**
+ * Return a list of all valid keys for state handles.
+ *
+ * @return List of valid state handle keys in Kubernetes ConfigMap
+ *
+ * @throws Exception if get state handle names from ConfigMap failed.
+ */
+ @Override
+ public Collection<String> getAllNames() throws Exception {
+
+ return kubeClient.getConfigMap(configMapName)
+ .map(configMap -> configMap.getData().keySet().stream()
+ .sorted(Comparator.comparing(e -> e))
+ .filter(filter)
+ .collect(Collectors.toList()))
+ .orElseThrow(configMapNotExistSupplier);
+ }
+
+ /**
+ * Remove the key in state config map. As well as the state on external
storage will be removed.
+ * It returns the {@link RetrievableStateHandle} stored under the given
state node if any.
+ *
+ * @param key Key to be removed from ConfigMap
+ *
+ * @return True if the state handle is removed successfully
+ *
+ * @throws Exception if removing the key or discarding the state failed
+ */
+ @Override
+ public boolean remove(String key) throws Exception {
+ checkNotNull(key, "Key in ConfigMap.");
+ final AtomicReference<RetrievableStateHandle<T>>
stateHandleRefer = new AtomicReference<>();
+
+ try {
+ return kubeClient.checkAndUpdateConfigMap(
+ configMapName,
+ configMap -> {
+ if
(KubernetesLeaderElector.hasLeadership(configMap, lockIdentity)) {
+ final String content =
configMap.getData().remove(key);
+ if (content != null) {
+ try {
+
stateHandleRefer.set(deserializeObject(content));
Review comment:
Just as food for thought: `kubeClient.checkAndUpdateConfigMap` does not
provide a rich enough interface to also return a value from the update. If it
were `CompletableFuture<UpdateResult> checkAndUpdateConfigMap()`, with
`UpdateResult` able to carry a payload like `RetrievableStateHandle<T>`, then
one would not need the `AtomicReference`. The `AtomicReference` is essentially
a workaround for an API that is not expressive enough. Of course, the
`function` argument of `checkAndUpdateConfigMap` would have to be extended as
well, so that it can return both the updated ConfigMap and the value that the
method itself should return.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -482,18 +492,18 @@ protected String getLockPath(String rootPath) {
try {
byte[] data = client.getData().forPath(path);
- try {
- RetrievableStateHandle<T>
retrievableStateHandle = InstantiationUtil.deserializeObject(
- data,
-
Thread.currentThread().getContextClassLoader());
+ RetrievableStateHandle<T> retrievableStateHandle =
InstantiationUtil.deserializeObject(
+ data,
+ Thread.currentThread().getContextClassLoader());
- success = true;
+ success = true;
- return retrievableStateHandle;
- } catch (IOException | ClassNotFoundException e) {
- throw new IOException("Failed to deserialize
state handle from ZooKeeper data from " +
- path + '.', e);
- }
+ return retrievableStateHandle;
+ } catch (KeeperException.NoNodeException ex) {
+ throw new
StateHandleStore.NotExistException(ex.getMessage());
Review comment:
The cause `ex` seems to be swallowed: only `ex.getMessage()` is passed to the `NotExistException`.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -194,11 +197,11 @@ public void replace(String pathInZooKeeper, int
expectedVersion, T state) throws
// Replace state handle in ZooKeeper.
client.setData()
- .withVersion(expectedVersion)
+
.withVersion(Integer.valueOf(expectedVersion))
.forPath(path, serializedStateHandle);
success = true;
} catch (KeeperException.NoNodeException e) {
- throw new ConcurrentModificationException("ZooKeeper
unexpectedly modified", e);
+ throw new
StateHandleStore.NotExistException(e.getMessage());
Review comment:
Same here: the cause `e` is dropped, and only `e.getMessage()` is kept.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -474,6 +482,8 @@ protected String getLockPath(String rootPath) {
client.create().withMode(CreateMode.EPHEMERAL).forPath(getLockPath(path));
} catch (KeeperException.NodeExistsException ignored) {
// we have already created the lock
+ } catch (KeeperException.NoNodeException ex) {
+ throw new
StateHandleStore.NotExistException(ex.getMessage());
Review comment:
The cause `ex` seems to be swallowed here as well.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/statehandle/StateHandleStore.java
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.statehandle;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Class which stores state via the provided {@link
RetrievableStateStorageHelper} and writes the
+ * returned state handle to distributed coordination system(e.g. Zookeeper,
Kubernetes, etc.).
+ *
+ * <p>To avoid concurrent modification issues, we need to ensure that only the
leader could update the state store.
+ * For Zookeeper, we need to lock a node(aka create an ephemeral node under
the specified node to be locked) and release
+ * (aka delete the ephemeral node).
+ * For Kubernetes, we could perform a {@link
FlinkKubeClient#checkAndUpdateConfigMap} transactional operation for this.
+ * Then we will completely get rid of the lock-and-release operations.
+ *
+ * <p>We do not define the interfaces with lock/release. Because they are not
common requirements for different
+ * implementations. Then the {@link
org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore}
+ * needs some additional interfaces to release the locks.
+ *
+ * @param <T> Type of state
+ */
+public interface StateHandleStore<T extends Serializable> {
+
+ String NON_EXIST_RESOURCE_VERSION = "-1";
+
+ /**
+ * Persist the state to distributed storage(e.g. S3, HDFS, etc.). And
then creates a state handle, stores it in
+ * the distributed coordination system(e.g. ZooKeeper, Kubernetes,
etc.).
+ *
+ * @param name Key name in ConfigMap or child path name in ZooKeeper
+ * @param state State to be added
+ *
+ * @throws AlreadyExistException if the name already exists
+ * @throws Exception if persisting state or writing state handle failed
+ */
+ RetrievableStateHandle<T> add(String name, T state) throws Exception;
+
+ /**
+ * Replaces a state handle in the distributed coordination system and
discards the old state handle.
+ *
+ * @param name Key name in ConfigMap or child path name in ZooKeeper
+ * @param resourceVersion resource version of previous storage object.
If the resource version does not match, the
+ * replace operation will fail. Since there is an unexpected update
operation snuck in.
+ * @param state State to be replace with
+ *
+ * @throws NotExistException if the name does not exist
+ * @throws Exception if persisting state or writing state handle failed
+ */
+ void replace(String name, String resourceVersion, T state) throws
Exception;
+
+ /**
+ * Returns resource version or {@link #NON_EXIST_RESOURCE_VERSION} if
the name does not exist.
+ *
+ * @param name Key name in ConfigMap or child path name in ZooKeeper
+ *
+ * @return current resource version in {@link String} if exist. Or
{@link #NON_EXIST_RESOURCE_VERSION} if the name
+ * does not exist.
+ *
+ * @throws Exception if the check existence operation failed
+ */
+ String exists(String name) throws Exception;
Review comment:
Sounds like a good idea :-)
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/statehandle/StateHandleStore.java
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.statehandle;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Class which stores state via the provided {@link
RetrievableStateStorageHelper} and writes the
+ * returned state handle to distributed coordination system(e.g. Zookeeper,
Kubernetes, etc.).
+ *
+ * <p>To avoid concurrent modification issues, we need to ensure that only the
leader could update the state store.
+ * For Zookeeper, we need to lock a node(aka create an ephemeral node under
the specified node to be locked) and release
+ * (aka delete the ephemeral node).
+ * For Kubernetes, we could perform a {@link
FlinkKubeClient#checkAndUpdateConfigMap} transactional operation for this.
+ * Then we will completely get rid of the lock-and-release operations.
+ *
+ * <p>We do not define the interfaces with lock/release. Because they are not
common requirements for different
+ * implementations. Then the {@link
org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore}
+ * needs some additional interfaces to release the locks.
Review comment:
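I agree with Xintong. The JavaDoc here should explain the contract of this
interface, i.e. what callers can expect from each method, rather than how the
ZooKeeper and Kubernetes implementations achieve it.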
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesException;
+import
org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.runtime.statehandle.RetrievableStateStorageHelper;
+import org.apache.flink.runtime.statehandle.StateHandleStore;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Class which stores state via the provided {@link RetrievableStateStorageHelper} and writes the
+ * returned state handle to a ConfigMap.
+ *
+ * <p>Added state is persisted via {@link RetrievableStateHandle
RetrievableStateHandles},
+ * which in turn are written to the ConfigMap. This level of indirection is necessary to keep the
+ * amount of data in the ConfigMap small. A ConfigMap is built for data of less than 1MB, whereas
+ * state can grow to multiple MBs and GBs.
+ *
+ * <p>This implementation differs considerably from
+ * {@link org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore}.
+ * Thanks to the transactional {@link FlinkKubeClient#checkAndUpdateConfigMap} operation, it can guarantee that
+ * only the leader updates the store. This removes the need for the lock-and-release mechanism used by the ZooKeeper
+ * implementation.
+ *
+ * @param <T> Type of state
+ */
+public class KubernetesStateHandleStore<T extends Serializable> implements
StateHandleStore<T> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(KubernetesStateHandleStore.class);
+
+ private final FlinkKubeClient kubeClient;
+
+ private final String configMapName;
+
+ private final RetrievableStateStorageHelper<T> storage;
+
+ private final Predicate<String> filter;
+
+ private final String lockIdentity;
+
+ private final Supplier<Exception> configMapNotExistSupplier;
Review comment:
Maybe it would be clearer to have a method that produces the "ConfigMap does not
exist" exception. That way, it could easily be called whenever the exception needs to be generated.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]