http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/zookeeper/ZooKeeperMap.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/zookeeper/ZooKeeperMap.java b/commons/src/main/java/com/twitter/common/zookeeper/ZooKeeperMap.java new file mode 100644 index 0000000..3b1dfb8 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/zookeeper/ZooKeeperMap.java @@ -0,0 +1,414 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
// =================================================================================================

package com.twitter.common.zookeeper;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.base.Preconditions;
import com.google.common.collect.ForwardingMap;
import com.google.common.collect.Sets;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

import com.twitter.common.base.Command;
import com.twitter.common.base.ExceptionalSupplier;
import com.twitter.common.base.MorePreconditions;
import com.twitter.common.util.BackoffHelper;
import com.twitter.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;

/**
 * A ZooKeeper backed {@link Map}.  Initialized with a node path, this map represents child nodes
 * under that path as keys, with the data in those nodes as values.  This map is read-only for
 * clients of this class, and can only be modified via direct zookeeper operations.
 *
 * Note that instances of this class maintain a zookeeper watch for each zookeeper node under the
 * parent, as well as on the parent itself.  Instances of this class should be created via the
 * {@link #create} factory method.
 *
 * As of ZooKeeper Version 3.1, the maximum allowable size of a data node is 1 MB.  A single
 * client should be able to maintain several thousand watches, but this depends on the rate
 * of data change as well.
 *
 * Talk to your zookeeper cluster administrator if you expect number of map entries times number
 * of live clients to exceed a thousand, as a zookeeper cluster is limited by total number of
 * server-side watches enabled.
 *
 * For an example of a set of tools to maintain one of these maps, please see
 * src/scripts/HenAccess.py in the hen repository.
 *
 * @param <V> the type of values this map stores
 */
public class ZooKeeperMap<V> extends ForwardingMap<String, V> {

  /**
   * An optional listener which can be supplied and triggered when entries in a ZooKeeperMap
   * are added, changed or removed. For a ZooKeeperMap of type <V>, the listener will fire a
   * "nodeChanged" event with the name of the ZNode that changed, and its resulting value as
   * interpreted by the provided deserializer. Removal of child nodes triggers the "nodeRemoved"
   * method indicating the name of the ZNode which is no longer present in the map.
   */
  public interface Listener<V> {

    /**
     * Fired when a node is added to the ZooKeeperMap or changed.
     *
     * @param nodeName indicates the name of the ZNode that was added or changed.
     * @param value is the new value of the node after passing through your supplied deserializer.
     */
    void nodeChanged(String nodeName, V value);

    /**
     * Fired when a node is removed from the ZooKeeperMap.
     *
     * @param nodeName indicates the name of the ZNode that was removed from the ZooKeeperMap.
     */
    void nodeRemoved(String nodeName);
  }

  /**
   * Default deserializer for the constructor if you want to simply store the zookeeper byte[] data
   * in this map.
   */
  public static final Function<byte[], byte[]> BYTE_ARRAY_VALUES = Functions.identity();

  /**
   * A listener that ignores all events.
   */
  public static <T> Listener<T> noopListener() {
    return new Listener<T>() {
      @Override public void nodeChanged(String nodeName, T value) { }
      @Override public void nodeRemoved(String nodeName) { }
    };
  }

  private static final Logger LOG = Logger.getLogger(ZooKeeperMap.class.getName());

  private final ZooKeeperClient zkClient;
  private final String nodePath;
  private final Function<byte[], V> deserializer;

  // localMap holds the cached entries; unmodifiableLocalMap is the read-only view handed to users.
  private final ConcurrentMap<String, V> localMap;
  private final Map<String, V> unmodifiableLocalMap;
  private final BackoffHelper backoffHelper;

  private final Listener<V> mapListener;

  // Whether it's safe to re-establish watches if our zookeeper session has expired.
  private final Object safeToRewatchLock;
  private volatile boolean safeToRewatch;

  /**
   * Returns an initialized ZooKeeperMap.  The given path must exist at the time of
   * creation or a {@link KeeperException} will be thrown.
   *
   * @param zkClient a zookeeper client
   * @param nodePath path to a node whose data will be watched
   * @param deserializer a function that converts byte[] data from a zk node to this map's
   *     value type V
   * @param listener is a Listener which fires when values are added, changed, or removed.
   *
   * @throws InterruptedException if the underlying zookeeper server transaction is interrupted
   * @throws KeeperException.NoNodeException if the given nodePath doesn't exist
   * @throws KeeperException if the server signals an error
   * @throws ZooKeeperConnectionException if there was a problem connecting to the zookeeper
   *     cluster
   */
  public static <V> ZooKeeperMap<V> create(
      ZooKeeperClient zkClient,
      String nodePath,
      Function<byte[], V> deserializer,
      Listener<V> listener)
      throws InterruptedException, KeeperException, ZooKeeperConnectionException {

    ZooKeeperMap<V> zkMap = new ZooKeeperMap<V>(zkClient, nodePath, deserializer, listener);
    zkMap.init();
    return zkMap;
  }

  /**
   * Returns an initialized ZooKeeperMap that notifies nobody of changes.  The given path must
   * exist at the time of creation or a {@link KeeperException} will be thrown.
   *
   * @param zkClient a zookeeper client
   * @param nodePath path to a node whose data will be watched
   * @param deserializer a function that converts byte[] data from a zk node to this map's
   *     value type V
   *
   * @throws InterruptedException if the underlying zookeeper server transaction is interrupted
   * @throws KeeperException.NoNodeException if the given nodePath doesn't exist
   * @throws KeeperException if the server signals an error
   * @throws ZooKeeperConnectionException if there was a problem connecting to the zookeeper
   *     cluster
   */
  public static <V> ZooKeeperMap<V> create(
      ZooKeeperClient zkClient,
      String nodePath,
      Function<byte[], V> deserializer)
      throws InterruptedException, KeeperException, ZooKeeperConnectionException {

    return ZooKeeperMap.create(zkClient, nodePath, deserializer, ZooKeeperMap.<V>noopListener());
  }

  /**
   * Initializes a ZooKeeperMap.  The given path must exist at the time of object creation or
   * a {@link KeeperException} will be thrown.
   *
   * Please note that this object will not track any remote zookeeper data until {@link #init()}
   * is successfully called.  After construction and before that call, this {@link Map} will
   * be empty.
   *
   * @param zkClient a zookeeper client
   * @param nodePath top-level node path under which the map data lives
   * @param deserializer a function that converts byte[] data from a zk node to this map's
   *     value type V
   * @param mapListener is a Listener which fires when values are added, changed, or removed.
   *
   * @throws InterruptedException if the underlying zookeeper server transaction is interrupted
   * @throws KeeperException.NoNodeException if the given nodePath doesn't exist
   * @throws KeeperException if the server signals an error
   * @throws ZooKeeperConnectionException if there was a problem connecting to the zookeeper
   *     cluster
   */
  @VisibleForTesting
  ZooKeeperMap(
      ZooKeeperClient zkClient,
      String nodePath,
      Function<byte[], V> deserializer,
      Listener<V> mapListener)
      throws InterruptedException, KeeperException, ZooKeeperConnectionException {

    super();

    this.mapListener = Preconditions.checkNotNull(mapListener);
    this.zkClient = Preconditions.checkNotNull(zkClient);
    this.nodePath = MorePreconditions.checkNotBlank(nodePath);
    this.deserializer = Preconditions.checkNotNull(deserializer);

    localMap = new ConcurrentHashMap<String, V>();
    unmodifiableLocalMap = Collections.unmodifiableMap(localMap);
    backoffHelper = new BackoffHelper();
    safeToRewatchLock = new Object();
    safeToRewatch = false;

    if (zkClient.get().exists(nodePath, null) == null) {
      // Carry the missing path in the exception so the failure is diagnosable from its message.
      throw new KeeperException.NoNodeException(nodePath);
    }
  }

