Remove legacy commons ZK code

Reviewed at https://reviews.apache.org/r/62652/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/15cb049f
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/15cb049f
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/15cb049f

Branch: refs/heads/master
Commit: 15cb049f3b5d1a3d662e8a396ce7020b107a2fe8
Parents: c638877
Author: Bill Farner <[email protected]>
Authored: Wed Oct 18 17:33:50 2017 -0700
Committer: Bill Farner <[email protected]>
Committed: Wed Oct 18 17:33:50 2017 -0700

----------------------------------------------------------------------
 RELEASE-NOTES.md                                |   4 +-
 .../aurora/common/net/pool/DynamicHostSet.java  |  59 --
 .../aurora/common/zookeeper/Candidate.java      |  78 ---
 .../aurora/common/zookeeper/CandidateImpl.java  | 127 ----
 .../aurora/common/zookeeper/Encoding.java       |  87 +++
 .../apache/aurora/common/zookeeper/Group.java   | 674 -------------------
 .../aurora/common/zookeeper/ServerSet.java      |  74 --
 .../aurora/common/zookeeper/ServerSetImpl.java  | 349 ----------
 .../aurora/common/zookeeper/ServerSets.java     | 118 ----
 .../common/zookeeper/SingletonServiceImpl.java  | 122 ----
 .../common/zookeeper/ZooKeeperClient.java       | 372 ----------
 .../aurora/common/zookeeper/ZooKeeperUtils.java | 106 +--
 .../testing/BaseZooKeeperClientTest.java        | 140 ----
 .../zookeeper/testing/ZooKeeperTestServer.java  |   6 +-
 .../common/zookeeper/CandidateImplTest.java     | 165 -----
 .../aurora/common/zookeeper/EncodingTest.java   |  44 ++
 .../aurora/common/zookeeper/GroupTest.java      | 321 ---------
 .../aurora/common/zookeeper/JsonCodecTest.java  |  18 +-
 .../common/zookeeper/ServerSetImplTest.java     | 258 -------
 .../aurora/common/zookeeper/ServerSetsTest.java |  44 --
 .../zookeeper/SingletonServiceImplTest.java     | 243 -------
 .../common/zookeeper/ZooKeeperClientTest.java   | 210 ------
 .../common/zookeeper/ZooKeeperUtilsTest.java    |  76 +--
 .../CommonsServiceDiscoveryModule.java          | 102 ---
 .../discovery/CommonsServiceGroupMonitor.java   |  59 --
 .../CuratorServiceDiscoveryModule.java          |   4 +-
 .../discovery/FlaggedZooKeeperConfig.java       |   8 -
 .../discovery/ServiceDiscoveryModule.java       |  11 +-
 .../scheduler/discovery/ZooKeeperConfig.java    |  12 +-
 .../aurora/scheduler/app/SchedulerIT.java       |  57 +-
 .../scheduler/config/CommandLineTest.java       |   2 -
 .../discovery/AbstractDiscoveryModuleTest.java  |  82 ---
 .../discovery/BaseCuratorDiscoveryTest.java     |   6 +-
 .../discovery/CommonsDiscoveryModuleTest.java   |  29 -
 .../CommonsServiceGroupMonitorTest.java         | 137 ----
 .../discovery/CuratorDiscoveryModuleTest.java   |  63 +-
 .../discovery/ZooKeeperConfigTest.java          |   5 +-
 .../aurora/scheduler/http/LeaderHealthTest.java |   3 +-
 38 files changed, 237 insertions(+), 4038 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/RELEASE-NOTES.md
----------------------------------------------------------------------
diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index 079f495..f4cc416 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -18,8 +18,10 @@
 - Increase default ZooKeeper session timeout from 4 to 15 seconds.
 - Add option `-zk_connection_timeout` to control the connection timeout of 
ZooKeeper connections.
 
-### Deprecations and removals
+### Deprecations and removals:
 
+- Removed the deprecated command line argument `-zk_use_curator`, removing the 
choice to use the
+  legacy ZooKeeper client.
 - Removed the `rewriteConfigs` thrift API call in the scheduler. This was a 
last-ditch mechanism
   to modify scheduler state on the fly. It was considered extremely risky to 
use since its
   inception, and is safer to abandon due to its lack of use and likelihood for 
