XComp commented on a change in pull request #18901:
URL: https://github.com/apache/flink/pull/18901#discussion_r814977189



##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -69,13 +75,82 @@
  * the leader could update the store. Then we will completely get rid of the 
lock-and-release in
  * Zookeeper implementation.
  *
- * @param <T> Type of state
+ * @param <T> Type of the state we're storing.
  */
 public class KubernetesStateHandleStore<T extends Serializable>
         implements StateHandleStore<T, StringResourceVersion> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(KubernetesStateHandleStore.class);
 
+    private static <T extends Serializable> StateHandleWithDeleteMarker<T> 
deserializeStateHandle(
+            String content) throws IOException {
+        checkNotNull(content, "Content should not be null.");
+        final byte[] data = Base64.getDecoder().decode(content);
+        try {
+            return deserialize(data);
+        } catch (IOException | ClassNotFoundException e) {
+            throw new IOException(
+                    String.format(
+                            "Failed to deserialize state handle from ConfigMap 
data %s.", content),
+                    e);
+        }
+    }
+
+    private static String toBase64(byte[] bytes) {
+        return Base64.getEncoder().encodeToString(bytes);
+    }
+
+    @VisibleForTesting
+    static String serializeStateHandle(StateHandleWithDeleteMarker<?> 
stateHandle)

Review comment:
       ```suggestion
       static String serializeStateHandle(StateObject<?> stateHandle)
   ```
   I'm wondering whether we should move this into `StateHandleStoreUtils` 
alongside `serializeOrDiscard`

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -509,17 +619,88 @@ public String toString() {
         return this.getClass().getSimpleName() + "{configMapName='" + 
configMapName + "'}";
     }
 
-    private RetrievableStateHandle<T> deserializeObject(String content) throws 
IOException {
-        checkNotNull(content, "Content should not be null.");
+    private boolean isValidOperation(KubernetesConfigMap c) {
+        return lockIdentity == null || 
KubernetesLeaderElector.hasLeadership(c, lockIdentity);
+    }
 
-        final byte[] data = Base64.getDecoder().decode(content);
+    @VisibleForTesting
+    CompletableFuture<Boolean> updateConfigMap(
+            Function<KubernetesConfigMap, Optional<KubernetesConfigMap>> 
updateFn) {
+        return kubeClient.checkAndUpdateConfigMap(
+                configMapName,
+                configMap -> {
+                    if (isValidOperation(configMap)) {
+                        return updateFn.apply(configMap);
+                    }
+                    return Optional.empty();
+                });
+    }
 
-        try {
-            return deserialize(data);
-        } catch (IOException | ClassNotFoundException e) {
-            throw new IOException(
-                    "Failed to deserialize state handle from ConfigMap data " 
+ content + '.', e);
+    /**
+     * Adds entry into the ConfigMap. If the entry already exists and contains 
delete marker, the
+     * try finish the removal before the actual update.
+     */
+    private Optional<KubernetesConfigMap> addEntry(

Review comment:
       nit: I found it a bit annoying to scroll all the way down to find the 
private method. My understanding was that we usually put a private method right 
below the method where it is used (if possible) to allow for a faster look-up.

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java
##########
@@ -725,4 +856,176 @@ public void testRemoveAllHandles() throws Exception {
             }
         };
     }
+
+    @Test
+    public void testReleaseAndTryRemoveIsIdempotent() throws Exception {
+        new Context() {
+            {
+                runTest(
+                        () -> {
+                            leaderCallbackGrantLeadership();
+
+                            final KubernetesStateHandleStore<
+                                            
TestingLongStateHandleHelper.LongStateHandle>
+                                    store =
+                                            new KubernetesStateHandleStore<>(
+                                                    flinkKubeClient,
+                                                    LEADER_CONFIGMAP_NAME,
+                                                    longStateStorage,
+                                                    filter,
+                                                    LOCK_IDENTITY);
+
+                            final RuntimeException discardException =
+                                    new RuntimeException("Test exception.");
+                            store.addAndLock(
+                                    key,
+                                    new 
TestingLongStateHandleHelper.LongStateHandle(2L) {
+
+                                        final AtomicBoolean thrown = new 
AtomicBoolean(false);
+
+                                        @Override
+                                        public void discardState() {
+                                            if (!thrown.getAndSet(true)) {
+                                                throw discardException;
+                                            }
+                                            super.discardState();
+                                        }
+                                    });
+
+                            assertThat(store.getAllAndLock().size(), is(1));
+                            assertThat(
+                                    store.getAndLock(key),
+                                    
is(notNullValue(RetrievableStateHandle.class)));
+
+                            // First remove attempt should fail when we're 
discarding the underlying
+                            // state.
+                            final Exception exception =
+                                    assertThrows(
+                                            Exception.class, () -> 
store.releaseAndTryRemove(key));
+                            assertThat(exception, 
FlinkMatchers.containsCause(discardException));
+
+                            // Now we should see that the node is 
"soft-deleted". This means it can
+                            // no longer be accessed by the get methods, but 
the underlying state
+                            // still exists.
+                            assertThat(store.getAllAndLock().size(), is(0));
+                            assertThrows(Exception.class, () -> 
store.getAndLock(key));
+                            
assertThat(TestingLongStateHandleHelper.getGlobalDiscardCount(), is(0));
+                            
assertThat(getLeaderConfigMap().getData().containsKey(key), is(true));
+
+                            // Second retry should succeed and remove the 
underlying state and the
+                            // reference in config map.
+                            assertThat(store.releaseAndTryRemove(key), 
is(true));
+                            assertThat(store.getAllAndLock().size(), is(0));
+                            assertThrows(Exception.class, () -> 
store.getAndLock(key));
+                            
assertThat(TestingLongStateHandleHelper.getGlobalDiscardCount(), is(1));
+                            
assertThat(getLeaderConfigMap().getData().containsKey(key), is(false));
+                        });
+            }
+        };
+    }
+
+    @Test
+    public void testReleaseAndTryRemoveAllIsIdempotent() throws Exception {
+        new Context() {
+            {
+                runTest(
+                        () -> {
+                            leaderCallbackGrantLeadership();
+
+                            final KubernetesStateHandleStore<
+                                            
TestingLongStateHandleHelper.LongStateHandle>
+                                    store =
+                                            new KubernetesStateHandleStore<>(
+                                                    flinkKubeClient,
+                                                    LEADER_CONFIGMAP_NAME,
+                                                    longStateStorage,
+                                                    filter,
+                                                    LOCK_IDENTITY);
+                            final int numKeys = 10;
+
+                            final RuntimeException discardException =
+                                    new RuntimeException("Test exception.");
+                            for (int idx = 0; idx < numKeys; idx++) {
+                                store.addAndLock(
+                                        key + "_" + idx,
+                                        new 
TestingLongStateHandleHelper.LongStateHandle(idx + 1) {
+
+                                            final AtomicBoolean thrown = new 
AtomicBoolean(false);
+
+                                            @Override
+                                            public void discardState() {
+                                                if (!thrown.getAndSet(true)) {
+                                                    throw discardException;
+                                                }
+                                                super.discardState();
+                                            }
+                                        });
+                            }
+
+                            // All keys should be retrievable
+                            assertThat(store.getAllAndLock().size(), 
is(numKeys));
+                            for (int idx = 0; idx < numKeys; idx++) {
+                                assertThat(
+                                        store.getAndLock(key + "_" + idx),
+                                        
is(notNullValue(RetrievableStateHandle.class)));
+                            }
+
+                            // First remove attempt should fail when we're 
discarding the underlying
+                            // state.
+                            final Exception exception =
+                                    assertThrows(Exception.class, 
store::releaseAndTryRemoveAll);
+                            assertThat(exception, 
FlinkMatchers.containsCause(discardException));
+
+                            // Now we should see that the all nodes are 
"soft-deleted". This means
+                            // it can no longer be accessed by the get 
methods, but the underlying
+                            // state still exists.
+                            assertThat(store.getAllAndLock().size(), is(0));
+                            for (int idx = 0; idx < numKeys; idx++) {
+                                final String indexKey = key + "_" + idx;
+                                assertThrows(Exception.class, () -> 
store.getAndLock(indexKey));
+                                assertThat(
+                                        
getLeaderConfigMap().getData().containsKey(indexKey),
+                                        is(true));
+                            }
+                            
assertThat(TestingLongStateHandleHelper.getGlobalDiscardCount(), is(0));
+
+                            // Second retry should succeed and remove the 
underlying state and the
+                            // reference in config map.
+                            store.releaseAndTryRemoveAll();
+                            assertThat(store.getAllAndLock().size(), is(0));
+                            for (int idx = 0; idx < numKeys; idx++) {
+                                final String indexKey = key + "_" + idx;
+                                assertThrows(Exception.class, () -> 
store.getAndLock(indexKey));
+                                assertThat(
+                                        
getLeaderConfigMap().getData().containsKey(indexKey),
+                                        is(false));
+                            }
+                            assertThat(
+                                    
TestingLongStateHandleHelper.getGlobalDiscardCount(), is(10));

Review comment:
       ```suggestion
                                       
TestingLongStateHandleHelper.getGlobalDiscardCount(), is(numKeys));
   ```

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java
##########
@@ -20,35 +20,43 @@
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.testutils.FlinkMatchers;
+import 
org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.StateHandleWithDeleteMarker;
 import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
 import 
org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
 import org.apache.flink.runtime.persistence.PossibleInconsistentStateException;
 import org.apache.flink.runtime.persistence.StateHandleStore;
 import org.apache.flink.runtime.persistence.StringResourceVersion;
 import org.apache.flink.runtime.persistence.TestingLongStateHandleHelper;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.function.FunctionUtils;
 
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.fail;
 
 /** Tests for {@link KubernetesStateHandleStore} operations. */
 public class KubernetesStateHandleStoreTest extends 
KubernetesHighAvailabilityTestBase {
 
     private static final String PREFIX = "test-prefix-";
+
     private final String key = PREFIX + JobID.generate();
     private final Predicate<String> filter = k -> k.startsWith(PREFIX);
     private final TestingLongStateHandleHelper.LongStateHandle state =

Review comment:
       tbh, it's a bit weird that we store this state as a member without 
reinitializing it. It should be initialized in the setup method. Otherwise, the 
discard counter increases constantly 🤔 

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -509,17 +619,88 @@ public String toString() {
         return this.getClass().getSimpleName() + "{configMapName='" + 
configMapName + "'}";
     }
 
-    private RetrievableStateHandle<T> deserializeObject(String content) throws 
IOException {
-        checkNotNull(content, "Content should not be null.");
+    private boolean isValidOperation(KubernetesConfigMap c) {
+        return lockIdentity == null || 
KubernetesLeaderElector.hasLeadership(c, lockIdentity);
+    }
 
-        final byte[] data = Base64.getDecoder().decode(content);
+    @VisibleForTesting
+    CompletableFuture<Boolean> updateConfigMap(
+            Function<KubernetesConfigMap, Optional<KubernetesConfigMap>> 
updateFn) {
+        return kubeClient.checkAndUpdateConfigMap(
+                configMapName,
+                configMap -> {
+                    if (isValidOperation(configMap)) {
+                        return updateFn.apply(configMap);
+                    }
+                    return Optional.empty();
+                });
+    }
 
-        try {
-            return deserialize(data);
-        } catch (IOException | ClassNotFoundException e) {
-            throw new IOException(
-                    "Failed to deserialize state handle from ConfigMap data " 
+ content + '.', e);
+    /**
+     * Adds entry into the ConfigMap. If the entry already exists and contains 
delete marker, the
+     * try finish the removal before the actual update.
+     */
+    private Optional<KubernetesConfigMap> addEntry(
+            KubernetesConfigMap configMap, String key, byte[] 
serializedStateHandle)
+            throws Exception {
+        final String content = configMap.getData().get(key);
+        if (content != null) {
+            try {
+                final StateHandleWithDeleteMarker<T> stateHandle = 
deserializeStateHandle(content);
+                if (stateHandle.isMarkedForDeletion()) {
+                    // This might be a left-over after the fail-over. As the 
remove operation is
+                    // idempotent let's try to finish it.
+                    if (!releaseAndTryRemove(key)) {
+                        throw new IllegalStateException(
+                                "Unable to remove the marked as deleting 
entry.");
+                    }
+                } else {
+                    throw getKeyAlreadyExistException(key);
+                }
+            } catch (IOException e) {
+                // Just log the invalid entry, it will be overridden
+                // by the update code path below.
+                logInvalidEntry(key, configMapName, e);
+            }
         }
+        configMap.getData().put(key, toBase64(serializedStateHandle));
+        return Optional.of(configMap);
+    }
+
+    /**
+     * Replace the entry in the ConfigMap. If the entry already exists and 
contains delete marker,
+     * we treat is as non-existent and perform the best effort removal.
+     */
+    private Optional<KubernetesConfigMap> replaceEntry(
+            KubernetesConfigMap configMap,
+            String key,
+            byte[] serializedStateHandle,
+            AtomicReference<RetrievableStateHandle<T>> oldStateHandleRef)
+            throws NotExistException {
+        final String content = configMap.getData().get(key);
+        if (content != null) {
+            try {
+                final StateHandleWithDeleteMarker<T> stateHandle = 
deserializeStateHandle(content);
+                oldStateHandleRef.set(stateHandle.getInner());
+                if (stateHandle.isMarkedForDeletion()) {
+                    final NotExistException exception = 
getKeyNotExistException(key);
+                    try {
+                        // Try to finish the removal. We don't really care 
whether this succeeds or
+                        // not, from the "replace" point of view, the entry 
doesn't exist.
+                        releaseAndTryRemove(key);
+                    } catch (Exception e) {
+                        exception.addSuppressed(e);
+                    }
+                    throw exception;
+                }
+            } catch (IOException e) {
+                // Just log the invalid entry, it will be logged by the update 
code path below.

Review comment:
       ```suggestion
                   // Just log the invalid entry, it will be removed by the 
update code path below.
   ```

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -509,17 +619,88 @@ public String toString() {
         return this.getClass().getSimpleName() + "{configMapName='" + 
configMapName + "'}";
     }
 
-    private RetrievableStateHandle<T> deserializeObject(String content) throws 
IOException {
-        checkNotNull(content, "Content should not be null.");
+    private boolean isValidOperation(KubernetesConfigMap c) {
+        return lockIdentity == null || 
KubernetesLeaderElector.hasLeadership(c, lockIdentity);
+    }
 
-        final byte[] data = Base64.getDecoder().decode(content);
+    @VisibleForTesting
+    CompletableFuture<Boolean> updateConfigMap(
+            Function<KubernetesConfigMap, Optional<KubernetesConfigMap>> 
updateFn) {
+        return kubeClient.checkAndUpdateConfigMap(
+                configMapName,
+                configMap -> {
+                    if (isValidOperation(configMap)) {
+                        return updateFn.apply(configMap);
+                    }
+                    return Optional.empty();
+                });
+    }
 
-        try {
-            return deserialize(data);
-        } catch (IOException | ClassNotFoundException e) {
-            throw new IOException(
-                    "Failed to deserialize state handle from ConfigMap data " 
+ content + '.', e);
+    /**
+     * Adds entry into the ConfigMap. If the entry already exists and contains 
delete marker, the
+     * try finish the removal before the actual update.
+     */
+    private Optional<KubernetesConfigMap> addEntry(
+            KubernetesConfigMap configMap, String key, byte[] 
serializedStateHandle)
+            throws Exception {
+        final String content = configMap.getData().get(key);
+        if (content != null) {
+            try {
+                final StateHandleWithDeleteMarker<T> stateHandle = 
deserializeStateHandle(content);
+                if (stateHandle.isMarkedForDeletion()) {
+                    // This might be a left-over after the fail-over. As the 
remove operation is
+                    // idempotent let's try to finish it.
+                    if (!releaseAndTryRemove(key)) {
+                        throw new IllegalStateException(
+                                "Unable to remove the marked as deleting 
entry.");
+                    }
+                } else {
+                    throw getKeyAlreadyExistException(key);
+                }
+            } catch (IOException e) {
+                // Just log the invalid entry, it will be overridden
+                // by the update code path below.
+                logInvalidEntry(key, configMapName, e);
+            }
         }
+        configMap.getData().put(key, toBase64(serializedStateHandle));
+        return Optional.of(configMap);
+    }
+
+    /**
+     * Replace the entry in the ConfigMap. If the entry already exists and 
contains delete marker,
+     * we treat is as non-existent and perform the best effort removal.

Review comment:
       ```suggestion
        * we treat it as non-existent and perform the best effort removal.
   ```
   nit

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java
##########
@@ -725,4 +856,176 @@ public void testRemoveAllHandles() throws Exception {
             }
         };
     }
+
+    @Test
+    public void testReleaseAndTryRemoveIsIdempotent() throws Exception {
+        new Context() {
+            {
+                runTest(
+                        () -> {
+                            leaderCallbackGrantLeadership();
+
+                            final KubernetesStateHandleStore<
+                                            
TestingLongStateHandleHelper.LongStateHandle>
+                                    store =
+                                            new KubernetesStateHandleStore<>(
+                                                    flinkKubeClient,
+                                                    LEADER_CONFIGMAP_NAME,
+                                                    longStateStorage,
+                                                    filter,
+                                                    LOCK_IDENTITY);
+
+                            final RuntimeException discardException =
+                                    new RuntimeException("Test exception.");
+                            store.addAndLock(
+                                    key,
+                                    new 
TestingLongStateHandleHelper.LongStateHandle(2L) {

Review comment:
       I guess this should make the `StateHandleStoreUtilsTest` fail because of 
the missing `serialVersionUID`? 🤔 

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java
##########
@@ -118,16 +126,68 @@ public void testAddAlreadyExistingKey() throws Exception {
                                                 key, LEADER_CONFIGMAP_NAME);
                                 assertThat(ex, 
FlinkMatchers.containsMessage(msg));
                             }
-                            
assertThat(TestingLongStateHandleHelper.getGlobalStorageSize(), is(1));
+
+                            // Both initial & new handles should be in the 
storage (we never clean
+                            // it for testing).
+                            
assertThat(TestingLongStateHandleHelper.getGlobalStorageSize(), is(2));
+                            // Only the new one (second entry in the store) 
should have been
+                            // discarded.
                             assertThat(
                                     TestingLongStateHandleHelper
                                             
.getDiscardCallCountForStateHandleByIndex(0),
+                                    is(0));
+                            assertThat(
+                                    TestingLongStateHandleHelper
+                                            
.getDiscardCallCountForStateHandleByIndex(1),
                                     is(1));
                         });
             }
         };
     }
 
+    @Test
+    public void testAddAndLockWithDeletingEntry() throws Exception {
+        new Context() {
+            {
+                runTest(
+                        () -> {
+                            leaderCallbackGrantLeadership();
+
+                            addDeletingEntry(getLeaderConfigMap(), key, 1337L);

Review comment:
       By returning the added StateHandle here, the test could access its 
discard state directly through the `isDiscarded` method.

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java
##########
@@ -725,4 +856,176 @@ public void testRemoveAllHandles() throws Exception {
             }
         };
     }
+
+    @Test
+    public void testReleaseAndTryRemoveIsIdempotent() throws Exception {
+        new Context() {
+            {
+                runTest(
+                        () -> {
+                            leaderCallbackGrantLeadership();
+
+                            final KubernetesStateHandleStore<
+                                            
TestingLongStateHandleHelper.LongStateHandle>
+                                    store =
+                                            new KubernetesStateHandleStore<>(
+                                                    flinkKubeClient,
+                                                    LEADER_CONFIGMAP_NAME,
+                                                    longStateStorage,
+                                                    filter,
+                                                    LOCK_IDENTITY);
+
+                            final RuntimeException discardException =
+                                    new RuntimeException("Test exception.");
+                            store.addAndLock(
+                                    key,
+                                    new 
TestingLongStateHandleHelper.LongStateHandle(2L) {
+
+                                        final AtomicBoolean thrown = new 
AtomicBoolean(false);
+
+                                        @Override
+                                        public void discardState() {
+                                            if (!thrown.getAndSet(true)) {
+                                                throw discardException;
+                                            }
+                                            super.discardState();
+                                        }
+                                    });
+
+                            assertThat(store.getAllAndLock().size(), is(1));
+                            assertThat(
+                                    store.getAndLock(key),
+                                    
is(notNullValue(RetrievableStateHandle.class)));
+
+                            // First remove attempt should fail when we're 
discarding the underlying
+                            // state.
+                            final Exception exception =
+                                    assertThrows(
+                                            Exception.class, () -> 
store.releaseAndTryRemove(key));
+                            assertThat(exception, 
FlinkMatchers.containsCause(discardException));
+
+                            // Now we should see that the node is 
"soft-deleted". This means it can
+                            // no longer be accessed by the get methods, but 
the underlying state
+                            // still exists.
+                            assertThat(store.getAllAndLock().size(), is(0));
+                            assertThrows(Exception.class, () -> 
store.getAndLock(key));
+                            
assertThat(TestingLongStateHandleHelper.getGlobalDiscardCount(), is(0));
+                            
assertThat(getLeaderConfigMap().getData().containsKey(key), is(true));
+
+                            // Second retry should succeed and remove the 
underlying state and the
+                            // reference in config map.
+                            assertThat(store.releaseAndTryRemove(key), 
is(true));
+                            assertThat(store.getAllAndLock().size(), is(0));
+                            assertThrows(Exception.class, () -> 
store.getAndLock(key));
+                            
assertThat(TestingLongStateHandleHelper.getGlobalDiscardCount(), is(1));
+                            
assertThat(getLeaderConfigMap().getData().containsKey(key), is(false));
+                        });
+            }
+        };
+    }
+
+    @Test
+    public void testReleaseAndTryRemoveAllIsIdempotent() throws Exception {
+        new Context() {
+            {
+                runTest(
+                        () -> {
+                            leaderCallbackGrantLeadership();
+
+                            final KubernetesStateHandleStore<
+                                            
TestingLongStateHandleHelper.LongStateHandle>
+                                    store =
+                                            new KubernetesStateHandleStore<>(
+                                                    flinkKubeClient,
+                                                    LEADER_CONFIGMAP_NAME,
+                                                    longStateStorage,
+                                                    filter,
+                                                    LOCK_IDENTITY);
+                            final int numKeys = 10;
+
+                            final RuntimeException discardException =
+                                    new RuntimeException("Test exception.");
+                            for (int idx = 0; idx < numKeys; idx++) {

Review comment:
       By adding a handle that doesn't fail during discard, we could verify the 
behavior for successfully discarded handles as well. This relates to the 
question I raised earlier: do we remove each entry individually, based on 
whether its discard call succeeded (how it's done in the ZK implementation), or 
do we delete none of them as soon as a single entry has an error during discard 
(how it's currently implemented in k8s)?

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java
##########
@@ -725,4 +856,176 @@ public void testRemoveAllHandles() throws Exception {
             }
         };
     }
+
+    @Test
+    public void testReleaseAndTryRemoveIsIdempotent() throws Exception {
+        new Context() {
+            {
+                runTest(
+                        () -> {
+                            leaderCallbackGrantLeadership();
+
+                            final KubernetesStateHandleStore<
+                                            
TestingLongStateHandleHelper.LongStateHandle>
+                                    store =
+                                            new KubernetesStateHandleStore<>(
+                                                    flinkKubeClient,
+                                                    LEADER_CONFIGMAP_NAME,
+                                                    longStateStorage,
+                                                    filter,
+                                                    LOCK_IDENTITY);
+
+                            final RuntimeException discardException =
+                                    new RuntimeException("Test exception.");
+                            store.addAndLock(
+                                    key,
+                                    new 
TestingLongStateHandleHelper.LongStateHandle(2L) {

Review comment:
       `LongStateHandle` provides a way to pass in a `PreDiscardCallback` now

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -469,7 +574,17 @@ public void releaseAndTryRemoveAll() throws Exception {
                                                     "Could not properly remove 
all state handles.",
                                                     exception));
                                 }
+                                // Commit the "removal transaction" by 
removing the entries from the
+                                // ConfigMap.
+                                return updateConfigMap(

Review comment:
       Here I notice that the ZK implementation behaves slightly differently: 
In the ZK implementation, we commit each entry individually (i.e. collecting 
the successful discards and removing them from the `ConfigMap` even if 
exceptions have been caught). I leave it up to you whether we want to align 
that...

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -509,17 +619,88 @@ public String toString() {
         return this.getClass().getSimpleName() + "{configMapName='" + 
configMapName + "'}";
     }
 
-    private RetrievableStateHandle<T> deserializeObject(String content) throws 
IOException {
-        checkNotNull(content, "Content should not be null.");
+    private boolean isValidOperation(KubernetesConfigMap c) {
+        return lockIdentity == null || 
KubernetesLeaderElector.hasLeadership(c, lockIdentity);
+    }
 
-        final byte[] data = Base64.getDecoder().decode(content);
+    @VisibleForTesting
+    CompletableFuture<Boolean> updateConfigMap(
+            Function<KubernetesConfigMap, Optional<KubernetesConfigMap>> 
updateFn) {
+        return kubeClient.checkAndUpdateConfigMap(
+                configMapName,
+                configMap -> {
+                    if (isValidOperation(configMap)) {
+                        return updateFn.apply(configMap);
+                    }
+                    return Optional.empty();
+                });
+    }
 
-        try {
-            return deserialize(data);
-        } catch (IOException | ClassNotFoundException e) {
-            throw new IOException(
-                    "Failed to deserialize state handle from ConfigMap data " 
+ content + '.', e);
+    /**
+     * Adds entry into the ConfigMap. If the entry already exists and contains 
delete marker, the
+     * try finish the removal before the actual update.
+     */
+    private Optional<KubernetesConfigMap> addEntry(
+            KubernetesConfigMap configMap, String key, byte[] 
serializedStateHandle)
+            throws Exception {
+        final String content = configMap.getData().get(key);
+        if (content != null) {
+            try {
+                final StateHandleWithDeleteMarker<T> stateHandle = 
deserializeStateHandle(content);
+                if (stateHandle.isMarkedForDeletion()) {
+                    // This might be a left-over after the fail-over. As the 
remove operation is
+                    // idempotent let's try to finish it.
+                    if (!releaseAndTryRemove(key)) {
+                        throw new IllegalStateException(
+                                "Unable to remove the marked as deleting 
entry.");
+                    }
+                } else {
+                    throw getKeyAlreadyExistException(key);
+                }
+            } catch (IOException e) {
+                // Just log the invalid entry, it will be overridden
+                // by the update code path below.
+                logInvalidEntry(key, configMapName, e);
+            }
         }
+        configMap.getData().put(key, toBase64(serializedStateHandle));
+        return Optional.of(configMap);
+    }
+
+    /**
+     * Replace the entry in the ConfigMap. If the entry already exists and 
contains delete marker,
+     * we treat is as non-existent and perform the best effort removal.
+     */
+    private Optional<KubernetesConfigMap> replaceEntry(
+            KubernetesConfigMap configMap,
+            String key,
+            byte[] serializedStateHandle,
+            AtomicReference<RetrievableStateHandle<T>> oldStateHandleRef)
+            throws NotExistException {
+        final String content = configMap.getData().get(key);
+        if (content != null) {
+            try {
+                final StateHandleWithDeleteMarker<T> stateHandle = 
deserializeStateHandle(content);
+                oldStateHandleRef.set(stateHandle.getInner());
+                if (stateHandle.isMarkedForDeletion()) {
+                    final NotExistException exception = 
getKeyNotExistException(key);
+                    try {
+                        // Try to finish the removal. We don't really care 
whether this succeeds or
+                        // not, from the "replace" point of view, the entry 
doesn't exist.
+                        releaseAndTryRemove(key);

Review comment:
       Do we actually test how well nested k8s calls work? Alternatively, we 
could create an internal remove method...

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -509,17 +619,88 @@ public String toString() {
         return this.getClass().getSimpleName() + "{configMapName='" + 
configMapName + "'}";
     }
 
-    private RetrievableStateHandle<T> deserializeObject(String content) throws 
IOException {
-        checkNotNull(content, "Content should not be null.");
+    private boolean isValidOperation(KubernetesConfigMap c) {
+        return lockIdentity == null || 
KubernetesLeaderElector.hasLeadership(c, lockIdentity);
+    }
 
-        final byte[] data = Base64.getDecoder().decode(content);
+    @VisibleForTesting
+    CompletableFuture<Boolean> updateConfigMap(
+            Function<KubernetesConfigMap, Optional<KubernetesConfigMap>> 
updateFn) {
+        return kubeClient.checkAndUpdateConfigMap(
+                configMapName,
+                configMap -> {
+                    if (isValidOperation(configMap)) {
+                        return updateFn.apply(configMap);
+                    }
+                    return Optional.empty();
+                });
+    }
 
-        try {
-            return deserialize(data);
-        } catch (IOException | ClassNotFoundException e) {
-            throw new IOException(
-                    "Failed to deserialize state handle from ConfigMap data " 
+ content + '.', e);
+    /**
+     * Adds entry into the ConfigMap. If the entry already exists and contains 
delete marker, the
+     * try finish the removal before the actual update.

Review comment:
       ```suggestion
        * Adds entry into the ConfigMap. If the entry already exists and 
contains delete marker, the {@code KubernetesStateHandleStore}
        * tries to finish the removal before the actual update.
   ```
   nit

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -423,40 +528,40 @@ public boolean releaseAndTryRemove(String key) throws 
Exception {
      */
     @Override
     public void releaseAndTryRemoveAll() throws Exception {

Review comment:
       I was initially hesitant to remove this method from the 
`StateHandleStore` interface entirely, but looking at the changes, it might 
have been the better approach, considering that it isn't used anywhere in the 
production code base 🤔 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to