  /**
   * Initialize zookeeper tracking for this {@link Map}.  Once this call returns, this object
   * will be tracking data in zookeeper.  If the initial watch cannot be established for any
   * reason, the session expiration handler registered here is unregistered again before the
   * failure propagates.
   *
   * @throws InterruptedException if the underlying zookeeper server transaction is interrupted
   * @throws KeeperException if the server signals an error
   * @throws ZooKeeperConnectionException if there was a problem connecting to the zookeeper
   *     cluster
   */
  @VisibleForTesting
  void init() throws InterruptedException, KeeperException, ZooKeeperConnectionException {
    Watcher watcher = zkClient.registerExpirationHandler(new Command() {
      @Override public void execute() {
        /*
         * First rewatch all of our locally cached children.  Some of them may not exist anymore,
         * which will lead to caught KeeperException.NoNode whereafter we'll remove that child
         * from the cached map.
         *
         * Next, we'll establish our top level child watch and add any new nodes that might exist.
         */
        try {
          synchronized (safeToRewatchLock) {
            if (safeToRewatch) {
              rewatchDataNodes();
              tryWatchChildren();
            }
          }
        } catch (InterruptedException e) {
          LOG.log(Level.WARNING, "Interrupted while trying to re-establish watch.", e);
          Thread.currentThread().interrupt();
        }
      }
    });

    boolean initialized = false;
    try {
      // Synchronize to prevent the race of watchChildren completing and then the session expiring
      // before we update safeToRewatch.
      synchronized (safeToRewatchLock) {
        watchChildren();
        safeToRewatch = true;
      }
      initialized = true;
    } finally {
      // Covers checked and unchecked failures alike (e.g. a deserializer that throws), so a
      // half-initialized map never leaves its expiration handler behind.
      if (!initialized) {
        zkClient.unregister(watcher);
      }
    }
  }

  @Override
  protected Map<String, V> delegate() {
    return unmodifiableLocalMap;
  }

  /**
   * Runs {@link #watchChildren()} through the backoff helper, reporting zookeeper and connection
   * errors to it as an unsuccessful attempt.
   */
  private void tryWatchChildren() throws InterruptedException {
    backoffHelper.doUntilSuccess(new ExceptionalSupplier<Boolean, InterruptedException>() {
      @Override public Boolean get() throws InterruptedException {
        try {
          watchChildren();
          return true;
        } catch (KeeperException e) {
          return false;
        } catch (ZooKeeperConnectionException e) {
          return false;
        }
      }
    });
  }

  /**
   * Sets an existence watch on the parent node and a child watch on its children, then
   * reconciles the local map against the children currently reported by zookeeper.
   */
  private synchronized void watchChildren()
      throws InterruptedException, KeeperException, ZooKeeperConnectionException {

    /*
     * Add a watch on the parent node itself, and attempt to rewatch if it
     * gets deleted
     */
    zkClient.get().exists(nodePath, new Watcher() {
      @Override public void process(WatchedEvent event) {
        if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
          // The parent node no longer exists: drop every cached entry, telling the listener
          // about each one, then wait for the parent to come back.
          removeAllEntries();
          try {
            tryWatchChildren();
          } catch (InterruptedException e) {
            LOG.log(Level.WARNING, "Interrupted while trying to watch children.", e);
            Thread.currentThread().interrupt();
          }
        }
      }});

    final Watcher childWatcher = new Watcher() {
      @Override
      public void process(WatchedEvent event) {
        if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
          try {
            tryWatchChildren();
          } catch (InterruptedException e) {
            LOG.log(Level.WARNING, "Interrupted while trying to watch children.", e);
            Thread.currentThread().interrupt();
          }
        }
      }
    };

    List<String> children = zkClient.get().getChildren(nodePath, childWatcher);
    updateChildren(Sets.newHashSet(children));
  }

  /**
   * Runs {@link #addChild(String)} through the backoff helper, reporting zookeeper and
   * connection errors to it as an unsuccessful attempt.
   */
  private void tryAddChild(final String child) throws InterruptedException {
    backoffHelper.doUntilSuccess(new ExceptionalSupplier<Boolean, InterruptedException>() {
      @Override public Boolean get() throws InterruptedException {
        try {
          addChild(child);
          return true;
        } catch (KeeperException e) {
          return false;
        } catch (ZooKeeperConnectionException e) {
          return false;
        }
      }
    });
  }

  // TODO(Adam Samet) - Make this use the ZooKeeperNode class.
  /**
   * Reads and caches the data of the given child, leaving a data watch on it that refreshes the
   * entry on change and drops it on deletion.
   */
  private void addChild(final String child)
      throws InterruptedException, KeeperException, ZooKeeperConnectionException {

    final Watcher nodeWatcher = new Watcher() {
      @Override
      public void process(WatchedEvent event) {
        if (event.getType() == Watcher.Event.EventType.NodeDataChanged) {
          try {
            tryAddChild(child);
          } catch (InterruptedException e) {
            LOG.log(Level.WARNING, "Interrupted while trying to add a child.", e);
            Thread.currentThread().interrupt();
          }
        } else if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
          removeEntry(child);
        }
      }
    };

    try {
      V value = deserializer.apply(zkClient.get().getData(makePath(child), nodeWatcher, null));
      putEntry(child, value);
    } catch (KeeperException.NoNodeException e) {
      // This node doesn't exist anymore, remove it from the map and we're done.
      removeEntry(child);
    }
  }

  /**
   * Removes the given key from the local map and notifies the listener.  The listener is only
   * notified when the key was actually present, so a deletion reported by both the child's data
   * watch and the parent's child watch yields a single {@code nodeRemoved} call.
   */
  @VisibleForTesting
  void removeEntry(String key) {
    if (localMap.remove(key) != null) {
      mapListener.nodeRemoved(key);
    }
  }

  /**
   * Stores the given value under the given key in the local map and notifies the listener.
   */
  @VisibleForTesting
  void putEntry(String key, V value) {
    localMap.put(key, value);
    mapListener.nodeChanged(key, value);
  }

  /**
   * Removes every cached entry, notifying the listener for each one.
   */
  private void removeAllEntries() {
    // ConcurrentHashMap key-set iteration tolerates removal of entries while iterating.
    for (String key : localMap.keySet()) {
      removeEntry(key);
    }
  }

  private void rewatchDataNodes() throws InterruptedException {
    for (String child : keySet()) {
      tryAddChild(child);
    }
  }

  private String makePath(final String child) {
    return nodePath + "/" + child;
  }

  /**
   * Brings the local map in line with the given set of children: fetches the ones we don't
   * have yet and drops the ones zookeeper no longer reports.
   */
  private void updateChildren(Set<String> zkChildren) throws InterruptedException {
    // Sets.difference returns live views over keySet(); snapshot them first because the loops
    // below mutate the backing map.
    Set<String> addedChildren = Sets.difference(zkChildren, keySet()).immutableCopy();
    Set<String> removedChildren = Sets.difference(keySet(), zkChildren).immutableCopy();
    for (String child : addedChildren) {
      tryAddChild(child);
    }
    for (String child : removedChildren) {
      removeEntry(child);
    }
  }
}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/zookeeper/ZooKeeperNode.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/zookeeper/ZooKeeperNode.java b/commons/src/main/java/com/twitter/common/zookeeper/ZooKeeperNode.java new file mode 100644 index 0000000..29bef5e --- /dev/null +++ b/commons/src/main/java/com/twitter/common/zookeeper/ZooKeeperNode.java @@ -0,0 +1,336 @@
package com.twitter.common.zookeeper;

import java.util.logging.Level;
import java.util.logging.Logger;

import javax.annotation.Nullable;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.data.Stat;

import com.twitter.common.base.Closure;
import com.twitter.common.base.Closures;
import com.twitter.common.base.Command;
import com.twitter.common.base.ExceptionalSupplier;
import com.twitter.common.base.MorePreconditions;
import com.twitter.common.util.BackoffHelper;
import com.twitter.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;

/**
 * An implementation of {@link Supplier} that offers a readonly view of a
 * zookeeper data node.  This class is thread-safe.
 *
 * Instances of this class each maintain a zookeeper watch for the remote data node.  Instances
 * of this class should be created via the {@link #create} factory method.
 *
 * Please see zookeeper documentation and talk to your cluster administrator for guidance on
 * appropriate node size and total number of nodes you should be using.
 *
 * @param <T> the type of data this node stores
 */
public class ZooKeeperNode<T> implements Supplier<T> {
  /**
   * Deserializer for the constructor if you want to simply store the zookeeper byte[] data
   * as-is.
   */
  public static final Function<byte[], byte[]> BYTE_ARRAY_VALUE = Functions.identity();

  private static final Logger LOG = Logger.getLogger(ZooKeeperNode.class.getName());

  private final ZooKeeperClient zkClient;
  private final String nodePath;
  private final NodeDeserializer<T> deserializer;

  private final BackoffHelper backoffHelper;

  // Whether it's safe to re-establish watches if our zookeeper session has expired.
  private final Object safeToRewatchLock;
  private volatile boolean safeToRewatch;

  // Value reported while the node has not been read yet or does not exist.
  private final T NO_DATA = null;
  @Nullable private volatile T nodeData;
  private final Closure<T> dataUpdateListener;

  /**
   * When a call to ZooKeeper.getData is made, the Watcher is added to a Set before the network
   * request is made and if the request fails, the Watcher remains.  There's a problem where
   * Watchers can accumulate when there are failed requests, so they are set to instance fields
   * and reused.
   */
  private final Watcher nodeWatcher;
  private final Watcher existenceWatcher;

