Repository: aurora
Updated Branches:
  refs/heads/master 8d0473cb5 -> de046f757


Introduce a Curator-based `SingletonService`.

Bugs closed: AURORA-1468

Reviewed at https://reviews.apache.org/r/46111/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/de046f75
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/de046f75
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/de046f75

Branch: refs/heads/master
Commit: de046f7572ae6b59a829c6adb7c09ab9e6a0165f
Parents: 8d0473c
Author: John Sirois <[email protected]>
Authored: Wed Apr 13 19:27:17 2016 -0600
Committer: John Sirois <[email protected]>
Committed: Wed Apr 13 19:27:17 2016 -0600

----------------------------------------------------------------------
 .../common/zookeeper/SingletonService.java      |   4 +
 .../zookeeper/testing/ZooKeeperTestServer.java  |   2 +-
 .../discovery/CuratorSingletonService.java      | 191 ++++++++++++++++++
 .../discovery/BaseCuratorDiscoveryTest.java     | 114 +++++++++++
 .../CuratorServiceGroupMonitorTest.java         | 108 ++---------
 .../discovery/CuratorSingletonServiceTest.java  | 194 +++++++++++++++++++
 6 files changed, 523 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/de046f75/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java
 
b/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java
index 3561d07..7f962eb 100644
--- 
a/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java
+++ 
b/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java
@@ -35,6 +35,10 @@ public interface SingletonService {
    * Indicates an error attempting to advertise leadership of a group of 
servers.
    */
   class AdvertiseException extends Exception {
+    public AdvertiseException(String message) {
+      super(message);
+    }
+
     public AdvertiseException(String message, Throwable cause) {
       super(message, cause);
     }

http://git-wip-us.apache.org/repos/asf/aurora/blob/de046f75/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java
 
b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java
index 0ab24fa..50acaeb 100644
--- 
a/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java
+++ 
b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java
@@ -103,7 +103,7 @@ public class ZooKeeperTestServer {
    *
    * @param sessionId The id of the client session to expire.
    */
-  void expireClientSession(long sessionId) {
+  public final void expireClientSession(long sessionId) {
     zooKeeperServer.closeSession(sessionId);
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/de046f75/src/main/java/org/apache/aurora/scheduler/discovery/CuratorSingletonService.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorSingletonService.java
 
b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorSingletonService.java
new file mode 100644
index 0000000..c9bd1eb
--- /dev/null
+++ 
b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorSingletonService.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.discovery;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Maps;
+import com.google.common.io.Closer;
+
+import org.apache.aurora.common.base.MorePreconditions;
+import org.apache.aurora.common.io.Codec;
+import org.apache.aurora.common.thrift.Endpoint;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.thrift.Status;
+import org.apache.aurora.common.zookeeper.SingletonService;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.leader.LeaderLatch;
+import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
+import org.apache.curator.framework.recipes.nodes.PersistentNode;
+import org.apache.curator.utils.PathUtils;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.zookeeper.CreateMode;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A {@link SingletonService} built on Curator recipes: a {@link LeaderLatch} contends for
+ * leadership of the group and, once elected, a {@link PersistentNode} advertises the leader's
+ * endpoints as a member node under the group path.
+ */
+class CuratorSingletonService implements SingletonService {
+
+  // This is the complement of the CuratorServiceGroupMonitor, it allows advertisement of a leader
+  // in a service group.
+  private static class Advertiser {
+
+    private final String groupPath;
+    private final String memberToken;
+    private final CuratorFramework client;
+    private final Codec<ServiceInstance> codec;
+
+    Advertiser(
+        CuratorFramework client,
+        String groupPath,
+        String memberToken,
+        Codec<ServiceInstance> codec) {
+
+      this.client = requireNonNull(client);
+      this.groupPath = PathUtils.validatePath(groupPath);
+      this.memberToken = MorePreconditions.checkNotBlank(memberToken);
+      this.codec = requireNonNull(codec);
+    }
+
+    /**
+     * Creates an ephemeral, sequential member node under the group path that carries the
+     * serialized endpoints, blocking until the node has initially been created.
+     *
+     * @param closer Receives the member node so that closing it withdraws the advertisement.
+     * @param endpoint The primary endpoint to advertise.
+     * @param additionalEndpoints Named auxiliary endpoints to advertise.
+     * @throws AdvertiseException If the endpoints cannot be serialized or the wait for node
+     *     creation gives up.
+     * @throws InterruptedException If interrupted while waiting for the node to be created.
+     */
+    void advertise(
+        Closer closer,
+        InetSocketAddress endpoint,
+        Map<String, InetSocketAddress> additionalEndpoints)
+        throws AdvertiseException, InterruptedException {
+
+      byte[] nodeData = serializeAdvertisement(endpoint, additionalEndpoints);
+      PersistentNode persistentNode =
+          new PersistentNode(
+              client,
+              CreateMode.EPHEMERAL_SEQUENTIAL,
+
+              // TODO(John Sirois): Enable GUID protection once clients are updated to support
+              // its effects on group member node naming.  We get nodes like:
+              //   4f5f98c4-8e71-41e3-8c8d-1c9a1f5f5df9-member_000000001
+              // Clients expect member_ is the prefix and are not prepared for the GUID.
+              false /* GUID protection */,
+
+              ZKPaths.makePath(groupPath, memberToken),
+              nodeData);
+      persistentNode.start();
+      closer.register(persistentNode);
+
+      // NB: This blocks on initial server set node population to emulate legacy
+      // SingletonService.LeaderControl.advertise (Group.join) behavior. Asynchronous
+      // population is an option though, we simply need to remove this wait.
+      if (!persistentNode.waitForInitialCreate(Long.MAX_VALUE, TimeUnit.DAYS)) {
+        throw new AdvertiseException("Timed out waiting for leader advertisement.");
+      }
+    }
+
+    /**
+     * Encodes the given endpoints as an {@code ALIVE} {@link ServiceInstance} using the codec.
+     *
+     * @throws AdvertiseException If the codec fails to write the service instance.
+     */
+    private byte[] serializeAdvertisement(
+        InetSocketAddress endpoint,
+        Map<String, InetSocketAddress> additionalEndpoints)
+        throws AdvertiseException {
+
+      ServiceInstance serviceInstance =
+          new ServiceInstance(
+              asEndpoint(endpoint),
+              Maps.transformValues(additionalEndpoints, Advertiser::asEndpoint),
+              Status.ALIVE);
+
+      ByteArrayOutputStream sink = new ByteArrayOutputStream();
+      try {
+        codec.serialize(serviceInstance, sink);
+      } catch (IOException e) {
+        throw new AdvertiseException(
+            "Problem serializing service instance data for " + serviceInstance, e);
+      }
+      return sink.toByteArray();
+    }
+
+    private static Endpoint asEndpoint(InetSocketAddress endpoint) {
+      return new Endpoint(endpoint.getHostName(), endpoint.getPort());
+    }
+  }
+
+  private final LeaderLatch leaderLatch;
+  private final Advertiser advertiser;
+  private final String groupPath;
+
+  /**
+   * Creates a {@code SingletonService} backed by Curator.
+   *
+   * @param client A client to interact with a ZooKeeper ensemble.
+   * @param groupPath The root ZooKeeper path service members advertise their presence under.
+   * @param memberToken A token used to form service member node names.
+   * @param codec A codec used to serialize the leader's {@link ServiceInstance} data for
+   *     advertisement.
+   */
+  CuratorSingletonService(
+      CuratorFramework client,
+      String groupPath,
+      String memberToken,
+      Codec<ServiceInstance> codec) {
+
+    // Validate the path up front, before it is handed to any collaborator.
+    this.groupPath = PathUtils.validatePath(groupPath);
+    leaderLatch = new LeaderLatch(client, this.groupPath);
+    advertiser = new Advertiser(client, this.groupPath, memberToken, codec);
+  }
+
+  /**
+   * Starts contending for leadership of the group. The listener is notified with a
+   * {@link LeaderControl} when leadership is won and is told of defeat when it is lost.
+   *
+   * @param endpoint The primary endpoint advertised if the granted control advertises.
+   * @param additionalEndpoints Named auxiliary endpoints advertised alongside the primary.
+   * @param listener Receives leadership notifications.
+   * @throws LeadException If the leader latch cannot be started.
+   */
+  @Override
+  public synchronized void lead(
+      InetSocketAddress endpoint,
+      Map<String, InetSocketAddress> additionalEndpoints,
+      LeadershipListener listener)
+      throws LeadException, InterruptedException {
+
+    requireNonNull(endpoint);
+    requireNonNull(additionalEndpoints);
+    requireNonNull(listener);
+
+    Closer closer = Closer.create();
+
+    // Register the latch before it is started: once started, the listener below may be notified
+    // on another thread and use the closer (advertise/leave) right away. Registering afterwards
+    // would race with that use and could leave the latch unclosed by an immediate leave().
+    closer.register(leaderLatch);
+
+    leaderLatch.addListener(new LeaderLatchListener() {
+      @Override
+      public void isLeader() {
+        listener.onLeading(new LeaderControl() {
+          @Override
+          public void advertise() throws AdvertiseException, InterruptedException {
+            advertiser.advertise(closer, endpoint, additionalEndpoints);
+          }
+
+          @Override
+          public void leave() throws LeaveException {
+            try {
+              closer.close();
+            } catch (IOException e) {
+              throw new LeaveException("Failed to abdicate leadership of group at " + groupPath, e);
+            }
+          }
+        });
+      }
+
+      @Override
+      public void notLeader() {
+        listener.onDefeated();
+      }
+    });
+
+    try {
+      leaderLatch.start();
+    } catch (Exception e) {
+      // NB: We failed to lead; so we never could have advertised and no LeaderControl was handed
+      // out, so there is no need to close the closer.
+      throw new LeadException("Failed to begin awaiting leadership of group " + groupPath, e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/de046f75/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java
 
b/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java
new file mode 100644
index 0000000..a2b4125
--- /dev/null
+++ 
b/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.discovery;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Predicate;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.aurora.common.io.Codec;
+import org.apache.aurora.common.thrift.Endpoint;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.thrift.Status;
+import org.apache.aurora.common.zookeeper.ServerSet;
+import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperTest;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.junit.Before;
+
+class BaseCuratorDiscoveryTest extends BaseZooKeeperTest {
+
+  static final String GROUP_PATH = "/group/root";
+  static final String MEMBER_TOKEN = "member_";
+  static final Codec<ServiceInstance> CODEC = ServerSet.JSON_CODEC;
+  static final int PRIMARY_PORT = 42;
+
+  private CuratorFramework client;
+  private BlockingQueue<PathChildrenCacheEvent> groupEvents;
+  private CuratorServiceGroupMonitor groupMonitor;
+
+  @Before
+  public void setUpCurator() {
+    client = startNewClient();
+
+    PathChildrenCache groupCache =
+        new PathChildrenCache(client, GROUP_PATH, true /* cacheData */);
+    groupEvents = new LinkedBlockingQueue<>();
+    groupCache.getListenable().addListener((c, event) -> 
groupEvents.put(event));
+
+    Predicate<String> memberSelector = name -> name.contains(MEMBER_TOKEN);
+    groupMonitor = new CuratorServiceGroupMonitor(groupCache, memberSelector, 
ServerSet.JSON_CODEC);
+  }
+
+  final CuratorFramework startNewClient() {
+    CuratorFramework curator = CuratorFrameworkFactory.builder()
+        .dontUseContainerParents() // Container nodes are only available in ZK 
3.5+.
+        .retryPolicy((retryCount, elapsedTimeMs, sleeper) -> false) // Don't 
retry.
+        .connectString(String.format("localhost:%d", getServer().getPort()))
+        .build();
+    curator.start();
+    addTearDown(curator::close);
+    return curator;
+  }
+
+  final void expireSession(CuratorFramework curator) throws Exception {
+    
getServer().expireClientSession(curator.getZookeeperClient().getZooKeeper().getSessionId());
+  }
+
+  final CuratorFramework getClient() {
+    return client;
+  }
+
+  final CuratorServiceGroupMonitor getGroupMonitor() {
+    return groupMonitor;
+  }
+
+  final void startGroupMonitor() throws ServiceGroupMonitor.MonitorException {
+    groupMonitor.start();
+    addTearDown(groupMonitor::close);
+  }
+
+  final void expectGroupEvent(PathChildrenCacheEvent.Type eventType) {
+    while (true) {
+      try {
+        PathChildrenCacheEvent event = groupEvents.take();
+        if (event.getType() == eventType) {
+          break;
+        }
+      } catch (InterruptedException ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+  }
+
+  final byte[] serialize(ServiceInstance serviceInstance) throws IOException {
+    ByteArrayOutputStream sink = new ByteArrayOutputStream();
+    CODEC.serialize(serviceInstance, sink);
+    return sink.toByteArray();
+  }
+
+  final ServiceInstance serviceInstance(String hostName) {
+    return new ServiceInstance(
+        new Endpoint(hostName, PRIMARY_PORT),
+        ImmutableMap.of(),
+        Status.ALIVE);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/de046f75/src/test/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitorTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitorTest.java
 
b/src/test/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitorTest.java
index 5598389..1669205 100644
--- 
a/src/test/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitorTest.java
+++ 
b/src/test/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitorTest.java
@@ -13,97 +13,37 @@
  */
 package org.apache.aurora.scheduler.discovery;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.function.Predicate;
-
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 
 import org.apache.aurora.codec.ThriftBinaryCodec;
-import org.apache.aurora.common.thrift.Endpoint;
 import org.apache.aurora.common.thrift.ServiceInstance;
-import org.apache.aurora.common.thrift.Status;
-import org.apache.aurora.common.zookeeper.ServerSet;
-import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperTest;
-import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.CreateMode;
-import org.junit.Before;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 
-public class CuratorServiceGroupMonitorTest extends BaseZooKeeperTest {
-
-  private static final String GROUP_PATH = "/group/root";
-  private static final String MEMBER_TOKEN = "member_";
-
-  private CuratorFramework client;
-  private BlockingQueue<PathChildrenCacheEvent> groupEvents;
-  private CuratorServiceGroupMonitor groupMonitor;
-
-  @Before
-  public void setUpCurator() {
-    client = CuratorFrameworkFactory.builder()
-        .dontUseContainerParents() // Container nodes are only available in ZK 
3.5+.
-        .retryPolicy((retryCount, elapsedTimeMs, sleeper) -> false) // Don't 
retry.
-        .connectString(String.format("localhost:%d", getServer().getPort()))
-        .build();
-    client.start();
-    addTearDown(client::close);
-
-    PathChildrenCache groupCache =
-        new PathChildrenCache(client, GROUP_PATH, true /* cacheData */);
-    groupEvents = new LinkedBlockingQueue<>();
-    groupCache.getListenable().addListener((c, event) -> 
groupEvents.put(event));
-
-    Predicate<String> memberSelector = name -> name.contains(MEMBER_TOKEN);
-    groupMonitor = new CuratorServiceGroupMonitor(groupCache, memberSelector, 
ServerSet.JSON_CODEC);
-  }
-
-  private void startGroupMonitor() throws ServiceGroupMonitor.MonitorException 
{
-    groupMonitor.start();
-    addTearDown(groupMonitor::close);
-  }
-
-  private void expectGroupEvent(PathChildrenCacheEvent.Type eventType) {
-    while (true) {
-      try {
-        PathChildrenCacheEvent event = groupEvents.take();
-        if (event.getType() == eventType) {
-          break;
-        }
-      } catch (InterruptedException ex) {
-        throw new RuntimeException(ex);
-      }
-    }
-  }
+public class CuratorServiceGroupMonitorTest extends BaseCuratorDiscoveryTest {
 
   @Test
   public void testNominalLifecycle() throws Exception {
     startGroupMonitor();
-    groupMonitor.close();
+    getGroupMonitor().close();
   }
 
   @Test
   public void testExceptionalLifecycle() throws Exception {
     // Close on a non-started or failed-to-start monitor should be allowed.
-    groupMonitor.close();
+    getGroupMonitor().close();
   }
 
   @Test
   public void testNoHosts() throws Exception {
-    assertEquals(ImmutableSet.of(), groupMonitor.get());
+    assertEquals(ImmutableSet.of(), getGroupMonitor().get());
 
     startGroupMonitor();
-    assertEquals(ImmutableSet.of(), groupMonitor.get());
+    assertEquals(ImmutableSet.of(), getGroupMonitor().get());
   }
 
   @Test
@@ -114,18 +54,18 @@ public class CuratorServiceGroupMonitorTest extends 
BaseZooKeeperTest {
     String onePath = createMember(one);
     ServiceInstance two = serviceInstance("two");
     String twoPath = createMember(two);
-    assertEquals(ImmutableSet.of(one, two), groupMonitor.get());
+    assertEquals(ImmutableSet.of(one, two), getGroupMonitor().get());
 
     deleteChild(twoPath);
-    assertEquals(ImmutableSet.of(one), groupMonitor.get());
+    assertEquals(ImmutableSet.of(one), getGroupMonitor().get());
 
     deleteChild(onePath);
     ServiceInstance three = serviceInstance("three");
     String threePath = createMember(three);
-    assertEquals(ImmutableSet.of(three), groupMonitor.get());
+    assertEquals(ImmutableSet.of(three), getGroupMonitor().get());
 
     deleteChild(threePath);
-    assertEquals(ImmutableSet.of(), groupMonitor.get());
+    assertEquals(ImmutableSet.of(), getGroupMonitor().get());
   }
 
   @Test
@@ -133,17 +73,17 @@ public class CuratorServiceGroupMonitorTest extends 
BaseZooKeeperTest {
     startGroupMonitor();
 
     String nonMemberPath = createNonMember();
-    assertEquals(ImmutableSet.of(), groupMonitor.get());
+    assertEquals(ImmutableSet.of(), getGroupMonitor().get());
 
     ServiceInstance member = serviceInstance("member");
     String memberPath = createMember(member);
-    assertEquals(ImmutableSet.of(member), groupMonitor.get());
+    assertEquals(ImmutableSet.of(member), getGroupMonitor().get());
 
     deleteChild(memberPath);
-    assertEquals(ImmutableSet.of(), groupMonitor.get());
+    assertEquals(ImmutableSet.of(), getGroupMonitor().get());
 
     deleteChild(nonMemberPath);
-    assertEquals(ImmutableSet.of(), groupMonitor.get());
+    assertEquals(ImmutableSet.of(), getGroupMonitor().get());
   }
 
   @Test
@@ -156,7 +96,7 @@ public class CuratorServiceGroupMonitorTest extends 
BaseZooKeeperTest {
     createMember(member);
 
     // Invalid member should be ignored.
-    assertEquals(ImmutableSet.of(member), groupMonitor.get());
+    assertEquals(ImmutableSet.of(member), getGroupMonitor().get());
   }
 
   @Test
@@ -168,14 +108,14 @@ public class CuratorServiceGroupMonitorTest extends 
BaseZooKeeperTest {
     createMember(two, false /* waitForGroupEvent */);
 
     // Not started yet, should see no group members.
-    assertEquals(ImmutableSet.of(), groupMonitor.get());
+    assertEquals(ImmutableSet.of(), getGroupMonitor().get());
 
     startGroupMonitor();
-    assertEquals(ImmutableSet.of(one, two), groupMonitor.get());
+    assertEquals(ImmutableSet.of(one, two), getGroupMonitor().get());
   }
 
   private void deleteChild(String twoPath) throws Exception {
-    client.delete().forPath(twoPath);
+    getClient().delete().forPath(twoPath);
     expectGroupEvent(PathChildrenCacheEvent.Type.CHILD_REMOVED);
   }
 
@@ -194,7 +134,7 @@ public class CuratorServiceGroupMonitorTest extends 
BaseZooKeeperTest {
   }
 
   private String createMember(byte[] nodeData, boolean waitForGroupEvent) 
throws Exception {
-    String path = client.create()
+    String path = getClient().create()
         .creatingParentsIfNeeded()
         .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
         .forPath(ZKPaths.makePath(GROUP_PATH, MEMBER_TOKEN), nodeData);
@@ -205,21 +145,11 @@ public class CuratorServiceGroupMonitorTest extends 
BaseZooKeeperTest {
   }
 
   private String createNonMember() throws Exception {
-    String path = client.create()
+    String path = getClient().create()
         .creatingParentsIfNeeded()
         .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
         .forPath(ZKPaths.makePath(GROUP_PATH, "not-a-member"));
     expectGroupEvent(PathChildrenCacheEvent.Type.CHILD_ADDED);
     return path;
   }
-
-  private byte[] serialize(ServiceInstance serviceInstance) throws IOException 
{
-    ByteArrayOutputStream sink = new ByteArrayOutputStream();
-    ServerSet.JSON_CODEC.serialize(serviceInstance, sink);
-    return sink.toByteArray();
-  }
-
-  private ServiceInstance serviceInstance(String hostName) {
-    return new ServiceInstance(new Endpoint(hostName, 42), ImmutableMap.of(), 
Status.ALIVE);
-  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/de046f75/src/test/java/org/apache/aurora/scheduler/discovery/CuratorSingletonServiceTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/discovery/CuratorSingletonServiceTest.java
 
b/src/test/java/org/apache/aurora/scheduler/discovery/CuratorSingletonServiceTest.java
new file mode 100644
index 0000000..6ea49b0
--- /dev/null
+++ 
b/src/test/java/org/apache/aurora/scheduler/discovery/CuratorSingletonServiceTest.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.discovery;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.CountDownLatch;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.common.zookeeper.SingletonService;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.easymock.Capture;
+import org.easymock.IAnswer;
+import org.easymock.IMocksControl;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.createControl;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.newCapture;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class CuratorSingletonServiceTest extends BaseCuratorDiscoveryTest {
+
+  private IMocksControl control;
+
+  @Before
+  public void setUpSingletonService() throws Exception {
+    control = createControl();
+    addTearDown(control::verify);
+  }
+
+  private SingletonService.LeadershipListener createMockLeadershipListener() {
+    return control.createMock(SingletonService.LeadershipListener.class);
+  }
+
+  private void newLeader(
+      CuratorFramework client,
+      String hostName,
+      SingletonService.LeadershipListener listener)
+      throws Exception {
+
+    CuratorSingletonService singletonService =
+        new CuratorSingletonService(client, GROUP_PATH, MEMBER_TOKEN, CODEC);
+    InetSocketAddress leaderEndpoint = 
InetSocketAddress.createUnresolved(hostName, PRIMARY_PORT);
+    singletonService.lead(leaderEndpoint, ImmutableMap.of(), listener);
+  }
+
+  @Test
+  public void testLeadAdvertise() throws Exception {
+    SingletonService.LeadershipListener listener = 
createMockLeadershipListener();
+    Capture<SingletonService.LeaderControl> capture = newCapture();
+    listener.onLeading(capture(capture));
+    expectLastCall();
+
+    control.replay();
+
+    startGroupMonitor();
+
+    // Can't be leader until we try to lead.
+    assertFalse(capture.hasCaptured());
+
+    newLeader(getClient(), "host1", listener);
+
+    // Wait to become leader.
+    expectGroupEvent(PathChildrenCacheEvent.Type.CHILD_ADDED);
+    assertTrue(capture.hasCaptured());
+
+    // Leadership nodes should not be seen as service group nodes.
+    assertEquals(ImmutableSet.of(), getGroupMonitor().get());
+
+    capture.getValue().advertise();
+
+    // Verify we've advertised as leader.
+    expectGroupEvent(PathChildrenCacheEvent.Type.CHILD_ADDED);
+    assertEquals(ImmutableSet.of(serviceInstance("host1")), 
getGroupMonitor().get());
+  }
+
+  @Test
+  public void testAbdicateTransition() throws Exception {
+    SingletonService.LeadershipListener host1Listener = 
createMockLeadershipListener();
+    Capture<SingletonService.LeaderControl> host1OnLeadingCapture = 
newCapture();
+    host1Listener.onLeading(capture(host1OnLeadingCapture));
+    expectLastCall();
+
+    SingletonService.LeadershipListener host2Listener = 
createMockLeadershipListener();
+    Capture<SingletonService.LeaderControl> host2OnLeadingCapture = 
newCapture();
+    host2Listener.onLeading(capture(host2OnLeadingCapture));
+    expectLastCall();
+
+    control.replay();
+
+    startGroupMonitor();
+
+    // Have host1 become leader.
+    newLeader(getClient(), "host1", host1Listener);
+    expectGroupEvent(PathChildrenCacheEvent.Type.CHILD_ADDED);
+    assertTrue(host1OnLeadingCapture.hasCaptured());
+
+    host1OnLeadingCapture.getValue().advertise();
+    expectGroupEvent(PathChildrenCacheEvent.Type.CHILD_ADDED);
+    assertEquals(ImmutableSet.of(serviceInstance("host1")), 
getGroupMonitor().get());
+
+    newLeader(getClient(), "host2", host2Listener);
+    expectGroupEvent(PathChildrenCacheEvent.Type.CHILD_ADDED);
+    assertFalse(host2OnLeadingCapture.hasCaptured());
+
+    // Now have host1 abdicate.
+    host1OnLeadingCapture.getValue().leave();
+
+    // Should see both the leadership and service group member nodes get 
cleaned up by host1.
+    expectGroupEvent(PathChildrenCacheEvent.Type.CHILD_REMOVED);
+    expectGroupEvent(PathChildrenCacheEvent.Type.CHILD_REMOVED);
+
+    awaitCapture(host2OnLeadingCapture);
+  }
+
+  @Test
+  public void testDefeatTransition() throws Exception {
+    SingletonService.LeadershipListener host1Listener = 
createMockLeadershipListener();
+    Capture<SingletonService.LeaderControl> host1OnLeadingCapture = 
newCapture();
+    host1Listener.onLeading(capture(host1OnLeadingCapture));
+    expectLastCall();
+
+    CountDownLatch host1Defeated = new CountDownLatch(1);
+    host1Listener.onDefeated();
+    expectLastCall().andAnswer((IAnswer<Void>) () -> {
+      host1Defeated.countDown();
+      return null;
+    });
+
+    SingletonService.LeadershipListener host2Listener = 
createMockLeadershipListener();
+    Capture<SingletonService.LeaderControl> host2OnLeadingCapture = 
newCapture();
+    host2Listener.onLeading(capture(host2OnLeadingCapture));
+    expectLastCall();
+
+    control.replay();
+
+    startGroupMonitor();
+
+    // Have host1 become leader.
+    newLeader(getClient(), "host1", host1Listener);
+    expectGroupEvent(PathChildrenCacheEvent.Type.CHILD_ADDED);
+    assertTrue(host1OnLeadingCapture.hasCaptured());
+
+    host1OnLeadingCapture.getValue().advertise();
+    expectGroupEvent(PathChildrenCacheEvent.Type.CHILD_ADDED);
+    assertEquals(ImmutableSet.of(serviceInstance("host1")), 
getGroupMonitor().get());
+
+    newLeader(startNewClient(), "host2", host2Listener);
+    expectGroupEvent(PathChildrenCacheEvent.Type.CHILD_ADDED);
+    assertFalse(host2OnLeadingCapture.hasCaptured());
+
+    // Simulate a session timeout - the ephemeral leader node goes away and 
host1 should be
+    // defeated.
+    expireSession(getClient());
+
+    // Should see both the leadership and service group member nodes go away 
as part of session
+    // expiration for ephemeral nodes.
+    expectGroupEvent(PathChildrenCacheEvent.Type.CHILD_REMOVED);
+    expectGroupEvent(PathChildrenCacheEvent.Type.CHILD_REMOVED);
+
+    awaitCapture(host2OnLeadingCapture);
+
+    // No advertisement by host2 yet, even though it won leadership, but the 
host1 service group
+    // node should have been cleaned up as tested above.
+    assertEquals(ImmutableSet.of(), getGroupMonitor().get());
+
+    // Eventually host1 should notice its been defeated.
+    host1Defeated.await();
+  }
+
+  private void awaitCapture(Capture<?> capture) throws InterruptedException {
+    while (!capture.hasCaptured()) {
+      Thread.sleep(1L);
+    }
+  }
+}

Reply via email to