tillrohrmann closed pull request #6685: [FLINK-10326] Simplify 
ZooKeeperSubmittedJobGraphStore#constructor
URL: https://github.com/apache/flink/pull/6685
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index a0301843a6d..81584293e74 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -20,7 +20,6 @@
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
-import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
@@ -69,9 +68,6 @@
        /** Lock to synchronize with the {@link SubmittedJobGraphListener}. */
        private final Object cacheLock = new Object();
 
-       /** Client (not a namespace facade). */
-       private final CuratorFramework client;
-
        /** The set of IDs of all added job graphs. */
        private final Set<JobID> addedJobGraphs = new HashSet<>();
 
@@ -96,34 +92,20 @@
        /**
         * Submitted job graph store backed by ZooKeeper.
         *
-        * @param client ZooKeeper client
-        * @param currentJobsPath ZooKeeper path for current job graphs
-        * @param stateStorage State storage used to persist the submitted jobs
-        * @throws Exception
+        * @param zooKeeperFullBasePath ZooKeeper path for current job graphs
+        * @param zooKeeperStateHandleStore State storage used to persist the 
submitted jobs
         */
        public ZooKeeperSubmittedJobGraphStore(
-                       CuratorFramework client,
-                       String currentJobsPath,
-                       RetrievableStateStorageHelper<SubmittedJobGraph> 
stateStorage) throws Exception {
-
-               checkNotNull(currentJobsPath, "Current jobs path");
-               checkNotNull(stateStorage, "State storage");
-
-               // Keep a reference to the original client and not the 
namespace facade. The namespace
-               // facade cannot be closed.
-               this.client = checkNotNull(client, "Curator client");
-
-               // Ensure that the job graphs path exists
-               client.newNamespaceAwareEnsurePath(currentJobsPath)
-                               .ensure(client.getZookeeperClient());
+                       String zooKeeperFullBasePath,
+                       ZooKeeperStateHandleStore<SubmittedJobGraph> 
zooKeeperStateHandleStore,
+                       PathChildrenCache pathCache) {
 
-               // All operations will have the path as root
-               CuratorFramework facade = 
client.usingNamespace(client.getNamespace() + currentJobsPath);
+               checkNotNull(zooKeeperFullBasePath, "Current jobs path");
 
-               this.zooKeeperFullBasePath = client.getNamespace() + 
currentJobsPath;
-               this.jobGraphsInZooKeeper = new 
ZooKeeperStateHandleStore<>(facade, stateStorage);
+               this.zooKeeperFullBasePath = zooKeeperFullBasePath;
+               this.jobGraphsInZooKeeper = 
checkNotNull(zooKeeperStateHandleStore);
 
-               this.pathCache = new PathChildrenCache(facade, "/", false);
+               this.pathCache = checkNotNull(pathCache);
                pathCache.getListenable().addListener(new 
SubmittedJobGraphsPathCacheListener());
        }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index cc1ec7044c4..039dcf41d8d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -33,6 +33,7 @@
 import org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService;
 import 
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
+import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
 import 
org.apache.flink.runtime.zookeeper.filesystem.FileSystemStateStorageHelper;
 import org.apache.flink.util.Preconditions;
 
@@ -41,6 +42,7 @@
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.ACLProvider;
 import org.apache.curator.framework.imps.DefaultACLProvider;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
@@ -244,10 +246,23 @@ public static ZooKeeperSubmittedJobGraphStore 
createSubmittedJobGraphs(
                // ZooKeeper submitted jobs root dir
                String zooKeeperSubmittedJobsPath = 
configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_JOBGRAPHS_PATH);
 