  /**
   * Returns an initialized ZooKeeperNode.  The given node must exist at the time of object
   * creation or a {@link KeeperException} will be thrown.
   *
   * @param zkClient a zookeeper client
   * @param nodePath path to a node whose data will be watched
   * @param deserializer a function that converts byte[] data from a zk node to this supplier's
   *     type T
   *
   * @throws InterruptedException if the underlying zookeeper server transaction is interrupted
   * @throws KeeperException.NoNodeException if the given nodePath doesn't exist
   * @throws KeeperException if the server signals an error
   * @throws ZooKeeperConnectionException if there was a problem connecting to the zookeeper
   *     cluster
   */
  public static <T> ZooKeeperNode<T> create(ZooKeeperClient zkClient, String nodePath,
      Function<byte[], T> deserializer) throws InterruptedException, KeeperException,
      ZooKeeperConnectionException {
    return create(zkClient, nodePath, deserializer, Closures.<T>noop());
  }

  /**
   * Like the above, but optionally takes in a {@link Closure} that will get notified
   * whenever the data is updated from the remote node.
   *
   * @param dataUpdateListener a {@link Closure} to receive data update notifications.
   */
  public static <T> ZooKeeperNode<T> create(ZooKeeperClient zkClient, String nodePath,
      Function<byte[], T> deserializer, Closure<T> dataUpdateListener) throws InterruptedException,
      KeeperException, ZooKeeperConnectionException {
    return create(zkClient, nodePath, new FunctionWrapper<T>(deserializer), dataUpdateListener);
  }

  /**
   * Returns an initialized ZooKeeperNode.  The given node must exist at the time of object
   * creation or a {@link KeeperException} will be thrown.
   *
   * @param zkClient a zookeeper client
   * @param nodePath path to a node whose data will be watched
   * @param deserializer an implementation of {@link NodeDeserializer} that converts a byte[] from
   *     a zk node to this supplier's type T.  Also supplies a {@link Stat} object which is useful
   *     for doing versioned updates.
   *
   * @throws InterruptedException if the underlying zookeeper server transaction is interrupted
   * @throws KeeperException.NoNodeException if the given nodePath doesn't exist
   * @throws KeeperException if the server signals an error
   * @throws ZooKeeperConnectionException if there was a problem connecting to the zookeeper
   *     cluster
   */
  public static <T> ZooKeeperNode<T> create(ZooKeeperClient zkClient, String nodePath,
      NodeDeserializer<T> deserializer) throws InterruptedException, KeeperException,
      ZooKeeperConnectionException {
    return create(zkClient, nodePath, deserializer, Closures.<T>noop());
  }

  /**
   * Like the above, but optionally takes in a {@link Closure} that will get notified
   * whenever the data is updated from the remote node.
   *
   * @param dataUpdateListener a {@link Closure} to receive data update notifications.
   */
  public static <T> ZooKeeperNode<T> create(ZooKeeperClient zkClient, String nodePath,
      NodeDeserializer<T> deserializer, Closure<T> dataUpdateListener)
      throws InterruptedException, KeeperException, ZooKeeperConnectionException {
    ZooKeeperNode<T> zkNode =
        new ZooKeeperNode<T>(zkClient, nodePath, deserializer, dataUpdateListener);
    zkNode.init();
    return zkNode;
  }

  /**
   * Initializes a ZooKeeperNode.  The given node must exist at the time of object creation or
   * a {@link KeeperException} will be thrown.
   *
   * Please note that this object will not track any remote zookeeper data until {@link #init()}
   * is successfully called.  After construction and before that call, this {@link Supplier} will
   * return null.
   *
   * @param zkClient a zookeeper client
   * @param nodePath path to a node whose data will be watched
   * @param deserializer an implementation of {@link NodeDeserializer} that converts byte[] data
   *     from a zk node to this supplier's type T
   * @param dataUpdateListener a {@link Closure} to receive data update notifications.
   */
  @VisibleForTesting
  ZooKeeperNode(ZooKeeperClient zkClient, String nodePath,
      NodeDeserializer<T> deserializer, Closure<T> dataUpdateListener) {
    this.zkClient = Preconditions.checkNotNull(zkClient);
    this.nodePath = MorePreconditions.checkNotBlank(nodePath);
    this.deserializer = Preconditions.checkNotNull(deserializer);
    this.dataUpdateListener = Preconditions.checkNotNull(dataUpdateListener);

    backoffHelper = new BackoffHelper();
    safeToRewatchLock = new Object();
    safeToRewatch = false;
    nodeData = NO_DATA;

    // Data watch: any event delivered while connected triggers a re-read (and re-watch).
    nodeWatcher = new Watcher() {
      @Override public void process(WatchedEvent event) {
        if (event.getState() == KeeperState.SyncConnected) {
          try {
            tryWatchDataNode();
          } catch (InterruptedException e) {
            LOG.log(Level.WARNING, "Interrupted while trying to watch a data node.", e);
            Thread.currentThread().interrupt();
          }
        } else {
          LOG.info("Ignoring watcher event " + event);
        }
      }
    };

    // Existence watch: only the (re)creation of the node is of interest.
    existenceWatcher = new Watcher() {
      @Override public void process(WatchedEvent event) {
        if (event.getType() == Watcher.Event.EventType.NodeCreated) {
          try {
            tryWatchDataNode();
          } catch (InterruptedException e) {
            LOG.log(Level.WARNING, "Interrupted while trying to watch a data node.", e);
            Thread.currentThread().interrupt();
          }
        }
      }
    };
  }

  /**
   * Initialize zookeeper tracking for this {@link Supplier}.  Once this call returns, this object
   * will be tracking data in zookeeper.  If the initial watch cannot be established for any
   * reason, the session expiration handler registered here is unregistered again before the
   * failure propagates.
   *
   * @throws InterruptedException if the underlying zookeeper server transaction is interrupted
   * @throws KeeperException if the server signals an error
   * @throws ZooKeeperConnectionException if there was a problem connecting to the zookeeper
   *     cluster
   */
  @VisibleForTesting
  void init() throws InterruptedException, KeeperException,
      ZooKeeperConnectionException {
    Watcher watcher = zkClient.registerExpirationHandler(new Command() {
      @Override public void execute() {
        try {
          synchronized (safeToRewatchLock) {
            if (safeToRewatch) {
              tryWatchDataNode();
            }
          }
        } catch (InterruptedException e) {
          LOG.log(Level.WARNING, "Interrupted while trying to re-establish watch.", e);
          Thread.currentThread().interrupt();
        }
      }
    });

    boolean initialized = false;
    try {
      /*
       * Synchronize to prevent the race of watchDataNode completing and then the session expiring
       * before we update safeToRewatch.
       */
      synchronized (safeToRewatchLock) {
        watchDataNode();
        safeToRewatch = true;
      }
      initialized = true;
    } finally {
      // Covers checked and unchecked failures alike (e.g. a deserializer or listener that
      // throws), so a half-initialized node never leaves its expiration handler behind.
      if (!initialized) {
        zkClient.unregister(watcher);
      }
    }
  }

  /**
   * Returns the data corresponding to a byte array in a remote zookeeper node.  This data is
   * cached locally and updated in the background on watch notifications.
   *
   * @return the data currently cached locally or null if {@link #init()} hasn't been called
   *     or the backing node has no data or does not exist anymore.
   */
  @Override
  public @Nullable T get() {
    return nodeData;
  }

  /**
   * Replaces the cached value and passes the new value to the data update listener.
   */
  @VisibleForTesting
  void updateData(@Nullable T newData) {
    nodeData = newData;
    dataUpdateListener.execute(newData);
  }

  /**
   * Runs {@link #watchDataNode()} through the backoff helper, reporting zookeeper and
   * connection errors to it as an unsuccessful attempt.
   */
  private void tryWatchDataNode() throws InterruptedException {
    backoffHelper.doUntilSuccess(new ExceptionalSupplier<Boolean, InterruptedException>() {
      @Override public Boolean get() throws InterruptedException {
        try {
          watchDataNode();
          return true;
        } catch (KeeperException e) {
          return false;
        } catch (ZooKeeperConnectionException e) {
          return false;
        }
      }
    });
  }

  /**
   * Reads the node's data (leaving the data watch in place) and publishes the deserialized
   * value; if the node is missing, publishes "no data" and waits for the node to be created.
   */
  private void watchDataNode() throws InterruptedException, KeeperException,
      ZooKeeperConnectionException {
    try {
      Stat stat = new Stat();
      byte[] rawData = zkClient.get().getData(nodePath, nodeWatcher, stat);
      T newData = deserializer.deserialize(rawData, stat);
      updateData(newData);
    } catch (KeeperException.NoNodeException e) {
      /*
       * This node doesn't exist right now, reflect that locally and then create a watch to wait
       * for its recreation.
       */
      updateData(NO_DATA);
      watchForExistence();
    }
  }

