[FLINK-5074] [runtime] Add a ZooKeeper-based RunningJobRegistry

This closes #2903


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/544f5346
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/544f5346
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/544f5346

Branch: refs/heads/master
Commit: 544f53467b901e6e891a23fc4f2ef3a6be229718
Parents: 8780cb6
Author: shuai.xus <[email protected]>
Authored: Mon Feb 13 18:33:14 2017 +0800
Committer: Stephan Ewen <[email protected]>
Committed: Mon Feb 20 19:43:47 2017 +0100

----------------------------------------------------------------------
 .../HighAvailabilityServicesUtils.java          | 10 ++-
 .../highavailability/ZookeeperHaServices.java   |  6 +-
 .../highavailability/ZookeeperRegistry.java     | 94 ++++++++++++++++++++
 .../highavailability/ZooKeeperRegistryTest.java | 78 ++++++++++++++++
 4 files changed, 183 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/544f5346/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
index 9113309..fe180de 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
@@ -19,8 +19,10 @@
 package org.apache.flink.runtime.highavailability;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
 
 public class HighAvailabilityServicesUtils {
 
@@ -32,8 +34,8 @@ public class HighAvailabilityServicesUtils {
                                return new EmbeddedNonHaServices();
 
                        case ZOOKEEPER:
-                               throw new 
UnsupportedOperationException("ZooKeeper high availability services " +
-                                               "have not been implemented 
yet.");
+                               return new 
ZookeeperHaServices(ZooKeeperUtils.startCuratorFramework(config), 
+                                               Executors.directExecutor(), 
config);
 
                        default:
                                throw new Exception("High availability mode " + 
highAvailabilityMode + " is not supported.");
@@ -49,8 +51,8 @@ public class HighAvailabilityServicesUtils {
                                final String resourceManagerAddress = null;
                                return new 
NonHaServices(resourceManagerAddress);
                        case ZOOKEEPER:
-                               throw new 
UnsupportedOperationException("ZooKeeper high availability services " +
-                                       "have not been implemented yet.");
+                               return new 
ZookeeperHaServices(ZooKeeperUtils.startCuratorFramework(configuration), 
+                                               Executors.directExecutor(), 
configuration);
                        default:
                                throw new Exception("Recovery mode " + 
highAvailabilityMode + " is not supported.");
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/544f5346/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
index ed0ad17..741f9e6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
@@ -97,10 +97,14 @@ public class ZookeeperHaServices implements 
HighAvailabilityServices {
        /** The runtime configuration */
        private final Configuration configuration;
 
+       /** The zookeeper based running jobs registry */
+       private final RunningJobsRegistry runningJobsRegistry;
+
        public ZookeeperHaServices(CuratorFramework client, Executor executor, 
Configuration configuration) {
                this.client = checkNotNull(client);
                this.executor = checkNotNull(executor);
                this.configuration = checkNotNull(configuration);
+               this.runningJobsRegistry = new ZookeeperRegistry(client, 
configuration);
        }
 
        // 
------------------------------------------------------------------------
@@ -149,7 +153,7 @@ public class ZookeeperHaServices implements 
HighAvailabilityServices {
 
        @Override
        public RunningJobsRegistry getRunningJobsRegistry() {
-               throw new UnsupportedOperationException("not yet implemented");
+               return runningJobsRegistry;
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/544f5346/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java
new file mode 100644
index 0000000..c0621af
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.zookeeper.data.Stat;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A zookeeper based registry for running jobs, highly available.
+ */
+public class ZookeeperRegistry implements RunningJobsRegistry {
+       
+       private static final String DEFAULT_HA_JOB_REGISTRY_PATH = 
"/running_job_registry/";
+
+       /** The ZooKeeper client to use */
+       private final CuratorFramework client;
+
+       private final String runningJobPath;
+
+       private static final String HA_JOB_REGISTRY_PATH = 
"high-availability.zookeeper.job.registry";
+
+       public ZookeeperRegistry(final CuratorFramework client, final 
Configuration configuration) {
+               this.client = client;
+               runningJobPath = 
configuration.getValue(HighAvailabilityOptions.HA_ZOOKEEPER_ROOT) + 
+                       configuration.getString(HA_JOB_REGISTRY_PATH, 
DEFAULT_HA_JOB_REGISTRY_PATH);
+       }
+
+       @Override
+       public void setJobRunning(JobID jobID) throws IOException {
+               checkNotNull(jobID);
+
+               try {
+                       String zkPath = runningJobPath + jobID.toString();
+                       
this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient());
+                       this.client.setData().forPath(zkPath);
+               }
+               catch (Exception e) {
+                       throw new IOException("Set running state to zk fail for 
job " + jobID.toString(), e);
+               }
+       }
+
+       @Override
+       public void setJobFinished(JobID jobID) throws IOException {
+               checkNotNull(jobID);
+
+               try {
+                       String zkPath = runningJobPath + jobID.toString();
+                       
this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient());
+                       this.client.delete().forPath(zkPath);
+               }
+               catch (Exception e) {
+                       throw new IOException("Set finished state to zk fail 
for job " + jobID.toString(), e);
+               }
+       }
+
+       @Override
+       public boolean isJobRunning(JobID jobID) throws IOException {
+               checkNotNull(jobID);
+
+               try {
+                       Stat stat = client.checkExists().forPath(runningJobPath 
+ jobID.toString());
+                       if (stat != null) {
+                               return true;
+                       }
+                       return false;
+               }
+               catch (Exception e) {
+                       throw new IOException("Get running state from zk fail 
for job " + jobID.toString(), e);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/544f5346/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java
new file mode 100644
index 0000000..72982c8
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.curator.test.TestingServer;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.util.TestLogger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.*;
+
+public class ZooKeeperRegistryTest extends TestLogger {
+       private TestingServer testingServer;
+
+       private static Logger LOG = 
LoggerFactory.getLogger(ZooKeeperRegistryTest.class);
+
+       @Before
+       public void before() throws Exception {
+               testingServer = new TestingServer();
+       }
+
+       @After
+       public void after() throws Exception {
+               testingServer.stop();
+               testingServer = null;
+       }
+
+       /**
+        * Tests that the function of ZookeeperRegistry, setJobRunning(), 
setJobFinished(), isJobRunning()
+        */
+       @Test
+       public void testZooKeeperRegistry() throws Exception {
+               Configuration configuration = new Configuration();
+               
configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
testingServer.getConnectString());
+               configuration.setString(HighAvailabilityOptions.HA_MODE, 
"zookeeper");
+
+               HighAvailabilityServices zkHaService = 
HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(configuration);
+               RunningJobsRegistry zkRegistry = 
zkHaService.getRunningJobsRegistry();
+
+               try {
+                       JobID jobID = JobID.generate();
+                       assertTrue(!zkRegistry.isJobRunning(jobID));
+
+                       zkRegistry.setJobRunning(jobID);
+                       assertTrue(zkRegistry.isJobRunning(jobID));
+
+                       zkRegistry.setJobFinished(jobID);
+                       assertTrue(!zkRegistry.isJobRunning(jobID));
+
+               } finally {
+                       if (zkHaService != null) {
+                               zkHaService.close();
+                       }
+               }
+       }
+}

Reply via email to