http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/zookeeper/ZooKeeperClient.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/zookeeper/ZooKeeperClient.java b/commons/src/main/java/com/twitter/common/zookeeper/ZooKeeperClient.java deleted file mode 100644 index ae327f8..0000000 --- a/commons/src/main/java/com/twitter/common/zookeeper/ZooKeeperClient.java +++ /dev/null @@ -1,493 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.zookeeper; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.logging.Logger; - -import javax.annotation.Nullable; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Joiner; -import com.google.common.base.Objects; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; - -import org.apache.commons.lang.builder.EqualsBuilder; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.KeeperException.SessionExpiredException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.Watcher.Event.EventType; -import org.apache.zookeeper.Watcher.Event.KeeperState; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.common.PathUtils; - -import com.twitter.common.base.Command; -import com.twitter.common.base.MorePreconditions; -import com.twitter.common.net.InetSocketAddressHelper; -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; - -/** - * Manages a connection to a ZooKeeper cluster. - */ -public class ZooKeeperClient { - - /** - * Indicates an error connecting to a zookeeper cluster. - */ - public class ZooKeeperConnectionException extends Exception { - public ZooKeeperConnectionException(String message, Throwable cause) { - super(message, cause); - } - } - - /** - * Encapsulates a user's credentials and has the ability to authenticate them through a - * {@link ZooKeeper} client. - */ - public interface Credentials { - - /** - * A set of {@code Credentials} that performs no authentication. - */ - Credentials NONE = new Credentials() { - @Override public void authenticate(ZooKeeper zooKeeper) { - // noop - } - - @Override public String scheme() { - return null; - } - - @Override public byte[] authToken() { - return null; - } - }; - - /** - * Authenticates these credentials against the given {@code ZooKeeper} client. - * - * @param zooKeeper the client to authenticate - */ - void authenticate(ZooKeeper zooKeeper); - - /** - * Returns the authentication scheme these credentials are for. - * - * @return the scheme these credentials are for or {@code null} if no authentication is - * intended. - */ - @Nullable - String scheme(); - - /** - * Returns the authentication token. - * - * @return the authentication token or {@code null} if no authentication is intended. - */ - @Nullable - byte[] authToken(); - } - - /** - * Creates a set of credentials for the zoo keeper digest authentication mechanism. - * - * @param username the username to authenticate with - * @param password the password to authenticate with - * @return a set of credentials that can be used to authenticate the zoo keeper client - */ - public static Credentials digestCredentials(String username, String password) { - MorePreconditions.checkNotBlank(username); - Preconditions.checkNotNull(password); - - // TODO(John Sirois): DigestAuthenticationProvider is broken - uses platform default charset - // (on server) and so we just have to hope here that clients are deployed in compatible jvms. - // Consider writing and installing a version of DigestAuthenticationProvider that controls its - // Charset explicitly. - return credentials("digest", (username + ":" + password).getBytes()); - } - - /** - * Creates a set of credentials for the given authentication {@code scheme}. - * - * @param scheme the scheme to authenticate with - * @param authToken the authentication token - * @return a set of credentials that can be used to authenticate the zoo keeper client - */ - public static Credentials credentials(final String scheme, final byte[] authToken) { - MorePreconditions.checkNotBlank(scheme); - Preconditions.checkNotNull(authToken); - - return new Credentials() { - @Override public void authenticate(ZooKeeper zooKeeper) { - zooKeeper.addAuthInfo(scheme, authToken); - } - - @Override public String scheme() { - return scheme; - } - - @Override public byte[] authToken() { - return authToken; - } - - @Override public boolean equals(Object o) { - if (!(o instanceof Credentials)) { - return false; - } - - Credentials other = (Credentials) o; - return new EqualsBuilder() - .append(scheme, other.scheme()) - .append(authToken, other.authToken()) - .isEquals(); - } - - @Override public int hashCode() { - return Objects.hashCode(scheme, authToken); - } - }; - } - - private final class SessionState { - private final long sessionId; - private final byte[] sessionPasswd; - - private SessionState(long sessionId, byte[] sessionPasswd) { - this.sessionId = sessionId; - this.sessionPasswd = sessionPasswd; - } - } - - private static final Logger LOG = Logger.getLogger(ZooKeeperClient.class.getName()); - - private static final Amount<Long,Time> WAIT_FOREVER = Amount.of(0L, Time.MILLISECONDS); - - private final int sessionTimeoutMs; - private final Credentials credentials; - private final String zooKeeperServers; - // GuardedBy "this", but still volatile for tests, where we want to be able to see writes - // made from within long synchronized blocks. - private volatile ZooKeeper zooKeeper; - private SessionState sessionState; - - private final Set<Watcher> watchers = new CopyOnWriteArraySet<Watcher>(); - private final BlockingQueue<WatchedEvent> eventQueue = new LinkedBlockingQueue<WatchedEvent>(); - - private static Iterable<InetSocketAddress> combine(InetSocketAddress address, - InetSocketAddress... addresses) { - return ImmutableSet.<InetSocketAddress>builder().add(address).add(addresses).build(); - } - - /** - * Creates an unconnected client that will lazily attempt to connect on the first call to - * {@link #get()}. - * - * @param sessionTimeout the ZK session timeout - * @param zooKeeperServer the first, required ZK server - * @param zooKeeperServers any additional servers forming the ZK cluster - */ - public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, InetSocketAddress zooKeeperServer, - InetSocketAddress... zooKeeperServers) { - this(sessionTimeout, combine(zooKeeperServer, zooKeeperServers)); - } - - /** - * Creates an unconnected client that will lazily attempt to connect on the first call to - * {@link #get}. - * - * @param sessionTimeout the ZK session timeout - * @param zooKeeperServers the set of servers forming the ZK cluster - */ - public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, - Iterable<InetSocketAddress> zooKeeperServers) { - this(sessionTimeout, Credentials.NONE, Optional.<String> absent(), zooKeeperServers); - } - - /** - * Creates an unconnected client that will lazily attempt to connect on the first call to - * {@link #get()}. All successful connections will be authenticated with the given - * {@code credentials}. - * - * @param sessionTimeout the ZK session timeout - * @param credentials the credentials to authenticate with - * @param zooKeeperServer the first, required ZK server - * @param zooKeeperServers any additional servers forming the ZK cluster - */ - public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, Credentials credentials, - InetSocketAddress zooKeeperServer, InetSocketAddress... zooKeeperServers) { - this(sessionTimeout, credentials, Optional.<String> absent(), combine(zooKeeperServer, zooKeeperServers)); - } - - /** - * Creates an unconnected client that will lazily attempt to connect on the first call to - * {@link #get}. All successful connections will be authenticated with the given - * {@code credentials}. - * - * @param sessionTimeout the ZK session timeout - * @param credentials the credentials to authenticate with - * @param zooKeeperServers the set of servers forming the ZK cluster - */ - public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, Credentials credentials, - Iterable<InetSocketAddress> zooKeeperServers) { - this(sessionTimeout, credentials, Optional.<String> absent(), zooKeeperServers); - } - - /** - * Creates an unconnected client that will lazily attempt to connect on the first call to - * {@link #get}. All successful connections will be authenticated with the given - * {@code credentials}. - * - * @param sessionTimeout the ZK session timeout - * @param credentials the credentials to authenticate with - * @param chrootPath an optional chroot path - * @param zooKeeperServers the set of servers forming the ZK cluster - */ - public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, Credentials credentials, - Optional<String> chrootPath, Iterable<InetSocketAddress> zooKeeperServers) { - this.sessionTimeoutMs = Preconditions.checkNotNull(sessionTimeout).as(Time.MILLISECONDS); - this.credentials = Preconditions.checkNotNull(credentials); - - if (chrootPath.isPresent()) { - PathUtils.validatePath(chrootPath.get()); - } - - Preconditions.checkNotNull(zooKeeperServers); - Preconditions.checkArgument(!Iterables.isEmpty(zooKeeperServers), - "Must present at least 1 ZK server"); - - Thread watcherProcessor = new Thread("ZookeeperClient-watcherProcessor") { - @Override - public void run() { - while (true) { - try { - WatchedEvent event = eventQueue.take(); - for (Watcher watcher : watchers) { - watcher.process(event); - } - } catch (InterruptedException e) { /* ignore */ } - } - } - }; - watcherProcessor.setDaemon(true); - watcherProcessor.start(); - - Iterable<String> servers = - Iterables.transform(ImmutableSet.copyOf(zooKeeperServers), - InetSocketAddressHelper.INET_TO_STR); - this.zooKeeperServers = Joiner.on(',').join(servers).concat(chrootPath.or("")); - } - - /** - * Returns true if this client has non-empty credentials set. For example, returns {@code false} - * if this client was constructed with {@link Credentials#NONE}. - * - * @return {@code true} if this client is configured with non-empty credentials. - */ - public boolean hasCredentials() { - return !Strings.isNullOrEmpty(credentials.scheme()) && (credentials.authToken() != null); - } - - /** - * Returns the current active ZK connection or establishes a new one if none has yet been - * established or a previous connection was disconnected or had its session time out. This method - * will attempt to re-use sessions when possible. Equivalent to: - * <pre>get(Amount.of(0L, ...)</pre>. - * - * @return a connected ZooKeeper client - * @throws ZooKeeperConnectionException if there was a problem connecting to the ZK cluster - * @throws InterruptedException if interrupted while waiting for a connection to be established - */ - public synchronized ZooKeeper get() throws ZooKeeperConnectionException, InterruptedException { - try { - return get(WAIT_FOREVER); - } catch (TimeoutException e) { - InterruptedException interruptedException = - new InterruptedException("Got an unexpected TimeoutException for 0 wait"); - interruptedException.initCause(e); - throw interruptedException; - } - } - - /** - * Returns the current active ZK connection or establishes a new one if none has yet been - * established or a previous connection was disconnected or had its session time out. This - * method will attempt to re-use sessions when possible. - * - * @param connectionTimeout the maximum amount of time to wait for the connection to the ZK - * cluster to be established; 0 to wait forever - * @return a connected ZooKeeper client - * @throws ZooKeeperConnectionException if there was a problem connecting to the ZK cluster - * @throws InterruptedException if interrupted while waiting for a connection to be established - * @throws TimeoutException if a connection could not be established within the configured - * session timeout - */ - public synchronized ZooKeeper get(Amount<Long, Time> connectionTimeout) - throws ZooKeeperConnectionException, InterruptedException, TimeoutException { - - if (zooKeeper == null) { - final CountDownLatch connected = new CountDownLatch(1); - Watcher watcher = new Watcher() { - @Override public void process(WatchedEvent event) { - switch (event.getType()) { - // Guard the None type since this watch may be used as the default watch on calls by - // the client outside our control. - case None: - switch (event.getState()) { - case Expired: - LOG.info("Zookeeper session expired. Event: " + event); - close(); - break; - case SyncConnected: - connected.countDown(); - break; - } - } - - eventQueue.offer(event); - } - }; - - try { - zooKeeper = (sessionState != null) - ? new ZooKeeper(zooKeeperServers, sessionTimeoutMs, watcher, sessionState.sessionId, - sessionState.sessionPasswd) - : new ZooKeeper(zooKeeperServers, sessionTimeoutMs, watcher); - } catch (IOException e) { - throw new ZooKeeperConnectionException( - "Problem connecting to servers: " + zooKeeperServers, e); - } - - if (connectionTimeout.getValue() > 0) { - if (!connected.await(connectionTimeout.as(Time.MILLISECONDS), TimeUnit.MILLISECONDS)) { - close(); - throw new TimeoutException("Timed out waiting for a ZK connection after " - + connectionTimeout); - } - } else { - try { - connected.await(); - } catch (InterruptedException ex) { - LOG.info("Interrupted while waiting to connect to zooKeeper"); - close(); - throw ex; - } - } - credentials.authenticate(zooKeeper); - - sessionState = new SessionState(zooKeeper.getSessionId(), zooKeeper.getSessionPasswd()); - } - return zooKeeper; - } - - /** - * Clients that need to re-establish state after session expiration can register an - * {@code onExpired} command to execute. - * - * @param onExpired the {@code Command} to register - * @return the new {@link Watcher} which can later be passed to {@link #unregister} for - * removal. - */ - public Watcher registerExpirationHandler(final Command onExpired) { - Watcher watcher = new Watcher() { - @Override public void process(WatchedEvent event) { - if (event.getType() == EventType.None && event.getState() == KeeperState.Expired) { - onExpired.execute(); - } - } - }; - register(watcher); - return watcher; - } - - /** - * Clients that need to register a top-level {@code Watcher} should do so using this method. The - * registered {@code watcher} will remain registered across re-connects and session expiration - * events. - * - * @param watcher the {@code Watcher to register} - */ - public void register(Watcher watcher) { - watchers.add(watcher); - } - - /** - * Clients can attempt to unregister a top-level {@code Watcher} that has previously been - * registered. - * - * @param watcher the {@code Watcher} to unregister as a top-level, persistent watch - * @return whether the given {@code Watcher} was found and removed from the active set - */ - public boolean unregister(Watcher watcher) { - return watchers.remove(watcher); - } - - /** - * Checks to see if the client might reasonably re-try an operation given the exception thrown - * while attempting it. If the ZooKeeper session should be expired to enable the re-try to - * succeed this method will expire it as a side-effect. - * - * @param e the exception to test - * @return true if a retry can be attempted - */ - public boolean shouldRetry(KeeperException e) { - if (e instanceof SessionExpiredException) { - close(); - } - return ZooKeeperUtils.isRetryable(e); - } - - /** - * Closes the current connection if any expiring the current ZooKeeper session. Any subsequent - * calls to this method will no-op until the next successful {@link #get}. - */ - public synchronized void close() { - if (zooKeeper != null) { - try { - zooKeeper.close(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOG.warning("Interrupted trying to close zooKeeper"); - } finally { - zooKeeper = null; - sessionState = null; - } - } - } - - @VisibleForTesting - synchronized boolean isClosed() { - return zooKeeper == null; - } - - @VisibleForTesting - ZooKeeper getZooKeeperClientForTests() { - return zooKeeper; - } -}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/zookeeper/ZooKeeperMap.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/zookeeper/ZooKeeperMap.java b/commons/src/main/java/com/twitter/common/zookeeper/ZooKeeperMap.java deleted file mode 100644 index 8869678..0000000 --- a/commons/src/main/java/com/twitter/common/zookeeper/ZooKeeperMap.java +++ /dev/null @@ -1,411 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.zookeeper; - -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.logging.Level; -import java.util.logging.Logger; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.base.Functions; -import com.google.common.base.Preconditions; -import com.google.common.collect.ForwardingMap; -import com.google.common.collect.Sets; - -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; - -import com.twitter.common.base.Command; -import com.twitter.common.base.ExceptionalSupplier; -import com.twitter.common.base.MorePreconditions; -import com.twitter.common.util.BackoffHelper; -import com.twitter.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException; - -/** - * A ZooKeeper backed {@link Map}. Initialized with a node path, this map represents child nodes - * under that path as keys, with the data in those nodes as values. This map is readonly from - * clients of this class, and only can be modified via direct zookeeper operations. - * - * Note that instances of this class maintain a zookeeper watch for each zookeeper node under the - * parent, as well as on the parent itself. Instances of this class should be created via the - * {@link #create} factory method. - * - * As of ZooKeeper Version 3.1, the maximum allowable size of a data node is 1 MB. A single - * client should be able to hold up to maintain several thousand watches, but this depends on rate - * of data change as well. - * - * Talk to your zookeeper cluster administrator if you expect number of map entries times number - * of live clients to exceed a thousand, as a zookeeper cluster is limited by total number of - * server-side watches enabled. - * - * For an example of a set of tools to maintain one of these maps, please see - * src/scripts/HenAccess.py in the hen repository. - * - * @param <V> the type of values this map stores - */ -public class ZooKeeperMap<V> extends ForwardingMap<String, V> { - - /** - * An optional listener which can be supplied and triggered when entries in a ZooKeeperMap - * are added, changed or removed. For a ZooKeeperMap of type <V>, the listener will fire a - * "nodeChanged" event with the name of the ZNode that changed, and its resulting value as - * interpreted by the provided deserializer. Removal of child nodes triggers the "nodeRemoved" - * method indicating the name of the ZNode which is no longer present in the map. - */ - public interface Listener<V> { - - /** - * Fired when a node is added to the ZooKeeperMap or changed. - * - * @param nodeName indicates the name of the ZNode that was added or changed. - * @param value is the new value of the node after passing through your supplied deserializer. - */ - void nodeChanged(String nodeName, V value); - - /** - * Fired when a node is removed from the ZooKeeperMap. - * - * @param nodeName indicates the name of the ZNode that was removed from the ZooKeeperMap. - */ - void nodeRemoved(String nodeName); - } - - /** - * Default deserializer for the constructor if you want to simply store the zookeeper byte[] data - * in this map. - */ - public static final Function<byte[], byte[]> BYTE_ARRAY_VALUES = Functions.identity(); - - /** - * A listener that ignores all events. - */ - public static <T> Listener<T> noopListener() { - return new Listener<T>() { - @Override public void nodeChanged(String nodeName, T value) { } - @Override public void nodeRemoved(String nodeName) { } - }; - } - - private static final Logger LOG = Logger.getLogger(ZooKeeperMap.class.getName()); - - private final ZooKeeperClient zkClient; - private final String nodePath; - private final Function<byte[], V> deserializer; - - private final ConcurrentMap<String, V> localMap; - private final Map<String, V> unmodifiableLocalMap; - private final BackoffHelper backoffHelper; - - private final Listener<V> mapListener; - - // Whether it's safe to re-establish watches if our zookeeper session has expired. - private final Object safeToRewatchLock; - private volatile boolean safeToRewatch; - - /** - * Returns an initialized ZooKeeperMap. The given path must exist at the time of - * creation or a {@link KeeperException} will be thrown. - * - * @param zkClient a zookeeper client - * @param nodePath path to a node whose data will be watched - * @param deserializer a function that converts byte[] data from a zk node to this map's - * value type V - * @param listener is a Listener which fires when values are added, changed, or removed. - * - * @throws InterruptedException if the underlying zookeeper server transaction is interrupted - * @throws KeeperException.NoNodeException if the given nodePath doesn't exist - * @throws KeeperException if the server signals an error - * @throws ZooKeeperConnectionException if there was a problem connecting to the zookeeper - * cluster - */ - public static <V> ZooKeeperMap<V> create( - ZooKeeperClient zkClient, - String nodePath, - Function<byte[], V> deserializer, - Listener<V> listener) - throws InterruptedException, KeeperException, ZooKeeperConnectionException { - - ZooKeeperMap<V> zkMap = new ZooKeeperMap<V>(zkClient, nodePath, deserializer, listener); - zkMap.init(); - return zkMap; - } - - - /** - * Returns an initialized ZooKeeperMap. The given path must exist at the time of - * creation or a {@link KeeperException} will be thrown. - * - * @param zkClient a zookeeper client - * @param nodePath path to a node whose data will be watched - * @param deserializer a function that converts byte[] data from a zk node to this map's - * value type V - * - * @throws InterruptedException if the underlying zookeeper server transaction is interrupted - * @throws KeeperException.NoNodeException if the given nodePath doesn't exist - * @throws KeeperException if the server signals an error - * @throws ZooKeeperConnectionException if there was a problem connecting to the zookeeper - * cluster - */ - public static <V> ZooKeeperMap<V> create( - ZooKeeperClient zkClient, - String nodePath, - Function<byte[], V> deserializer) - throws InterruptedException, KeeperException, ZooKeeperConnectionException { - - return ZooKeeperMap.create(zkClient, nodePath, deserializer, ZooKeeperMap.<V>noopListener()); - } - - /** - * Initializes a ZooKeeperMap. The given path must exist at the time of object creation or - * a {@link KeeperException} will be thrown. - * - * Please note that this object will not track any remote zookeeper data until {@link #init()} - * is successfully called. After construction and before that call, this {@link Map} will - * be empty. - * - * @param zkClient a zookeeper client - * @param nodePath top-level node path under which the map data lives - * @param deserializer a function that converts byte[] data from a zk node to this map's - * value type V - * @param mapListener is a Listener which fires when values are added, changed, or removed. - * - * @throws InterruptedException if the underlying zookeeper server transaction is interrupted - * @throws KeeperException.NoNodeException if the given nodePath doesn't exist - * @throws KeeperException if the server signals an error - * @throws ZooKeeperConnectionException if there was a problem connecting to the zookeeper - * cluster - */ - @VisibleForTesting - ZooKeeperMap( - ZooKeeperClient zkClient, - String nodePath, - Function<byte[], V> deserializer, - Listener<V> mapListener) - throws InterruptedException, KeeperException, ZooKeeperConnectionException { - - super(); - - this.mapListener = Preconditions.checkNotNull(mapListener); - this.zkClient = Preconditions.checkNotNull(zkClient); - this.nodePath = MorePreconditions.checkNotBlank(nodePath); - this.deserializer = Preconditions.checkNotNull(deserializer); - - localMap = new ConcurrentHashMap<String, V>(); - unmodifiableLocalMap = Collections.unmodifiableMap(localMap); - backoffHelper = new BackoffHelper(); - safeToRewatchLock = new Object(); - safeToRewatch = false; - - if (zkClient.get().exists(nodePath, null) == null) { - throw new KeeperException.NoNodeException(); - } - } - - /** - * Initialize zookeeper tracking for this {@link Map}. Once this call returns, this object - * will be tracking data in zookeeper. - * - * @throws InterruptedException if the underlying zookeeper server transaction is interrupted - * @throws KeeperException if the server signals an error - * @throws ZooKeeperConnectionException if there was a problem connecting to the zookeeper - * cluster - */ - @VisibleForTesting - void init() throws InterruptedException, KeeperException, ZooKeeperConnectionException { - Watcher watcher = zkClient.registerExpirationHandler(new Command() { - @Override public void execute() { - /* - * First rewatch all of our locally cached children. Some of them may not exist anymore, - * which will lead to caught KeeperException.NoNode whereafter we'll remove that child - * from the cached map. - * - * Next, we'll establish our top level child watch and add any new nodes that might exist. - */ - try { - synchronized (safeToRewatchLock) { - if (safeToRewatch) { - rewatchDataNodes(); - tryWatchChildren(); - } - } - } catch (InterruptedException e) { - LOG.log(Level.WARNING, "Interrupted while trying to re-establish watch.", e); - Thread.currentThread().interrupt(); - } - } - }); - - try { - // Synchronize to prevent the race of watchChildren completing and then the session expiring - // before we update safeToRewatch. - synchronized (safeToRewatchLock) { - watchChildren(); - safeToRewatch = true; - } - } catch (InterruptedException e) { - zkClient.unregister(watcher); - throw e; - } catch (KeeperException e) { - zkClient.unregister(watcher); - throw e; - } catch (ZooKeeperConnectionException e) { - zkClient.unregister(watcher); - throw e; - } - } - - @Override - protected Map<String, V> delegate() { - return unmodifiableLocalMap; - } - - private void tryWatchChildren() throws InterruptedException { - backoffHelper.doUntilSuccess(new ExceptionalSupplier<Boolean, InterruptedException>() { - @Override public Boolean get() throws InterruptedException { - try { - watchChildren(); - return true; - } catch (KeeperException e) { - return false; - } catch (ZooKeeperConnectionException e) { - return false; - } - } - }); - } - - private synchronized void watchChildren() - throws InterruptedException, KeeperException, ZooKeeperConnectionException { - - /* - * Add a watch on the parent node itself, and attempt to rewatch if it - * gets deleted - */ - zkClient.get().exists(nodePath, new Watcher() { - @Override public void process(WatchedEvent event) { - if (event.getType() == Watcher.Event.EventType.NodeDeleted) { - // If the parent node no longer exists - localMap.clear(); - try { - tryWatchChildren(); - } catch (InterruptedException e) { - LOG.log(Level.WARNING, "Interrupted while trying to watch children.", e); - Thread.currentThread().interrupt(); - } - } - }}); - - final Watcher childWatcher = new Watcher() { - @Override - public void process(WatchedEvent event) { - if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) { - try { - tryWatchChildren(); - } catch (InterruptedException e) { - LOG.log(Level.WARNING, "Interrupted while trying to watch children.", e); - Thread.currentThread().interrupt(); - } - } - } - }; - - List<String> children = zkClient.get().getChildren(nodePath, childWatcher); - updateChildren(Sets.newHashSet(children)); - } - - private void tryAddChild(final String child) throws InterruptedException { - backoffHelper.doUntilSuccess(new ExceptionalSupplier<Boolean, InterruptedException>() { - @Override public Boolean get() throws InterruptedException { - try { - addChild(child); - return true; - } catch (KeeperException e) { - return false; - } catch (ZooKeeperConnectionException e) { - return false; - } - } - }); - } - - // TODO(Adam Samet) - Make this use the ZooKeeperNode class. - private void addChild(final String child) - throws InterruptedException, KeeperException, ZooKeeperConnectionException { - - final Watcher nodeWatcher = new Watcher() { - @Override - public void process(WatchedEvent event) { - if (event.getType() == Watcher.Event.EventType.NodeDataChanged) { - try { - tryAddChild(child); - } catch (InterruptedException e) { - LOG.log(Level.WARNING, "Interrupted while trying to add a child.", e); - Thread.currentThread().interrupt(); - } - } else if (event.getType() == Watcher.Event.EventType.NodeDeleted) { - removeEntry(child); - } - } - }; - - try { - V value = deserializer.apply(zkClient.get().getData(makePath(child), nodeWatcher, null)); - putEntry(child, value); - } catch (KeeperException.NoNodeException e) { - // This node doesn't exist anymore, remove it from the map and we're done. - removeEntry(child); - } - } - - @VisibleForTesting - void removeEntry(String key) { - localMap.remove(key); - mapListener.nodeRemoved(key); - } - - @VisibleForTesting - void putEntry(String key, V value) { - localMap.put(key, value); - mapListener.nodeChanged(key, value); - } - - private void rewatchDataNodes() throws InterruptedException { - for (String child : keySet()) { - tryAddChild(child); - } - } - - private String makePath(final String child) { - return nodePath + "/" + child; - } - - private void updateChildren(Set<String> zkChildren) throws InterruptedException { - Set<String> addedChildren = Sets.difference(zkChildren, keySet()); - Set<String> removedChildren = Sets.difference(keySet(), zkChildren); - for (String child : addedChildren) { - tryAddChild(child); - } - for (String child : removedChildren) { - removeEntry(child); - } - } -} - http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/zookeeper/ZooKeeperNode.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/zookeeper/ZooKeeperNode.java b/commons/src/main/java/com/twitter/common/zookeeper/ZooKeeperNode.java deleted file mode 100644 index 7b08cb2..0000000 --- a/commons/src/main/java/com/twitter/common/zookeeper/ZooKeeperNode.java +++ /dev/null @@ -1,349 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.zookeeper; - -import java.util.logging.Level; -import java.util.logging.Logger; - -import javax.annotation.Nullable; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.base.Functions; -import com.google.common.base.Preconditions; -import com.google.common.base.Supplier; - -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.Watcher.Event.KeeperState; -import org.apache.zookeeper.data.Stat; - -import com.twitter.common.base.Closure; -import com.twitter.common.base.Closures; -import com.twitter.common.base.Command; -import com.twitter.common.base.ExceptionalSupplier; -import com.twitter.common.base.MorePreconditions; -import com.twitter.common.util.BackoffHelper; -import com.twitter.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException; - -/** - * An implementation of {@link Supplier} that offers a readonly view of a - * zookeeper data node. This class is thread-safe. - * - * Instances of this class each maintain a zookeeper watch for the remote data node. Instances - * of this class should be created via the {@link #create} factory method. - * - * Please see zookeeper documentation and talk to your cluster administrator for guidance on - * appropriate node size and total number of nodes you should be using. - * - * @param <T> the type of data this node stores - */ -public class ZooKeeperNode<T> implements Supplier<T> { - /** - * Deserializer for the constructor if you want to simply store the zookeeper byte[] data - * as-is. - */ - public static final Function<byte[], byte[]> BYTE_ARRAY_VALUE = Functions.identity(); - - private static final Logger LOG = Logger.getLogger(ZooKeeperNode.class.getName()); - - private final ZooKeeperClient zkClient; - private final String nodePath; - private final NodeDeserializer<T> deserializer; - - private final BackoffHelper backoffHelper; - - // Whether it's safe to re-establish watches if our zookeeper session has expired. - private final Object safeToRewatchLock; - private volatile boolean safeToRewatch; - - private final T NO_DATA = null; - @Nullable private volatile T nodeData; - private final Closure<T> dataUpdateListener; - - /** - * When a call to ZooKeeper.getData is made, the Watcher is added to a Set before the the network - * request is made and if the request fails, the Watcher remains. There's a problem where Watcher - * can accumulate when there are failed requests, so they are set to instance fields and reused. - */ - private final Watcher nodeWatcher; - private final Watcher existenceWatcher; - - /** - * Returns an initialized ZooKeeperNode. The given node must exist at the time of object - * creation or a {@link KeeperException} will be thrown. - * - * @param zkClient a zookeeper client - * @param nodePath path to a node whose data will be watched - * @param deserializer a function that converts byte[] data from a zk node to this supplier's - * type T - * - * @throws InterruptedException if the underlying zookeeper server transaction is interrupted - * @throws KeeperException.NoNodeException if the given nodePath doesn't exist - * @throws KeeperException if the server signals an error - * @throws ZooKeeperConnectionException if there was a problem connecting to the zookeeper - * cluster - */ - public static <T> ZooKeeperNode<T> create(ZooKeeperClient zkClient, String nodePath, - Function<byte[], T> deserializer) throws InterruptedException, KeeperException, - ZooKeeperConnectionException { - return create(zkClient, nodePath, deserializer, Closures.<T>noop()); - } - - /** - * Like the above, but optionally takes in a {@link Closure} that will get notified - * whenever the data is updated from the remote node. - * - * @param dataUpdateListener a {@link Closure} to receive data update notifications. - */ - public static <T> ZooKeeperNode<T> create(ZooKeeperClient zkClient, String nodePath, - Function<byte[], T> deserializer, Closure<T> dataUpdateListener) throws InterruptedException, - KeeperException, ZooKeeperConnectionException { - return create(zkClient, nodePath, new FunctionWrapper<T>(deserializer), dataUpdateListener); - } - - /** - * Returns an initialized ZooKeeperNode. The given node must exist at the time of object - * creation or a {@link KeeperException} will be thrown. - * - * @param zkClient a zookeeper client - * @param nodePath path to a node whose data will be watched - * @param deserializer an implentation of {@link NodeDeserializer} that converts a byte[] from a - * zk node to this supplier's type T. Also supplies a {@link Stat} object which is useful for - * doing versioned updates. - * - * @throws InterruptedException if the underlying zookeeper server transaction is interrupted - * @throws KeeperException.NoNodeException if the given nodePath doesn't exist - * @throws KeeperException if the server signals an error - * @throws ZooKeeperConnectionException if there was a problem connecting to the zookeeper - * cluster - */ - public static <T> ZooKeeperNode<T> create(ZooKeeperClient zkClient, String nodePath, - NodeDeserializer<T> deserializer) throws InterruptedException, KeeperException, - ZooKeeperConnectionException { - return create(zkClient, nodePath, deserializer, Closures.<T>noop()); - } - - /** - * Like the above, but optionally takes in a {@link Closure} that will get notified - * whenever the data is updated from the remote node. - * - * @param dataUpdateListener a {@link Closure} to receive data update notifications. - */ - public static <T> ZooKeeperNode<T> create(ZooKeeperClient zkClient, String nodePath, - NodeDeserializer<T> deserializer, Closure<T> dataUpdateListener) - throws InterruptedException, KeeperException, ZooKeeperConnectionException { - ZooKeeperNode<T> zkNode = - new ZooKeeperNode<T>(zkClient, nodePath, deserializer, dataUpdateListener); - zkNode.init(); - return zkNode; - } - - /** - * Initializes a ZooKeeperNode. The given node must exist at the time of object creation or - * a {@link KeeperException} will be thrown. - * - * Please note that this object will not track any remote zookeeper data until {@link #init()} - * is successfully called. After construction and before that call, this {@link Supplier} will - * return null. - * - * @param zkClient a zookeeper client - * @param nodePath path to a node whose data will be watched - * @param deserializer an implementation of {@link NodeDeserializer} that converts byte[] data - * from a zk node to this supplier's type T - * @param dataUpdateListener a {@link Closure} to receive data update notifications. - */ - @VisibleForTesting - ZooKeeperNode(ZooKeeperClient zkClient, String nodePath, - NodeDeserializer<T> deserializer, Closure<T> dataUpdateListener) { - this.zkClient = Preconditions.checkNotNull(zkClient); - this.nodePath = MorePreconditions.checkNotBlank(nodePath); - this.deserializer = Preconditions.checkNotNull(deserializer); - this.dataUpdateListener = Preconditions.checkNotNull(dataUpdateListener); - - backoffHelper = new BackoffHelper(); - safeToRewatchLock = new Object(); - safeToRewatch = false; - nodeData = NO_DATA; - - nodeWatcher = new Watcher() { - @Override public void process(WatchedEvent event) { - if (event.getState() == KeeperState.SyncConnected) { - try { - tryWatchDataNode(); - } catch (InterruptedException e) { - LOG.log(Level.WARNING, "Interrupted while trying to watch a data node.", e); - Thread.currentThread().interrupt(); - } - } else { - LOG.info("Ignoring watcher event " + event); - } - } - }; - - existenceWatcher = new Watcher() { - @Override public void process(WatchedEvent event) { - if (event.getType() == Watcher.Event.EventType.NodeCreated) { - try { - tryWatchDataNode(); - } catch (InterruptedException e) { - LOG.log(Level.WARNING, "Interrupted while trying to watch a data node.", e); - Thread.currentThread().interrupt(); - } - } - } - }; - } - - /** - * Initialize zookeeper tracking for this {@link Supplier}. Once this call returns, this object - * will be tracking data in zookeeper. - * - * @throws InterruptedException if the underlying zookeeper server transaction is interrupted - * @throws KeeperException if the server signals an error - * @throws ZooKeeperConnectionException if there was a problem connecting to the zookeeper - * cluster - */ - @VisibleForTesting - void init() throws InterruptedException, KeeperException, - ZooKeeperConnectionException { - Watcher watcher = zkClient.registerExpirationHandler(new Command() { - @Override public void execute() { - try { - synchronized (safeToRewatchLock) { - if (safeToRewatch) { - tryWatchDataNode(); - } - } - } catch (InterruptedException e) { - LOG.log(Level.WARNING, "Interrupted while trying to re-establish watch.", e); - Thread.currentThread().interrupt(); - } - } - }); - - try { - /* - * Synchronize to prevent the race of watchDataNode completing and then the session expiring - * before we update safeToRewatch. - */ - synchronized (safeToRewatchLock) { - watchDataNode(); - safeToRewatch = true; - } - } catch (InterruptedException e) { - zkClient.unregister(watcher); - throw e; - } catch (KeeperException e) { - zkClient.unregister(watcher); - throw e; - } catch (ZooKeeperConnectionException e) { - zkClient.unregister(watcher); - throw e; - } - } - - /** - * Returns the data corresponding to a byte array in a remote zookeeper node. This data is - * cached locally and updated in the background on watch notifications. - * - * @return the data currently cached locally or null if {@link #init()} hasn't been called - * or the backing node has no data or does not exist anymore. - */ - @Override - public @Nullable T get() { - return nodeData; - } - - @VisibleForTesting - void updateData(@Nullable T newData) { - nodeData = newData; - dataUpdateListener.execute(newData); - } - - private void tryWatchDataNode() throws InterruptedException { - backoffHelper.doUntilSuccess(new ExceptionalSupplier<Boolean, InterruptedException>() { - @Override public Boolean get() throws InterruptedException { - try { - watchDataNode(); - return true; - } catch (KeeperException e) { - return false; - } catch (ZooKeeperConnectionException e) { - return false; - } - } - }); - } - - private void watchDataNode() throws InterruptedException, KeeperException, - ZooKeeperConnectionException { - try { - Stat stat = new Stat(); - byte[] rawData = zkClient.get().getData(nodePath, nodeWatcher, stat); - T newData = deserializer.deserialize(rawData, stat); - updateData(newData); - } catch (KeeperException.NoNodeException e) { - /* - * This node doesn't exist right now, reflect that locally and then create a watch to wait - * for its recreation. - */ - updateData(NO_DATA); - watchForExistence(); - } - } - - private void watchForExistence() throws InterruptedException, KeeperException, - ZooKeeperConnectionException { - /* - * If the node was created between the getData call and this call, just try watching it. - * We'll have an extra exists watch on it that goes off on its next deletion, which will - * be a no-op. - * Otherwise, just let the exists watch wait for its creation. - */ - if (zkClient.get().exists(nodePath, existenceWatcher) != null) { - tryWatchDataNode(); - } - } - - /** - * Interface for defining zookeeper node data deserialization. - * - * @param <T> the type of data associated with this node - */ - public interface NodeDeserializer<T> { - /** - * @param data the byte array returned from ZooKeeper when a watch is triggered. - * @param stat a ZooKeeper {@link Stat} object. Populated by - * {@link org.apache.zookeeper.ZooKeeper#getData(String, boolean, Stat)}. - */ - T deserialize(byte[] data, Stat stat); - } - - // wrapper for backwards compatibility with older create() methods with Function parameter - private static final class FunctionWrapper<T> implements NodeDeserializer<T> { - private final Function<byte[], T> func; - private FunctionWrapper(Function<byte[], T> func) { - Preconditions.checkNotNull(func); - this.func = func; - } - - public T deserialize(byte[] rawData, Stat stat) { - return func.apply(rawData); - } - } - -} - http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/zookeeper/ZooKeeperUtils.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/zookeeper/ZooKeeperUtils.java b/commons/src/main/java/com/twitter/common/zookeeper/ZooKeeperUtils.java deleted file mode 100644 index c8c0a33..0000000 --- a/commons/src/main/java/com/twitter/common/zookeeper/ZooKeeperUtils.java +++ /dev/null @@ -1,167 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.zookeeper; - -import java.util.List; -import java.util.logging.Logger; - -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; - -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.common.PathUtils; -import org.apache.zookeeper.data.ACL; - -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; -import com.twitter.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException; - -/** - * Utilities for dealing with zoo keeper. - */ -public final class ZooKeeperUtils { - - private static final Logger LOG = Logger.getLogger(ZooKeeperUtils.class.getName()); - - /** - * An appropriate default session timeout for Twitter ZooKeeper clusters. - */ - public static final Amount<Integer,Time> DEFAULT_ZK_SESSION_TIMEOUT = Amount.of(4, Time.SECONDS); - - /** - * The magic version number that allows any mutation to always succeed regardless of actual - * version number. - */ - public static final int ANY_VERSION = -1; - - /** - * An ACL that gives all permissions any user authenticated or not. - */ - public static final ImmutableList<ACL> OPEN_ACL_UNSAFE = - ImmutableList.copyOf(Ids.OPEN_ACL_UNSAFE); - - /** - * An ACL that gives all permissions to node creators and read permissions only to everyone else. - */ - public static final ImmutableList<ACL> EVERYONE_READ_CREATOR_ALL = - ImmutableList.<ACL>builder() - .addAll(Ids.CREATOR_ALL_ACL) - .addAll(Ids.READ_ACL_UNSAFE) - .build(); - - /** - * Returns true if the given exception indicates an error that can be resolved by retrying the - * operation without modification. - * - * @param e the exception to check - * @return true if the causing operation is strictly retryable - */ - public static boolean isRetryable(KeeperException e) { - Preconditions.checkNotNull(e); - - switch (e.code()) { - case CONNECTIONLOSS: - case SESSIONEXPIRED: - case SESSIONMOVED: - case OPERATIONTIMEOUT: - return true; - - case RUNTIMEINCONSISTENCY: - case DATAINCONSISTENCY: - case MARSHALLINGERROR: - case BADARGUMENTS: - case NONODE: - case NOAUTH: - case BADVERSION: - case NOCHILDRENFOREPHEMERALS: - case NODEEXISTS: - case NOTEMPTY: - case INVALIDCALLBACK: - case INVALIDACL: - case AUTHFAILED: - case UNIMPLEMENTED: - - // These two should not be encountered - they are used internally by ZK to specify ranges - case SYSTEMERROR: - case APIERROR: - - case OK: // This is actually an invalid ZK exception code - - default: - return false; - } - } - - /** - * Ensures the given {@code path} exists in the ZK cluster accessed by {@code zkClient}. If the - * path already exists, nothing is done; however if any portion of the path is missing, it will be - * created with the given {@code acl} as a persistent zookeeper node. The given {@code path} must - * be a valid zookeeper absolute path. - * - * @param zkClient the client to use to access the ZK cluster - * @param acl the acl to use if creating path nodes - * @param path the path to ensure exists - * @throws ZooKeeperConnectionException if there was a problem accessing the ZK cluster - * @throws InterruptedException if we were interrupted attempting to connect to the ZK cluster - * @throws KeeperException if there was a problem in ZK - */ - public static void ensurePath(ZooKeeperClient zkClient, List<ACL> acl, String path) - throws ZooKeeperConnectionException, InterruptedException, KeeperException { - Preconditions.checkNotNull(zkClient); - Preconditions.checkNotNull(path); - Preconditions.checkArgument(path.startsWith("/")); - - ensurePathInternal(zkClient, acl, path); - } - - private static void ensurePathInternal(ZooKeeperClient zkClient, List<ACL> acl, String path) - throws ZooKeeperConnectionException, InterruptedException, KeeperException { - if (zkClient.get().exists(path, false) == null) { - // The current path does not exist; so back up a level and ensure the parent path exists - // unless we're already a root-level path. - int lastPathIndex = path.lastIndexOf('/'); - if (lastPathIndex > 0) { - ensurePathInternal(zkClient, acl, path.substring(0, lastPathIndex)); - } - - // We've ensured our parent path (if any) exists so we can proceed to create our path. - try { - zkClient.get().create(path, null, acl, CreateMode.PERSISTENT); - } catch (KeeperException.NodeExistsException e) { - // This ensures we don't die if a race condition was met between checking existence and - // trying to create the node. - LOG.info("Node existed when trying to ensure path " + path + ", somebody beat us to it?"); - } - } - } - - /** - * Validate and return a normalized zookeeper path which doesn't contain consecutive slashes and - * never ends with a slash (except for root path). - * - * @param path the path to be normalized - * @return normalized path string - */ - public static String normalizePath(String path) { - String normalizedPath = path.replaceAll("//+", "/").replaceFirst("(.+)/$", "$1"); - PathUtils.validatePath(normalizedPath); - return normalizedPath; - } - - private ZooKeeperUtils() { - // utility - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/zookeeper/guice/ServerSetModule.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/zookeeper/guice/ServerSetModule.java b/commons/src/main/java/com/twitter/common/zookeeper/guice/ServerSetModule.java deleted file mode 100644 index 1275e32..0000000 --- a/commons/src/main/java/com/twitter/common/zookeeper/guice/ServerSetModule.java +++ /dev/null @@ -1,270 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.zookeeper.guice; - -import java.lang.annotation.Retention; -import java.lang.annotation.Target; -import java.net.InetSocketAddress; -import java.util.Map; -import java.util.concurrent.atomic.AtomicReference; -import java.util.logging.Level; -import java.util.logging.Logger; - -import javax.annotation.Nullable; - -import com.google.common.base.Optional; -import com.google.common.util.concurrent.Atomics; -import com.google.inject.AbstractModule; -import com.google.inject.BindingAnnotation; -import com.google.inject.Inject; -import com.google.inject.Key; -import com.google.inject.Singleton; -import com.google.inject.TypeLiteral; - -import com.twitter.common.application.ShutdownRegistry; -import com.twitter.common.application.modules.LifecycleModule; -import com.twitter.common.application.modules.LocalServiceRegistry; -import com.twitter.common.args.Arg; -import com.twitter.common.args.CmdLine; -import com.twitter.common.args.constraints.NotNegative; -import com.twitter.common.base.Command; -import com.twitter.common.base.ExceptionalCommand; -import com.twitter.common.base.Supplier; -import com.twitter.common.zookeeper.Group.JoinException; -import com.twitter.common.zookeeper.ServerSet; -import com.twitter.common.zookeeper.ServerSet.EndpointStatus; -import com.twitter.common.zookeeper.ServerSet.UpdateException; -import com.twitter.thrift.Status; - -import static com.google.common.base.Preconditions.checkNotNull; -import static java.lang.annotation.ElementType.FIELD; -import static java.lang.annotation.ElementType.METHOD; -import static java.lang.annotation.ElementType.PARAMETER; -import static java.lang.annotation.RetentionPolicy.RUNTIME; - -/** - * A module that registers all ports in the {@link LocalServiceRegistry} in an {@link ServerSet}. - * <p/> - * Required bindings: - * <ul> - * <li> {@link ServerSet} - * <li> {@link ShutdownRegistry} - * <li> {@link LocalServiceRegistry} - * </ul> - * <p/> - * {@link LifecycleModule} must also be included by users so a startup action may be registered. - * <p/> - * Provided bindings: - * <ul> - * <li> {@link Supplier}<{@link EndpointStatus}> - * </ul> - */ -public class ServerSetModule extends AbstractModule { - - /** - * BindingAnnotation for defaults to use in the service instance node. - */ - @BindingAnnotation @Target({PARAMETER, METHOD, FIELD}) @Retention(RUNTIME) - private @interface Default {} - - /** - * Binding annotation to give the ServerSetJoiner a fixed known ServerSet that is appropriate to - * {@link ServerSet#join} on. - */ - @BindingAnnotation @Target({METHOD, PARAMETER}) @Retention(RUNTIME) - private @interface Joinable {} - - private static final Key<ServerSet> JOINABLE_SS = Key.get(ServerSet.class, Joinable.class); - - @CmdLine(name = "aux_port_as_primary", - help = "Name of the auxiliary port to use as the primary port in the server set." - + " This may only be used when no other primary port is specified.") - private static final Arg<String> AUX_PORT_AS_PRIMARY = Arg.create(null); - - @NotNegative - @CmdLine(name = "shard_id", help = "Shard ID for this application.") - private static final Arg<Integer> SHARD_ID = Arg.create(); - - private static final Logger LOG = Logger.getLogger(ServerSetModule.class.getName()); - - /** - * Builds a Module tht can be used to join a {@link ServerSet} with the ports configured in a - * {@link LocalServiceRegistry}. - */ - public static class Builder { - private Key<ServerSet> key = Key.get(ServerSet.class); - private Optional<String> auxPortAsPrimary = Optional.absent(); - - /** - * Sets the key of the ServerSet to join. - * - * @param key Key of the ServerSet to join. - * @return This builder for chaining calls. - */ - public Builder key(Key<ServerSet> key) { - this.key = key; - return this; - } - - /** - * Allows joining an auxiliary port with the specified {@code name} as the primary port of the - * ServerSet. - * - * @param auxPortName The name of the auxiliary port to join as the primary ServerSet port. - * @return This builder for chaining calls. - */ - public Builder namedPrimaryPort(String auxPortName) { - this.auxPortAsPrimary = Optional.of(auxPortName); - return this; - } - - /** - * Creates a Module that will register a startup action that joins a ServerSet when installed. - * - * @return A Module. - */ - public ServerSetModule build() { - return new ServerSetModule(key, auxPortAsPrimary); - } - } - - /** - * Creates a builder that can be used to configure and create a ServerSetModule. - * - * @return A ServerSetModule builder. - */ - public static Builder builder() { - return new Builder(); - } - - private final Key<ServerSet> serverSetKey; - private final Optional<String> auxPortAsPrimary; - - /** - * Constructs a ServerSetModule that registers a startup action to register this process in - * ZooKeeper, with the specified initial status and auxiliary port to represent as the primary - * service port. - * - * @param serverSetKey The key the ServerSet to join is bound under. - * @param auxPortAsPrimary Name of the auxiliary port to use as the primary port. - */ - ServerSetModule(Key<ServerSet> serverSetKey, Optional<String> auxPortAsPrimary) { - - this.serverSetKey = checkNotNull(serverSetKey); - this.auxPortAsPrimary = checkNotNull(auxPortAsPrimary); - } - - @Override - protected void configure() { - requireBinding(serverSetKey); - requireBinding(ShutdownRegistry.class); - requireBinding(LocalServiceRegistry.class); - - LifecycleModule.bindStartupAction(binder(), ServerSetJoiner.class); - - bind(new TypeLiteral<Supplier<EndpointStatus>>() { }).to(EndpointSupplier.class); - bind(EndpointSupplier.class).in(Singleton.class); - - Optional<String> primaryPortName; - if (AUX_PORT_AS_PRIMARY.hasAppliedValue()) { - primaryPortName = Optional.of(AUX_PORT_AS_PRIMARY.get()); - } else { - primaryPortName = auxPortAsPrimary; - } - - bind(new TypeLiteral<Optional<String>>() { }).annotatedWith(Default.class) - .toInstance(primaryPortName); - - bind(JOINABLE_SS).to(serverSetKey); - } - - static class EndpointSupplier implements Supplier<EndpointStatus> { - private final AtomicReference<EndpointStatus> reference = Atomics.newReference(); - - @Nullable - @Override public EndpointStatus get() { - return reference.get(); - } - - void set(EndpointStatus endpoint) { - reference.set(endpoint); - } - } - - private static class ServerSetJoiner implements Command { - private final ServerSet serverSet; - private final LocalServiceRegistry serviceRegistry; - private final ShutdownRegistry shutdownRegistry; - private final EndpointSupplier endpointSupplier; - private final Optional<String> auxPortAsPrimary; - - @Inject - ServerSetJoiner( - @Joinable ServerSet serverSet, - LocalServiceRegistry serviceRegistry, - ShutdownRegistry shutdownRegistry, - EndpointSupplier endpointSupplier, - @Default Optional<String> auxPortAsPrimary) { - - this.serverSet = checkNotNull(serverSet); - this.serviceRegistry = checkNotNull(serviceRegistry); - this.shutdownRegistry = checkNotNull(shutdownRegistry); - this.endpointSupplier = checkNotNull(endpointSupplier); - this.auxPortAsPrimary = checkNotNull(auxPortAsPrimary); - } - - @Override public void execute() { - Optional<InetSocketAddress> primarySocket = serviceRegistry.getPrimarySocket(); - Map<String, InetSocketAddress> auxSockets = serviceRegistry.getAuxiliarySockets(); - - InetSocketAddress primary; - if (primarySocket.isPresent()) { - primary = primarySocket.get(); - } else if (auxPortAsPrimary.isPresent()) { - primary = auxSockets.get(auxPortAsPrimary.get()); - if (primary == null) { - throw new IllegalStateException("No auxiliary port named " + auxPortAsPrimary.get()); - } - } else { - throw new IllegalStateException("No primary service registered with LocalServiceRegistry," - + " and -aux_port_as_primary was not specified."); - } - - final EndpointStatus endpointStatus; - try { - if (SHARD_ID.hasAppliedValue()) { - endpointStatus = serverSet.join(primary, auxSockets, SHARD_ID.get()); - } else { - endpointStatus = serverSet.join(primary, auxSockets); - } - - endpointSupplier.set(endpointStatus); - } catch (JoinException e) { - LOG.log(Level.WARNING, "Failed to join ServerSet.", e); - throw new RuntimeException(e); - } catch (InterruptedException e) { - LOG.log(Level.WARNING, "Interrupted while joining ServerSet.", e); - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } - - shutdownRegistry.addAction(new ExceptionalCommand<UpdateException>() { - @Override public void execute() throws UpdateException { - LOG.info("Leaving ServerSet."); - endpointStatus.leave(); - } - }); - } - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/zookeeper/guice/client/ZooKeeperClientModule.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/zookeeper/guice/client/ZooKeeperClientModule.java b/commons/src/main/java/com/twitter/common/zookeeper/guice/client/ZooKeeperClientModule.java deleted file mode 100644 index 7d4502f..0000000 --- a/commons/src/main/java/com/twitter/common/zookeeper/guice/client/ZooKeeperClientModule.java +++ /dev/null @@ -1,235 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.zookeeper.guice.client; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.logging.Logger; - -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; - -import com.google.inject.Inject; -import com.google.inject.Key; -import com.google.inject.PrivateModule; -import com.google.inject.Provider; -import com.google.inject.Singleton; - -import com.twitter.common.application.ShutdownRegistry; -import com.twitter.common.inject.Bindings.KeyFactory; -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; -import com.twitter.common.zookeeper.ZooKeeperClient; -import com.twitter.common.zookeeper.ZooKeeperClient.Credentials; -import com.twitter.common.zookeeper.ZooKeeperUtils; -import com.twitter.common.zookeeper.testing.ZooKeeperTestServer; - -/** - * A guice binding module that configures and binds a {@link ZooKeeperClient} instance. - */ -public class ZooKeeperClientModule extends PrivateModule { - private final KeyFactory keyFactory; - private final ClientConfig config; - - /** - * Creates a new ZK client module from the provided configuration. - * - * @param config Configuration parameters for the client. - */ - public ZooKeeperClientModule(ClientConfig config) { - this(KeyFactory.PLAIN, config); - } - - /** - * Creates a new ZK client module from the provided configuration, using a key factory to - * qualify any bindings. - * - * @param keyFactory Factory to use when creating any exposed bindings. - * @param config Configuration parameters for the client. - */ - public ZooKeeperClientModule(KeyFactory keyFactory, ClientConfig config) { - this.keyFactory = Preconditions.checkNotNull(keyFactory); - this.config = Preconditions.checkNotNull(config); - } - - @Override - protected void configure() { - Key<ZooKeeperClient> clientKey = keyFactory.create(ZooKeeperClient.class); - if (config.inProcess) { - requireBinding(ShutdownRegistry.class); - // Bound privately to give the local provider access to configuration settings. - bind(ClientConfig.class).toInstance(config); - bind(clientKey).toProvider(LocalClientProvider.class).in(Singleton.class); - } else { - ZooKeeperClient client = - new ZooKeeperClient(config.sessionTimeout, config.credentials, config.chrootPath, config.servers); - bind(clientKey).toInstance(client); - } - expose(clientKey); - } - - private static class LocalClientProvider implements Provider<ZooKeeperClient> { - private static final Logger LOG = Logger.getLogger(LocalClientProvider.class.getName()); - - private final ClientConfig config; - private final ShutdownRegistry shutdownRegistry; - - @Inject - LocalClientProvider(ClientConfig config, ShutdownRegistry shutdownRegistry) { - this.config = Preconditions.checkNotNull(config); - this.shutdownRegistry = Preconditions.checkNotNull(shutdownRegistry); - } - - @Override - public ZooKeeperClient get() { - ZooKeeperTestServer zooKeeperServer; - try { - zooKeeperServer = new ZooKeeperTestServer(0, shutdownRegistry); - zooKeeperServer.startNetwork(); - } catch (IOException e) { - throw Throwables.propagate(e); - } catch (InterruptedException e) { - throw Throwables.propagate(e); - } - - LOG.info("Embedded zookeeper cluster started on port " + zooKeeperServer.getPort()); - return zooKeeperServer.createClient(config.sessionTimeout, config.credentials); - } - } - - /** - * Composite type that contains configuration parameters used when creating a client. - * <p> - * Instances of this class are immutable, but builder-style chained calls are supported. - */ - public static class ClientConfig { - public final Iterable<InetSocketAddress> servers; - public final boolean inProcess; - public final Amount<Integer, Time> sessionTimeout; - public final Optional<String> chrootPath; - public final Credentials credentials; - - /** - * Creates a new client configuration. - * - * @param servers ZooKeeper server addresses. - * @param inProcess Whether to run and create clients for an in-process ZooKeeper server. - * @param sessionTimeout Timeout duration for established sessions. - * @param credentials ZooKeeper authentication credentials. - */ - public ClientConfig( - Iterable<InetSocketAddress> servers, - boolean inProcess, - Amount<Integer, Time> sessionTimeout, - Credentials credentials) { - - this(servers, Optional.<String>absent(), inProcess, sessionTimeout, credentials); - } - - /** - * Creates a new client configuration. - * - * @param servers ZooKeeper server addresses. - * @param inProcess Whether to run and create clients for an in-process ZooKeeper server. - * @param chrootPath an optional chroot path - * @param sessionTimeout Timeout duration for established sessions. - * @param credentials ZooKeeper authentication credentials. - */ - public ClientConfig( - Iterable<InetSocketAddress> servers, - Optional<String> chrootPath, - boolean inProcess, - Amount<Integer, Time> sessionTimeout, - Credentials credentials) { - - this.servers = servers; - this.chrootPath = chrootPath; - this.inProcess = inProcess; - this.sessionTimeout = sessionTimeout; - this.credentials = credentials; - } - - /** - * Creates a new client configuration with defaults for the session timeout and credentials. - * - * @param servers ZooKeeper server addresses. - * @return A new configuration. - */ - public static ClientConfig create(Iterable<InetSocketAddress> servers) { - return new ClientConfig( - servers, - Optional.<String> absent(), - false, - ZooKeeperUtils.DEFAULT_ZK_SESSION_TIMEOUT, - Credentials.NONE); - } - - /** - * Creates a new configuration identical to this configuration, but with the provided - * session timeout. - * - * @param sessionTimeout Timeout duration for established sessions. - * @return A modified clone of this configuration. - */ - public ClientConfig withSessionTimeout(Amount<Integer, Time> sessionTimeout) { - return new ClientConfig(servers, chrootPath, inProcess, sessionTimeout, credentials); - } - - /** - * Creates a new configuration identical to this configuration, but with the provided - * credentials. - * - * @param credentials ZooKeeper authentication credentials. - * @return A modified clone of this configuration. - */ - public ClientConfig withCredentials(Credentials credentials) { - return new ClientConfig(servers, chrootPath, inProcess, sessionTimeout, credentials); - } - - /** - * Convenience method for calling {@link #withCredentials(Credentials)} with digest credentials. - * - * @param username Digest authentication user. - * @param password Digest authentication raw password. - * @return A modified clone of this configuration. - */ - public ClientConfig withDigestCredentials(String username, String password) { - return withCredentials(ZooKeeperClient.digestCredentials(username, password)); - } - - /** - * Creates a new configuration identical to this configuration, but with the provided - * in-process setting. - * - * @param inProcess If {@code true}, an in-process ZooKeeper server server will be used, - * and all clients will connect to it. - * @return A modified clone of this configuration. - */ - public ClientConfig inProcess(boolean inProcess) { - return new ClientConfig(servers, chrootPath, inProcess, sessionTimeout, credentials); - } - - /** - * Creates a new configuration identical to this configuration, but with the provided - * chroot path setting. - * - * @param chrootPath a valid ZooKeeper path used as a chroot for ZooKeeper connections. - * @return A modified clone of this configuration. - */ - public ClientConfig withChrootPath(String chrootPath) { - return new ClientConfig(servers, Optional.of(chrootPath), inProcess, sessionTimeout, credentials); - } - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/zookeeper/guice/client/flagged/FlaggedClientConfig.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/zookeeper/guice/client/flagged/FlaggedClientConfig.java b/commons/src/main/java/com/twitter/common/zookeeper/guice/client/flagged/FlaggedClientConfig.java deleted file mode 100644 index 9138279..0000000 --- a/commons/src/main/java/com/twitter/common/zookeeper/guice/client/flagged/FlaggedClientConfig.java +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.zookeeper.guice.client.flagged; - -import java.net.InetSocketAddress; -import java.util.List; - -import com.google.common.base.Optional; -import com.google.common.base.Splitter; -import com.google.common.collect.ImmutableList; - -import com.twitter.common.args.Arg; -import com.twitter.common.args.CmdLine; -import com.twitter.common.args.constraints.NotEmpty; -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; -import com.twitter.common.zookeeper.ZooKeeperClient; -import com.twitter.common.zookeeper.ZooKeeperClient.Credentials; -import com.twitter.common.zookeeper.ZooKeeperUtils; -import com.twitter.common.zookeeper.guice.client.ZooKeeperClientModule.ClientConfig; - -/** - * A factory that creates a {@link ClientConfig} instance based on command line argument values. - */ -public class FlaggedClientConfig { - @CmdLine(name = "zk_in_proc", - help = "Launches an embedded zookeeper server for local testing causing -zk_endpoints " - + "to be ignored if specified.") - private static final Arg<Boolean> IN_PROCESS = Arg.create(false); - - @NotEmpty - @CmdLine(name = "zk_endpoints", help ="Endpoint specification for the ZooKeeper servers.") - private static final Arg<List<InetSocketAddress>> ZK_ENDPOINTS = Arg.create(); - - @CmdLine(name = "zk_chroot_path", help = "chroot path to use for the ZooKeeper connections") - private static final Arg<String> CHROOT_PATH = Arg.create(null); - - @CmdLine(name = "zk_session_timeout", help ="The ZooKeeper session timeout.") - private static final Arg<Amount<Integer, Time>> SESSION_TIMEOUT = - Arg.create(ZooKeeperUtils.DEFAULT_ZK_SESSION_TIMEOUT); - - @CmdLine(name = "zk_digest_credentials", - help ="user:password to use when authenticating with ZooKeeper.") - private static final Arg<String> DIGEST_CREDENTIALS = Arg.create(); - - /** - * Creates a configuration from command line arguments. - * - * @return Configuration instance. - */ - public static ClientConfig create() { - return new ClientConfig( - ZK_ENDPOINTS.get(), - Optional.fromNullable(CHROOT_PATH.get()), - IN_PROCESS.get(), - SESSION_TIMEOUT.get(), - DIGEST_CREDENTIALS.hasAppliedValue() - ? getCredentials(DIGEST_CREDENTIALS.get()) - : Credentials.NONE - ); - } - - private static Credentials getCredentials(String userAndPass) { - List<String> parts = ImmutableList.copyOf(Splitter.on(":").split(userAndPass)); - if (parts.size() != 2) { - throw new IllegalArgumentException( - "zk_digest_credentials must be formatted as user:pass"); - } - return ZooKeeperClient.digestCredentials(parts.get(0), parts.get(1)); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/zookeeper/testing/BaseZooKeeperTest.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/zookeeper/testing/BaseZooKeeperTest.java b/commons/src/main/java/com/twitter/common/zookeeper/testing/BaseZooKeeperTest.java deleted file mode 100644 index b04d597..0000000 --- a/commons/src/main/java/com/twitter/common/zookeeper/testing/BaseZooKeeperTest.java +++ /dev/null @@ -1,154 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.zookeeper.testing; - -import java.io.IOException; - -import com.google.common.base.Preconditions; -import com.google.common.testing.TearDown; -import com.google.common.testing.junit4.TearDownTestCase; - -import org.junit.Before; - -import com.twitter.common.application.ShutdownRegistry.ShutdownRegistryImpl; -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; -import com.twitter.common.zookeeper.ZooKeeperClient; -import com.twitter.common.zookeeper.ZooKeeperClient.Credentials; -import com.twitter.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException; - -/** - * A baseclass for in-process zookeeper tests. - * Uses ZooKeeperTestHelper to start the server and create clients: new tests should directly use - * that helper class instead of extending this class. - */ -public abstract class BaseZooKeeperTest extends TearDownTestCase { - - private final Amount<Integer, Time> defaultSessionTimeout; - private ZooKeeperTestServer zkTestServer; - - /** - * Creates a test case where the test server uses its - * {@link ZooKeeperTestServer#DEFAULT_SESSION_TIMEOUT} for clients created without an explicit - * session timeout. - */ - public BaseZooKeeperTest() { - this(ZooKeeperTestServer.DEFAULT_SESSION_TIMEOUT); - } - - /** - * Creates a test case where the test server uses the given {@code defaultSessionTimeout} for - * clients created without an explicit session timeout. - */ - public BaseZooKeeperTest(Amount<Integer, Time> defaultSessionTimeout) { - this.defaultSessionTimeout = Preconditions.checkNotNull(defaultSessionTimeout); - } - - @Before - public final void setUp() throws Exception { - final ShutdownRegistryImpl shutdownRegistry = new ShutdownRegistryImpl(); - addTearDown(new TearDown() { - @Override public void tearDown() { - shutdownRegistry.execute(); - } - }); - zkTestServer = new ZooKeeperTestServer(0, shutdownRegistry, defaultSessionTimeout); - zkTestServer.startNetwork(); - } - - /** - * Starts zookeeper back up on the last used port. - */ - protected final void restartNetwork() throws IOException, InterruptedException { - zkTestServer.restartNetwork(); - } - - /** - * Shuts down the in-process zookeeper network server. - */ - protected final void shutdownNetwork() { - zkTestServer.shutdownNetwork(); - } - - /** - * Expires the active session for the given client. The client should be one returned from - * {@link #createZkClient}. - * - * @param zkClient the client to expire - * @throws ZooKeeperClient.ZooKeeperConnectionException if a problem is encountered connecting to - * the local zk server while trying to expire the session - * @throws InterruptedException if interrupted while requesting expiration - */ - protected final void expireSession(ZooKeeperClient zkClient) - throws ZooKeeperConnectionException, InterruptedException { - zkTestServer.expireClientSession(zkClient); - } - - /** - * Returns the current port to connect to the in-process zookeeper instance. - */ - protected final int getPort() { - return zkTestServer.getPort(); - } - - /** - * Returns a new unauthenticated zookeeper client connected to the in-process zookeeper server - * with the default session timeout. - */ - protected final ZooKeeperClient createZkClient() { - return zkTestServer.createClient(); - } - - /** - * Returns a new authenticated zookeeper client connected to the in-process zookeeper server with - * the default session timeout. - */ - protected final ZooKeeperClient createZkClient(Credentials credentials) { - return zkTestServer.createClient(credentials); - } - - /** - * Returns a new authenticated zookeeper client connected to the in-process zookeeper server with - * the default session timeout. The client is authenticated in the digest authentication scheme - * with the given {@code username} and {@code password}. - */ - protected final ZooKeeperClient createZkClient(String username, String password) { - return createZkClient(ZooKeeperClient.digestCredentials(username, password)); - } - - /** - * Returns a new unauthenticated zookeeper client connected to the in-process zookeeper server - * with a custom {@code sessionTimeout}. - */ - protected final ZooKeeperClient createZkClient(Amount<Integer, Time> sessionTimeout) { - return zkTestServer.createClient(sessionTimeout); - } - - /** - * Returns a new authenticated zookeeper client connected to the in-process zookeeper server with - * a custom {@code sessionTimeout}. - */ - protected final ZooKeeperClient createZkClient(Amount<Integer, Time> sessionTimeout, - Credentials credentials) { - return zkTestServer.createClient(sessionTimeout, credentials); - } - - /** - * Returns a new authenticated zookeeper client connected to the in-process zookeeper server with - * the default session timeout and the custom chroot path. - */ - protected final ZooKeeperClient createZkClient(String chrootPath) { - return zkTestServer.createClient(chrootPath); - } -}