  private void watchForExistence() throws InterruptedException, KeeperException,
      ZooKeeperConnectionException {
    /*
     * If the node was created between the getData call and this call, just try watching it.
     * We'll have an extra exists watch on it that goes off on its next deletion, which will
     * be a no-op.
     * Otherwise, just let the exists watch wait for its creation.
     */
    if (zkClient.get().exists(nodePath, existenceWatcher) != null) {
      tryWatchDataNode();
    }
  }

  /**
   * Interface for defining zookeeper node data deserialization.
   *
   * @param <T> the type of data associated with this node
   */
  public interface NodeDeserializer<T> {
    /**
     * @param data the byte array returned from ZooKeeper when a watch is triggered.
     * @param stat a ZooKeeper {@link Stat} object. Populated by
     *     {@link org.apache.zookeeper.ZooKeeper#getData(String, Watcher, Stat)}.
     */
    T deserialize(byte[] data, Stat stat);
  }

  // wrapper for backwards compatibility with older create() methods with Function parameter
  private static final class FunctionWrapper<T> implements NodeDeserializer<T> {
    private final Function<byte[], T> func;

    private FunctionWrapper(Function<byte[], T> func) {
      this.func = Preconditions.checkNotNull(func);
    }

    // The Stat is of no use to a plain byte[] function and is deliberately dropped.
    @Override
    public T deserialize(byte[] rawData, Stat stat) {
      return func.apply(rawData);
    }
  }

}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/zookeeper/ZooKeeperUtils.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/zookeeper/ZooKeeperUtils.java b/commons/src/main/java/com/twitter/common/zookeeper/ZooKeeperUtils.java new file mode 100644 index 0000000..4321966 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/zookeeper/ZooKeeperUtils.java @@ -0,0 +1,170 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
// =================================================================================================

package com.twitter.common.zookeeper;

import java.util.List;
import java.util.logging.Logger;
import java.util.regex.Pattern;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.common.PathUtils;
import org.apache.zookeeper.data.ACL;

import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
import com.twitter.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;

/**
 * Utilities for dealing with zoo keeper.
 */
public final class ZooKeeperUtils {

  private static final Logger LOG = Logger.getLogger(ZooKeeperUtils.class.getName());

  // Compiled once; normalizePath would otherwise recompile both expressions on every call.
  private static final Pattern CONSECUTIVE_SLASHES = Pattern.compile("//+");
  private static final Pattern TRAILING_SLASH = Pattern.compile("(.+)/$");

  /**
   * An appropriate default session timeout for Twitter ZooKeeper clusters.
   */
  public static final Amount<Integer, Time> DEFAULT_ZK_SESSION_TIMEOUT = Amount.of(4, Time.SECONDS);

  /**
   * The magic version number that allows any mutation to always succeed regardless of actual
   * version number.
   */
  public static final int ANY_VERSION = -1;

  /**
   * An ACL that gives all permissions to any user, authenticated or not.
   */
  public static final ImmutableList<ACL> OPEN_ACL_UNSAFE =
      ImmutableList.copyOf(Ids.OPEN_ACL_UNSAFE);

  /**
   * An ACL that gives all permissions to node creators and read permissions only to everyone else.
   */
  public static final ImmutableList<ACL> EVERYONE_READ_CREATOR_ALL =
      ImmutableList.<ACL>builder()
          .addAll(Ids.CREATOR_ALL_ACL)
          .addAll(Ids.READ_ACL_UNSAFE)
          .build();

  /**
   * Returns true if the given exception indicates an error that can be resolved by retrying the
   * operation without modification.
   *
   * @param e the exception to check
   * @return true if the causing operation is strictly retryable
   */
  public static boolean isRetryable(KeeperException e) {
    Preconditions.checkNotNull(e);

    switch (e.code()) {
      case CONNECTIONLOSS:
      case SESSIONEXPIRED:
      case SESSIONMOVED:
      case OPERATIONTIMEOUT:
        return true;

      // Everything below is listed explicitly for documentation; it all falls through to the
      // non-retryable default.
      case RUNTIMEINCONSISTENCY:
      case DATAINCONSISTENCY:
      case MARSHALLINGERROR:
      case BADARGUMENTS:
      case NONODE:
      case NOAUTH:
      case BADVERSION:
      case NOCHILDRENFOREPHEMERALS:
      case NODEEXISTS:
      case NOTEMPTY:
      case INVALIDCALLBACK:
      case INVALIDACL:
      case AUTHFAILED:
      case UNIMPLEMENTED:

      // These two should not be encountered - they are used internally by ZK to specify ranges
      case SYSTEMERROR:
      case APIERROR:

      case OK: // This is actually an invalid ZK exception code

      default:
        return false;
    }
  }

  /**
   * Ensures the given {@code path} exists in the ZK cluster accessed by {@code zkClient}.  If the
   * path already exists, nothing is done; however if any portion of the path is missing, it will be
   * created with the given {@code acl} as a persistent zookeeper node.  The given {@code path} must
   * be a valid zookeeper absolute path.
   *
   * @param zkClient the client to use to access the ZK cluster
   * @param acl the acl to use if creating path nodes
   * @param path the path to ensure exists
   * @throws ZooKeeperConnectionException if there was a problem accessing the ZK cluster
   * @throws InterruptedException if we were interrupted attempting to connect to the ZK cluster
   * @throws KeeperException if there was a problem in ZK
   */
  public static void ensurePath(ZooKeeperClient zkClient, List<ACL> acl, String path)
      throws ZooKeeperConnectionException, InterruptedException, KeeperException {
    Preconditions.checkNotNull(zkClient);
    Preconditions.checkNotNull(path);
    Preconditions.checkArgument(path.startsWith("/"), "Path must be absolute: %s", path);

    ensurePathInternal(zkClient, acl, path);
  }

  /**
   * Recursive worker for {@link #ensurePath}: creates the missing ancestors first, then the
   * path itself.
   */
  private static void ensurePathInternal(ZooKeeperClient zkClient, List<ACL> acl, String path)
      throws ZooKeeperConnectionException, InterruptedException, KeeperException {
    if (zkClient.get().exists(path, false) == null) {
      // The current path does not exist; so back up a level and ensure the parent path exists
      // unless we're already a root-level path.
      int lastPathIndex = path.lastIndexOf('/');
      if (lastPathIndex > 0) {
        ensurePathInternal(zkClient, acl, path.substring(0, lastPathIndex));
      }

      // We've ensured our parent path (if any) exists so we can proceed to create our path.
      try {
        zkClient.get().create(path, null, acl, CreateMode.PERSISTENT);
      } catch (KeeperException.NodeExistsException e) {
        // This ensures we don't die if a race condition was met between checking existence and
        // trying to create the node.
        LOG.info("Node existed when trying to ensure path " + path + ", somebody beat us to it?");
      }
    }
  }

  /**
   * Validate and return a normalized zookeeper path which doesn't contain consecutive slashes and
   * never ends with a slash (except for root path).
   *
   * @param path the path to be normalized
   * @return normalized path string
   * @throws NullPointerException if {@code path} is null
   */
  public static String normalizePath(String path) {
    Preconditions.checkNotNull(path);
    String collapsed = CONSECUTIVE_SLASHES.matcher(path).replaceAll("/");
    // "(.+)/$" needs at least one character before the slash, so the root path "/" is kept.
    String normalizedPath = TRAILING_SLASH.matcher(collapsed).replaceFirst("$1");
    PathUtils.validatePath(normalizedPath);
    return normalizedPath;
  }

  private ZooKeeperUtils() {
    // utility
  }
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/zookeeper/guice/ServerSetModule.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/zookeeper/guice/ServerSetModule.java b/commons/src/main/java/com/twitter/common/zookeeper/guice/ServerSetModule.java new file mode 100644 index 0000000..a10c1fe --- /dev/null +++ b/commons/src/main/java/com/twitter/common/zookeeper/guice/ServerSetModule.java @@ -0,0 +1,273 @@ +// ================================================================================================= +// Copyright 2012 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
// =================================================================================================

package com.twitter.common.zookeeper.guice;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.annotation.Nullable;

import com.google.common.base.Optional;
import com.google.common.util.concurrent.Atomics;
import com.google.inject.AbstractModule;
import com.google.inject.BindingAnnotation;
import com.google.inject.Inject;
import com.google.inject.Key;
import com.google.inject.Singleton;
import com.google.inject.TypeLiteral;

import com.twitter.common.application.ShutdownRegistry;
import com.twitter.common.application.modules.LifecycleModule;
import com.twitter.common.application.modules.LocalServiceRegistry;
import com.twitter.common.args.Arg;
import com.twitter.common.args.CmdLine;
import com.twitter.common.args.constraints.NotNegative;
import com.twitter.common.base.Command;
import com.twitter.common.base.ExceptionalCommand;
import com.twitter.common.base.Supplier;
import com.twitter.common.zookeeper.Group.JoinException;
import com.twitter.common.zookeeper.ServerSet;
import com.twitter.common.zookeeper.ServerSet.EndpointStatus;
import com.twitter.common.zookeeper.ServerSet.UpdateException;
import com.twitter.thrift.Status;

import static com.google.common.base.Preconditions.checkNotNull;
import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

/**
 * A module that registers all ports in the {@link LocalServiceRegistry} in a {@link ServerSet}.
 * <p/>
 * Required bindings:
 * <ul>
 *   <li> {@link ServerSet}
 *   <li> {@link ShutdownRegistry}
 *   <li> {@link LocalServiceRegistry}
 * </ul>
 * <p/>
 * {@link LifecycleModule} must also be included by users so a startup action may be registered.
 * <p/>
 * Provided bindings:
 * <ul>
 *   <li> {@code Supplier<EndpointStatus>}
 * </ul>
 */
public class ServerSetModule extends AbstractModule {

