http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperClient.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperClient.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperClient.java new file mode 100644 index 0000000..a032aa3 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperClient.java @@ -0,0 +1,491 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.common.zookeeper; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.logging.Logger; + +import javax.annotation.Nullable; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; +import com.google.common.base.Objects; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; + +import org.apache.commons.lang.builder.EqualsBuilder; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.SessionExpiredException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.common.PathUtils; + +import org.apache.aurora.common.base.Command; +import org.apache.aurora.common.base.MorePreconditions; +import org.apache.aurora.common.net.InetSocketAddressHelper; +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; + +/** + * Manages a connection to a ZooKeeper cluster. + */ +public class ZooKeeperClient { + + /** + * Indicates an error connecting to a zookeeper cluster. 
+ */ + public class ZooKeeperConnectionException extends Exception { + public ZooKeeperConnectionException(String message, Throwable cause) { + super(message, cause); + } + } + + /** + * Encapsulates a user's credentials and has the ability to authenticate them through a + * {@link ZooKeeper} client. + */ + public interface Credentials { + + /** + * A set of {@code Credentials} that performs no authentication. + */ + Credentials NONE = new Credentials() { + @Override public void authenticate(ZooKeeper zooKeeper) { + // noop + } + + @Override public String scheme() { + return null; + } + + @Override public byte[] authToken() { + return null; + } + }; + + /** + * Authenticates these credentials against the given {@code ZooKeeper} client. + * + * @param zooKeeper the client to authenticate + */ + void authenticate(ZooKeeper zooKeeper); + + /** + * Returns the authentication scheme these credentials are for. + * + * @return the scheme these credentials are for or {@code null} if no authentication is + * intended. + */ + @Nullable + String scheme(); + + /** + * Returns the authentication token. + * + * @return the authentication token or {@code null} if no authentication is intended. + */ + @Nullable + byte[] authToken(); + } + + /** + * Creates a set of credentials for the zoo keeper digest authentication mechanism. + * + * @param username the username to authenticate with + * @param password the password to authenticate with + * @return a set of credentials that can be used to authenticate the zoo keeper client + */ + public static Credentials digestCredentials(String username, String password) { + MorePreconditions.checkNotBlank(username); + Preconditions.checkNotNull(password); + + // TODO(John Sirois): DigestAuthenticationProvider is broken - uses platform default charset + // (on server) and so we just have to hope here that clients are deployed in compatible jvms. 
+ // Consider writing and installing a version of DigestAuthenticationProvider that controls its + // Charset explicitly. + return credentials("digest", (username + ":" + password).getBytes()); + } + + /** + * Creates a set of credentials for the given authentication {@code scheme}. + * + * @param scheme the scheme to authenticate with + * @param authToken the authentication token + * @return a set of credentials that can be used to authenticate the zoo keeper client + */ + public static Credentials credentials(final String scheme, final byte[] authToken) { + MorePreconditions.checkNotBlank(scheme); + Preconditions.checkNotNull(authToken); + + return new Credentials() { + @Override public void authenticate(ZooKeeper zooKeeper) { + zooKeeper.addAuthInfo(scheme, authToken); + } + + @Override public String scheme() { + return scheme; + } + + @Override public byte[] authToken() { + return authToken; + } + + @Override public boolean equals(Object o) { + if (!(o instanceof Credentials)) { + return false; + } + + Credentials other = (Credentials) o; + return new EqualsBuilder() + .append(scheme, other.scheme()) + .append(authToken, other.authToken()) + .isEquals(); + } + + @Override public int hashCode() { + return Objects.hashCode(scheme, authToken); + } + }; + } + + private final class SessionState { + private final long sessionId; + private final byte[] sessionPasswd; + + private SessionState(long sessionId, byte[] sessionPasswd) { + this.sessionId = sessionId; + this.sessionPasswd = sessionPasswd; + } + } + + private static final Logger LOG = Logger.getLogger(ZooKeeperClient.class.getName()); + + private static final Amount<Long,Time> WAIT_FOREVER = Amount.of(0L, Time.MILLISECONDS); + + private final int sessionTimeoutMs; + private final Credentials credentials; + private final String zooKeeperServers; + // GuardedBy "this", but still volatile for tests, where we want to be able to see writes + // made from within long synchronized blocks. 
+ private volatile ZooKeeper zooKeeper; + private SessionState sessionState; + + private final Set<Watcher> watchers = new CopyOnWriteArraySet<Watcher>(); + private final BlockingQueue<WatchedEvent> eventQueue = new LinkedBlockingQueue<WatchedEvent>(); + + private static Iterable<InetSocketAddress> combine(InetSocketAddress address, + InetSocketAddress... addresses) { + return ImmutableSet.<InetSocketAddress>builder().add(address).add(addresses).build(); + } + + /** + * Creates an unconnected client that will lazily attempt to connect on the first call to + * {@link #get()}. + * + * @param sessionTimeout the ZK session timeout + * @param zooKeeperServer the first, required ZK server + * @param zooKeeperServers any additional servers forming the ZK cluster + */ + public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, InetSocketAddress zooKeeperServer, + InetSocketAddress... zooKeeperServers) { + this(sessionTimeout, combine(zooKeeperServer, zooKeeperServers)); + } + + /** + * Creates an unconnected client that will lazily attempt to connect on the first call to + * {@link #get}. + * + * @param sessionTimeout the ZK session timeout + * @param zooKeeperServers the set of servers forming the ZK cluster + */ + public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, + Iterable<InetSocketAddress> zooKeeperServers) { + this(sessionTimeout, Credentials.NONE, Optional.<String> absent(), zooKeeperServers); + } + + /** + * Creates an unconnected client that will lazily attempt to connect on the first call to + * {@link #get()}. All successful connections will be authenticated with the given + * {@code credentials}. 
+ * + * @param sessionTimeout the ZK session timeout + * @param credentials the credentials to authenticate with + * @param zooKeeperServer the first, required ZK server + * @param zooKeeperServers any additional servers forming the ZK cluster + */ + public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, Credentials credentials, + InetSocketAddress zooKeeperServer, InetSocketAddress... zooKeeperServers) { + this(sessionTimeout, credentials, Optional.<String> absent(), combine(zooKeeperServer, zooKeeperServers)); + } + + /** + * Creates an unconnected client that will lazily attempt to connect on the first call to + * {@link #get}. All successful connections will be authenticated with the given + * {@code credentials}. + * + * @param sessionTimeout the ZK session timeout + * @param credentials the credentials to authenticate with + * @param zooKeeperServers the set of servers forming the ZK cluster + */ + public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, Credentials credentials, + Iterable<InetSocketAddress> zooKeeperServers) { + this(sessionTimeout, credentials, Optional.<String> absent(), zooKeeperServers); + } + + /** + * Creates an unconnected client that will lazily attempt to connect on the first call to + * {@link #get}. All successful connections will be authenticated with the given + * {@code credentials}. 
+ * + * @param sessionTimeout the ZK session timeout + * @param credentials the credentials to authenticate with + * @param chrootPath an optional chroot path + * @param zooKeeperServers the set of servers forming the ZK cluster + */ + public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, Credentials credentials, + Optional<String> chrootPath, Iterable<InetSocketAddress> zooKeeperServers) { + this.sessionTimeoutMs = Preconditions.checkNotNull(sessionTimeout).as(Time.MILLISECONDS); + this.credentials = Preconditions.checkNotNull(credentials); + + if (chrootPath.isPresent()) { + PathUtils.validatePath(chrootPath.get()); + } + + Preconditions.checkNotNull(zooKeeperServers); + Preconditions.checkArgument(!Iterables.isEmpty(zooKeeperServers), + "Must present at least 1 ZK server"); + + Thread watcherProcessor = new Thread("ZookeeperClient-watcherProcessor") { + @Override + public void run() { + while (true) { + try { + WatchedEvent event = eventQueue.take(); + for (Watcher watcher : watchers) { + watcher.process(event); + } + } catch (InterruptedException e) { /* ignore */ } + } + } + }; + watcherProcessor.setDaemon(true); + watcherProcessor.start(); + + Iterable<String> servers = + Iterables.transform(ImmutableSet.copyOf(zooKeeperServers), + InetSocketAddressHelper.INET_TO_STR); + this.zooKeeperServers = Joiner.on(',').join(servers).concat(chrootPath.or("")); + } + + /** + * Returns true if this client has non-empty credentials set. For example, returns {@code false} + * if this client was constructed with {@link Credentials#NONE}. + * + * @return {@code true} if this client is configured with non-empty credentials. + */ + public boolean hasCredentials() { + return !Strings.isNullOrEmpty(credentials.scheme()) && (credentials.authToken() != null); + } + + /** + * Returns the current active ZK connection or establishes a new one if none has yet been + * established or a previous connection was disconnected or had its session time out. 
This method + * will attempt to re-use sessions when possible. Equivalent to: + * <pre>get(Amount.of(0L, ...)</pre>. + * + * @return a connected ZooKeeper client + * @throws ZooKeeperConnectionException if there was a problem connecting to the ZK cluster + * @throws InterruptedException if interrupted while waiting for a connection to be established + */ + public synchronized ZooKeeper get() throws ZooKeeperConnectionException, InterruptedException { + try { + return get(WAIT_FOREVER); + } catch (TimeoutException e) { + InterruptedException interruptedException = + new InterruptedException("Got an unexpected TimeoutException for 0 wait"); + interruptedException.initCause(e); + throw interruptedException; + } + } + + /** + * Returns the current active ZK connection or establishes a new one if none has yet been + * established or a previous connection was disconnected or had its session time out. This + * method will attempt to re-use sessions when possible. + * + * @param connectionTimeout the maximum amount of time to wait for the connection to the ZK + * cluster to be established; 0 to wait forever + * @return a connected ZooKeeper client + * @throws ZooKeeperConnectionException if there was a problem connecting to the ZK cluster + * @throws InterruptedException if interrupted while waiting for a connection to be established + * @throws TimeoutException if a connection could not be established within the configured + * session timeout + */ + public synchronized ZooKeeper get(Amount<Long, Time> connectionTimeout) + throws ZooKeeperConnectionException, InterruptedException, TimeoutException { + + if (zooKeeper == null) { + final CountDownLatch connected = new CountDownLatch(1); + Watcher watcher = new Watcher() { + @Override public void process(WatchedEvent event) { + switch (event.getType()) { + // Guard the None type since this watch may be used as the default watch on calls by + // the client outside our control. 
+ case None: + switch (event.getState()) { + case Expired: + LOG.info("Zookeeper session expired. Event: " + event); + close(); + break; + case SyncConnected: + connected.countDown(); + break; + } + } + + eventQueue.offer(event); + } + }; + + try { + zooKeeper = (sessionState != null) + ? new ZooKeeper(zooKeeperServers, sessionTimeoutMs, watcher, sessionState.sessionId, + sessionState.sessionPasswd) + : new ZooKeeper(zooKeeperServers, sessionTimeoutMs, watcher); + } catch (IOException e) { + throw new ZooKeeperConnectionException( + "Problem connecting to servers: " + zooKeeperServers, e); + } + + if (connectionTimeout.getValue() > 0) { + if (!connected.await(connectionTimeout.as(Time.MILLISECONDS), TimeUnit.MILLISECONDS)) { + close(); + throw new TimeoutException("Timed out waiting for a ZK connection after " + + connectionTimeout); + } + } else { + try { + connected.await(); + } catch (InterruptedException ex) { + LOG.info("Interrupted while waiting to connect to zooKeeper"); + close(); + throw ex; + } + } + credentials.authenticate(zooKeeper); + + sessionState = new SessionState(zooKeeper.getSessionId(), zooKeeper.getSessionPasswd()); + } + return zooKeeper; + } + + /** + * Clients that need to re-establish state after session expiration can register an + * {@code onExpired} command to execute. + * + * @param onExpired the {@code Command} to register + * @return the new {@link Watcher} which can later be passed to {@link #unregister} for + * removal. + */ + public Watcher registerExpirationHandler(final Command onExpired) { + Watcher watcher = new Watcher() { + @Override public void process(WatchedEvent event) { + if (event.getType() == EventType.None && event.getState() == KeeperState.Expired) { + onExpired.execute(); + } + } + }; + register(watcher); + return watcher; + } + + /** + * Clients that need to register a top-level {@code Watcher} should do so using this method. 
The + * registered {@code watcher} will remain registered across re-connects and session expiration + * events. + * + * @param watcher the {@code Watcher to register} + */ + public void register(Watcher watcher) { + watchers.add(watcher); + } + + /** + * Clients can attempt to unregister a top-level {@code Watcher} that has previously been + * registered. + * + * @param watcher the {@code Watcher} to unregister as a top-level, persistent watch + * @return whether the given {@code Watcher} was found and removed from the active set + */ + public boolean unregister(Watcher watcher) { + return watchers.remove(watcher); + } + + /** + * Checks to see if the client might reasonably re-try an operation given the exception thrown + * while attempting it. If the ZooKeeper session should be expired to enable the re-try to + * succeed this method will expire it as a side-effect. + * + * @param e the exception to test + * @return true if a retry can be attempted + */ + public boolean shouldRetry(KeeperException e) { + if (e instanceof SessionExpiredException) { + close(); + } + return ZooKeeperUtils.isRetryable(e); + } + + /** + * Closes the current connection if any expiring the current ZooKeeper session. Any subsequent + * calls to this method will no-op until the next successful {@link #get}. + */ + public synchronized void close() { + if (zooKeeper != null) { + try { + zooKeeper.close(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.warning("Interrupted trying to close zooKeeper"); + } finally { + zooKeeper = null; + sessionState = null; + } + } + } + + @VisibleForTesting + synchronized boolean isClosed() { + return zooKeeper == null; + } + + @VisibleForTesting + ZooKeeper getZooKeeperClientForTests() { + return zooKeeper; + } +}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperMap.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperMap.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperMap.java new file mode 100644 index 0000000..29db55a --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperMap.java @@ -0,0 +1,411 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.common.zookeeper; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.logging.Level; +import java.util.logging.Logger; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.base.Functions; +import com.google.common.base.Preconditions; +import com.google.common.collect.ForwardingMap; +import com.google.common.collect.Sets; + +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; + +import org.apache.aurora.common.base.Command; +import org.apache.aurora.common.base.ExceptionalSupplier; +import org.apache.aurora.common.base.MorePreconditions; +import org.apache.aurora.common.util.BackoffHelper; +import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException; + +/** + * A ZooKeeper backed {@link Map}. Initialized with a node path, this map represents child nodes + * under that path as keys, with the data in those nodes as values. This map is readonly from + * clients of this class, and only can be modified via direct zookeeper operations. + * + * Note that instances of this class maintain a zookeeper watch for each zookeeper node under the + * parent, as well as on the parent itself. Instances of this class should be created via the + * {@link #create} factory method. + * + * As of ZooKeeper Version 3.1, the maximum allowable size of a data node is 1 MB. A single + * client should be able to hold up to maintain several thousand watches, but this depends on rate + * of data change as well. + * + * Talk to your zookeeper cluster administrator if you expect number of map entries times number + * of live clients to exceed a thousand, as a zookeeper cluster is limited by total number of + * server-side watches enabled. 
+ * + * For an example of a set of tools to maintain one of these maps, please see + * src/scripts/HenAccess.py in the hen repository. + * + * @param <V> the type of values this map stores + */ +public class ZooKeeperMap<V> extends ForwardingMap<String, V> { + + /** + * An optional listener which can be supplied and triggered when entries in a ZooKeeperMap + * are added, changed or removed. For a ZooKeeperMap of type <V>, the listener will fire a + * "nodeChanged" event with the name of the ZNode that changed, and its resulting value as + * interpreted by the provided deserializer. Removal of child nodes triggers the "nodeRemoved" + * method indicating the name of the ZNode which is no longer present in the map. + */ + public interface Listener<V> { + + /** + * Fired when a node is added to the ZooKeeperMap or changed. + * + * @param nodeName indicates the name of the ZNode that was added or changed. + * @param value is the new value of the node after passing through your supplied deserializer. + */ + void nodeChanged(String nodeName, V value); + + /** + * Fired when a node is removed from the ZooKeeperMap. + * + * @param nodeName indicates the name of the ZNode that was removed from the ZooKeeperMap. + */ + void nodeRemoved(String nodeName); + } + + /** + * Default deserializer for the constructor if you want to simply store the zookeeper byte[] data + * in this map. + */ + public static final Function<byte[], byte[]> BYTE_ARRAY_VALUES = Functions.identity(); + + /** + * A listener that ignores all events. 
+ */ + public static <T> Listener<T> noopListener() { + return new Listener<T>() { + @Override public void nodeChanged(String nodeName, T value) { } + @Override public void nodeRemoved(String nodeName) { } + }; + } + + private static final Logger LOG = Logger.getLogger(ZooKeeperMap.class.getName()); + + private final ZooKeeperClient zkClient; + private final String nodePath; + private final Function<byte[], V> deserializer; + + private final ConcurrentMap<String, V> localMap; + private final Map<String, V> unmodifiableLocalMap; + private final BackoffHelper backoffHelper; + + private final Listener<V> mapListener; + + // Whether it's safe to re-establish watches if our zookeeper session has expired. + private final Object safeToRewatchLock; + private volatile boolean safeToRewatch; + + /** + * Returns an initialized ZooKeeperMap. The given path must exist at the time of + * creation or a {@link KeeperException} will be thrown. + * + * @param zkClient a zookeeper client + * @param nodePath path to a node whose data will be watched + * @param deserializer a function that converts byte[] data from a zk node to this map's + * value type V + * @param listener is a Listener which fires when values are added, changed, or removed. + * + * @throws InterruptedException if the underlying zookeeper server transaction is interrupted + * @throws KeeperException.NoNodeException if the given nodePath doesn't exist + * @throws KeeperException if the server signals an error + * @throws ZooKeeperConnectionException if there was a problem connecting to the zookeeper + * cluster + */ + public static <V> ZooKeeperMap<V> create( + ZooKeeperClient zkClient, + String nodePath, + Function<byte[], V> deserializer, + Listener<V> listener) + throws InterruptedException, KeeperException, ZooKeeperConnectionException { + + ZooKeeperMap<V> zkMap = new ZooKeeperMap<V>(zkClient, nodePath, deserializer, listener); + zkMap.init(); + return zkMap; + } + + + /** + * Returns an initialized ZooKeeperMap. 
The given path must exist at the time of + * creation or a {@link KeeperException} will be thrown. + * + * @param zkClient a zookeeper client + * @param nodePath path to a node whose data will be watched + * @param deserializer a function that converts byte[] data from a zk node to this map's + * value type V + * + * @throws InterruptedException if the underlying zookeeper server transaction is interrupted + * @throws KeeperException.NoNodeException if the given nodePath doesn't exist + * @throws KeeperException if the server signals an error + * @throws ZooKeeperConnectionException if there was a problem connecting to the zookeeper + * cluster + */ + public static <V> ZooKeeperMap<V> create( + ZooKeeperClient zkClient, + String nodePath, + Function<byte[], V> deserializer) + throws InterruptedException, KeeperException, ZooKeeperConnectionException { + + return ZooKeeperMap.create(zkClient, nodePath, deserializer, ZooKeeperMap.<V>noopListener()); + } + + /** + * Initializes a ZooKeeperMap. The given path must exist at the time of object creation or + * a {@link KeeperException} will be thrown. + * + * Please note that this object will not track any remote zookeeper data until {@link #init()} + * is successfully called. After construction and before that call, this {@link Map} will + * be empty. + * + * @param zkClient a zookeeper client + * @param nodePath top-level node path under which the map data lives + * @param deserializer a function that converts byte[] data from a zk node to this map's + * value type V + * @param mapListener is a Listener which fires when values are added, changed, or removed. 
+ * + * @throws InterruptedException if the underlying zookeeper server transaction is interrupted + * @throws KeeperException.NoNodeException if the given nodePath doesn't exist + * @throws KeeperException if the server signals an error + * @throws ZooKeeperConnectionException if there was a problem connecting to the zookeeper + * cluster + */ + @VisibleForTesting + ZooKeeperMap( + ZooKeeperClient zkClient, + String nodePath, + Function<byte[], V> deserializer, + Listener<V> mapListener) + throws InterruptedException, KeeperException, ZooKeeperConnectionException { + + super(); + + this.mapListener = Preconditions.checkNotNull(mapListener); + this.zkClient = Preconditions.checkNotNull(zkClient); + this.nodePath = MorePreconditions.checkNotBlank(nodePath); + this.deserializer = Preconditions.checkNotNull(deserializer); + + localMap = new ConcurrentHashMap<String, V>(); + unmodifiableLocalMap = Collections.unmodifiableMap(localMap); + backoffHelper = new BackoffHelper(); + safeToRewatchLock = new Object(); + safeToRewatch = false; + + if (zkClient.get().exists(nodePath, null) == null) { + throw new KeeperException.NoNodeException(); + } + } + + /** + * Initialize zookeeper tracking for this {@link Map}. Once this call returns, this object + * will be tracking data in zookeeper. + * + * @throws InterruptedException if the underlying zookeeper server transaction is interrupted + * @throws KeeperException if the server signals an error + * @throws ZooKeeperConnectionException if there was a problem connecting to the zookeeper + * cluster + */ + @VisibleForTesting + void init() throws InterruptedException, KeeperException, ZooKeeperConnectionException { + Watcher watcher = zkClient.registerExpirationHandler(new Command() { + @Override public void execute() { + /* + * First rewatch all of our locally cached children. Some of them may not exist anymore, + * which will lead to caught KeeperException.NoNode whereafter we'll remove that child + * from the cached map. 
+ * + * Next, we'll establish our top level child watch and add any new nodes that might exist. + */ + try { + synchronized (safeToRewatchLock) { + if (safeToRewatch) { + rewatchDataNodes(); + tryWatchChildren(); + } + } + } catch (InterruptedException e) { + LOG.log(Level.WARNING, "Interrupted while trying to re-establish watch.", e); + Thread.currentThread().interrupt(); + } + } + }); + + try { + // Synchronize to prevent the race of watchChildren completing and then the session expiring + // before we update safeToRewatch. + synchronized (safeToRewatchLock) { + watchChildren(); + safeToRewatch = true; + } + } catch (InterruptedException e) { + zkClient.unregister(watcher); + throw e; + } catch (KeeperException e) { + zkClient.unregister(watcher); + throw e; + } catch (ZooKeeperConnectionException e) { + zkClient.unregister(watcher); + throw e; + } + } + + @Override + protected Map<String, V> delegate() { + return unmodifiableLocalMap; + } + + private void tryWatchChildren() throws InterruptedException { + backoffHelper.doUntilSuccess(new ExceptionalSupplier<Boolean, InterruptedException>() { + @Override public Boolean get() throws InterruptedException { + try { + watchChildren(); + return true; + } catch (KeeperException e) { + return false; + } catch (ZooKeeperConnectionException e) { + return false; + } + } + }); + } + + private synchronized void watchChildren() + throws InterruptedException, KeeperException, ZooKeeperConnectionException { + + /* + * Add a watch on the parent node itself, and attempt to rewatch if it + * gets deleted + */ + zkClient.get().exists(nodePath, new Watcher() { + @Override public void process(WatchedEvent event) { + if (event.getType() == Watcher.Event.EventType.NodeDeleted) { + // If the parent node no longer exists + localMap.clear(); + try { + tryWatchChildren(); + } catch (InterruptedException e) { + LOG.log(Level.WARNING, "Interrupted while trying to watch children.", e); + Thread.currentThread().interrupt(); + } + } + }}); + + 
final Watcher childWatcher = new Watcher() { + @Override + public void process(WatchedEvent event) { + if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) { + try { + tryWatchChildren(); + } catch (InterruptedException e) { + LOG.log(Level.WARNING, "Interrupted while trying to watch children.", e); + Thread.currentThread().interrupt(); + } + } + } + }; + + List<String> children = zkClient.get().getChildren(nodePath, childWatcher); + updateChildren(Sets.newHashSet(children)); + } + + private void tryAddChild(final String child) throws InterruptedException { + backoffHelper.doUntilSuccess(new ExceptionalSupplier<Boolean, InterruptedException>() { + @Override public Boolean get() throws InterruptedException { + try { + addChild(child); + return true; + } catch (KeeperException e) { + return false; + } catch (ZooKeeperConnectionException e) { + return false; + } + } + }); + } + + // TODO(Adam Samet) - Make this use the ZooKeeperNode class. + private void addChild(final String child) + throws InterruptedException, KeeperException, ZooKeeperConnectionException { + + final Watcher nodeWatcher = new Watcher() { + @Override + public void process(WatchedEvent event) { + if (event.getType() == Watcher.Event.EventType.NodeDataChanged) { + try { + tryAddChild(child); + } catch (InterruptedException e) { + LOG.log(Level.WARNING, "Interrupted while trying to add a child.", e); + Thread.currentThread().interrupt(); + } + } else if (event.getType() == Watcher.Event.EventType.NodeDeleted) { + removeEntry(child); + } + } + }; + + try { + V value = deserializer.apply(zkClient.get().getData(makePath(child), nodeWatcher, null)); + putEntry(child, value); + } catch (KeeperException.NoNodeException e) { + // This node doesn't exist anymore, remove it from the map and we're done. 
+ removeEntry(child); + } + } + + @VisibleForTesting + void removeEntry(String key) { + localMap.remove(key); + mapListener.nodeRemoved(key); + } + + @VisibleForTesting + void putEntry(String key, V value) { + localMap.put(key, value); + mapListener.nodeChanged(key, value); + } + + private void rewatchDataNodes() throws InterruptedException { + for (String child : keySet()) { + tryAddChild(child); + } + } + + private String makePath(final String child) { + return nodePath + "/" + child; + } + + private void updateChildren(Set<String> zkChildren) throws InterruptedException { + Set<String> addedChildren = Sets.difference(zkChildren, keySet()); + Set<String> removedChildren = Sets.difference(keySet(), zkChildren); + for (String child : addedChildren) { + tryAddChild(child); + } + for (String child : removedChildren) { + removeEntry(child); + } + } +} + http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperNode.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperNode.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperNode.java new file mode 100644 index 0000000..3829ca7 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperNode.java @@ -0,0 +1,349 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.aurora.common.zookeeper;

