XComp commented on a change in pull request #18869:
URL: https://github.com/apache/flink/pull/18869#discussion_r814021500



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -157,10 +167,17 @@ public ZooKeeperStateHandleStore(
         checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
         checkNotNull(state, "State");
         final String path = normalizePath(pathInZooKeeper);
-        if (exists(path).isExisting()) {
-            throw new AlreadyExistException(
-                    String.format("ZooKeeper node %s already exists.", path));
+        final Optional<Stat> nodeStat = 
nodeExistsButMightBeMarkedAsDeleted(path);
+
+        if (nodeStat.isPresent()) {
+            if (NOT_MARKED_FOR_DELETION.test(nodeStat.get())) {
+                throw new AlreadyExistException(
+                        String.format("ZooKeeper node %s already exists.", 
path));
+            }
+
+            releaseAndTryRemove(path);

Review comment:
       it's only called during the checkpoint creation (where we shouldn't run 
into the situation that a checkpoint gets deleted because only one leader 
access a checkpoint at a time) and when a JobGraph is added during a job 
submission.
   
   So, in this case, this is only added to make the implementation consistent.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to