  /**
   * Binding annotation qualifying the optional primary port name that is handed to the
   * {@link ServerSetJoiner}.
   */
  @BindingAnnotation @Target({PARAMETER, METHOD, FIELD}) @Retention(RUNTIME)
  private @interface Default {}

  /**
   * Binding annotation to give the ServerSetJoiner a fixed known ServerSet that is appropriate to
   * {@link ServerSet#join} on.
   */
  @BindingAnnotation @Target({METHOD, PARAMETER}) @Retention(RUNTIME)
  private @interface Joinable {}

  private static final Key<ServerSet> JOINABLE_SS = Key.get(ServerSet.class, Joinable.class);

  @CmdLine(name = "aux_port_as_primary",
      help = "Name of the auxiliary port to use as the primary port in the server set."
          + " This may only be used when no other primary port is specified.")
  private static final Arg<String> AUX_PORT_AS_PRIMARY = Arg.create(null);

  @NotNegative
  @CmdLine(name = "shard_id", help = "Shard ID for this application.")
  private static final Arg<Integer> SHARD_ID = Arg.create();

  private static final Logger LOG = Logger.getLogger(ServerSetModule.class.getName());

  /**
   * Builds a Module that can be used to join a {@link ServerSet} with the ports configured in a
   * {@link LocalServiceRegistry}.
   */
  public static class Builder {
    private Key<ServerSet> key = Key.get(ServerSet.class);
    private Optional<String> auxPortAsPrimary = Optional.absent();

    /**
     * Sets the key of the ServerSet to join.
     *
     * @param key Key of the ServerSet to join.
     * @return This builder for chaining calls.
     */
    public Builder key(Key<ServerSet> key) {
      this.key = key;
      return this;
    }

    /**
     * Allows joining an auxiliary port with the specified {@code name} as the primary port of the
     * ServerSet.
     *
     * @param auxPortName The name of the auxiliary port to join as the primary ServerSet port.
     * @return This builder for chaining calls.
     */
    public Builder namedPrimaryPort(String auxPortName) {
      this.auxPortAsPrimary = Optional.of(auxPortName);
      return this;
    }

    /**
     * Creates a Module that will register a startup action that joins a ServerSet when installed.
     *
     * @return A Module.
     */
    public ServerSetModule build() {
      return new ServerSetModule(key, auxPortAsPrimary);
    }
  }

  /**
   * Creates a builder that can be used to configure and create a ServerSetModule.
   *
   * @return A ServerSetModule builder.
   */
  public static Builder builder() {
    return new Builder();
  }

  private final Key<ServerSet> serverSetKey;
  private final Optional<String> auxPortAsPrimary;

  /**
   * Constructs a ServerSetModule that registers a startup action to register this process in
   * ZooKeeper, optionally representing a named auxiliary port as the primary service port.
   *
   * @param serverSetKey The key the ServerSet to join is bound under.
   * @param auxPortAsPrimary Name of the auxiliary port to use as the primary port.
   */
  ServerSetModule(Key<ServerSet> serverSetKey, Optional<String> auxPortAsPrimary) {
    this.serverSetKey = checkNotNull(serverSetKey);
    this.auxPortAsPrimary = checkNotNull(auxPortAsPrimary);
  }

  @Override
  protected void configure() {
    requireBinding(serverSetKey);
    requireBinding(ShutdownRegistry.class);
    requireBinding(LocalServiceRegistry.class);

    LifecycleModule.bindStartupAction(binder(), ServerSetJoiner.class);

    bind(new TypeLiteral<Supplier<EndpointStatus>>() { }).to(EndpointSupplier.class);
    bind(EndpointSupplier.class).in(Singleton.class);

    // A port name given on the command line takes precedence over the one given to the builder.
    Optional<String> primaryPortName = AUX_PORT_AS_PRIMARY.hasAppliedValue()
        ? Optional.of(AUX_PORT_AS_PRIMARY.get())
        : auxPortAsPrimary;
    TypeLiteral<Optional<String>> optionalPortName = new TypeLiteral<Optional<String>>() { };
    bind(optionalPortName).annotatedWith(Default.class).toInstance(primaryPortName);

    bind(JOINABLE_SS).to(serverSetKey);
  }

  /**
   * Holder for the {@link EndpointStatus} obtained when the server set is joined; yields
   * {@code null} until {@link #set} has been called.
   */
  static class EndpointSupplier implements Supplier<EndpointStatus> {
    private final AtomicReference<EndpointStatus> reference = Atomics.newReference();

    @Nullable
    @Override public EndpointStatus get() {
      return reference.get();
    }

    void set(EndpointStatus endpoint) {
      reference.set(endpoint);
    }
  }

  /**
   * Startup action that joins the server set with the sockets found in the
   * {@link LocalServiceRegistry} and registers a shutdown action that leaves it again.
   */
  private static class ServerSetJoiner implements Command {
    private final ServerSet serverSet;
    private final LocalServiceRegistry serviceRegistry;
    private final ShutdownRegistry shutdownRegistry;
    private final EndpointSupplier endpointSupplier;
    private final Optional<String> auxPortAsPrimary;

    @Inject
    ServerSetJoiner(
        @Joinable ServerSet serverSet,
        LocalServiceRegistry serviceRegistry,
        ShutdownRegistry shutdownRegistry,
        EndpointSupplier endpointSupplier,
        @Default Optional<String> auxPortAsPrimary) {

      this.serverSet = checkNotNull(serverSet);
      this.serviceRegistry = checkNotNull(serviceRegistry);
      this.shutdownRegistry = checkNotNull(shutdownRegistry);
      this.endpointSupplier = checkNotNull(endpointSupplier);
      this.auxPortAsPrimary = checkNotNull(auxPortAsPrimary);
    }

    @Override public void execute() {
      Optional<InetSocketAddress> primarySocket = serviceRegistry.getPrimarySocket();
      Map<String, InetSocketAddress> auxSockets = serviceRegistry.getAuxiliarySockets();

      InetSocketAddress primary = choosePrimary(primarySocket, auxSockets);
      final EndpointStatus status = joinServerSet(primary, auxSockets);
      endpointSupplier.set(status);

      shutdownRegistry.addAction(new ExceptionalCommand<UpdateException>() {
        @Override public void execute() throws UpdateException {
          LOG.info("Leaving ServerSet.");
          status.leave();
        }
      });
    }

    /**
     * Picks the address to advertise as primary: the registered primary socket when there is one,
     * otherwise the auxiliary socket named by {@code auxPortAsPrimary}.
     *
     * @throws IllegalStateException if neither source yields an address
     */
    private InetSocketAddress choosePrimary(
        Optional<InetSocketAddress> primarySocket,
        Map<String, InetSocketAddress> auxSockets) {

      if (primarySocket.isPresent()) {
        return primarySocket.get();
      }
      if (!auxPortAsPrimary.isPresent()) {
        throw new IllegalStateException("No primary service registered with LocalServiceRegistry,"
            + " and -aux_port_as_primary was not specified.");
      }

      InetSocketAddress named = auxSockets.get(auxPortAsPrimary.get());
      if (named == null) {
        throw new IllegalStateException("No auxiliary port named " + auxPortAsPrimary.get());
      }
      return named;
    }

