// http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/zookeeper/Partitioner.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/zookeeper/Partitioner.java b/commons/src/main/java/com/twitter/common/zookeeper/Partitioner.java new file mode 100644 index 0000000..646fab9 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/zookeeper/Partitioner.java @@ -0,0 +1,175 @@
// =================================================================================================
// Copyright 2011 Twitter, Inc.
// -------------------------------------------------------------------------------------------------
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this work except in compliance with the License.
// You may obtain a copy of the License in the LICENSE file, or at:
//
//  http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// =================================================================================================

package com.twitter.common.zookeeper;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.collect.Ordering;
import com.twitter.common.zookeeper.Group.GroupChangeListener;
import com.twitter.common.zookeeper.Group.JoinException;
import com.twitter.common.zookeeper.Group.Membership;
import com.twitter.common.zookeeper.Group.UpdateException;
import com.twitter.common.zookeeper.Group.WatchException;
import org.apache.zookeeper.data.ACL;

import javax.annotation.Nullable;
import java.util.List;
import java.util.logging.Logger;

/**
 * A distributed mechanism for eventually arriving at an evenly partitioned space of long values.
 * A typical usage would have a client on each of several hosts joining a logical partition (a
 * "partition group") that represents some shared work.  Clients could then process a subset of a
 * full body of work by testing any given item of work with their partition filter.
 *
 * <p>Note that clients must be able to tolerate periods of duplicate processing by more than 1
 * partition as explained in {@link #join()}.
 *
 * @author John Sirois
 */
public class Partitioner {

  private static final Logger LOG = Logger.getLogger(Partitioner.class.getName());

  // Written by the group change listener and read by the partition predicate; each field is
  // volatile so that readers observe the latest published value.
  private volatile int groupSize;
  private volatile int groupIndex;
  private final Group group;

  /**
   * Constructs a representation of a partition group but does not join it.  Note that the
   * partition group path will be created as a persistent zookeeper path if it does not already
   * exist.
   *
   * @param zkClient a client to use for joining the partition group and watching its membership
   * @param acl the acl for this partition group
   * @param path a zookeeper path that represents the partition group
   */
  public Partitioner(ZooKeeperClient zkClient, List<ACL> acl, String path) {
    group = new Group(zkClient, acl, path);
  }

  @VisibleForTesting
  int getGroupSize() {
    return groupSize;
  }

  /**
   * Represents a slice of a partition group.  The partition is dynamic and will adjust its size
   * as members join and leave its partition group.
   */
  public abstract static class Partition implements Predicate<Long>, Membership {

    /**
     * Returns {@code true} if the given {@code value} is a member of this partition at this time.
     */
    public abstract boolean isMember(long value);

    /**
     * Gets number of members in the group at this time.
     *
     * @return number of members in the ZK group at this time.
     */
    public abstract int getNumPartitions();

    /**
     * Evaluates partition membership based on the given {@code value}'s hash code.  If the value
     * is null it is never a member of a partition.
     */
    boolean isMember(Object value) {
      // The int hash code is widened to long, so this resolves to isMember(long).
      return (value != null) && isMember(value.hashCode());
    }

    /**
     * Equivalent to {@link #isMember(long)} for all non-null values; however incurs unboxing
     * overhead.
     */
    @Override
    public boolean apply(@Nullable Long input) {
      // Unbox explicitly: passing the Long itself would select isMember(Object) (subtyping is
      // preferred over unboxing during overload resolution) and partition on the hash code of the
      // Long instead of its value.
      return (input != null) && isMember(input.longValue());
    }
  }

  /**
   * Attempts to join the partition group and claim a slice.  When successful, a predicate is
   * returned that can be used to test whether or not an item belongs to this partition.  The
   * predicate is dynamic such that as the group is further partitioned or partitions merge the
   * predicate will claim a narrower or wider swath of the partition space respectively.  Partition
   * creation and merging is not instantaneous and clients should expect independent partitions to
   * claim ownership of some items when partition membership is in flux.  It is only in the steady
   * state that a client should expect independent partitions to divide the partition space evenly
   * and without overlap.
   *
   * <p>Values are assigned to partitions by their non-negative remainder modulo the group size, so
   * negative values are partitioned like any other value.  Until the first group membership
   * notification has been processed, or while this member is not listed in the group, the returned
   * partition claims no values.
   *
   * <p>TODO(John Sirois): consider adding a version with a global timeout for the join operation.
   *
   * @return the partition representing the slice of the partition group this member can claim
   * @throws JoinException if there was a problem joining the partition group
   * @throws InterruptedException if interrupted while waiting to join the partition group
   */
  public final Partition join() throws JoinException, InterruptedException {
    final Membership membership = group.join();
    try {
      group.watch(createGroupChangeListener(membership));
    } catch (WatchException e) {
      // Do not leave a dangling membership behind if we cannot track the group.
      membership.cancel();
      throw new JoinException("Problem establishing watch on group after joining it", e);
    }
    return new Partition() {
      @Override public boolean isMember(long value) {
        // Read each volatile exactly once so the computation uses stable values.
        int size = groupSize;
        int index = groupIndex;
        if (size <= 0 || index < 0) {
          // No membership seen yet (avoids division by zero) or this member is not in the group.
          return false;
        }
        // Java's % yields a negative remainder for negative dividends; normalize into [0, size).
        long bucket = value % size;
        if (bucket < 0) {
          bucket += size;
        }
        return bucket == index;
      }

      @Override public int getNumPartitions() {
        return groupSize;
      }

      @Override public String getGroupPath() {
        return membership.getGroupPath();
      }

      @Override public String getMemberId() {
        return membership.getMemberId();
      }

      @Override public String getMemberPath() {
        return membership.getMemberPath();
      }

      @Override public byte[] updateMemberData() throws UpdateException {
        return membership.updateMemberData();
      }

      @Override public void cancel() throws JoinException {
        membership.cancel();
      }
    };
  }

  /**
   * Creates a listener that recomputes this member's slice whenever group membership changes.
   * Member ids are sorted in natural order; the group size is the number of members and this
   * member's index is its position in the sorted list (-1 when it is not listed).
   *
   * @param membership the membership whose member id locates this partition within the group
   * @return a listener suitable for passing to the group watch
   */
  @VisibleForTesting GroupChangeListener createGroupChangeListener(final Membership membership) {
    return new GroupChangeListener() {
      @Override public void onGroupChange(Iterable<String> memberIds) {
        List<String> members = Ordering.natural().sortedCopy(memberIds);
        int newSize = members.size();
        int newIndex = members.indexOf(membership.getMemberId());

        LOG.info(String.format("Rebuilding group %s:%s [%d:%d]->[%d:%d]",
            membership.getGroupPath(), members, groupSize, groupIndex, newSize, newIndex));

        groupSize = newSize;
        groupIndex = newIndex;
      }
    };
  }
}
// http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/zookeeper/ServerSet.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/zookeeper/ServerSet.java b/commons/src/main/java/com/twitter/common/zookeeper/ServerSet.java new file mode 100644 index 0000000..b6a0686 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/zookeeper/ServerSet.java @@ -0,0 +1,117 @@
// =================================================================================================
// Copyright 2011 Twitter, Inc.
// -------------------------------------------------------------------------------------------------
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this work except in compliance with the License.
// You may obtain a copy of the License in the LICENSE file, or at:
//
//  http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// =================================================================================================

package com.twitter.common.zookeeper;

import com.twitter.common.net.pool.DynamicHostSet;
import com.twitter.common.zookeeper.Group.JoinException;
import com.twitter.thrift.ServiceInstance;
import com.twitter.thrift.Status;

import java.net.InetSocketAddress;
import java.util.Map;

/**
 * A logical set of servers registered in ZooKeeper.  Intended to be used by both servers in a
 * common service and their clients.
 *
 * <p>Servers register themselves through one of the {@code join} overloads and receive an
 * {@link EndpointStatus} handle with which they can later leave the set.
 *
 * TODO(William Farner): Explore decoupling this from thrift.
 */
public interface ServerSet extends DynamicHostSet<ServiceInstance> {

