tillrohrmann commented on a change in pull request #15832:
URL: https://github.com/apache/flink/pull/15832#discussion_r628363445



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/persistence/PossibleInconsistentStateException.java
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.persistence;
+
+/**
+ * {@code PossibleInconsistentStateException} represents errors that might have led to an
+ * inconsistent state within the HA resources.
+ */
+public class PossibleInconsistentStateException extends Exception {

Review comment:
       Shall we let this exception extend from `FlinkException`?
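
A minimal sketch of the suggestion. `FlinkException` is declared here only as a stand-in so the snippet compiles without Flink on the classpath; it assumes the real `org.apache.flink.util.FlinkException` is a checked exception extending `Exception` with `(String)`, `(Throwable)` and `(String, Throwable)` constructors. The constructors chosen for `PossibleInconsistentStateException` are likewise illustrative.

```java
// Stand-in for org.apache.flink.util.FlinkException (assumed: a checked exception
// extending java.lang.Exception), declared here only so the sketch compiles alone.
class FlinkException extends Exception {
    private static final long serialVersionUID = 1L;

    FlinkException(String message) {
        super(message);
    }

    FlinkException(Throwable cause) {
        super(cause);
    }

    FlinkException(String message, Throwable cause) {
        super(message, cause);
    }
}

/**
 * {@code PossibleInconsistentStateException} represents errors that might have led to an
 * inconsistent state within the HA resources.
 */
class PossibleInconsistentStateException extends FlinkException {
    private static final long serialVersionUID = 1L;

    PossibleInconsistentStateException(Throwable cause) {
        super(cause);
    }

    PossibleInconsistentStateException(String message, Throwable cause) {
        super(message, cause);
    }
}

public class FlinkExceptionSketch {
    public static void main(String[] args) {
        Exception e = new PossibleInconsistentStateException(new RuntimeException("write timed out"));
        // Handlers written as `catch (FlinkException ...)` now also cover this type.
        System.out.println(e instanceof FlinkException); // true
    }
}
```

Because `FlinkException` itself extends `Exception`, a `throws PossibleInconsistentStateException, Exception` clause such as the one on `addAndLock` keeps compiling unchanged.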

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -114,21 +118,28 @@ public KubernetesStateHandleStore(
      * @param key Key in ConfigMap
      * @param state State to be added
      * @throws AlreadyExistException if the name already exists
+     * @throws PossibleInconsistentStateException if the write-to-Kubernetes operation failed. This
+     *     indicates that it's not clear whether the new state was successfully written to
+     *     Kubernetes or not. No state was discarded. Proper error handling has to be applied on the
+     *     caller's side.
      * @throws Exception if persisting state or writing state handle failed
      */
     @Override
-    public RetrievableStateHandle<T> addAndLock(String key, T state) throws Exception {
+    public RetrievableStateHandle<T> addAndLock(String key, T state)
+            throws PossibleInconsistentStateException, Exception {
         checkNotNull(key, "Key in ConfigMap.");
         checkNotNull(state, "State.");
 
         final RetrievableStateHandle<T> storeHandle = storage.store(state);
 
-        boolean success = false;
+        final byte[] serializedStoreHandle = serializeStateHandle(storeHandle);
 
+        // initialize flag to serve the failure case
+        boolean discardState = true;
         try {
-            final byte[] serializedStoreHandle = InstantiationUtil.serializeObject(storeHandle);
-            success =
-                    kubeClient
+            // a successful operation will result in the state not being discarded
+            discardState =
+                    !kubeClient

Review comment:
       I think this is a matter of personal taste: I find it easier to reason about this with `success` and without the negation here.
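
A sketch of how the flow could read with a positive `success` flag while still honoring the Javadoc above (no state is discarded when the outcome of the write is unclear). `Handle`, `Writer` and `OutcomeUnknownException` are hypothetical stand-ins for `RetrievableStateHandle`, the `kubeClient` call and `PossibleInconsistentStateException`, and the `IllegalStateException` stands in for `AlreadyExistException`. The unknown-outcome case gets its own flag because `success == false` alone cannot distinguish it from a definite failure.

```java
public class SuccessFlagSketch {

    /** Hypothetical stand-in for a stored state handle; records whether it was discarded. */
    static final class Handle {
        boolean discarded;

        void discardState() {
            discarded = true;
        }
    }

    /** Hypothetical stand-in for PossibleInconsistentStateException. */
    static final class OutcomeUnknownException extends Exception {
        OutcomeUnknownException(Throwable cause) {
            super(cause);
        }
    }

    /** Hypothetical stand-in for the ConfigMap check-and-update call. */
    interface Writer {
        /** true: written; false: key already exists; OutcomeUnknownException: unclear. */
        boolean write(String key) throws Exception;
    }

    static Handle addAndLock(String key, Handle handle, Writer writer) throws Exception {
        boolean success = false;
        boolean outcomeUnknown = false;
        try {
            success = writer.write(key);
            if (!success) {
                throw new IllegalStateException(key + " already exists");
            }
            return handle;
        } catch (OutcomeUnknownException e) {
            // The write may or may not have reached Kubernetes: keep the state.
            outcomeUnknown = true;
            throw e;
        } finally {
            // Discard only if the write definitely did not happen.
            if (!success && !outcomeUnknown) {
                handle.discardState();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Handle handle = addAndLock("checkpoint-1", new Handle(), key -> true);
        System.out.println(handle.discarded); // false
    }
}
```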

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -470,6 +508,26 @@ public String toString() {
         return this.getClass().getSimpleName() + "{configMapName='" + configMapName + "'}";
     }
 
+    private byte[] serializeStateHandle(RetrievableStateHandle<T> retrievableStateHandle)
+            throws Exception {
+        try {
+            // Serialize the state handle. This writes the state to the backend.
+            return InstantiationUtil.serializeObject(retrievableStateHandle);
+        } catch (Exception e) {
+            discardState(retrievableStateHandle);

Review comment:
       I think it would be good to catch a possible exception from `discardState` here and add it as a suppressed exception to `e`. Otherwise, a failure in `discardState` would replace `e` and we would lose the original cause.
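
A sketch of the suggested catch block. `serializeOrDiscard`, `StateHandle` and `Serializer` are hypothetical names standing in for `serializeStateHandle`, `RetrievableStateHandle` and `InstantiationUtil.serializeObject`. If `discardState` itself throws, its exception is attached to the original failure via `Throwable#addSuppressed` instead of replacing it.

```java
import java.io.IOException;

public class SuppressedDiscardSketch {

    /** Hypothetical stand-in for RetrievableStateHandle. */
    interface StateHandle {
        void discardState() throws Exception;
    }

    /** Hypothetical stand-in for InstantiationUtil.serializeObject. */
    interface Serializer {
        byte[] serialize(StateHandle handle) throws Exception;
    }

    static byte[] serializeOrDiscard(StateHandle handle, Serializer serializer) throws Exception {
        try {
            return serializer.serialize(handle);
        } catch (Exception e) {
            try {
                handle.discardState();
            } catch (Exception discardException) {
                // Attach the cleanup failure instead of letting it replace `e`.
                e.addSuppressed(discardException);
            }
            throw e;
        }
    }

    public static void main(String[] args) {
        try {
            serializeOrDiscard(
                    () -> {
                        throw new IllegalStateException("discard failed");
                    },
                    handle -> {
                        throw new IOException("serialization failed");
                    });
        } catch (Exception e) {
            System.out.println(e.getMessage()); // serialization failed
            System.out.println(e.getSuppressed()[0].getMessage()); // discard failed
        }
    }
}
```

Since the method already declares `throws Exception`, rethrowing `e` directly also makes the unreachable `return new byte[0];` after `ExceptionUtils.rethrow(e)` unnecessary.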

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -196,27 +231,74 @@ public void replace(String pathInZooKeeper, IntegerResourceVersion expectedVersi
 
         RetrievableStateHandle<T> newStateHandle = storage.store(state);
 
-        boolean success = false;
+        final byte[] serializedStateHandle = serializeStateHandle(newStateHandle);
 
+        // initialize flags to serve the failure case
+        boolean discardOldState = false;
+        boolean discardNewState = true;
         try {
-            // Serialize the new state handle. This writes the state to the backend.
-            byte[] serializedStateHandle = InstantiationUtil.serializeObject(newStateHandle);
+            setStateHandle(path, serializedStateHandle, expectedVersion.getValue());
+
+            // swap subject for deletion in case of success
+            discardOldState = true;
+            discardNewState = false;
+        } catch (Exception e) {
+            if (indicatesPossiblyInconsistentState(e)) {
+                // it's unclear whether the state handle metadata was written to ZooKeeper -
+                // hence, we don't discard any data
+                discardOldState = false;
+                discardNewState = false;
+                throw new PossibleInconsistentStateException(e);
+            }
 
-            // Replace state handle in ZooKeeper.
-            client.setData()
-                    .withVersion(expectedVersion.getValue())
-                    .forPath(path, serializedStateHandle);
-            success = true;
-        } catch (KeeperException.NoNodeException e) {
             // We wrap the exception here so that it could be caught in DefaultJobGraphStore
-            throw new NotExistException("ZooKeeper node " + path + " does not exist.", e);
+            throw ExceptionUtils.findThrowable(e, KeeperException.NoNodeException.class)
+                    .map(
+                            nnee ->
+                                    new NotExistException(
+                                            "ZooKeeper node " + path + " does not exist.", nnee))
+                    .orElseThrow(() -> e);
         } finally {
-            if (success) {
-                oldStateHandle.discardState();
-            } else {
-                newStateHandle.discardState();
+            if (discardOldState) {
+                discardState(oldStateHandle);
             }
+
+            if (discardNewState) {
+                discardState(newStateHandle);
+            }
+        }
+    }
+
+    // this method is provided for the sole purpose of easier testing
+    @VisibleForTesting
+    protected void setStateHandle(String path, byte[] serializedStateHandle, int expectedVersion)
+            throws Exception {
+        // Replace state handle in ZooKeeper.
+        client.setData().withVersion(expectedVersion).forPath(path, serializedStateHandle);
+    }
+
+    private static void discardState(@Nullable StateObject stateObject) throws Exception {

Review comment:
       Is `@Nullable` required?
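
The two flags in the `replace` hunk above encode three outcomes: on success the old handle is discarded, on a definite failure the new handle is discarded, and when `indicatesPossiblyInconsistentState` reports that it is unclear whether ZooKeeper was updated, neither handle is touched. The following is a self-contained model of that decision, with hypothetical stand-ins `Handle`, `Setter`, `Classifier` and `PossiblyInconsistentException` in place of the Flink and Curator types. It simply rethrows on a definite failure and does not model the `NoNodeException` to `NotExistException` mapping.

```java
public class ReplaceOutcomeSketch {

    /** Hypothetical stand-in for RetrievableStateHandle; records whether it was discarded. */
    static final class Handle {
        boolean discarded;

        void discardState() {
            discarded = true;
        }
    }

    /** Hypothetical stand-in for PossibleInconsistentStateException. */
    static final class PossiblyInconsistentException extends Exception {
        PossiblyInconsistentException(Throwable cause) {
            super(cause);
        }
    }

    /** Hypothetical stand-in for the versioned setData call. */
    interface Setter {
        void set() throws Exception;
    }

    /** Hypothetical stand-in for indicatesPossiblyInconsistentState. */
    interface Classifier {
        boolean outcomeUnknown(Exception e);
    }

    static void replace(Handle oldHandle, Handle newHandle, Setter setter, Classifier classifier)
            throws Exception {
        // Start from the failure case: the new state is not referenced yet.
        boolean discardOldState = false;
        boolean discardNewState = true;
        try {
            setter.set();
            // Success: the new handle is referenced, the old one is obsolete.
            discardOldState = true;
            discardNewState = false;
        } catch (Exception e) {
            if (classifier.outcomeUnknown(e)) {
                // Either handle may be the one that is referenced now: keep both.
                discardNewState = false;
                throw new PossiblyInconsistentException(e);
            }
            throw e;
        } finally {
            if (discardOldState) {
                oldHandle.discardState();
            }
            if (discardNewState) {
                newHandle.discardState();
            }
        }
    }
}
```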

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
##########
@@ -470,6 +508,26 @@ public String toString() {
         return this.getClass().getSimpleName() + "{configMapName='" + configMapName + "'}";
     }
 
+    private byte[] serializeStateHandle(RetrievableStateHandle<T> retrievableStateHandle)
+            throws Exception {
+        try {
+            // Serialize the state handle. This writes the state to the backend.
+            return InstantiationUtil.serializeObject(retrievableStateHandle);
+        } catch (Exception e) {
+            discardState(retrievableStateHandle);
+            ExceptionUtils.rethrow(e);
+        }
+
+        // will never happen but is added to please the compiler
+        return new byte[0];
+    }
+
+    private static void discardState(@Nullable StateObject stateObject) throws Exception {

Review comment:
       Is `@Nullable` required?
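
Whether the annotation is needed depends on the callers. In the hunks shown, the argument is either a handle returned by `storage.store(...)` or the existing handle being replaced, which appears never to be null. The two hypothetical variants below contrast a null-tolerant helper with one that drops `@Nullable` and fails fast; `StateObject` is a stand-in interface here, not Flink's class.

```java
import java.util.Objects;

public class DiscardHelperSketch {

    /** Hypothetical stand-in for Flink's StateObject. */
    interface StateObject {
        void discardState() throws Exception;
    }

    // Variant that keeps @Nullable: the helper must tolerate a missing handle.
    static void discardNullable(StateObject stateObject) throws Exception {
        if (stateObject != null) {
            stateObject.discardState();
        }
    }

    // Variant without @Nullable: every caller passes a handle, so fail fast on null.
    static void discardNonNull(StateObject stateObject) throws Exception {
        Objects.requireNonNull(stateObject, "stateObject").discardState();
    }
}
```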




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