import java.util.logging.Level;
import java.util.logging.Logger;

import javax.annotation.Nullable;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.data.Stat;

import org.apache.aurora.common.base.Closure;
import org.apache.aurora.common.base.Closures;
import org.apache.aurora.common.base.Command;
import org.apache.aurora.common.base.ExceptionalSupplier;
import org.apache.aurora.common.base.MorePreconditions;
import org.apache.aurora.common.util.BackoffHelper;
import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;

/**
 * An implementation of {@link Supplier} that offers a readonly view of a
 * zookeeper data node.  This class is thread-safe.
 *
 * Instances of this class each maintain a zookeeper watch for the remote data node.  Instances
 * of this class should be created via the {@link #create} factory method.
 *
 * Please see zookeeper documentation and talk to your cluster administrator for guidance on
 * appropriate node size and total number of nodes you should be using.
 *
 * @param <T> the type of data this node stores
 */
public class ZooKeeperNode<T> implements Supplier<T> {
  /**
   * Deserializer for the constructor if you want to simply store the zookeeper byte[] data
   * as-is.
   */
  public static final Function<byte[], byte[]> BYTE_ARRAY_VALUE = Functions.identity();

  private static final Logger LOG = Logger.getLogger(ZooKeeperNode.class.getName());

  private final ZooKeeperClient zkClient;
  private final String nodePath;
  private final NodeDeserializer<T> deserializer;

