[FLINK-5074] [runtime] Add a ZooKeeper-based RunningJobsRegistry. This closes #2903.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/544f5346 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/544f5346 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/544f5346 Branch: refs/heads/master Commit: 544f53467b901e6e891a23fc4f2ef3a6be229718 Parents: 8780cb6 Author: shuai.xus <[email protected]> Authored: Mon Feb 13 18:33:14 2017 +0800 Committer: Stephan Ewen <[email protected]> Committed: Mon Feb 20 19:43:47 2017 +0100 ---------------------------------------------------------------------- .../HighAvailabilityServicesUtils.java | 10 ++- .../highavailability/ZookeeperHaServices.java | 6 +- .../highavailability/ZookeeperRegistry.java | 94 ++++++++++++++++++++ .../highavailability/ZooKeeperRegistryTest.java | 78 ++++++++++++++++ 4 files changed, 183 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/544f5346/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java index 9113309..fe180de 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java @@ -19,8 +19,10 @@ package org.apache.flink.runtime.highavailability; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.util.LeaderRetrievalUtils; +import 
org.apache.flink.runtime.util.ZooKeeperUtils; public class HighAvailabilityServicesUtils { @@ -32,8 +34,8 @@ public class HighAvailabilityServicesUtils { return new EmbeddedNonHaServices(); case ZOOKEEPER: - throw new UnsupportedOperationException("ZooKeeper high availability services " + - "have not been implemented yet."); + return new ZookeeperHaServices(ZooKeeperUtils.startCuratorFramework(config), + Executors.directExecutor(), config); default: throw new Exception("High availability mode " + highAvailabilityMode + " is not supported."); @@ -49,8 +51,8 @@ public class HighAvailabilityServicesUtils { final String resourceManagerAddress = null; return new NonHaServices(resourceManagerAddress); case ZOOKEEPER: - throw new UnsupportedOperationException("ZooKeeper high availability services " + - "have not been implemented yet."); + return new ZookeeperHaServices(ZooKeeperUtils.startCuratorFramework(configuration), + Executors.directExecutor(), configuration); default: throw new Exception("Recovery mode " + highAvailabilityMode + " is not supported."); } http://git-wip-us.apache.org/repos/asf/flink/blob/544f5346/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java index ed0ad17..741f9e6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java @@ -97,10 +97,14 @@ public class ZookeeperHaServices implements HighAvailabilityServices { /** The runtime configuration */ private final Configuration configuration; + /** The zookeeper based running jobs registry */ + private final RunningJobsRegistry 
runningJobsRegistry; + public ZookeeperHaServices(CuratorFramework client, Executor executor, Configuration configuration) { this.client = checkNotNull(client); this.executor = checkNotNull(executor); this.configuration = checkNotNull(configuration); + this.runningJobsRegistry = new ZookeeperRegistry(client, configuration); } // ------------------------------------------------------------------------ @@ -149,7 +153,7 @@ public class ZookeeperHaServices implements HighAvailabilityServices { @Override public RunningJobsRegistry getRunningJobsRegistry() { - throw new UnsupportedOperationException("not yet implemented"); + return runningJobsRegistry; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/544f5346/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java new file mode 100644 index 0000000..c0621af --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.highavailability; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.zookeeper.data.Stat; + +import java.io.IOException; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A zookeeper based registry for running jobs, highly available. + */ +public class ZookeeperRegistry implements RunningJobsRegistry { + + private static final String DEFAULT_HA_JOB_REGISTRY_PATH = "/running_job_registry/"; + + /** The ZooKeeper client to use */ + private final CuratorFramework client; + + private final String runningJobPath; + + private static final String HA_JOB_REGISTRY_PATH = "high-availability.zookeeper.job.registry"; + + public ZookeeperRegistry(final CuratorFramework client, final Configuration configuration) { + this.client = client; + runningJobPath = configuration.getValue(HighAvailabilityOptions.HA_ZOOKEEPER_ROOT) + + configuration.getString(HA_JOB_REGISTRY_PATH, DEFAULT_HA_JOB_REGISTRY_PATH); + } + + @Override + public void setJobRunning(JobID jobID) throws IOException { + checkNotNull(jobID); + + try { + String zkPath = runningJobPath + jobID.toString(); + this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient()); + this.client.setData().forPath(zkPath); + } + catch (Exception e) { + throw new IOException("Set running state to zk fail for job " + jobID.toString(), e); + } + } + + @Override + public void setJobFinished(JobID jobID) throws IOException { + checkNotNull(jobID); + + try { + String zkPath = runningJobPath + jobID.toString(); + this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient()); + this.client.delete().forPath(zkPath); + } + catch (Exception e) { + 
throw new IOException("Set finished state to zk fail for job " + jobID.toString(), e); + } + } + + @Override + public boolean isJobRunning(JobID jobID) throws IOException { + checkNotNull(jobID); + + try { + Stat stat = client.checkExists().forPath(runningJobPath + jobID.toString()); + if (stat != null) { + return true; + } + return false; + } + catch (Exception e) { + throw new IOException("Get running state from zk fail for job " + jobID.toString(), e); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/544f5346/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java new file mode 100644 index 0000000..72982c8 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.highavailability; + +import org.apache.curator.test.TestingServer; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.util.TestLogger; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.*; + +public class ZooKeeperRegistryTest extends TestLogger { + private TestingServer testingServer; + + private static Logger LOG = LoggerFactory.getLogger(ZooKeeperRegistryTest.class); + + @Before + public void before() throws Exception { + testingServer = new TestingServer(); + } + + @After + public void after() throws Exception { + testingServer.stop(); + testingServer = null; + } + + /** + * Tests that the function of ZookeeperRegistry, setJobRunning(), setJobFinished(), isJobRunning() + */ + @Test + public void testZooKeeperRegistry() throws Exception { + Configuration configuration = new Configuration(); + configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString()); + configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); + + HighAvailabilityServices zkHaService = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(configuration); + RunningJobsRegistry zkRegistry = zkHaService.getRunningJobsRegistry(); + + try { + JobID jobID = JobID.generate(); + assertTrue(!zkRegistry.isJobRunning(jobID)); + + zkRegistry.setJobRunning(jobID); + assertTrue(zkRegistry.isJobRunning(jobID)); + + zkRegistry.setJobFinished(jobID); + assertTrue(!zkRegistry.isJobRunning(jobID)); + + } finally { + if (zkHaService != null) { + zkHaService.close(); + } + } + } +}
