This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new aa3bb951db7 [FLINK-26987][runtime] Fixes getAllAndLock livelock
aa3bb951db7 is described below

commit aa3bb951db745f94070f2ef6ecb62ce207bda520
Author: Matthias Pohl <[email protected]>
AuthorDate: Fri Apr 1 13:44:41 2022 +0200

    [FLINK-26987][runtime] Fixes getAllAndLock livelock
    
    This livelock can happen in situations where an entry was marked
    for deletion but has not been deleted yet.
---
 .../zookeeper/ZooKeeperStateHandleStore.java       | 10 ++++--
 .../zookeeper/ZooKeeperStateHandleStoreTest.java   | 40 ++++++++++++++++++++++
 2 files changed, 47 insertions(+), 3 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
index c4388d410bf..0217ffdac9b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
@@ -391,7 +391,6 @@ public class ZooKeeperStateHandleStore<T extends 
Serializable>
         final String rootPath = "/";
         boolean success = false;
 
-        retry:
         while (!success) {
             stateHandles.clear();
 
@@ -411,8 +410,13 @@ public class ZooKeeperStateHandleStore<T extends 
Serializable>
                         final RetrievableStateHandle<T> stateHandle = 
getAndLock(path);
                         stateHandles.add(new Tuple2<>(stateHandle, path));
                     } catch (NotExistException ignored) {
-                        // Concurrent deletion, retry
-                        continue retry;
+                        // The node is subject to deletion which can mean two 
things:
+                        // 1. The state is marked for deletion: The cVersion 
of the node does not
+                        // necessarily change. We're not interested in the 
state anymore, anyway.
+                        // Therefore, this error can be ignored.
+                        // 2. An actual concurrent deletion is going on. The 
child node is gone.
+                        // That would affect the cVersion of the parent node 
and, as a consequence,
+                        // would trigger a restart of the logic through the while 
loop.
                     } catch (IOException ioException) {
                         LOG.warn(
                                 "Could not get all ZooKeeper children. Node {} 
contained "
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
index ddf6fc7b207..eff53576fc1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
@@ -867,6 +867,46 @@ public class ZooKeeperStateHandleStoreTest extends 
TestLogger {
                 
Iterables.getOnlyElement(actuallyLockedHandles).f0.retrieveState().getValue());
     }
 
+    @Test
+    public void testGetAllAndLockWhileEntryIsMarkedForDeletion() throws 
Exception {
+        final TestingLongStateHandleHelper stateHandleProvider = new 
TestingLongStateHandleHelper();
+        final CuratorFramework client =
+                ZooKeeperUtils.useNamespaceAndEnsurePath(
+                        ZOOKEEPER.getClient(), 
"/testGetAllAndLockWhileEntryIsMarkedForDeletion");
+
+        final 
ZooKeeperStateHandleStore<TestingLongStateHandleHelper.LongStateHandle>
+                stateHandleStore = new ZooKeeperStateHandleStore<>(client, 
stateHandleProvider);
+
+        final String pathInZooKeeperPrefix = "/node";
+
+        final long stateForDeletion = 42L;
+        final String handlePathForDeletion = pathInZooKeeperPrefix + 
"-for-deletion";
+        stateHandleStore.addAndLock(
+                handlePathForDeletion,
+                new 
TestingLongStateHandleHelper.LongStateHandle(stateForDeletion));
+        // marks the entry for deletion but doesn't delete it, yet
+        client.delete()
+                .deletingChildrenIfNeeded()
+                
.forPath(ZooKeeperStateHandleStore.getRootLockPath(handlePathForDeletion));
+
+        final long stateToKeep = stateForDeletion + 2;
+        stateHandleStore.addAndLock(
+                pathInZooKeeperPrefix + "-keep",
+                new TestingLongStateHandleHelper.LongStateHandle(stateToKeep));
+
+        final List<
+                        Tuple2<
+                                RetrievableStateHandle<
+                                        
TestingLongStateHandleHelper.LongStateHandle>,
+                                String>>
+                actuallyLockedHandles = stateHandleStore.getAllAndLock();
+
+        assertEquals(
+                "Only the StateHandle that was expected to be kept should be 
returned.",
+                stateToKeep,
+                
Iterables.getOnlyElement(actuallyLockedHandles).f0.retrieveState().getValue());
+    }
+
     /** Tests that the state is returned sorted. */
     @Test
     public void testGetAllSortedByName() throws Exception {

Reply via email to