  private final BackoffHelper backoffHelper;

  // Whether it's safe to re-establish watches if our zookeeper session has expired.
  private final Object safeToRewatchLock;
  private volatile boolean safeToRewatch;

  // Value published while the node has no data or does not exist.
  private final T NO_DATA = null;
  @Nullable private volatile T nodeData;
  private final Closure<T> dataUpdateListener;

  /**
   * When a call to ZooKeeper.getData is made, the Watcher is added to a Set before the network
   * request is made and if the request fails, the Watcher remains.  There's a problem where
   * Watchers can accumulate when there are failed requests, so they are set to instance fields
   * and reused.
   */
  private final Watcher nodeWatcher;
  private final Watcher existenceWatcher;

  /**
   * Returns an initialized ZooKeeperNode.  The given node must exist at the time of object
   * creation or a {@link KeeperException} will be thrown.
   *
   * @param zkClient a zookeeper client
   * @param nodePath path to a node whose data will be watched
   * @param deserializer a function that converts byte[] data from a zk node to this supplier's
   *     type T
   *
   * @throws InterruptedException if the underlying zookeeper server transaction is interrupted
   * @throws KeeperException.NoNodeException if the given nodePath doesn't exist
   * @throws KeeperException if the server signals an error
   * @throws ZooKeeperConnectionException if there was a problem connecting to the zookeeper
   *     cluster
   */
  public static <T> ZooKeeperNode<T> create(ZooKeeperClient zkClient, String nodePath,
      Function<byte[], T> deserializer) throws InterruptedException, KeeperException,
      ZooKeeperConnectionException {
    return create(zkClient, nodePath, deserializer, Closures.<T>noop());
  }

