This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e7ac3ba  [FLINK-10324] Replace ZooKeeperStateHandleStore#getAllSortedByNameAndLock by getAllAndLock
e7ac3ba is described below

commit e7ac3ba7dfcb90c21025def2bf4112b108d21afd
Author: Till Rohrmann <[email protected]>
AuthorDate: Wed Sep 12 11:55:31 2018 +0200

    [FLINK-10324] Replace ZooKeeperStateHandleStore#getAllSortedByNameAndLock by getAllAndLock
    
    In order to reduce code duplication, this commit replaces
    ZooKeeperStateHandleStore#getAllSortedByNameAndLock by getAllAndLock and
    does the sorting of the entries afterwards.
    The implication of this change is that we no longer try to release and
    remove corrupted entries and instead simply ignore them.
    
    This closes #6681.
---
 .../ZooKeeperCompletedCheckpointStore.java         |  8 ++-
 .../zookeeper/ZooKeeperStateHandleStore.java       | 63 ----------------------
 ...oKeeperCompletedCheckpointStoreMockitoTest.java |  2 +-
 .../zookeeper/ZooKeeperStateHandleStoreTest.java   | 21 ++------
 4 files changed, 12 insertions(+), 82 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index e443fc2..51f4008 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -35,6 +35,8 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.ConcurrentModificationException;
 import java.util.List;
 import java.util.concurrent.Executor;
@@ -69,6 +71,8 @@ public class ZooKeeperCompletedCheckpointStore implements 
CompletedCheckpointSto
 
        private static final Logger LOG = 
LoggerFactory.getLogger(ZooKeeperCompletedCheckpointStore.class);
 
+       private static final 
Comparator<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> 
STRING_COMPARATOR = Comparator.comparing(o -> o.f1);
+
        /** Curator ZooKeeper client. */
        private final CuratorFramework client;
 
@@ -153,7 +157,7 @@ public class ZooKeeperCompletedCheckpointStore implements 
CompletedCheckpointSto
                List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, 
String>> initialCheckpoints;
                while (true) {
                        try {
-                               initialCheckpoints = 
checkpointsInZooKeeper.getAllSortedByNameAndLock();
+                               initialCheckpoints = 
checkpointsInZooKeeper.getAllAndLock();
                                break;
                        }
                        catch (ConcurrentModificationException e) {
@@ -161,6 +165,8 @@ public class ZooKeeperCompletedCheckpointStore implements 
CompletedCheckpointSto
                        }
                }
 
+               Collections.sort(initialCheckpoints, STRING_COMPARATOR);
+
                int numberOfInitialCheckpoints = initialCheckpoints.size();
 
                LOG.info("Found {} checkpoints in ZooKeeper.", 
numberOfInitialCheckpoints);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
index b9cd0c1..2cb1ccc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
@@ -24,7 +24,6 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.InstantiationUtil;
 
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
@@ -320,68 +319,6 @@ public class ZooKeeperStateHandleStore<T extends 
Serializable> {
                return stateHandles;
        }
 
