dmvk commented on a change in pull request #18869:
URL: https://github.com/apache/flink/pull/18869#discussion_r813811641
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -157,10 +167,17 @@ public ZooKeeperStateHandleStore(
checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
checkNotNull(state, "State");
final String path = normalizePath(pathInZooKeeper);
- if (exists(path).isExisting()) {
- throw new AlreadyExistException(
- String.format("ZooKeeper node %s already exists.", path));
+ final Optional<Stat> nodeStat =
nodeExistsButMightBeMarkedAsDeleted(path);
+
+ if (nodeStat.isPresent()) {
+ if (NOT_MARKED_FOR_DELETION.test(nodeStat.get())) {
+ throw new AlreadyExistException(
+ String.format("ZooKeeper node %s already exists.",
path));
+ }
+
+ releaseAndTryRemove(path);
Review comment:
🤔 Under which circumstances could this happen?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -102,6 +109,9 @@
KeeperException.SessionMovedException.class,
KeeperException.NotReadOnlyException.class));
+ private static final Predicate<Stat> NOT_MARKED_FOR_DELETION =
+ stat -> stat.getNumChildren() > 0;
Review comment:
This seems unnecessarily complex. A possible simplification:
```diff
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
index b9ca22e6449..3255737f0d8 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
@@ -109,9 +109,6 @@ public class ZooKeeperStateHandleStore<T extends
Serializable>
KeeperException.SessionMovedException.class,
KeeperException.NotReadOnlyException.class));
- private static final Predicate<Stat> NOT_MARKED_FOR_DELETION =
- stat -> stat.getNumChildren() > 0;
-
/** Curator ZooKeeper client. */
private final CuratorFramework client;
@@ -167,10 +164,10 @@ public class ZooKeeperStateHandleStore<T extends
Serializable>
checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
checkNotNull(state, "State");
final String path = normalizePath(pathInZooKeeper);
- final Optional<Stat> nodeStat =
nodeExistsButMightBeMarkedAsDeleted(path);
+ final Optional<Stat> maybeStat = getStat(path);
- if (nodeStat.isPresent()) {
- if (NOT_MARKED_FOR_DELETION.test(nodeStat.get())) {
+ if (maybeStat.isPresent()) {
+ if (isAlive(maybeStat.get())) {
throw new AlreadyExistException(
String.format("ZooKeeper node %s already exists.",
path));
}
@@ -320,25 +317,20 @@ public class ZooKeeperStateHandleStore<T extends
Serializable>
@Override
public IntegerResourceVersion exists(String pathInZooKeeper) throws
Exception {
checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
-
- return getStatIfNotMarkedAsDeleted(pathInZooKeeper)
+ return getStat(pathInZooKeeper)
+ .filter(ZooKeeperStateHandleStore::isAlive)
.map(stat ->
IntegerResourceVersion.valueOf(stat.getVersion()))
.orElse(IntegerResourceVersion.notExisting());
}
- private Optional<Stat> getStatIfNotMarkedAsDeleted(String path) throws
Exception {
- return getStatIfExistingInternal(path, NOT_MARKED_FOR_DELETION);
- }
-
- private Optional<Stat> nodeExistsButMightBeMarkedAsDeleted(String path)
throws Exception {
- return getStatIfExistingInternal(path, Predicates.alwaysTrue());
+ private static boolean isAlive(Stat stat) {
+ return stat.getNumChildren() > 0;
}
- private Optional<Stat> getStatIfExistingInternal(String path,
Predicate<Stat> statCondition)
+ private Optional<Stat> getStat(String path)
throws Exception {
final String normalizedPath = normalizePath(path);
- return
Optional.ofNullable(client.checkExists().forPath(normalizedPath))
- .filter(statCondition);
+ return
Optional.ofNullable(client.checkExists().forPath(normalizedPath));
}
/**
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -537,7 +603,17 @@ public String toString() {
*/
@VisibleForTesting
String getLockPath(String rootPath) {
- return rootPath + '/' + lockNode;
+ return getLocksChildPath(rootPath) + '/' + lockNode;
+ }
+
+ /**
+ * Returns the sub-path for lock nodes of the corresponding node (referred
to through the passed
+ * {@code rooPath}. The returned sub-path collects the lock nodes for the
{@code rootPath}'s
+ * node. The {@code rootPath} is marked for deletion if the sub-path for
lock nodes is deleted.
+ */
+ @VisibleForTesting
+ static String getLocksChildPath(String rootPath) {
Review comment:
Maybe `getRootLockPath` + `getInstanceLockPath` could be more explicit?
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
##########
@@ -954,4 +1212,38 @@ public void testRemoveAllHandlesShouldRemoveAllPaths()
throws Exception {
assertThat(zkStore.getAllHandles(), is(empty()));
}
+
+ @Test
+ public void testGetAllHandlesWithMarkedForDeletionEntries() throws
Exception {
Review comment:
Hmm, this seems inconsistent with what I did in the k8s implementation 🤔
So, if I understand it correctly, we always want `getAllHandles` to return
everything?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -375,7 +415,7 @@ public IntegerResourceVersion exists(String
pathInZooKeeper) throws Exception {
try {
final RetrievableStateHandle<T> stateHandle =
getAndLock(path);
stateHandles.add(new Tuple2<>(stateHandle, path));
- } catch (KeeperException.NoNodeException ignored) {
+ } catch (NotExistException ignored) {
Review comment:
How did this ever work? Do we have a test case covering this?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -219,6 +242,11 @@ public void replace(String pathInZooKeeper,
IntegerResourceVersion expectedVersi
final String path = normalizePath(pathInZooKeeper);
+ checkState(
+ hasLock(path),
+ "'{}' is only allowed to be replaced if the instance has a
lock on this node.",
+ path);
Review comment:
Why do we need this check now?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -375,7 +415,7 @@ public IntegerResourceVersion exists(String
pathInZooKeeper) throws Exception {
try {
final RetrievableStateHandle<T> stateHandle =
getAndLock(path);
stateHandles.add(new Tuple2<>(stateHandle, path));
- } catch (KeeperException.NoNodeException ignored) {
+ } catch (NotExistException ignored) {
// Concurrent deletion, retry
continue retry;
Review comment:
oh man, goto statement spotted 🙈
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
##########
@@ -137,6 +161,141 @@ public void testAddAndLock() throws Exception {
assertEquals(state, actual);
}
+ @Test
+ public void testAddAndLockOnMarkedForDeletionNode() throws Exception {
+ final CuratorFramework client =
+ ZooKeeperUtils.useNamespaceAndEnsurePath(
+ ZOOKEEPER.getClient(),
"/testAddAndLockOnMarkedForDeletionNode");
+ final
ZooKeeperStateHandleStore<TestingLongStateHandleHelper.LongStateHandle> zkStore
=
+ new ZooKeeperStateHandleStore<>(client, new
TestingLongStateHandleHelper());
+
+ final String markedForDeletionNode = "marked-for-deletion";
+ final long oldStateValue = 1L;
+ zkStore.addAndLock(
+ generateZookeeperPath(markedForDeletionNode),
+ new
TestingLongStateHandleHelper.LongStateHandle(oldStateValue));
+
+ markNodeForDeletion(client, markedForDeletionNode);
+
+ // Test
+ final long updatedStateValue = oldStateValue + 2;
+ zkStore.addAndLock(
+ generateZookeeperPath(markedForDeletionNode),
+ new
TestingLongStateHandleHelper.LongStateHandle(updatedStateValue));
+
+ // Verify
+ // State handle created
+ assertEquals(1, zkStore.getAllAndLock().size());
+ assertEquals(
+ updatedStateValue,
+
zkStore.getAndLock(generateZookeeperPath(markedForDeletionNode))
+ .retrieveState()
+ .getValue());
+ }
+
+ @Test
+ public void testRepeatableCleanup() throws Exception {
+ final
ZooKeeperStateHandleStore<TestingLongStateHandleHelper.LongStateHandle>
testInstance =
+ new ZooKeeperStateHandleStore<>(
+ ZOOKEEPER.getClient(), new
TestingLongStateHandleHelper());
+
+ final String pathInZooKeeper = "/testRepeatableCleanup";
+
+ final RuntimeException expectedException =
+ new RuntimeException("Expected RuntimeException");
+ final TestingLongStateHandleHelper.LongStateHandle stateHandle =
+ new TestingLongStateHandleHelper.LongStateHandle(12354L,
expectedException);
+
+ testInstance.addAndLock(pathInZooKeeper, stateHandle);
+
+ try {
+ testInstance.releaseAndTryRemove(pathInZooKeeper);
Review comment:
nit: assertThrows?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -423,25 +462,27 @@ public boolean releaseAndTryRemove(String
pathInZooKeeper) throws Exception {
release(pathInZooKeeper);
try {
- client.delete().forPath(path);
+ deleteIdempotently(getLocksChildPath(path));
} catch (KeeperException.NotEmptyException ignored) {
- LOG.debug("Could not delete znode {} because it is still locked.",
path);
+ LOG.debug(
+ "Could not delete znode {} because it is still locked.",
+ getLocksChildPath(path));
return false;
}
if (stateHandle != null) {
stateHandle.discardState();
}
+ deleteIdempotently(path);
Review comment:
```suggestion
// We can now "commit" the removal by removing the root directory.
deleteIdempotently(path);
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -529,6 +579,22 @@ public String toString() {
// Protected methods
//
---------------------------------------------------------------------------------------------------------
+ /**
+ * Checks whether a lock is created for this instance on the passed
ZooKeeper node.
+ *
+ * @param rootPath The node that shall be checked.
+ * @return {@code true} if the lock exists; {@code false} otherwise.
+ */
+ private boolean hasLock(String rootPath) throws Exception {
+ final String normalizedRootPath = normalizePath(rootPath);
+ try {
+ return
client.checkExists().forPath(getLockPath(normalizedRootPath)) != null;
Review comment:
Under which circumstances can this return null? (A non-existent node
seems to throw an exception.)
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -474,13 +515,22 @@ public void releaseAndTryRemoveAll() throws Exception {
@Override
public void release(String pathInZooKeeper) throws Exception {
final String path = normalizePath(pathInZooKeeper);
+ final String lockPath = getLockPath(path);
try {
- client.delete().forPath(getLockPath(path));
- } catch (KeeperException.NoNodeException ignored) {
- // we have never locked this node
+ deleteIdempotently(lockPath);
} catch (Exception e) {
- throw new Exception(
- "Could not release the lock: " +
getLockPath(pathInZooKeeper) + '.', e);
+ throw new Exception("Could not release the lock: " + lockPath +
'.', e);
+ }
+ }
+
+ private void deleteIdempotently(String path) throws Exception {
Review comment:
It took me a while to understand what's meant as idempotent here. How
about:
```suggestion
private void deleteIfExists(String path) throws Exception {
```
?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]