XComp commented on a change in pull request #18901:
URL: https://github.com/apache/flink/pull/18901#discussion_r814977189
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -69,13 +75,82 @@
* the leader could update the store. Then we will completely get rid of the
lock-and-release in
* Zookeeper implementation.
*
- * @param <T> Type of state
+ * @param <T> Type of the state we're storing.
*/
public class KubernetesStateHandleStore<T extends Serializable>
implements StateHandleStore<T, StringResourceVersion> {
private static final Logger LOG =
LoggerFactory.getLogger(KubernetesStateHandleStore.class);
+ private static <T extends Serializable> StateHandleWithDeleteMarker<T>
deserializeStateHandle(
+ String content) throws IOException {
+ checkNotNull(content, "Content should not be null.");
+ final byte[] data = Base64.getDecoder().decode(content);
+ try {
+ return deserialize(data);
+ } catch (IOException | ClassNotFoundException e) {
+ throw new IOException(
+ String.format(
+ "Failed to deserialize state handle from ConfigMap
data %s.", content),
+ e);
+ }
+ }
+
+ private static String toBase64(byte[] bytes) {
+ return Base64.getEncoder().encodeToString(bytes);
+ }
+
+ @VisibleForTesting
+ static String serializeStateHandle(StateHandleWithDeleteMarker<?>
stateHandle)
Review comment:
```suggestion
static String serializeStateHandle(StateObject<?> stateHandle)
```
I'm wondering whether we should move this into `StateHandleStoreUtils` alongside
`serializeOrDiscard`.
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -509,17 +619,88 @@ public String toString() {
return this.getClass().getSimpleName() + "{configMapName='" +
configMapName + "'}";
}
- private RetrievableStateHandle<T> deserializeObject(String content) throws
IOException {
- checkNotNull(content, "Content should not be null.");
+ private boolean isValidOperation(KubernetesConfigMap c) {
+ return lockIdentity == null ||
KubernetesLeaderElector.hasLeadership(c, lockIdentity);
+ }
- final byte[] data = Base64.getDecoder().decode(content);
+ @VisibleForTesting
+ CompletableFuture<Boolean> updateConfigMap(
+ Function<KubernetesConfigMap, Optional<KubernetesConfigMap>>
updateFn) {
+ return kubeClient.checkAndUpdateConfigMap(
+ configMapName,
+ configMap -> {
+ if (isValidOperation(configMap)) {
+ return updateFn.apply(configMap);
+ }
+ return Optional.empty();
+ });
+ }
- try {
- return deserialize(data);
- } catch (IOException | ClassNotFoundException e) {
- throw new IOException(
- "Failed to deserialize state handle from ConfigMap data "
+ content + '.', e);
+ /**
+ * Adds entry into the ConfigMap. If the entry already exists and contains
delete marker, the
+ * try finish the removal before the actual update.
+ */
+ private Optional<KubernetesConfigMap> addEntry(
Review comment:
nit: I found it a bit annoying to scroll all the way down to find the
private method. My understanding was that we usually put private methods right
below the method where they are used (if possible) to allow a faster look-up.
##########
File path:
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java
##########
@@ -725,4 +856,176 @@ public void testRemoveAllHandles() throws Exception {
}
};
}
+
+ @Test
+ public void testReleaseAndTryRemoveIsIdempotent() throws Exception {
+ new Context() {
+ {
+ runTest(
+ () -> {
+ leaderCallbackGrantLeadership();
+
+ final KubernetesStateHandleStore<
+
TestingLongStateHandleHelper.LongStateHandle>
+ store =
+ new KubernetesStateHandleStore<>(
+ flinkKubeClient,
+ LEADER_CONFIGMAP_NAME,
+ longStateStorage,
+ filter,
+ LOCK_IDENTITY);
+
+ final RuntimeException discardException =
+ new RuntimeException("Test exception.");
+ store.addAndLock(
+ key,
+ new
TestingLongStateHandleHelper.LongStateHandle(2L) {
+
+ final AtomicBoolean thrown = new
AtomicBoolean(false);
+
+ @Override
+ public void discardState() {
+ if (!thrown.getAndSet(true)) {
+ throw discardException;
+ }
+ super.discardState();
+ }
+ });
+
+ assertThat(store.getAllAndLock().size(), is(1));
+ assertThat(
+ store.getAndLock(key),
+
is(notNullValue(RetrievableStateHandle.class)));
+
+ // First remove attempt should fail when we're
discarding the underlying
+ // state.
+ final Exception exception =
+ assertThrows(
+ Exception.class, () ->
store.releaseAndTryRemove(key));
+ assertThat(exception,
FlinkMatchers.containsCause(discardException));
+
+ // Now we should see that the node is
"soft-deleted". This means it can
+ // no longer be accessed by the get methods, but
the underlying state
+ // still exists.
+ assertThat(store.getAllAndLock().size(), is(0));
+ assertThrows(Exception.class, () ->
store.getAndLock(key));
+
assertThat(TestingLongStateHandleHelper.getGlobalDiscardCount(), is(0));
+
assertThat(getLeaderConfigMap().getData().containsKey(key), is(true));
+
+ // Second retry should succeed and remove the
underlying state and the
+ // reference in config map.
+ assertThat(store.releaseAndTryRemove(key),
is(true));
+ assertThat(store.getAllAndLock().size(), is(0));
+ assertThrows(Exception.class, () ->
store.getAndLock(key));
+
assertThat(TestingLongStateHandleHelper.getGlobalDiscardCount(), is(1));
+
assertThat(getLeaderConfigMap().getData().containsKey(key), is(false));
+ });
+ }
+ };
+ }
+
+ @Test
+ public void testReleaseAndTryRemoveAllIsIdempotent() throws Exception {
+ new Context() {
+ {
+ runTest(
+ () -> {
+ leaderCallbackGrantLeadership();
+
+ final KubernetesStateHandleStore<
+
TestingLongStateHandleHelper.LongStateHandle>
+ store =
+ new KubernetesStateHandleStore<>(
+ flinkKubeClient,
+ LEADER_CONFIGMAP_NAME,
+ longStateStorage,
+ filter,
+ LOCK_IDENTITY);
+ final int numKeys = 10;
+
+ final RuntimeException discardException =
+ new RuntimeException("Test exception.");
+ for (int idx = 0; idx < numKeys; idx++) {
+ store.addAndLock(
+ key + "_" + idx,
+ new
TestingLongStateHandleHelper.LongStateHandle(idx + 1) {
+
+ final AtomicBoolean thrown = new
AtomicBoolean(false);
+
+ @Override
+ public void discardState() {
+ if (!thrown.getAndSet(true)) {
+ throw discardException;
+ }
+ super.discardState();
+ }
+ });
+ }
+
+ // All keys should be retrievable
+ assertThat(store.getAllAndLock().size(),
is(numKeys));
+ for (int idx = 0; idx < numKeys; idx++) {
+ assertThat(
+ store.getAndLock(key + "_" + idx),
+
is(notNullValue(RetrievableStateHandle.class)));
+ }
+
+ // First remove attempt should fail when we're
discarding the underlying
+ // state.
+ final Exception exception =
+ assertThrows(Exception.class,
store::releaseAndTryRemoveAll);
+ assertThat(exception,
FlinkMatchers.containsCause(discardException));
+
+ // Now we should see that the all nodes are
"soft-deleted". This means
+ // it can no longer be accessed by the get
methods, but the underlying
+ // state still exists.
+ assertThat(store.getAllAndLock().size(), is(0));
+ for (int idx = 0; idx < numKeys; idx++) {
+ final String indexKey = key + "_" + idx;
+ assertThrows(Exception.class, () ->
store.getAndLock(indexKey));
+ assertThat(
+
getLeaderConfigMap().getData().containsKey(indexKey),
+ is(true));
+ }
+
assertThat(TestingLongStateHandleHelper.getGlobalDiscardCount(), is(0));
+
+ // Second retry should succeed and remove the
underlying state and the
+ // reference in config map.
+ store.releaseAndTryRemoveAll();
+ assertThat(store.getAllAndLock().size(), is(0));
+ for (int idx = 0; idx < numKeys; idx++) {
+ final String indexKey = key + "_" + idx;
+ assertThrows(Exception.class, () ->
store.getAndLock(indexKey));
+ assertThat(
+
getLeaderConfigMap().getData().containsKey(indexKey),
+ is(false));
+ }
+ assertThat(
+
TestingLongStateHandleHelper.getGlobalDiscardCount(), is(10));
Review comment:
```suggestion
TestingLongStateHandleHelper.getGlobalDiscardCount(), is(numKeys));
```
##########
File path:
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java
##########
@@ -20,35 +20,43 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.testutils.FlinkMatchers;
+import
org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.StateHandleWithDeleteMarker;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
import
org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
import org.apache.flink.runtime.persistence.PossibleInconsistentStateException;
import org.apache.flink.runtime.persistence.StateHandleStore;
import org.apache.flink.runtime.persistence.StringResourceVersion;
import org.apache.flink.runtime.persistence.TestingLongStateHandleHelper;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.FunctionUtils;
import org.junit.Before;
import org.junit.Test;
+import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;
/** Tests for {@link KubernetesStateHandleStore} operations. */
public class KubernetesStateHandleStoreTest extends
KubernetesHighAvailabilityTestBase {
private static final String PREFIX = "test-prefix-";
+
private final String key = PREFIX + JobID.generate();
private final Predicate<String> filter = k -> k.startsWith(PREFIX);
private final TestingLongStateHandleHelper.LongStateHandle state =
Review comment:
tbh, it's a bit weird that we store this state as a member without
reinitializing it. It should be initialized in the setup method. Otherwise, the
discard counter increases constantly 🤔
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -509,17 +619,88 @@ public String toString() {
return this.getClass().getSimpleName() + "{configMapName='" +
configMapName + "'}";
}
- private RetrievableStateHandle<T> deserializeObject(String content) throws
IOException {
- checkNotNull(content, "Content should not be null.");
+ private boolean isValidOperation(KubernetesConfigMap c) {
+ return lockIdentity == null ||
KubernetesLeaderElector.hasLeadership(c, lockIdentity);
+ }
- final byte[] data = Base64.getDecoder().decode(content);
+ @VisibleForTesting
+ CompletableFuture<Boolean> updateConfigMap(
+ Function<KubernetesConfigMap, Optional<KubernetesConfigMap>>
updateFn) {
+ return kubeClient.checkAndUpdateConfigMap(
+ configMapName,
+ configMap -> {
+ if (isValidOperation(configMap)) {
+ return updateFn.apply(configMap);
+ }
+ return Optional.empty();
+ });
+ }
- try {
- return deserialize(data);
- } catch (IOException | ClassNotFoundException e) {
- throw new IOException(
- "Failed to deserialize state handle from ConfigMap data "
+ content + '.', e);
+ /**
+ * Adds entry into the ConfigMap. If the entry already exists and contains
delete marker, the
+ * try finish the removal before the actual update.
+ */
+ private Optional<KubernetesConfigMap> addEntry(
+ KubernetesConfigMap configMap, String key, byte[]
serializedStateHandle)
+ throws Exception {
+ final String content = configMap.getData().get(key);
+ if (content != null) {
+ try {
+ final StateHandleWithDeleteMarker<T> stateHandle =
deserializeStateHandle(content);
+ if (stateHandle.isMarkedForDeletion()) {
+ // This might be a left-over after the fail-over. As the
remove operation is
+ // idempotent let's try to finish it.
+ if (!releaseAndTryRemove(key)) {
+ throw new IllegalStateException(
+ "Unable to remove the marked as deleting
entry.");
+ }
+ } else {
+ throw getKeyAlreadyExistException(key);
+ }
+ } catch (IOException e) {
+ // Just log the invalid entry, it will be overridden
+ // by the update code path below.
+ logInvalidEntry(key, configMapName, e);
+ }
}
+ configMap.getData().put(key, toBase64(serializedStateHandle));
+ return Optional.of(configMap);
+ }
+
+ /**
+ * Replace the entry in the ConfigMap. If the entry already exists and
contains delete marker,
+ * we treat is as non-existent and perform the best effort removal.
+ */
+ private Optional<KubernetesConfigMap> replaceEntry(
+ KubernetesConfigMap configMap,
+ String key,
+ byte[] serializedStateHandle,
+ AtomicReference<RetrievableStateHandle<T>> oldStateHandleRef)
+ throws NotExistException {
+ final String content = configMap.getData().get(key);
+ if (content != null) {
+ try {
+ final StateHandleWithDeleteMarker<T> stateHandle =
deserializeStateHandle(content);
+ oldStateHandleRef.set(stateHandle.getInner());
+ if (stateHandle.isMarkedForDeletion()) {
+ final NotExistException exception =
getKeyNotExistException(key);
+ try {
+ // Try to finish the removal. We don't really care
whether this succeeds or
+ // not, from the "replace" point of view, the entry
doesn't exist.
+ releaseAndTryRemove(key);
+ } catch (Exception e) {
+ exception.addSuppressed(e);
+ }
+ throw exception;
+ }
+ } catch (IOException e) {
+ // Just log the invalid entry, it will be logged by the update
code path below.
Review comment:
```suggestion
// Just log the invalid entry, it will be removed by the
update code path below.
```
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -509,17 +619,88 @@ public String toString() {
return this.getClass().getSimpleName() + "{configMapName='" +
configMapName + "'}";
}
- private RetrievableStateHandle<T> deserializeObject(String content) throws
IOException {
- checkNotNull(content, "Content should not be null.");
+ private boolean isValidOperation(KubernetesConfigMap c) {
+ return lockIdentity == null ||
KubernetesLeaderElector.hasLeadership(c, lockIdentity);
+ }
- final byte[] data = Base64.getDecoder().decode(content);
+ @VisibleForTesting
+ CompletableFuture<Boolean> updateConfigMap(
+ Function<KubernetesConfigMap, Optional<KubernetesConfigMap>>
updateFn) {
+ return kubeClient.checkAndUpdateConfigMap(
+ configMapName,
+ configMap -> {
+ if (isValidOperation(configMap)) {
+ return updateFn.apply(configMap);
+ }
+ return Optional.empty();
+ });
+ }
- try {
- return deserialize(data);
- } catch (IOException | ClassNotFoundException e) {
- throw new IOException(
- "Failed to deserialize state handle from ConfigMap data "
+ content + '.', e);
+ /**
+ * Adds entry into the ConfigMap. If the entry already exists and contains
delete marker, the
+ * try finish the removal before the actual update.
+ */
+ private Optional<KubernetesConfigMap> addEntry(
+ KubernetesConfigMap configMap, String key, byte[]
serializedStateHandle)
+ throws Exception {
+ final String content = configMap.getData().get(key);
+ if (content != null) {
+ try {
+ final StateHandleWithDeleteMarker<T> stateHandle =
deserializeStateHandle(content);
+ if (stateHandle.isMarkedForDeletion()) {
+ // This might be a left-over after the fail-over. As the
remove operation is
+ // idempotent let's try to finish it.
+ if (!releaseAndTryRemove(key)) {
+ throw new IllegalStateException(
+ "Unable to remove the marked as deleting
entry.");
+ }
+ } else {
+ throw getKeyAlreadyExistException(key);
+ }
+ } catch (IOException e) {
+ // Just log the invalid entry, it will be overridden
+ // by the update code path below.
+ logInvalidEntry(key, configMapName, e);
+ }
}
+ configMap.getData().put(key, toBase64(serializedStateHandle));
+ return Optional.of(configMap);
+ }
+
+ /**
+ * Replace the entry in the ConfigMap. If the entry already exists and
contains delete marker,
+ * we treat is as non-existent and perform the best effort removal.
Review comment:
```suggestion
* we treat it as non-existent and perform the best effort removal.
```
nit
##########
File path:
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java
##########
@@ -725,4 +856,176 @@ public void testRemoveAllHandles() throws Exception {
}
};
}
+
+ @Test
+ public void testReleaseAndTryRemoveIsIdempotent() throws Exception {
+ new Context() {
+ {
+ runTest(
+ () -> {
+ leaderCallbackGrantLeadership();
+
+ final KubernetesStateHandleStore<
+
TestingLongStateHandleHelper.LongStateHandle>
+ store =
+ new KubernetesStateHandleStore<>(
+ flinkKubeClient,
+ LEADER_CONFIGMAP_NAME,
+ longStateStorage,
+ filter,
+ LOCK_IDENTITY);
+
+ final RuntimeException discardException =
+ new RuntimeException("Test exception.");
+ store.addAndLock(
+ key,
+ new
TestingLongStateHandleHelper.LongStateHandle(2L) {
Review comment:
I guess this should make the `StateHandleStoreUtilsTest` fail because of
the missing `serialVersionUID`? 🤔
##########
File path:
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java
##########
@@ -118,16 +126,68 @@ public void testAddAlreadyExistingKey() throws Exception {
key, LEADER_CONFIGMAP_NAME);
assertThat(ex,
FlinkMatchers.containsMessage(msg));
}
-
assertThat(TestingLongStateHandleHelper.getGlobalStorageSize(), is(1));
+
+ // Both initial & new handles should be in the
storage (we never clean
+ // it for testing).
+
assertThat(TestingLongStateHandleHelper.getGlobalStorageSize(), is(2));
+ // Only the new one (second entry in the store)
should have been
+ // discarded.
assertThat(
TestingLongStateHandleHelper
.getDiscardCallCountForStateHandleByIndex(0),
+ is(0));
+ assertThat(
+ TestingLongStateHandleHelper
+
.getDiscardCallCountForStateHandleByIndex(1),
is(1));
});
}
};
}
+ @Test
+ public void testAddAndLockWithDeletingEntry() throws Exception {
+ new Context() {
+ {
+ runTest(
+ () -> {
+ leaderCallbackGrantLeadership();
+
+ addDeletingEntry(getLeaderConfigMap(), key, 1337L);
Review comment:
By returning the added StateHandle here, the test could access its
discard state directly through the `isDiscarded` method.
##########
File path:
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java
##########
@@ -725,4 +856,176 @@ public void testRemoveAllHandles() throws Exception {
}
};
}
+
+ @Test
+ public void testReleaseAndTryRemoveIsIdempotent() throws Exception {
+ new Context() {
+ {
+ runTest(
+ () -> {
+ leaderCallbackGrantLeadership();
+
+ final KubernetesStateHandleStore<
+
TestingLongStateHandleHelper.LongStateHandle>
+ store =
+ new KubernetesStateHandleStore<>(
+ flinkKubeClient,
+ LEADER_CONFIGMAP_NAME,
+ longStateStorage,
+ filter,
+ LOCK_IDENTITY);
+
+ final RuntimeException discardException =
+ new RuntimeException("Test exception.");
+ store.addAndLock(
+ key,
+ new
TestingLongStateHandleHelper.LongStateHandle(2L) {
+
+ final AtomicBoolean thrown = new
AtomicBoolean(false);
+
+ @Override
+ public void discardState() {
+ if (!thrown.getAndSet(true)) {
+ throw discardException;
+ }
+ super.discardState();
+ }
+ });
+
+ assertThat(store.getAllAndLock().size(), is(1));
+ assertThat(
+ store.getAndLock(key),
+
is(notNullValue(RetrievableStateHandle.class)));
+
+ // First remove attempt should fail when we're
discarding the underlying
+ // state.
+ final Exception exception =
+ assertThrows(
+ Exception.class, () ->
store.releaseAndTryRemove(key));
+ assertThat(exception,
FlinkMatchers.containsCause(discardException));
+
+ // Now we should see that the node is
"soft-deleted". This means it can
+ // no longer be accessed by the get methods, but
the underlying state
+ // still exists.
+ assertThat(store.getAllAndLock().size(), is(0));
+ assertThrows(Exception.class, () ->
store.getAndLock(key));
+
assertThat(TestingLongStateHandleHelper.getGlobalDiscardCount(), is(0));
+
assertThat(getLeaderConfigMap().getData().containsKey(key), is(true));
+
+ // Second retry should succeed and remove the
underlying state and the
+ // reference in config map.
+ assertThat(store.releaseAndTryRemove(key),
is(true));
+ assertThat(store.getAllAndLock().size(), is(0));
+ assertThrows(Exception.class, () ->
store.getAndLock(key));
+
assertThat(TestingLongStateHandleHelper.getGlobalDiscardCount(), is(1));
+
assertThat(getLeaderConfigMap().getData().containsKey(key), is(false));
+ });
+ }
+ };
+ }
+
+ @Test
+ public void testReleaseAndTryRemoveAllIsIdempotent() throws Exception {
+ new Context() {
+ {
+ runTest(
+ () -> {
+ leaderCallbackGrantLeadership();
+
+ final KubernetesStateHandleStore<
+
TestingLongStateHandleHelper.LongStateHandle>
+ store =
+ new KubernetesStateHandleStore<>(
+ flinkKubeClient,
+ LEADER_CONFIGMAP_NAME,
+ longStateStorage,
+ filter,
+ LOCK_IDENTITY);
+ final int numKeys = 10;
+
+ final RuntimeException discardException =
+ new RuntimeException("Test exception.");
+ for (int idx = 0; idx < numKeys; idx++) {
Review comment:
By adding a handle that doesn't fail during discard we could verify the
behavior for these as well. This goes in the direction of what I mentioned
earlier: whether we remove each entry individually based on its discard call
being successful or not (how it's done in the ZK implementation), or whether we
let none be deleted as soon as a single entry has an error during discard (how
it's currently implemented in k8s).
##########
File path:
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java
##########
@@ -725,4 +856,176 @@ public void testRemoveAllHandles() throws Exception {
}
};
}
+
+ @Test
+ public void testReleaseAndTryRemoveIsIdempotent() throws Exception {
+ new Context() {
+ {
+ runTest(
+ () -> {
+ leaderCallbackGrantLeadership();
+
+ final KubernetesStateHandleStore<
+
TestingLongStateHandleHelper.LongStateHandle>
+ store =
+ new KubernetesStateHandleStore<>(
+ flinkKubeClient,
+ LEADER_CONFIGMAP_NAME,
+ longStateStorage,
+ filter,
+ LOCK_IDENTITY);
+
+ final RuntimeException discardException =
+ new RuntimeException("Test exception.");
+ store.addAndLock(
+ key,
+ new
TestingLongStateHandleHelper.LongStateHandle(2L) {
Review comment:
`LongStateHandle` provides a way to pass in a `PreDiscardCallback` now
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -469,7 +574,17 @@ public void releaseAndTryRemoveAll() throws Exception {
"Could not properly remove
all state handles.",
exception));
}
+ // Commit the "removal transaction" by
removing the entries from the
+ // ConfigMap.
+ return updateConfigMap(
Review comment:
Here I notice that the ZK implementation behaves slightly differently:
In the ZK implementation, we commit each entry individually (i.e. collecting
the successful discards and removing them from the `ConfigMap` even if
exceptions have been caught). I leave it up to you whether we want to align
that...
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -509,17 +619,88 @@ public String toString() {
return this.getClass().getSimpleName() + "{configMapName='" +
configMapName + "'}";
}
- private RetrievableStateHandle<T> deserializeObject(String content) throws
IOException {
- checkNotNull(content, "Content should not be null.");
+ private boolean isValidOperation(KubernetesConfigMap c) {
+ return lockIdentity == null ||
KubernetesLeaderElector.hasLeadership(c, lockIdentity);
+ }
- final byte[] data = Base64.getDecoder().decode(content);
+ @VisibleForTesting
+ CompletableFuture<Boolean> updateConfigMap(
+ Function<KubernetesConfigMap, Optional<KubernetesConfigMap>>
updateFn) {
+ return kubeClient.checkAndUpdateConfigMap(
+ configMapName,
+ configMap -> {
+ if (isValidOperation(configMap)) {
+ return updateFn.apply(configMap);
+ }
+ return Optional.empty();
+ });
+ }
- try {
- return deserialize(data);
- } catch (IOException | ClassNotFoundException e) {
- throw new IOException(
- "Failed to deserialize state handle from ConfigMap data "
+ content + '.', e);
+ /**
+ * Adds entry into the ConfigMap. If the entry already exists and contains
delete marker, the
+ * try finish the removal before the actual update.
+ */
+ private Optional<KubernetesConfigMap> addEntry(
+ KubernetesConfigMap configMap, String key, byte[]
serializedStateHandle)
+ throws Exception {
+ final String content = configMap.getData().get(key);
+ if (content != null) {
+ try {
+ final StateHandleWithDeleteMarker<T> stateHandle =
deserializeStateHandle(content);
+ if (stateHandle.isMarkedForDeletion()) {
+ // This might be a left-over after the fail-over. As the
remove operation is
+ // idempotent let's try to finish it.
+ if (!releaseAndTryRemove(key)) {
+ throw new IllegalStateException(
+ "Unable to remove the marked as deleting
entry.");
+ }
+ } else {
+ throw getKeyAlreadyExistException(key);
+ }
+ } catch (IOException e) {
+ // Just log the invalid entry, it will be overridden
+ // by the update code path below.
+ logInvalidEntry(key, configMapName, e);
+ }
}
+ configMap.getData().put(key, toBase64(serializedStateHandle));
+ return Optional.of(configMap);
+ }
+
+ /**
+ * Replace the entry in the ConfigMap. If the entry already exists and
contains delete marker,
+ * we treat is as non-existent and perform the best effort removal.
+ */
+ private Optional<KubernetesConfigMap> replaceEntry(
+ KubernetesConfigMap configMap,
+ String key,
+ byte[] serializedStateHandle,
+ AtomicReference<RetrievableStateHandle<T>> oldStateHandleRef)
+ throws NotExistException {
+ final String content = configMap.getData().get(key);
+ if (content != null) {
+ try {
+ final StateHandleWithDeleteMarker<T> stateHandle =
deserializeStateHandle(content);
+ oldStateHandleRef.set(stateHandle.getInner());
+ if (stateHandle.isMarkedForDeletion()) {
+ final NotExistException exception =
getKeyNotExistException(key);
+ try {
+ // Try to finish the removal. We don't really care
whether this succeeds or
+ // not, from the "replace" point of view, the entry
doesn't exist.
+ releaseAndTryRemove(key);
Review comment:
Do we actually test how well nested k8s calls work? Alternatively, we
could create an internal remove method...
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -509,17 +619,88 @@ public String toString() {
return this.getClass().getSimpleName() + "{configMapName='" +
configMapName + "'}";
}
- private RetrievableStateHandle<T> deserializeObject(String content) throws
IOException {
- checkNotNull(content, "Content should not be null.");
+ private boolean isValidOperation(KubernetesConfigMap c) {
+ return lockIdentity == null ||
KubernetesLeaderElector.hasLeadership(c, lockIdentity);
+ }
- final byte[] data = Base64.getDecoder().decode(content);
+ @VisibleForTesting
+ CompletableFuture<Boolean> updateConfigMap(
+ Function<KubernetesConfigMap, Optional<KubernetesConfigMap>>
updateFn) {
+ return kubeClient.checkAndUpdateConfigMap(
+ configMapName,
+ configMap -> {
+ if (isValidOperation(configMap)) {
+ return updateFn.apply(configMap);
+ }
+ return Optional.empty();
+ });
+ }
- try {
- return deserialize(data);
- } catch (IOException | ClassNotFoundException e) {
- throw new IOException(
- "Failed to deserialize state handle from ConfigMap data "
+ content + '.', e);
+ /**
+ * Adds entry into the ConfigMap. If the entry already exists and contains
delete marker, the
+ * try finish the removal before the actual update.
Review comment:
```suggestion
* Adds entry into the ConfigMap. If the entry already exists and
contains delete marker, the {@code KubernetesStateHandleStore}
* tries to finish the removal before the actual update.
```
nit
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -423,40 +528,40 @@ public boolean releaseAndTryRemove(String key) throws
Exception {
*/
@Override
public void releaseAndTryRemoveAll() throws Exception {
Review comment:
I was initially hesitant to remove this method from the
`StateHandleStore` interface entirely, but looking at the changes, it might
have been the better approach considering that it's not used anywhere in the
production code base 🤔
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]