XComp commented on a change in pull request #15832:
URL: https://github.com/apache/flink/pull/15832#discussion_r630810712



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -196,29 +229,55 @@ public void replace(String pathInZooKeeper, IntegerResourceVersion expectedVersi
 
         RetrievableStateHandle<T> newStateHandle = storage.store(state);
 
-        boolean success = false;
+        final byte[] serializedStateHandle = serializeStateHandle(newStateHandle);
 
+        // initialize flags to serve the failure case
+        boolean discardOldState = false;
+        boolean discardNewState = true;
         try {
-            // Serialize the new state handle. This writes the state to the backend.
-            byte[] serializedStateHandle = InstantiationUtil.serializeObject(newStateHandle);
+            setStateHandle(path, serializedStateHandle, expectedVersion.getValue());
+
+            // swap subject for deletion in case of success
+            discardOldState = true;
+            discardNewState = false;
+        } catch (Exception e) {
+            if (indicatesPossiblyInconsistentState(e)) {
+                // it's unclear whether the state handle metadata was written to ZooKeeper -
+                // hence, we don't discard any data
+                discardNewState = false;
+                throw new PossibleInconsistentStateException(e);
+            }
 
-            // Replace state handle in ZooKeeper.
-            client.setData()
-                    .withVersion(expectedVersion.getValue())
-                    .forPath(path, serializedStateHandle);
-            success = true;
-        } catch (KeeperException.NoNodeException e) {
             // We wrap the exception here so that it could be caught in DefaultJobGraphStore
-            throw new NotExistException("ZooKeeper node " + path + " does not exist.", e);
+            throw ExceptionUtils.findThrowable(e, KeeperException.NoNodeException.class)
+                    .map(
+                            nnee ->
+                                    new NotExistException(
+                                            "ZooKeeper node " + path + " does not exist.", nnee))
+                    .orElseThrow(() -> e);
         } finally {
-            if (success) {
+            if (discardOldState) {
                 oldStateHandle.discardState();
-            } else {
+            }
+
+            if (discardNewState) {
                 newStateHandle.discardState();
             }
         }
     }
 
+    // this method is provided for the sole purpose of easier testing
+    @VisibleForTesting
+    protected void setStateHandle(String path, byte[] serializedStateHandle, int expectedVersion)
+            throws Exception {
+        // Replace state handle in ZooKeeper.
+        client.setData().withVersion(expectedVersion).forPath(path, serializedStateHandle);
+    }
+
+    private boolean indicatesPossiblyInconsistentState(Exception e) {
+        return !PRE_COMMIT_EXCEPTIONS.contains(e.getClass());

Review comment:
       I would like to keep it more explicit. If subclasses are meant to be in that set, we should add them explicitly. Your example with `AuthExpiredException` is just a made-up example, isn't it? I couldn't find it in the code. I was going through the error cases that are listed in [KeeperException.create](https://github.com/apache/zookeeper/blob/branch-3.4/zookeeper-server/src/main/java/org/apache/zookeeper/KeeperException.java#L93) to cover the possible cases that are triggered by the `commit()` call.
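The point about subclasses follows from how `PRE_COMMIT_EXCEPTIONS.contains(e.getClass())` behaves: a `Set` lookup keyed by `Class` matches only the exact runtime class, so a subclass of a listed exception is not matched unless it is added explicitly. A minimal, self-contained sketch, assuming hypothetical stand-in exception classes (not the real `KeeperException` hierarchy) and an illustrative set whose contents are not taken from the PR:

```java
import java.util.Set;

public class PreCommitExceptionCheck {

    // Hypothetical stand-ins for ZooKeeper exception types; the real classes live
    // in org.apache.zookeeper.KeeperException and are not used here.
    static class BaseKeeperException extends Exception {}

    static class NoNodeException extends BaseKeeperException {}

    static class BadVersionException extends BaseKeeperException {}

    // Hypothetical subclass that is NOT listed in the set below.
    static class SpecialNoNodeException extends NoNodeException {}

    // Illustrative allow-list of exceptions that fail before anything is committed.
    static final Set<Class<? extends Exception>> PRE_COMMIT_EXCEPTIONS =
            Set.of(NoNodeException.class, BadVersionException.class);

    // Exact-class lookup, mirroring PRE_COMMIT_EXCEPTIONS.contains(e.getClass()).
    static boolean isPreCommitExact(Exception e) {
        return PRE_COMMIT_EXCEPTIONS.contains(e.getClass());
    }

    // Subclass-aware alternative, shown only for comparison.
    static boolean isPreCommitIncludingSubclasses(Exception e) {
        return PRE_COMMIT_EXCEPTIONS.stream().anyMatch(type -> type.isInstance(e));
    }

    public static void main(String[] args) {
        Exception listed = new NoNodeException();
        Exception subclass = new SpecialNoNodeException();

        System.out.println(isPreCommitExact(listed));                 // true
        System.out.println(isPreCommitExact(subclass));               // false
        System.out.println(isPreCommitIncludingSubclasses(subclass)); // true
    }
}
```

With the exact lookup, any class missing from the set, including a subclass of a listed one, is treated as "possibly inconsistent", so `replace` discards neither handle. That is the conservative default an explicit list relies on.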

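In the diff, the dedicated `catch (KeeperException.NoNodeException e)` clause is replaced by a generic `catch (Exception e)`, and Flink's `ExceptionUtils.findThrowable` is used to look for a `NoNodeException` in the exception's cause chain before translating it into a `NotExistException`. A minimal sketch of that lookup-and-translate step, assuming a hypothetical plain-Java `findThrowable` (not Flink's implementation) and hypothetical stand-in exception classes:

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Optional;
import java.util.Set;

public class CauseChainLookup {

    // Hypothetical stand-ins for KeeperException.NoNodeException and NotExistException.
    static class NoNodeException extends Exception {}

    static class NotExistException extends Exception {
        NotExistException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Returns the first throwable in the cause chain (starting with `throwable`
    // itself) that is an instance of `searchType`. A visited set guards against
    // cyclic cause chains.
    static <T extends Throwable> Optional<T> findThrowable(Throwable throwable, Class<T> searchType) {
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        Throwable current = throwable;
        while (current != null && seen.add(current)) {
            if (searchType.isInstance(current)) {
                return Optional.of(searchType.cast(current));
            }
            current = current.getCause();
        }
        return Optional.empty();
    }

    // Same shape as the catch block in the diff: translate a NoNodeException found
    // anywhere in the chain, otherwise rethrow the original exception unchanged.
    static void translate(Exception e, String path) throws Exception {
        throw findThrowable(e, NoNodeException.class)
                .map(nnee -> new NotExistException("ZooKeeper node " + path + " does not exist.", nnee))
                .orElseThrow(() -> e);
    }

    public static void main(String[] args) {
        Exception wrapped = new RuntimeException("wrapper", new NoNodeException());
        try {
            translate(wrapped, "/example/path");
        } catch (Exception ex) {
            System.out.println(ex.getClass().getSimpleName()); // NotExistException
        }
    }
}
```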

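The `replace` change swaps the single `success` flag for two flags, `discardOldState` and `discardNewState`, initialized for the failure case. On success the old handle is discarded. On a failure known to happen before the write, the new handle is discarded. On a failure whose outcome is unknown, neither is discarded and `PossibleInconsistentStateException` is thrown. The sketch below reproduces only that control flow. `Handle`, `PreCommitFailure`, and `ConnectionLoss` are hypothetical stand-ins for the real state handle and ZooKeeper exceptions, and the `instanceof` check simplifies the diff's `PRE_COMMIT_EXCEPTIONS` lookup.

```java
public class DiscardFlagsSketch {

    // Hypothetical minimal state handle that only records whether it was discarded.
    static class Handle {
        boolean discarded;

        void discardState() {
            discarded = true;
        }
    }

    // Hypothetical failure types: one known to happen before the write is applied,
    // one whose outcome on the server is unknown.
    static class PreCommitFailure extends Exception {}

    static class ConnectionLoss extends Exception {}

    static class PossibleInconsistentStateException extends Exception {
        PossibleInconsistentStateException(Throwable cause) {
            super(cause);
        }
    }

    // `writeFailure` simulates the metadata write: null means it succeeded.
    static void replace(Handle oldHandle, Handle newHandle, Exception writeFailure)
            throws Exception {
        // Flags start out describing the failure case: keep old, discard new.
        boolean discardOldState = false;
        boolean discardNewState = true;
        try {
            if (writeFailure != null) {
                throw writeFailure;
            }
            // Write succeeded: swap which handle gets discarded.
            discardOldState = true;
            discardNewState = false;
        } catch (Exception e) {
            if (!(e instanceof PreCommitFailure)) {
                // Unknown whether the write landed, so keep both handles.
                discardNewState = false;
                throw new PossibleInconsistentStateException(e);
            }
            throw e;
        } finally {
            if (discardOldState) {
                oldHandle.discardState();
            }
            if (discardNewState) {
                newHandle.discardState();
            }
        }
    }

    static void runCase(String label, Exception writeFailure) {
        Handle oldHandle = new Handle();
        Handle newHandle = new Handle();
        try {
            replace(oldHandle, newHandle, writeFailure);
        } catch (Exception e) {
            // Expected for the two failure cases.
        }
        System.out.println(label + ": old discarded=" + oldHandle.discarded
                + ", new discarded=" + newHandle.discarded);
    }

    public static void main(String[] args) {
        // prints "success: old discarded=true, new discarded=false"
        runCase("success", null);
        // prints "pre-commit failure: old discarded=false, new discarded=true"
        runCase("pre-commit failure", new PreCommitFailure());
        // prints "unknown outcome: old discarded=false, new discarded=false"
        runCase("unknown outcome", new ConnectionLoss());
    }
}
```

Keeping both handles in the unknown-outcome case accepts possibly orphaned state in exchange for safety: if the write did reach ZooKeeper, discarding the new handle would delete state that the metadata now points to.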