-
-       /**
-        * Gets all available state handles from ZooKeeper sorted by name 
(ascending) and locks the
-        * respective state nodes. The result tuples contain the retrieved 
state and the path to the
-        * node in ZooKeeper.
-        *
-        * <p>If there is a concurrent modification, the operation is retried 
until it succeeds.
-        *
-        * @return All state handles in ZooKeeper.
-        * @throws Exception If a ZooKeeper or state handle operation fails
-        */
-       @SuppressWarnings("unchecked")
-       public List<Tuple2<RetrievableStateHandle<T>, String>> 
getAllSortedByNameAndLock() throws Exception {
-               final List<Tuple2<RetrievableStateHandle<T>, String>> 
stateHandles = new ArrayList<>();
-
-               boolean success = false;
-
-               retry:
-               while (!success) {
-                       stateHandles.clear();
-
-                       Stat stat = client.checkExists().forPath("/");
-                       if (stat == null) {
-                               break; // Node does not exist, done.
-                       } else {
-                               // Initial cVersion (number of changes to the 
children of this node)
-                               int initialCVersion = stat.getCversion();
-
-                               List<String> children = 
ZKPaths.getSortedChildren(
-                                               
client.getZookeeperClient().getZooKeeper(),
-                                               
ZKPaths.fixForNamespace(client.getNamespace(), "/"));
-
-                               for (String path : children) {
-                                       path = "/" + path;
-
-                                       try {
-                                               final RetrievableStateHandle<T> 
stateHandle = getAndLock(path);
-                                               stateHandles.add(new 
Tuple2<>(stateHandle, path));
-                                       } catch 
(KeeperException.NoNodeException ignored) {
-                                               // Concurrent deletion, retry
-                                               continue retry;
-                                       } catch (IOException ioException) {
-                                               LOG.warn("Could not get all 
ZooKeeper children. Node {} contained " +
-                                                       "corrupted data. 
Releasing and trying to remove this node.", path, ioException);
-
-                                               releaseAndTryRemove(path);
-                                       }
-                               }
-
-                               int finalCVersion = 
client.checkExists().forPath("/").getCversion();
-
-                               // Check for concurrent modifications
-                               success = initialCVersion == finalCVersion;
-
-                               // we don't have to release all locked nodes in 
case of a concurrent modification, because we
-                               // will retrieve them in the next iteration 
again.
-                       }
-               }
-
-               return stateHandles;
-       }
-
        /**
         * Releases the lock for the given state node and tries to remove the 
state node if it is no longer locked.
         * It returns the {@link RetrievableStateHandle} stored under the given 
state node if any.
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
index 1f7d369..e9b90b7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
@@ -126,7 +126,7 @@ public class ZooKeeperCompletedCheckpointStoreMockitoTest 
extends TestLogger {
 
                ZooKeeperStateHandleStore<CompletedCheckpoint> 
zooKeeperStateHandleStoreMock = spy(new ZooKeeperStateHandleStore<>(client, 
storageHelperMock));
                
whenNew(ZooKeeperStateHandleStore.class).withAnyArguments().thenReturn(zooKeeperStateHandleStoreMock);
-               
doReturn(checkpointsInZooKeeper).when(zooKeeperStateHandleStoreMock).getAllSortedByNameAndLock();
+               
doReturn(checkpointsInZooKeeper).when(zooKeeperStateHandleStoreMock).getAllAndLock();
 
                final int numCheckpointsToRetain = 1;
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
index 2dd27e7..3c37fae 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
@@ -36,6 +36,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -365,11 +367,12 @@ public class ZooKeeperStateHandleStoreTest extends 
TestLogger {
                        store.addAndLock(pathInZooKeeper, val);
                }
 
-               List<Tuple2<RetrievableStateHandle<Long>, String>> actual = 
store.getAllSortedByNameAndLock();
+               List<Tuple2<RetrievableStateHandle<Long>, String>> actual = 
store.getAllAndLock();
                assertEquals(expected.length, actual.size());
 
                // bring the elements in sort order
                Arrays.sort(expected);
+               Collections.sort(actual, Comparator.comparing(o -> o.f1));
 
                for (int i = 0; i < expected.length; i++) {
                        assertEquals(expected[i], 
actual.get(i).f0.retrieveState());
@@ -468,22 +471,6 @@ public class ZooKeeperStateHandleStoreTest extends 
TestLogger {
                }
 
                assertEquals(expected, actual);
-
-               // check the same for the all sorted by name call
-               allEntries = store.getAllSortedByNameAndLock();
-
-               actual.clear();
-
-               for (Tuple2<RetrievableStateHandle<Long>, String> entry : 
allEntries) {
-                       actual.add(entry.f0.retrieveState());
-               }
-
-               assertEquals(expected, actual);
-
-               Stat stat = ZOOKEEPER.getClient().checkExists().forPath("/" + 
2);
-
-               // check that the corrupted node no longer exists
-               assertNull("The corrupted node should no longer exist.", stat);
        }
 
        /**

Reply via email to