rkhachatryan commented on a change in pull request #15832:
URL: https://github.com/apache/flink/pull/15832#discussion_r630164743



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -126,56 +144,71 @@ public ZooKeeperStateHandleStore(
      * @param pathInZooKeeper Destination path in ZooKeeper (expected to *not* exist yet)
      * @param state State to be added
      * @return The Created {@link RetrievableStateHandle}.
+     * @throws PossibleInconsistentStateException if the write-to-ZooKeeper operation failed. This
+     *     indicates that it's not clear whether the new state was successfully written to ZooKeeper
+     *     or not. Proper error handling has to be applied on the caller's side.
      * @throws Exception If a ZooKeeper or state handle operation fails
      */
     @Override
-    public RetrievableStateHandle<T> addAndLock(String pathInZooKeeper, T state) throws Exception {
+    public RetrievableStateHandle<T> addAndLock(String pathInZooKeeper, T state)
+            throws PossibleInconsistentStateException, Exception {
         checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
         checkNotNull(state, "State");
 
         final String path = normalizePath(pathInZooKeeper);
 
         RetrievableStateHandle<T> storeHandle = storage.store(state);
 
-        boolean success = false;
+        byte[] serializedStoreHandle = serializeStateHandle(storeHandle);
 
+        // initialize flag to serve the failure case
+        boolean discardState = true;
         try {
-            // Serialize the state handle. This writes the state to the backend.
-            byte[] serializedStoreHandle = InstantiationUtil.serializeObject(storeHandle);
-
-            // Write state handle (not the actual state) to ZooKeeper. This is expected to be
-            // smaller than the state itself. This level of indirection makes sure that data in
-            // ZooKeeper is small, because ZooKeeper is designed for data in the KB range, but
-            // the state can be larger.
-            // Create the lock node in a transaction with the actual state node. That way we can
-            // prevent
-            // race conditions with a concurrent delete operation.
-            client.inTransaction()
-                    .create()
-                    .withMode(CreateMode.PERSISTENT)
-                    .forPath(path, serializedStoreHandle)
-                    .and()
-                    .create()
-                    .withMode(CreateMode.EPHEMERAL)
-                    .forPath(getLockPath(path))
-                    .and()
-                    .commit();
-
-            success = true;
+            writeStoreHandleTransactionally(path, serializedStoreHandle);
+            discardState = false;
             return storeHandle;
-        } catch (KeeperException.NodeExistsException e) {
+        } catch (Exception e) {
+            if (indicatesPossiblyInconsistentState(e)) {
+                discardState = false;
+                throw new PossibleInconsistentStateException(e);
+            }
+
             // We wrap the exception here so that it could be caught in DefaultJobGraphStore
-            throw new AlreadyExistException("ZooKeeper node " + path + " already exists.", e);
+            throw ExceptionUtils.findThrowable(e, KeeperException.NodeExistsException.class)

Review comment:
       Is this line reachable? In `indicatesPossiblyInconsistentState` above, we have already checked for `NodeExistsException` and thrown an exception if it was found.
   
   I think it's better to place this check before the `indicatesPossiblyInconsistentState` call (rather than adding `NodeExistsException` to `PRE_COMMIT_EXCEPTIONS`).
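A minimal, self-contained sketch of the suggested ordering. Everything in it is a hypothetical stand-in: `NodeExistsException`, `AlreadyExistException`, `PossibleInconsistentStateException`, the `PRE_COMMIT_EXCEPTIONS` contents and the `findThrowable` helper only mimic the ZooKeeper/Flink types, and `translate` returns the exception to throw (instead of throwing it) so the logic is easy to check. Because the `NodeExistsException` lookup runs first, the `AlreadyExistException` branch stays reachable without touching the pre-commit list.

```java
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-ins for the ZooKeeper/Flink exception types.
class NodeExistsException extends Exception {}

class AlreadyExistException extends Exception {
    AlreadyExistException(String message, Throwable cause) {
        super(message, cause);
    }
}

class PossibleInconsistentStateException extends Exception {
    PossibleInconsistentStateException(Throwable cause) {
        super(cause);
    }
}

final class AddAndLockErrorHandling {

    private AddAndLockErrorHandling() {}

    // Hypothetical list of exceptions known to occur before the ZooKeeper commit.
    // NodeExistsException is deliberately not listed.
    static final Set<Class<? extends Exception>> PRE_COMMIT_EXCEPTIONS =
            Set.of(IllegalArgumentException.class);

    static boolean indicatesPossiblyInconsistentState(Exception e) {
        return !PRE_COMMIT_EXCEPTIONS.contains(e.getClass());
    }

    // Walks the cause chain, similar in spirit to Flink's ExceptionUtils.findThrowable.
    static <T extends Throwable> Optional<T> findThrowable(Throwable t, Class<T> type) {
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            if (type.isInstance(cur)) {
                return Optional.of(type.cast(cur));
            }
        }
        return Optional.empty();
    }

    // Returns the exception addAndLock would throw for a failed write.
    // The NodeExistsException check comes first, so that branch stays reachable.
    static Exception translate(String path, Exception e) {
        Optional<NodeExistsException> nee = findThrowable(e, NodeExistsException.class);
        if (nee.isPresent()) {
            return new AlreadyExistException(
                    "ZooKeeper node " + path + " already exists.", nee.get());
        }
        if (indicatesPossiblyInconsistentState(e)) {
            return new PossibleInconsistentStateException(e);
        }
        return e;
    }
}
```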

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -126,56 +144,71 @@ public ZooKeeperStateHandleStore(
      * @param pathInZooKeeper Destination path in ZooKeeper (expected to *not* exist yet)
      * @param state State to be added
      * @return The Created {@link RetrievableStateHandle}.
+     * @throws PossibleInconsistentStateException if the write-to-ZooKeeper operation failed. This
+     *     indicates that it's not clear whether the new state was successfully written to ZooKeeper
+     *     or not. Proper error handling has to be applied on the caller's side.
      * @throws Exception If a ZooKeeper or state handle operation fails
      */
     @Override
-    public RetrievableStateHandle<T> addAndLock(String pathInZooKeeper, T state) throws Exception {
+    public RetrievableStateHandle<T> addAndLock(String pathInZooKeeper, T state)
+            throws PossibleInconsistentStateException, Exception {
         checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
         checkNotNull(state, "State");
 
         final String path = normalizePath(pathInZooKeeper);
 
         RetrievableStateHandle<T> storeHandle = storage.store(state);
 
-        boolean success = false;
+        byte[] serializedStoreHandle = serializeStateHandle(storeHandle);
 
+        // initialize flag to serve the failure case
+        boolean discardState = true;
         try {
-            // Serialize the state handle. This writes the state to the backend.
-            byte[] serializedStoreHandle = InstantiationUtil.serializeObject(storeHandle);
-
-            // Write state handle (not the actual state) to ZooKeeper. This is expected to be
-            // smaller than the state itself. This level of indirection makes sure that data in
-            // ZooKeeper is small, because ZooKeeper is designed for data in the KB range, but
-            // the state can be larger.
-            // Create the lock node in a transaction with the actual state node. That way we can
-            // prevent
-            // race conditions with a concurrent delete operation.
-            client.inTransaction()
-                    .create()
-                    .withMode(CreateMode.PERSISTENT)
-                    .forPath(path, serializedStoreHandle)
-                    .and()
-                    .create()
-                    .withMode(CreateMode.EPHEMERAL)
-                    .forPath(getLockPath(path))
-                    .and()
-                    .commit();
-
-            success = true;
+            writeStoreHandleTransactionally(path, serializedStoreHandle);
+            discardState = false;
             return storeHandle;
-        } catch (KeeperException.NodeExistsException e) {
+        } catch (Exception e) {
+            if (indicatesPossiblyInconsistentState(e)) {
+                discardState = false;
+                throw new PossibleInconsistentStateException(e);
+            }
+
             // We wrap the exception here so that it could be caught in DefaultJobGraphStore
-            throw new AlreadyExistException("ZooKeeper node " + path + " already exists.", e);
+            throw ExceptionUtils.findThrowable(e, KeeperException.NodeExistsException.class)
+                    .map(
+                            nee ->
+                                    new AlreadyExistException(
+                                            "ZooKeeper node " + path + " already exists.", nee))
+                    .orElseThrow(() -> e);
         } finally {
-            if (!success) {
-                // Cleanup the state handle if it was not written to ZooKeeper.
-                if (storeHandle != null) {
-                    storeHandle.discardState();
-                }
+            if (discardState) {
+                storeHandle.discardState();

Review comment:
       I think we could get rid of the boolean flag and simplify the code a bit by discarding the handle in the `catch` block instead of `finally`, something like this:
   ```
   try {
     writeStoreHandleTransactionally(path, serializedStoreHandle);
     return storeHandle;
   } catch (Exception e) {
     if (indicatesPossiblyInconsistentState(e)) {
       throw new PossibleInconsistentStateException(e);
     } else {
       storeHandle.discardState();
       throw e; // NodeExistsException translation as before
     }
   }
   ```
   WDYT?
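The same idea as a compilable sketch, assuming simplified stand-ins: `FakeStateHandle`, the `Write` interface and the predicate parameter are hypothetical replacements for `RetrievableStateHandle`, `writeStoreHandleTransactionally` and `indicatesPossiblyInconsistentState`. The handle is discarded only on the failure path where the write is known not to have reached ZooKeeper, so no flag is needed.

```java
import java.util.function.Predicate;

// Hypothetical stand-in for RetrievableStateHandle: only counts discard calls.
class FakeStateHandle {
    int discardCalls = 0;

    void discardState() {
        discardCalls++;
    }
}

// Hypothetical stand-in for Flink's PossibleInconsistentStateException.
class PossibleInconsistentStateException extends Exception {
    PossibleInconsistentStateException(Throwable cause) {
        super(cause);
    }
}

final class DiscardInCatchSketch {

    private DiscardInCatchSketch() {}

    // Hypothetical stand-in for writeStoreHandleTransactionally(path, bytes).
    interface Write {
        void run() throws Exception;
    }

    // No boolean flag: success returns directly, failures decide in the catch block.
    static FakeStateHandle addAndLock(
            FakeStateHandle storeHandle,
            Write write,
            Predicate<Exception> indicatesPossiblyInconsistentState)
            throws Exception {
        try {
            write.run();
            return storeHandle;
        } catch (Exception e) {
            if (indicatesPossiblyInconsistentState.test(e)) {
                // Unclear whether ZooKeeper holds the entry: keep the state.
                throw new PossibleInconsistentStateException(e);
            }
            // The write definitely failed: clean up and rethrow.
            storeHandle.discardState();
            throw e;
        }
    }
}
```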
   

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/util/StateHandleStoreUtils.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.runtime.persistence.StateHandleStore;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.InstantiationUtil;
+
+/**
+ * {@code StateHandleStoreUtils} collects utility methods that might be useful for any {@link
+ * StateHandleStore} implementation.
+ */
+public class StateHandleStoreUtils {
+
+    /**
+     * Serializes the passed {@link StateObject} and discards the state in case of failure.
+     *
+     * @param stateObject the {@code StateObject} that shall be serialized.
+     * @return The serialized version of the passed {@code StateObject}.
+     * @throws Exception if an error occurred during the serialization. The corresponding {@code
+     *     StateObject} will be discarded in that case.
+     */
+    public static byte[] serializeStateHandle(StateObject stateObject) throws Exception {

Review comment:
       I think it would be more consistent to have a symmetric method in this class to deserialize the handle.
   `ZooKeeperStateHandleStore` currently calls `InstantiationUtil.deserializeObject` for that.
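A minimal sketch of such a symmetric pair, assuming plain `java.io` serialization as a stand-in. The class name `StateHandleSerde` and the method `deserializeStateHandle` are hypothetical; a real version would presumably delegate to `InstantiationUtil` (and take a class loader), just as `serializeStateHandle` does.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical JDK-only stand-in for the serialize/deserialize helpers.
final class StateHandleSerde {

    private StateHandleSerde() {}

    static byte[] serializeStateHandle(Serializable handle) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(handle);
        }
        return bytes.toByteArray();
    }

    // Symmetric counterpart, so callers no longer invoke the deserializer directly.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T deserializeStateHandle(byte[] data)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (T) in.readObject();
        }
    }
}
```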
   
   

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/util/StateHandleStoreUtils.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.runtime.persistence.StateHandleStore;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.InstantiationUtil;
+
+/**
+ * {@code StateHandleStoreUtils} collects utility methods that might be useful for any {@link
+ * StateHandleStore} implementation.
+ */
+public class StateHandleStoreUtils {
+
+    /**
+     * Serializes the passed {@link StateObject} and discards the state in case of failure.
+     *
+     * @param stateObject the {@code StateObject} that shall be serialized.
+     * @return The serialized version of the passed {@code StateObject}.
+     * @throws Exception if an error occurred during the serialization. The corresponding {@code
+     *     StateObject} will be discarded in that case.
+     */
+    public static byte[] serializeStateHandle(StateObject stateObject) throws Exception {
+        try {
+            return InstantiationUtil.serializeObject(stateObject);
+        } catch (Exception e) {
+            try {
+                stateObject.discardState();

Review comment:
       **nit**: this contract isn't very obvious to me; maybe it makes sense to rename the method to `serializeOrDiscard` or similar.
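A sketch of what the renamed helper could look like, assuming a hypothetical `DiscardableState` interface in place of `StateObject` and plain `java.io` serialization in place of `InstantiationUtil`. Attaching a failed `discardState()` call via `addSuppressed` is an assumption of this sketch, not something the diff shows; `CountingState` is a hypothetical test double.

```java
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical, minimal stand-in for Flink's StateObject.
interface DiscardableState extends Serializable {
    void discardState() throws Exception;
}

final class SerializeOrDiscard {

    private SerializeOrDiscard() {}

    // The name states the contract: if serialization fails, the state is discarded
    // and the serialization failure is rethrown.
    static byte[] serializeOrDiscard(DiscardableState state) throws Exception {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(state);
            }
            return bytes.toByteArray();
        } catch (Exception e) {
            try {
                state.discardState();
            } catch (Exception discardFailure) {
                // Keep the serialization failure as the primary error.
                e.addSuppressed(discardFailure);
            }
            throw e;
        }
    }
}

// Hypothetical test double: serialization fails when the payload is not Serializable.
final class CountingState implements DiscardableState {
    private final Object payload;
    int discardCalls = 0;

    CountingState(Object payload) {
        this.payload = payload;
    }

    @Override
    public void discardState() {
        discardCalls++;
    }
}
```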

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -196,29 +229,55 @@ public void replace(String pathInZooKeeper, IntegerResourceVersion expectedVersi
 
         RetrievableStateHandle<T> newStateHandle = storage.store(state);
 
-        boolean success = false;
+        final byte[] serializedStateHandle = serializeStateHandle(newStateHandle);
 
+        // initialize flags to serve the failure case
+        boolean discardOldState = false;
+        boolean discardNewState = true;
         try {
-            // Serialize the new state handle. This writes the state to the backend.
-            byte[] serializedStateHandle = InstantiationUtil.serializeObject(newStateHandle);
+            setStateHandle(path, serializedStateHandle, expectedVersion.getValue());
+
+            // swap subject for deletion in case of success
+            discardOldState = true;
+            discardNewState = false;
+        } catch (Exception e) {
+            if (indicatesPossiblyInconsistentState(e)) {
+                // it's unclear whether the state handle metadata was written to ZooKeeper -
+                // hence, we don't discard any data
+                discardNewState = false;
+                throw new PossibleInconsistentStateException(e);
+            }
 
-            // Replace state handle in ZooKeeper.
-            client.setData()
-                    .withVersion(expectedVersion.getValue())
-                    .forPath(path, serializedStateHandle);
-            success = true;
-        } catch (KeeperException.NoNodeException e) {
             // We wrap the exception here so that it could be caught in DefaultJobGraphStore
-            throw new NotExistException("ZooKeeper node " + path + " does not exist.", e);
+            throw ExceptionUtils.findThrowable(e, KeeperException.NoNodeException.class)
+                    .map(
+                            nnee ->
+                                    new NotExistException(
+                                            "ZooKeeper node " + path + " does not exist.", nnee))
+                    .orElseThrow(() -> e);
         } finally {
-            if (success) {
+            if (discardOldState) {
                 oldStateHandle.discardState();
-            } else {
+            }
+
+            if (discardNewState) {
                 newStateHandle.discardState();
             }
         }
     }
 
+    // this method is provided for the sole purpose of easier testing
+    @VisibleForTesting
+    protected void setStateHandle(String path, byte[] serializedStateHandle, int expectedVersion)
+            throws Exception {
+        // Replace state handle in ZooKeeper.
+        client.setData().withVersion(expectedVersion).forPath(path, serializedStateHandle);
+    }
+
+    private boolean indicatesPossiblyInconsistentState(Exception e) {
+        return !PRE_COMMIT_EXCEPTIONS.contains(e.getClass());

Review comment:
       Checking for the exact class means that no subclass will match. For example, `NoAuthException` might be extended by `AuthExpiredException`.
   I'm not sure whether this is a real issue, though.
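A small sketch of the difference, assuming a hypothetical exception hierarchy (`NoAuthException` and `SubclassOfNoAuthException` below are stand-ins, not the ZooKeeper classes): an exact `getClass()` lookup misses subclasses, while an `isInstance` check matches them.

```java
import java.util.Collection;

// Hypothetical stand-ins: a listed exception type and a subclass of it.
class NoAuthException extends Exception {}

class SubclassOfNoAuthException extends NoAuthException {}

final class PreCommitCheck {

    private PreCommitCheck() {}

    // Exact-class lookup, as in the diff: subclasses of listed types do not match.
    static boolean isPreCommitByClass(
            Collection<Class<? extends Exception>> preCommit, Exception e) {
        return preCommit.contains(e.getClass());
    }

    // Type-based lookup: subclasses of listed types match as well.
    static boolean isPreCommitByType(
            Collection<Class<? extends Exception>> preCommit, Exception e) {
        return preCommit.stream().anyMatch(type -> type.isInstance(e));
    }
}
```

The `isInstance` variant costs a linear scan instead of a hash lookup, which should be negligible for a handful of exception types.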

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
##########
@@ -160,8 +177,94 @@ public void testFailingCompletedCheckpointStoreAdd() throws Exception {
                 .discardState();
     }
 
+    @Test
+    public void testCleanupForGenericFailure() throws Exception {
+        testStoringFailureHandling(new FlinkRuntimeException("Expected exception"), 1);
+    }
+
+    @Test
+    public void testCleanupOmissionForPossibleInconsistentStateException() throws Exception {
+        testStoringFailureHandling(new PossibleInconsistentStateException(), 0);
+    }
+
+    private void testStoringFailureHandling(Exception failure, int expectedCleanupCalls)
+            throws Exception {
+        final JobVertexID jobVertexID1 = new JobVertexID();
+
+        final ExecutionGraph graph =
+                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+                        .addJobVertex(jobVertexID1)
+                        .build();
+
+        final ExecutionVertex vertex = graph.getJobVertex(jobVertexID1).getTaskVertices()[0];
+        final ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
+
+        final StandaloneCheckpointIDCounter checkpointIDCounter =
+                new StandaloneCheckpointIDCounter();
+
+        final ManuallyTriggeredScheduledExecutor manuallyTriggeredScheduledExecutor =
+                new ManuallyTriggeredScheduledExecutor();
+
+        final CompletedCheckpointStore completedCheckpointStore =
+                new FailingCompletedCheckpointStore(
+                        (checkpoint, ignoredCleaner, ignoredPostCleanCallback) -> {
+                            throw failure;
+                        });
+
+        final AtomicInteger cleanupCallCount = new AtomicInteger(0);
+        final CheckpointCoordinator checkpointCoordinator =
+                new CheckpointCoordinatorBuilder()
+                        .setExecutionGraph(graph)
+                        .setCheckpointIDCounter(checkpointIDCounter)
+                        .setCheckpointsCleaner(
+                                new CheckpointsCleaner() {
+
+                                    private static final long serialVersionUID =
+                                            2029876992397573325L;
+
+                                    @Override
+                                    public void cleanCheckpointOnFailedStoring(
+                                            CompletedCheckpoint completedCheckpoint,
+                                            Executor executor) {
+                                        cleanupCallCount.incrementAndGet();
+                                        super.cleanCheckpointOnFailedStoring(
+                                                completedCheckpoint, executor);
+                                    }
+                                })
+                        .setCompletedCheckpointStore(completedCheckpointStore)
+                        .setTimer(manuallyTriggeredScheduledExecutor)
+                        .build();
+        checkpointCoordinator.triggerSavepoint(tmpFolder.newFolder().getAbsolutePath());
+        manuallyTriggeredScheduledExecutor.triggerAll();
+
+        try {
+            checkpointCoordinator.receiveAcknowledgeMessage(
+                    new AcknowledgeCheckpoint(
+                            graph.getJobID(), attemptId, checkpointIDCounter.getLast()),
+                    "unknown location");
+            fail("CheckpointException should have been thrown.");
+        } catch (CheckpointException e) {
+            assertThat(
+                    e.getCheckpointFailureReason(),
+                    is(CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE));
+        }
+
+        assertThat(cleanupCallCount.get(), is(expectedCleanupCalls));
+    }
+
     private static final class FailingCompletedCheckpointStore implements CompletedCheckpointStore {
 
+        private final TriConsumerWithException<
+                        CompletedCheckpoint, CheckpointsCleaner, Runnable, Exception>
+                addCheckpointConsumer;
+
+        public FailingCompletedCheckpointStore(
+                TriConsumerWithException<
+                                CompletedCheckpoint, CheckpointsCleaner, Runnable, Exception>
+                        addCheckpointConsumer) {
+            this.addCheckpointConsumer = addCheckpointConsumer;

Review comment:
       I'm not sure whether the flexibility we get here with `TriConsumer` justifies the added complexity.
   Wouldn't a field `private Exception failure;` passed to the constructor be sufficient?
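A sketch of the simpler test double, assuming a trimmed-down, hypothetical `CheckpointStore` interface with a single `addCheckpoint(String)` method (the real `CompletedCheckpointStore` has more methods and a different `addCheckpoint` signature). The store just rethrows the exception it was constructed with, which is all both tests in this diff need.

```java
// Hypothetical, trimmed-down stand-in for CompletedCheckpointStore.
interface CheckpointStore {
    void addCheckpoint(String checkpoint) throws Exception;
}

// Test double configured with a single failure instead of a TriConsumer.
final class FailingCheckpointStore implements CheckpointStore {

    private final Exception failure;

    FailingCheckpointStore(Exception failure) {
        this.failure = failure;
    }

    @Override
    public void addCheckpoint(String checkpoint) throws Exception {
        // Every add fails with the configured exception.
        throw failure;
    }
}
```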




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