  /**
   * Like the above, but optionally takes in a {@link Closure} that will get notified
   * whenever the data is updated from the remote node.
   *
   * @param dataUpdateListener a {@link Closure} to receive data update notifications.
   */
  public static <T> ZooKeeperNode<T> create(ZooKeeperClient zkClient, String nodePath,
      Function<byte[], T> deserializer, Closure<T> dataUpdateListener) throws InterruptedException,
      KeeperException, ZooKeeperConnectionException {
    return create(zkClient, nodePath, new FunctionWrapper<T>(deserializer), dataUpdateListener);
  }

  /**
   * Returns an initialized ZooKeeperNode.  The given node must exist at the time of object
   * creation or a {@link KeeperException} will be thrown.
   *
   * @param zkClient a zookeeper client
   * @param nodePath path to a node whose data will be watched
   * @param deserializer an implementation of {@link NodeDeserializer} that converts a byte[] from
   *     a zk node to this supplier's type T.  Also supplies a {@link Stat} object which is useful
   *     for doing versioned updates.
   *
   * @throws InterruptedException if the underlying zookeeper server transaction is interrupted
   * @throws KeeperException.NoNodeException if the given nodePath doesn't exist
   * @throws KeeperException if the server signals an error
   * @throws ZooKeeperConnectionException if there was a problem connecting to the zookeeper
   *     cluster
   */
  public static <T> ZooKeeperNode<T> create(ZooKeeperClient zkClient, String nodePath,
      NodeDeserializer<T> deserializer) throws InterruptedException, KeeperException,
      ZooKeeperConnectionException {
    return create(zkClient, nodePath, deserializer, Closures.<T>noop());
  }

  /**
   * Like the above, but optionally takes in a {@link Closure} that will get notified
   * whenever the data is updated from the remote node.
   *
   * @param dataUpdateListener a {@link Closure} to receive data update notifications.
   */
  public static <T> ZooKeeperNode<T> create(ZooKeeperClient zkClient, String nodePath,
      NodeDeserializer<T> deserializer, Closure<T> dataUpdateListener)
      throws InterruptedException, KeeperException, ZooKeeperConnectionException {
    ZooKeeperNode<T> zkNode =
        new ZooKeeperNode<T>(zkClient, nodePath, deserializer, dataUpdateListener);
    zkNode.init();
    return zkNode;
  }

  /**
   * Constructs a ZooKeeperNode without contacting ZooKeeper.
   *
   * Please note that this object will not track any remote zookeeper data until {@link #init()}
   * is successfully called.  After construction and before that call, this {@link Supplier} will
   * return null.
   *
   * @param zkClient a zookeeper client
   * @param nodePath path to a node whose data will be watched
   * @param deserializer an implementation of {@link NodeDeserializer} that converts byte[] data
   *     from a zk node to this supplier's type T
   * @param dataUpdateListener a {@link Closure} to receive data update notifications.
   */
  @VisibleForTesting
  ZooKeeperNode(ZooKeeperClient zkClient, String nodePath,
      NodeDeserializer<T> deserializer, Closure<T> dataUpdateListener) {
    this.zkClient = Preconditions.checkNotNull(zkClient);
    this.nodePath = MorePreconditions.checkNotBlank(nodePath);
    this.deserializer = Preconditions.checkNotNull(deserializer);
    this.dataUpdateListener = Preconditions.checkNotNull(dataUpdateListener);

    backoffHelper = new BackoffHelper();
    safeToRewatchLock = new Object();
    safeToRewatch = false;
    nodeData = NO_DATA;

    // Re-reads the node (and re-arms the data watch) on any event delivered while connected.
    nodeWatcher = new Watcher() {
      @Override public void process(WatchedEvent event) {
        if (event.getState() == KeeperState.SyncConnected) {
          try {
            tryWatchDataNode();
          } catch (InterruptedException e) {
            LOG.log(Level.WARNING, "Interrupted while trying to watch a data node.", e);
            Thread.currentThread().interrupt();
          }
        } else {
          LOG.info("Ignoring watcher event " + event);
        }
      }
    };

    // Starts watching the node's data again once a missing node has been (re)created.
    existenceWatcher = new Watcher() {
      @Override public void process(WatchedEvent event) {
        if (event.getType() == Watcher.Event.EventType.NodeCreated) {
          try {
            tryWatchDataNode();
          } catch (InterruptedException e) {
            LOG.log(Level.WARNING, "Interrupted while trying to watch a data node.", e);
            Thread.currentThread().interrupt();
          }
        }
      }
    };
  }

