tillrohrmann closed pull request #6685: [FLINK-10326] Simplify
ZooKeeperSubmittedJobGraphStore#constructor
URL: https://github.com/apache/flink/pull/6685
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it is merged, the diff is reproduced below for
the sake of provenance:
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index a0301843a6d..81584293e74 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -20,7 +20,6 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.state.RetrievableStateHandle;
-import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
@@ -69,9 +68,6 @@
/** Lock to synchronize with the {@link SubmittedJobGraphListener}. */
private final Object cacheLock = new Object();
- /** Client (not a namespace facade). */
- private final CuratorFramework client;
-
/** The set of IDs of all added job graphs. */
private final Set<JobID> addedJobGraphs = new HashSet<>();
@@ -96,34 +92,20 @@
/**
* Submitted job graph store backed by ZooKeeper.
*
- * @param client ZooKeeper client
- * @param currentJobsPath ZooKeeper path for current job graphs
- * @param stateStorage State storage used to persist the submitted jobs
- * @throws Exception
+ * @param zooKeeperFullBasePath ZooKeeper path for current job graphs
+ * @param zooKeeperStateHandleStore State storage used to persist the
submitted jobs
*/
public ZooKeeperSubmittedJobGraphStore(
- CuratorFramework client,
- String currentJobsPath,
- RetrievableStateStorageHelper<SubmittedJobGraph>
stateStorage) throws Exception {
-
- checkNotNull(currentJobsPath, "Current jobs path");
- checkNotNull(stateStorage, "State storage");
-
- // Keep a reference to the original client and not the
namespace facade. The namespace
- // facade cannot be closed.
- this.client = checkNotNull(client, "Curator client");
-
- // Ensure that the job graphs path exists
- client.newNamespaceAwareEnsurePath(currentJobsPath)
- .ensure(client.getZookeeperClient());
+ String zooKeeperFullBasePath,
+ ZooKeeperStateHandleStore<SubmittedJobGraph>
zooKeeperStateHandleStore,
+ PathChildrenCache pathCache) {
- // All operations will have the path as root
- CuratorFramework facade =
client.usingNamespace(client.getNamespace() + currentJobsPath);
+ checkNotNull(zooKeeperFullBasePath, "Current jobs path");
- this.zooKeeperFullBasePath = client.getNamespace() +
currentJobsPath;
- this.jobGraphsInZooKeeper = new
ZooKeeperStateHandleStore<>(facade, stateStorage);
+ this.zooKeeperFullBasePath = zooKeeperFullBasePath;
+ this.jobGraphsInZooKeeper =
checkNotNull(zooKeeperStateHandleStore);
- this.pathCache = new PathChildrenCache(facade, "/", false);
+ this.pathCache = checkNotNull(pathCache);
pathCache.getListenable().addListener(new
SubmittedJobGraphsPathCacheListener());
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index cc1ec7044c4..039dcf41d8d 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -33,6 +33,7 @@
import org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService;
import
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService;
import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
+import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import
org.apache.flink.runtime.zookeeper.filesystem.FileSystemStateStorageHelper;
import org.apache.flink.util.Preconditions;
@@ -41,6 +42,7 @@
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.imps.DefaultACLProvider;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
@@ -244,10 +246,23 @@ public static ZooKeeperSubmittedJobGraphStore
createSubmittedJobGraphs(
// ZooKeeper submitted jobs root dir
String zooKeeperSubmittedJobsPath =
configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_JOBGRAPHS_PATH);
+ // Ensure that the job graphs path exists
+ client.newNamespaceAwareEnsurePath(zooKeeperSubmittedJobsPath)
+ .ensure(client.getZookeeperClient());
+
+ // All operations will have the path as root
+ CuratorFramework facade =
client.usingNamespace(client.getNamespace() + zooKeeperSubmittedJobsPath);
+
+ final String zooKeeperFullSubmittedJobsPath =
client.getNamespace() + zooKeeperSubmittedJobsPath;
+
+ final ZooKeeperStateHandleStore<SubmittedJobGraph>
zooKeeperStateHandleStore = new ZooKeeperStateHandleStore<>(facade,
stateStorage);
+
+ final PathChildrenCache pathCache = new
PathChildrenCache(facade, "/", false);
+
return new ZooKeeperSubmittedJobGraphStore(
- client,
- zooKeeperSubmittedJobsPath,
- stateStorage);
+ zooKeeperFullSubmittedJobsPath,
+ zooKeeperStateHandleStore,
+ pathCache);
}
/**
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStoreTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStoreTest.java
index fae84591b55..04b7792be6a 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStoreTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStoreTest.java
@@ -24,9 +24,11 @@
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
+import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.util.TestLogger;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -98,9 +100,9 @@ public void testJobGraphRemovalFailureAndLockRelease()
throws Exception {
@Nonnull
public ZooKeeperSubmittedJobGraphStore
createSubmittedJobGraphStore(CuratorFramework client,
TestingRetrievableStateStorageHelper<SubmittedJobGraph> stateStorage) throws
Exception {
return new ZooKeeperSubmittedJobGraphStore(
- client,
- "/foobar",
- stateStorage);
+ client.getNamespace(),
+ new ZooKeeperStateHandleStore<>(client, stateStorage),
+ new PathChildrenCache(client, "/", false));
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
index e9be145c37f..23c27252888 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
@@ -27,17 +27,22 @@
import org.apache.flink.runtime.state.RetrievableStreamStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
+import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.TestLogger;
import akka.actor.ActorRef;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import javax.annotation.Nonnull;
+
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
@@ -59,9 +64,9 @@
*/
public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
- private final static ZooKeeperTestEnvironment ZooKeeper = new
ZooKeeperTestEnvironment(1);
+ private static final ZooKeeperTestEnvironment ZooKeeper = new
ZooKeeperTestEnvironment(1);
- private final static RetrievableStateStorageHelper<SubmittedJobGraph>
localStateStorage = new RetrievableStateStorageHelper<SubmittedJobGraph>() {
+ private static final RetrievableStateStorageHelper<SubmittedJobGraph>
localStateStorage = new RetrievableStateStorageHelper<SubmittedJobGraph>() {
@Override
public RetrievableStateHandle<SubmittedJobGraph>
store(SubmittedJobGraph state) throws IOException {
ByteStreamStateHandle byteStreamStateHandle = new
ByteStreamStateHandle(
@@ -71,7 +76,6 @@
}
};
-
@AfterClass
public static void tearDown() throws Exception {
if (ZooKeeper != null) {
@@ -86,10 +90,7 @@ public void cleanUp() throws Exception {
@Test
public void testPutAndRemoveJobGraph() throws Exception {
- ZooKeeperSubmittedJobGraphStore jobGraphs = new
ZooKeeperSubmittedJobGraphStore(
- ZooKeeper.createClient(),
- "/testPutAndRemoveJobGraph",
- localStateStorage);
+ ZooKeeperSubmittedJobGraphStore jobGraphs =
createZooKeeperSubmittedJobGraphStore("/testPutAndRemoveJobGraph");
try {
SubmittedJobGraphListener listener =
mock(SubmittedJobGraphListener.class);
@@ -142,10 +143,25 @@ public void testPutAndRemoveJobGraph() throws Exception {
}
}
+ @Nonnull
+ private ZooKeeperSubmittedJobGraphStore
createZooKeeperSubmittedJobGraphStore(String fullPath) throws Exception {
+ final CuratorFramework client = ZooKeeper.getClient();
+ // Ensure that the job graphs path exists
+
client.newNamespaceAwareEnsurePath(fullPath).ensure(client.getZookeeperClient());
+
+ // All operations will have the path as root
+ CuratorFramework facade =
client.usingNamespace(client.getNamespace() + fullPath);
+ return new ZooKeeperSubmittedJobGraphStore(
+ fullPath,
+ new ZooKeeperStateHandleStore<>(
+ facade,
+ localStateStorage),
+ new PathChildrenCache(facade, "/", false));
+ }
+
@Test
public void testRecoverJobGraphs() throws Exception {
- ZooKeeperSubmittedJobGraphStore jobGraphs = new
ZooKeeperSubmittedJobGraphStore(
- ZooKeeper.createClient(),
"/testRecoverJobGraphs", localStateStorage);
+ ZooKeeperSubmittedJobGraphStore jobGraphs =
createZooKeeperSubmittedJobGraphStore("/testRecoverJobGraphs");
try {
SubmittedJobGraphListener listener =
mock(SubmittedJobGraphListener.class);
@@ -195,12 +211,9 @@ public void testConcurrentAddJobGraph() throws Exception {
ZooKeeperSubmittedJobGraphStore otherJobGraphs = null;
try {
- jobGraphs = new ZooKeeperSubmittedJobGraphStore(
- ZooKeeper.createClient(),
"/testConcurrentAddJobGraph", localStateStorage);
-
- otherJobGraphs = new ZooKeeperSubmittedJobGraphStore(
- ZooKeeper.createClient(),
"/testConcurrentAddJobGraph", localStateStorage);
+ jobGraphs =
createZooKeeperSubmittedJobGraphStore("/testConcurrentAddJobGraph");
+ otherJobGraphs =
createZooKeeperSubmittedJobGraphStore("/testConcurrentAddJobGraph");
SubmittedJobGraph jobGraph =
createSubmittedJobGraph(new JobID(), 0);
SubmittedJobGraph otherJobGraph =
createSubmittedJobGraph(new JobID(), 0);
@@ -254,11 +267,9 @@ public Void answer(InvocationOnMock invocation) throws
Throwable {
@Test(expected = IllegalStateException.class)
public void testUpdateJobGraphYouDidNotGetOrAdd() throws Exception {
- ZooKeeperSubmittedJobGraphStore jobGraphs = new
ZooKeeperSubmittedJobGraphStore(
- ZooKeeper.createClient(),
"/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage);
+ ZooKeeperSubmittedJobGraphStore jobGraphs =
createZooKeeperSubmittedJobGraphStore("/testUpdateJobGraphYouDidNotGetOrAdd");
- ZooKeeperSubmittedJobGraphStore otherJobGraphs = new
ZooKeeperSubmittedJobGraphStore(
- ZooKeeper.createClient(),
"/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage);
+ ZooKeeperSubmittedJobGraphStore otherJobGraphs =
createZooKeeperSubmittedJobGraphStore("/testUpdateJobGraphYouDidNotGetOrAdd");
jobGraphs.start(null);
otherJobGraphs.start(null);
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services