dmvk commented on a change in pull request #17607:
URL: https://github.com/apache/flink/pull/17607#discussion_r743562938



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -154,54 +158,75 @@ public ZooKeeperStateHandleStore(
             throws PossibleInconsistentStateException, Exception {
         checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
         checkNotNull(state, "State");
-
         final String path = normalizePath(pathInZooKeeper);
+        try {
+            try {
+                // Obtain the write-lock.
+                client.create().withMode(CreateMode.EPHEMERAL).forPath(getWriteLockPath(path));

Review comment:
       It could indeed result in the `NodeExistsException`. However, this 
wouldn't introduce inconsistencies, because we don't discard the underlying 
state.
   
   On the other hand, this would trigger the "unexpected" catch block below 
... 🤔 

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -154,54 +158,75 @@ public ZooKeeperStateHandleStore(
             throws PossibleInconsistentStateException, Exception {
         checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
         checkNotNull(state, "State");
-
         final String path = normalizePath(pathInZooKeeper);
+        try {
+            try {
+                // Obtain the write-lock.
+                client.create().withMode(CreateMode.EPHEMERAL).forPath(getWriteLockPath(path));

Review comment:
       We can simply get rid of this write lock. It was meant just as a 
safeguard, but it seems to introduce more corner cases. If we trust the leader 
election mechanism here, it's not needed. I'll remove it.
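
   A minimal sketch of the flow without the write lock, assuming a single 
writer per path (which is what leader election is expected to guarantee). 
`LocklessStoreSketch` is a hypothetical in-memory stand-in, not the actual 
`ZooKeeperStateHandleStore`, and `IllegalStateException` stands in for 
`AlreadyExistException`:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for the ZooKeeper-backed store (illustration only).
class LocklessStoreSketch {
    // Maps a znode path to its (already serialized) state handle.
    private final Map<String, String> nodes = new ConcurrentHashMap<>();

    /** Adds the handle under the given path, failing fast if the node already exists. */
    String addAndLock(String path, String handle) {
        // Existence check replaces the write lock; it is only safe under the
        // assumption that no other process writes to the same path concurrently.
        if (nodes.containsKey(path)) {
            // Stand-in for AlreadyExistException in the real code.
            throw new IllegalStateException(
                    String.format("ZooKeeper node %s already exists.", path));
        }
        nodes.put(path, handle);
        return handle;
    }

    boolean exists(String path) {
        return nodes.containsKey(path);
    }
}
```

   With this shape there is no lock node to clean up in a `finally` block, so 
the `NodeExistsException` corner case on the lock path goes away entirely.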

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -154,54 +158,75 @@ public ZooKeeperStateHandleStore(
             throws PossibleInconsistentStateException, Exception {
         checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
         checkNotNull(state, "State");
-
         final String path = normalizePath(pathInZooKeeper);
+        try {
+            try {
+                // Obtain the write-lock.
+                client.create().withMode(CreateMode.EPHEMERAL).forPath(getWriteLockPath(path));
+            } catch (KeeperException.NodeExistsException e) {
+                // There should always be a single JobMaster for each job, so we should never
+                // encounter this.
+                throw new IllegalStateException(
+                        "Write lock is already taken, which signals there are multiple processes writing to the same path. This should never happen.",
+                        e);
+            }
+            if (exists(path).isExisting()) {
+                throw new AlreadyExistException(
+                        String.format("ZooKeeper node %s already exists.", path));
+            }
+            return doAddAndLock(path, state);
+        } finally {
+            // Release the write-lock.
+            deleteIfExists(getWriteLockPath(path));
+        }
+    }
 
-        RetrievableStateHandle<T> storeHandle = storage.store(state);
-
-        byte[] serializedStoreHandle = serializeOrDiscard(storeHandle);
-
+    private RetrievableStateHandle<T> doAddAndLock(String path, T state) throws Exception {
+        final RetrievableStateHandle<T> storeHandle = storage.store(state);
+        final byte[] serializedStoreHandle = serializeOrDiscard(storeHandle);
         try {
             writeStoreHandleTransactionally(path, serializedStoreHandle);
             return storeHandle;
         } catch (Exception e) {
             if (indicatesPossiblyInconsistentState(e)) {
                 throw new PossibleInconsistentStateException(e);
             }
-
-            // in any other failure case: discard the state
+            // In case of any other failure, discard the state and rethrow the exception.
             storeHandle.discardState();
-
-            // We wrap the exception here so that it could be caught in DefaultJobGraphStore
-            throw ExceptionUtils.findThrowable(e, KeeperException.NodeExistsException.class)
-                    .map(
-                            nee ->
-                                    new AlreadyExistException(
-                                            "ZooKeeper node " + path + " already exists.", nee))
-                    .orElseThrow(() -> e);
+            throw e;
         }
     }
 
     // this method is provided for the sole purpose of easier testing
     @VisibleForTesting
     protected void writeStoreHandleTransactionally(String path, byte[] serializedStoreHandle)
             throws Exception {
-        // Write state handle (not the actual state) to ZooKeeper. This is expected to be
-        // smaller than the state itself. This level of indirection makes sure that data in
-        // ZooKeeper is small, because ZooKeeper is designed for data in the KB range, but
-        // the state can be larger.
-        // Create the lock node in a transaction with the actual state node. That way we can
+        // Write state handle (not the actual state) to ZooKeeper. This is expected to be smaller
+        // than the state itself. This level of indirection makes sure that data in ZooKeeper is
+        // small, because ZooKeeper is designed for data in the KB range, but the state can be
+        // larger. Create the lock node in a transaction with the actual state node. That way we can
         // prevent race conditions with a concurrent delete operation.
-        client.inTransaction()
-                .create()
-                .withMode(CreateMode.PERSISTENT)
-                .forPath(path, serializedStoreHandle)
-                .and()
-                .create()
-                .withMode(CreateMode.EPHEMERAL)
-                .forPath(getLockPath(path))
-                .and()
-                .commit();
+        while (true) {
+            try {
+                client.inTransaction()
+                        .create()
+                        .withMode(CreateMode.PERSISTENT)
+                        .forPath(path, serializedStoreHandle)
+                        .and()
+                        .create()
+                        .withMode(CreateMode.EPHEMERAL)
+                        .forPath(getLockPath(path))
+                        .and()
+                        .commit();
+                break;
+            } catch (KeeperException.NodeExistsException e) {

Review comment:
       We now check whether the node is empty before executing 
`writeStoreHandleTransactionally()`, so this is hopefully covered by that. IMO 
we can trust the transaction mechanism here (assuming there are no concurrent 
modifications to the znode).
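
   To illustrate the all-or-nothing behaviour this relies on, here is a 
minimal in-memory sketch. `AtomicCreateSketch` and `createBoth` are 
hypothetical names used only for illustration; the real code goes through the 
ZooKeeper transaction shown in the diff above:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of a two-node transactional create (illustration only).
class AtomicCreateSketch {
    private final Map<String, byte[]> nodes = new HashMap<>();

    /** Creates the state node and its lock node together, or creates neither. */
    synchronized void createBoth(String path, byte[] handle, String lockPath) {
        // Validate every operation first; if any would fail, nothing is applied.
        if (nodes.containsKey(path) || nodes.containsKey(lockPath)) {
            throw new IllegalStateException("Node already exists; nothing was created.");
        }
        nodes.put(path, handle);
        nodes.put(lockPath, new byte[0]);
    }

    synchronized boolean exists(String path) {
        return nodes.containsKey(path);
    }
}
```

   Since a failed transaction leaves no partial state behind, a retry loop 
around it should not be needed as long as the existence check has already run 
and nothing else modifies the znode in between.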