  /**
   * Initialize zookeeper tracking for this {@link Supplier}.  Once this call returns, this object
   * will be tracking data in zookeeper.
   *
   * If the initial watch cannot be established for any reason, checked or unchecked, the
   * session-expiration handler registered by this method is unregistered again before the
   * exception propagates to the caller.
   *
   * @throws InterruptedException if the underlying zookeeper server transaction is interrupted
   * @throws KeeperException if the server signals an error
   * @throws ZooKeeperConnectionException if there was a problem connecting to the zookeeper
   *     cluster
   */
  @VisibleForTesting
  void init() throws InterruptedException, KeeperException,
      ZooKeeperConnectionException {
    Watcher watcher = zkClient.registerExpirationHandler(new Command() {
      @Override public void execute() {
        try {
          synchronized (safeToRewatchLock) {
            if (safeToRewatch) {
              tryWatchDataNode();
            }
          }
        } catch (InterruptedException e) {
          LOG.log(Level.WARNING, "Interrupted while trying to re-establish watch.", e);
          Thread.currentThread().interrupt();
        }
      }
    });

    boolean initialized = false;
    try {
      /*
       * Synchronize to prevent the race of watchDataNode completing and then the session expiring
       * before we update safeToRewatch.
       */
      synchronized (safeToRewatchLock) {
        watchDataNode();
        safeToRewatch = true;
      }
      initialized = true;
    } finally {
      // Covers unchecked failures too (e.g. thrown by the deserializer or the data listener),
      // which would otherwise leave the expiration handler registered forever.
      if (!initialized) {
        zkClient.unregister(watcher);
      }
    }
  }

  /**
   * Returns the data corresponding to a byte array in a remote zookeeper node.  This data is
   * cached locally and updated in the background on watch notifications.
   *
   * @return the data currently cached locally or null if {@link #init()} hasn't been called
   *     or the backing node has no data or does not exist anymore.
   */
  @Override
  public @Nullable T get() {
    return nodeData;
  }

  /**
   * Stores {@code newData} as the locally cached value and then notifies the data update
   * listener with it.
   *
   * @param newData the new value, or null when the node has no data or does not exist
   */
  @VisibleForTesting
  void updateData(@Nullable T newData) {
    nodeData = newData;
    dataUpdateListener.execute(newData);
  }

  /**
   * Retries {@link #watchDataNode()} through the backoff helper until an attempt completes
   * without a {@link KeeperException} or {@link ZooKeeperConnectionException}.
   *
   * @throws InterruptedException if interrupted while retrying
   */
  private void tryWatchDataNode() throws InterruptedException {
    backoffHelper.doUntilSuccess(new ExceptionalSupplier<Boolean, InterruptedException>() {
      @Override public Boolean get() throws InterruptedException {
        try {
          watchDataNode();
          return true;
        } catch (KeeperException e) {
          return false;
        } catch (ZooKeeperConnectionException e) {
          return false;
        }
      }
    });
  }

  /**
   * Reads the node's data while setting the data watch, deserializes it and publishes it.  When
   * the node does not exist, publishes "no data" and falls back to an existence watch.
   */
  private void watchDataNode() throws InterruptedException, KeeperException,
      ZooKeeperConnectionException {
    try {
      Stat stat = new Stat();
      byte[] rawData = zkClient.get().getData(nodePath, nodeWatcher, stat);
      T newData = deserializer.deserialize(rawData, stat);
      updateData(newData);
    } catch (KeeperException.NoNodeException e) {
      /*
       * This node doesn't exist right now, reflect that locally and then create a watch to wait
       * for its recreation.
       */
      updateData(NO_DATA);
      watchForExistence();
    }
  }

  /**
   * Sets an existence watch on the node, and immediately resumes data watching if the node turns
   * out to exist already.
   */
  private void watchForExistence() throws InterruptedException, KeeperException,
      ZooKeeperConnectionException {
    /*
     * If the node was created between the getData call and this call, just try watching it.
     * We'll have an extra exists watch on it that goes off on its next deletion, which will
     * be a no-op.
     * Otherwise, just let the exists watch wait for its creation.
     */
    if (zkClient.get().exists(nodePath, existenceWatcher) != null) {
      tryWatchDataNode();
    }
  }

  /**
   * Interface for defining zookeeper node data deserialization.
   *
   * @param <T> the type of data associated with this node
   */
  public interface NodeDeserializer<T> {
    /**
     * @param data the byte array returned from ZooKeeper when a watch is triggered.
     * @param stat a ZooKeeper {@link Stat} object. Populated by
     *     {@link org.apache.zookeeper.ZooKeeper#getData(String, boolean, Stat)}.
     */
    T deserialize(byte[] data, Stat stat);
  }

  // wrapper for backwards compatibility with older create() methods with Function parameter
  private static final class FunctionWrapper<T> implements NodeDeserializer<T> {
    private final Function<byte[], T> func;

    private FunctionWrapper(Function<byte[], T> func) {
      Preconditions.checkNotNull(func);
      this.func = func;
    }

    // The Stat is ignored: plain Functions only see the raw bytes.
    @Override
    public T deserialize(byte[] rawData, Stat stat) {
      return func.apply(rawData);
    }
  }

}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperUtils.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperUtils.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperUtils.java new file mode 100644 index 0000000..a8dcfa1 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperUtils.java @@ -0,0 +1,167 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.aurora.common.zookeeper;

import java.util.List;
import java.util.logging.Logger;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.common.PathUtils;
import org.apache.zookeeper.data.ACL;

import org.apache.aurora.common.quantity.Amount;
import org.apache.aurora.common.quantity.Time;
import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;

/**
 * Static helpers for working with ZooKeeper: shared constants, retry classification, path
 * creation and path normalization.
 */
public final class ZooKeeperUtils {

  private static final Logger LOG = Logger.getLogger(ZooKeeperUtils.class.getName());

  /**
   * Default ZooKeeper session timeout: 4 seconds.
   */
  public static final Amount<Integer, Time> DEFAULT_ZK_SESSION_TIMEOUT =
      Amount.of(4, Time.SECONDS);

  /**
   * The magic version number that allows any mutation to always succeed regardless of actual
   * version number.
   */
  public static final int ANY_VERSION = -1;

  /**
   * An ACL that gives all permissions to any user, authenticated or not.
   */
  public static final ImmutableList<ACL> OPEN_ACL_UNSAFE =
      ImmutableList.copyOf(Ids.OPEN_ACL_UNSAFE);

  /**
   * An ACL that gives all permissions to node creators and read-only permission to everyone else.
   */
  public static final ImmutableList<ACL> EVERYONE_READ_CREATOR_ALL =
      ImmutableList.<ACL>builder()
          .addAll(Ids.CREATOR_ALL_ACL)
          .addAll(Ids.READ_ACL_UNSAFE)
          .build();

  /**
   * Tells whether the failed operation behind {@code e} can simply be retried unchanged.
   *
   * @param e the exception to check
   * @return true if the causing operation is strictly retryable
   */
  public static boolean isRetryable(KeeperException e) {
    Preconditions.checkNotNull(e);

    switch (e.code()) {
      case CONNECTIONLOSS:
      case SESSIONEXPIRED:
      case SESSIONMOVED:
      case OPERATIONTIMEOUT:
        return true;

      // Every other code is treated as not retryable.  That covers the data/argument/ACL/auth
      // errors, SYSTEMERROR and APIERROR (used internally by ZK to specify ranges) and OK
      // (which is not a valid ZK exception code).
      default:
        return false;
    }
  }

  /**
   * Makes sure {@code path} exists in the ZK cluster reached through {@code zkClient}.  Nothing
   * happens when the path is already there; otherwise every missing portion of it is created as
   * a persistent zookeeper node with the given {@code acl}.  The given {@code path} must be a
   * valid zookeeper absolute path.
   *
   * @param zkClient the client to use to access the ZK cluster
   * @param acl the acl to use if creating path nodes
   * @param path the path to ensure exists
   * @throws ZooKeeperConnectionException if there was a problem accessing the ZK cluster
   * @throws InterruptedException if we were interrupted attempting to connect to the ZK cluster
   * @throws KeeperException if there was a problem in ZK
   */
  public static void ensurePath(ZooKeeperClient zkClient, List<ACL> acl, String path)
      throws ZooKeeperConnectionException, InterruptedException, KeeperException {
    Preconditions.checkNotNull(zkClient);
    Preconditions.checkNotNull(path);
    Preconditions.checkArgument(path.startsWith("/"));

    ensurePathInternal(zkClient, acl, path);
  }

