Repository: aurora Updated Branches: refs/heads/master 262ca16c5 -> ccf23820d
Introduce a Curator-based `ServiceGroupMonitor`. Bugs closed: AURORA-1468 Reviewed at https://reviews.apache.org/r/45902/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/ccf23820 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/ccf23820 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/ccf23820 Branch: refs/heads/master Commit: ccf23820d15e251f7c7a44621992a1297b1a902f Parents: 262ca16 Author: John Sirois <[email protected]> Authored: Mon Apr 11 22:31:27 2016 -0600 Committer: John Sirois <[email protected]> Committed: Mon Apr 11 22:31:27 2016 -0600 ---------------------------------------------------------------------- build.gradle | 5 + .../discovery/CuratorServiceGroupMonitor.java | 106 ++++++++++ .../CuratorServiceGroupMonitorTest.java | 203 +++++++++++++++++++ 3 files changed, 314 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/ccf23820/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index fc61adf..d981ab7 100644 --- a/build.gradle +++ b/build.gradle @@ -80,6 +80,7 @@ For more details, please see https://issues.apache.org/jira/browse/AURORA-1169 } ext.commonsLangRev = '2.6' + ext.curatorRev = '2.10.0' ext.gsonRev = '2.3.1' ext.guavaRev = '19.0' ext.guiceRev = '3.0' @@ -115,6 +116,7 @@ For more details, please see https://issues.apache.org/jira/browse/AURORA-1169 force "com.google.protobuf:protobuf-java:${protobufRev}" force "junit:junit:${junitRev}" force "org.apache.thrift:libthrift:${thriftRev}" + force "org.apache.zookeeper:zookeeper:${zookeeperRev}" force "org.hamcrest:hamcrest-core:1.3" force "org.slf4j:slf4j-api:${slf4jRev}" force "org.mybatis:mybatis:${mybatisRev}" @@ -358,6 +360,9 @@ dependencies { compile 'javax.inject:javax.inject:1' compile "javax.servlet:javax.servlet-api:${servletRev}" compile "org.antlr:stringtemplate:${stringTemplateRev}" + compile "org.apache.curator:curator-client:${curatorRev}" + compile "org.apache.curator:curator-framework:${curatorRev}" + compile "org.apache.curator:curator-recipes:${curatorRev}" compile 'org.apache.mesos:mesos:0.26.0' compile "org.apache.shiro:shiro-guice:${shiroRev}" compile "org.apache.shiro:shiro-web:${shiroRev}" http://git-wip-us.apache.org/repos/asf/aurora/blob/ccf23820/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitor.java b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitor.java new file mode 100644 index 0000000..9d8b7bd --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitor.java @@ -0,0 +1,106 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.discovery; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.Optional; +import java.util.function.Predicate; + +import com.google.common.collect.ImmutableSet; + +import org.apache.aurora.GuavaUtils; +import org.apache.aurora.common.io.Codec; +import org.apache.aurora.common.thrift.ServiceInstance; +import org.apache.aurora.scheduler.app.SchedulerMain; +import org.apache.aurora.scheduler.app.ServiceGroupMonitor; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.utils.ZKPaths; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.util.Objects.requireNonNull; + +class CuratorServiceGroupMonitor implements ServiceGroupMonitor { + private static final Logger LOG = LoggerFactory.getLogger(SchedulerMain.class); + + private final PathChildrenCache groupCache; + private final Predicate<String> memberSelector; + private final Codec<ServiceInstance> codec; + + /** + * Creates a {@code ServiceGroupMonitor} backed by Curator. + * + * Although this monitor can be queried at any time, it will not usefully reflect service group + * membership until it is {@link #start() started}. When starting a monitor, it should be arranged + * that the monitor is {@link #close() closed} when no longer needed. + * + * It's important to be able to pick out group member nodes amongst child nodes for group paths + * that can contain mixed-content nodes. The given {@code memberSelector} should be able to + * discriminate member nodes from non-member nodes given the node name. + * + * @param groupCache The cache of group nodes. + * @param memberSelector A predicate that returns {@code true} for group node names that represent + * group members. Here the name is just the `basename` of the node's full + * ZooKeeper path. + * @param codec A codec that can be used to deserialize group member {@link ServiceInstance} data. + */ + CuratorServiceGroupMonitor( + PathChildrenCache groupCache, + Predicate<String> memberSelector, + Codec<ServiceInstance> codec) { + + this.groupCache = requireNonNull(groupCache); + this.memberSelector = requireNonNull(memberSelector); + this.codec = requireNonNull(codec); + } + + @Override + public void start() throws MonitorException { + try { + // NB: This blocks on an initial group population to emulate legacy ServerSetMonitor behavior; + // asynchronous population is an option using NORMAL or POST_INITIALIZED_EVENT StartModes + // though. + groupCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); + } catch (Exception e) { + throw new MonitorException("Failed to begin monitoring service group.", e); + } + } + + @Override + public void close() throws IOException { + groupCache.close(); + } + + @Override + public ImmutableSet<ServiceInstance> get() { + return groupCache.getCurrentData().stream() + .filter(cd -> memberSelector.test(ZKPaths.getNodeFromPath(cd.getPath()))) + .map(this::extractServiceInstance) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(GuavaUtils.toImmutableSet()); + } + + private Optional<ServiceInstance> extractServiceInstance(ChildData data) { + ByteArrayInputStream source = new ByteArrayInputStream(data.getData()); + try { + return Optional.of(codec.deserialize(source)); + } catch (IOException e) { + LOG.error("Failed to deserialize ServiceInstance from " + data, e); + return Optional.empty(); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/ccf23820/src/test/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitorTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitorTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitorTest.java new file mode 100644 index 0000000..0879c2e --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitorTest.java @@ -0,0 +1,203 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.discovery; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.function.Predicate; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; + +import org.apache.aurora.common.thrift.Endpoint; +import org.apache.aurora.common.thrift.ServiceInstance; +import org.apache.aurora.common.thrift.Status; +import org.apache.aurora.common.zookeeper.ServerSet; +import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperTest; +import org.apache.aurora.scheduler.app.ServiceGroupMonitor; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.apache.curator.utils.ZKPaths; +import org.apache.zookeeper.CreateMode; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class CuratorServiceGroupMonitorTest extends BaseZooKeeperTest { + + private static final String GROUP_PATH = "/group/root"; + private static final String MEMBER_TOKEN = "member_"; + + private CuratorFramework client; + private BlockingQueue<PathChildrenCacheEvent> groupEvents; + private CuratorServiceGroupMonitor groupMonitor; + + @Before + public void setUpCurator() { + client = CuratorFrameworkFactory.builder() + .dontUseContainerParents() // Container nodes are only available in ZK 3.5+. + .retryPolicy((retryCount, elapsedTimeMs, sleeper) -> false) // Don't retry. + .connectString(String.format("localhost:%d", getServer().getPort())) + .build(); + client.start(); + addTearDown(client::close); + + PathChildrenCache groupCache = + new PathChildrenCache(client, GROUP_PATH, true /* cacheData */); + groupEvents = new LinkedBlockingQueue<>(); + groupCache.getListenable().addListener((c, event) -> groupEvents.put(event)); + + Predicate<String> memberSelector = name -> name.contains(MEMBER_TOKEN); + groupMonitor = new CuratorServiceGroupMonitor(groupCache, memberSelector, ServerSet.JSON_CODEC); + } + + private void startGroupMonitor() throws ServiceGroupMonitor.MonitorException { + groupMonitor.start(); + addTearDown(groupMonitor::close); + } + + private void expectGroupEvent(PathChildrenCacheEvent.Type eventType) { + while (true) { + try { + PathChildrenCacheEvent event = groupEvents.take(); + if (event.getType() == eventType) { + break; + } + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } + } + + @Test + public void testNominalLifecycle() throws Exception { + startGroupMonitor(); + groupMonitor.close(); + } + + @Test + public void testExceptionalLifecycle() throws Exception { + // Close on a non-started or failed-to-start monitor should be allowed. + groupMonitor.close(); + } + + @Test + public void testNoHosts() throws Exception { + assertEquals(ImmutableSet.of(), groupMonitor.get()); + + startGroupMonitor(); + assertEquals(ImmutableSet.of(), groupMonitor.get()); + } + + @Test + public void testHostUpdates() throws Exception { + startGroupMonitor(); + + ServiceInstance one = serviceInstance("one"); + String onePath = createMember(one); + ServiceInstance two = serviceInstance("two"); + String twoPath = createMember(two); + assertEquals(ImmutableSet.of(one, two), groupMonitor.get()); + + deleteChild(twoPath); + assertEquals(ImmutableSet.of(one), groupMonitor.get()); + + deleteChild(onePath); + ServiceInstance three = serviceInstance("three"); + String threePath = createMember(three); + assertEquals(ImmutableSet.of(three), groupMonitor.get()); + + deleteChild(threePath); + assertEquals(ImmutableSet.of(), groupMonitor.get()); + } + + @Test + public void testMixedNodes() throws Exception { + startGroupMonitor(); + + String nonMemberPath = createNonMember(); + assertEquals(ImmutableSet.of(), groupMonitor.get()); + + ServiceInstance member = serviceInstance("member"); + String memberPath = createMember(member); + assertEquals(ImmutableSet.of(member), groupMonitor.get()); + + deleteChild(memberPath); + assertEquals(ImmutableSet.of(), groupMonitor.get()); + + deleteChild(nonMemberPath); + assertEquals(ImmutableSet.of(), groupMonitor.get()); + } + + @Test + public void testStartBlocksOnInitialMembership() throws Exception { + ServiceInstance one = serviceInstance("one"); + createMember(one, false /* waitForGroupEvent */); + + ServiceInstance two = serviceInstance("two"); + createMember(two, false /* waitForGroupEvent */); + + // Not started yet, should see no group members. + assertEquals(ImmutableSet.of(), groupMonitor.get()); + + startGroupMonitor(); + assertEquals(ImmutableSet.of(one, two), groupMonitor.get()); + } + + private void deleteChild(String twoPath) throws Exception { + client.delete().forPath(twoPath); + expectGroupEvent(PathChildrenCacheEvent.Type.CHILD_REMOVED); + } + + private String createMember(ServiceInstance serviceInstance) throws Exception { + return createMember(serviceInstance, true /* waitForGroupEvent */); + } + + private String createMember(ServiceInstance serviceInstance, boolean waitForGroupEvent) + throws Exception { + + String path = client.create() + .creatingParentsIfNeeded() + .withMode(CreateMode.EPHEMERAL_SEQUENTIAL) + .forPath(ZKPaths.makePath(GROUP_PATH, MEMBER_TOKEN), serialize(serviceInstance)); + if (waitForGroupEvent) { + expectGroupEvent(PathChildrenCacheEvent.Type.CHILD_ADDED); + } + return path; + } + + private String createNonMember() throws Exception { + String path = client.create() + .creatingParentsIfNeeded() + .withMode(CreateMode.EPHEMERAL_SEQUENTIAL) + .forPath(ZKPaths.makePath(GROUP_PATH, "not-a-member")); + expectGroupEvent(PathChildrenCacheEvent.Type.CHILD_ADDED); + return path; + } + + private byte[] serialize(ServiceInstance serviceInstance) throws IOException { + ByteArrayOutputStream sink = new ByteArrayOutputStream(); + ServerSet.JSON_CODEC.serialize(serviceInstance, sink); + return sink.toByteArray(); + } + + private ServiceInstance serviceInstance(String hostName) { + return new ServiceInstance(new Endpoint(hostName, 42), ImmutableMap.of(), Status.ALIVE); + } +}