+               // Ensure that the job graphs path exists
+               client.newNamespaceAwareEnsurePath(zooKeeperSubmittedJobsPath)
+                       .ensure(client.getZookeeperClient());
+
+               // All operations will have the path as root
+               CuratorFramework facade = 
client.usingNamespace(client.getNamespace() + zooKeeperSubmittedJobsPath);
+
+               final String zooKeeperFullSubmittedJobsPath = 
client.getNamespace() + zooKeeperSubmittedJobsPath;
+
+               final ZooKeeperStateHandleStore<SubmittedJobGraph> 
zooKeeperStateHandleStore = new ZooKeeperStateHandleStore<>(facade, 
stateStorage);
+
+               final PathChildrenCache pathCache = new 
PathChildrenCache(facade, "/", false);
+
                return new ZooKeeperSubmittedJobGraphStore(
-                       client,
-                       zooKeeperSubmittedJobsPath,
-                       stateStorage);
+                       zooKeeperFullSubmittedJobsPath,
+                       zooKeeperStateHandleStore,
+                       pathCache);
        }
 
        /**
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStoreTest.java
index fae84591b55..04b7792be6a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStoreTest.java
@@ -24,9 +24,11 @@
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
+import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -98,9 +100,9 @@ public void testJobGraphRemovalFailureAndLockRelease() 
throws Exception {
        @Nonnull
        public ZooKeeperSubmittedJobGraphStore 
createSubmittedJobGraphStore(CuratorFramework client, 
TestingRetrievableStateStorageHelper<SubmittedJobGraph> stateStorage) throws 
Exception {
                return new ZooKeeperSubmittedJobGraphStore(
-                       client,
-                       "/foobar",
-                       stateStorage);
+                       client.getNamespace(),
+                       new ZooKeeperStateHandleStore<>(client, stateStorage),
+                       new PathChildrenCache(client, "/", false));
        }
 
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
index e9be145c37f..23c27252888 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
@@ -27,17 +27,22 @@
 import org.apache.flink.runtime.state.RetrievableStreamStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
+import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
 import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
 
 import akka.actor.ActorRef;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import javax.annotation.Nonnull;
+
 import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
@@ -59,9 +64,9 @@
  */
 public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger {
 
-       private final static ZooKeeperTestEnvironment ZooKeeper = new 
ZooKeeperTestEnvironment(1);
+       private static final ZooKeeperTestEnvironment ZooKeeper = new 
ZooKeeperTestEnvironment(1);
 
-       private final static RetrievableStateStorageHelper<SubmittedJobGraph> 
localStateStorage = new RetrievableStateStorageHelper<SubmittedJobGraph>() {
+       private static final RetrievableStateStorageHelper<SubmittedJobGraph> 
localStateStorage = new RetrievableStateStorageHelper<SubmittedJobGraph>() {
                @Override
                public RetrievableStateHandle<SubmittedJobGraph> 
store(SubmittedJobGraph state) throws IOException {
                        ByteStreamStateHandle byteStreamStateHandle = new 
ByteStreamStateHandle(
@@ -71,7 +76,6 @@
                }
        };
 
-
        @AfterClass
        public static void tearDown() throws Exception {
                if (ZooKeeper != null) {
@@ -86,10 +90,7 @@ public void cleanUp() throws Exception {
 
        @Test
        public void testPutAndRemoveJobGraph() throws Exception {
-               ZooKeeperSubmittedJobGraphStore jobGraphs = new 
ZooKeeperSubmittedJobGraphStore(
-                       ZooKeeper.createClient(),
-                       "/testPutAndRemoveJobGraph",
-                       localStateStorage);
+               ZooKeeperSubmittedJobGraphStore jobGraphs = 
createZooKeeperSubmittedJobGraphStore("/testPutAndRemoveJobGraph");
 
                try {
                        SubmittedJobGraphListener listener = 
mock(SubmittedJobGraphListener.class);
@@ -142,10 +143,25 @@ public void testPutAndRemoveJobGraph() throws Exception {
                }
        }
 
+       @Nonnull
+       private ZooKeeperSubmittedJobGraphStore 
createZooKeeperSubmittedJobGraphStore(String fullPath) throws Exception {
+               final CuratorFramework client = ZooKeeper.getClient();
+               // Ensure that the job graphs path exists
+               
client.newNamespaceAwareEnsurePath(fullPath).ensure(client.getZookeeperClient());
+
+               // All operations will have the path as root
+               CuratorFramework facade = 
client.usingNamespace(client.getNamespace() + fullPath);
+               return new ZooKeeperSubmittedJobGraphStore(
+                       fullPath,
+                       new ZooKeeperStateHandleStore<>(
+                               facade,
+                               localStateStorage),
+                       new PathChildrenCache(facade, "/", false));
+       }
+
        @Test
        public void testRecoverJobGraphs() throws Exception {
-               ZooKeeperSubmittedJobGraphStore jobGraphs = new 
ZooKeeperSubmittedJobGraphStore(
-                               ZooKeeper.createClient(), 
"/testRecoverJobGraphs", localStateStorage);
+               ZooKeeperSubmittedJobGraphStore jobGraphs = 
createZooKeeperSubmittedJobGraphStore("/testRecoverJobGraphs");
 
                try {
                        SubmittedJobGraphListener listener = 
mock(SubmittedJobGraphListener.class);
@@ -195,12 +211,9 @@ public void testConcurrentAddJobGraph() throws Exception {
                ZooKeeperSubmittedJobGraphStore otherJobGraphs = null;
 
                try {
-                       jobGraphs = new ZooKeeperSubmittedJobGraphStore(
-                                       ZooKeeper.createClient(), 
"/testConcurrentAddJobGraph", localStateStorage);
-
-                       otherJobGraphs = new ZooKeeperSubmittedJobGraphStore(
-                                       ZooKeeper.createClient(), 
"/testConcurrentAddJobGraph", localStateStorage);
+                       jobGraphs = 
createZooKeeperSubmittedJobGraphStore("/testConcurrentAddJobGraph");
 
+                       otherJobGraphs = 
createZooKeeperSubmittedJobGraphStore("/testConcurrentAddJobGraph");
 
                        SubmittedJobGraph jobGraph = 
createSubmittedJobGraph(new JobID(), 0);
                        SubmittedJobGraph otherJobGraph = 
createSubmittedJobGraph(new JobID(), 0);
@@ -254,11 +267,9 @@ public Void answer(InvocationOnMock invocation) throws 
Throwable {
 
        @Test(expected = IllegalStateException.class)
        public void testUpdateJobGraphYouDidNotGetOrAdd() throws Exception {
-               ZooKeeperSubmittedJobGraphStore jobGraphs = new 
ZooKeeperSubmittedJobGraphStore(
-                               ZooKeeper.createClient(), 
"/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage);
+               ZooKeeperSubmittedJobGraphStore jobGraphs = 
createZooKeeperSubmittedJobGraphStore("/testUpdateJobGraphYouDidNotGetOrAdd");
 
-               ZooKeeperSubmittedJobGraphStore otherJobGraphs = new 
ZooKeeperSubmittedJobGraphStore(
-                               ZooKeeper.createClient(), 
"/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage);
+               ZooKeeperSubmittedJobGraphStore otherJobGraphs = 
createZooKeeperSubmittedJobGraphStore("/testUpdateJobGraphYouDidNotGetOrAdd");
 
                jobGraphs.start(null);
                otherJobGraphs.start(null);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to