This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new e7ac3ba [FLINK-10324] Replace
ZooKeeperStateHandleStore#getAllSortedByNameAndLock by getAllAndLock
e7ac3ba is described below
commit e7ac3ba7dfcb90c21025def2bf4112b108d21afd
Author: Till Rohrmann <[email protected]>
AuthorDate: Wed Sep 12 11:55:31 2018 +0200
[FLINK-10324] Replace ZooKeeperStateHandleStore#getAllSortedByNameAndLock
by getAllAndLock
In order to reduce code duplication, this commit replaces
ZooKeeperStateHandleStore#getAllSortedByNameAndLock with getAllAndLock and
sorts the entries afterwards.
The implication of this change is that we no longer try to release and
remove corrupted
entries and instead simply ignore them.
This closes #6681.
---
.../ZooKeeperCompletedCheckpointStore.java | 8 ++-
.../zookeeper/ZooKeeperStateHandleStore.java | 63 ----------------------
...oKeeperCompletedCheckpointStoreMockitoTest.java | 2 +-
.../zookeeper/ZooKeeperStateHandleStoreTest.java | 21 ++------
4 files changed, 12 insertions(+), 82 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index e443fc2..51f4008 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -35,6 +35,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.concurrent.Executor;
@@ -69,6 +71,8 @@ public class ZooKeeperCompletedCheckpointStore implements
CompletedCheckpointSto
private static final Logger LOG =
LoggerFactory.getLogger(ZooKeeperCompletedCheckpointStore.class);
+ private static final
Comparator<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>>
STRING_COMPARATOR = Comparator.comparing(o -> o.f1);
+
/** Curator ZooKeeper client. */
private final CuratorFramework client;
@@ -153,7 +157,7 @@ public class ZooKeeperCompletedCheckpointStore implements
CompletedCheckpointSto
List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>,
String>> initialCheckpoints;
while (true) {
try {
- initialCheckpoints =
checkpointsInZooKeeper.getAllSortedByNameAndLock();
+ initialCheckpoints =
checkpointsInZooKeeper.getAllAndLock();
break;
}
catch (ConcurrentModificationException e) {
@@ -161,6 +165,8 @@ public class ZooKeeperCompletedCheckpointStore implements
CompletedCheckpointSto
}
}
+ Collections.sort(initialCheckpoints, STRING_COMPARATOR);
+
int numberOfInitialCheckpoints = initialCheckpoints.size();
LOG.info("Found {} checkpoints in ZooKeeper.",
numberOfInitialCheckpoints);
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
index b9cd0c1..2cb1ccc 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
@@ -24,7 +24,6 @@ import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.InstantiationUtil;
import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
@@ -320,68 +319,6 @@ public class ZooKeeperStateHandleStore<T extends
Serializable> {
return stateHandles;
}
-
- /**
- * Gets all available state handles from ZooKeeper sorted by name
(ascending) and locks the
- * respective state nodes. The result tuples contain the retrieved
state and the path to the
- * node in ZooKeeper.
- *
- * <p>If there is a concurrent modification, the operation is retried
until it succeeds.
- *
- * @return All state handles in ZooKeeper.
- * @throws Exception If a ZooKeeper or state handle operation fails
- */
- @SuppressWarnings("unchecked")
- public List<Tuple2<RetrievableStateHandle<T>, String>>
getAllSortedByNameAndLock() throws Exception {
- final List<Tuple2<RetrievableStateHandle<T>, String>>
stateHandles = new ArrayList<>();
-
- boolean success = false;
-
- retry:
- while (!success) {
- stateHandles.clear();
-
- Stat stat = client.checkExists().forPath("/");
- if (stat == null) {
- break; // Node does not exist, done.
- } else {
- // Initial cVersion (number of changes to the
children of this node)
- int initialCVersion = stat.getCversion();
-
- List<String> children =
ZKPaths.getSortedChildren(
-
client.getZookeeperClient().getZooKeeper(),
-
ZKPaths.fixForNamespace(client.getNamespace(), "/"));
-
- for (String path : children) {
- path = "/" + path;
-
- try {
- final RetrievableStateHandle<T>
stateHandle = getAndLock(path);
- stateHandles.add(new
Tuple2<>(stateHandle, path));
- } catch
(KeeperException.NoNodeException ignored) {
- // Concurrent deletion, retry
- continue retry;
- } catch (IOException ioException) {
- LOG.warn("Could not get all
ZooKeeper children. Node {} contained " +
- "corrupted data.
Releasing and trying to remove this node.", path, ioException);
-
- releaseAndTryRemove(path);
- }
- }
-
- int finalCVersion =
client.checkExists().forPath("/").getCversion();
-
- // Check for concurrent modifications
- success = initialCVersion == finalCVersion;
-
- // we don't have to release all locked nodes in
case of a concurrent modification, because we
- // will retrieve them in the next iteration
again.
- }
- }
-
- return stateHandles;
- }
-
/**
* Releases the lock for the given state node and tries to remove the
state node if it is no longer locked.
* It returns the {@link RetrievableStateHandle} stored under the given
state node if any.
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
index 1f7d369..e9b90b7 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
@@ -126,7 +126,7 @@ public class ZooKeeperCompletedCheckpointStoreMockitoTest
extends TestLogger {
ZooKeeperStateHandleStore<CompletedCheckpoint>
zooKeeperStateHandleStoreMock = spy(new ZooKeeperStateHandleStore<>(client,
storageHelperMock));
whenNew(ZooKeeperStateHandleStore.class).withAnyArguments().thenReturn(zooKeeperStateHandleStoreMock);
-
doReturn(checkpointsInZooKeeper).when(zooKeeperStateHandleStoreMock).getAllSortedByNameAndLock();
+
doReturn(checkpointsInZooKeeper).when(zooKeeperStateHandleStoreMock).getAllAndLock();
final int numCheckpointsToRetain = 1;
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
index 2dd27e7..3c37fae 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
@@ -36,6 +36,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -365,11 +367,12 @@ public class ZooKeeperStateHandleStoreTest extends
TestLogger {
store.addAndLock(pathInZooKeeper, val);
}
- List<Tuple2<RetrievableStateHandle<Long>, String>> actual =
store.getAllSortedByNameAndLock();
+ List<Tuple2<RetrievableStateHandle<Long>, String>> actual =
store.getAllAndLock();
assertEquals(expected.length, actual.size());
// bring the elements in sort order
Arrays.sort(expected);
+ Collections.sort(actual, Comparator.comparing(o -> o.f1));
for (int i = 0; i < expected.length; i++) {
assertEquals(expected[i],
actual.get(i).f0.retrieveState());
@@ -468,22 +471,6 @@ public class ZooKeeperStateHandleStoreTest extends
TestLogger {
}
assertEquals(expected, actual);
-
- // check the same for the all sorted by name call
- allEntries = store.getAllSortedByNameAndLock();
-
- actual.clear();
-
- for (Tuple2<RetrievableStateHandle<Long>, String> entry :
allEntries) {
- actual.add(entry.f0.retrieveState());
- }
-
- assertEquals(expected, actual);
-
- Stat stat = ZOOKEEPER.getClient().checkExists().forPath("/" +
2);
-
- // check that the corrupted node no longer exists
- assertNull("The corrupted node should no longer exist.", stat);
}
/**