    /**
     * Joins the server set, passing the shard id only when the {@code shard_id} flag was applied.
     * Checked failures are logged and rethrown wrapped in a {@link RuntimeException}.
     */
    private EndpointStatus joinServerSet(
        InetSocketAddress primary,
        Map<String, InetSocketAddress> auxSockets) {

      try {
        return SHARD_ID.hasAppliedValue()
            ? serverSet.join(primary, auxSockets, SHARD_ID.get())
            : serverSet.join(primary, auxSockets);
      } catch (JoinException e) {
        LOG.log(Level.WARNING, "Failed to join ServerSet.", e);
        throw new RuntimeException(e);
      } catch (InterruptedException e) {
        LOG.log(Level.WARNING, "Interrupted while joining ServerSet.", e);
        // Restore the interrupt flag so code further up the stack can still observe it.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
      }
    }
  }
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/zookeeper/guice/client/ZooKeeperClientModule.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/zookeeper/guice/client/ZooKeeperClientModule.java b/commons/src/main/java/com/twitter/common/zookeeper/guice/client/ZooKeeperClientModule.java new file mode 100644 index 0000000..359f9ae ---
/dev/null +++ b/commons/src/main/java/com/twitter/common/zookeeper/guice/client/ZooKeeperClientModule.java @@ -0,0 +1,238 @@
// =================================================================================================
// Copyright 2013 Twitter, Inc.
// -------------------------------------------------------------------------------------------------
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this work except in compliance with the License.
// You may obtain a copy of the License in the LICENSE file, or at:
//
//  http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// =================================================================================================

package com.twitter.common.zookeeper.guice.client;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.logging.Logger;

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;

import com.google.inject.Inject;
import com.google.inject.Key;
import com.google.inject.PrivateModule;
import com.google.inject.Provider;
import com.google.inject.Singleton;

import com.twitter.common.application.ShutdownRegistry;
import com.twitter.common.inject.Bindings.KeyFactory;
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
import com.twitter.common.zookeeper.ZooKeeperClient;
import com.twitter.common.zookeeper.ZooKeeperClient.Credentials;
import com.twitter.common.zookeeper.ZooKeeperUtils;
import com.twitter.common.zookeeper.testing.ZooKeeperTestServer;

/**
 * A guice binding module that configures and binds a {@link ZooKeeperClient} instance.
 */
public class ZooKeeperClientModule extends PrivateModule {
  private final KeyFactory keyFactory;
  private final ClientConfig config;

  /**
   * Creates a new ZK client module from the provided configuration.
   *
   * @param config Configuration parameters for the client.
   */
  public ZooKeeperClientModule(ClientConfig config) {
    this(KeyFactory.PLAIN, config);
  }

  /**
   * Creates a new ZK client module from the provided configuration, using a key factory to
   * qualify any bindings.
   *
   * @param keyFactory Factory to use when creating any exposed bindings.
   * @param config Configuration parameters for the client.
   */
  public ZooKeeperClientModule(KeyFactory keyFactory, ClientConfig config) {
    this.keyFactory = Preconditions.checkNotNull(keyFactory);
    this.config = Preconditions.checkNotNull(config);
  }

  /**
   * Binds and exposes the {@link ZooKeeperClient}: a lazily created client of an embedded test
   * server when {@code config.inProcess} is set, otherwise a client for {@code config.servers}.
   */
  @Override
  protected void configure() {
    Key<ZooKeeperClient> clientKey = keyFactory.create(ZooKeeperClient.class);
    if (config.inProcess) {
      requireBinding(ShutdownRegistry.class);
      // Bound privately to give the local provider access to configuration settings.
      bind(ClientConfig.class).toInstance(config);
      bind(clientKey).toProvider(LocalClientProvider.class).in(Singleton.class);
    } else {
      ZooKeeperClient client = new ZooKeeperClient(
          config.sessionTimeout,
          config.credentials,
          config.chrootPath,
          config.servers);
      bind(clientKey).toInstance(client);
    }
    expose(clientKey);
  }

  /**
   * Provider that starts an embedded {@link ZooKeeperTestServer} and returns a client for it.
   */
  private static class LocalClientProvider implements Provider<ZooKeeperClient> {
    private static final Logger LOG = Logger.getLogger(LocalClientProvider.class.getName());

    private final ClientConfig config;
    private final ShutdownRegistry shutdownRegistry;

    @Inject
    LocalClientProvider(ClientConfig config, ShutdownRegistry shutdownRegistry) {
      this.config = Preconditions.checkNotNull(config);
      this.shutdownRegistry = Preconditions.checkNotNull(shutdownRegistry);
    }

    @Override
    public ZooKeeperClient get() {
      ZooKeeperTestServer zooKeeperServer;
      try {
        // Port 0 is passed so that the test server is not tied to a fixed port.
        zooKeeperServer = new ZooKeeperTestServer(0, shutdownRegistry);
        zooKeeperServer.startNetwork();
      } catch (IOException e) {
        throw Throwables.propagate(e);
      } catch (InterruptedException e) {
        // Restore the interrupt flag before converting to an unchecked exception, otherwise the
        // interruption is invisible to everything further up the stack.
        Thread.currentThread().interrupt();
        throw Throwables.propagate(e);
      }

      LOG.info("Embedded zookeeper cluster started on port " + zooKeeperServer.getPort());
      // NOTE(review): config.chrootPath is not passed on here, so the in-process client is created
      // without it - confirm whether that is intentional.
      return zooKeeperServer.createClient(config.sessionTimeout, config.credentials);
    }
  }

  /**
   * Composite type that contains configuration parameters used when creating a client.
   * <p>
   * Instances of this class are immutable, but builder-style chained calls are supported.
   */
  public static class ClientConfig {
    public final Iterable<InetSocketAddress> servers;
    public final boolean inProcess;
    public final Amount<Integer, Time> sessionTimeout;
    public final Optional<String> chrootPath;
    public final Credentials credentials;

    /**
     * Creates a new client configuration without a chroot path.
     *
     * @param servers ZooKeeper server addresses.
     * @param inProcess Whether to run and create clients for an in-process ZooKeeper server.
     * @param sessionTimeout Timeout duration for established sessions.
     * @param credentials ZooKeeper authentication credentials.
     */
    public ClientConfig(
        Iterable<InetSocketAddress> servers,
        boolean inProcess,
        Amount<Integer, Time> sessionTimeout,
        Credentials credentials) {

      this(servers, Optional.<String>absent(), inProcess, sessionTimeout, credentials);
    }

    /**
     * Creates a new client configuration.
     *
     * @param servers ZooKeeper server addresses.
     * @param chrootPath an optional chroot path
     * @param inProcess Whether to run and create clients for an in-process ZooKeeper server.
     * @param sessionTimeout Timeout duration for established sessions.
     * @param credentials ZooKeeper authentication credentials.
     */
    public ClientConfig(
        Iterable<InetSocketAddress> servers,
        Optional<String> chrootPath,
        boolean inProcess,
        Amount<Integer, Time> sessionTimeout,
        Credentials credentials) {

      this.servers = servers;
      this.chrootPath = chrootPath;
      this.inProcess = inProcess;
      this.sessionTimeout = sessionTimeout;
      this.credentials = credentials;
    }

    /**
     * Creates a new client configuration with defaults for the session timeout and credentials,
     * no chroot path, and in-process mode disabled.
     *
     * @param servers ZooKeeper server addresses.
     * @return A new configuration.
     */
    public static ClientConfig create(Iterable<InetSocketAddress> servers) {
      return new ClientConfig(
          servers,
          Optional.<String>absent(),
          false,
          ZooKeeperUtils.DEFAULT_ZK_SESSION_TIMEOUT,
          Credentials.NONE);
    }

    /**
     * Creates a new configuration identical to this configuration, but with the provided
     * session timeout.
     *
     * @param sessionTimeout Timeout duration for established sessions.
     * @return A modified clone of this configuration.
     */
    public ClientConfig withSessionTimeout(Amount<Integer, Time> sessionTimeout) {
      return new ClientConfig(servers, chrootPath, inProcess, sessionTimeout, credentials);
    }

    /**
     * Creates a new configuration identical to this configuration, but with the provided
     * credentials.
     *
     * @param credentials ZooKeeper authentication credentials.
     * @return A modified clone of this configuration.
     */
    public ClientConfig withCredentials(Credentials credentials) {
      return new ClientConfig(servers, chrootPath, inProcess, sessionTimeout, credentials);
    }

    /**
     * Convenience method for calling {@link #withCredentials(Credentials)} with digest credentials.
     *
     * @param username Digest authentication user.
     * @param password Digest authentication raw password.
     * @return A modified clone of this configuration.
     */
    public ClientConfig withDigestCredentials(String username, String password) {
      return withCredentials(ZooKeeperClient.digestCredentials(username, password));
    }

    /**
     * Creates a new configuration identical to this configuration, but with the provided
     * in-process setting.
     *
     * @param inProcess If {@code true}, an in-process ZooKeeper server will be used,
     *     and all clients will connect to it.
     * @return A modified clone of this configuration.
     */
    public ClientConfig inProcess(boolean inProcess) {
      return new ClientConfig(servers, chrootPath, inProcess, sessionTimeout, credentials);
    }

