tillrohrmann commented on a change in pull request #17607:
URL: https://github.com/apache/flink/pull/17607#discussion_r746360946



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -154,43 +156,40 @@ public ZooKeeperStateHandleStore(
             throws PossibleInconsistentStateException, Exception {
         checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
         checkNotNull(state, "State");
-
         final String path = normalizePath(pathInZooKeeper);
-
-        RetrievableStateHandle<T> storeHandle = storage.store(state);
-
-        byte[] serializedStoreHandle = serializeOrDiscard(storeHandle);
-
+        if (exists(path).isExisting()) {
+            throw new AlreadyExistException(
+                    String.format("ZooKeeper node %s already exists.", path));
+        }
+        final RetrievableStateHandle<T> storeHandle = storage.store(state);
+        final byte[] serializedStoreHandle = serializeOrDiscard(storeHandle);
         try {
             writeStoreHandleTransactionally(path, serializedStoreHandle);
             return storeHandle;
+        } catch (KeeperException.NodeExistsException e) {
+            // Transactions are not idempotent in the curator version we're currently using, so it
+            // is actually possible that we've re-tried a transaction that has already succeeded.
+            // We've ensured that the node hasn't been present prior executing the transaction, so
+            // we can assume that this is a result of the retry mechanism.
+            return storeHandle;

Review comment:
       Does `replace` have the same problem?
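
       For context, a minimal sketch of the pattern the hunk above introduces:
       check that the node is absent, run the transaction, and treat a
       node-exists conflict as the echo of a retried, already-applied
       transaction. `FlakyStore`, `createWithLostAck` and `addAndTolerateRetry`
       are hypothetical names for illustration, not Flink or Curator APIs, and
       the sketch assumes no other writer creates the path between the check
       and the transaction.

```java
import java.util.HashMap;
import java.util.Map;

public class RetryAwareAddSketch {
    /** Hypothetical stand-in for KeeperException.NodeExistsException. */
    static class NodeExistsException extends Exception {}

    /** Hypothetical in-memory store; not a real ZooKeeper client. */
    static class FlakyStore {
        final Map<String, byte[]> nodes = new HashMap<>();

        boolean exists(String path) {
            return nodes.containsKey(path);
        }

        void create(String path, byte[] data) throws NodeExistsException {
            if (nodes.containsKey(path)) {
                throw new NodeExistsException();
            }
            nodes.put(path, data);
        }

        // Simulates a non-idempotent transaction: the first attempt is applied,
        // its acknowledgement is lost, and the blind retry hits the new node.
        void createWithLostAck(String path, byte[] data) throws NodeExistsException {
            create(path, data);
            create(path, data);
        }
    }

    /** Returns true if the node exists after the call; throws if it existed before. */
    static boolean addAndTolerateRetry(FlakyStore store, String path, byte[] data) {
        if (store.exists(path)) {
            throw new IllegalStateException("Node " + path + " already exists.");
        }
        try {
            store.createWithLostAck(path, data);
        } catch (NodeExistsException e) {
            // The node was absent before the transaction, so the conflict is
            // attributed to the retry of a transaction that already succeeded.
        }
        return store.exists(path);
    }
}
```

       Whether the same guard can be applied to `replace` is the open question
       here; the sketch only covers the add path.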

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -87,18 +88,19 @@
 
     private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperStateHandleStore.class);
 
-    @VisibleForTesting
-    static final Set<Class<? extends KeeperException>> PRE_COMMIT_EXCEPTIONS =
-            newHashSet(
-                    KeeperException.NodeExistsException.class,
-                    KeeperException.BadArgumentsException.class,
-                    KeeperException.NoNodeException.class,
-                    KeeperException.NoAuthException.class,
-                    KeeperException.BadVersionException.class,
-                    KeeperException.AuthFailedException.class,
-                    KeeperException.InvalidACLException.class,
-                    KeeperException.SessionMovedException.class,
-                    KeeperException.NotReadOnlyException.class);
+    /** Pre-commit exceptions that don't imply data inconsistency. */
+    private static final Set<Class<? extends KeeperException>> SAFE_PRE_COMMIT_EXCEPTIONS =
+            new HashSet<>(
+                    Arrays.asList(
+                            KeeperException.NodeExistsException.class,
+                            KeeperException.BadArgumentsException.class,
+                            KeeperException.NoNodeException.class,
+                            KeeperException.NoAuthException.class,
+                            KeeperException.BadVersionException.class,
+                            KeeperException.AuthFailedException.class,
+                            KeeperException.InvalidACLException.class,
+                            KeeperException.SessionMovedException.class,
+                            KeeperException.NotReadOnlyException.class));

Review comment:
       Should we double-check that each of these exceptions is safe, in the 
sense that seeing it means no metadata has been written? (For 
`NodeExistsException` this does not hold.) For example, could we write 
something, then lose the connection, and then see the session move? If so, we 
shouldn't treat `SessionMovedException` as safe.
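
       To make the criterion concrete, a minimal sketch with hypothetical
       stand-in exception classes (not the real `KeeperException` hierarchy).
       Which exceptions actually belong in the set is exactly what needs
       checking; the membership below is illustrative only.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class SafeExceptionSketch {
    // Hypothetical stand-ins for KeeperException subclasses.
    static class NoAuthException extends Exception {}
    static class NodeExistsException extends Exception {}
    static class SessionMovedException extends Exception {}

    // Assumption for illustration: only exceptions that guarantee that no
    // metadata was written are listed. NodeExistsException is left out because
    // a retried transaction can raise it after the write already succeeded.
    static final Set<Class<? extends Exception>> SAFE =
            new HashSet<>(Arrays.asList(NoAuthException.class));

    /** True if seeing this exception is assumed to mean nothing was written. */
    static boolean impliesNothingWritten(Exception e) {
        return SAFE.contains(e.getClass());
    }
}
```

       Anything outside the set would then be handled as a possible
       inconsistency rather than as a clean failure.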

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -87,18 +88,19 @@
 
     private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperStateHandleStore.class);
 
-    @VisibleForTesting
-    static final Set<Class<? extends KeeperException>> PRE_COMMIT_EXCEPTIONS =
-            newHashSet(
-                    KeeperException.NodeExistsException.class,
-                    KeeperException.BadArgumentsException.class,
-                    KeeperException.NoNodeException.class,
-                    KeeperException.NoAuthException.class,
-                    KeeperException.BadVersionException.class,
-                    KeeperException.AuthFailedException.class,
-                    KeeperException.InvalidACLException.class,
-                    KeeperException.SessionMovedException.class,
-                    KeeperException.NotReadOnlyException.class);
+    /** Pre-commit exceptions that don't imply data inconsistency. */
+    private static final Set<Class<? extends KeeperException>> SAFE_PRE_COMMIT_EXCEPTIONS =
+            new HashSet<>(
+                    Arrays.asList(
+                            KeeperException.NodeExistsException.class,

Review comment:
       If `replace` has the same problem, does it make sense to remove 
`NodeExistsException` from this list?



