This is an automated email from the ASF dual-hosted git repository.
mapohl pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
new aa3bb951db7 [FLINK-26987][runtime] Fixes getAllAndLock livelock
aa3bb951db7 is described below
commit aa3bb951db745f94070f2ef6ecb62ce207bda520
Author: Matthias Pohl <[email protected]>
AuthorDate: Fri Apr 1 13:44:41 2022 +0200
[FLINK-26987][runtime] Fixes getAllAndLock livelock
This livelock can happen in situations where an entry was marked
for deletion but has not been deleted yet.
---
.../zookeeper/ZooKeeperStateHandleStore.java | 10 ++++--
.../zookeeper/ZooKeeperStateHandleStoreTest.java | 40 ++++++++++++++++++++++
2 files changed, 47 insertions(+), 3 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
index c4388d410bf..0217ffdac9b 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
@@ -391,7 +391,6 @@ public class ZooKeeperStateHandleStore<T extends
Serializable>
final String rootPath = "/";
boolean success = false;
- retry:
while (!success) {
stateHandles.clear();
@@ -411,8 +410,13 @@ public class ZooKeeperStateHandleStore<T extends
Serializable>
final RetrievableStateHandle<T> stateHandle =
getAndLock(path);
stateHandles.add(new Tuple2<>(stateHandle, path));
} catch (NotExistException ignored) {
- // Concurrent deletion, retry
- continue retry;
+ // The node is subject to deletion, which can mean two
things:
+ // 1. The state is marked for deletion: The cVersion
of the node does not
+ // necessarily change. We're not interested in the
state anymore, anyway.
+ // Therefore, this error can be ignored.
+ // 2. An actual concurrent deletion is going on. The
child node is gone.
+ // That would affect the cVersion of the parent node
and, as a consequence,
+ // would trigger a restart of the logic through the while
loop.
} catch (IOException ioException) {
LOG.warn(
"Could not get all ZooKeeper children. Node {}
contained "
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
index ddf6fc7b207..eff53576fc1 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
@@ -867,6 +867,46 @@ public class ZooKeeperStateHandleStoreTest extends
TestLogger {
Iterables.getOnlyElement(actuallyLockedHandles).f0.retrieveState().getValue());
}
+ @Test
+ public void testGetAllAndLockWhileEntryIsMarkedForDeletion() throws
Exception {
+ final TestingLongStateHandleHelper stateHandleProvider = new
TestingLongStateHandleHelper();
+ final CuratorFramework client =
+ ZooKeeperUtils.useNamespaceAndEnsurePath(
+ ZOOKEEPER.getClient(),
"/testGetAllAndLockWhileEntryIsMarkedForDeletion");
+
+ final
ZooKeeperStateHandleStore<TestingLongStateHandleHelper.LongStateHandle>
+ stateHandleStore = new ZooKeeperStateHandleStore<>(client,
stateHandleProvider);
+
+ final String pathInZooKeeperPrefix = "/node";
+
+ final long stateForDeletion = 42L;
+ final String handlePathForDeletion = pathInZooKeeperPrefix +
"-for-deletion";
+ stateHandleStore.addAndLock(
+ handlePathForDeletion,
+ new
TestingLongStateHandleHelper.LongStateHandle(stateForDeletion));
+ // marks the entry for deletion but doesn't delete it, yet
+ client.delete()
+ .deletingChildrenIfNeeded()
+
.forPath(ZooKeeperStateHandleStore.getRootLockPath(handlePathForDeletion));
+
+ final long stateToKeep = stateForDeletion + 2;
+ stateHandleStore.addAndLock(
+ pathInZooKeeperPrefix + "-keep",
+ new TestingLongStateHandleHelper.LongStateHandle(stateToKeep));
+
+ final List<
+ Tuple2<
+ RetrievableStateHandle<
+
TestingLongStateHandleHelper.LongStateHandle>,
+ String>>
+ actuallyLockedHandles = stateHandleStore.getAllAndLock();
+
+ assertEquals(
+ "Only the StateHandle that was expected to be kept should be
returned.",
+ stateToKeep,
+
Iterables.getOnlyElement(actuallyLockedHandles).f0.retrieveState().getValue());
+ }
+
/** Tests that the state is returned sorted. */
@Test
public void testGetAllSortedByName() throws Exception {