  // Recursive worker for ensurePath: creates missing ancestors first, then the path itself.
  private static void ensurePathInternal(ZooKeeperClient zkClient, List<ACL> acl, String path)
      throws ZooKeeperConnectionException, InterruptedException, KeeperException {
    if (zkClient.get().exists(path, false) != null) {
      // Already present, nothing to create.
      return;
    }

    // A separator at index 0 means this is a root-level path with no parent to ensure.
    int parentEnd = path.lastIndexOf('/');
    if (parentEnd > 0) {
      String parentPath = path.substring(0, parentEnd);
      ensurePathInternal(zkClient, acl, parentPath);
    }

    // The parent (if any) exists now, so the node itself can be created.
    try {
      zkClient.get().create(path, null, acl, CreateMode.PERSISTENT);
    } catch (KeeperException.NodeExistsException e) {
      // Tolerate losing the race between the existence check and the create call.
      LOG.info("Node existed when trying to ensure path " + path + ", somebody beat us to it?");
    }
  }

  /**
   * Validates and returns a normalized zookeeper path: runs of consecutive slashes are collapsed
   * into one and a trailing slash is dropped (except for the root path).
   *
   * @param path the path to be normalized
   * @return normalized path string
   */
  public static String normalizePath(String path) {
    String collapsed = path.replaceAll("//+", "/");
    String normalizedPath = collapsed.replaceFirst("(.+)/$", "$1");
    PathUtils.validatePath(normalizedPath);
    return normalizedPath;
  }

  private ZooKeeperUtils() {
    // utility
  }
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/zookeeper/guice/ServerSetModule.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/guice/ServerSetModule.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/guice/ServerSetModule.java new file mode 100644 index 0000000..c8a3214 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/guice/ServerSetModule.java @@ -0,0 +1,267 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.aurora.common.zookeeper.guice;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.annotation.Nullable;

import com.google.common.base.Optional;
import com.google.common.util.concurrent.Atomics;
import com.google.inject.AbstractModule;
import com.google.inject.BindingAnnotation;
import com.google.inject.Inject;
import com.google.inject.Key;
import com.google.inject.Singleton;
import com.google.inject.TypeLiteral;

import org.apache.aurora.common.application.ShutdownRegistry;
import org.apache.aurora.common.application.modules.LifecycleModule;
import org.apache.aurora.common.application.modules.LocalServiceRegistry;
import org.apache.aurora.common.args.Arg;
import org.apache.aurora.common.args.CmdLine;
import org.apache.aurora.common.args.constraints.NotNegative;
import org.apache.aurora.common.base.Command;
import org.apache.aurora.common.base.ExceptionalCommand;
import org.apache.aurora.common.base.Supplier;
import org.apache.aurora.common.zookeeper.Group;
import org.apache.aurora.common.zookeeper.ServerSet;

import static com.google.common.base.Preconditions.checkNotNull;
import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

/**
 * A module that registers all ports in the {@link LocalServiceRegistry} in a {@link ServerSet}.
 * <p/>
 * Required bindings:
 * <ul>
 *   <li> {@link ServerSet}
 *   <li> {@link ShutdownRegistry}
 *   <li> {@link LocalServiceRegistry}
 * </ul>
 * <p/>
 * {@link LifecycleModule} must also be included by users so a startup action may be registered.
 * <p/>
 * Provided bindings:
 * <ul>
 *   <li> {@code Supplier<ServerSet.EndpointStatus>}
 * </ul>
 */
public class ServerSetModule extends AbstractModule {

  /**
   * BindingAnnotation for defaults to use in the service instance node.
   */
  @BindingAnnotation @Target({PARAMETER, METHOD, FIELD}) @Retention(RUNTIME)
  private @interface Default {}

  /**
   * Binding annotation to give the ServerSetJoiner a fixed known ServerSet that is appropriate to
   * {@link ServerSet#join} on.
   */
  @BindingAnnotation @Target({METHOD, PARAMETER}) @Retention(RUNTIME)
  private @interface Joinable {}

  private static final Key<ServerSet> JOINABLE_SS = Key.get(ServerSet.class, Joinable.class);

  @CmdLine(name = "aux_port_as_primary",
      help = "Name of the auxiliary port to use as the primary port in the server set."
          + " This may only be used when no other primary port is specified.")
  private static final Arg<String> AUX_PORT_AS_PRIMARY = Arg.create(null);

  @NotNegative
  @CmdLine(name = "shard_id", help = "Shard ID for this application.")
  private static final Arg<Integer> SHARD_ID = Arg.create();

  private static final Logger LOG = Logger.getLogger(ServerSetModule.class.getName());

  /**
   * Builds a Module that can be used to join a {@link ServerSet} with the ports configured in a
   * {@link LocalServiceRegistry}.
   */
  public static class Builder {
    private Key<ServerSet> key = Key.get(ServerSet.class);
    private Optional<String> auxPortAsPrimary = Optional.absent();

    /**
     * Sets the key of the ServerSet to join.
     *
     * @param key Key of the ServerSet to join.
     * @return This builder for chaining calls.
     */
    public Builder key(Key<ServerSet> key) {
      this.key = key;
      return this;
    }

    /**
     * Allows joining an auxiliary port with the specified {@code name} as the primary port of the
     * ServerSet.
     *
     * @param auxPortName The name of the auxiliary port to join as the primary ServerSet port.
     * @return This builder for chaining calls.
     */
    public Builder namedPrimaryPort(String auxPortName) {
      this.auxPortAsPrimary = Optional.of(auxPortName);
      return this;
    }

    /**
     * Creates a Module that will register a startup action that joins a ServerSet when installed.
     *
     * @return A Module.
     */
    public ServerSetModule build() {
      return new ServerSetModule(key, auxPortAsPrimary);
    }
  }

  /**
   * Creates a builder that can be used to configure and create a ServerSetModule.
   *
   * @return A ServerSetModule builder.
   */
  public static Builder builder() {
    return new Builder();
  }

  private final Key<ServerSet> serverSetKey;
  private final Optional<String> auxPortAsPrimary;

  /**
   * Constructs a ServerSetModule that registers a startup action to register this process in
   * ZooKeeper, optionally presenting a named auxiliary port as the primary service port.
   *
   * @param serverSetKey The key the ServerSet to join is bound under.
   * @param auxPortAsPrimary Name of the auxiliary port to use as the primary port.
   */
  ServerSetModule(Key<ServerSet> serverSetKey, Optional<String> auxPortAsPrimary) {
    this.serverSetKey = checkNotNull(serverSetKey);
    this.auxPortAsPrimary = checkNotNull(auxPortAsPrimary);
  }

  @Override
  protected void configure() {
    requireBinding(serverSetKey);
    requireBinding(ShutdownRegistry.class);
    requireBinding(LocalServiceRegistry.class);

    LifecycleModule.bindStartupAction(binder(), ServerSetJoiner.class);

    bind(new TypeLiteral<Supplier<ServerSet.EndpointStatus>>() { }).to(EndpointSupplier.class);
    bind(EndpointSupplier.class).in(Singleton.class);

    // A value applied on the command line takes precedence over the one given to the builder.
    Optional<String> primaryPortName = AUX_PORT_AS_PRIMARY.hasAppliedValue()
        ? Optional.of(AUX_PORT_AS_PRIMARY.get())
        : auxPortAsPrimary;

    bind(new TypeLiteral<Optional<String>>() { }).annotatedWith(Default.class)
        .toInstance(primaryPortName);

    bind(JOINABLE_SS).to(serverSetKey);
  }

  /**
   * Holds the endpoint status produced by joining the ServerSet; {@link #get()} returns null
   * until {@link #set} has been called.
   */
  static class EndpointSupplier implements Supplier<ServerSet.EndpointStatus> {
    private final AtomicReference<ServerSet.EndpointStatus> reference = Atomics.newReference();

    @Nullable
    @Override public ServerSet.EndpointStatus get() {
      return reference.get();
    }

    void set(ServerSet.EndpointStatus endpoint) {
      reference.set(endpoint);
    }
  }

  /**
   * Startup action that joins the ServerSet with the sockets found in the
   * {@link LocalServiceRegistry} and registers a shutdown action that leaves it again.
   */
  private static class ServerSetJoiner implements Command {
    private final ServerSet serverSet;
    private final LocalServiceRegistry serviceRegistry;
    private final ShutdownRegistry shutdownRegistry;
    private final EndpointSupplier endpointSupplier;
    private final Optional<String> auxPortAsPrimary;

    @Inject
    ServerSetJoiner(
        @Joinable ServerSet serverSet,
        LocalServiceRegistry serviceRegistry,
        ShutdownRegistry shutdownRegistry,
        EndpointSupplier endpointSupplier,
        @Default Optional<String> auxPortAsPrimary) {

      this.serverSet = checkNotNull(serverSet);
      this.serviceRegistry = checkNotNull(serviceRegistry);
      this.shutdownRegistry = checkNotNull(shutdownRegistry);
      this.endpointSupplier = checkNotNull(endpointSupplier);
      this.auxPortAsPrimary = checkNotNull(auxPortAsPrimary);
    }

