XComp commented on a change in pull request #18869:
URL: https://github.com/apache/flink/pull/18869#discussion_r814021500
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -157,10 +167,17 @@ public ZooKeeperStateHandleStore(
checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
checkNotNull(state, "State");
final String path = normalizePath(pathInZooKeeper);
- if (exists(path).isExisting()) {
- throw new AlreadyExistException(
- String.format("ZooKeeper node %s already exists.", path));
+ final Optional<Stat> nodeStat = nodeExistsButMightBeMarkedAsDeleted(path);
+
+ if (nodeStat.isPresent()) {
+ if (NOT_MARKED_FOR_DELETION.test(nodeStat.get())) {
+ throw new AlreadyExistException(
+ String.format("ZooKeeper node %s already exists.", path));
+ }
+
+ releaseAndTryRemove(path);
Review comment:
It's only called during checkpoint creation (where we shouldn't run
into a situation in which a checkpoint gets deleted, because only one
leader accesses a checkpoint at a time) and when a JobGraph is added
during job submission.
So, in this case, the cleanup call is only added to make the
implementation consistent.
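
To make the control flow in the hunk above easier to follow, here is a minimal in-memory sketch of the same idea: an existing node that is not marked for deletion is a conflict, while a node left behind by an interrupted deletion is cleaned up before the new entry is written. This is not the Flink implementation. The class `MarkedForDeletionSketch`, its `Node` type, the `putRaw` helper and the unchecked `NodeAlreadyExistsException` are all hypothetical names, and a `HashMap` stands in for ZooKeeper.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** In-memory stand-in for a ZooKeeper-backed store; every name here is hypothetical. */
class MarkedForDeletionSketch {

    /** Minimal node model: a payload plus a flag set when a deletion was started but not finished. */
    static final class Node {
        final String state;
        final boolean markedForDeletion;

        Node(String state, boolean markedForDeletion) {
            this.state = state;
            this.markedForDeletion = markedForDeletion;
        }
    }

    /** Unchecked stand-in for the "already exists" error thrown in the diff. */
    static final class NodeAlreadyExistsException extends RuntimeException {
        NodeAlreadyExistsException(String message) {
            super(message);
        }
    }

    private final Map<String, Node> nodes = new HashMap<>();

    /** Test helper: writes a node directly, bypassing the existence check. */
    void putRaw(String path, Node node) {
        nodes.put(path, node);
    }

    Optional<Node> get(String path) {
        return Optional.ofNullable(nodes.get(path));
    }

    /** Adds a node, tolerating leftovers from an interrupted deletion. */
    void addAndLock(String path, String state) {
        Node existing = nodes.get(path);
        if (existing != null) {
            if (!existing.markedForDeletion) {
                // A live node at this path is a genuine conflict.
                throw new NodeAlreadyExistsException(
                        String.format("Node %s already exists.", path));
            }
            // The node is a leftover of an unfinished deletion: finish the cleanup first.
            nodes.remove(path);
        }
        nodes.put(path, new Node(state, false));
    }
}
```

In this sketch the cleanup branch is only reachable when a stale node is present, which matches the point of the comment: for the two call sites mentioned, that branch is not expected to be hit in practice and exists for consistency.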