code rot.

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSet.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSet.java 
b/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSet.java
deleted file mode 100644
index df469ef..0000000
--- 
a/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSet.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.net.pool;
-
-import com.google.common.collect.ImmutableSet;
-
-import org.apache.aurora.common.base.Command;
-
-/**
- * A host set that can be monitored for changes.
- *
- * @param <T> The type that is used to identify members of the host set.
- */
-public interface DynamicHostSet<T> {
-
-  /**
-   * Registers a monitor to receive change notices for this server set as long 
as this jvm process
-   * is alive.  Blocks until the initial server set can be gathered and 
delivered to the monitor.
-   * The monitor will be notified if the membership set or parameters of 
existing members have
-   * changed.
-   *
-   * @param monitor the server set monitor to call back when the host set 
changes
-   * @return A command which, when executed, will stop monitoring the host set.
-   * @throws MonitorException if there is a problem monitoring the host set
-   */
-  Command watch(HostChangeMonitor<T> monitor) throws MonitorException;
-
-  /**
-   * An interface to an object that is interested in receiving notification 
whenever the host set
-   * changes.
-   */
-  interface HostChangeMonitor<T> {
-
-    /**
-     * Called when either the available set of services changes (when a 
service dies or a new
-     * instance comes on-line) or when an existing service advertises a status 
or health change.
-     *
-     * @param hostSet the current set of available ServiceInstances
-     */
-    void onChange(ImmutableSet<T> hostSet);
-  }
-
-  class MonitorException extends Exception {
-    public MonitorException(String msg, Throwable cause) {
-      super(msg, cause);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/main/java/org/apache/aurora/common/zookeeper/Candidate.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/zookeeper/Candidate.java 
b/commons/src/main/java/org/apache/aurora/common/zookeeper/Candidate.java
deleted file mode 100644
index 75c1b14..0000000
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/Candidate.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
-
-import org.apache.aurora.common.base.ExceptionalCommand;
-import org.apache.aurora.common.zookeeper.Group.JoinException;
-import org.apache.aurora.common.zookeeper.Group.WatchException;
-import 
org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
-import org.apache.zookeeper.KeeperException;
-
-/**
- * Interface definition for becoming or querying for a ZooKeeper-based group 
leader.
- */
-public interface Candidate {
-
-  /**
-   * Returns the current group leader by querying ZooKeeper synchronously.
-   *
-   * @return the current group leader's identifying data or {@link 
Optional#absent()} if there is
-   *     no leader
-   * @throws ZooKeeperConnectionException if there was a problem connecting to 
ZooKeeper
-   * @throws KeeperException if there was a problem reading the leader 
information
-   * @throws InterruptedException if this thread is interrupted getting the 
leader
-   */
-  public Optional<byte[]> getLeaderData()
-      throws ZooKeeperConnectionException, KeeperException, 
InterruptedException;
-
-  /**
-   * Encapsulates a leader that can be elected and subsequently defeated.
-   */
-  interface Leader {
-
-    /**
-     * Called when this leader has been elected.
-     *
-     * @param abdicate a command that can be used to abdicate leadership and 
force a new election
-     */
-    void onElected(ExceptionalCommand<JoinException> abdicate);
-
-    /**
-     * Called when the leader has been ousted.  Can occur either if the leader 
abdicates or if an
-     * external event causes the leader to lose its leadership role (session 
expiration).
-     */
-    void onDefeated();
-  }
-
-  /**
-   * Offers this candidate in leadership elections for as long as the current 
jvm process is alive.
-   * Upon election, the {@code onElected} callback will be executed and a 
command that can be used
-   * to abdicate leadership will be passed in.  If the elected leader jvm 
process dies or the
-   * elected leader successfully abdicates then a new leader will be elected.  
Leaders that
-   * successfully abdicate are removed from the group and will not be eligible 
for leadership
-   * election unless {@link #offerLeadership(Leader)} is called again.
-   *
-   * @param leader the leader to notify of election and defeat events
-   * @throws JoinException if there was a problem joining the group
-   * @throws WatchException if there is a problem generating the 1st group 
membership list
-   * @throws InterruptedException if interrupted waiting to join the group and 
determine initial
-   *     election results
-   * @return a supplier that can be queried to find out if this leader is 
currently elected
-   */
-  public Supplier<Boolean> offerLeadership(Leader leader)
-        throws JoinException, WatchException, InterruptedException;
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/main/java/org/apache/aurora/common/zookeeper/CandidateImpl.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/zookeeper/CandidateImpl.java 
b/commons/src/main/java/org/apache/aurora/common/zookeeper/CandidateImpl.java
deleted file mode 100644
index 98b5ee4..0000000
--- 
a/commons/src/main/java/org/apache/aurora/common/zookeeper/CandidateImpl.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.annotation.Nullable;
-
-import com.google.common.base.Charsets;
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Ordering;
-
-import org.apache.aurora.common.zookeeper.Group.JoinException;
-import org.apache.aurora.common.zookeeper.Group.Membership;
-import org.apache.aurora.common.zookeeper.Group.WatchException;
-import 
org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
-import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Implements leader election for small groups of candidates.  This 
implementation is subject to the
- * <a 
href="http://hadoop.apache.org/zookeeper/docs/r3.2.1/recipes.html#sc_leaderElection";>
- * herd effect</a> for a given group and should only be used for small (~10 
member) candidate pools.
- */
-public class CandidateImpl implements Candidate {
-  private static final Logger LOG = 
LoggerFactory.getLogger(CandidateImpl.class);
-
-  private static final byte[] UNKNOWN_CANDIDATE_DATA = 
"<unknown>".getBytes(Charsets.UTF_8);
-
-  private static final Supplier<byte[]> IP_ADDRESS_DATA_SUPPLIER = () -> {
-    try {
-      return InetAddress.getLocalHost().getHostAddress().getBytes();
-    } catch (UnknownHostException e) {
-      LOG.warn("Failed to determine local address!", e);
-      return UNKNOWN_CANDIDATE_DATA;
-    }
-  };
-
-  private static final Function<Iterable<String>, String> MOST_RECENT_JUDGE =
-      candidates -> Ordering.natural().min(candidates);
-
-  private final Group group;
-
-  /**
-   * Creates a candidate that can be used to offer leadership for the given 
{@code group}.
-   */
-  public CandidateImpl(Group group) {
-    this.group = Preconditions.checkNotNull(group);
-  }
-
-  @Override
-  public Optional<byte[]> getLeaderData()
-      throws ZooKeeperConnectionException, KeeperException, 
InterruptedException {
-
-    String leaderId = getLeader(group.getMemberIds());
-    return leaderId == null
-        ? Optional.<byte[]>absent()
-        : Optional.of(group.getMemberData(leaderId));
-  }
-
-  @Override
-  public Supplier<Boolean> offerLeadership(final Leader leader)
-      throws JoinException, WatchException, InterruptedException {
-
-    final Membership membership = group.join(IP_ADDRESS_DATA_SUPPLIER, 
leader::onDefeated);
-
-    final AtomicBoolean elected = new AtomicBoolean(false);
-    final AtomicBoolean abdicated = new AtomicBoolean(false);
-    group.watch(memberIds -> {
-      boolean noCandidates = Iterables.isEmpty(memberIds);
-      String memberId = membership.getMemberId();
-
-      if (noCandidates) {
-        LOG.warn("All candidates have temporarily left the group: " + group);
-      } else if (!Iterables.contains(memberIds, memberId)) {
-        LOG.error(
-            "Current member ID {} is not a candidate for leader, current 
voting: {}",
-            memberId, memberIds);
-      } else {
-        boolean electedLeader = memberId.equals(getLeader(memberIds));
-        boolean previouslyElected = elected.getAndSet(electedLeader);
-
-        if (!previouslyElected && electedLeader) {
-          LOG.info("Candidate {} is now leader of group: {}",
-              membership.getMemberPath(), memberIds);
-
-          leader.onElected(() -> {
-            membership.cancel();
-            abdicated.set(true);
-          });
-        } else if (!electedLeader) {
-          if (previouslyElected) {
-            leader.onDefeated();
-          }
-          LOG.info(
-              "Candidate {} waiting for the next leader election, current 
voting: {}",
-              membership.getMemberPath(), memberIds);
-        }
-      }
-    });
-
-    return () -> !abdicated.get() && elected.get();
-  }
-
-  @Nullable
-  private String getLeader(Iterable<String> memberIds) {
-    return Iterables.isEmpty(memberIds) ? null : 
MOST_RECENT_JUDGE.apply(memberIds);
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/main/java/org/apache/aurora/common/zookeeper/Encoding.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/zookeeper/Encoding.java 
b/commons/src/main/java/org/apache/aurora/common/zookeeper/Encoding.java
new file mode 100644
index 0000000..204f5c4
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/Encoding.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+import org.apache.aurora.common.io.Codec;
+import org.apache.aurora.common.thrift.Endpoint;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.thrift.Status;
+
+/**
+ * Utility class for encoding and decoding data stored in ZooKeeper nodes.
+ */
+public class Encoding {
+  /**
+   * Encodes a {@link ServiceInstance} as a JSON object.
+   *
+   * This is the default encoding for service instance data in ZooKeeper.
+   */
+  public static final Codec<ServiceInstance> JSON_CODEC = new JsonCodec();
+
+  private Encoding() {
+    // Utility class.
+  }
+
+  /**
+   * Returns a serialized Thrift service instance object, with given endpoints 
and codec.
+   *
+   * @param serviceInstance the Thrift service instance object to be serialized
+   * @param codec the codec to use to serialize a Thrift service instance 
object
+   * @return byte array that contains a serialized Thrift service instance
+   */
+  static byte[] serializeServiceInstance(
+      ServiceInstance serviceInstance, Codec<ServiceInstance> codec) throws 
IOException {
+
+    ByteArrayOutputStream output = new ByteArrayOutputStream();
+    codec.serialize(serviceInstance, output);
+    return output.toByteArray();
+  }
+
+  /**
+   * Serializes a service instance based on endpoints.
+   * @see #serializeServiceInstance(ServiceInstance, Codec)
+   *
+   * @param address the target address of the service instance
+   * @param additionalEndpoints additional endpoints of the service instance
+   * @param status service status
+   */
+  static byte[] serializeServiceInstance(
+      InetSocketAddress address,
+      Map<String, Endpoint> additionalEndpoints,
+      Status status,
+      Codec<ServiceInstance> codec) throws IOException {
+
+    ServiceInstance serviceInstance = new ServiceInstance(
+        new Endpoint(address.getHostName(), address.getPort()), 
additionalEndpoints, status);
+    return serializeServiceInstance(serviceInstance, codec);
+  }
+
+  /**
+   * Creates a service instance object deserialized from byte array.
+   *
+   * @param data the byte array contains a serialized Thrift service instance
+   * @param codec the codec to use to deserialize the byte array
+   */
+  static ServiceInstance deserializeServiceInstance(
+      byte[] data, Codec<ServiceInstance> codec) throws IOException {
+
+    return codec.deserialize(new ByteArrayInputStream(data));
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/main/java/org/apache/aurora/common/zookeeper/Group.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/zookeeper/Group.java 
b/commons/src/main/java/org/apache/aurora/common/zookeeper/Group.java
deleted file mode 100644
index 2720dd1..0000000
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/Group.java
+++ /dev/null
@@ -1,674 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.util.List;
-import java.util.Set;
-import java.util.regex.Pattern;
-
-import javax.annotation.Nullable;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-
-import org.apache.aurora.common.base.Command;
-import org.apache.aurora.common.base.Commands;
-import org.apache.aurora.common.base.ExceptionalSupplier;
-import org.apache.aurora.common.base.MorePreconditions;
-import org.apache.aurora.common.util.BackoffHelper;
-import 
org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NoNodeException;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.data.ACL;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class exposes methods for joining and monitoring distributed groups.  
The groups this class
- * monitors are realized as persistent paths in ZooKeeper with ephemeral child 
nodes for
- * each member of a group.
- */
-public class Group {
-  private static final Logger LOG = LoggerFactory.getLogger(Group.class);
-
-  private static final Supplier<byte[]> NO_MEMBER_DATA = 
Suppliers.ofInstance(null);
-  private static final String DEFAULT_NODE_NAME_PREFIX = "member_";
-
-  private final ZooKeeperClient zkClient;
-  private final ImmutableList<ACL> acl;
-  private final String path;
-
-  private final NodeScheme nodeScheme;
-  private final Predicate<String> nodeNameFilter;
-
-  private final BackoffHelper backoffHelper;
-
-  /**
-   * Creates a group rooted at the given {@code path}.  Paths must be absolute 
and trailing or
-   * duplicate slashes will be normalized.  For example, all the following 
paths would create a
-   * group at the normalized path /my/distributed/group:
-   * <ul>
-   *   <li>/my/distributed/group
-   *   <li>/my/distributed/group/
-   *   <li>/my/distributed//group
-   * </ul>
-   *
-   * @param zkClient the client to use for interactions with ZooKeeper
-   * @param acl the ACL to use for creating the persistent group path if it 
does not already exist
-   * @param path the absolute persistent path that represents this group
-   * @param nodeScheme the scheme that defines how nodes are created
-   */
-  public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path, 
NodeScheme nodeScheme) {
-    this.zkClient = Preconditions.checkNotNull(zkClient);
-    this.acl = ImmutableList.copyOf(acl);
-    this.path = ZooKeeperUtils.normalizePath(Preconditions.checkNotNull(path));
-
-    this.nodeScheme = Preconditions.checkNotNull(nodeScheme);
-    nodeNameFilter = Group.this.nodeScheme::isMember;
-
-    backoffHelper = new BackoffHelper();
-  }
-
-  /**
-   * Equivalent to {@link #Group(ZooKeeperClient, Iterable, String, String)} 
with a
-   * {@code namePrefix} of 'member_'.
-   */
-  public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path) {
-    this(zkClient, acl, path, DEFAULT_NODE_NAME_PREFIX);
-  }
-
-  /**
-   * Equivalent to {@link #Group(ZooKeeperClient, Iterable, String, 
NodeScheme)} with a
-   * {@link DefaultScheme} using {@code namePrefix}.
-   */
-  public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path, 
String namePrefix) {
-    this(zkClient, acl, path, new DefaultScheme(namePrefix));
-  }
-
-  public String getMemberPath(String memberId) {
-    return path + "/" + MorePreconditions.checkNotBlank(memberId);
-  }
-
-  public String getPath() {
-    return path;
-  }
-
-  public String getMemberId(String nodePath) {
-    MorePreconditions.checkNotBlank(nodePath);
-    Preconditions.checkArgument(nodePath.startsWith(path + "/"),
-        "Not a member of this group[%s]: %s", path, nodePath);
-
-    String memberId = StringUtils.substringAfterLast(nodePath, "/");
-    Preconditions.checkArgument(nodeScheme.isMember(memberId),
-        "Not a group member: %s", memberId);
-    return memberId;
-  }
-
-  /**
-   * Returns the current list of group member ids by querying ZooKeeper 
synchronously.
-   *
-   * @return the ids of all the present members of this group
-   * @throws ZooKeeperConnectionException if there was a problem connecting to 
ZooKeeper
-   * @throws KeeperException if there was a problem reading this group's 
member ids
-   * @throws InterruptedException if this thread is interrupted listing the 
group members
-   */
-  public Iterable<String> getMemberIds()
-      throws ZooKeeperConnectionException, KeeperException, 
InterruptedException {
-    return Iterables.filter(zkClient.get().getChildren(path, false), 
nodeNameFilter);
-  }
-
-  /**
-   * Gets the data for one of this groups members by querying ZooKeeper 
synchronously.
-   *
-   * @param memberId the id of the member whose data to retrieve
-   * @return the data associated with the {@code memberId}
-   * @throws ZooKeeperConnectionException if there was a problem connecting to 
ZooKeeper
-   * @throws KeeperException if there was a problem reading this member's data
-   * @throws InterruptedException if this thread is interrupted retrieving the 
member data
-   */
-  public byte[] getMemberData(String memberId)
-      throws ZooKeeperConnectionException, KeeperException, 
InterruptedException {
-    return zkClient.get().getData(getMemberPath(memberId), false, null);
-  }
-
-  /**
-   * Represents membership in a distributed group.
-   */
-  public interface Membership {
-
-    /**
-     * Returns the persistent ZooKeeper path that represents this group.
-     */
-    String getGroupPath();
-
-    /**
-     * Returns the id (ZooKeeper node name) of this group member.  May change 
over time if the
-     * ZooKeeper session expires.
-     */
-    String getMemberId();
-
-    /**
-     * Returns the full ZooKeeper path to this group member.  May change over 
time if the
-     * ZooKeeper session expires.
-     */
-    String getMemberPath();
-
-    /**
-     * Updates the membership data synchronously using the {@code 
Supplier<byte[]>} passed to
-     * {@link Group#join()}.
-     *
-     * @return the new membership data
-     * @throws UpdateException if there was a problem updating the membership 
data
-     */
-    byte[] updateMemberData() throws UpdateException;
-
-    /**
-     * Cancels group membership by deleting the associated ZooKeeper member 
node.
-     *
-     * @throws JoinException if there is a problem deleting the node
-     */
-    void cancel() throws JoinException;
-  }
-
-  /**
-   * Indicates an error joining a group.
-   */
-  public static class JoinException extends Exception {
-    public JoinException(String message, Throwable cause) {
-      super(message, cause);
-    }
-  }
-
-  /**
-   * Indicates an error updating a group member's data.
-   */
-  public static class UpdateException extends Exception {
-    public UpdateException(String message, Throwable cause) {
-      super(message, cause);
-    }
-  }
-
-  /**
-   * Equivalent to calling {@code join(null, null)}.
-   */
-  public final Membership join() throws JoinException, InterruptedException {
-    return join(NO_MEMBER_DATA, null);
-  }
-
-  /**
-   * Equivalent to calling {@code join(memberData, null)}.
-   */
-  public final Membership join(Supplier<byte[]> memberData)
-      throws JoinException, InterruptedException {
-
-    return join(memberData, null);
-  }
-
-  /**
-   * Equivalent to calling {@code join(null, onLoseMembership)}.
-   */
-  public final Membership join(@Nullable final Command onLoseMembership)
-      throws JoinException, InterruptedException {
-
-    return join(NO_MEMBER_DATA, onLoseMembership);
-  }
-
-  /**
-   * Joins this group and returns the resulting Membership when successful.  
Membership will be
-   * automatically cancelled when the current jvm process dies; however the 
returned Membership
-   * object can be used to cancel membership earlier.  Unless
-   * {@link Group.Membership#cancel()} is called the membership will
-   * be maintained by re-establishing it silently in the background.
-   *
-   * <p>Any {@code memberData} given is persisted in the member node in 
ZooKeeper.  If an
-   * {@code onLoseMembership} callback is supplied, it will be notified each 
time this member loses
-   * membership in the group.
-   *
-   * @param memberData a supplier of the data to store in the member node
-   * @param onLoseMembership a callback to notify when membership is lost
-   * @return a Membership object with the member details
-   * @throws JoinException if there was a problem joining the group
-   * @throws InterruptedException if this thread is interrupted awaiting 
completion of the join
-   */
-  public final Membership join(Supplier<byte[]> memberData, @Nullable Command 
onLoseMembership)
-      throws JoinException, InterruptedException {
-
-    Preconditions.checkNotNull(memberData);
-    ensurePersistentGroupPath();
-
-    final ActiveMembership groupJoiner = new ActiveMembership(memberData, 
onLoseMembership);
-    return backoffHelper.doUntilResult(() -> {
-      try {
-        return groupJoiner.join();
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new JoinException("Interrupted trying to join group at path: " + 
path, e);
-      } catch (ZooKeeperConnectionException e) {
-        LOG.warn("Temporary error trying to join group at path: " + path, e);
-        return null;
-      } catch (KeeperException e) {
-        if (zkClient.shouldRetry(e)) {
-          LOG.warn("Temporary error trying to join group at path: " + path, e);
-          return null;
-        } else {
-          throw new JoinException("Problem joining partition group at path: " 
+ path, e);
-        }
-      }
-    });
-  }
-
-  private void ensurePersistentGroupPath() throws JoinException, 
InterruptedException {
-    backoffHelper.doUntilSuccess(() -> {
-      try {
-        ZooKeeperUtils.ensurePath(zkClient, acl, path);
-        return true;
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new JoinException("Interrupted trying to ensure group at path: " 
+ path, e);
-      } catch (ZooKeeperConnectionException e) {
-        LOG.warn("Problem connecting to ZooKeeper, retrying", e);
-        return false;
-      } catch (KeeperException e) {
-        if (zkClient.shouldRetry(e)) {
-          LOG.warn("Temporary error ensuring path: " + path, e);
-          return false;
-        } else {
-          throw new JoinException("Problem ensuring group at path: " + path, 
e);
-        }
-      }
-    });
-  }
-
-  private class ActiveMembership implements Membership {
-    private final Supplier<byte[]> memberData;
-    private final Command onLoseMembership;
-    private String nodePath;
-    private String memberId;
-    private volatile boolean cancelled;
-    private byte[] membershipData;
-
-    public ActiveMembership(Supplier<byte[]> memberData, @Nullable Command 
onLoseMembership) {
-      this.memberData = memberData;
-      this.onLoseMembership = (onLoseMembership == null) ? Commands.NOOP : 
onLoseMembership;
-    }
-
-    @Override
-    public String getGroupPath() {
-      return path;
-    }
-
-    @Override
-    public synchronized String getMemberId() {
-      return memberId;
-    }
-
-    @Override
-    public synchronized String getMemberPath() {
-      return nodePath;
-    }
-
-    @Override
-    public synchronized byte[] updateMemberData() throws UpdateException {
-      byte[] membershipData = memberData.get();
-      if (!ArrayUtils.isEquals(this.membershipData, membershipData)) {
-        try {
-          zkClient.get().setData(nodePath, membershipData, 
ZooKeeperUtils.ANY_VERSION);
-          this.membershipData = membershipData;
-        } catch (KeeperException e) {
-          throw new UpdateException("Problem updating membership data.", e);
-        } catch (InterruptedException e) {
-          throw new UpdateException("Interrupted attempting to update 
membership data.", e);
-        } catch (ZooKeeperConnectionException e) {
-          throw new UpdateException(
-              "Could not connect to the ZooKeeper cluster to update membership 
data.", e);
-        }
-      }
-      return membershipData;
-    }
-
-    @Override
-    public synchronized void cancel() throws JoinException {
-      if (!cancelled) {
-        try {
-          backoffHelper.doUntilSuccess(() -> {
-            try {
-              zkClient.get().delete(nodePath, ZooKeeperUtils.ANY_VERSION);
-              return true;
-            } catch (InterruptedException e) {
-              Thread.currentThread().interrupt();
-              throw new JoinException("Interrupted trying to cancel 
membership: " + nodePath, e);
-            } catch (ZooKeeperConnectionException e) {
-              LOG.warn("Problem connecting to ZooKeeper, retrying", e);
-              return false;
-            } catch (NoNodeException e) {
-              LOG.info("Membership already cancelled, node at path: " + 
nodePath +
-                       " has been deleted");
-              return true;
-            } catch (KeeperException e) {
-              if (zkClient.shouldRetry(e)) {
-                LOG.warn("Temporary error cancelling membership: " + nodePath, 
e);
-                return false;
-              } else {
-                throw new JoinException("Problem cancelling membership: " + 
nodePath, e);
-              }
-            }
-          });
-          cancelled = true; // Prevent auto-re-join logic from undoing this 
cancel.
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new JoinException("Problem cancelling membership: " + 
nodePath, e);
-        }
-      }
-    }
-
-    private class CancelledException extends IllegalStateException { /* marker 
*/ }
-
-    synchronized Membership join()
-        throws ZooKeeperConnectionException, InterruptedException, 
KeeperException {
-
-      if (cancelled) {
-        throw new CancelledException();
-      }
-
-      if (nodePath == null) {
-        // Re-join if our ephemeral node goes away due to session expiry - 
only needs to be
-        // registered once.
-        zkClient.registerExpirationHandler(this::tryJoin);
-      }
-
-      byte[] membershipData = memberData.get();
-      String nodeName = nodeScheme.createName(membershipData);
-      CreateMode createMode = nodeScheme.isSequential()
-          ? CreateMode.EPHEMERAL_SEQUENTIAL
-          : CreateMode.EPHEMERAL;
-      nodePath = zkClient.get().create(path + "/" + nodeName, membershipData, 
acl, createMode);
-      memberId = Group.this.getMemberId(nodePath);
-      LOG.info("Set group member ID to " + memberId);
-      this.membershipData = membershipData;
-
-      // Re-join if our ephemeral node goes away due to maliciousness.
-      zkClient.get().exists(nodePath, event -> {
-        if (event.getType() == EventType.NodeDeleted) {
-          tryJoin();
-        }
-      });
-
-      return this;
-    }
-
-    private final ExceptionalSupplier<Boolean, InterruptedException> tryJoin =
-        () -> {
-          try {
-            join();
-            return true;
-          } catch (CancelledException e) {
-            // Lost a cancel race - that's ok.
-            return true;
-          } catch (ZooKeeperConnectionException e) {
-            LOG.warn("Problem connecting to ZooKeeper, retrying", e);
-            return false;
-          } catch (KeeperException e) {
-            if (zkClient.shouldRetry(e)) {
-              LOG.warn("Temporary error re-joining group: " + path, e);
-              return false;
-            } else {
-              throw new IllegalStateException("Permanent problem re-joining 
group: " + path, e);
-            }
-          }
-        };
-
-    private synchronized void tryJoin() {
-      onLoseMembership.execute();
-      try {
-        backoffHelper.doUntilSuccess(tryJoin);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new RuntimeException(
-            String.format("Interrupted while trying to re-join group: %s, 
giving up", path), e);
-      }
-    }
-  }
-
-  /**
-   * An interface to an object that listens for changes to a group's 
membership.
-   */
-  public interface GroupChangeListener {
-
-    /**
-     * Called whenever group membership changes with the new list of member 
ids.
-     *
-     * @param memberIds the current member ids
-     */
-    void onGroupChange(Iterable<String> memberIds);
-  }
-
-  /**
-   * An interface that dictates the scheme to use for storing and filtering 
nodes that represent
-   * members of a distributed group.
-   */
-  public interface NodeScheme {
-    /**
-     * Determines if a child node is a member of a group by examining the 
node's name.
-     *
-     * @param nodeName the name of a child node found in a group
-     * @return {@code true} if {@code nodeName} identifies a group member in 
this scheme
-     */
-    boolean isMember(String nodeName);
-
-    /**
-     * Generates a node name for the node representing this process in the 
distributed group.
-     *
-     * @param membershipData the data that will be stored in this node
-     * @return the name for the node that will represent this process in the 
group
-     */
-    String createName(byte[] membershipData);
-
-    /**
-     * Indicates whether this scheme needs ephemeral sequential nodes or just 
ephemeral nodes.
-     *
-     * @return {@code true} if this scheme requires sequential node names; 
{@code false} otherwise
-     */
-    boolean isSequential();
-  }
-
-  /**
-   * Indicates an error watching a group.
-   */
-  public static class WatchException extends Exception {
-    public WatchException(String message, Throwable cause) {
-      super(message, cause);
-    }
-  }
-
-  /**
-   * Watches this group for the lifetime of this jvm process.  This method 
will block until the
-   * current group members are available, notify the {@code 
groupChangeListener} and then return.
-   * All further changes to the group membership will cause notifications on a 
background thread.
-   *
-   * @param groupChangeListener the listener to notify of group membership 
change events
-   * @return A command which, when executed, will stop watching the group.
-   * @throws WatchException if there is a problem generating the 1st group 
membership list
-   * @throws InterruptedException if interrupted waiting to gather the 1st 
group membership list
-   */
-  public final Command watch(final GroupChangeListener groupChangeListener)
-      throws WatchException, InterruptedException {
-    Preconditions.checkNotNull(groupChangeListener);
-
-    try {
-      ensurePersistentGroupPath();
-    } catch (JoinException e) {
-      throw new WatchException("Failed to create group path: " + path, e);
-    }
-
-    final GroupMonitor groupMonitor = new GroupMonitor(groupChangeListener);
-    backoffHelper.doUntilSuccess(() -> {
-      try {
-        groupMonitor.watchGroup();
-        return true;
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new WatchException("Interrupted trying to watch group at path: " 
+ path, e);
-      } catch (ZooKeeperConnectionException e) {
-        LOG.warn("Temporary error trying to watch group at path: " + path, e);
-        return null;
-      } catch (KeeperException e) {
-        if (zkClient.shouldRetry(e)) {
-          LOG.warn("Temporary error trying to watch group at path: " + path, 
e);
-          return null;
-        } else {
-          throw new WatchException("Problem trying to watch group at path: " + 
path, e);
-        }
-      }
-    });
-    return groupMonitor::stopWatching;
-  }
-
-  /**
-   * Helps continuously monitor a group for membership changes.
-   */
-  private class GroupMonitor {
-    private final GroupChangeListener groupChangeListener;
-    private volatile boolean stopped = false;
-    private Set<String> members;
-
-    GroupMonitor(GroupChangeListener groupChangeListener) {
-      this.groupChangeListener = groupChangeListener;
-    }
-
-    private final Watcher groupWatcher = event -> {
-      if (event.getType() == EventType.NodeChildrenChanged) {
-        tryWatchGroup();
-      }
-    };
-
-    private final ExceptionalSupplier<Boolean, InterruptedException> 
tryWatchGroup =
-        () -> {
-          try {
-            watchGroup();
-            return true;
-          } catch (ZooKeeperConnectionException e) {
-            LOG.warn("Problem connecting to ZooKeeper, retrying", e);
-            return false;
-          } catch (KeeperException e) {
-            if (zkClient.shouldRetry(e)) {
-              LOG.warn("Temporary error re-watching group: " + path, e);
-              return false;
-            } else {
-              throw new IllegalStateException("Permanent problem re-watching 
group: " + path, e);
-            }
-          }
-        };
-
-    private void tryWatchGroup() {
-      if (stopped) {
-        return;
-      }
-
-      try {
-        backoffHelper.doUntilSuccess(tryWatchGroup);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new RuntimeException(
-            String.format("Interrupted while trying to re-watch group: %s, 
giving up", path), e);
-      }
-    }
-
-    private void watchGroup()
-        throws ZooKeeperConnectionException, InterruptedException, 
KeeperException {
-
-      if (stopped) {
-        return;
-      }
-
-      List<String> children = zkClient.get().getChildren(path, groupWatcher);
-      setMembers(Iterables.filter(children, nodeNameFilter));
-    }
-
-    private void stopWatching() {
-      // TODO(William Farner): Cancel the watch when
-      // https://issues.apache.org/jira/browse/ZOOKEEPER-442 is resolved.
-      LOG.info("Stopping watch on " + this);
-      stopped = true;
-    }
-
-    synchronized void setMembers(Iterable<String> members) {
-      if (stopped) {
-        LOG.info("Suppressing membership update, no longer watching " + this);
-        return;
-      }
-
-      if (this.members == null) {
-        // Reset our watch on the group if session expires - only needs to be 
registered once.
-        zkClient.registerExpirationHandler(this::tryWatchGroup);
-      }
-
-      Set<String> membership = ImmutableSet.copyOf(members);
-      if (!membership.equals(this.members)) {
-        groupChangeListener.onGroupChange(members);
-        this.members = membership;
-      }
-    }
-  }
-
-  /**
-   * Default naming scheme implementation. Stores nodes at [given path] + "/" 
+ [given prefix] +
-   * ZooKeeper-generated member ID. For example, if the path is 
"/discovery/servicename", and the
-   * prefix is "member_", the node's full path will look something like
-   * {@code /discovery/servicename/member_0000000007}.
-   */
-  public static class DefaultScheme implements NodeScheme {
-    private final String namePrefix;
-    private final Pattern namePattern;
-
-    /**
-     * Creates a sequential node scheme based on the given node name prefix.
-     *
-     * @param namePrefix the prefix for the names of the member nodes
-     */
-    public DefaultScheme(String namePrefix) {
-      this.namePrefix = MorePreconditions.checkNotBlank(namePrefix);
-      namePattern = Pattern.compile("^" + Pattern.quote(namePrefix) + 
"-?[0-9]+$");
-    }
-
-    @Override
-    public boolean isMember(String nodeName) {
-      return namePattern.matcher(nodeName).matches();
-    }
-
-    @Override
-    public String createName(byte[] membershipData) {
-      return namePrefix;
-    }
-
-    @Override
-    public boolean isSequential() {
-      return true;
-    }
-  }
-
-  @Override
-  public String toString() {
-    return "Group " + path;
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java 
b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java
deleted file mode 100644
index aeea02d..0000000
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.net.InetSocketAddress;
-import java.util.Map;
-
-import org.apache.aurora.common.io.Codec;
-import org.apache.aurora.common.thrift.ServiceInstance;
-import org.apache.aurora.common.zookeeper.Group.JoinException;
-
-/**
- * A logical set of servers registered in ZooKeeper.  Intended to be used by 
servers in a
- * common service to advertise their presence to server-set protocol-aware 
clients.
- *
- * Standard implementations should use the {@link #JSON_CODEC} to serialize 
the service instance
- * rendezvous data to zookeeper so that standard clients can interoperate.
- */
-public interface ServerSet {
-
-  /**
-   * Encodes a {@link ServiceInstance} as a JSON object.
-   *
-   * This is the default encoding for service instance data in ZooKeeper.
-   */
-  Codec<ServiceInstance> JSON_CODEC = new JsonCodec();
-
-  /**
-   * Attempts to join a server set for this logical service group.
-   *
-   * @param endpoint the primary service endpoint
-   * @param additionalEndpoints and additional endpoints keyed by their 
logical name
-   * @return an EndpointStatus object that allows the endpoint to adjust its 
status
-   * @throws JoinException if there was a problem joining the server set
-   * @throws InterruptedException if interrupted while waiting to join the 
server set
-   */
-  EndpointStatus join(
-      InetSocketAddress endpoint,
-      Map<String, InetSocketAddress> additionalEndpoints)
-      throws JoinException, InterruptedException;
-
-  /**
-   * A handle to a service endpoint's status data that allows updating it to 
track current events.
-   */
-  interface EndpointStatus {
-
-    /**
-     * Removes the endpoint from the server set.
-     *
-     * @throws UpdateException if there was a problem leaving the ServerSet.
-     */
-    void leave() throws UpdateException;
-  }
-
-  /**
-   * Indicates an error updating a service's status information.
-   */
-  class UpdateException extends Exception {
-    public UpdateException(String message, Throwable cause) {
-      super(message, cause);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java 
b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java
deleted file mode 100644
index ace4980..0000000
--- 
a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java
+++ /dev/null
@@ -1,349 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.Set;
-
-import javax.annotation.Nullable;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.base.Predicates;
-import com.google.common.base.Supplier;
-import com.google.common.base.Throwables;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.ImmutableSortedSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.collect.Sets.SetView;
-import com.google.common.util.concurrent.UncheckedExecutionException;
-
-import org.apache.aurora.common.base.Command;
-import org.apache.aurora.common.io.Codec;
-import org.apache.aurora.common.net.pool.DynamicHostSet;
-import org.apache.aurora.common.thrift.ServiceInstance;
-import org.apache.aurora.common.thrift.Status;
-import org.apache.aurora.common.util.BackoffHelper;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NoNodeException;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.data.ACL;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * ZooKeeper-backed implementation of {@link ServerSet} and {@link 
DynamicHostSet}.
- */
-public class ServerSetImpl implements ServerSet, 
DynamicHostSet<ServiceInstance> {
-  private static final Logger LOG = 
LoggerFactory.getLogger(ServerSetImpl.class);
-
-  private final ZooKeeperClient zkClient;
-  private final Group group;
-  private final Codec<ServiceInstance> codec;
-  private final BackoffHelper backoffHelper;
-
-  /**
-   * Creates a new ServerSet using open ZooKeeper node ACLs.
-   *
-   * @param zkClient the client to use for interactions with ZooKeeper
-   * @param path the name-service path of the service to connect to
-   */
-  public ServerSetImpl(ZooKeeperClient zkClient, String path) {
-    this(zkClient, ZooDefs.Ids.OPEN_ACL_UNSAFE, path);
-  }
-
-  /**
-   * Creates a new ServerSet for the given service {@code path}.
-   *
-   * @param zkClient the client to use for interactions with ZooKeeper
-   * @param acl the ACL to use for creating the persistent group path if it 
does not already exist
-   * @param path the name-service path of the service to connect to
-   */
-  public ServerSetImpl(ZooKeeperClient zkClient, Iterable<ACL> acl, String 
path) {
-    this(zkClient, new Group(zkClient, acl, path), JSON_CODEC);
-  }
-
-  /**
-   * Creates a new ServerSet using the given service {@code group}.
-   *
-   * @param zkClient the client to use for interactions with ZooKeeper
-   * @param group the server group
-   */
-  public ServerSetImpl(ZooKeeperClient zkClient, Group group) {
-    this(zkClient, group, JSON_CODEC);
-  }
-
-  /**
-   * Creates a new ServerSet using the given service {@code group} and a 
custom {@code codec}.
-   *
-   * @param zkClient the client to use for interactions with ZooKeeper
-   * @param group the server group
-   * @param codec a codec to use for serializing and de-serializing the 
ServiceInstance data to and
-   *     from a byte array
-   */
-  public ServerSetImpl(ZooKeeperClient zkClient, Group group, 
Codec<ServiceInstance> codec) {
-    this.zkClient = checkNotNull(zkClient);
-    this.group = checkNotNull(group);
-    this.codec = checkNotNull(codec);
-
-    // TODO(John Sirois): Inject the helper so that backoff strategy can be 
configurable.
-    backoffHelper = new BackoffHelper();
-  }
-
-  @VisibleForTesting
-  ZooKeeperClient getZkClient() {
-    return zkClient;
-  }
-
-  @Override
-  public EndpointStatus join(
-      InetSocketAddress endpoint,
-      Map<String, InetSocketAddress> additionalEndpoints)
-      throws Group.JoinException, InterruptedException {
-
-    checkNotNull(endpoint);
-    checkNotNull(additionalEndpoints);
-
-    MemberStatus memberStatus = new MemberStatus(endpoint, 
additionalEndpoints);
-    Supplier<byte[]> serviceInstanceSupplier = 
memberStatus::serializeServiceInstance;
-    Group.Membership membership = group.join(serviceInstanceSupplier);
-
-    return () -> memberStatus.leave(membership);
-  }
-
-  @Override
-  public Command watch(HostChangeMonitor<ServiceInstance> monitor) throws 
MonitorException {
-    ServerSetWatcher serverSetWatcher = new ServerSetWatcher(zkClient, 
monitor);
-    try {
-      return serverSetWatcher.watch();
-    } catch (Group.WatchException e) {
-      throw new MonitorException("ZooKeeper watch failed.", e);
-    } catch (InterruptedException e) {
-      throw new MonitorException("Interrupted while watching ZooKeeper.", e);
-    }
-  }
-
-  private class MemberStatus {
-    private final InetSocketAddress endpoint;
-    private final Map<String, InetSocketAddress> additionalEndpoints;
-
-    private MemberStatus(
-        InetSocketAddress endpoint,
-        Map<String, InetSocketAddress> additionalEndpoints) {
-
-      this.endpoint = endpoint;
-      this.additionalEndpoints = additionalEndpoints;
-    }
-
-    synchronized void leave(Group.Membership membership) throws 
UpdateException {
-      try {
-        membership.cancel();
-      } catch (Group.JoinException e) {
-        throw new UpdateException(
-            "Failed to auto-cancel group membership on transition to DEAD 
status", e);
-      }
-    }
-
-    byte[] serializeServiceInstance() {
-      ServiceInstance serviceInstance = new ServiceInstance(
-          ServerSets.toEndpoint(endpoint),
-          Maps.transformValues(additionalEndpoints, ServerSets.TO_ENDPOINT),
-          Status.ALIVE);
-
-      LOG.debug("updating endpoint data to:\n\t" + serviceInstance);
-      try {
-        return ServerSets.serializeServiceInstance(serviceInstance, codec);
-      } catch (IOException e) {
-        throw new IllegalStateException("Unexpected problem serializing thrift 
struct " +
-            serviceInstance + "to a byte[]", e);
-      }
-    }
-  }
-
-  private static class ServiceInstanceFetchException extends RuntimeException {
-    ServiceInstanceFetchException(String message, Throwable cause) {
-      super(message, cause);
-    }
-  }
-
-  private static class ServiceInstanceDeletedException extends 
RuntimeException {
-    ServiceInstanceDeletedException(String path) {
-      super(path);
-    }
-  }
-
-  private class ServerSetWatcher {
-    private final ZooKeeperClient zkClient;
-    private final HostChangeMonitor<ServiceInstance> monitor;
-    @Nullable private ImmutableSet<ServiceInstance> serverSet;
-
-    ServerSetWatcher(ZooKeeperClient zkClient, 
HostChangeMonitor<ServiceInstance> monitor) {
-      this.zkClient = zkClient;
-      this.monitor = monitor;
-    }
-
-    public Command watch() throws Group.WatchException, InterruptedException {
-      Watcher onExpirationWatcher = 
zkClient.registerExpirationHandler(this::rebuildServerSet);
-
-      try {
-        return group.watch(this::notifyGroupChange);
-      } catch (Group.WatchException e) {
-        zkClient.unregister(onExpirationWatcher);
-        throw e;
-      } catch (InterruptedException e) {
-        zkClient.unregister(onExpirationWatcher);
-        throw e;
-      }
-    }
-
-    private ServiceInstance getServiceInstance(final String nodePath) {
-      try {
-        return backoffHelper.doUntilResult(() -> {
-          try {
-            byte[] data = zkClient.get().getData(nodePath, false, null);
-            return ServerSets.deserializeServiceInstance(data, codec);
-          } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new ServiceInstanceFetchException(
-                "Interrupted updating service data for: " + nodePath, e);
-          } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            LOG.warn("Temporary error trying to updating service data for: " + 
nodePath, e);
-            return null;
-          } catch (NoNodeException e) {
-            invalidateNodePath(nodePath);
-            throw new ServiceInstanceDeletedException(nodePath);
-          } catch (KeeperException e) {
-            if (zkClient.shouldRetry(e)) {
-              LOG.warn("Temporary error trying to update service data for: " + 
nodePath, e);
-              return null;
-            } else {
-              throw new ServiceInstanceFetchException(
-                  "Failed to update service data for: " + nodePath, e);
-            }
-          } catch (IOException e) {
-            throw new ServiceInstanceFetchException(
-                "Failed to deserialize the ServiceInstance data for: " + 
nodePath, e);
-          }
-        });
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new ServiceInstanceFetchException(
-            "Interrupted trying to update service data for: " + nodePath, e);
-      }
-    }
-
-    private final LoadingCache<String, ServiceInstance> servicesByMemberId =
-        CacheBuilder.newBuilder().build(new CacheLoader<String, 
ServiceInstance>() {
-          @Override public ServiceInstance load(String memberId) {
-            return getServiceInstance(group.getMemberPath(memberId));
-          }
-        });
-
-    private void rebuildServerSet() {
-      Set<String> memberIds = 
ImmutableSet.copyOf(servicesByMemberId.asMap().keySet());
-      servicesByMemberId.invalidateAll();
-      notifyGroupChange(memberIds);
-    }
-
-    private String invalidateNodePath(String deletedPath) {
-      String memberId = group.getMemberId(deletedPath);
-      servicesByMemberId.invalidate(memberId);
-      return memberId;
-    }
-
-    private final Function<String, ServiceInstance> MAYBE_FETCH_NODE =
-        memberId -> {
-          // This get will trigger a fetch
-          try {
-            return servicesByMemberId.getUnchecked(memberId);
-          } catch (UncheckedExecutionException e) {
-            Throwable cause = e.getCause();
-            if (!(cause instanceof ServiceInstanceDeletedException)) {
-              Throwables.propagateIfInstanceOf(cause, 
ServiceInstanceFetchException.class);
-              throw new IllegalStateException(
-                  "Unexpected error fetching member data for: " + memberId, e);
-            }
-            return null;
-          }
-        };
-
-    private synchronized void notifyGroupChange(Iterable<String> memberIds) {
-      ImmutableSet<String> newMemberIds = ImmutableSortedSet.copyOf(memberIds);
-      Set<String> existingMemberIds = servicesByMemberId.asMap().keySet();
-
-      // Ignore no-op state changes except for the 1st when we've seen no 
group yet.
-      if ((serverSet == null) || !newMemberIds.equals(existingMemberIds)) {
-        SetView<String> deletedMemberIds = Sets.difference(existingMemberIds, 
newMemberIds);
-        // Implicit removal from servicesByMemberId.
-        existingMemberIds.removeAll(ImmutableSet.copyOf(deletedMemberIds));
-
-        Iterable<ServiceInstance> serviceInstances = Iterables.filter(
-            Iterables.transform(newMemberIds, MAYBE_FETCH_NODE), 
Predicates.notNull());
-
-        notifyServerSetChange(ImmutableSet.copyOf(serviceInstances));
-      }
-    }
-
-    private void notifyServerSetChange(ImmutableSet<ServiceInstance> 
currentServerSet) {
-      // ZK nodes may have changed if there was a session expiry for a server 
in the server set, but
-      // if the server's status has not changed, we can skip any onChange 
updates.
-      if (!currentServerSet.equals(serverSet)) {
-        if (currentServerSet.isEmpty()) {
-          LOG.warn("server set empty for path " + group.getPath());
-        } else {
-          if (serverSet == null) {
-            LOG.info("received initial membership {}", currentServerSet);
-          } else {
-            logChange(currentServerSet);
-          }
-        }
-        serverSet = currentServerSet;
-        monitor.onChange(serverSet);
-      }
-    }
-
-    private void logChange(ImmutableSet<ServiceInstance> newServerSet) {
-      StringBuilder message = new StringBuilder("server set " + 
group.getPath() + " change: ");
-      if (serverSet.size() != newServerSet.size()) {
-        message.append("from ").append(serverSet.size())
-            .append(" members to ").append(newServerSet.size());
-      }
-
-      Joiner joiner = Joiner.on("\n\t\t");
-
-      SetView<ServiceInstance> left = Sets.difference(serverSet, newServerSet);
-      if (!left.isEmpty()) {
-        message.append("\n\tleft:\n\t\t").append(joiner.join(left));
-      }
-
-      SetView<ServiceInstance> joined = Sets.difference(newServerSet, 
serverSet);
-      if (!joined.isEmpty()) {
-        message.append("\n\tjoined:\n\t\t").append(joiner.join(joined));
-      }
-
-      LOG.info(message.toString());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java 
b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java
deleted file mode 100644
index 01a54a5..0000000
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Map;
-
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-
-import org.apache.aurora.common.base.MorePreconditions;
-import org.apache.aurora.common.io.Codec;
-import org.apache.aurora.common.thrift.Endpoint;
-import org.apache.aurora.common.thrift.ServiceInstance;
-import org.apache.aurora.common.thrift.Status;
-import org.apache.zookeeper.data.ACL;
-
-/**
- * Common ServerSet related functions
- */
-public class ServerSets {
-
-  private ServerSets() {
-    // Utility class.
-  }
-
-  /**
-   * A function that invokes {@link #toEndpoint(InetSocketAddress)}.
-   */
-  public static final Function<InetSocketAddress, Endpoint> TO_ENDPOINT =
-      ServerSets::toEndpoint;
-
-  /**
-   * Creates a server set that registers at a single path applying the given 
ACL to all nodes
-   * created in the path.
-   *
-   * @param zkClient ZooKeeper client to register with.
-   * @param acl The ACL to apply to the {@code zkPath} nodes the ServerSet 
creates.
-   * @param zkPath Path to register at.  @see #create(ZooKeeperClient, 
java.util.Set)
-   * @return A server set that registers at {@code zkPath}.
-   */
-  public static ServerSet create(ZooKeeperClient zkClient, Iterable<ACL> acl, 
String zkPath) {
-    Preconditions.checkNotNull(zkClient);
-    MorePreconditions.checkNotBlank(acl);
-    MorePreconditions.checkNotBlank(zkPath);
-
-    return new ServerSetImpl(zkClient, acl, zkPath);
-  }
-
-  /**
-   * Returns a serialized Thrift service instance object, with given endpoints 
and codec.
-   *
-   * @param serviceInstance the Thrift service instance object to be serialized
-   * @param codec the codec to use to serialize a Thrift service instance 
object
-   * @return byte array that contains a serialized Thrift service instance
-   */
-  public static byte[] serializeServiceInstance(
-      ServiceInstance serviceInstance, Codec<ServiceInstance> codec) throws 
IOException {
-
-    ByteArrayOutputStream output = new ByteArrayOutputStream();
-    codec.serialize(serviceInstance, output);
-    return output.toByteArray();
-  }
-
-  /**
-   * Serializes a service instance based on endpoints.
-   * @see #serializeServiceInstance(ServiceInstance, Codec)
-   *
-   * @param address the target address of the service instance
-   * @param additionalEndpoints additional endpoints of the service instance
-   * @param status service status
-   */
-  public static byte[] serializeServiceInstance(
-      InetSocketAddress address,
-      Map<String, Endpoint> additionalEndpoints,
-      Status status,
-      Codec<ServiceInstance> codec) throws IOException {
-
-    ServiceInstance serviceInstance =
-        new ServiceInstance(toEndpoint(address), additionalEndpoints, status);
-    return serializeServiceInstance(serviceInstance, codec);
-  }
-
-  /**
-   * Creates a service instance object deserialized from byte array.
-   *
-   * @param data the byte array contains a serialized Thrift service instance
-   * @param codec the codec to use to deserialize the byte array
-   */
-  public static ServiceInstance deserializeServiceInstance(
-      byte[] data, Codec<ServiceInstance> codec) throws IOException {
-
-    return codec.deserialize(new ByteArrayInputStream(data));
-  }
-
-  /**
-   * Creates an endpoint for the given InetSocketAddress.
-   *
-   * @param address the target address to create the endpoint for
-   */
-  public static Endpoint toEndpoint(InetSocketAddress address) {
-    return new Endpoint(address.getHostName(), address.getPort());
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonServiceImpl.java
 
b/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonServiceImpl.java
deleted file mode 100644
index d9978a9..0000000
--- 
a/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonServiceImpl.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-import org.apache.aurora.common.base.ExceptionalCommand;
-import org.apache.aurora.common.zookeeper.Candidate.Leader;
-import org.apache.aurora.common.zookeeper.Group.JoinException;
-import org.apache.zookeeper.data.ACL;
-
-public class SingletonServiceImpl implements SingletonService {
-  @VisibleForTesting
-  static final String LEADER_ELECT_NODE_PREFIX = "singleton_candidate_";
-
-  /**
-   * Creates a candidate that can be combined with an existing server set to 
form a singleton
-   * service using {@link #SingletonServiceImpl(ServerSet, Candidate)}.
-   *
-   * @param zkClient The ZooKeeper client to use.
-   * @param servicePath The path where service nodes live.
-   * @param acl The acl to apply to newly created candidate nodes and 
serverset nodes.
-   * @return A candidate that can be housed with a standard server set under a 
single zk path.
-   */
-  public static Candidate createSingletonCandidate(
-      ZooKeeperClient zkClient,
-      String servicePath,
-      Iterable<ACL> acl) {
-
-    return new CandidateImpl(new Group(zkClient, acl, servicePath, 
LEADER_ELECT_NODE_PREFIX));
-  }
-
-  private final ServerSet serverSet;
-  private final Candidate candidate;
-
-  /**
-   * Creates a new singleton service that uses the supplied candidate to vie 
for leadership and then
-   * advertises itself in the given server set once elected.
-   *
-   * @param serverSet The server set to advertise in on election.
-   * @param candidate The candidacy to use to vie for election.
-   */
-  public SingletonServiceImpl(ServerSet serverSet, Candidate candidate) {
-    this.serverSet = Preconditions.checkNotNull(serverSet);
-    this.candidate = Preconditions.checkNotNull(candidate);
-  }
-
-  @Override
-  public void lead(final InetSocketAddress endpoint,
-                   final Map<String, InetSocketAddress> additionalEndpoints,
-                   final LeadershipListener listener)
-                   throws LeadException, InterruptedException {
-
-    Preconditions.checkNotNull(listener);
-
-    try {
-      candidate.offerLeadership(new Leader() {
-        @Override public void onElected(final 
ExceptionalCommand<JoinException> abdicate) {
-          listener.onLeading(new LeaderControl() {
-            ServerSet.EndpointStatus endpointStatus = null;
-            final AtomicBoolean left = new AtomicBoolean(false);
-
-            // Methods are synchronized to prevent simultaneous invocations.
-            @Override public synchronized void advertise()
-                throws AdvertiseException, InterruptedException {
-
-              Preconditions.checkState(!left.get(), "Cannot advertise after 
leaving.");
-              Preconditions.checkState(endpointStatus == null, "Cannot 
advertise more than once.");
-              try {
-                endpointStatus = serverSet.join(endpoint, additionalEndpoints);
-              } catch (JoinException e) {
-                throw new AdvertiseException("Problem advertising endpoint " + 
endpoint, e);
-              }
-            }
-
-            @Override public synchronized void leave() throws LeaveException {
-              Preconditions.checkState(left.compareAndSet(false, true),
-                  "Cannot leave more than once.");
-              if (endpointStatus != null) {
-                try {
-                  endpointStatus.leave();
-                } catch (ServerSet.UpdateException e) {
-                  throw new LeaveException("Problem updating endpoint status 
for abdicating leader " +
-                      "at endpoint " + endpoint, e);
-                }
-              }
-              try {
-                abdicate.execute();
-              } catch (JoinException e) {
-                throw new LeaveException("Problem abdicating leadership for 
endpoint " + endpoint, e);
-              }
-            }
-          });
-        }
-
-        @Override public void onDefeated() {
-          listener.onDefeated();
-        }
-      });
-    } catch (JoinException e) {
-      throw new LeadException("Problem joining leadership group for endpoint " 
+ endpoint, e);
-    } catch (Group.WatchException e) {
-      throw new LeadException("Problem getting initial membership list for 
leadership group.", e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperClient.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperClient.java 
b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperClient.java
deleted file mode 100644
index ce243fb..0000000
--- 
a/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperClient.java
+++ /dev/null
@@ -1,372 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-
-import org.apache.aurora.common.base.Command;
-import org.apache.aurora.common.net.InetSocketAddressHelper;
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.SessionExpiredException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.common.PathUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Manages a connection to a ZooKeeper cluster.
- */
-public class ZooKeeperClient {
-
-  /**
-   * Indicates an error connecting to a zookeeper cluster.
-   */
-  public class ZooKeeperConnectionException extends Exception {
-    ZooKeeperConnectionException(String message, Throwable cause) {
-      super(message, cause);
-    }
-  }
-
-  private final class SessionState {
-    private final long sessionId;
-    private final byte[] sessionPasswd;
-
-    private SessionState(long sessionId, byte[] sessionPasswd) {
-      this.sessionId = sessionId;
-      this.sessionPasswd = sessionPasswd;
-    }
-  }
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(ZooKeeperClient.class);
-
-  private static final Amount<Long,Time> WAIT_FOREVER = Amount.of(0L, 
Time.MILLISECONDS);
-
-  private final int sessionTimeoutMs;
-  private final Optional<Credentials> credentials;
-  private final String zooKeeperServers;
-  // GuardedBy "this", but still volatile for tests, where we want to be able 
to see writes
-  // made from within long synchronized blocks.
-  private volatile ZooKeeper zooKeeper;
-  private SessionState sessionState;
-
-  private final Set<Watcher> watchers = new CopyOnWriteArraySet<Watcher>();
-  private final BlockingQueue<WatchedEvent> eventQueue = new 
LinkedBlockingQueue<WatchedEvent>();
-
-  private static Iterable<InetSocketAddress> combine(InetSocketAddress address,
-      InetSocketAddress... addresses) {
-    return 
ImmutableSet.<InetSocketAddress>builder().add(address).add(addresses).build();
-  }
-
-  /**
-   * Creates an unconnected client that will lazily attempt to connect on the 
first call to
-   * {@link #get()}.
-   *
-   * @param sessionTimeout the ZK session timeout
-   * @param zooKeeperServer the first, required ZK server
-   * @param zooKeeperServers any additional servers forming the ZK cluster
-   */
-  public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, 
InetSocketAddress zooKeeperServer,
-      InetSocketAddress... zooKeeperServers) {
-    this(sessionTimeout, combine(zooKeeperServer, zooKeeperServers));
-  }
-
-  /**
-   * Creates an unconnected client that will lazily attempt to connect on the 
first call to
-   * {@link #get}.
-   *
-   * @param sessionTimeout the ZK session timeout
-   * @param zooKeeperServers the set of servers forming the ZK cluster
-   */
-  public ZooKeeperClient(Amount<Integer, Time> sessionTimeout,
-      Iterable<InetSocketAddress> zooKeeperServers) {
-    this(sessionTimeout, Optional.absent(), Optional.absent(), 
zooKeeperServers);
-  }
-
-  /**
-   * Creates an unconnected client that will lazily attempt to connect on the 
first call to
-   * {@link #get()}.  All successful connections will be authenticated with 
the given
-   * {@code credentials}.
-   *
-   * @param sessionTimeout the ZK session timeout
-   * @param credentials the credentials to authenticate with
-   * @param zooKeeperServer the first, required ZK server
-   * @param zooKeeperServers any additional servers forming the ZK cluster
-   */
-  public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, Credentials 
credentials,
-      InetSocketAddress zooKeeperServer, InetSocketAddress... 
zooKeeperServers) {
-    this(sessionTimeout,
-        Optional.of(credentials),
-        Optional.absent(),
-        combine(zooKeeperServer, zooKeeperServers));
-  }
-
-  /**
-   * Creates an unconnected client that will lazily attempt to connect on the 
first call to
-   * {@link #get}.  All successful connections will be authenticated with the 
given
-   * {@code credentials}.
-   *
-   * @param sessionTimeout the ZK session timeout
-   * @param credentials the credentials to authenticate with
-   * @param zooKeeperServers the set of servers forming the ZK cluster
-   */
-  public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, Credentials 
credentials,
-      Iterable<InetSocketAddress> zooKeeperServers) {
-        this(sessionTimeout,
-            Optional.of(credentials),
-            Optional.absent(),
-            zooKeeperServers);
-      }
-
-  /**
-   * Creates an unconnected client that will lazily attempt to connect on the 
first call to
-   * {@link #get}.  All successful connections will be authenticated with the 
given
-   * {@code credentials}.
-   *
-   * @param sessionTimeout the ZK session timeout
-   * @param credentials the credentials to authenticate with
-   * @param chrootPath an optional chroot path
-   * @param zooKeeperServers the set of servers forming the ZK cluster
-   */
-  public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, 
Optional<Credentials> credentials,
-      Optional<String> chrootPath, Iterable<InetSocketAddress> 
zooKeeperServers) {
-    this.sessionTimeoutMs = 
Preconditions.checkNotNull(sessionTimeout).as(Time.MILLISECONDS);
-    this.credentials = Preconditions.checkNotNull(credentials);
-
-    if (chrootPath.isPresent()) {
-      PathUtils.validatePath(chrootPath.get());
-    }
-
-    Preconditions.checkNotNull(zooKeeperServers);
-    Preconditions.checkArgument(!Iterables.isEmpty(zooKeeperServers),
-        "Must present at least 1 ZK server");
-
-    Thread watcherProcessor = new Thread("ZookeeperClient-watcherProcessor") {
-      @Override
-      public void run() {
-        while (true) {
-          try {
-            WatchedEvent event = eventQueue.take();
-            for (Watcher watcher : watchers) {
-              watcher.process(event);
-            }
-          } catch (InterruptedException e) { /* ignore */ }
-        }
-      }
-    };
-    watcherProcessor.setDaemon(true);
-    watcherProcessor.start();
-
-    Iterable<String> servers =
-        Iterables.transform(ImmutableSet.copyOf(zooKeeperServers),
-            InetSocketAddressHelper::toString);
-    this.zooKeeperServers = 
Joiner.on(',').join(servers).concat(chrootPath.or(""));
-  }
-
-  /**
-   * Returns the current active ZK connection or establishes a new one if none 
has yet been
-   * established or a previous connection was disconnected or had its session 
time out.  This method
-   * will attempt to re-use sessions when possible.  Equivalent to:
-   * <pre>get(Amount.of(0L, ...)</pre>.
-   *
-   * @return a connected ZooKeeper client
-   * @throws ZooKeeperConnectionException if there was a problem connecting to 
the ZK cluster
-   * @throws InterruptedException if interrupted while waiting for a 
connection to be established
-   */
-  public synchronized ZooKeeper get() throws ZooKeeperConnectionException, 
InterruptedException {
-    try {
-      return get(WAIT_FOREVER);
-    } catch (TimeoutException e) {
-      InterruptedException interruptedException =
-          new InterruptedException("Got an unexpected TimeoutException for 0 
wait");
-      interruptedException.initCause(e);
-      throw interruptedException;
-    }
-  }
-
-  /**
-   * Returns the current active ZK connection or establishes a new one if none 
has yet been
-   * established or a previous connection was disconnected or had its session 
time out.  This
-   * method will attempt to re-use sessions when possible.
-   *
-   * @param connectionTimeout the maximum amount of time to wait for the 
connection to the ZK
-   *     cluster to be established; 0 to wait forever
-   * @return a connected ZooKeeper client
-   * @throws ZooKeeperConnectionException if there was a problem connecting to 
the ZK cluster
-   * @throws InterruptedException if interrupted while waiting for a 
connection to be established
-   * @throws TimeoutException if a connection could not be established within 
the configured
-   *     session timeout
-   */
-  public synchronized ZooKeeper get(Amount<Long, Time> connectionTimeout)
-      throws ZooKeeperConnectionException, InterruptedException, 
TimeoutException {
-
-    if (zooKeeper == null) {
-      final CountDownLatch connected = new CountDownLatch(1);
-      Watcher watcher = event -> {
-        switch (event.getType()) {
-          // Guard the None type since this watch may be used as the default 
watch on calls by
-          // the client outside our control.
-          case None:
-            switch (event.getState()) {
-              case Expired:
-                LOG.info("Zookeeper session expired. Event: " + event);
-                close();
-                break;
-              case SyncConnected:
-                connected.countDown();
-                break;
-            }
-        }
-
-        eventQueue.offer(event);
-      };
-
-      try {
-        zooKeeper = (sessionState != null)
-          ? new ZooKeeper(zooKeeperServers, sessionTimeoutMs, watcher, 
sessionState.sessionId,
-            sessionState.sessionPasswd)
-          : new ZooKeeper(zooKeeperServers, sessionTimeoutMs, watcher);
-      } catch (IOException e) {
-        throw new ZooKeeperConnectionException(
-            "Problem connecting to servers: " + zooKeeperServers, e);
-      }
-
-      if (connectionTimeout.getValue() > 0) {
-        if (!connected.await(connectionTimeout.as(Time.MILLISECONDS), 
TimeUnit.MILLISECONDS)) {
-          close();
-          throw new TimeoutException("Timed out waiting for a ZK connection 
after "
-                                     + connectionTimeout);
-        }
-      } else {
-        try {
-          connected.await();
-        } catch (InterruptedException ex) {
-          LOG.info("Interrupted while waiting to connect to zooKeeper");
-          close();
-          throw ex;
-        }
-      }
-      if (credentials.isPresent()) {
-        Credentials zkCredentials = credentials.get();
-        zooKeeper.addAuthInfo(zkCredentials.scheme(), 
zkCredentials.authToken());
-      }
-
-      sessionState = new SessionState(zooKeeper.getSessionId(), 
zooKeeper.getSessionPasswd());
-    }
-    return zooKeeper;
-  }
-
-  /**
-   * Clients that need to re-establish state after session expiration can 
register an
-   * {@code onExpired} command to execute.
-   *
-   * @param onExpired the {@code Command} to register
-   * @return the new {@link Watcher} which can later be passed to {@link 
#unregister} for
-   *     removal.
-   */
-  public Watcher registerExpirationHandler(final Command onExpired) {
-    Watcher watcher = event -> {
-      if (event.getType() == EventType.None && event.getState() == 
KeeperState.Expired) {
-        onExpired.execute();
-      }
-    };
-    register(watcher);
-    return watcher;
-  }
-
-  /**
-   * Clients that need to register a top-level {@code Watcher} should do so 
using this method.  The
-   * registered {@code watcher} will remain registered across re-connects and 
session expiration
-   * events.
-   *
-   * @param watcher the {@code Watcher to register}
-   */
-  public void register(Watcher watcher) {
-    watchers.add(watcher);
-  }
-
-  /**
-   * Clients can attempt to unregister a top-level {@code Watcher} that has 
previously been
-   * registered.
-   *
-   * @param watcher the {@code Watcher} to unregister as a top-level, 
persistent watch
-   * @return whether the given {@code Watcher} was found and removed from the 
active set
-   */
-  public boolean unregister(Watcher watcher) {
-    return watchers.remove(watcher);
-  }
-
-  /**
-   * Checks to see if the client might reasonably re-try an operation given 
the exception thrown
-   * while attempting it.  If the ZooKeeper session should be expired to 
enable the re-try to
-   * succeed this method will expire it as a side-effect.
-   *
-   * @param e the exception to test
-   * @return true if a retry can be attempted
-   */
-  public boolean shouldRetry(KeeperException e) {
-    if (e instanceof SessionExpiredException) {
-      close();
-    }
-    return ZooKeeperUtils.isRetryable(e);
-  }
-
-  /**
-   * Closes the current connection if any expiring the current ZooKeeper 
session.  Any subsequent
-   * calls to this method will no-op until the next successful {@link #get}.
-   */
-  public synchronized void close() {
-    if (zooKeeper != null) {
-      try {
-        zooKeeper.close();
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        LOG.warn("Interrupted trying to close zooKeeper");
-      } finally {
-        zooKeeper = null;
-        sessionState = null;
-      }
-    }
-  }
-
-  @VisibleForTesting
-  synchronized boolean isClosed() {
-    return zooKeeper == null;
-  }
-
-  @VisibleForTesting
-  ZooKeeper getZooKeeperClientForTests() {
-    return zooKeeper;
-  }
-}

Reply via email to