    /**
     * Creates a new configuration identical to this configuration, but with the provided
     * chroot path setting.
     *
     * @param chrootPath a valid ZooKeeper path used as a chroot for ZooKeeper connections.
     * @return A modified clone of this configuration.
     */
    public ClientConfig withChrootPath(String chrootPath) {
      return new ClientConfig(
          servers, Optional.of(chrootPath), inProcess, sessionTimeout, credentials);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/zookeeper/guice/client/flagged/FlaggedClientConfig.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/zookeeper/guice/client/flagged/FlaggedClientConfig.java b/commons/src/main/java/com/twitter/common/zookeeper/guice/client/flagged/FlaggedClientConfig.java new file mode 100644 index 0000000..5e3be45 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/zookeeper/guice/client/flagged/FlaggedClientConfig.java @@ -0,0 +1,85 @@ +// ================================================================================================= +// Copyright 2013 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
// =================================================================================================

package com.twitter.common.zookeeper.guice.client.flagged;

import java.net.InetSocketAddress;
import java.util.List;

import com.google.common.base.Optional;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;

import com.twitter.common.args.Arg;
import com.twitter.common.args.CmdLine;
import com.twitter.common.args.constraints.NotEmpty;
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
import com.twitter.common.zookeeper.ZooKeeperClient;
import com.twitter.common.zookeeper.ZooKeeperClient.Credentials;
import com.twitter.common.zookeeper.ZooKeeperUtils;
import com.twitter.common.zookeeper.guice.client.ZooKeeperClientModule.ClientConfig;

/**
 * A factory that creates a {@link ClientConfig} instance based on command line argument values.
 */
public class FlaggedClientConfig {
  @CmdLine(name = "zk_in_proc",
      help = "Launches an embedded zookeeper server for local testing causing -zk_endpoints "
          + "to be ignored if specified.")
  private static final Arg<Boolean> IN_PROCESS = Arg.create(false);

  @NotEmpty
  @CmdLine(name = "zk_endpoints", help = "Endpoint specification for the ZooKeeper servers.")
  private static final Arg<List<InetSocketAddress>> ZK_ENDPOINTS = Arg.create();

  @CmdLine(name = "zk_chroot_path", help = "chroot path to use for the ZooKeeper connections")
  private static final Arg<String> CHROOT_PATH = Arg.create(null);

  @CmdLine(name = "zk_session_timeout", help = "The ZooKeeper session timeout.")
  private static final Arg<Amount<Integer, Time>> SESSION_TIMEOUT =
      Arg.create(ZooKeeperUtils.DEFAULT_ZK_SESSION_TIMEOUT);

  @CmdLine(name = "zk_digest_credentials",
      help = "user:password to use when authenticating with ZooKeeper.")
  private static final Arg<String> DIGEST_CREDENTIALS = Arg.create();

  /**
   * Creates a configuration from command line arguments.  Digest credentials are used only when
   * {@code -zk_digest_credentials} was supplied; otherwise {@link Credentials#NONE} is used.
   *
   * @return Configuration instance.
   * @throws IllegalArgumentException if {@code -zk_digest_credentials} was supplied but is not of
   *     the form {@code user:pass}
   */
  public static ClientConfig create() {
    return new ClientConfig(
        ZK_ENDPOINTS.get(),
        Optional.fromNullable(CHROOT_PATH.get()),
        IN_PROCESS.get(),
        SESSION_TIMEOUT.get(),
        DIGEST_CREDENTIALS.hasAppliedValue()
            ? getCredentials(DIGEST_CREDENTIALS.get())
            : Credentials.NONE
    );
  }

  /**
   * Parses a {@code user:pass} specification into digest credentials.
   *
   * <p>Only the first {@code ':'} separates the user from the password, so the password itself
   * may contain colons (for example {@code "user:pa:ss"} yields the password {@code "pa:ss"}).
   *
   * @param userAndPass the raw flag value
   * @return digest credentials for the parsed user and password
   * @throws IllegalArgumentException if {@code userAndPass} contains no {@code ':'}
   */
  private static Credentials getCredentials(String userAndPass) {
    // limit(2) stops splitting after the first separator; without it any password containing a
    // ':' produced more than two parts and was rejected.
    List<String> parts = ImmutableList.copyOf(Splitter.on(":").limit(2).split(userAndPass));
    if (parts.size() != 2) {
      throw new IllegalArgumentException(
          "zk_digest_credentials must be formatted as user:pass");
    }
    return ZooKeeperClient.digestCredentials(parts.get(0), parts.get(1));
  }
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/zookeeper/testing/BaseZooKeeperTest.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/zookeeper/testing/BaseZooKeeperTest.java b/commons/src/main/java/com/twitter/common/zookeeper/testing/BaseZooKeeperTest.java new file mode 100644 index 0000000..8d3724a --- /dev/null +++ b/commons/src/main/java/com/twitter/common/zookeeper/testing/BaseZooKeeperTest.java @@ -0,0 +1,157 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// =================================================================================================

package com.twitter.common.zookeeper.testing;

import java.io.IOException;

import com.google.common.base.Preconditions;
import com.google.common.testing.TearDown;
import com.google.common.testing.junit4.TearDownTestCase;

import org.junit.Before;

import com.twitter.common.application.ShutdownRegistry.ShutdownRegistryImpl;
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
import com.twitter.common.zookeeper.ZooKeeperClient;
import com.twitter.common.zookeeper.ZooKeeperClient.Credentials;
import com.twitter.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;

/**
 * A base class for in-process zookeeper tests.
 * Delegates to {@link ZooKeeperTestServer} to start the server and create clients: new tests
 * should directly use that helper class instead of extending this class.
 */
public abstract class BaseZooKeeperTest extends TearDownTestCase {

  private final Amount<Integer, Time> defaultSessionTimeout;
  private ZooKeeperTestServer zkTestServer;

  /**
   * Creates a test case where the test server uses its
   * {@link ZooKeeperTestServer#DEFAULT_SESSION_TIMEOUT} for clients created without an explicit
   * session timeout.
   */
  public BaseZooKeeperTest() {
    this(ZooKeeperTestServer.DEFAULT_SESSION_TIMEOUT);
  }

  /**
   * Creates a test case where the test server uses the given {@code defaultSessionTimeout} for
   * clients created without an explicit session timeout.
   *
   * @param defaultSessionTimeout the timeout handed to the test server; must not be null
   */
  public BaseZooKeeperTest(Amount<Integer, Time> defaultSessionTimeout) {
    this.defaultSessionTimeout = Preconditions.checkNotNull(defaultSessionTimeout);
  }

  /**
   * Starts a fresh in-process zookeeper server before each test and arranges for everything
   * registered with its shutdown registry to run at tear-down.
   */
  @Before
  public final void setUp() throws Exception {
    final ShutdownRegistryImpl registry = new ShutdownRegistryImpl();
    TearDown runShutdownActions = new TearDown() {
      @Override public void tearDown() {
        registry.execute();
      }
    };
    addTearDown(runShutdownActions);

    ZooKeeperTestServer server = new ZooKeeperTestServer(0, registry, defaultSessionTimeout);
    zkTestServer = server;
    server.startNetwork();
  }

  /**
   * Starts zookeeper back up on the last used port.
   */
  protected final void restartNetwork() throws IOException, InterruptedException {
    zkTestServer.restartNetwork();
  }

  /**
   * Shuts down the in-process zookeeper network server.
   */
  protected final void shutdownNetwork() {
    zkTestServer.shutdownNetwork();
  }

  /**
   * Expires the active session for the given client.  The client should be one returned from
   * {@link #createZkClient}.
   *
   * @param zkClient the client to expire
   * @throws ZooKeeperClient.ZooKeeperConnectionException if a problem is encountered connecting to
   *    the local zk server while trying to expire the session
   * @throws InterruptedException if interrupted while requesting expiration
   */
  protected final void expireSession(ZooKeeperClient zkClient)
      throws ZooKeeperConnectionException, InterruptedException {
    zkTestServer.expireClientSession(zkClient);
  }

  /**
   * Returns the current port to connect to the in-process zookeeper instance.
   */
  protected final int getPort() {
    return zkTestServer.getPort();
  }

  /**
   * Returns a new unauthenticated zookeeper client connected to the in-process zookeeper server
   * with the default session timeout.
   */
  protected final ZooKeeperClient createZkClient() {
    return zkTestServer.createClient();
  }

  /**
   * Returns a new authenticated zookeeper client connected to the in-process zookeeper server with
   * the default session timeout.
   */
  protected final ZooKeeperClient createZkClient(Credentials credentials) {
    return zkTestServer.createClient(credentials);
  }

  /**
   * Returns a new authenticated zookeeper client connected to the in-process zookeeper server with
   * the default session timeout.  The client is authenticated in the digest authentication scheme
   * with the given {@code username} and {@code password}.
   */
  protected final ZooKeeperClient createZkClient(String username, String password) {
    Credentials digest = ZooKeeperClient.digestCredentials(username, password);
    return createZkClient(digest);
  }

  /**
   * Returns a new unauthenticated zookeeper client connected to the in-process zookeeper server
   * with a custom {@code sessionTimeout}.
   */
  protected final ZooKeeperClient createZkClient(Amount<Integer, Time> sessionTimeout) {
    return zkTestServer.createClient(sessionTimeout);
  }

  /**
   * Returns a new authenticated zookeeper client connected to the in-process zookeeper server with
   * a custom {@code sessionTimeout}.
   */
  protected final ZooKeeperClient createZkClient(Amount<Integer, Time> sessionTimeout,
      Credentials credentials) {
    return zkTestServer.createClient(sessionTimeout, credentials);
  }

  /**
   * Returns a new zookeeper client connected to the in-process zookeeper server that uses the
   * given custom {@code chrootPath}.
   */
  protected final ZooKeeperClient createZkClient(String chrootPath) {
    return zkTestServer.createClient(chrootPath);
  }
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/zookeeper/testing/ZooKeeperTestServer.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/zookeeper/testing/ZooKeeperTestServer.java b/commons/src/main/java/com/twitter/common/zookeeper/testing/ZooKeeperTestServer.java new file mode 100644 index 0000000..ae73206 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/zookeeper/testing/ZooKeeperTestServer.java @@ -0,0 +1,225 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
+// ================================================================================================= + +package com.twitter.common.zookeeper.testing; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Arrays; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; + +import org.apache.zookeeper.server.NIOServerCnxn; +import org.apache.zookeeper.server.ZooKeeperServer; +import org.apache.zookeeper.server.ZooKeeperServer.BasicDataTreeBuilder; +import org.apache.zookeeper.server.persistence.FileTxnSnapLog; + +import com.twitter.common.application.ShutdownRegistry; +import com.twitter.common.base.Command; +import com.twitter.common.base.ExceptionalCommand; +import com.twitter.common.io.FileUtils; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.zookeeper.ZooKeeperClient; +import com.twitter.common.zookeeper.ZooKeeperClient.Credentials; + +/** + * A helper class for starting in-process ZooKeeper server and clients. + * + * <p>This is ONLY meant to be used for testing. + */ +public class ZooKeeperTestServer { + + /** + * The default session timeout for clients created by servers constructed with + * {@link #ZooKeeperTestServer(int, ShutdownRegistry)}. + */ + public static final Amount<Integer, Time> DEFAULT_SESSION_TIMEOUT = + Amount.of(100, Time.MILLISECONDS); + + protected final ZooKeeperServer zooKeeperServer; + private final ShutdownRegistry shutdownRegistry; + private NIOServerCnxn.Factory connectionFactory; + private int port; + private final Amount<Integer, Time> defaultSessionTimeout; + + /** + * @param port the port to start the zoo keeper server on - {@code 0} picks an ephemeral port + * @param shutdownRegistry a registry that will be used to register client and server shutdown + * commands. It is up to the caller to execute the registered actions at an appropriate time. 
+ * @throws IOException if there was aproblem creating the server's database + */ + public ZooKeeperTestServer(int port, ShutdownRegistry shutdownRegistry) throws IOException { + this(port, shutdownRegistry, DEFAULT_SESSION_TIMEOUT); + } + + /** + * @param port the port to start the zoo keeper server on - {@code 0} picks an ephemeral port + * @param shutdownRegistry a registry that will be used to register client and server shutdown + * commands. It is up to the caller to execute the registered actions at an appropriate time. + * @param defaultSessionTimeout the default session timeout for clients created with + * {@link #createClient()}. + * @throws IOException if there was aproblem creating the server's database + */ + public ZooKeeperTestServer(int port, ShutdownRegistry shutdownRegistry, + Amount<Integer, Time> defaultSessionTimeout) throws IOException { + Preconditions.checkArgument(0 <= port && port <= 0xFFFF); + this.port = port; + this.shutdownRegistry = Preconditions.checkNotNull(shutdownRegistry); + this.defaultSessionTimeout = Preconditions.checkNotNull(defaultSessionTimeout); + + zooKeeperServer = + new ZooKeeperServer( + new FileTxnSnapLog(createTempDir(), createTempDir()), + new BasicDataTreeBuilder()) { + + // TODO(John Sirois): Introduce a builder to configure the in-process server if and when + // some folks need JMX for in-process tests. + @Override protected void registerJMX() { + // noop + } + }; + } + + /** + * Starts zookeeper up on the configured port. If the configured port is the ephemeral port + * (@{code 0}), then the actual chosen port is returned. 
+ */ + public final int startNetwork() throws IOException, InterruptedException { + connectionFactory = new NIOServerCnxn.Factory(new InetSocketAddress(port)); + connectionFactory.startup(zooKeeperServer); + shutdownRegistry.addAction(new Command() { + @Override public void execute() { + shutdownNetwork(); + } + }); + port = zooKeeperServer.getClientPort(); + return port; + } + + /** + * Starts zookeeper back up on the last used port. + */ + public final void restartNetwork() throws IOException, InterruptedException { + checkEphemeralPortAssigned(); + Preconditions.checkState(!connectionFactory.isAlive()); + startNetwork(); + } + + /** + * Shuts down the in-process zookeeper network server. + */ + public final void shutdownNetwork() { + if (connectionFactory != null && connectionFactory.isAlive()) { + connectionFactory.shutdown(); + } + } + + /** + * Expires the active session for the given client. The client should be one returned from + * {@link #createClient}. + * + * @param zkClient the client to expire + * @throws ZooKeeperClient.ZooKeeperConnectionException if a problem is encountered connecting to + * the local zk server while trying to expire the session + * @throws InterruptedException if interrupted while requesting expiration + */ + public final void expireClientSession(ZooKeeperClient zkClient) + throws ZooKeeperClient.ZooKeeperConnectionException, InterruptedException { + zooKeeperServer.closeSession(zkClient.get().getSessionId()); + } + + /** + * Returns the current port to connect to the in-process zookeeper instance. + */ + public final int getPort() { + checkEphemeralPortAssigned(); + return port; + } + + /** + * Returns a new unauthenticated zookeeper client connected to the in-process zookeeper server + * with the default session timeout. 
+ */ + public final ZooKeeperClient createClient() { + return createClient(defaultSessionTimeout); + } + + /** + * Returns a new unauthenticated zookeeper client connected to the in-process zookeeper server + * with the default session timeout and a custom {@code chrootPath}. + */ + public final ZooKeeperClient createClient(String chrootPath) { + return createClient(defaultSessionTimeout, Credentials.NONE, Optional.of(chrootPath)); + } + + /** + * Returns a new authenticated zookeeper client connected to the in-process zookeeper server with + * the default session timeout. + */ + public final ZooKeeperClient createClient(Credentials credentials) { + return createClient(defaultSessionTimeout, credentials, Optional.<String>absent()); + } + + /** + * Returns a new unauthenticated zookeeper client connected to the in-process zookeeper server + * with a custom {@code sessionTimeout}. + */ + public final ZooKeeperClient createClient(Amount<Integer, Time> sessionTimeout) { + return createClient(sessionTimeout, Credentials.NONE, Optional.<String>absent()); + } + + /** + * Returns a new authenticated zookeeper client connected to the in-process zookeeper server with + * a custom {@code sessionTimeout}. + */ + public final ZooKeeperClient createClient(Amount<Integer, Time> sessionTimeout, + Credentials credentials) { + return createClient(sessionTimeout, credentials, Optional.<String>absent()); + } + + /** + * Returns a new authenticated zookeeper client connected to the in-process zookeeper server with + * a custom {@code sessionTimeout} and a custom {@code chrootPath}. 
+ */ + public final ZooKeeperClient createClient(Amount<Integer, Time> sessionTimeout, + Credentials credentials, Optional<String> chrootPath) { + final ZooKeeperClient client = new ZooKeeperClient(sessionTimeout, credentials, + chrootPath, Arrays.asList(InetSocketAddress.createUnresolved("127.0.0.1", port))); + shutdownRegistry.addAction(new ExceptionalCommand<InterruptedException>() { + @Override public void execute() { + client.close(); + } + }); + return client; + } + + private void checkEphemeralPortAssigned() { + Preconditions.checkState(port > 0, "startNetwork must be called first"); + } + + private File createTempDir() { + final File tempDir = FileUtils.createTempDir(); + shutdownRegistry.addAction(new ExceptionalCommand<IOException>() { + @Override public void execute() throws IOException { + org.apache.commons.io.FileUtils.deleteDirectory(tempDir); + } + }); + return tempDir; + } +}
