http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperClient.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperClient.java 
b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperClient.java
new file mode 100644
index 0000000..a032aa3
--- /dev/null
+++ 
b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperClient.java
@@ -0,0 +1,491 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Logger;
+
+import javax.annotation.Nullable;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Objects;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.SessionExpiredException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.common.PathUtils;
+
+import org.apache.aurora.common.base.Command;
+import org.apache.aurora.common.base.MorePreconditions;
+import org.apache.aurora.common.net.InetSocketAddressHelper;
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+
+/**
+ * Manages a connection to a ZooKeeper cluster.
+ */
+public class ZooKeeperClient {
+
+  /**
+   * Indicates an error connecting to a zookeeper cluster.
+   */
+  public class ZooKeeperConnectionException extends Exception {
+    /**
+     * Creates a connection exception that wraps the underlying failure.
+     *
+     * @param message a description of the connection problem
+     * @param cause the failure that prevented the connection
+     */
+    public ZooKeeperConnectionException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
+  /**
+   * Encapsulates a user's credentials and has the ability to authenticate them through a
+   * {@link ZooKeeper} client.
+   */
+  public interface Credentials {
+
+    /**
+     * A set of {@code Credentials} that performs no authentication: {@code authenticate} is a
+     * no-op and both {@code scheme()} and {@code authToken()} return {@code null}.
+     */
+    Credentials NONE = new Credentials() {
+      @Override public void authenticate(ZooKeeper zooKeeper) {
+        // noop
+      }
+
+      @Override public String scheme() {
+        return null;
+      }
+
+      @Override public byte[] authToken() {
+        return null;
+      }
+    };
+
+    /**
+     * Authenticates these credentials against the given {@code ZooKeeper} client.
+     *
+     * @param zooKeeper the client to authenticate
+     */
+    void authenticate(ZooKeeper zooKeeper);
+
+    /**
+     * Returns the authentication scheme these credentials are for.
+     *
+     * @return the scheme these credentials are for or {@code null} if no authentication is
+     *     intended.
+     */
+    @Nullable
+    String scheme();
+
+    /**
+     * Returns the authentication token.
+     *
+     * @return the authentication token or {@code null} if no authentication is intended.
+     */
+    @Nullable
+    byte[] authToken();
+  }
+
+  /**
+   * Creates a set of credentials for the zoo keeper digest authentication 
mechanism.
+   *
+   * @param username the username to authenticate with
+   * @param password the password to authenticate with
+   * @return a set of credentials that can be used to authenticate the zoo 
keeper client
+   */
+  public static Credentials digestCredentials(String username, String 
password) {
+    MorePreconditions.checkNotBlank(username);
+    Preconditions.checkNotNull(password);
+
+    // TODO(John Sirois): DigestAuthenticationProvider is broken - uses 
platform default charset
+    // (on server) and so we just have to hope here that clients are deployed 
in compatible jvms.
+    // Consider writing and installing a version of 
DigestAuthenticationProvider that controls its
+    // Charset explicitly.
+    return credentials("digest", (username + ":" + password).getBytes());
+  }
+
+  /**
+   * Creates a set of credentials for the given authentication {@code scheme}.
+   *
+   * <p>The returned credentials compare equal to any other {@code Credentials} with an equal
+   * scheme and an auth token with equal contents, and hash consistently with that definition.
+   *
+   * @param scheme the scheme to authenticate with; must not be blank
+   * @param authToken the authentication token; must not be {@code null}
+   * @return a set of credentials that can be used to authenticate the zoo keeper client
+   */
+  public static Credentials credentials(final String scheme, final byte[] authToken) {
+    MorePreconditions.checkNotBlank(scheme);
+    Preconditions.checkNotNull(authToken);
+
+    return new Credentials() {
+      @Override public void authenticate(ZooKeeper zooKeeper) {
+        zooKeeper.addAuthInfo(scheme, authToken);
+      }
+
+      @Override public String scheme() {
+        return scheme;
+      }
+
+      @Override public byte[] authToken() {
+        return authToken;
+      }
+
+      @Override public boolean equals(Object o) {
+        if (!(o instanceof Credentials)) {
+          return false;
+        }
+
+        // EqualsBuilder compares the byte[] tokens element by element, not by reference.
+        Credentials other = (Credentials) o;
+        return new EqualsBuilder()
+            .append(scheme, other.scheme())
+            .append(authToken, other.authToken())
+            .isEquals();
+      }
+
+      @Override public int hashCode() {
+        // Hash the token by content to stay consistent with equals(); hashing the array
+        // reference itself would give equal credentials different hash codes.
+        return Objects.hashCode(scheme, java.util.Arrays.hashCode(authToken));
+      }
+    };
+  }
+
+  /**
+   * Immutable holder for the id and password of an established ZooKeeper session, kept so a
+   * later connection attempt can try to re-attach to the same session.
+   *
+   * <p>Declared {@code static}: it uses nothing from the enclosing client, so it should not
+   * carry a hidden reference to it.
+   */
+  private static final class SessionState {
+    private final long sessionId;
+    private final byte[] sessionPasswd;
+
+    private SessionState(long sessionId, byte[] sessionPasswd) {
+      this.sessionId = sessionId;
+      this.sessionPasswd = sessionPasswd;
+    }
+  }
+
+  private static final Logger LOG = 
Logger.getLogger(ZooKeeperClient.class.getName());
+
+  private static final Amount<Long,Time> WAIT_FOREVER = Amount.of(0L, 
Time.MILLISECONDS);
+
+  private final int sessionTimeoutMs;
+  private final Credentials credentials;
+  private final String zooKeeperServers;
+  // GuardedBy "this", but still volatile for tests, where we want to be able 
to see writes
+  // made from within long synchronized blocks.
+  private volatile ZooKeeper zooKeeper;
+  private SessionState sessionState;
+
+  private final Set<Watcher> watchers = new CopyOnWriteArraySet<Watcher>();
+  private final BlockingQueue<WatchedEvent> eventQueue = new 
LinkedBlockingQueue<WatchedEvent>();
+
+  /**
+   * Merges a required first address and any additional addresses into a single immutable,
+   * duplicate-free collection.
+   */
+  private static Iterable<InetSocketAddress> combine(InetSocketAddress address,
+      InetSocketAddress... addresses) {
+    ImmutableSet.Builder<InetSocketAddress> all = ImmutableSet.builder();
+    all.add(address);
+    all.add(addresses);
+    return all.build();
+  }
+
+  /**
+   * Creates an unconnected client that will lazily attempt to connect on the first call to
+   * {@link #get()}.  The required server and any additional servers are combined and passed to
+   * the {@code Iterable}-based constructor.
+   *
+   * @param sessionTimeout the ZK session timeout
+   * @param zooKeeperServer the first, required ZK server
+   * @param zooKeeperServers any additional servers forming the ZK cluster
+   */
+  public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, 
InetSocketAddress zooKeeperServer,
+      InetSocketAddress... zooKeeperServers) {
+    this(sessionTimeout, combine(zooKeeperServer, zooKeeperServers));
+  }
+
+  /**
+   * Creates an unconnected client that will lazily attempt to connect on the first call to
+   * {@link #get}.  The client is configured with {@link Credentials#NONE} and no chroot path.
+   *
+   * @param sessionTimeout the ZK session timeout
+   * @param zooKeeperServers the set of servers forming the ZK cluster
+   */
+  public ZooKeeperClient(Amount<Integer, Time> sessionTimeout,
+      Iterable<InetSocketAddress> zooKeeperServers) {
+    this(sessionTimeout, Credentials.NONE, Optional.<String> absent(), 
zooKeeperServers);
+  }
+
+  /**
+   * Creates an unconnected client that will lazily attempt to connect on the first call to
+   * {@link #get()}.  All successful connections will be authenticated with the given
+   * {@code credentials}.  No chroot path is used.
+   *
+   * @param sessionTimeout the ZK session timeout
+   * @param credentials the credentials to authenticate with
+   * @param zooKeeperServer the first, required ZK server
+   * @param zooKeeperServers any additional servers forming the ZK cluster
+   */
+  public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, Credentials 
credentials,
+      InetSocketAddress zooKeeperServer, InetSocketAddress... 
zooKeeperServers) {
+    this(sessionTimeout, credentials, Optional.<String> absent(), 
combine(zooKeeperServer, zooKeeperServers));
+  }
+
+  /**
+   * Creates an unconnected client that will lazily attempt to connect on the first call to
+   * {@link #get}.  All successful connections will be authenticated with the given
+   * {@code credentials}.  No chroot path is used.
+   *
+   * @param sessionTimeout the ZK session timeout
+   * @param credentials the credentials to authenticate with
+   * @param zooKeeperServers the set of servers forming the ZK cluster
+   */
+  public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, Credentials 
credentials,
+      Iterable<InetSocketAddress> zooKeeperServers) {
+        this(sessionTimeout, credentials, Optional.<String> absent(), 
zooKeeperServers);
+      }
+
+  /**
+   * Creates an unconnected client that will lazily attempt to connect on the first call to
+   * {@link #get}.  All successful connections will be authenticated with the given
+   * {@code credentials}.
+   *
+   * <p>Construction also starts a daemon thread that takes events from the internal queue and
+   * hands each one to every registered {@link Watcher}.
+   *
+   * @param sessionTimeout the ZK session timeout
+   * @param credentials the credentials to authenticate with
+   * @param chrootPath an optional chroot path; validated and appended to the connect string
+   * @param zooKeeperServers the set of servers forming the ZK cluster; must not be empty
+   */
+  public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, Credentials credentials,
+      Optional<String> chrootPath, Iterable<InetSocketAddress> zooKeeperServers) {
+    this.sessionTimeoutMs = Preconditions.checkNotNull(sessionTimeout).as(Time.MILLISECONDS);
+    this.credentials = Preconditions.checkNotNull(credentials);
+
+    if (chrootPath.isPresent()) {
+      PathUtils.validatePath(chrootPath.get());
+    }
+
+    Preconditions.checkNotNull(zooKeeperServers);
+    Preconditions.checkArgument(!Iterables.isEmpty(zooKeeperServers),
+        "Must present at least 1 ZK server");
+
+    // Build the connect string before starting the dispatcher thread: if this fails (for
+    // example on a null server entry) the constructor must not leave a running thread behind.
+    Iterable<String> servers =
+        Iterables.transform(ImmutableSet.copyOf(zooKeeperServers),
+            InetSocketAddressHelper.INET_TO_STR);
+    this.zooKeeperServers = Joiner.on(',').join(servers).concat(chrootPath.or(""));
+
+    Thread watcherProcessor = new Thread("ZookeeperClient-watcherProcessor") {
+      @Override
+      public void run() {
+        while (true) {
+          try {
+            WatchedEvent event = eventQueue.take();
+            for (Watcher watcher : watchers) {
+              try {
+                watcher.process(event);
+              } catch (RuntimeException e) {
+                // A single failing watcher must not terminate the dispatcher; otherwise every
+                // other registered watcher would silently stop receiving events.
+                LOG.log(java.util.logging.Level.WARNING,
+                    "Watcher " + watcher + " failed to process event " + event, e);
+              }
+            }
+          } catch (InterruptedException e) { /* ignore */ }
+        }
+      }
+    };
+    watcherProcessor.setDaemon(true);
+    watcherProcessor.start();
+  }
+
+  /**
+   * Reports whether this client was configured with usable credentials, i.e. a non-empty
+   * scheme together with a non-null auth token.  A client constructed with
+   * {@link Credentials#NONE} therefore reports {@code false}.
+   *
+   * @return {@code true} if this client is configured with non-empty credentials.
+   */
+  public boolean hasCredentials() {
+    if (Strings.isNullOrEmpty(credentials.scheme())) {
+      return false;
+    }
+    return credentials.authToken() != null;
+  }
+
+  /**
+   * Returns the current active ZK connection, establishing a new one if none exists yet or if
+   * the previous one was closed.  Sessions are re-used when possible.  Equivalent to:
+   * <pre>get(Amount.of(0L, ...))</pre>
+   * which waits indefinitely for the connection.
+   *
+   * @return a connected ZooKeeper client
+   * @throws ZooKeeperConnectionException if there was a problem connecting to the ZK cluster
+   * @throws InterruptedException if interrupted while waiting for a connection to be established
+   */
+  public synchronized ZooKeeper get() throws ZooKeeperConnectionException, InterruptedException {
+    try {
+      return get(WAIT_FOREVER);
+    } catch (TimeoutException e) {
+      // A zero timeout means "wait forever", so a timeout is unexpected here; report it through
+      // the declared InterruptedException while keeping the original cause.
+      throw (InterruptedException) new InterruptedException(
+          "Got an unexpected TimeoutException for 0 wait").initCause(e);
+    }
+  }
+
+  /**
+   * Returns the current active ZK connection or establishes a new one if none has yet been
+   * established or a previous connection was closed.  This method will attempt to re-use
+   * sessions when possible.
+   *
+   * <p>If the wait for the connection times out or is interrupted, the half-open connection is
+   * closed so that the next call starts from a clean state.
+   *
+   * @param connectionTimeout the maximum amount of time to wait for the connection to the ZK
+   *     cluster to be established; 0 to wait forever
+   * @return a connected ZooKeeper client
+   * @throws ZooKeeperConnectionException if there was a problem connecting to the ZK cluster
+   * @throws InterruptedException if interrupted while waiting for a connection to be established
+   * @throws TimeoutException if a connection could not be established within
+   *     {@code connectionTimeout}
+   */
+  public synchronized ZooKeeper get(Amount<Long, Time> connectionTimeout)
+      throws ZooKeeperConnectionException, InterruptedException, TimeoutException {
+
+    if (zooKeeper == null) {
+      final CountDownLatch connected = new CountDownLatch(1);
+      Watcher watcher = new Watcher() {
+        @Override public void process(WatchedEvent event) {
+          switch (event.getType()) {
+            // Guard the None type since this watch may be used as the default watch on calls by
+            // the client outside our control.
+            case None:
+              switch (event.getState()) {
+                case Expired:
+                  LOG.info("Zookeeper session expired. Event: " + event);
+                  close();
+                  break;
+                case SyncConnected:
+                  connected.countDown();
+                  break;
+                default:
+                  break;
+              }
+              break;
+            default:
+              break;
+          }
+
+          // Every event, whatever its type, is forwarded to the registered watchers.
+          eventQueue.offer(event);
+        }
+      };
+
+      try {
+        zooKeeper = (sessionState != null)
+          ? new ZooKeeper(zooKeeperServers, sessionTimeoutMs, watcher, sessionState.sessionId,
+            sessionState.sessionPasswd)
+          : new ZooKeeper(zooKeeperServers, sessionTimeoutMs, watcher);
+      } catch (IOException e) {
+        throw new ZooKeeperConnectionException(
+            "Problem connecting to servers: " + zooKeeperServers, e);
+      }
+
+      boolean established;
+      try {
+        if (connectionTimeout.getValue() > 0) {
+          established =
+              connected.await(connectionTimeout.as(Time.MILLISECONDS), TimeUnit.MILLISECONDS);
+        } else {
+          connected.await();
+          established = true;
+        }
+      } catch (InterruptedException ex) {
+        // Applies to both the timed and the untimed wait: never leave behind a handle that was
+        // not authenticated and has no recorded session state.
+        LOG.info("Interrupted while waiting to connect to zooKeeper");
+        close();
+        throw ex;
+      }
+      if (!established) {
+        close();
+        throw new TimeoutException("Timed out waiting for a ZK connection after "
+                                   + connectionTimeout);
+      }
+      credentials.authenticate(zooKeeper);
+
+      sessionState = new SessionState(zooKeeper.getSessionId(), zooKeeper.getSessionPasswd());
+    }
+    return zooKeeper;
+  }
+
+  /**
+   * Clients that need to re-establish state after session expiration can 
register an
+   * {@code onExpired} command to execute.
+   *
+   * @param onExpired the {@code Command} to register
+   * @return the new {@link Watcher} which can later be passed to {@link 
#unregister} for
+   *     removal.
+   */
+  public Watcher registerExpirationHandler(final Command onExpired) {
+    Watcher watcher = new Watcher() {
+      @Override public void process(WatchedEvent event) {
+        if (event.getType() == EventType.None && event.getState() == 
KeeperState.Expired) {
+          onExpired.execute();
+        }
+      }
+    };
+    register(watcher);
+    return watcher;
+  }
+
+  /**
+   * Clients that need to register a top-level {@code Watcher} should do so using this method.
+   * The registered {@code watcher} will remain registered across re-connects and session
+   * expiration events.
+   *
+   * @param watcher the {@code Watcher} to register
+   */
+  public void register(Watcher watcher) {
+    watchers.add(watcher);
+  }
+
+  /**
+   * Clients can attempt to unregister a top-level {@code Watcher} that has 
previously been
+   * registered.
+   *
+   * @param watcher the {@code Watcher} to unregister as a top-level, 
persistent watch
+   * @return whether the given {@code Watcher} was found and removed from the 
active set
+   */
+  public boolean unregister(Watcher watcher) {
+    return watchers.remove(watcher);
+  }
+
+  /**
+   * Checks to see if the client might reasonably re-try an operation given 
the exception thrown
+   * while attempting it.  If the ZooKeeper session should be expired to 
enable the re-try to
+   * succeed this method will expire it as a side-effect.
+   *
+   * @param e the exception to test
+   * @return true if a retry can be attempted
+   */
+  public boolean shouldRetry(KeeperException e) {
+    if (e instanceof SessionExpiredException) {
+      close();
+    }
+    return ZooKeeperUtils.isRetryable(e);
+  }
+
+  /**
+   * Closes the current connection, if any, expiring the current ZooKeeper session.  Any
+   * subsequent calls to this method will no-op until the next successful {@link #get}.
+   */
+  public synchronized void close() {
+    if (zooKeeper == null) {
+      return;
+    }
+    try {
+      zooKeeper.close();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOG.warning("Interrupted trying to close zooKeeper");
+    } finally {
+      // Forget the handle and its session even when the close itself was interrupted.
+      zooKeeper = null;
+      sessionState = null;
+    }
+  }
+
+  /**
+   * Reports whether this client currently holds no ZooKeeper handle.
+   *
+   * @return {@code true} if no connection is currently held
+   */
+  @VisibleForTesting
+  synchronized boolean isClosed() {
+    return zooKeeper == null;
+  }
+
+  /**
+   * Returns the current ZooKeeper handle without taking this client's lock and without
+   * attempting to connect.
+   *
+   * @return the current handle, or {@code null} if none is held
+   */
+  @VisibleForTesting
+  ZooKeeper getZooKeeperClientForTests() {
+    return zooKeeper;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperMap.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperMap.java 
b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperMap.java
new file mode 100644
index 0000000..29db55a
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperMap.java
@@ -0,0 +1,411 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ForwardingMap;
+import com.google.common.collect.Sets;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
+import org.apache.aurora.common.base.Command;
+import org.apache.aurora.common.base.ExceptionalSupplier;
+import org.apache.aurora.common.base.MorePreconditions;
+import org.apache.aurora.common.util.BackoffHelper;
+import 
org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
+
+/**
+ * A ZooKeeper backed {@link Map}.  Initialized with a node path, this map 
represents child nodes
+ * under that path as keys, with the data in those nodes as values.  This map 
is readonly from
+ * clients of this class, and only can be modified via direct zookeeper 
operations.
+ *
+ * Note that instances of this class maintain a zookeeper watch for each 
zookeeper node under the
+ * parent, as well as on the parent itself.  Instances of this class should be 
created via the
+ * {@link #create} factory method.
+ *
+ * As of ZooKeeper Version 3.1, the maximum allowable size of a data node is 1 MB.  A single
+ * client should be able to maintain several thousand watches, but this depends on the rate
+ * of data change as well.
+ *
+ * Talk to your zookeeper cluster administrator if you expect number of map 
entries times number
+ * of live clients to exceed a thousand, as a zookeeper cluster is limited by 
total number of
+ * server-side watches enabled.
+ *
+ * For an example of a set of tools to maintain one of these maps, please see
+ * src/scripts/HenAccess.py in the hen repository.
+ *
+ * @param <V> the type of values this map stores
+ */
+public class ZooKeeperMap<V> extends ForwardingMap<String, V> {
+
+  /**
+   * An optional listener which can be supplied and triggered when entries in a ZooKeeperMap
+   * are added, changed or removed. For a ZooKeeperMap of type {@code <V>}, the listener will
+   * fire a "nodeChanged" event with the name of the ZNode that changed, and its resulting value
+   * as interpreted by the provided deserializer. Removal of child nodes triggers the
+   * "nodeRemoved" method indicating the name of the ZNode which is no longer present in the map.
+   *
+   * @param <V> the type of values delivered to the listener
+   */
+  public interface Listener<V> {
+
+    /**
+     * Fired when a node is added to the ZooKeeperMap or changed.
+     *
+     * @param nodeName indicates the name of the ZNode that was added or changed.
+     * @param value is the new value of the node after passing through your supplied deserializer.
+     */
+    void nodeChanged(String nodeName, V value);
+
+    /**
+     * Fired when a node is removed from the ZooKeeperMap.
+     *
+     * @param nodeName indicates the name of the ZNode that was removed from the ZooKeeperMap.
+     */
+    void nodeRemoved(String nodeName);
+  }
+
+  /**
+   * Default deserializer for the constructor if you want to simply store the 
zookeeper byte[] data
+   * in this map.
+   */
+  public static final Function<byte[], byte[]> BYTE_ARRAY_VALUES = 
Functions.identity();
+
+  /**
+   * A listener that ignores all events.
+   *
+   * @param <T> the value type of the map the listener is attached to
+   * @return a new listener whose callbacks do nothing
+   */
+  public static <T> Listener<T> noopListener() {
+    return new Listener<T>() {
+      @Override public void nodeChanged(String nodeName, T value) { }
+      @Override public void nodeRemoved(String nodeName) { }
+    };
+  }
+
+  private static final Logger LOG = 
Logger.getLogger(ZooKeeperMap.class.getName());
+
+  private final ZooKeeperClient zkClient;
+  private final String nodePath;
+  private final Function<byte[], V> deserializer;
+
+  private final ConcurrentMap<String, V> localMap;
+  private final Map<String, V> unmodifiableLocalMap;
+  private final BackoffHelper backoffHelper;
+
+  private final Listener<V> mapListener;
+
+  // Whether it's safe to re-establish watches if our zookeeper session has 
expired.
+  private final Object safeToRewatchLock;
+  private volatile boolean safeToRewatch;
+
+  /**
+   * Builds a ZooKeeperMap and starts tracking zookeeper data before returning it.  The given
+   * path must exist at the time of creation or a {@link KeeperException} will be thrown.
+   *
+   * @param zkClient a zookeeper client
+   * @param nodePath path to a node whose data will be watched
+   * @param deserializer a function that converts byte[] data from a zk node to this map's
+   *     value type V
+   * @param listener is a Listener which fires when values are added, changed, or removed.
+   * @return the initialized map
+   *
+   * @throws InterruptedException if the underlying zookeeper server transaction is interrupted
+   * @throws KeeperException.NoNodeException if the given nodePath doesn't exist
+   * @throws KeeperException if the server signals an error
+   * @throws ZooKeeperConnectionException if there was a problem connecting to the zookeeper
+   *     cluster
+   */
+  public static <V> ZooKeeperMap<V> create(
+      ZooKeeperClient zkClient,
+      String nodePath,
+      Function<byte[], V> deserializer,
+      Listener<V> listener)
+      throws InterruptedException, KeeperException, ZooKeeperConnectionException {
+
+    ZooKeeperMap<V> trackedMap = new ZooKeeperMap<V>(zkClient, nodePath, deserializer, listener);
+    trackedMap.init();
+    return trackedMap;
+  }
+
+
+  /**
+   * Builds an initialized ZooKeeperMap that uses a listener which ignores all events.  The
+   * given path must exist at the time of creation or a {@link KeeperException} will be thrown.
+   *
+   * @param zkClient a zookeeper client
+   * @param nodePath path to a node whose data will be watched
+   * @param deserializer a function that converts byte[] data from a zk node to this map's
+   *     value type V
+   * @return the initialized map
+   *
+   * @throws InterruptedException if the underlying zookeeper server transaction is interrupted
+   * @throws KeeperException.NoNodeException if the given nodePath doesn't exist
+   * @throws KeeperException if the server signals an error
+   * @throws ZooKeeperConnectionException if there was a problem connecting to the zookeeper
+   *     cluster
+   */
+  public static <V> ZooKeeperMap<V> create(
+      ZooKeeperClient zkClient,
+      String nodePath,
+      Function<byte[], V> deserializer)
+      throws InterruptedException, KeeperException, ZooKeeperConnectionException {
+
+    Listener<V> ignoreAllEvents = ZooKeeperMap.<V>noopListener();
+    return ZooKeeperMap.create(zkClient, nodePath, deserializer, ignoreAllEvents);
+  }
+
+  /**
+   * Constructs a ZooKeeperMap without starting to track remote data.  The given path must
+   * exist at the time of object creation or a {@link KeeperException} will be thrown.
+   *
+   * Please note that this object will not track any remote zookeeper data until {@link #init()}
+   * is successfully called.  After construction and before that call, this {@link Map} will
+   * be empty.
+   *
+   * @param zkClient a zookeeper client
+   * @param nodePath top-level node path under which the map data lives
+   * @param deserializer a function that converts byte[] data from a zk node to this map's
+   *     value type V
+   * @param mapListener is a Listener which fires when values are added, changed, or removed.
+   *
+   * @throws InterruptedException if the underlying zookeeper server transaction is interrupted
+   * @throws KeeperException.NoNodeException if the given nodePath doesn't exist
+   * @throws KeeperException if the server signals an error
+   * @throws ZooKeeperConnectionException if there was a problem connecting to the zookeeper
+   *     cluster
+   */
+  @VisibleForTesting
+  ZooKeeperMap(
+      ZooKeeperClient zkClient,
+      String nodePath,
+      Function<byte[], V> deserializer,
+      Listener<V> mapListener)
+      throws InterruptedException, KeeperException, ZooKeeperConnectionException {
+
+    super();
+
+    this.mapListener = Preconditions.checkNotNull(mapListener);
+    this.zkClient = Preconditions.checkNotNull(zkClient);
+    this.nodePath = MorePreconditions.checkNotBlank(nodePath);
+    this.deserializer = Preconditions.checkNotNull(deserializer);
+
+    this.localMap = new ConcurrentHashMap<String, V>();
+    this.unmodifiableLocalMap = Collections.unmodifiableMap(this.localMap);
+    this.backoffHelper = new BackoffHelper();
+    this.safeToRewatchLock = new Object();
+    this.safeToRewatch = false;
+
+    // Fail fast when the parent node is absent; no watch is set by this check.
+    boolean nodeMissing = zkClient.get().exists(nodePath, null) == null;
+    if (nodeMissing) {
+      throw new KeeperException.NoNodeException();
+    }
+  }
+
+  /**
+   * Starts zookeeper tracking for this {@link Map}.  Once this call returns, this object
+   * will be tracking data in zookeeper.  If the initial watch cannot be established, the
+   * session expiration handler registered here is removed again before the exception propagates.
+   *
+   * @throws InterruptedException if the underlying zookeeper server transaction is interrupted
+   * @throws KeeperException if the server signals an error
+   * @throws ZooKeeperConnectionException if there was a problem connecting to the zookeeper
+   *     cluster
+   */
+  @VisibleForTesting
+  void init() throws InterruptedException, KeeperException, ZooKeeperConnectionException {
+    Command rewatchAfterExpiration = new Command() {
+      @Override public void execute() {
+        /*
+         * Re-watch every locally cached child first.  Children that have disappeared are
+         * removed from the cached map while doing so.
+         *
+         * Then re-establish the top level child watch, picking up any nodes that are new.
+         */
+        try {
+          synchronized (safeToRewatchLock) {
+            if (!safeToRewatch) {
+              return;
+            }
+            rewatchDataNodes();
+            tryWatchChildren();
+          }
+        } catch (InterruptedException e) {
+          LOG.log(Level.WARNING, "Interrupted while trying to re-establish watch.", e);
+          Thread.currentThread().interrupt();
+        }
+      }
+    };
+    Watcher expirationWatcher = zkClient.registerExpirationHandler(rewatchAfterExpiration);
+
+    try {
+      // Hold the lock across both steps so a session expiry cannot slip in between
+      // watchChildren completing and safeToRewatch being set.
+      synchronized (safeToRewatchLock) {
+        watchChildren();
+        safeToRewatch = true;
+      }
+    } catch (InterruptedException e) {
+      zkClient.unregister(expirationWatcher);
+      throw e;
+    } catch (KeeperException e) {
+      zkClient.unregister(expirationWatcher);
+      throw e;
+    } catch (ZooKeeperConnectionException e) {
+      zkClient.unregister(expirationWatcher);
+      throw e;
+    }
+  }
+
+  /**
+   * Supplies the map that all {@link ForwardingMap} operations are forwarded to: the
+   * unmodifiable view of the locally cached entries.
+   */
+  @Override
+  protected Map<String, V> delegate() {
+    return unmodifiableLocalMap;
+  }
+
+  /**
+   * Repeatedly attempts {@link #watchChildren()} through the backoff helper until an attempt
+   * completes without a zookeeper or connection error.
+   *
+   * @throws InterruptedException if interrupted while attempting or waiting to re-try
+   */
+  private void tryWatchChildren() throws InterruptedException {
+    ExceptionalSupplier<Boolean, InterruptedException> attempt =
+        new ExceptionalSupplier<Boolean, InterruptedException>() {
+          @Override public Boolean get() throws InterruptedException {
+            try {
+              watchChildren();
+            } catch (KeeperException e) {
+              return false;
+            } catch (ZooKeeperConnectionException e) {
+              return false;
+            }
+            return true;
+          }
+        };
+    backoffHelper.doUntilSuccess(attempt);
+  }
+
+  /**
+   * Sets a watch on the parent node and on its list of children, then reconciles the local map
+   * with the children currently present in zookeeper.
+   *
+   * @throws InterruptedException if the underlying zookeeper server transaction is interrupted
+   * @throws KeeperException if the server signals an error
+   * @throws ZooKeeperConnectionException if there was a problem connecting to the zookeeper
+   *     cluster
+   */
+  private synchronized void watchChildren()
+      throws InterruptedException, KeeperException, ZooKeeperConnectionException {
+
+    // Watches the parent node itself; when it is deleted the cache is emptied and watching
+    // is re-attempted.
+    Watcher parentWatcher = new Watcher() {
+      @Override public void process(WatchedEvent event) {
+        if (event.getType() != Watcher.Event.EventType.NodeDeleted) {
+          return;
+        }
+        // The parent node no longer exists.
+        localMap.clear();
+        try {
+          tryWatchChildren();
+        } catch (InterruptedException e) {
+          LOG.log(Level.WARNING, "Interrupted while trying to watch children.", e);
+          Thread.currentThread().interrupt();
+        }
+      }
+    };
+
+    // Watches the set of children; any change triggers a fresh listing and reconciliation.
+    Watcher childWatcher = new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        if (event.getType() != Watcher.Event.EventType.NodeChildrenChanged) {
+          return;
+        }
+        try {
+          tryWatchChildren();
+        } catch (InterruptedException e) {
+          LOG.log(Level.WARNING, "Interrupted while trying to watch children.", e);
+          Thread.currentThread().interrupt();
+        }
+      }
+    };
+
+    zkClient.get().exists(nodePath, parentWatcher);
+    List<String> children = zkClient.get().getChildren(nodePath, childWatcher);
+    updateChildren(Sets.newHashSet(children));
+  }
+
+  /**
+   * Repeatedly attempts {@link #addChild(String)} through the backoff helper until an attempt
+   * completes without a zookeeper or connection error.
+   *
+   * @param child the name of the child node to load and watch
+   * @throws InterruptedException if interrupted while attempting or waiting to re-try
+   */
+  private void tryAddChild(final String child) throws InterruptedException {
+    ExceptionalSupplier<Boolean, InterruptedException> attempt =
+        new ExceptionalSupplier<Boolean, InterruptedException>() {
+          @Override public Boolean get() throws InterruptedException {
+            try {
+              addChild(child);
+            } catch (KeeperException e) {
+              return false;
+            } catch (ZooKeeperConnectionException e) {
+              return false;
+            }
+            return true;
+          }
+        };
+    backoffHelper.doUntilSuccess(attempt);
+  }
+
+  // TODO(Adam Samet) - Make this use the ZooKeeperNode class.
+  private void addChild(final String child)
+      throws InterruptedException, KeeperException, 
ZooKeeperConnectionException {
+
+    final Watcher nodeWatcher = new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        if (event.getType() == Watcher.Event.EventType.NodeDataChanged) {
+          try {
+            tryAddChild(child);
+          } catch (InterruptedException e) {
+            LOG.log(Level.WARNING, "Interrupted while trying to add a child.", 
e);
+            Thread.currentThread().interrupt();
+          }
+        } else if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
+          removeEntry(child);
+        }
+      }
+    };
+
+    try {
+      V value = deserializer.apply(zkClient.get().getData(makePath(child), 
nodeWatcher, null));
+      putEntry(child, value);
+    } catch (KeeperException.NoNodeException e) {
+      // This node doesn't exist anymore, remove it from the map and we're 
done.
+      removeEntry(child);
+    }
+  }
+
+  /**
+   * Removes the given key from the local map and then notifies the listener via
+   * {@code nodeRemoved}.  The listener is notified whether or not the key was present.
+   *
+   * @param key the name of the child node to drop
+   */
+  @VisibleForTesting
+  void removeEntry(String key) {
+    localMap.remove(key);
+    mapListener.nodeRemoved(key);
+  }
+
+  /**
+   * Stores the given value under the given key in the local map and then notifies the listener
+   * via {@code nodeChanged}.
+   *
+   * @param key the name of the child node
+   * @param value the deserialized value for that node
+   */
+  @VisibleForTesting
+  void putEntry(String key, V value) {
+    localMap.put(key, value);
+    mapListener.nodeChanged(key, value);
+  }
+
+  /**
+   * Reloads and re-watches every child currently held in the local map.
+   *
+   * @throws InterruptedException if interrupted while re-trying a child
+   */
+  private void rewatchDataNodes() throws InterruptedException {
+    Set<String> cachedChildren = keySet();
+    for (String cachedChild : cachedChildren) {
+      tryAddChild(cachedChild);
+    }
+  }
+
+  /** Builds the full zookeeper path of a child by joining it to the parent path with "/". */
+  private String makePath(final String child) {
+    return new StringBuilder(nodePath).append("/").append(child).toString();
+  }
+
+  private void updateChildren(Set<String> zkChildren) throws 
InterruptedException {
+    Set<String> addedChildren = Sets.difference(zkChildren, keySet());
+    Set<String> removedChildren = Sets.difference(keySet(), zkChildren);
+    for (String child : addedChildren) {
+      tryAddChild(child);
+    }
+    for (String child : removedChildren) {
+      removeEntry(child);
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperNode.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperNode.java 
b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperNode.java
new file mode 100644
index 0000000..3829ca7
--- /dev/null
+++ 
b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperNode.java
@@ -0,0 +1,349 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.annotation.Nullable;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.data.Stat;
+
+import org.apache.aurora.common.base.Closure;
+import org.apache.aurora.common.base.Closures;
+import org.apache.aurora.common.base.Command;
+import org.apache.aurora.common.base.ExceptionalSupplier;
+import org.apache.aurora.common.base.MorePreconditions;
+import org.apache.aurora.common.util.BackoffHelper;
+import 
org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
+
+/**
+ * An implementation of {@link Supplier} that offers a readonly view of a
+ * zookeeper data node.  This class is thread-safe.
+ *
+ * Instances of this class each maintain a zookeeper watch for the remote data 
node.  Instances
+ * of this class should be created via the {@link #create} factory method.
+ *
+ * Please see zookeeper documentation and talk to your cluster administrator 
for guidance on
+ * appropriate node size and total number of nodes you should be using.
+ *
+ * @param <T> the type of data this node stores
+ */
+public class ZooKeeperNode<T> implements Supplier<T> {
+  /**
+   * Deserializer for the constructor if you want to simply store the 
zookeeper byte[] data
+   * as-is.
+   */
+  public static final Function<byte[], byte[]> BYTE_ARRAY_VALUE = 
Functions.identity();
+
+  private static final Logger LOG = 
Logger.getLogger(ZooKeeperNode.class.getName());
+
+  private final ZooKeeperClient zkClient;
+  private final String nodePath;
+  private final NodeDeserializer<T> deserializer;
+
+  private final BackoffHelper backoffHelper;
+
+  // Whether it's safe to re-establish watches if our zookeeper session has 
expired.
+  private final Object safeToRewatchLock;
+  private volatile boolean safeToRewatch;
+
+  private final T NO_DATA = null;
+  @Nullable private volatile T nodeData;
+  private final Closure<T> dataUpdateListener;
+
+  /**
+   * When a call to ZooKeeper.getData is made, the Watcher is added to a Set before the network
+   * request is made and if the request fails, the Watcher remains. There's a problem where
+   * Watchers can accumulate when there are failed requests, so they are set to instance fields
+   * and reused.
+   */
+  private final Watcher nodeWatcher;
+  private final Watcher existenceWatcher;
+
+  /**
+   * Returns an initialized ZooKeeperNode.  The given node must exist at the 
time of object
+   * creation or a {@link KeeperException} will be thrown.
+   *
+   * @param zkClient a zookeeper client
+   * @param nodePath path to a node whose data will be watched
+   * @param deserializer a function that converts byte[] data from a zk node 
to this supplier's
+   *     type T
+   *
+   * @throws InterruptedException if the underlying zookeeper server 
transaction is interrupted
+   * @throws KeeperException.NoNodeException if the given nodePath doesn't 
exist
+   * @throws KeeperException if the server signals an error
+   * @throws ZooKeeperConnectionException if there was a problem connecting to 
the zookeeper
+   *     cluster
+   */
+  public static <T> ZooKeeperNode<T> create(ZooKeeperClient zkClient, String 
nodePath,
+      Function<byte[], T> deserializer) throws InterruptedException, 
KeeperException,
+      ZooKeeperConnectionException {
+    return create(zkClient, nodePath, deserializer, Closures.<T>noop());
+  }
+
+  /**
+   * Like the above, but optionally takes in a {@link Closure} that will get 
notified
+   * whenever the data is updated from the remote node.
+   *
+   * @param dataUpdateListener a {@link Closure} to receive data update 
notifications.
+   */
+  public static <T> ZooKeeperNode<T> create(ZooKeeperClient zkClient, String 
nodePath,
+      Function<byte[], T> deserializer, Closure<T> dataUpdateListener) throws 
InterruptedException,
+      KeeperException, ZooKeeperConnectionException {
+    return create(zkClient, nodePath, new FunctionWrapper<T>(deserializer), 
dataUpdateListener);
+  }
+
+  /**
+   * Returns an initialized ZooKeeperNode.  The given node must exist at the 
time of object
+   * creation or a {@link KeeperException} will be thrown.
+   *
+   * @param zkClient a zookeeper client
+   * @param nodePath path to a node whose data will be watched
+   * @param deserializer an implementation of {@link NodeDeserializer} that converts a byte[] from a
+   *     zk node to this supplier's type T. Also supplies a {@link Stat} 
object which is useful for
+   *     doing versioned updates.
+   *
+   * @throws InterruptedException if the underlying zookeeper server 
transaction is interrupted
+   * @throws KeeperException.NoNodeException if the given nodePath doesn't 
exist
+   * @throws KeeperException if the server signals an error
+   * @throws ZooKeeperConnectionException if there was a problem connecting to 
the zookeeper
+   *     cluster
+   */
+  public static <T> ZooKeeperNode<T> create(ZooKeeperClient zkClient, String 
nodePath,
+      NodeDeserializer<T> deserializer) throws InterruptedException, 
KeeperException,
+      ZooKeeperConnectionException {
+    return create(zkClient, nodePath, deserializer, Closures.<T>noop());
+  }
+
+  /**
+   * Like the above, but optionally takes in a {@link Closure} that will get 
notified
+   * whenever the data is updated from the remote node.
+   *
+   * @param dataUpdateListener a {@link Closure} to receive data update 
notifications.
+   */
+  public static <T> ZooKeeperNode<T> create(ZooKeeperClient zkClient, String 
nodePath,
+      NodeDeserializer<T> deserializer, Closure<T> dataUpdateListener)
+      throws InterruptedException, KeeperException, 
ZooKeeperConnectionException {
+    ZooKeeperNode<T> zkNode =
+        new ZooKeeperNode<T>(zkClient, nodePath, deserializer, 
dataUpdateListener);
+    zkNode.init();
+    return zkNode;
+  }
+
+  /**
+   * Initializes a ZooKeeperNode.  The given node must exist at the time of 
object creation or
+   * a {@link KeeperException} will be thrown.
+   *
+   * Please note that this object will not track any remote zookeeper data 
until {@link #init()}
+   * is successfully called.  After construction and before that call, this 
{@link Supplier} will
+   * return null.
+   *
+   * @param zkClient a zookeeper client
+   * @param nodePath path to a node whose data will be watched
+   * @param deserializer an implementation of {@link NodeDeserializer} that 
converts byte[] data
+   *     from a zk node to this supplier's type T
+   * @param dataUpdateListener a {@link Closure} to receive data update 
notifications.
+   */
+  @VisibleForTesting
+  ZooKeeperNode(ZooKeeperClient zkClient, String nodePath,
+      NodeDeserializer<T> deserializer, Closure<T> dataUpdateListener) {
+    this.zkClient = Preconditions.checkNotNull(zkClient);
+    this.nodePath = MorePreconditions.checkNotBlank(nodePath);
+    this.deserializer = Preconditions.checkNotNull(deserializer);
+    this.dataUpdateListener = Preconditions.checkNotNull(dataUpdateListener);
+
+    backoffHelper = new BackoffHelper();
+    safeToRewatchLock = new Object();
+    // Stays false until init() has established the first watch.
+    safeToRewatch = false;
+    nodeData = NO_DATA;
+
+    // Data watcher: an event delivered in the SyncConnected state triggers a fresh read of the
+    // node, which passes this same watcher to getData again.  Events in any other state are
+    // only logged.
+    nodeWatcher = new Watcher() {
+      @Override public void process(WatchedEvent event) {
+        if (event.getState() == KeeperState.SyncConnected) {
+          try {
+            tryWatchDataNode();
+          } catch (InterruptedException e) {
+            LOG.log(Level.WARNING, "Interrupted while trying to watch a data node.", e);
+            // Restore the interrupt flag for whoever owns this thread.
+            Thread.currentThread().interrupt();
+          }
+        } else {
+          LOG.info("Ignoring watcher event " + event);
+        }
+      }
+    };
+
+    // Existence watcher: only a NodeCreated event triggers a read of the node; every other
+    // event type is silently ignored.
+    existenceWatcher = new Watcher() {
+      @Override public void process(WatchedEvent event) {
+        if (event.getType() == Watcher.Event.EventType.NodeCreated) {
+          try {
+            tryWatchDataNode();
+          } catch (InterruptedException e) {
+            LOG.log(Level.WARNING, "Interrupted while trying to watch a data node.", e);
+            // Restore the interrupt flag for whoever owns this thread.
+            Thread.currentThread().interrupt();
+          }
+        }
+      }
+    };
+  }
+
+  /**
+   * Initialize zookeeper tracking for this {@link Supplier}.  Once this call returns, this object
+   * will be tracking data in zookeeper.
+   *
+   * <p>A session expiration handler is registered before the first watch is attempted.  If the
+   * first watch fails for any reason, checked or unchecked, the handler is unregistered again
+   * before the failure propagates, so a failed initialization leaves nothing registered on the
+   * client.
+   *
+   * @throws InterruptedException if the underlying zookeeper server transaction is interrupted
+   * @throws KeeperException if the server signals an error
+   * @throws ZooKeeperConnectionException if there was a problem connecting to the zookeeper
+   *     cluster
+   */
+  @VisibleForTesting
+  void init() throws InterruptedException, KeeperException,
+      ZooKeeperConnectionException {
+    Watcher watcher = zkClient.registerExpirationHandler(new Command() {
+      @Override public void execute() {
+        try {
+          synchronized (safeToRewatchLock) {
+            if (safeToRewatch) {
+              tryWatchDataNode();
+            }
+          }
+        } catch (InterruptedException e) {
+          LOG.log(Level.WARNING, "Interrupted while trying to re-establish watch.", e);
+          Thread.currentThread().interrupt();
+        }
+      }
+    });
+
+    boolean initialized = false;
+    try {
+      /*
+       * Synchronize to prevent the race of watchDataNode completing and then the session expiring
+       * before we update safeToRewatch.
+       */
+      synchronized (safeToRewatchLock) {
+        watchDataNode();
+        safeToRewatch = true;
+      }
+      initialized = true;
+    } finally {
+      // Covers unchecked failures (e.g. thrown by the deserializer or the update listener) as
+      // well as the declared exceptions; previously only the latter unregistered the handler.
+      if (!initialized) {
+        zkClient.unregister(watcher);
+      }
+    }
+  }
+
+  /**
+   * Returns the data corresponding to a byte array in a remote zookeeper 
node.  This data is
+   * cached locally and updated in the background on watch notifications.
+   *
+   * @return the data currently cached locally or null if {@link #init()} 
hasn't been called
+   *   or the backing node has no data or does not exist anymore.
+   */
+  @Override
+  public @Nullable T get() {
+    return nodeData;
+  }
+
+  /**
+   * Stores {@code newData} as the locally cached value and then passes it to the data update
+   * listener.
+   *
+   * @param newData the value to cache; may be null
+   */
+  @VisibleForTesting
+  void updateData(@Nullable T newData) {
+    nodeData = newData;
+    dataUpdateListener.execute(newData);
+  }
+
+  /**
+   * Hands a single "watch the data node" attempt to {@code backoffHelper.doUntilSuccess}.  The
+   * attempt reports {@code true} when {@link #watchDataNode()} returns normally and
+   * {@code false} when it fails with a {@link KeeperException} or a
+   * {@link ZooKeeperConnectionException}.
+   *
+   * @throws InterruptedException if interrupted while attempting to watch the node
+   */
+  private void tryWatchDataNode() throws InterruptedException {
+    ExceptionalSupplier<Boolean, InterruptedException> attempt =
+        new ExceptionalSupplier<Boolean, InterruptedException>() {
+          @Override public Boolean get() throws InterruptedException {
+            boolean watched;
+            try {
+              watchDataNode();
+              watched = true;
+            } catch (KeeperException e) {
+              watched = false;
+            } catch (ZooKeeperConnectionException e) {
+              watched = false;
+            }
+            return watched;
+          }
+        };
+    backoffHelper.doUntilSuccess(attempt);
+  }
+
+  /**
+   * Reads the node's data with {@code nodeWatcher} attached, deserializes it together with the
+   * node's {@link Stat}, and publishes the result through {@link #updateData}.  If the node does
+   * not exist, publishes {@code NO_DATA} instead and falls back to {@link #watchForExistence()}.
+   *
+   * @throws InterruptedException propagated from the zookeeper client calls
+   * @throws KeeperException for any zookeeper error other than the node not existing
+   * @throws ZooKeeperConnectionException propagated from obtaining the zookeeper client
+   */
+  private void watchDataNode() throws InterruptedException, KeeperException,
+      ZooKeeperConnectionException {
+    try {
+      // getData fills in stat alongside returning the raw bytes.
+      Stat stat = new Stat();
+      byte[] rawData = zkClient.get().getData(nodePath, nodeWatcher, stat);
+      T newData = deserializer.deserialize(rawData, stat);
+      updateData(newData);
+    } catch (KeeperException.NoNodeException e) {
+      /*
+       * This node doesn't exist right now, reflect that locally and then create a watch to wait
+       * for its recreation.
+       */
+      updateData(NO_DATA);
+      watchForExistence();
+    }
+  }
+
+  /**
+   * Registers {@code existenceWatcher} on the node via an exists call.  When the node turns out
+   * to exist already, immediately goes back to watching its data.
+   *
+   * @throws InterruptedException propagated from the zookeeper client calls
+   * @throws KeeperException if the exists call fails
+   * @throws ZooKeeperConnectionException propagated from obtaining the zookeeper client
+   */
+  private void watchForExistence() throws InterruptedException, KeeperException,
+      ZooKeeperConnectionException {
+    Stat existing = zkClient.get().exists(nodePath, existenceWatcher);
+    if (existing == null) {
+      // Still absent: leave the exists watch in place to wait for the node's creation.
+      return;
+    }
+
+    /*
+     * The node was created between the getData call and the exists call, so just try watching
+     * it.  The extra exists watch goes off on the node's next deletion, which will be a no-op.
+     */
+    tryWatchDataNode();
+  }
+
+  /**
+   * Interface for defining zookeeper node data deserialization.
+   *
+   * @param <T> the type of data associated with this node
+   */
+  public interface NodeDeserializer<T> {
+    /**
+     * Converts the raw node data into a value of type {@code T}.
+     *
+     * @param data the byte array returned from ZooKeeper when a watch is triggered.
+     * @param stat a ZooKeeper {@link Stat} object. Populated by
+     *             {@link org.apache.zookeeper.ZooKeeper#getData(String, boolean, Stat)}.
+     * @return the deserialized value
+     */
+    T deserialize(byte[] data, Stat stat);
+  }
+
+  /**
+   * Adapts a plain {@link Function} to {@link NodeDeserializer}, for backwards compatibility
+   * with the older create() methods that take a Function parameter.  The {@link Stat} passed to
+   * {@link #deserialize} is ignored.
+   */
+  private static final class FunctionWrapper<T> implements NodeDeserializer<T> {
+    private final Function<byte[], T> func;
+
+    private FunctionWrapper(Function<byte[], T> func) {
+      this.func = Preconditions.checkNotNull(func);
+    }
+
+    @Override
+    public T deserialize(byte[] rawData, Stat stat) {
+      return func.apply(rawData);
+    }
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperUtils.java 
b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperUtils.java
new file mode 100644
index 0000000..a8dcfa1
--- /dev/null
+++ 
b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperUtils.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.util.List;
+import java.util.logging.Logger;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.common.PathUtils;
+import org.apache.zookeeper.data.ACL;
+
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import 
org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
+
+/**
+ * Utilities for dealing with zoo keeper.
+ */
+public final class ZooKeeperUtils {
+
+  private static final Logger LOG = 
Logger.getLogger(ZooKeeperUtils.class.getName());
+
+  /**
+   * An appropriate default session timeout for Twitter ZooKeeper clusters.
+   */
+  public static final Amount<Integer,Time> DEFAULT_ZK_SESSION_TIMEOUT = 
Amount.of(4, Time.SECONDS);
+
+  /**
+   * The magic version number that allows any mutation to always succeed 
regardless of actual
+   * version number.
+   */
+  public static final int ANY_VERSION = -1;
+
+  /**
+   * An ACL that gives all permissions to any user, authenticated or not.
+   */
+  public static final ImmutableList<ACL> OPEN_ACL_UNSAFE =
+      ImmutableList.copyOf(Ids.OPEN_ACL_UNSAFE);
+
+  /**
+   * An ACL that gives all permissions to node creators and read permissions 
only to everyone else.
+   */
+  public static final ImmutableList<ACL> EVERYONE_READ_CREATOR_ALL =
+      ImmutableList.<ACL>builder()
+          .addAll(Ids.CREATOR_ALL_ACL)
+          .addAll(Ids.READ_ACL_UNSAFE)
+          .build();
+
+  /**
+   * Tells whether the failure described by {@code e} can be resolved by simply issuing the same
+   * operation again, unmodified.
+   *
+   * @param e the exception to check
+   * @return true if the causing operation is strictly retryable
+   */
+  public static boolean isRetryable(KeeperException e) {
+    Preconditions.checkNotNull(e);
+
+    final boolean retryable;
+    switch (e.code()) {
+      case CONNECTIONLOSS:
+      case SESSIONEXPIRED:
+      case SESSIONMOVED:
+      case OPERATIONTIMEOUT:
+        retryable = true;
+        break;
+
+      // Every other code is treated as non-retryable.  That includes SYSTEMERROR and APIERROR,
+      // which ZK only uses internally to specify ranges, and OK, which is not a valid exception
+      // code.
+      default:
+        retryable = false;
+        break;
+    }
+    return retryable;
+  }
+
+  /**
+   * Ensures the given {@code path} exists in the ZK cluster accessed by 
{@code zkClient}.  If the
+   * path already exists, nothing is done; however if any portion of the path 
is missing, it will be
+   * created with the given {@code acl} as a persistent zookeeper node.  The 
given {@code path} must
+   * be a valid zookeeper absolute path.
+   *
+   * @param zkClient the client to use to access the ZK cluster
+   * @param acl the acl to use if creating path nodes
+   * @param path the path to ensure exists
+   * @throws ZooKeeperConnectionException if there was a problem accessing the 
ZK cluster
+   * @throws InterruptedException if we were interrupted attempting to connect 
to the ZK cluster
+   * @throws KeeperException if there was a problem in ZK
+   */
+  public static void ensurePath(ZooKeeperClient zkClient, List<ACL> acl, 
String path)
+      throws ZooKeeperConnectionException, InterruptedException, 
KeeperException {
+    Preconditions.checkNotNull(zkClient);
+    Preconditions.checkNotNull(path);
+    Preconditions.checkArgument(path.startsWith("/"));
+
+    ensurePathInternal(zkClient, acl, path);
+  }
+
+  /**
+   * Recursive worker behind {@code ensurePath}: when {@code path} is missing, first ensures its
+   * parent path (unless {@code path} is already root-level) and then creates {@code path} as a
+   * persistent node with the given {@code acl}.
+   */
+  private static void ensurePathInternal(ZooKeeperClient zkClient, List<ACL> acl, String path)
+      throws ZooKeeperConnectionException, InterruptedException, KeeperException {
+    if (zkClient.get().exists(path, false) != null) {
+      // Already there, nothing to do.
+      return;
+    }
+
+    // The path is missing: back up a level and ensure the parent exists, unless there is none.
+    int parentEnd = path.lastIndexOf('/');
+    if (parentEnd > 0) {
+      ensurePathInternal(zkClient, acl, path.substring(0, parentEnd));
+    }
+
+    // The parent (if any) exists now, so the node itself can be created.
+    try {
+      zkClient.get().create(path, null, acl, CreateMode.PERSISTENT);
+    } catch (KeeperException.NodeExistsException e) {
+      // Tolerate losing a race between the existence check and the create call.
+      LOG.info("Node existed when trying to ensure path " + path + ", somebody beat us to it?");
+    }
+  }
+
+  /**
+   * Normalizes a zookeeper path, collapsing runs of slashes into one and dropping a trailing
+   * slash (the root path is left as-is), then validates the result.
+   *
+   * @param path the path to be normalized
+   * @return normalized path string
+   */
+  public static String normalizePath(String path) {
+    String collapsed = path.replaceAll("//+", "/");
+    String trimmed = collapsed.replaceFirst("(.+)/$", "$1");
+    PathUtils.validatePath(trimmed);
+    return trimmed;
+  }
+
+  private ZooKeeperUtils() {
+    // utility
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/zookeeper/guice/ServerSetModule.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/zookeeper/guice/ServerSetModule.java
 
b/commons/src/main/java/org/apache/aurora/common/zookeeper/guice/ServerSetModule.java
new file mode 100644
index 0000000..c8a3214
--- /dev/null
+++ 
b/commons/src/main/java/org/apache/aurora/common/zookeeper/guice/ServerSetModule.java
@@ -0,0 +1,267 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper.guice;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Atomics;
+import com.google.inject.AbstractModule;
+import com.google.inject.BindingAnnotation;
+import com.google.inject.Inject;
+import com.google.inject.Key;
+import com.google.inject.Singleton;
+import com.google.inject.TypeLiteral;
+
+import org.apache.aurora.common.application.ShutdownRegistry;
+import org.apache.aurora.common.application.modules.LifecycleModule;
+import org.apache.aurora.common.application.modules.LocalServiceRegistry;
+import org.apache.aurora.common.args.Arg;
+import org.apache.aurora.common.args.CmdLine;
+import org.apache.aurora.common.args.constraints.NotNegative;
+import org.apache.aurora.common.base.Command;
+import org.apache.aurora.common.base.ExceptionalCommand;
+import org.apache.aurora.common.base.Supplier;
+import org.apache.aurora.common.zookeeper.Group;
+import org.apache.aurora.common.zookeeper.ServerSet;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+/**
+ * A module that registers all ports in the {@link LocalServiceRegistry} in an 
{@link ServerSet}.
+ * <p/>
+ * Required bindings:
+ * <ul>
+ * <li> {@link ServerSet}
+ * <li> {@link ShutdownRegistry}
+ * <li> {@link LocalServiceRegistry}
+ * </ul>
+ * <p/>
+ * {@link LifecycleModule} must also be included by users so a startup action 
may be registered.
+ * <p/>
+ * Provided bindings:
+ * <ul>
+ * <li> {@link Supplier}<{@link ServerSet.EndpointStatus}>
+ * </ul>
+ */
+public class ServerSetModule extends AbstractModule {
+
+  /**
+   * BindingAnnotation for defaults to use in the service instance node.
+   */
+  @BindingAnnotation @Target({PARAMETER, METHOD, FIELD}) @Retention(RUNTIME)
+  private @interface Default {}
+
+  /**
+   * Binding annotation to give the ServerSetJoiner a fixed known ServerSet 
that is appropriate to
+   * {@link ServerSet#join} on.
+   */
+  @BindingAnnotation @Target({METHOD, PARAMETER}) @Retention(RUNTIME)
+  private @interface Joinable {}
+
+  private static final Key<ServerSet> JOINABLE_SS = Key.get(ServerSet.class, 
Joinable.class);
+
+  @CmdLine(name = "aux_port_as_primary",
+      help = "Name of the auxiliary port to use as the primary port in the 
server set."
+          + " This may only be used when no other primary port is specified.")
+  private static final Arg<String> AUX_PORT_AS_PRIMARY = Arg.create(null);
+
+  @NotNegative
+  @CmdLine(name = "shard_id", help = "Shard ID for this application.")
+  private static final Arg<Integer> SHARD_ID = Arg.create();
+
+  private static final Logger LOG = 
Logger.getLogger(ServerSetModule.class.getName());
+
+  /**
+   * Builds a Module that can be used to join a {@link ServerSet} with the ports configured in a
+   * {@link LocalServiceRegistry}.
+   */
+  public static class Builder {
+    private Key<ServerSet> key = Key.get(ServerSet.class);
+    private Optional<String> auxPortAsPrimary = Optional.absent();
+
+    /**
+     * Sets the key of the ServerSet to join.
+     *
+     * @param key Key of the ServerSet to join.
+     * @return This builder for chaining calls.
+     */
+    public Builder key(Key<ServerSet> key) {
+      this.key = key;
+      return this;
+    }
+
+    /**
+     * Allows joining an auxiliary port with the specified {@code name} as the 
primary port of the
+     * ServerSet.
+     *
+     * @param auxPortName The name of the auxiliary port to join as the 
primary ServerSet port.
+     * @return This builder for chaining calls.
+     */
+    public Builder namedPrimaryPort(String auxPortName) {
+      this.auxPortAsPrimary = Optional.of(auxPortName);
+      return this;
+    }
+
+    /**
+     * Creates a Module that will register a startup action that joins a 
ServerSet when installed.
+     *
+     * @return A Module.
+     */
+    public ServerSetModule build() {
+      return new ServerSetModule(key, auxPortAsPrimary);
+    }
+  }
+
+  /**
+   * Creates a builder that can be used to configure and create a 
ServerSetModule.
+   *
+   * @return A ServerSetModule builder.
+   */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  private final Key<ServerSet> serverSetKey;
+  private final Optional<String> auxPortAsPrimary;
+
+  /**
+   * Constructs a ServerSetModule that registers a startup action to register 
this process in
+   * ZooKeeper, with the specified initial status and auxiliary port to 
represent as the primary
+   * service port.
+   *
+   * @param serverSetKey The key the ServerSet to join is bound under.
+   * @param auxPortAsPrimary Name of the auxiliary port to use as the primary 
port.
+   */
+  ServerSetModule(Key<ServerSet> serverSetKey, Optional<String> 
auxPortAsPrimary) {
+
+    this.serverSetKey = checkNotNull(serverSetKey);
+    this.auxPortAsPrimary = checkNotNull(auxPortAsPrimary);
+  }
+
+  /**
+   * Declares the required bindings, registers {@code ServerSetJoiner} as a startup action and
+   * binds the endpoint supplier, the primary port name and the joinable server set.
+   */
+  @Override
+  protected void configure() {
+    requireBinding(serverSetKey);
+    requireBinding(ShutdownRegistry.class);
+    requireBinding(LocalServiceRegistry.class);
+
+    LifecycleModule.bindStartupAction(binder(), ServerSetJoiner.class);
+
+    bind(new TypeLiteral<Supplier<ServerSet.EndpointStatus>>() { }).to(EndpointSupplier.class);
+    bind(EndpointSupplier.class).in(Singleton.class);
+
+    // The command line flag, when applied, takes precedence over the name given at construction.
+    Optional<String> primaryPortName = AUX_PORT_AS_PRIMARY.hasAppliedValue()
+        ? Optional.of(AUX_PORT_AS_PRIMARY.get())
+        : auxPortAsPrimary;
+    bind(new TypeLiteral<Optional<String>>() { })
+        .annotatedWith(Default.class)
+        .toInstance(primaryPortName);
+
+    bind(JOINABLE_SS).to(serverSetKey);
+  }
+
+  static class EndpointSupplier implements Supplier<ServerSet.EndpointStatus> {
+    private final AtomicReference<ServerSet.EndpointStatus> reference = 
Atomics.newReference();
+
+    @Nullable
+    @Override public ServerSet.EndpointStatus get() {
+      return reference.get();
+    }
+
+    void set(ServerSet.EndpointStatus endpoint) {
+      reference.set(endpoint);
+    }
+  }
+
+  private static class ServerSetJoiner implements Command {
+    private final ServerSet serverSet;
+    private final LocalServiceRegistry serviceRegistry;
+    private final ShutdownRegistry shutdownRegistry;
+    private final EndpointSupplier endpointSupplier;
+    private final Optional<String> auxPortAsPrimary;
+
+    @Inject
+    ServerSetJoiner(
+        @Joinable ServerSet serverSet,
+        LocalServiceRegistry serviceRegistry,
+        ShutdownRegistry shutdownRegistry,
+        EndpointSupplier endpointSupplier,
+        @Default Optional<String> auxPortAsPrimary) {
+
+      this.serverSet = checkNotNull(serverSet);
+      this.serviceRegistry = checkNotNull(serviceRegistry);
+      this.shutdownRegistry = checkNotNull(shutdownRegistry);
+      this.endpointSupplier = checkNotNull(endpointSupplier);
+      this.auxPortAsPrimary = checkNotNull(auxPortAsPrimary);
+    }
+
+    /**
+     * Joins the server set with the sockets from the local service registry, publishes the
+     * resulting endpoint status through the endpoint supplier and registers a shutdown action
+     * that leaves the server set again.
+     */
+    @Override public void execute() {
+      Optional<InetSocketAddress> primarySocket = serviceRegistry.getPrimarySocket();
+      Map<String, InetSocketAddress> auxSockets = serviceRegistry.getAuxiliarySockets();
+      InetSocketAddress primary = choosePrimary(primarySocket, auxSockets);
+
+      final ServerSet.EndpointStatus endpointStatus;
+      try {
+        endpointStatus = SHARD_ID.hasAppliedValue()
+            ? serverSet.join(primary, auxSockets, SHARD_ID.get())
+            : serverSet.join(primary, auxSockets);
+
+        endpointSupplier.set(endpointStatus);
+      } catch (Group.JoinException e) {
+        LOG.log(Level.WARNING, "Failed to join ServerSet.", e);
+        throw new RuntimeException(e);
+      } catch (InterruptedException e) {
+        LOG.log(Level.WARNING, "Interrupted while joining ServerSet.", e);
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+
+      shutdownRegistry.addAction(new ExceptionalCommand<ServerSet.UpdateException>() {
+        @Override public void execute() throws ServerSet.UpdateException {
+          LOG.info("Leaving ServerSet.");
+          endpointStatus.leave();
+        }
+      });
+    }
+
+    /**
+     * Picks the socket to join as primary: the registered primary socket when present, otherwise
+     * the auxiliary socket named by {@code auxPortAsPrimary}.
+     *
+     * @throws IllegalStateException if neither a primary socket nor a usable auxiliary port name
+     *     is available
+     */
+    private InetSocketAddress choosePrimary(
+        Optional<InetSocketAddress> primarySocket,
+        Map<String, InetSocketAddress> auxSockets) {
+      if (primarySocket.isPresent()) {
+        return primarySocket.get();
+      }
+      if (!auxPortAsPrimary.isPresent()) {
+        throw new IllegalStateException("No primary service registered with LocalServiceRegistry,"
+            + " and -aux_port_as_primary was not specified.");
+      }
+
+      InetSocketAddress named = auxSockets.get(auxPortAsPrimary.get());
+      if (named == null) {
+        throw new IllegalStateException("No auxiliary port named " + auxPortAsPrimary.get());
+      }
+      return named;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/zookeeper/guice/client/ZooKeeperClientModule.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/zookeeper/guice/client/ZooKeeperClientModule.java
 
b/commons/src/main/java/org/apache/aurora/common/zookeeper/guice/client/ZooKeeperClientModule.java
new file mode 100644
index 0000000..08cdf55
--- /dev/null
+++ 
b/commons/src/main/java/org/apache/aurora/common/zookeeper/guice/client/ZooKeeperClientModule.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper.guice.client;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.logging.Logger;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+
+import com.google.inject.Inject;
+import com.google.inject.Key;
+import com.google.inject.PrivateModule;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+
+import org.apache.aurora.common.application.ShutdownRegistry;
+import org.apache.aurora.common.inject.Bindings.KeyFactory;
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.zookeeper.ZooKeeperClient;
+import org.apache.aurora.common.zookeeper.ZooKeeperClient.Credentials;
+import org.apache.aurora.common.zookeeper.ZooKeeperUtils;
+import org.apache.aurora.common.zookeeper.testing.ZooKeeperTestServer;
+
+/**
+ * A guice binding module that configures and binds a {@link ZooKeeperClient} 
instance.
+ */
+public class ZooKeeperClientModule extends PrivateModule {
+  private final KeyFactory keyFactory;
+  private final ClientConfig config;
+
+  /**
+   * Creates a new ZK client module from the provided configuration.
+   *
+   * @param config Configuration parameters for the client.
+   */
+  public ZooKeeperClientModule(ClientConfig config) {
+    this(KeyFactory.PLAIN, config);
+  }
+
+  /**
+   * Creates a new ZK client module from the provided configuration, using a 
key factory to
+   * qualify any bindings.
+   *
+   * @param keyFactory Factory to use when creating any exposed bindings.
+   * @param config Configuration parameters for the client.
+   */
+  public ZooKeeperClientModule(KeyFactory keyFactory, ClientConfig config) {
+    this.keyFactory = Preconditions.checkNotNull(keyFactory);
+    this.config = Preconditions.checkNotNull(config);
+  }
+
+  @Override
+  protected void configure() {
+    Key<ZooKeeperClient> clientKey = keyFactory.create(ZooKeeperClient.class);
+    if (config.inProcess) {
+      requireBinding(ShutdownRegistry.class);
+      // Bound privately to give the local provider access to configuration 
settings.
+      bind(ClientConfig.class).toInstance(config);
+      
bind(clientKey).toProvider(LocalClientProvider.class).in(Singleton.class);
+    } else {
+      ZooKeeperClient client =
+          new ZooKeeperClient(config.sessionTimeout, config.credentials, 
config.chrootPath, config.servers);
+      bind(clientKey).toInstance(client);
+    }
+    expose(clientKey);
+  }
+
+  /**
+   * Provides a {@link ZooKeeperClient} that talks to a {@link ZooKeeperTestServer} started
+   * inside this process.
+   */
+  private static class LocalClientProvider implements Provider<ZooKeeperClient> {
+    private static final Logger LOG = Logger.getLogger(LocalClientProvider.class.getName());
+
+    private final ClientConfig config;
+    private final ShutdownRegistry shutdownRegistry;
+
+    /**
+     * @param config Configuration supplying the session timeout and credentials for the client.
+     * @param shutdownRegistry Registry handed to the embedded server when it is created.
+     */
+    @Inject
+    LocalClientProvider(ClientConfig config, ShutdownRegistry shutdownRegistry) {
+      this.config = Preconditions.checkNotNull(config);
+      this.shutdownRegistry = Preconditions.checkNotNull(shutdownRegistry);
+    }
+
+    /**
+     * Starts an embedded ZooKeeper server and returns a client for it, created with the
+     * configured session timeout and credentials.
+     *
+     * @return A client connected to the embedded server.
+     * @throws RuntimeException if the server fails to start or the calling thread is interrupted
+     *     while it is starting; in the latter case the thread's interrupt status is restored.
+     */
+    @Override
+    public ZooKeeperClient get() {
+      ZooKeeperTestServer zooKeeperServer;
+      try {
+        // NOTE(review): port 0 presumably lets the server pick a free port - confirm against
+        // ZooKeeperTestServer.
+        zooKeeperServer = new ZooKeeperTestServer(0, shutdownRegistry);
+        zooKeeperServer.startNetwork();
+      } catch (IOException e) {
+        throw Throwables.propagate(e);
+      } catch (InterruptedException e) {
+        // Catching InterruptedException clears the interrupt flag; restore it so callers further
+        // up the stack can still observe the interruption after the unchecked rethrow.
+        Thread.currentThread().interrupt();
+        throw Throwables.propagate(e);
+      }
+
+      LOG.info("Embedded zookeeper cluster started on port " + zooKeeperServer.getPort());
+      return zooKeeperServer.createClient(config.sessionTimeout, config.credentials);
+    }
+  }
+
+  /**
+   * Composite type that contains configuration parameters used when creating a client.
+   * <p>
+   * Instances of this class are immutable, but builder-style chained calls are supported: each
+   * {@code with...}/{@code inProcess} method returns a new instance instead of modifying this
+   * one. Note that the {@code servers} iterable is stored as given and is not copied.
+   */
+  public static class ClientConfig {
+    public final Iterable<InetSocketAddress> servers;
+    public final boolean inProcess;
+    public final Amount<Integer, Time> sessionTimeout;
+    public final Optional<String> chrootPath;
+    public final Credentials credentials;
+
+    /**
+     * Creates a new client configuration without a chroot path.
+     *
+     * @param servers ZooKeeper server addresses.
+     * @param inProcess Whether to run and create clients for an in-process ZooKeeper server.
+     * @param sessionTimeout Timeout duration for established sessions.
+     * @param credentials ZooKeeper authentication credentials.
+     */
+    public ClientConfig(
+        Iterable<InetSocketAddress> servers,
+        boolean inProcess,
+        Amount<Integer, Time> sessionTimeout,
+        Credentials credentials) {
+
+      this(servers, Optional.<String>absent(), inProcess, sessionTimeout, 
credentials);
+    }
+
+    /**
+     * Creates a new client configuration.
+     *
+     * @param servers ZooKeeper server addresses.
+     * @param chrootPath an optional chroot path
+     * @param inProcess Whether to run and create clients for an in-process ZooKeeper server.
+     * @param sessionTimeout Timeout duration for established sessions.
+     * @param credentials ZooKeeper authentication credentials.
+     */
+    public ClientConfig(
+        Iterable<InetSocketAddress> servers,
+        Optional<String> chrootPath,
+        boolean inProcess,
+        Amount<Integer, Time> sessionTimeout,
+        Credentials credentials) {
+
+      this.servers = servers;
+      this.chrootPath = chrootPath;
+      this.inProcess = inProcess;
+      this.sessionTimeout = sessionTimeout;
+      this.credentials = credentials;
+    }
+
+    /**
+     * Creates a new client configuration with defaults for the session timeout and credentials.
+     * The returned configuration has no chroot path and is not in-process.
+     *
+     * @param servers ZooKeeper server addresses.
+     * @return A new configuration.
+     */
+    public static ClientConfig create(Iterable<InetSocketAddress> servers) {
+      return new ClientConfig(
+          servers,
+          Optional.<String> absent(),
+          false,
+          ZooKeeperUtils.DEFAULT_ZK_SESSION_TIMEOUT,
+          Credentials.NONE);
+    }
+
+    /**
+     * Creates a new configuration identical to this configuration, but with the provided
+     * session timeout.
+     *
+     * @param sessionTimeout Timeout duration for established sessions.
+     * @return A modified clone of this configuration.
+     */
+    public ClientConfig withSessionTimeout(Amount<Integer, Time> 
sessionTimeout) {
+      return new ClientConfig(servers, chrootPath, inProcess, sessionTimeout, 
credentials);
+    }
+
+    /**
+     * Creates a new configuration identical to this configuration, but with the provided
+     * credentials.
+     *
+     * @param credentials ZooKeeper authentication credentials.
+     * @return A modified clone of this configuration.
+     */
+    public ClientConfig withCredentials(Credentials credentials) {
+      return new ClientConfig(servers, chrootPath, inProcess, sessionTimeout, 
credentials);
+    }
+
+    /**
+     * Convenience method for calling {@link #withCredentials(Credentials)} with digest
+     * credentials.
+     *
+     * @param username Digest authentication user.
+     * @param password Digest authentication raw password.
+     * @return A modified clone of this configuration.
+     */
+    public ClientConfig withDigestCredentials(String username, String 
password) {
+      return withCredentials(ZooKeeperClient.digestCredentials(username, 
password));
+    }
+
+    /**
+     * Creates a new configuration identical to this configuration, but with the provided
+     * in-process setting.
+     *
+     * @param inProcess If {@code true}, an in-process ZooKeeper server will be used,
+     *                  and all clients will connect to it.
+     * @return A modified clone of this configuration.
+     */
+    public ClientConfig inProcess(boolean inProcess) {
+      return new ClientConfig(servers, chrootPath, inProcess, sessionTimeout, 
credentials);
+    }
+
+    /**
+     * Creates a new configuration identical to this configuration, but with the provided
+     * chroot path setting.
+     *
+     * @param chrootPath a valid ZooKeeper path used as a chroot for ZooKeeper connections;
+     *                   it is wrapped with {@code Optional.of}, so it must not be {@code null}.
+     * @return A modified clone of this configuration.
+     */
+    public ClientConfig withChrootPath(String chrootPath) {
+      return new ClientConfig(servers, Optional.of(chrootPath), inProcess, 
sessionTimeout, credentials);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/zookeeper/guice/client/flagged/FlaggedClientConfig.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/zookeeper/guice/client/flagged/FlaggedClientConfig.java
 
b/commons/src/main/java/org/apache/aurora/common/zookeeper/guice/client/flagged/FlaggedClientConfig.java
new file mode 100644
index 0000000..f3e3a84
--- /dev/null
+++ 
b/commons/src/main/java/org/apache/aurora/common/zookeeper/guice/client/flagged/FlaggedClientConfig.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper.guice.client.flagged;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableList;
+
+import org.apache.aurora.common.args.Arg;
+import org.apache.aurora.common.args.CmdLine;
+import org.apache.aurora.common.args.constraints.NotEmpty;
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.zookeeper.ZooKeeperClient;
+import org.apache.aurora.common.zookeeper.ZooKeeperClient.Credentials;
+import org.apache.aurora.common.zookeeper.ZooKeeperUtils;
+import 
org.apache.aurora.common.zookeeper.guice.client.ZooKeeperClientModule.ClientConfig;
+
+/**
+ * A factory that creates a {@link ClientConfig} instance based on command line argument values.
+ */
+public class FlaggedClientConfig {
+  @CmdLine(name = "zk_in_proc",
+      help = "Launches an embedded zookeeper server for local testing causing -zk_endpoints "
+          + "to be ignored if specified.")
+  private static final Arg<Boolean> IN_PROCESS = Arg.create(false);
+
+  @NotEmpty
+  @CmdLine(name = "zk_endpoints", help = "Endpoint specification for the ZooKeeper servers.")
+  private static final Arg<List<InetSocketAddress>> ZK_ENDPOINTS = Arg.create();
+
+  @CmdLine(name = "zk_chroot_path", help = "chroot path to use for the ZooKeeper connections")
+  private static final Arg<String> CHROOT_PATH = Arg.create(null);
+
+  @CmdLine(name = "zk_session_timeout", help = "The ZooKeeper session timeout.")
+  private static final Arg<Amount<Integer, Time>> SESSION_TIMEOUT =
+      Arg.create(ZooKeeperUtils.DEFAULT_ZK_SESSION_TIMEOUT);
+
+  @CmdLine(name = "zk_digest_credentials",
+           help = "user:password to use when authenticating with ZooKeeper.")
+  private static final Arg<String> DIGEST_CREDENTIALS = Arg.create();
+
+  /**
+   * Creates a configuration from command line arguments.
+   * <p>
+   * Digest credentials are used only when {@code -zk_digest_credentials} was supplied;
+   * otherwise {@link Credentials#NONE} is used. The chroot path is absent when
+   * {@code -zk_chroot_path} was not set.
+   *
+   * @return Configuration instance.
+   * @throws IllegalArgumentException if {@code -zk_digest_credentials} was supplied but is not
+   *     formatted as {@code user:pass}.
+   */
+  public static ClientConfig create() {
+    return new ClientConfig(
+        ZK_ENDPOINTS.get(),
+        Optional.fromNullable(CHROOT_PATH.get()),
+        IN_PROCESS.get(),
+        SESSION_TIMEOUT.get(),
+        DIGEST_CREDENTIALS.hasAppliedValue()
+            ? getCredentials(DIGEST_CREDENTIALS.get())
+            : Credentials.NONE
+    );
+  }
+
+  /**
+   * Parses a {@code user:pass} string into digest credentials.
+   *
+   * @param userAndPass The raw flag value.
+   * @return Digest credentials for the parsed user and password.
+   * @throws IllegalArgumentException if the value contains no {@code ':'} separator.
+   */
+  private static Credentials getCredentials(String userAndPass) {
+    // Split on the first ':' only, so that a password which itself contains ':' is kept intact
+    // instead of being rejected as malformed.
+    List<String> parts = ImmutableList.copyOf(Splitter.on(":").limit(2).split(userAndPass));
+    if (parts.size() != 2) {
+      throw new IllegalArgumentException(
+          "zk_digest_credentials must be formatted as user:pass");
+    }
+    return ZooKeeperClient.digestCredentials(parts.get(0), parts.get(1));
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperTest.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperTest.java
 
b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperTest.java
new file mode 100644
index 0000000..88cd6d2
--- /dev/null
+++ 
b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperTest.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper.testing;
+
+import java.io.IOException;
+
+import com.google.common.base.Preconditions;
+import com.google.common.testing.TearDown;
+import com.google.common.testing.junit4.TearDownTestCase;
+
+import org.apache.aurora.common.zookeeper.ZooKeeperClient;
+import org.junit.Before;
+
+import 
org.apache.aurora.common.application.ShutdownRegistry.ShutdownRegistryImpl;
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+
+/**
+ * A base class for in-process zookeeper tests.
+ * Uses {@link ZooKeeperTestServer} to start the server and create clients: new tests should
+ * directly use that class instead of extending this class.
+ * NOTE(review): the earlier text named "ZooKeeperTestHelper", which this class does not
+ * reference - confirm the intended helper.
+ */
+public abstract class BaseZooKeeperTest extends TearDownTestCase {
+
+  private final Amount<Integer, Time> defaultSessionTimeout;
+  private ZooKeeperTestServer zkTestServer;
+
+  /**
+   * Creates a test case where the test server uses its
+   * {@link ZooKeeperTestServer#DEFAULT_SESSION_TIMEOUT} for clients created without an explicit
+   * session timeout.
+   */
+  public BaseZooKeeperTest() {
+    this(ZooKeeperTestServer.DEFAULT_SESSION_TIMEOUT);
+  }
+
+  /**
+   * Creates a test case where the test server uses the given {@code defaultSessionTimeout} for
+   * clients created without an explicit session timeout.
+   *
+   * @param defaultSessionTimeout the session timeout handed to the test server; must not be
+   *     {@code null}
+   */
+  public BaseZooKeeperTest(Amount<Integer, Time> defaultSessionTimeout) {
+    this.defaultSessionTimeout = 
Preconditions.checkNotNull(defaultSessionTimeout);
+  }
+
+  /**
+   * Creates and starts a fresh test server before each test, and registers a tear-down that
+   * executes the shutdown registry handed to that server.
+   */
+  @Before
+  public final void setUp() throws Exception {
+    final ShutdownRegistryImpl shutdownRegistry = new ShutdownRegistryImpl();
+    addTearDown(new TearDown() {
+      @Override public void tearDown() {
+        shutdownRegistry.execute();
+      }
+    });
+    zkTestServer = new ZooKeeperTestServer(0, shutdownRegistry, 
defaultSessionTimeout);
+    zkTestServer.startNetwork();
+  }
+
+  /**
+   * Starts zookeeper back up on the last used port.
+   */
+  protected final void restartNetwork() throws IOException, 
InterruptedException {
+    zkTestServer.restartNetwork();
+  }
+
+  /**
+   * Shuts down the in-process zookeeper network server.
+   */
+  protected final void shutdownNetwork() {
+    zkTestServer.shutdownNetwork();
+  }
+
+  /**
+   * Expires the active session for the given client.  The client should be one returned from
+   * {@link #createZkClient}.
+   *
+   * @param zkClient the client to expire
+   * @throws ZooKeeperClient.ZooKeeperConnectionException if a problem is encountered connecting
+   *    to the local zk server while trying to expire the session
+   * @throws InterruptedException if interrupted while requesting expiration
+   */
+  protected final void expireSession(ZooKeeperClient zkClient)
+      throws ZooKeeperClient.ZooKeeperConnectionException, 
InterruptedException {
+    zkTestServer.expireClientSession(zkClient);
+  }
+
+  /**
+   * Returns the current port to connect to the in-process zookeeper instance.
+   */
+  protected final int getPort() {
+    return zkTestServer.getPort();
+  }
+
+  /**
+   * Returns a new unauthenticated zookeeper client connected to the in-process zookeeper server
+   * with the default session timeout.
+   */
+  protected final ZooKeeperClient createZkClient() {
+    return zkTestServer.createClient();
+  }
+
+  /**
+   * Returns a new authenticated zookeeper client connected to the in-process zookeeper server
+   * with the default session timeout.
+   */
+  protected final ZooKeeperClient createZkClient(ZooKeeperClient.Credentials 
credentials) {
+    return zkTestServer.createClient(credentials);
+  }
+
+  /**
+   * Returns a new authenticated zookeeper client connected to the in-process zookeeper server
+   * with the default session timeout.  The client is authenticated in the digest authentication
+   * scheme with the given {@code username} and {@code password}.
+   */
+  protected final ZooKeeperClient createZkClient(String username, String 
password) {
+    return createZkClient(ZooKeeperClient.digestCredentials(username, 
password));
+  }
+
+  /**
+   * Returns a new unauthenticated zookeeper client connected to the in-process zookeeper server
+   * with a custom {@code sessionTimeout}.
+   */
+  protected final ZooKeeperClient createZkClient(Amount<Integer, Time> 
sessionTimeout) {
+    return zkTestServer.createClient(sessionTimeout);
+  }
+
+  /**
+   * Returns a new authenticated zookeeper client connected to the in-process zookeeper server
+   * with a custom {@code sessionTimeout}.
+   */
+  protected final ZooKeeperClient createZkClient(Amount<Integer, Time> 
sessionTimeout,
+      ZooKeeperClient.Credentials credentials) {
+    return zkTestServer.createClient(sessionTimeout, credentials);
+  }
+
+  /**
+   * Returns a new zookeeper client connected to the in-process zookeeper server using the
+   * custom chroot path.
+   * NOTE(review): no credentials or session timeout are passed here, so the client is
+   * presumably unauthenticated and uses the default session timeout - confirm against
+   * ZooKeeperTestServer#createClient(String).
+   */
+  protected final ZooKeeperClient createZkClient(String chrootPath) {
+    return zkTestServer.createClient(chrootPath);
+  }
+}

Reply via email to