Repository: aurora Updated Branches: refs/heads/master 8d0473cb5 -> de046f757
Introduce a Curator-based `SingletonService`. Bugs closed: AURORA-1468 Reviewed at https://reviews.apache.org/r/46111/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/de046f75 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/de046f75 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/de046f75 Branch: refs/heads/master Commit: de046f7572ae6b59a829c6adb7c09ab9e6a0165f Parents: 8d0473c Author: John Sirois <[email protected]> Authored: Wed Apr 13 19:27:17 2016 -0600 Committer: John Sirois <[email protected]> Committed: Wed Apr 13 19:27:17 2016 -0600 ---------------------------------------------------------------------- .../common/zookeeper/SingletonService.java | 4 + .../zookeeper/testing/ZooKeeperTestServer.java | 2 +- .../discovery/CuratorSingletonService.java | 191 ++++++++++++++++++ .../discovery/BaseCuratorDiscoveryTest.java | 114 +++++++++++ .../CuratorServiceGroupMonitorTest.java | 108 ++--------- .../discovery/CuratorSingletonServiceTest.java | 194 +++++++++++++++++++ 6 files changed, 523 insertions(+), 90 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/de046f75/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java index 3561d07..7f962eb 100644 --- a/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java +++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java @@ -35,6 +35,10 @@ public interface SingletonService { * Indicates an error attempting to advertise leadership of a group of servers. 
*/ class AdvertiseException extends Exception { + public AdvertiseException(String message) { + super(message); + } + public AdvertiseException(String message, Throwable cause) { super(message, cause); } http://git-wip-us.apache.org/repos/asf/aurora/blob/de046f75/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java index 0ab24fa..50acaeb 100644 --- a/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java +++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java @@ -103,7 +103,7 @@ public class ZooKeeperTestServer { * * @param sessionId The id of the client session to expire. */ - void expireClientSession(long sessionId) { + public final void expireClientSession(long sessionId) { zooKeeperServer.closeSession(sessionId); } http://git-wip-us.apache.org/repos/asf/aurora/blob/de046f75/src/main/java/org/apache/aurora/scheduler/discovery/CuratorSingletonService.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorSingletonService.java b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorSingletonService.java new file mode 100644 index 0000000..c9bd1eb --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorSingletonService.java @@ -0,0 +1,191 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.discovery; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import com.google.common.collect.Maps; +import com.google.common.io.Closer; + +import org.apache.aurora.common.base.MorePreconditions; +import org.apache.aurora.common.io.Codec; +import org.apache.aurora.common.thrift.Endpoint; +import org.apache.aurora.common.thrift.ServiceInstance; +import org.apache.aurora.common.thrift.Status; +import org.apache.aurora.common.zookeeper.SingletonService; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.leader.LeaderLatch; +import org.apache.curator.framework.recipes.leader.LeaderLatchListener; +import org.apache.curator.framework.recipes.nodes.PersistentNode; +import org.apache.curator.utils.PathUtils; +import org.apache.curator.utils.ZKPaths; +import org.apache.zookeeper.CreateMode; + +import static java.util.Objects.requireNonNull; + +class CuratorSingletonService implements SingletonService { + + // This is the complement of the CuratorServiceGroupMonitor, it allows advertisement of a leader + // in a service group. 
+ private static class Advertiser { + + private final String groupPath; + private final String memberToken; + private final CuratorFramework client; + private final Codec<ServiceInstance> codec; + + Advertiser( + CuratorFramework client, + String groupPath, + String memberToken, + Codec<ServiceInstance> codec) { + + this.client = requireNonNull(client); + this.groupPath = PathUtils.validatePath(groupPath); + this.memberToken = MorePreconditions.checkNotBlank(memberToken); + this.codec = requireNonNull(codec); + } + + void advertise( + Closer closer, + InetSocketAddress endpoint, + Map<String, InetSocketAddress> additionalEndpoints) + throws AdvertiseException, InterruptedException { + + byte[] nodeData = serializeAdvertisement(endpoint, additionalEndpoints); + PersistentNode persistentNode = + new PersistentNode( + client, + CreateMode.EPHEMERAL_SEQUENTIAL, + + // TODO(John Sirois): Enable GUID protection once clients are updated to support + // its effects on group member node naming. We get nodes like: + // 4f5f98c4-8e71-41e3-8c8d-1c9a1f5f5df9-member_000000001 + // Clients expect member_ is the prefix and are not prepared for the GUID. + false /* GUID protection */, + + ZKPaths.makePath(groupPath, memberToken), + nodeData); + persistentNode.start(); + closer.register(persistentNode); + + // NB: This blocks on initial server set node population to emulate legacy + // SingletonService.LeaderControl.advertise (Group.join) behavior. Asynchronous + // population is an option though, we simply need to remove this wait. 
+ if (!persistentNode.waitForInitialCreate(Long.MAX_VALUE, TimeUnit.DAYS)) { + throw new AdvertiseException("Timed out waiting for leader advertisement."); + } + } + + private byte[] serializeAdvertisement( + InetSocketAddress endpoint, + Map<String, InetSocketAddress> additionalEndpoints) + throws AdvertiseException { + + ServiceInstance serviceInstance = + new ServiceInstance( + asEndpoint(endpoint), + Maps.transformValues(additionalEndpoints, Advertiser::asEndpoint), + Status.ALIVE); + + ByteArrayOutputStream sink = new ByteArrayOutputStream(); + try { + codec.serialize(serviceInstance, sink); + } catch (IOException e) { + throw new AdvertiseException( + "Problem serializing service instance data for " + serviceInstance, e); + } + return sink.toByteArray(); + } + + private static Endpoint asEndpoint(InetSocketAddress endpoint) { + return new Endpoint(endpoint.getHostName(), endpoint.getPort()); + } + } + + private final LeaderLatch leaderLatch; + private final Advertiser advertiser; + private final String groupPath; + + /** + * Creates a {@code SingletonService} backed by Curator. + * + * @param client A client to interact with a ZooKeeper ensemble. + * @param groupPath The root ZooKeeper path service members advertise their presence under. + * @param memberToken A token used to form service member node names. + * @param codec A codec that can be used to serialize group member {@link ServiceInstance} data. 
+ */ + CuratorSingletonService( + CuratorFramework client, + String groupPath, + String memberToken, + Codec<ServiceInstance> codec) { + + leaderLatch = new LeaderLatch(client, groupPath); + advertiser = new Advertiser(client, groupPath, memberToken, codec); + this.groupPath = PathUtils.validatePath(groupPath); + } + + @Override + public synchronized void lead( + InetSocketAddress endpoint, + Map<String, InetSocketAddress> additionalEndpoints, + LeadershipListener listener) + throws LeadException, InterruptedException { + + requireNonNull(endpoint); + requireNonNull(additionalEndpoints); + requireNonNull(listener); + + Closer closer = Closer.create(); + leaderLatch.addListener(new LeaderLatchListener() { + @Override + public void isLeader() { + listener.onLeading(new LeaderControl() { + @Override + public void advertise() throws AdvertiseException, InterruptedException { + advertiser.advertise(closer, endpoint, additionalEndpoints); + } + + @Override + public void leave() throws LeaveException { + try { + closer.close(); + } catch (IOException e) { + throw new LeaveException("Failed to abdicate leadership of group at " + groupPath, e); + } + } + }); + } + + @Override + public void notLeader() { + listener.onDefeated(); + } + }); + + try { + leaderLatch.start(); + } catch (Exception e) { + // NB: We failed to lead; so we never could have advertised and there is no need to close the + // closer. 
+ throw new LeadException("Failed to begin awaiting leadership of group " + groupPath, e); + } + closer.register(leaderLatch); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/de046f75/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java new file mode 100644 index 0000000..a2b4125 --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java @@ -0,0 +1,114 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.discovery; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.function.Predicate; + +import com.google.common.collect.ImmutableMap; + +import org.apache.aurora.common.io.Codec; +import org.apache.aurora.common.thrift.Endpoint; +import org.apache.aurora.common.thrift.ServiceInstance; +import org.apache.aurora.common.thrift.Status; +import org.apache.aurora.common.zookeeper.ServerSet; +import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperTest; +import org.apache.aurora.scheduler.app.ServiceGroupMonitor; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.junit.Before; + +class BaseCuratorDiscoveryTest extends BaseZooKeeperTest { + + static final String GROUP_PATH = "/group/root"; + static final String MEMBER_TOKEN = "member_"; + static final Codec<ServiceInstance> CODEC = ServerSet.JSON_CODEC; + static final int PRIMARY_PORT = 42; + + private CuratorFramework client; + private BlockingQueue<PathChildrenCacheEvent> groupEvents; + private CuratorServiceGroupMonitor groupMonitor; + + @Before + public void setUpCurator() { + client = startNewClient(); + + PathChildrenCache groupCache = + new PathChildrenCache(client, GROUP_PATH, true /* cacheData */); + groupEvents = new LinkedBlockingQueue<>(); + groupCache.getListenable().addListener((c, event) -> groupEvents.put(event)); + + Predicate<String> memberSelector = name -> name.contains(MEMBER_TOKEN); + groupMonitor = new CuratorServiceGroupMonitor(groupCache, memberSelector, ServerSet.JSON_CODEC); + } + + final CuratorFramework startNewClient() { + CuratorFramework curator = CuratorFrameworkFactory.builder() + 
.dontUseContainerParents() // Container nodes are only available in ZK 3.5+. + .retryPolicy((retryCount, elapsedTimeMs, sleeper) -> false) // Don't retry. + .connectString(String.format("localhost:%d", getServer().getPort())) + .build(); + curator.start(); + addTearDown(curator::close); + return curator; + } + + final void expireSession(CuratorFramework curator) throws Exception { + getServer().expireClientSession(curator.getZookeeperClient().getZooKeeper().getSessionId()); + } + + final CuratorFramework getClient() { + return client; + } + + final CuratorServiceGroupMonitor getGroupMonitor() { + return groupMonitor; + } + + final void startGroupMonitor() throws ServiceGroupMonitor.MonitorException { + groupMonitor.start(); + addTearDown(groupMonitor::close); + } + + final void expectGroupEvent(PathChildrenCacheEvent.Type eventType) { + while (true) { + try { + PathChildrenCacheEvent event = groupEvents.take(); + if (event.getType() == eventType) { + break; + } + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } + } + + final byte[] serialize(ServiceInstance serviceInstance) throws IOException { + ByteArrayOutputStream sink = new ByteArrayOutputStream(); + CODEC.serialize(serviceInstance, sink); + return sink.toByteArray(); + } + + final ServiceInstance serviceInstance(String hostName) { + return new ServiceInstance( + new Endpoint(hostName, PRIMARY_PORT), + ImmutableMap.of(), + Status.ALIVE); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/de046f75/src/test/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitorTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitorTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitorTest.java index 5598389..1669205 100644 --- a/src/test/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitorTest.java +++ 
b/src/test/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitorTest.java @@ -13,97 +13,37 @@ */ package org.apache.aurora.scheduler.discovery; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.function.Predicate; - -import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.apache.aurora.codec.ThriftBinaryCodec; -import org.apache.aurora.common.thrift.Endpoint; import org.apache.aurora.common.thrift.ServiceInstance; -import org.apache.aurora.common.thrift.Status; -import org.apache.aurora.common.zookeeper.ServerSet; -import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperTest; -import org.apache.aurora.scheduler.app.ServiceGroupMonitor; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.utils.ZKPaths; import org.apache.zookeeper.CreateMode; -import org.junit.Before; import org.junit.Test; import static org.junit.Assert.assertEquals; -public class CuratorServiceGroupMonitorTest extends BaseZooKeeperTest { - - private static final String GROUP_PATH = "/group/root"; - private static final String MEMBER_TOKEN = "member_"; - - private CuratorFramework client; - private BlockingQueue<PathChildrenCacheEvent> groupEvents; - private CuratorServiceGroupMonitor groupMonitor; - - @Before - public void setUpCurator() { - client = CuratorFrameworkFactory.builder() - .dontUseContainerParents() // Container nodes are only available in ZK 3.5+. - .retryPolicy((retryCount, elapsedTimeMs, sleeper) -> false) // Don't retry. 
- .connectString(String.format("localhost:%d", getServer().getPort())) - .build(); - client.start(); - addTearDown(client::close); - - PathChildrenCache groupCache = - new PathChildrenCache(client, GROUP_PATH, true /* cacheData */); - groupEvents = new LinkedBlockingQueue<>(); - groupCache.getListenable().addListener((c, event) -> groupEvents.put(event)); - - Predicate<String> memberSelector = name -> name.contains(MEMBER_TOKEN); - groupMonitor = new CuratorServiceGroupMonitor(groupCache, memberSelector, ServerSet.JSON_CODEC); - } - - private void startGroupMonitor() throws ServiceGroupMonitor.MonitorException { - groupMonitor.start(); - addTearDown(groupMonitor::close); - } - - private void expectGroupEvent(PathChildrenCacheEvent.Type eventType) { - while (true) { - try { - PathChildrenCacheEvent event = groupEvents.take(); - if (event.getType() == eventType) { - break; - } - } catch (InterruptedException ex) { - throw new RuntimeException(ex); - } - } - } +public class CuratorServiceGroupMonitorTest extends BaseCuratorDiscoveryTest { @Test public void testNominalLifecycle() throws Exception { startGroupMonitor(); - groupMonitor.close(); + getGroupMonitor().close(); } @Test public void testExceptionalLifecycle() throws Exception { // Close on a non-started or failed-to-start monitor should be allowed. 
- groupMonitor.close(); + getGroupMonitor().close(); } @Test public void testNoHosts() throws Exception { - assertEquals(ImmutableSet.of(), groupMonitor.get()); + assertEquals(ImmutableSet.of(), getGroupMonitor().get()); startGroupMonitor(); - assertEquals(ImmutableSet.of(), groupMonitor.get()); + assertEquals(ImmutableSet.of(), getGroupMonitor().get()); } @Test @@ -114,18 +54,18 @@ public class CuratorServiceGroupMonitorTest extends BaseZooKeeperTest { String onePath = createMember(one); ServiceInstance two = serviceInstance("two"); String twoPath = createMember(two); - assertEquals(ImmutableSet.of(one, two), groupMonitor.get()); + assertEquals(ImmutableSet.of(one, two), getGroupMonitor().get()); deleteChild(twoPath); - assertEquals(ImmutableSet.of(one), groupMonitor.get()); + assertEquals(ImmutableSet.of(one), getGroupMonitor().get()); deleteChild(onePath); ServiceInstance three = serviceInstance("three"); String threePath = createMember(three); - assertEquals(ImmutableSet.of(three), groupMonitor.get()); + assertEquals(ImmutableSet.of(three), getGroupMonitor().get()); deleteChild(threePath); - assertEquals(ImmutableSet.of(), groupMonitor.get()); + assertEquals(ImmutableSet.of(), getGroupMonitor().get()); } @Test @@ -133,17 +73,17 @@ public class CuratorServiceGroupMonitorTest extends BaseZooKeeperTest { startGroupMonitor(); String nonMemberPath = createNonMember(); - assertEquals(ImmutableSet.of(), groupMonitor.get()); + assertEquals(ImmutableSet.of(), getGroupMonitor().get()); ServiceInstance member = serviceInstance("member"); String memberPath = createMember(member); - assertEquals(ImmutableSet.of(member), groupMonitor.get()); + assertEquals(ImmutableSet.of(member), getGroupMonitor().get()); deleteChild(memberPath); - assertEquals(ImmutableSet.of(), groupMonitor.get()); + assertEquals(ImmutableSet.of(), getGroupMonitor().get()); deleteChild(nonMemberPath); - assertEquals(ImmutableSet.of(), groupMonitor.get()); + assertEquals(ImmutableSet.of(), 
getGroupMonitor().get()); } @Test @@ -156,7 +96,7 @@ public class CuratorServiceGroupMonitorTest extends BaseZooKeeperTest { createMember(member); // Invalid member should be ignored. - assertEquals(ImmutableSet.of(member), groupMonitor.get()); + assertEquals(ImmutableSet.of(member), getGroupMonitor().get()); } @Test @@ -168,14 +108,14 @@ public class CuratorServiceGroupMonitorTest extends BaseZooKeeperTest { createMember(two, false /* waitForGroupEvent */); // Not started yet, should see no group members. - assertEquals(ImmutableSet.of(), groupMonitor.get()); + assertEquals(ImmutableSet.of(), getGroupMonitor().get()); startGroupMonitor(); - assertEquals(ImmutableSet.of(one, two), groupMonitor.get()); + assertEquals(ImmutableSet.of(one, two), getGroupMonitor().get()); } private void deleteChild(String twoPath) throws Exception { - client.delete().forPath(twoPath); + getClient().delete().forPath(twoPath); expectGroupEvent(PathChildrenCacheEvent.Type.CHILD_REMOVED); } @@ -194,7 +134,7 @@ public class CuratorServiceGroupMonitorTest extends BaseZooKeeperTest { } private String createMember(byte[] nodeData, boolean waitForGroupEvent) throws Exception { - String path = client.create() + String path = getClient().create() .creatingParentsIfNeeded() .withMode(CreateMode.EPHEMERAL_SEQUENTIAL) .forPath(ZKPaths.makePath(GROUP_PATH, MEMBER_TOKEN), nodeData); @@ -205,21 +145,11 @@ public class CuratorServiceGroupMonitorTest extends BaseZooKeeperTest { } private String createNonMember() throws Exception { - String path = client.create() + String path = getClient().create() .creatingParentsIfNeeded() .withMode(CreateMode.EPHEMERAL_SEQUENTIAL) .forPath(ZKPaths.makePath(GROUP_PATH, "not-a-member")); expectGroupEvent(PathChildrenCacheEvent.Type.CHILD_ADDED); return path; } - - private byte[] serialize(ServiceInstance serviceInstance) throws IOException { - ByteArrayOutputStream sink = new ByteArrayOutputStream(); - ServerSet.JSON_CODEC.serialize(serviceInstance, sink); - return 
sink.toByteArray(); - } - - private ServiceInstance serviceInstance(String hostName) { - return new ServiceInstance(new Endpoint(hostName, 42), ImmutableMap.of(), Status.ALIVE); - } } http://git-wip-us.apache.org/repos/asf/aurora/blob/de046f75/src/test/java/org/apache/aurora/scheduler/discovery/CuratorSingletonServiceTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/CuratorSingletonServiceTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/CuratorSingletonServiceTest.java new file mode 100644 index 0000000..6ea49b0 --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/discovery/CuratorSingletonServiceTest.java @@ -0,0 +1,194 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.discovery; + +import java.net.InetSocketAddress; +import java.util.concurrent.CountDownLatch; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; + +import org.apache.aurora.common.zookeeper.SingletonService; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.easymock.Capture; +import org.easymock.IAnswer; +import org.easymock.IMocksControl; +import org.junit.Before; +import org.junit.Test; + +import static org.easymock.EasyMock.capture; +import static org.easymock.EasyMock.createControl; +import static org.easymock.EasyMock.expectLastCall; +import static org.easymock.EasyMock.newCapture; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class CuratorSingletonServiceTest extends BaseCuratorDiscoveryTest { + + private IMocksControl control; + + @Before + public void setUpSingletonService() throws Exception { + control = createControl(); + addTearDown(control::verify); + } + + private SingletonService.LeadershipListener createMockLeadershipListener() { + return control.createMock(SingletonService.LeadershipListener.class); + } + + private void newLeader( + CuratorFramework client, + String hostName, + SingletonService.LeadershipListener listener) + throws Exception { + + CuratorSingletonService singletonService = + new CuratorSingletonService(client, GROUP_PATH, MEMBER_TOKEN, CODEC); + InetSocketAddress leaderEndpoint = InetSocketAddress.createUnresolved(hostName, PRIMARY_PORT); + singletonService.lead(leaderEndpoint, ImmutableMap.of(), listener); + } + + @Test + public void testLeadAdvertise() throws Exception { + SingletonService.LeadershipListener listener = createMockLeadershipListener(); + Capture<SingletonService.LeaderControl> capture = newCapture(); + listener.onLeading(capture(capture)); + 
expectLastCall(); + + control.replay(); + + startGroupMonitor(); + + // Can't be leader until we try to lead. + assertFalse(capture.hasCaptured()); + + newLeader(getClient(), "host1", listener); + + // Wait to become leader. + expectGroupEvent(PathChildrenCacheEvent.Type.CHILD_ADDED); + assertTrue(capture.hasCaptured()); + + // Leadership nodes should not be seen as service group nodes. + assertEquals(ImmutableSet.of(), getGroupMonitor().get()); + + capture.getValue().advertise(); + + // Verify we've advertised as leader. + expectGroupEvent(PathChildrenCacheEvent.Type.CHILD_ADDED); + assertEquals(ImmutableSet.of(serviceInstance("host1")), getGroupMonitor().get()); + } + + @Test + public void testAbdicateTransition() throws Exception { + SingletonService.LeadershipListener host1Listener = createMockLeadershipListener(); + Capture<SingletonService.LeaderControl> host1OnLeadingCapture = newCapture(); + host1Listener.onLeading(capture(host1OnLeadingCapture)); + expectLastCall(); + + SingletonService.LeadershipListener host2Listener = createMockLeadershipListener(); + Capture<SingletonService.LeaderControl> host2OnLeadingCapture = newCapture(); + host2Listener.onLeading(capture(host2OnLeadingCapture)); + expectLastCall(); + + control.replay(); + + startGroupMonitor(); + + // Have host1 become leader. + newLeader(getClient(), "host1", host1Listener); + expectGroupEvent(PathChildrenCacheEvent.Type.CHILD_ADDED); + assertTrue(host1OnLeadingCapture.hasCaptured()); + + host1OnLeadingCapture.getValue().advertise(); + expectGroupEvent(PathChildrenCacheEvent.Type.CHILD_ADDED); + assertEquals(ImmutableSet.of(serviceInstance("host1")), getGroupMonitor().get()); + + newLeader(getClient(), "host2", host2Listener); + expectGroupEvent(PathChildrenCacheEvent.Type.CHILD_ADDED); + assertFalse(host2OnLeadingCapture.hasCaptured()); + + // Now have host1 abdicate. 
+ host1OnLeadingCapture.getValue().leave(); + + // Should see both the leadership and service group member nodes get cleaned up by host1. + expectGroupEvent(PathChildrenCacheEvent.Type.CHILD_REMOVED); + expectGroupEvent(PathChildrenCacheEvent.Type.CHILD_REMOVED); + + awaitCapture(host2OnLeadingCapture); + } + + @Test + public void testDefeatTransition() throws Exception { + SingletonService.LeadershipListener host1Listener = createMockLeadershipListener(); + Capture<SingletonService.LeaderControl> host1OnLeadingCapture = newCapture(); + host1Listener.onLeading(capture(host1OnLeadingCapture)); + expectLastCall(); + + CountDownLatch host1Defeated = new CountDownLatch(1); + host1Listener.onDefeated(); + expectLastCall().andAnswer((IAnswer<Void>) () -> { + host1Defeated.countDown(); + return null; + }); + + SingletonService.LeadershipListener host2Listener = createMockLeadershipListener(); + Capture<SingletonService.LeaderControl> host2OnLeadingCapture = newCapture(); + host2Listener.onLeading(capture(host2OnLeadingCapture)); + expectLastCall(); + + control.replay(); + + startGroupMonitor(); + + // Have host1 become leader. + newLeader(getClient(), "host1", host1Listener); + expectGroupEvent(PathChildrenCacheEvent.Type.CHILD_ADDED); + assertTrue(host1OnLeadingCapture.hasCaptured()); + + host1OnLeadingCapture.getValue().advertise(); + expectGroupEvent(PathChildrenCacheEvent.Type.CHILD_ADDED); + assertEquals(ImmutableSet.of(serviceInstance("host1")), getGroupMonitor().get()); + + newLeader(startNewClient(), "host2", host2Listener); + expectGroupEvent(PathChildrenCacheEvent.Type.CHILD_ADDED); + assertFalse(host2OnLeadingCapture.hasCaptured()); + + // Simulate a session timeout - the ephemeral leader node goes away and host1 should be + // defeated. + expireSession(getClient()); + + // Should see both the leadership and service group member nodes go away as part of session + // expiration for ephemeral nodes. 
+ expectGroupEvent(PathChildrenCacheEvent.Type.CHILD_REMOVED); + expectGroupEvent(PathChildrenCacheEvent.Type.CHILD_REMOVED); + + awaitCapture(host2OnLeadingCapture); + + // No advertisement by host2 yet, even though it won leadership, but the host1 service group + // node should have been cleaned up as tested above. + assertEquals(ImmutableSet.of(), getGroupMonitor().get()); + + // Eventually host1 should notice it's been defeated. + host1Defeated.await(); + } + + private void awaitCapture(Capture<?> capture) throws InterruptedException { + while (!capture.hasCaptured()) { + Thread.sleep(1L); + } + } +}