  /**
   * Attempts to join a server set for this logical service group.
   *
   * @param endpoint the primary service endpoint
   * @param additionalEndpoints any additional endpoints keyed by their logical name
   * @param status the current service status
   * @return an EndpointStatus object that allows the endpoint to adjust its status
   * @throws JoinException if there was a problem joining the server set
   * @throws InterruptedException if interrupted while waiting to join the server set
   * @deprecated The status field is deprecated. Please use {@link #join(InetSocketAddress, Map)}
   */
  @Deprecated
  public EndpointStatus join(
      InetSocketAddress endpoint,
      Map<String, InetSocketAddress> additionalEndpoints,
      Status status) throws JoinException, InterruptedException;

  /**
   * Attempts to join a server set for this logical service group.
   *
   * @param endpoint the primary service endpoint
   * @param additionalEndpoints any additional endpoints keyed by their logical name
   * @return an EndpointStatus object that allows the endpoint to adjust its status
   * @throws JoinException if there was a problem joining the server set
   * @throws InterruptedException if interrupted while waiting to join the server set
   */
  public EndpointStatus join(
      InetSocketAddress endpoint,
      Map<String, InetSocketAddress> additionalEndpoints)
      throws JoinException, InterruptedException;

  /**
   * Attempts to join a server set for this logical service group.
   *
   * @param endpoint the primary service endpoint
   * @param additionalEndpoints any additional endpoints keyed by their logical name
   * @param shardId Unique shard identifier for this member of the service.
   * @return an EndpointStatus object that allows the endpoint to adjust its status
   * @throws JoinException if there was a problem joining the server set
   * @throws InterruptedException if interrupted while waiting to join the server set
   */
  EndpointStatus join(
      InetSocketAddress endpoint,
      Map<String, InetSocketAddress> additionalEndpoints,
      int shardId) throws JoinException, InterruptedException;

  /**
   * A handle to a service endpoint's status data that allows updating it to track current events.
   */
  public interface EndpointStatus {

    /**
     * Attempts to update the status of the service endpoint associated with this endpoint.  If the
     * {@code status} is {@link Status#DEAD} then the endpoint will be removed from the server set.
     *
     * @param status the current status of the endpoint
     * @throws UpdateException if there was a problem writing the update
     * @deprecated Support for mutable status is deprecated. Please use {@link #leave()}
     */
    @Deprecated
    void update(Status status) throws UpdateException;

    /**
     * Removes the endpoint from the server set.
     *
     * @throws UpdateException if there was a problem leaving the ServerSet.
     */
    void leave() throws UpdateException;
  }

  /**
   * Indicates an error updating a service's status information.
   */
  public static class UpdateException extends Exception {
    /**
     * @param message a description of the failed update
     * @param cause the underlying failure
     */
    public UpdateException(String message, Throwable cause) {
      super(message, cause);
    }