    @Override public void execute() {
      Optional<InetSocketAddress> primarySocket = serviceRegistry.getPrimarySocket();
      Map<String, InetSocketAddress> auxSockets = serviceRegistry.getAuxiliarySockets();
      InetSocketAddress primary = choosePrimary(primarySocket, auxSockets);

      final ServerSet.EndpointStatus endpointStatus;
      try {
        endpointStatus = SHARD_ID.hasAppliedValue()
            ? serverSet.join(primary, auxSockets, SHARD_ID.get())
            : serverSet.join(primary, auxSockets);

        endpointSupplier.set(endpointStatus);
      } catch (Group.JoinException e) {
        LOG.log(Level.WARNING, "Failed to join ServerSet.", e);
        throw new RuntimeException(e);
      } catch (InterruptedException e) {
        LOG.log(Level.WARNING, "Interrupted while joining ServerSet.", e);
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
      }

      shutdownRegistry.addAction(new ExceptionalCommand<ServerSet.UpdateException>() {
        @Override public void execute() throws ServerSet.UpdateException {
          LOG.info("Leaving ServerSet.");
          endpointStatus.leave();
        }
      });
    }

    // Picks the registered primary socket if there is one, else the named auxiliary socket.
    private InetSocketAddress choosePrimary(
        Optional<InetSocketAddress> primarySocket,
        Map<String, InetSocketAddress> auxSockets) {

      if (primarySocket.isPresent()) {
        return primarySocket.get();
      }
      if (!auxPortAsPrimary.isPresent()) {
        throw new IllegalStateException("No primary service registered with LocalServiceRegistry,"
            + " and -aux_port_as_primary was not specified.");
      }

      InetSocketAddress auxSocket = auxSockets.get(auxPortAsPrimary.get());
      if (auxSocket == null) {
        throw new IllegalStateException("No auxiliary port named " + auxPortAsPrimary.get());
      }
      return auxSocket;
    }
  }
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/zookeeper/guice/client/ZooKeeperClientModule.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/guice/client/ZooKeeperClientModule.java 
b/commons/src/main/java/org/apache/aurora/common/zookeeper/guice/client/ZooKeeperClientModule.java new file mode 100644 index 0000000..08cdf55 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/guice/client/ZooKeeperClientModule.java @@ -0,0 +1,235 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.zookeeper.guice.client; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.logging.Logger; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; + +import com.google.inject.Inject; +import com.google.inject.Key; +import com.google.inject.PrivateModule; +import com.google.inject.Provider; +import com.google.inject.Singleton; + +import org.apache.aurora.common.application.ShutdownRegistry; +import org.apache.aurora.common.inject.Bindings.KeyFactory; +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; +import org.apache.aurora.common.zookeeper.ZooKeeperClient; +import org.apache.aurora.common.zookeeper.ZooKeeperClient.Credentials; +import org.apache.aurora.common.zookeeper.ZooKeeperUtils; +import org.apache.aurora.common.zookeeper.testing.ZooKeeperTestServer; + +/** + * A guice binding module that configures and binds a {@link ZooKeeperClient} instance. 
+ */ +public class ZooKeeperClientModule extends PrivateModule { + private final KeyFactory keyFactory; + private final ClientConfig config; + + /** + * Creates a new ZK client module from the provided configuration. + * + * @param config Configuration parameters for the client. + */ + public ZooKeeperClientModule(ClientConfig config) { + this(KeyFactory.PLAIN, config); + } + + /** + * Creates a new ZK client module from the provided configuration, using a key factory to + * qualify any bindings. + * + * @param keyFactory Factory to use when creating any exposed bindings. + * @param config Configuration parameters for the client. + */ + public ZooKeeperClientModule(KeyFactory keyFactory, ClientConfig config) { + this.keyFactory = Preconditions.checkNotNull(keyFactory); + this.config = Preconditions.checkNotNull(config); + } + + @Override + protected void configure() { + Key<ZooKeeperClient> clientKey = keyFactory.create(ZooKeeperClient.class); + if (config.inProcess) { + requireBinding(ShutdownRegistry.class); + // Bound privately to give the local provider access to configuration settings. 
+ bind(ClientConfig.class).toInstance(config); + bind(clientKey).toProvider(LocalClientProvider.class).in(Singleton.class); + } else { + ZooKeeperClient client = + new ZooKeeperClient(config.sessionTimeout, config.credentials, config.chrootPath, config.servers); + bind(clientKey).toInstance(client); + } + expose(clientKey); + } + + private static class LocalClientProvider implements Provider<ZooKeeperClient> { + private static final Logger LOG = Logger.getLogger(LocalClientProvider.class.getName()); + + private final ClientConfig config; + private final ShutdownRegistry shutdownRegistry; + + @Inject + LocalClientProvider(ClientConfig config, ShutdownRegistry shutdownRegistry) { + this.config = Preconditions.checkNotNull(config); + this.shutdownRegistry = Preconditions.checkNotNull(shutdownRegistry); + } + + @Override + public ZooKeeperClient get() { + ZooKeeperTestServer zooKeeperServer; + try { + zooKeeperServer = new ZooKeeperTestServer(0, shutdownRegistry); + zooKeeperServer.startNetwork(); + } catch (IOException e) { + throw Throwables.propagate(e); + } catch (InterruptedException e) { + throw Throwables.propagate(e); + } + + LOG.info("Embedded zookeeper cluster started on port " + zooKeeperServer.getPort()); + return zooKeeperServer.createClient(config.sessionTimeout, config.credentials); + } + } + + /** + * Composite type that contains configuration parameters used when creating a client. + * <p> + * Instances of this class are immutable, but builder-style chained calls are supported. + */ + public static class ClientConfig { + public final Iterable<InetSocketAddress> servers; + public final boolean inProcess; + public final Amount<Integer, Time> sessionTimeout; + public final Optional<String> chrootPath; + public final Credentials credentials; + + /** + * Creates a new client configuration. + * + * @param servers ZooKeeper server addresses. + * @param inProcess Whether to run and create clients for an in-process ZooKeeper server. 
+ * @param sessionTimeout Timeout duration for established sessions. + * @param credentials ZooKeeper authentication credentials. + */ + public ClientConfig( + Iterable<InetSocketAddress> servers, + boolean inProcess, + Amount<Integer, Time> sessionTimeout, + Credentials credentials) { + + this(servers, Optional.<String>absent(), inProcess, sessionTimeout, credentials); + } + + /** + * Creates a new client configuration. + * + * @param servers ZooKeeper server addresses. + * @param inProcess Whether to run and create clients for an in-process ZooKeeper server. + * @param chrootPath an optional chroot path + * @param sessionTimeout Timeout duration for established sessions. + * @param credentials ZooKeeper authentication credentials. + */ + public ClientConfig( + Iterable<InetSocketAddress> servers, + Optional<String> chrootPath, + boolean inProcess, + Amount<Integer, Time> sessionTimeout, + Credentials credentials) { + + this.servers = servers; + this.chrootPath = chrootPath; + this.inProcess = inProcess; + this.sessionTimeout = sessionTimeout; + this.credentials = credentials; + } + + /** + * Creates a new client configuration with defaults for the session timeout and credentials. + * + * @param servers ZooKeeper server addresses. + * @return A new configuration. + */ + public static ClientConfig create(Iterable<InetSocketAddress> servers) { + return new ClientConfig( + servers, + Optional.<String> absent(), + false, + ZooKeeperUtils.DEFAULT_ZK_SESSION_TIMEOUT, + Credentials.NONE); + } + + /** + * Creates a new configuration identical to this configuration, but with the provided + * session timeout. + * + * @param sessionTimeout Timeout duration for established sessions. + * @return A modified clone of this configuration. 
+ */ + public ClientConfig withSessionTimeout(Amount<Integer, Time> sessionTimeout) { + return new ClientConfig(servers, chrootPath, inProcess, sessionTimeout, credentials); + } + + /** + * Creates a new configuration identical to this configuration, but with the provided + * credentials. + * + * @param credentials ZooKeeper authentication credentials. + * @return A modified clone of this configuration. + */ + public ClientConfig withCredentials(Credentials credentials) { + return new ClientConfig(servers, chrootPath, inProcess, sessionTimeout, credentials); + } + + /** + * Convenience method for calling {@link #withCredentials(Credentials)} with digest credentials. + * + * @param username Digest authentication user. + * @param password Digest authentication raw password. + * @return A modified clone of this configuration. + */ + public ClientConfig withDigestCredentials(String username, String password) { + return withCredentials(ZooKeeperClient.digestCredentials(username, password)); + } + + /** + * Creates a new configuration identical to this configuration, but with the provided + * in-process setting. + * + * @param inProcess If {@code true}, an in-process ZooKeeper server server will be used, + * and all clients will connect to it. + * @return A modified clone of this configuration. + */ + public ClientConfig inProcess(boolean inProcess) { + return new ClientConfig(servers, chrootPath, inProcess, sessionTimeout, credentials); + } + + /** + * Creates a new configuration identical to this configuration, but with the provided + * chroot path setting. + * + * @param chrootPath a valid ZooKeeper path used as a chroot for ZooKeeper connections. + * @return A modified clone of this configuration. 
+ */ + public ClientConfig withChrootPath(String chrootPath) { + return new ClientConfig(servers, Optional.of(chrootPath), inProcess, sessionTimeout, credentials); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/zookeeper/guice/client/flagged/FlaggedClientConfig.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/guice/client/flagged/FlaggedClientConfig.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/guice/client/flagged/FlaggedClientConfig.java new file mode 100644 index 0000000..f3e3a84 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/guice/client/flagged/FlaggedClientConfig.java @@ -0,0 +1,82 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.common.zookeeper.guice.client.flagged; + +import java.net.InetSocketAddress; +import java.util.List; + +import com.google.common.base.Optional; +import com.google.common.base.Splitter; +import com.google.common.collect.ImmutableList; + +import org.apache.aurora.common.args.Arg; +import org.apache.aurora.common.args.CmdLine; +import org.apache.aurora.common.args.constraints.NotEmpty; +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; +import org.apache.aurora.common.zookeeper.ZooKeeperClient; +import org.apache.aurora.common.zookeeper.ZooKeeperClient.Credentials; +import org.apache.aurora.common.zookeeper.ZooKeeperUtils; +import org.apache.aurora.common.zookeeper.guice.client.ZooKeeperClientModule.ClientConfig; + +/** + * A factory that creates a {@link ClientConfig} instance based on command line argument values. + */ +public class FlaggedClientConfig { + @CmdLine(name = "zk_in_proc", + help = "Launches an embedded zookeeper server for local testing causing -zk_endpoints " + + "to be ignored if specified.") + private static final Arg<Boolean> IN_PROCESS = Arg.create(false); + + @NotEmpty + @CmdLine(name = "zk_endpoints", help ="Endpoint specification for the ZooKeeper servers.") + private static final Arg<List<InetSocketAddress>> ZK_ENDPOINTS = Arg.create(); + + @CmdLine(name = "zk_chroot_path", help = "chroot path to use for the ZooKeeper connections") + private static final Arg<String> CHROOT_PATH = Arg.create(null); + + @CmdLine(name = "zk_session_timeout", help ="The ZooKeeper session timeout.") + private static final Arg<Amount<Integer, Time>> SESSION_TIMEOUT = + Arg.create(ZooKeeperUtils.DEFAULT_ZK_SESSION_TIMEOUT); + + @CmdLine(name = "zk_digest_credentials", + help ="user:password to use when authenticating with ZooKeeper.") + private static final Arg<String> DIGEST_CREDENTIALS = Arg.create(); + + /** + * Creates a configuration from command line arguments. 
+ * + * @return Configuration instance. + */ + public static ClientConfig create() { + return new ClientConfig( + ZK_ENDPOINTS.get(), + Optional.fromNullable(CHROOT_PATH.get()), + IN_PROCESS.get(), + SESSION_TIMEOUT.get(), + DIGEST_CREDENTIALS.hasAppliedValue() + ? getCredentials(DIGEST_CREDENTIALS.get()) + : Credentials.NONE + ); + } + + private static Credentials getCredentials(String userAndPass) { + List<String> parts = ImmutableList.copyOf(Splitter.on(":").split(userAndPass)); + if (parts.size() != 2) { + throw new IllegalArgumentException( + "zk_digest_credentials must be formatted as user:pass"); + } + return ZooKeeperClient.digestCredentials(parts.get(0), parts.get(1)); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperTest.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperTest.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperTest.java new file mode 100644 index 0000000..88cd6d2 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperTest.java @@ -0,0 +1,152 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.common.zookeeper.testing; + +import java.io.IOException; + +import com.google.common.base.Preconditions; +import com.google.common.testing.TearDown; +import com.google.common.testing.junit4.TearDownTestCase; + +import org.apache.aurora.common.zookeeper.ZooKeeperClient; +import org.junit.Before; + +import org.apache.aurora.common.application.ShutdownRegistry.ShutdownRegistryImpl; +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; + +/** + * A baseclass for in-process zookeeper tests. + * Uses ZooKeeperTestHelper to start the server and create clients: new tests should directly use + * that helper class instead of extending this class. + */ +public abstract class BaseZooKeeperTest extends TearDownTestCase { + + private final Amount<Integer, Time> defaultSessionTimeout; + private ZooKeeperTestServer zkTestServer; + + /** + * Creates a test case where the test server uses its + * {@link ZooKeeperTestServer#DEFAULT_SESSION_TIMEOUT} for clients created without an explicit + * session timeout. + */ + public BaseZooKeeperTest() { + this(ZooKeeperTestServer.DEFAULT_SESSION_TIMEOUT); + } + + /** + * Creates a test case where the test server uses the given {@code defaultSessionTimeout} for + * clients created without an explicit session timeout. + */ + public BaseZooKeeperTest(Amount<Integer, Time> defaultSessionTimeout) { + this.defaultSessionTimeout = Preconditions.checkNotNull(defaultSessionTimeout); + } + + @Before + public final void setUp() throws Exception { + final ShutdownRegistryImpl shutdownRegistry = new ShutdownRegistryImpl(); + addTearDown(new TearDown() { + @Override public void tearDown() { + shutdownRegistry.execute(); + } + }); + zkTestServer = new ZooKeeperTestServer(0, shutdownRegistry, defaultSessionTimeout); + zkTestServer.startNetwork(); + } + + /** + * Starts zookeeper back up on the last used port. 
+ */ + protected final void restartNetwork() throws IOException, InterruptedException { + zkTestServer.restartNetwork(); + } + + /** + * Shuts down the in-process zookeeper network server. + */ + protected final void shutdownNetwork() { + zkTestServer.shutdownNetwork(); + } + + /** + * Expires the active session for the given client. The client should be one returned from + * {@link #createZkClient}. + * + * @param zkClient the client to expire + * @throws ZooKeeperClient.ZooKeeperConnectionException if a problem is encountered connecting to + * the local zk server while trying to expire the session + * @throws InterruptedException if interrupted while requesting expiration + */ + protected final void expireSession(ZooKeeperClient zkClient) + throws ZooKeeperClient.ZooKeeperConnectionException, InterruptedException { + zkTestServer.expireClientSession(zkClient); + } + + /** + * Returns the current port to connect to the in-process zookeeper instance. + */ + protected final int getPort() { + return zkTestServer.getPort(); + } + + /** + * Returns a new unauthenticated zookeeper client connected to the in-process zookeeper server + * with the default session timeout. + */ + protected final ZooKeeperClient createZkClient() { + return zkTestServer.createClient(); + } + + /** + * Returns a new authenticated zookeeper client connected to the in-process zookeeper server with + * the default session timeout. + */ + protected final ZooKeeperClient createZkClient(ZooKeeperClient.Credentials credentials) { + return zkTestServer.createClient(credentials); + } + + /** + * Returns a new authenticated zookeeper client connected to the in-process zookeeper server with + * the default session timeout. The client is authenticated in the digest authentication scheme + * with the given {@code username} and {@code password}. 
+ */ + protected final ZooKeeperClient createZkClient(String username, String password) { + return createZkClient(ZooKeeperClient.digestCredentials(username, password)); + } + + /** + * Returns a new unauthenticated zookeeper client connected to the in-process zookeeper server + * with a custom {@code sessionTimeout}. + */ + protected final ZooKeeperClient createZkClient(Amount<Integer, Time> sessionTimeout) { + return zkTestServer.createClient(sessionTimeout); + } + + /** + * Returns a new authenticated zookeeper client connected to the in-process zookeeper server with + * a custom {@code sessionTimeout}. + */ + protected final ZooKeeperClient createZkClient(Amount<Integer, Time> sessionTimeout, + ZooKeeperClient.Credentials credentials) { + return zkTestServer.createClient(sessionTimeout, credentials); + } + + /** + * Returns a new authenticated zookeeper client connected to the in-process zookeeper server with + * the default session timeout and the custom chroot path. + */ + protected final ZooKeeperClient createZkClient(String chrootPath) { + return zkTestServer.createClient(chrootPath); + } +}