    /**
     * @param message a description of the failed update
     */
    public UpdateException(String message) {
      super(message);
    }
  }
}
// http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/zookeeper/ServerSetImpl.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/zookeeper/ServerSetImpl.java b/commons/src/main/java/com/twitter/common/zookeeper/ServerSetImpl.java new file mode 100644 index 0000000..ec6b3f7 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/zookeeper/ServerSetImpl.java @@ -0,0 +1,609 @@
// =================================================================================================
// Copyright 2011 Twitter, Inc.
// -------------------------------------------------------------------------------------------------
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this work except in compliance with the License.
// You may obtain a copy of the License in the LICENSE file, or at:
//
//  http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
+// ================================================================================================= + +package com.twitter.common.zookeeper; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.net.InetSocketAddress; +import java.nio.charset.Charset; +import java.util.Map; +import java.util.Set; +import java.util.logging.Level; +import java.util.logging.Logger; + +import javax.annotation.Nullable; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Charsets; +import com.google.common.base.Joiner; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.base.Throwables; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.ImmutableSortedSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.common.collect.Sets.SetView; +import com.google.common.util.concurrent.UncheckedExecutionException; +import com.google.gson.Gson; + +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; + +import com.twitter.common.args.Arg; +import com.twitter.common.args.CmdLine; +import com.twitter.common.base.Command; +import com.twitter.common.base.Function; +import com.twitter.common.base.Supplier; +import com.twitter.common.io.Codec; +import com.twitter.common.io.CompatibilityCodec; +import 
com.twitter.common.io.ThriftCodec; +import com.twitter.common.util.BackoffHelper; +import com.twitter.common.zookeeper.Group.GroupChangeListener; +import com.twitter.common.zookeeper.Group.JoinException; +import com.twitter.common.zookeeper.Group.Membership; +import com.twitter.common.zookeeper.Group.WatchException; +import com.twitter.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException; +import com.twitter.thrift.Endpoint; +import com.twitter.thrift.ServiceInstance; +import com.twitter.thrift.Status; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * ZooKeeper-backed implementation of {@link ServerSet}. + */ +public class ServerSetImpl implements ServerSet { + private static final Logger LOG = Logger.getLogger(ServerSetImpl.class.getName()); + + @CmdLine(name = "serverset_encode_json", + help = "If true, use JSON for encoding server set information." + + " Defaults to true (use JSON).") + private static final Arg<Boolean> ENCODE_JSON = Arg.create(true); + + private final ZooKeeperClient zkClient; + private final Group group; + private final Codec<ServiceInstance> codec; + private final BackoffHelper backoffHelper; + + /** + * Creates a new ServerSet using open ZooKeeper node ACLs. + * + * @param zkClient the client to use for interactions with ZooKeeper + * @param path the name-service path of the service to connect to + */ + public ServerSetImpl(ZooKeeperClient zkClient, String path) { + this(zkClient, ZooDefs.Ids.OPEN_ACL_UNSAFE, path); + } + + /** + * Creates a new ServerSet for the given service {@code path}. 
+ * + * @param zkClient the client to use for interactions with ZooKeeper + * @param acl the ACL to use for creating the persistent group path if it does not already exist + * @param path the name-service path of the service to connect to + */ + public ServerSetImpl(ZooKeeperClient zkClient, Iterable<ACL> acl, String path) { + this(zkClient, new Group(zkClient, acl, path), createDefaultCodec()); + } + + /** + * Creates a new ServerSet using the given service {@code group}. + * + * @param zkClient the client to use for interactions with ZooKeeper + * @param group the server group + */ + public ServerSetImpl(ZooKeeperClient zkClient, Group group) { + this(zkClient, group, createDefaultCodec()); + } + + /** + * Creates a new ServerSet using the given service {@code group} and a custom {@code codec}. + * + * @param zkClient the client to use for interactions with ZooKeeper + * @param group the server group + * @param codec a codec to use for serializing and de-serializing the ServiceInstance data to and + * from a byte array + */ + public ServerSetImpl(ZooKeeperClient zkClient, Group group, Codec<ServiceInstance> codec) { + this.zkClient = checkNotNull(zkClient); + this.group = checkNotNull(group); + this.codec = checkNotNull(codec); + + // TODO(John Sirois): Inject the helper so that backoff strategy can be configurable. 
+ backoffHelper = new BackoffHelper(); + } + + @VisibleForTesting + ZooKeeperClient getZkClient() { + return zkClient; + } + + @Override + public EndpointStatus join( + InetSocketAddress endpoint, + Map<String, InetSocketAddress> additionalEndpoints) + throws JoinException, InterruptedException { + + LOG.log(Level.WARNING, + "Joining a ServerSet without a shard ID is deprecated and will soon break."); + return join(endpoint, additionalEndpoints, Optional.<Integer>absent()); + } + + @Override + public EndpointStatus join( + InetSocketAddress endpoint, + Map<String, InetSocketAddress> additionalEndpoints, + int shardId) throws JoinException, InterruptedException { + + return join(endpoint, additionalEndpoints, Optional.of(shardId)); + } + + private EndpointStatus join( + InetSocketAddress endpoint, + Map<String, InetSocketAddress> additionalEndpoints, + Optional<Integer> shardId) throws JoinException, InterruptedException { + + checkNotNull(endpoint); + checkNotNull(additionalEndpoints); + + final MemberStatus memberStatus = + new MemberStatus(endpoint, additionalEndpoints, shardId); + Supplier<byte[]> serviceInstanceSupplier = new Supplier<byte[]>() { + @Override public byte[] get() { + return memberStatus.serializeServiceInstance(); + } + }; + final Membership membership = group.join(serviceInstanceSupplier); + + return new EndpointStatus() { + @Override public void update(Status status) throws UpdateException { + checkNotNull(status); + LOG.warning("This method is deprecated. Please use leave() instead."); + if (status == Status.DEAD) { + leave(); + } else { + LOG.warning("Status update has been ignored"); + } + } + + @Override public void leave() throws UpdateException { + memberStatus.leave(membership); + } + }; + } + + @Override + public EndpointStatus join( + InetSocketAddress endpoint, + Map<String, InetSocketAddress> additionalEndpoints, + Status status) throws JoinException, InterruptedException { + + LOG.warning("This method is deprecated. 
Please do not specify a status field."); + if (status != Status.ALIVE) { + LOG.severe("**************************************************************************\n" + + "WARNING: MUTABLE STATUS FIELDS ARE NO LONGER SUPPORTED.\n" + + "JOINING WITH STATUS ALIVE EVEN THOUGH YOU SPECIFIED " + status + + "\n**************************************************************************"); + } + return join(endpoint, additionalEndpoints); + } + + @Override + public Command watch(HostChangeMonitor<ServiceInstance> monitor) throws MonitorException { + ServerSetWatcher serverSetWatcher = new ServerSetWatcher(zkClient, monitor); + try { + return serverSetWatcher.watch(); + } catch (WatchException e) { + throw new MonitorException("ZooKeeper watch failed.", e); + } catch (InterruptedException e) { + throw new MonitorException("Interrupted while watching ZooKeeper.", e); + } + } + + @Override + public void monitor(HostChangeMonitor<ServiceInstance> monitor) throws MonitorException { + LOG.warning("This method is deprecated. 
Please use watch instead."); + watch(monitor); + } + + private class MemberStatus { + private final InetSocketAddress endpoint; + private final Map<String, InetSocketAddress> additionalEndpoints; + private final Optional<Integer> shardId; + + private MemberStatus( + InetSocketAddress endpoint, + Map<String, InetSocketAddress> additionalEndpoints, + Optional<Integer> shardId) { + + this.endpoint = endpoint; + this.additionalEndpoints = additionalEndpoints; + this.shardId = shardId; + } + + synchronized void leave(Membership membership) throws UpdateException { + try { + membership.cancel(); + } catch (JoinException e) { + throw new UpdateException( + "Failed to auto-cancel group membership on transition to DEAD status", e); + } + } + + byte[] serializeServiceInstance() { + ServiceInstance serviceInstance = new ServiceInstance( + ServerSets.toEndpoint(endpoint), + Maps.transformValues(additionalEndpoints, ServerSets.TO_ENDPOINT), + Status.ALIVE); + + if (shardId.isPresent()) { + serviceInstance.setShard(shardId.get()); + } + + LOG.fine("updating endpoint data to:\n\t" + serviceInstance); + try { + return ServerSets.serializeServiceInstance(serviceInstance, codec); + } catch (IOException e) { + throw new IllegalStateException("Unexpected problem serializing thrift struct " + + serviceInstance + "to a byte[]", e); + } + } + } + + private static class ServiceInstanceFetchException extends RuntimeException { + ServiceInstanceFetchException(String message, Throwable cause) { + super(message, cause); + } + } + + private static class ServiceInstanceDeletedException extends RuntimeException { + ServiceInstanceDeletedException(String path) { + super(path); + } + } + + private class ServerSetWatcher { + private final ZooKeeperClient zkClient; + private final HostChangeMonitor<ServiceInstance> monitor; + @Nullable private ImmutableSet<ServiceInstance> serverSet; + + ServerSetWatcher(ZooKeeperClient zkClient, HostChangeMonitor<ServiceInstance> monitor) { + this.zkClient = 
zkClient; + this.monitor = monitor; + } + + public Command watch() throws WatchException, InterruptedException { + Watcher onExpirationWatcher = zkClient.registerExpirationHandler(new Command() { + @Override public void execute() { + // Servers may have changed Status while we were disconnected from ZooKeeper, check and + // re-register our node watches. + rebuildServerSet(); + } + }); + + try { + return group.watch(new GroupChangeListener() { + @Override public void onGroupChange(Iterable<String> memberIds) { + notifyGroupChange(memberIds); + } + }); + } catch (WatchException e) { + zkClient.unregister(onExpirationWatcher); + throw e; + } catch (InterruptedException e) { + zkClient.unregister(onExpirationWatcher); + throw e; + } + } + + private ServiceInstance getServiceInstance(final String nodePath) { + try { + return backoffHelper.doUntilResult(new Supplier<ServiceInstance>() { + @Override public ServiceInstance get() { + try { + byte[] data = zkClient.get().getData(nodePath, false, null); + return ServerSets.deserializeServiceInstance(data, codec); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new ServiceInstanceFetchException( + "Interrupted updating service data for: " + nodePath, e); + } catch (ZooKeeperConnectionException e) { + LOG.log(Level.WARNING, + "Temporary error trying to updating service data for: " + nodePath, e); + return null; + } catch (NoNodeException e) { + invalidateNodePath(nodePath); + throw new ServiceInstanceDeletedException(nodePath); + } catch (KeeperException e) { + if (zkClient.shouldRetry(e)) { + LOG.log(Level.WARNING, + "Temporary error trying to update service data for: " + nodePath, e); + return null; + } else { + throw new ServiceInstanceFetchException( + "Failed to update service data for: " + nodePath, e); + } + } catch (IOException e) { + throw new ServiceInstanceFetchException( + "Failed to deserialize the ServiceInstance data for: " + nodePath, e); + } + } + }); + } catch 
(InterruptedException e) { + Thread.currentThread().interrupt(); + throw new ServiceInstanceFetchException( + "Interrupted trying to update service data for: " + nodePath, e); + } + } + + private final LoadingCache<String, ServiceInstance> servicesByMemberId = + CacheBuilder.newBuilder().build(new CacheLoader<String, ServiceInstance>() { + @Override public ServiceInstance load(String memberId) { + return getServiceInstance(group.getMemberPath(memberId)); + } + }); + + private void rebuildServerSet() { + Set<String> memberIds = ImmutableSet.copyOf(servicesByMemberId.asMap().keySet()); + servicesByMemberId.invalidateAll(); + notifyGroupChange(memberIds); + } + + private String invalidateNodePath(String deletedPath) { + String memberId = group.getMemberId(deletedPath); + servicesByMemberId.invalidate(memberId); + return memberId; + } + + private final Function<String, ServiceInstance> MAYBE_FETCH_NODE = + new Function<String, ServiceInstance>() { + @Override public ServiceInstance apply(String memberId) { + // This get will trigger a fetch + try { + return servicesByMemberId.getUnchecked(memberId); + } catch (UncheckedExecutionException e) { + Throwable cause = e.getCause(); + if (!(cause instanceof ServiceInstanceDeletedException)) { + Throwables.propagateIfInstanceOf(cause, ServiceInstanceFetchException.class); + throw new IllegalStateException( + "Unexpected error fetching member data for: " + memberId, e); + } + return null; + } + } + }; + + private synchronized void notifyGroupChange(Iterable<String> memberIds) { + ImmutableSet<String> newMemberIds = ImmutableSortedSet.copyOf(memberIds); + Set<String> existingMemberIds = servicesByMemberId.asMap().keySet(); + + // Ignore no-op state changes except for the 1st when we've seen no group yet. + if ((serverSet == null) || !newMemberIds.equals(existingMemberIds)) { + SetView<String> deletedMemberIds = Sets.difference(existingMemberIds, newMemberIds); + // Implicit removal from servicesByMemberId. 
+ existingMemberIds.removeAll(ImmutableSet.copyOf(deletedMemberIds)); + + Iterable<ServiceInstance> serviceInstances = Iterables.filter( + Iterables.transform(newMemberIds, MAYBE_FETCH_NODE), Predicates.notNull()); + + notifyServerSetChange(ImmutableSet.copyOf(serviceInstances)); + } + } + + private void notifyServerSetChange(ImmutableSet<ServiceInstance> currentServerSet) { + // ZK nodes may have changed if there was a session expiry for a server in the server set, but + // if the server's status has not changed, we can skip any onChange updates. + if (!currentServerSet.equals(serverSet)) { + if (currentServerSet.isEmpty()) { + LOG.warning("server set empty for path " + group.getPath()); + } else { + if (LOG.isLoggable(Level.INFO)) { + if (serverSet == null) { + LOG.info("received initial membership " + currentServerSet); + } else { + logChange(Level.INFO, currentServerSet); + } + } + } + serverSet = currentServerSet; + monitor.onChange(serverSet); + } + } + + private void logChange(Level level, ImmutableSet<ServiceInstance> newServerSet) { + StringBuilder message = new StringBuilder("server set " + group.getPath() + " change: "); + if (serverSet.size() != newServerSet.size()) { + message.append("from ").append(serverSet.size()) + .append(" members to ").append(newServerSet.size()); + } + + Joiner joiner = Joiner.on("\n\t\t"); + + SetView<ServiceInstance> left = Sets.difference(serverSet, newServerSet); + if (!left.isEmpty()) { + message.append("\n\tleft:\n\t\t").append(joiner.join(left)); + } + + SetView<ServiceInstance> joined = Sets.difference(newServerSet, serverSet); + if (!joined.isEmpty()) { + message.append("\n\tjoined:\n\t\t").append(joiner.join(joined)); + } + + LOG.log(level, message.toString()); + } + } + + private static class EndpointSchema { + final String host; + final Integer port; + + EndpointSchema(Endpoint endpoint) { + Preconditions.checkNotNull(endpoint); + this.host = endpoint.getHost(); + this.port = endpoint.getPort(); + } + + String 
getHost() { + return host; + } + + Integer getPort() { + return port; + } + } + + private static class ServiceInstanceSchema { + final EndpointSchema serviceEndpoint; + final Map<String, EndpointSchema> additionalEndpoints; + final Status status; + final Integer shard; + + ServiceInstanceSchema(ServiceInstance instance) { + this.serviceEndpoint = new EndpointSchema(instance.getServiceEndpoint()); + if (instance.getAdditionalEndpoints() != null) { + this.additionalEndpoints = Maps.transformValues( + instance.getAdditionalEndpoints(), + new Function<Endpoint, EndpointSchema>() { + @Override public EndpointSchema apply(Endpoint endpoint) { + return new EndpointSchema(endpoint); + } + } + ); + } else { + this.additionalEndpoints = Maps.newHashMap(); + } + this.status = instance.getStatus(); + this.shard = instance.isSetShard() ? instance.getShard() : null; + } + + EndpointSchema getServiceEndpoint() { + return serviceEndpoint; + } + + Map<String, EndpointSchema> getAdditionalEndpoints() { + return additionalEndpoints; + } + + Status getStatus() { + return status; + } + + Integer getShard() { + return shard; + } + } + + /** + * An adapted JSON codec that makes use of {@link ServiceInstanceSchema} to circumvent the + * __isset_bit_vector internal thrift struct field that tracks primitive types. 
+ */ + private static class AdaptedJsonCodec implements Codec<ServiceInstance> { + private static final Charset ENCODING = Charsets.UTF_8; + private static final Class<ServiceInstanceSchema> CLASS = ServiceInstanceSchema.class; + private final Gson gson = new Gson(); + + @Override + public void serialize(ServiceInstance instance, OutputStream sink) throws IOException { + Writer w = new OutputStreamWriter(sink, ENCODING); + gson.toJson(new ServiceInstanceSchema(instance), CLASS, w); + w.flush(); + } + + @Override + public ServiceInstance deserialize(InputStream source) throws IOException { + ServiceInstanceSchema output = gson.fromJson(new InputStreamReader(source, ENCODING), CLASS); + Endpoint primary = new Endpoint( + output.getServiceEndpoint().getHost(), output.getServiceEndpoint().getPort()); + Map<String, Endpoint> additional = Maps.transformValues( + output.getAdditionalEndpoints(), + new Function<EndpointSchema, Endpoint>() { + @Override public Endpoint apply(EndpointSchema endpoint) { + return new Endpoint(endpoint.getHost(), endpoint.getPort()); + } + } + ); + ServiceInstance instance = + new ServiceInstance(primary, ImmutableMap.copyOf(additional), output.getStatus()); + if (output.getShard() != null) { + instance.setShard(output.getShard()); + } + return instance; + } + } + + private static Codec<ServiceInstance> createCodec(final boolean useJsonEncoding) { + final Codec<ServiceInstance> json = new AdaptedJsonCodec(); + final Codec<ServiceInstance> thrift = + ThriftCodec.create(ServiceInstance.class, ThriftCodec.BINARY_PROTOCOL); + final Predicate<byte[]> recognizer = new Predicate<byte[]>() { + public boolean apply(byte[] input) { + return (input.length > 1 && input[0] == '{' && input[1] == '\"') == useJsonEncoding; + } + }; + + if (useJsonEncoding) { + return CompatibilityCodec.create(json, thrift, 2, recognizer); + } + return CompatibilityCodec.create(thrift, json, 2, recognizer); + } + + /** + * Creates a codec for {@link ServiceInstance} objects 
that uses Thrift binary encoding, and can + * decode both Thrift and JSON encodings. + * + * @return a new codec instance. + */ + public static Codec<ServiceInstance> createThriftCodec() { + return createCodec(false); + } + + /** + * Creates a codec for {@link ServiceInstance} objects that uses JSON encoding, and can decode + * both Thrift and JSON encodings. + * + * @return a new codec instance. + */ + public static Codec<ServiceInstance> createJsonCodec() { + return createCodec(true); + } + + /** + * Returns a codec for {@link ServiceInstance} objects that uses either the Thrift or the JSON + * encoding, depending on whether the command line argument backing {@code ENCODE_JSON} is + * set to <tt>true</tt>, and can decode both Thrift and JSON encodings. + * + * @return a new codec instance. + */ + public static Codec<ServiceInstance> createDefaultCodec() { + return createCodec(ENCODE_JSON.get()); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/zookeeper/ServerSets.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/zookeeper/ServerSets.java b/commons/src/main/java/com/twitter/common/zookeeper/ServerSets.java new file mode 100644 index 0000000..370ab6b --- /dev/null +++ b/commons/src/main/java/com/twitter/common/zookeeper/ServerSets.java @@ -0,0 +1,135 @@ +package com.twitter.common.zookeeper; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Map; +import java.util.Set; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; + +import org.apache.zookeeper.data.ACL; + +import com.twitter.common.base.Function; +import com.twitter.common.base.MorePreconditions; +import 
com.twitter.common.io.Codec; +import com.twitter.thrift.Endpoint; +import com.twitter.thrift.ServiceInstance; +import com.twitter.thrift.Status; + +/** + * Common ServerSet related functions + */ +public class ServerSets { + + private ServerSets() { + // Utility class. + } + + /** + * A function that invokes {@link #toEndpoint(InetSocketAddress)}. + */ + public static final Function<InetSocketAddress, Endpoint> TO_ENDPOINT = + new Function<InetSocketAddress, Endpoint>() { + @Override public Endpoint apply(InetSocketAddress address) { + return ServerSets.toEndpoint(address); + } + }; + + /** + * Creates a server set that registers at a single path applying the given ACL to all nodes + * created in the path. + * + * @param zkClient ZooKeeper client to register with. + * @param acl The ACL to apply to the {@code zkPath} nodes the ServerSet creates. + * @param zkPath Path to register at. @see #create(ZooKeeperClient, java.util.Set) + * @return A server set that registers at {@code zkPath}. + */ + public static ServerSet create(ZooKeeperClient zkClient, Iterable<ACL> acl, String zkPath) { + return create(zkClient, acl, ImmutableSet.of(zkPath)); + } + + /** + * Creates a server set that registers at one or multiple paths applying the given ACL to all + * nodes created in the paths. + * + * @param zkClient ZooKeeper client to register with. + * @param acl The ACL to apply to the {@code zkPath} nodes the ServerSet creates. + * @param zkPaths Paths to register at, must be non-empty. + * @return A server set that registers at the given {@code zkPath}s. 
+ */ + public static ServerSet create(ZooKeeperClient zkClient, Iterable<ACL> acl, Set<String> zkPaths) { + Preconditions.checkNotNull(zkClient); + MorePreconditions.checkNotBlank(acl); + MorePreconditions.checkNotBlank(zkPaths); + + if (zkPaths.size() == 1) { + return new ServerSetImpl(zkClient, acl, Iterables.getOnlyElement(zkPaths)); + } else { + ImmutableList.Builder<ServerSet> builder = ImmutableList.builder(); + for (String path : zkPaths) { + builder.add(new ServerSetImpl(zkClient, acl, path)); + } + return new CompoundServerSet(builder.build()); + } + } + + /** + * Returns a serialized Thrift service instance object, with given endpoints and codec. + * + * @param serviceInstance the Thrift service instance object to be serialized + * @param codec the codec to use to serialize a Thrift service instance object + * @return byte array that contains a serialized Thrift service instance + */ + public static byte[] serializeServiceInstance( + ServiceInstance serviceInstance, Codec<ServiceInstance> codec) throws IOException { + + ByteArrayOutputStream output = new ByteArrayOutputStream(); + codec.serialize(serviceInstance, output); + return output.toByteArray(); + } + + /** + * Serializes a service instance based on endpoints. + * @see #serializeServiceInstance(ServiceInstance, Codec) + * + * @param address the target address of the service instance + * @param additionalEndpoints additional endpoints of the service instance + * @param status service status + */ + public static byte[] serializeServiceInstance( + InetSocketAddress address, + Map<String, Endpoint> additionalEndpoints, + Status status, + Codec<ServiceInstance> codec) throws IOException { + + ServiceInstance serviceInstance = + new ServiceInstance(toEndpoint(address), additionalEndpoints, status); + return serializeServiceInstance(serviceInstance, codec); + } + + /** + * Creates a service instance object deserialized from byte array. 
+ * + * @param data the byte array contains a serialized Thrift service instance + * @param codec the codec to use to deserialize the byte array + */ + public static ServiceInstance deserializeServiceInstance( + byte[] data, Codec<ServiceInstance> codec) throws IOException { + + return codec.deserialize(new ByteArrayInputStream(data)); + } + + /** + * Creates an endpoint for the given InetSocketAddress. + * + * @param address the target address to create the endpoint for + */ + public static Endpoint toEndpoint(InetSocketAddress address) { + return new Endpoint(address.getHostName(), address.getPort()); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/zookeeper/SingletonService.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/zookeeper/SingletonService.java b/commons/src/main/java/com/twitter/common/zookeeper/SingletonService.java new file mode 100644 index 0000000..00b8b53 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/zookeeper/SingletonService.java @@ -0,0 +1,318 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+// ================================================================================================= + +package com.twitter.common.zookeeper; + +import java.net.InetSocketAddress; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Level; +import java.util.logging.Logger; + +import javax.annotation.Nullable; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; + +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; + +import com.twitter.common.base.ExceptionalCommand; +import com.twitter.common.zookeeper.Candidate.Leader; +import com.twitter.common.zookeeper.Group.JoinException; +import com.twitter.common.zookeeper.ServerSet.EndpointStatus; +import com.twitter.common.zookeeper.ServerSet.UpdateException; +import com.twitter.thrift.Status; + +/** + * A service that uses master election to only allow a single instance of the server to join + * the {@link ServerSet} at a time. + */ +public class SingletonService { + private static final Logger LOG = Logger.getLogger(SingletonService.class.getName()); + + @VisibleForTesting + static final String LEADER_ELECT_NODE_PREFIX = "singleton_candidate_"; + + /** + * Creates a candidate that can be combined with an existing server set to form a singleton + * service using {@link #SingletonService(ServerSet, Candidate)}. + * + * @param zkClient The ZooKeeper client to use. + * @param servicePath The path where service nodes live. + * @param acl The acl to apply to newly created candidate nodes and serverset nodes. + * @return A candidate that can be housed with a standard server set under a single zk path. 
+ */ + public static Candidate createSingletonCandidate( + ZooKeeperClient zkClient, + String servicePath, + Iterable<ACL> acl) { + + return new CandidateImpl(new Group(zkClient, acl, servicePath, LEADER_ELECT_NODE_PREFIX)); + } + + private final ServerSet serverSet; + private final Candidate candidate; + + /** + * Equivalent to {@link #SingletonService(ZooKeeperClient, String, Iterable)} with a default + * wide open {@code acl} ({@link ZooDefs.Ids#OPEN_ACL_UNSAFE}). + */ + public SingletonService(ZooKeeperClient zkClient, String servicePath) { + this(zkClient, servicePath, ZooDefs.Ids.OPEN_ACL_UNSAFE); + } + + /** + * Creates a new singleton service, identified by {@code servicePath}. All nodes related to the + * service (for both leader election and service registration) will live under the path and each + * node will be created with the supplied {@code acl}. Internally, two ZooKeeper {@code Group}s + * are used to manage a singleton service - one for leader election, and another for the + * {@code ServerSet} where the leader's endpoints are registered. Leadership election should + * guarantee that at most one instance will ever exist in the ServerSet at once. + * + * @param zkClient The ZooKeeper client to use. + * @param servicePath The path where service nodes live. + * @param acl The acl to apply to newly created candidate nodes and serverset nodes. + */ + public SingletonService(ZooKeeperClient zkClient, String servicePath, Iterable<ACL> acl) { + this( + new ServerSetImpl(zkClient, new Group(zkClient, acl, servicePath)), + createSingletonCandidate(zkClient, servicePath, acl)); + } + + /** + * Creates a new singleton service that uses the supplied candidate to vie for leadership and then + * advertises itself in the given server set once elected. + * + * @param serverSet The server set to advertise in on election. + * @param candidate The candidacy to use to vie for election. 
+ */ + public SingletonService(ServerSet serverSet, Candidate candidate) { + this.serverSet = Preconditions.checkNotNull(serverSet); + this.candidate = Preconditions.checkNotNull(candidate); + } + + /** + * Attempts to lead the singleton service. + * + * @param endpoint The primary endpoint to register as a leader candidate in the service. + * @param additionalEndpoints Additional endpoints that are available on the host. + * @param status deprecated, will be ignored entirely + * @param listener Handler to call when the candidate is elected or defeated. + * @throws Group.WatchException If there was a problem watching the ZooKeeper group. + * @throws Group.JoinException If there was a problem joining the ZooKeeper group. + * @throws InterruptedException If the thread watching/joining the group was interrupted. + * @deprecated The status field is deprecated. Please use + * {@link #lead(InetSocketAddress, Map, LeadershipListener)} + */ + @Deprecated + public void lead(final InetSocketAddress endpoint, + final Map<String, InetSocketAddress> additionalEndpoints, + final Status status, + final LeadershipListener listener) + throws Group.WatchException, Group.JoinException, InterruptedException { + + if (status != Status.ALIVE) { + LOG.severe("******************************************************************************"); + LOG.severe("WARNING: MUTABLE STATUS FIELDS ARE NO LONGER SUPPORTED."); + LOG.severe("JOINING WITH STATUS ALIVE EVEN THOUGH YOU SPECIFIED " + status); + LOG.severe("******************************************************************************"); + } else { + LOG.warning("******************************************************************************"); + LOG.warning("WARNING: MUTABLE STATUS FIELDS ARE NO LONGER SUPPORTED."); + LOG.warning("Please use SingletonService.lead(InetSocketAddress, Map, LeadershipListener)"); + LOG.warning("******************************************************************************"); + } + + lead(endpoint, 
additionalEndpoints, listener); + } + + /** + * Attempts to lead the singleton service. + * + * <p>The status handle obtained by {@link LeaderControl#advertise()} is remembered and handed to + * {@link LeadershipListener#onDefeated(EndpointStatus)} on defeat; {@code null} is passed when + * nothing is advertised at that time. + * + * @param endpoint The primary endpoint to register as a leader candidate in the service. + * @param additionalEndpoints Additional endpoints that are available on the host. + * @param listener Handler to call when the candidate is elected or defeated. + * @throws Group.WatchException If there was a problem watching the ZooKeeper group. + * @throws Group.JoinException If there was a problem joining the ZooKeeper group. + * @throws InterruptedException If the thread watching/joining the group was interrupted. + */ + public void lead(final InetSocketAddress endpoint, + final Map<String, InetSocketAddress> additionalEndpoints, + final LeadershipListener listener) + throws Group.WatchException, Group.JoinException, InterruptedException { + + Preconditions.checkNotNull(listener); + + candidate.offerLeadership(new Leader() { + // Written by the LeaderControl created in onElected and read by onDefeated, which may run + // on a different thread, hence volatile. The control must not declare its own field of + // this name, or the value handed to onDefeated would always be null. + private volatile EndpointStatus endpointStatus = null; + + @Override public void onElected(final ExceptionalCommand<JoinException> abdicate) { + // Each term of leadership starts with nothing advertised. + endpointStatus = null; + listener.onLeading(new LeaderControl() { + final AtomicBoolean left = new AtomicBoolean(false); + + // Methods are synchronized to prevent simultaneous invocations. 
+ @Override public synchronized void advertise() + throws JoinException, InterruptedException { + + Preconditions.checkState(!left.get(), "Cannot advertise after leaving."); + Preconditions.checkState(endpointStatus == null, "Cannot advertise more than once."); + endpointStatus = serverSet.join(endpoint, additionalEndpoints); + } + + @Override public synchronized void leave() throws UpdateException, JoinException { + Preconditions.checkState(left.compareAndSet(false, true), + "Cannot leave more than once."); + EndpointStatus advertised = endpointStatus; + if (advertised != null) { + advertised.leave(); + // The advertisement is gone; do not hand a stale handle to onDefeated. + endpointStatus = null; + } + abdicate.execute(); + } + }); + } + + @Override public void onDefeated() { + listener.onDefeated(endpointStatus); + } + }); + } + + /** + * A listener to be notified of changes in the leadership status. + * Implementers should be careful to avoid blocking operations in these callbacks. + */ + public interface LeadershipListener { + + /** + * Notifies the listener that it is the current leader. + * + * @param control A controller handle to advertise and/or leave advertised presence. + */ + public void onLeading(LeaderControl control); + + /** + * Notifies the listener that it is no longer leader. The leader should take this opportunity + * to remove its advertisement gracefully. + * + * @param status A handle on the endpoint status for the advertised leader. + */ + public void onDefeated(@Nullable EndpointStatus status); + } + + /** + * A leadership listener that decorates another listener by automatically defeating a + * leader that has dropped its connection to ZooKeeper. + * Note that the decision to use this over session-based mutual exclusion should not be taken + * lightly. Any momentary connection loss due to a flaky network or a ZooKeeper server process + * exit will cause a leader to abort. 
+ */ + public static class DefeatOnDisconnectLeader implements LeadershipListener { + + private final LeadershipListener wrapped; + private Optional<LeaderControl> maybeControl = Optional.absent(); + + /** + * Creates a new leadership listener that will delegate calls to the wrapped listener, and + * invoke {@link #onDefeated(EndpointStatus)} if a ZooKeeper disconnect is observed while + * leading. + * + * @param zkClient The ZooKeeper client to watch for disconnect events. + * @param wrapped The leadership listener to wrap. + */ + public DefeatOnDisconnectLeader(ZooKeeperClient zkClient, LeadershipListener wrapped) { + this.wrapped = Preconditions.checkNotNull(wrapped); + + zkClient.register(new Watcher() { + @Override public void process(WatchedEvent event) { + if ((event.getType() == EventType.None) + && (event.getState() == KeeperState.Disconnected)) { + disconnected(); + } + } + }); + } + + private synchronized void disconnected() { + if (maybeControl.isPresent()) { + LOG.warning("Disconnected from ZooKeeper while leading, committing suicide."); + try { + wrapped.onDefeated(null); + maybeControl.get().leave(); + } catch (UpdateException e) { + LOG.log(Level.WARNING, "Failed to leave singleton service: " + e, e); + } catch (JoinException e) { + LOG.log(Level.WARNING, "Failed to leave singleton service: " + e, e); + } finally { + setControl(null); + } + } else { + LOG.info("Disconnected from ZooKeeper, but that's fine because I'm not the leader."); + } + } + + private synchronized void setControl(@Nullable LeaderControl control) { + this.maybeControl = Optional.fromNullable(control); + } + + @Override public void onLeading(final LeaderControl control) { + setControl(control); + wrapped.onLeading(new LeaderControl() { + @Override public void advertise() throws JoinException, InterruptedException { + control.advertise(); + } + + @Override public void leave() throws UpdateException, JoinException { + setControl(null); + control.leave(); + } + }); + } + + @Override 
public void onDefeated(@Nullable EndpointStatus status) { + setControl(null); + wrapped.onDefeated(status); + } + } + + /** + * A controller for the state of the leader. This will be provided to the leader upon election, + * which allows the leader to decide when to advertise in the underlying {@link ServerSet} and + * terminate leadership at will. + */ + public interface LeaderControl { + + /** + * Advertises the leader's server presence to clients. + * + * @throws JoinException If there was an error advertising. + * @throws InterruptedException If interrupted while advertising. + */ + void advertise() throws JoinException, InterruptedException; + + /** + * Leaves candidacy for leadership, removing advertised server presence if applicable. + * + * @throws UpdateException If the leader's status could not be updated. + * @throws JoinException If there was an error abdicating from leader election. + */ + void leave() throws UpdateException, JoinException; + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/zookeeper/StaticServerSet.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/zookeeper/StaticServerSet.java b/commons/src/main/java/com/twitter/common/zookeeper/StaticServerSet.java new file mode 100644 index 0000000..c10bd86 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/zookeeper/StaticServerSet.java @@ -0,0 +1,132 @@ +package com.twitter.common.zookeeper; + +import java.net.InetSocketAddress; +import java.util.Map; +import java.util.Set; +import java.util.logging.Level; +import java.util.logging.Logger; + +import com.google.common.base.Function; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; + +import com.twitter.common.base.Command; +import 
com.twitter.common.base.Commands; +import com.twitter.common.zookeeper.Group.JoinException; +import com.twitter.thrift.Endpoint; +import com.twitter.thrift.ServiceInstance; +import com.twitter.thrift.Status; + +/** + * A server set that represents a fixed set of hosts. + * This may be composed under {@link CompoundServerSet} to ensure a minimum set of hosts is + * present. + * A static server set does not support joining, but will allow normal join calls and status update + * calls to be made. + */ +public class StaticServerSet implements ServerSet { + + private static final Logger LOG = Logger.getLogger(StaticServerSet.class.getName()); + + private static final Function<Endpoint, ServiceInstance> ENDPOINT_TO_INSTANCE = + new Function<Endpoint, ServiceInstance>() { + @Override public ServiceInstance apply(Endpoint endpoint) { + return new ServiceInstance(endpoint, ImmutableMap.<String, Endpoint>of(), Status.ALIVE); + } + }; + + private final ImmutableSet<ServiceInstance> hosts; + + /** + * Creates a static server set that will reply to monitor calls immediately and exactly once with + * the provided service instances. + * + * @param hosts Hosts in the static set. + */ + public StaticServerSet(Set<ServiceInstance> hosts) { + this.hosts = ImmutableSet.copyOf(hosts); + } + + /** + * Creates a static server set containing the provided endpoints (and no auxiliary ports) which + * will all be in the {@link Status#ALIVE} state. + * + * @param endpoints Endpoints in the static set. + * @return A static server set that will advertise the provided endpoints. 
+ */ + public static StaticServerSet fromEndpoints(Set<Endpoint> endpoints) { + return new StaticServerSet( + ImmutableSet.copyOf(Iterables.transform(endpoints, ENDPOINT_TO_INSTANCE))); + } + + private EndpointStatus join( + InetSocketAddress endpoint, + Map<String, InetSocketAddress> auxEndpoints, + Optional<Integer> shardId) { + + LOG.warning("Attempt to join fixed server set ignored."); + ServiceInstance joining = new ServiceInstance( + ServerSets.toEndpoint(endpoint), + Maps.transformValues(auxEndpoints, ServerSets.TO_ENDPOINT), + Status.ALIVE); + if (shardId.isPresent()) { + joining.setShard(shardId.get()); + } + if (!hosts.contains(joining)) { + LOG.log(Level.SEVERE, + "Joining instance " + joining + " does not match any member of the static set."); + } + + return new EndpointStatus() { + @Override public void leave() throws UpdateException { + LOG.warning("Attempt to adjust state of fixed server set ignored."); + } + + @Override public void update(Status status) throws UpdateException { + LOG.warning("Attempt to adjust state of fixed server set ignored."); + } + }; + } + + @Override + public EndpointStatus join( + InetSocketAddress endpoint, + Map<String, InetSocketAddress> auxEndpoints, + Status status) { + + LOG.warning("This method is deprecated. 
Please do not specify a status field."); + return join(endpoint, auxEndpoints, Optional.<Integer>absent()); + } + + @Override + public EndpointStatus join( + InetSocketAddress endpoint, + Map<String, InetSocketAddress> auxEndpoints) { + + LOG.warning("Joining a ServerSet without a shard ID is deprecated and will soon break."); + return join(endpoint, auxEndpoints, Optional.<Integer>absent()); + } + + @Override + public EndpointStatus join( + InetSocketAddress endpoint, + Map<String, InetSocketAddress> auxEndpoints, + int shardId) throws JoinException, InterruptedException { + + return join(endpoint, auxEndpoints, Optional.of(shardId)); + } + + @Override + public Command watch(HostChangeMonitor<ServiceInstance> monitor) { + monitor.onChange(hosts); + return Commands.NOOP; + } + + @Override + public void monitor(HostChangeMonitor<ServiceInstance> monitor) throws MonitorException { + watch(monitor); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/zookeeper/ZooKeeperClient.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/zookeeper/ZooKeeperClient.java b/commons/src/main/java/com/twitter/common/zookeeper/ZooKeeperClient.java new file mode 100644 index 0000000..a051611 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/zookeeper/ZooKeeperClient.java @@ -0,0 +1,496 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. 
+// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.zookeeper; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.logging.Logger; + +import javax.annotation.Nullable; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; +import com.google.common.base.Objects; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; + +import org.apache.commons.lang.builder.EqualsBuilder; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.SessionExpiredException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.common.PathUtils; + +import com.twitter.common.base.Command; +import com.twitter.common.base.MorePreconditions; +import 
com.twitter.common.net.InetSocketAddressHelper; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; + +/** + * Manages a connection to a ZooKeeper cluster. + */ +public class ZooKeeperClient { + + /** + * Indicates an error connecting to a zookeeper cluster. + */ + public class ZooKeeperConnectionException extends Exception { + public ZooKeeperConnectionException(String message, Throwable cause) { + super(message, cause); + } + } + + /** + * Encapsulates a user's credentials and has the ability to authenticate them through a + * {@link ZooKeeper} client. + */ + public interface Credentials { + + /** + * A set of {@code Credentials} that performs no authentication. + */ + Credentials NONE = new Credentials() { + @Override public void authenticate(ZooKeeper zooKeeper) { + // noop + } + + @Override public String scheme() { + return null; + } + + @Override public byte[] authToken() { + return null; + } + }; + + /** + * Authenticates these credentials against the given {@code ZooKeeper} client. + * + * @param zooKeeper the client to authenticate + */ + void authenticate(ZooKeeper zooKeeper); + + /** + * Returns the authentication scheme these credentials are for. + * + * @return the scheme these credentials are for or {@code null} if no authentication is + * intended. + */ + @Nullable + String scheme(); + + /** + * Returns the authentication token. + * + * @return the authentication token or {@code null} if no authentication is intended. + */ + @Nullable + byte[] authToken(); + } + + /** + * Creates a set of credentials for the zoo keeper digest authentication mechanism. 
+ * + * @param username the username to authenticate with + * @param password the password to authenticate with + * @return a set of credentials that can be used to authenticate the zoo keeper client + */ + public static Credentials digestCredentials(String username, String password) { + MorePreconditions.checkNotBlank(username); + Preconditions.checkNotNull(password); + + // TODO(John Sirois): DigestAuthenticationProvider is broken - uses platform default charset + // (on server) and so we just have to hope here that clients are deployed in compatible jvms. + // Consider writing and installing a version of DigestAuthenticationProvider that controls its + // Charset explicitly. + return credentials("digest", (username + ":" + password).getBytes()); + } + + /** + * Creates a set of credentials for the given authentication {@code scheme}. + * + * <p>The returned credentials compare equal to any {@code Credentials} with the same scheme and + * the same token bytes, and their hash code is derived from the scheme and the token contents. + * + * @param scheme the scheme to authenticate with + * @param authToken the authentication token + * @return a set of credentials that can be used to authenticate the zoo keeper client + */ + public static Credentials credentials(final String scheme, final byte[] authToken) { + MorePreconditions.checkNotBlank(scheme); + Preconditions.checkNotNull(authToken); + + return new Credentials() { + @Override public void authenticate(ZooKeeper zooKeeper) { + zooKeeper.addAuthInfo(scheme, authToken); + } + + @Override public String scheme() { + return scheme; + } + + @Override public byte[] authToken() { + return authToken; + } + + @Override public boolean equals(Object o) { + if (!(o instanceof Credentials)) { + return false; + } + + Credentials other = (Credentials) o; + return new EqualsBuilder() + .append(scheme, other.scheme()) + .append(authToken, other.authToken()) + .isEquals(); + } + + @Override public int hashCode() { + // equals() compares the token by content, so hash its content too; hashing the array + // reference would give equal credentials different hash codes. + return Objects.hashCode(scheme, java.util.Arrays.hashCode(authToken)); + } + }; + } + + private final class SessionState { + private final long sessionId; + private final byte[] sessionPasswd; + + private SessionState(long sessionId, byte[] 
sessionPasswd) { + this.sessionId = sessionId; + this.sessionPasswd = sessionPasswd; + } + } + + private static final Logger LOG = Logger.getLogger(ZooKeeperClient.class.getName()); + + private static final Amount<Long,Time> WAIT_FOREVER = Amount.of(0L, Time.MILLISECONDS); + + private final int sessionTimeoutMs; + private final Credentials credentials; + private final String zooKeeperServers; + // GuardedBy "this", but still volatile for tests, where we want to be able to see writes + // made from within long synchronized blocks. + private volatile ZooKeeper zooKeeper; + private SessionState sessionState; + + private final Set<Watcher> watchers = new CopyOnWriteArraySet<Watcher>(); + private final BlockingQueue<WatchedEvent> eventQueue = new LinkedBlockingQueue<WatchedEvent>(); + + private static Iterable<InetSocketAddress> combine(InetSocketAddress address, + InetSocketAddress... addresses) { + return ImmutableSet.<InetSocketAddress>builder().add(address).add(addresses).build(); + } + + /** + * Creates an unconnected client that will lazily attempt to connect on the first call to + * {@link #get()}. + * + * @param sessionTimeout the ZK session timeout + * @param zooKeeperServer the first, required ZK server + * @param zooKeeperServers any additional servers forming the ZK cluster + */ + public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, InetSocketAddress zooKeeperServer, + InetSocketAddress... zooKeeperServers) { + this(sessionTimeout, combine(zooKeeperServer, zooKeeperServers)); + } + + /** + * Creates an unconnected client that will lazily attempt to connect on the first call to + * {@link #get}. 
+ * + * @param sessionTimeout the ZK session timeout + * @param zooKeeperServers the set of servers forming the ZK cluster + */ + public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, + Iterable<InetSocketAddress> zooKeeperServers) { + this(sessionTimeout, Credentials.NONE, Optional.<String> absent(), zooKeeperServers); + } + + /** + * Creates an unconnected client that will lazily attempt to connect on the first call to + * {@link #get()}. All successful connections will be authenticated with the given + * {@code credentials}. + * + * @param sessionTimeout the ZK session timeout + * @param credentials the credentials to authenticate with + * @param zooKeeperServer the first, required ZK server + * @param zooKeeperServers any additional servers forming the ZK cluster + */ + public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, Credentials credentials, + InetSocketAddress zooKeeperServer, InetSocketAddress... zooKeeperServers) { + this(sessionTimeout, credentials, Optional.<String> absent(), combine(zooKeeperServer, zooKeeperServers)); + } + + /** + * Creates an unconnected client that will lazily attempt to connect on the first call to + * {@link #get}. All successful connections will be authenticated with the given + * {@code credentials}. + * + * @param sessionTimeout the ZK session timeout + * @param credentials the credentials to authenticate with + * @param zooKeeperServers the set of servers forming the ZK cluster + */ + public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, Credentials credentials, + Iterable<InetSocketAddress> zooKeeperServers) { + this(sessionTimeout, credentials, Optional.<String> absent(), zooKeeperServers); + } + + /** + * Creates an unconnected client that will lazily attempt to connect on the first call to + * {@link #get}. All successful connections will be authenticated with the given + * {@code credentials}. 
+ * + * @param sessionTimeout the ZK session timeout + * @param credentials the credentials to authenticate with + * @param chrootPath an optional chroot path + * @param zooKeeperServers the set of servers forming the ZK cluster + */ + public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, Credentials credentials, + Optional<String> chrootPath, Iterable<InetSocketAddress> zooKeeperServers) { + this.sessionTimeoutMs = Preconditions.checkNotNull(sessionTimeout).as(Time.MILLISECONDS); + this.credentials = Preconditions.checkNotNull(credentials); + + if (chrootPath.isPresent()) { + PathUtils.validatePath(chrootPath.get()); + } + + Preconditions.checkNotNull(zooKeeperServers); + Preconditions.checkArgument(!Iterables.isEmpty(zooKeeperServers), + "Must present at least 1 ZK server"); + + Thread watcherProcessor = new Thread("ZookeeperClient-watcherProcessor") { + @Override + public void run() { + while (true) { + try { + WatchedEvent event = eventQueue.take(); + for (Watcher watcher : watchers) { + watcher.process(event); + } + } catch (InterruptedException e) { /* ignore */ } + } + } + }; + watcherProcessor.setDaemon(true); + watcherProcessor.start(); + + Iterable<String> servers = + Iterables.transform(ImmutableSet.copyOf(zooKeeperServers), + InetSocketAddressHelper.INET_TO_STR); + this.zooKeeperServers = Joiner.on(',').join(servers).concat(chrootPath.or("")); + } + + /** + * Returns true if this client has non-empty credentials set. For example, returns {@code false} + * if this client was constructed with {@link Credentials#NONE}. + * + * @return {@code true} if this client is configured with non-empty credentials. + */ + public boolean hasCredentials() { + return !Strings.isNullOrEmpty(credentials.scheme()) && (credentials.authToken() != null); + } + + /** + * Returns the current active ZK connection or establishes a new one if none has yet been + * established or a previous connection was disconnected or had its session time out. 
This method + * will attempt to re-use sessions when possible. Equivalent to: + * <pre>get(Amount.of(0L, ...)</pre>. + * + * @return a connected ZooKeeper client + * @throws ZooKeeperConnectionException if there was a problem connecting to the ZK cluster + * @throws InterruptedException if interrupted while waiting for a connection to be established + */ + public synchronized ZooKeeper get() throws ZooKeeperConnectionException, InterruptedException { + try { + return get(WAIT_FOREVER); + } catch (TimeoutException e) { + InterruptedException interruptedException = + new InterruptedException("Got an unexpected TimeoutException for 0 wait"); + interruptedException.initCause(e); + throw interruptedException; + } + } + + /** + * Returns the current active ZK connection or establishes a new one if none has yet been + * established or a previous connection was disconnected or had its session time out. This + * method will attempt to re-use sessions when possible. + * + * @param connectionTimeout the maximum amount of time to wait for the connection to the ZK + * cluster to be established; 0 to wait forever + * @return a connected ZooKeeper client + * @throws ZooKeeperConnectionException if there was a problem connecting to the ZK cluster + * @throws InterruptedException if interrupted while waiting for a connection to be established + * @throws TimeoutException if a connection could not be established within the configured + * session timeout + */ + public synchronized ZooKeeper get(Amount<Long, Time> connectionTimeout) + throws ZooKeeperConnectionException, InterruptedException, TimeoutException { + + if (zooKeeper == null) { + final CountDownLatch connected = new CountDownLatch(1); + Watcher watcher = new Watcher() { + @Override public void process(WatchedEvent event) { + switch (event.getType()) { + // Guard the None type since this watch may be used as the default watch on calls by + // the client outside our control. 
+ case None: + switch (event.getState()) { + case Expired: + LOG.info("Zookeeper session expired. Event: " + event); + close(); + break; + case SyncConnected: + connected.countDown(); + break; + } + } + + eventQueue.offer(event); + } + }; + + try { + zooKeeper = (sessionState != null) + ? new ZooKeeper(zooKeeperServers, sessionTimeoutMs, watcher, sessionState.sessionId, + sessionState.sessionPasswd) + : new ZooKeeper(zooKeeperServers, sessionTimeoutMs, watcher); + } catch (IOException e) { + throw new ZooKeeperConnectionException( + "Problem connecting to servers: " + zooKeeperServers, e); + } + + if (connectionTimeout.getValue() > 0) { + if (!connected.await(connectionTimeout.as(Time.MILLISECONDS), TimeUnit.MILLISECONDS)) { + close(); + throw new TimeoutException("Timed out waiting for a ZK connection after " + + connectionTimeout); + } + } else { + try { + connected.await(); + } catch (InterruptedException ex) { + LOG.info("Interrupted while waiting to connect to zooKeeper"); + close(); + throw ex; + } + } + credentials.authenticate(zooKeeper); + + sessionState = new SessionState(zooKeeper.getSessionId(), zooKeeper.getSessionPasswd()); + } + return zooKeeper; + } + + /** + * Clients that need to re-establish state after session expiration can register an + * {@code onExpired} command to execute. + * + * @param onExpired the {@code Command} to register + * @return the new {@link Watcher} which can later be passed to {@link #unregister} for + * removal. + */ + public Watcher registerExpirationHandler(final Command onExpired) { + Watcher watcher = new Watcher() { + @Override public void process(WatchedEvent event) { + if (event.getType() == EventType.None && event.getState() == KeeperState.Expired) { + onExpired.execute(); + } + } + }; + register(watcher); + return watcher; + } + + /** + * Clients that need to register a top-level {@code Watcher} should do so using this method. 
The + * registered {@code watcher} will remain registered across re-connects and session expiration + * events. + * + * @param watcher the {@code Watcher to register} + */ + public void register(Watcher watcher) { + watchers.add(watcher); + } + + /** + * Clients can attempt to unregister a top-level {@code Watcher} that has previously been + * registered. + * + * @param watcher the {@code Watcher} to unregister as a top-level, persistent watch + * @return whether the given {@code Watcher} was found and removed from the active set + */ + public boolean unregister(Watcher watcher) { + return watchers.remove(watcher); + } + + /** + * Checks to see if the client might reasonably re-try an operation given the exception thrown + * while attempting it. If the ZooKeeper session should be expired to enable the re-try to + * succeed this method will expire it as a side-effect. + * + * @param e the exception to test + * @return true if a retry can be attempted + */ + public boolean shouldRetry(KeeperException e) { + if (e instanceof SessionExpiredException) { + close(); + } + return ZooKeeperUtils.isRetryable(e); + } + + /** + * Closes the current connection if any expiring the current ZooKeeper session. Any subsequent + * calls to this method will no-op until the next successful {@link #get}. + */ + public synchronized void close() { + if (zooKeeper != null) { + try { + zooKeeper.close(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.warning("Interrupted trying to close zooKeeper"); + } finally { + zooKeeper = null; + sessionState = null; + } + } + } + + @VisibleForTesting + synchronized boolean isClosed() { + return zooKeeper == null; + } + + @VisibleForTesting + ZooKeeper getZooKeeperClientForTests() { + return zooKeeper; + } +}
