http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/zookeeper/DistributedLock.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/DistributedLock.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/DistributedLock.java deleted file mode 100644 index 1e8fc48..0000000 --- a/commons/src/main/java/org/apache/aurora/common/zookeeper/DistributedLock.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.zookeeper; - -import java.util.concurrent.TimeUnit; - -/** - * DistributedLock - * - * @author Florian Leibert - */ -public interface DistributedLock { - void lock() throws LockingException; - - boolean tryLock(long timeout, TimeUnit unit); - - void unlock() throws LockingException; - - public static class LockingException extends RuntimeException { - public LockingException(String msg, Exception e) { - super(msg, e); - } - - public LockingException(String msg) { - super(msg); - } - } -}
http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/zookeeper/DistributedLockImpl.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/DistributedLockImpl.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/DistributedLockImpl.java deleted file mode 100644 index 99a5774..0000000 --- a/commons/src/main/java/org/apache/aurora/common/zookeeper/DistributedLockImpl.java +++ /dev/null @@ -1,286 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.zookeeper; - -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.logging.Level; -import java.util.logging.Logger; - -import javax.annotation.concurrent.ThreadSafe; - -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Ordering; - -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooDefs; -import org.apache.zookeeper.data.ACL; -import org.apache.zookeeper.data.Stat; - -import org.apache.aurora.common.base.MorePreconditions; - -/** - * Distributed locking via ZooKeeper. Assuming there are N clients that all try to acquire a lock, - * the algorithm works as follows. Each host creates an ephemeral|sequential node, and requests a - * list of children for the lock node. Due to the nature of sequential, all the ids are increasing - * in order, therefore the client with the least ID according to natural ordering will hold the - * lock. Every other client watches the id immediately preceding its own id and checks for the lock - * in case of notification. The client holding the lock does the work and finally deletes the node, - * thereby triggering the next client in line to acquire the lock. Deadlocks are possible but - * avoided in most cases because if a client drops dead while holding the lock, the ZK session - * should timeout and since the node is ephemeral, it will be removed in such a case. Deadlocks - * could occur if the the worker thread on a client hangs but the zk-client thread is still alive. - * There could be an external monitor client that ensures that alerts are triggered if the least-id - * ephemeral node is present past a time-out. - * <p/> - * Note: Locking attempts will fail in case session expires! - * - * @author Florian Leibert - */ -@ThreadSafe -public class DistributedLockImpl implements DistributedLock { - - private static final Logger LOG = Logger.getLogger(DistributedLockImpl.class.getName()); - - private final ZooKeeperClient zkClient; - private final String lockPath; - private final ImmutableList<ACL> acl; - - private final AtomicBoolean aborted = new AtomicBoolean(false); - private CountDownLatch syncPoint; - private boolean holdsLock = false; - private String currentId; - private String currentNode; - private String watchedNode; - private LockWatcher watcher; - - /** - * Equivalent to {@link #DistributedLockImpl(ZooKeeperClient, String, Iterable)} with a default - * wide open {@code acl} ({@link ZooDefs.Ids#OPEN_ACL_UNSAFE}). - */ - public DistributedLockImpl(ZooKeeperClient zkClient, String lockPath) { - this(zkClient, lockPath, ZooDefs.Ids.OPEN_ACL_UNSAFE); - } - - /** - * Creates a distributed lock using the given {@code zkClient} to coordinate locking. - * - * @param zkClient The ZooKeeper client to use. - * @param lockPath The path used to manage the lock under. - * @param acl The acl to apply to newly created lock nodes. - */ - public DistributedLockImpl(ZooKeeperClient zkClient, String lockPath, Iterable<ACL> acl) { - this.zkClient = Preconditions.checkNotNull(zkClient); - this.lockPath = MorePreconditions.checkNotBlank(lockPath); - this.acl = ImmutableList.copyOf(acl); - this.syncPoint = new CountDownLatch(1); - } - - private synchronized void prepare() - throws ZooKeeperClient.ZooKeeperConnectionException, InterruptedException, KeeperException { - - ZooKeeperUtils.ensurePath(zkClient, acl, lockPath); - LOG.log(Level.FINE, "Working with locking path:" + lockPath); - - // Create an EPHEMERAL_SEQUENTIAL node. - currentNode = - zkClient.get().create(lockPath + "/member_", null, acl, CreateMode.EPHEMERAL_SEQUENTIAL); - - // We only care about our actual id since we want to compare ourselves to siblings. - if (currentNode.contains("/")) { - currentId = currentNode.substring(currentNode.lastIndexOf("/") + 1); - } - LOG.log(Level.FINE, "Received ID from zk:" + currentId); - this.watcher = new LockWatcher(); - } - - @Override - public synchronized void lock() throws LockingException { - if (holdsLock) { - throw new LockingException("Error, already holding a lock. Call unlock first!"); - } - try { - prepare(); - watcher.checkForLock(); - syncPoint.await(); - if (!holdsLock) { - throw new LockingException("Error, couldn't acquire the lock!"); - } - } catch (InterruptedException e) { - cancelAttempt(); - throw new LockingException("InterruptedException while trying to acquire lock!", e); - } catch (KeeperException e) { - // No need to clean up since the node wasn't created yet. - throw new LockingException("KeeperException while trying to acquire lock!", e); - } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - // No need to clean up since the node wasn't created yet. - throw new LockingException("ZooKeeperConnectionException while trying to acquire lock", e); - } - } - - @Override - public synchronized boolean tryLock(long timeout, TimeUnit unit) { - if (holdsLock) { - throw new LockingException("Error, already holding a lock. Call unlock first!"); - } - try { - prepare(); - watcher.checkForLock(); - boolean success = syncPoint.await(timeout, unit); - if (!success) { - return false; - } - if (!holdsLock) { - throw new LockingException("Error, couldn't acquire the lock!"); - } - } catch (InterruptedException e) { - cancelAttempt(); - return false; - } catch (KeeperException e) { - // No need to clean up since the node wasn't created yet. - throw new LockingException("KeeperException while trying to acquire lock!", e); - } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - // No need to clean up since the node wasn't created yet. - throw new LockingException("ZooKeeperConnectionException while trying to acquire lock", e); - } - return true; - } - - @Override - public synchronized void unlock() throws LockingException { - if (currentId == null) { - throw new LockingException("Error, neither attempting to lock nor holding a lock!"); - } - Preconditions.checkNotNull(currentId); - // Try aborting! - if (!holdsLock) { - aborted.set(true); - LOG.log(Level.INFO, "Not holding lock, aborting acquisition attempt!"); - } else { - LOG.log(Level.INFO, "Cleaning up this locks ephemeral node."); - cleanup(); - } - } - - //TODO(Florian Leibert): Make sure this isn't a runtime exception. Put exceptions into the token? - - private synchronized void cancelAttempt() { - LOG.log(Level.INFO, "Cancelling lock attempt!"); - cleanup(); - // Bubble up failure... - holdsLock = false; - syncPoint.countDown(); - } - - private void cleanup() { - LOG.info("Cleaning up!"); - Preconditions.checkNotNull(currentId); - try { - Stat stat = zkClient.get().exists(currentNode, false); - if (stat != null) { - zkClient.get().delete(currentNode, ZooKeeperUtils.ANY_VERSION); - } else { - LOG.log(Level.WARNING, "Called cleanup but nothing to cleanup!"); - } - } catch (Exception e) { - throw new RuntimeException(e); - } - holdsLock = false; - aborted.set(false); - currentId = null; - currentNode = null; - watcher = null; - syncPoint = new CountDownLatch(1); - } - - class LockWatcher implements Watcher { - - public synchronized void checkForLock() { - MorePreconditions.checkNotBlank(currentId); - - try { - List<String> candidates = zkClient.get().getChildren(lockPath, null); - ImmutableList<String> sortedMembers = Ordering.natural().immutableSortedCopy(candidates); - - // Unexpected behavior if there are no children! - if (sortedMembers.isEmpty()) { - throw new LockingException("Error, member list is empty!"); - } - - int memberIndex = sortedMembers.indexOf(currentId); - - // If we hold the lock - if (memberIndex == 0) { - holdsLock = true; - syncPoint.countDown(); - } else { - final String nextLowestNode = sortedMembers.get(memberIndex - 1); - LOG.log(Level.INFO, String.format("Current LockWatcher with ephemeral node [%s], is " + - "waiting for [%s] to release lock.", currentId, nextLowestNode)); - - watchedNode = String.format("%s/%s", lockPath, nextLowestNode); - Stat stat = zkClient.get().exists(watchedNode, this); - if (stat == null) { - checkForLock(); - } - } - } catch (InterruptedException e) { - LOG.log(Level.WARNING, String.format("Current LockWatcher with ephemeral node [%s] " + - "got interrupted. Trying to cancel lock acquisition.", currentId), e); - cancelAttempt(); - } catch (KeeperException e) { - LOG.log(Level.WARNING, String.format("Current LockWatcher with ephemeral node [%s] " + - "got a KeeperException. Trying to cancel lock acquisition.", currentId), e); - cancelAttempt(); - } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - LOG.log(Level.WARNING, String.format("Current LockWatcher with ephemeral node [%s] " + - "got a ConnectionException. Trying to cancel lock acquisition.", currentId), e); - cancelAttempt(); - } - } - - @Override - public synchronized void process(WatchedEvent event) { - // this handles the case where we have aborted a lock and deleted ourselves but still have a - // watch on the nextLowestNode. This is a workaround since ZK doesn't support unsub. - if (!event.getPath().equals(watchedNode)) { - LOG.log(Level.INFO, "Ignoring call for node:" + watchedNode); - return; - } - //TODO(Florian Leibert): Pull this into the outer class. - if (event.getType() == Watcher.Event.EventType.None) { - switch (event.getState()) { - case SyncConnected: - // TODO(Florian Leibert): maybe we should just try to "fail-fast" in this case and abort. - LOG.info("Reconnected..."); - break; - case Expired: - LOG.log(Level.WARNING, String.format("Current ZK session expired![%s]", currentId)); - cancelAttempt(); - break; - } - } else if (event.getType() == Event.EventType.NodeDeleted) { - checkForLock(); - } else { - LOG.log(Level.WARNING, String.format("Unexpected ZK event: %s", event.getType().name())); - } - } - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/zookeeper/Partitioner.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/Partitioner.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/Partitioner.java deleted file mode 100644 index 91ea345..0000000 --- a/commons/src/main/java/org/apache/aurora/common/zookeeper/Partitioner.java +++ /dev/null @@ -1,172 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.zookeeper; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Predicate; -import com.google.common.collect.Ordering; -import org.apache.aurora.common.zookeeper.Group.GroupChangeListener; -import org.apache.aurora.common.zookeeper.Group.JoinException; -import org.apache.aurora.common.zookeeper.Group.Membership; -import org.apache.aurora.common.zookeeper.Group.UpdateException; -import org.apache.aurora.common.zookeeper.Group.WatchException; -import org.apache.zookeeper.data.ACL; - -import javax.annotation.Nullable; -import java.util.List; -import java.util.logging.Logger; - -/** - * A distributed mechanism for eventually arriving at an evenly partitioned space of long values. - * A typical usage would have a client on each of several hosts joining a logical partition (a - * "partition group") that represents some shared work. Clients could then process a subset of a - * full body of work by testing any given item of work with their partition filter. - * - * <p>Note that clients must be able to tolerate periods of duplicate processing by more than 1 - * partition as explained in {@link #join()}. - * - * @author John Sirois - */ -public class Partitioner { - - private static final Logger LOG = Logger.getLogger(Partitioner.class.getName()); - - private volatile int groupSize; - private volatile int groupIndex; - private final Group group; - - /** - * Constructs a representation of a partition group but does not join it. Note that the partition - * group path will be created as a persistent zookeeper path if it does not already exist. - * - * @param zkClient a client to use for joining the partition group and watching its membership - * @param acl the acl for this partition group - * @param path a zookeeper path that represents the partition group - */ - public Partitioner(ZooKeeperClient zkClient, List<ACL> acl, String path) { - group = new Group(zkClient, acl, path); - } - - @VisibleForTesting - int getGroupSize() { - return groupSize; - } - - /** - * Represents a slice of a partition group. The partition is dynamic and will adjust its size as - * members join and leave its partition group. - */ - public abstract static class Partition implements Predicate<Long>, Membership { - - /** - * Returns {@code true} if the given {@code value} is a member of this partition at this time. - */ - public abstract boolean isMember(long value); - - /** - * Gets number of members in the group at this time. - * - * @return number of members in the ZK group at this time. - */ - public abstract int getNumPartitions(); - - /** - * Evaluates partition membership based on the given {@code value}'s hash code. If the value - * is null it is never a member of a partition. - */ - boolean isMember(Object value) { - return (value != null) && isMember(value.hashCode()); - } - - /** - * Equivalent to {@link #isMember(long)} for all non-null values; however incurs unboxing - * overhead. - */ - @Override - public boolean apply(@Nullable Long input) { - return (input != null) && isMember(input); - } - } - - /** - * Attempts to join the partition group and claim a slice. When successful, a predicate is - * returned that can be used to test whether or not an item belongs to this partition. The - * predicate is dynamic such that as the group is further partitioned or partitions merge the - * predicate will claim a narrower or wider swath of the partition space respectively. Partition - * creation and merging is not instantaneous and clients should expect independent partitions to - * claim ownership of some items when partition membership is in flux. It is only in the steady - * state that a client should expect independent partitions to divide the partition space evenly - * and without overlap. - * - * <p>TODO(John Sirois): consider adding a version with a global timeout for the join operation. - * - * @return the partition representing the slice of the partition group this member can claim - * @throws JoinException if there was a problem joining the partition group - * @throws InterruptedException if interrupted while waiting to join the partition group - */ - public final Partition join() throws JoinException, InterruptedException { - final Membership membership = group.join(); - try { - group.watch(createGroupChangeListener(membership)); - } catch (WatchException e) { - membership.cancel(); - throw new JoinException("Problem establishing watch on group after joining it", e); - } - return new Partition() { - @Override public boolean isMember(long value) { - return (value % groupSize) == groupIndex; - } - - @Override public int getNumPartitions() { - return groupSize; - } - - @Override public String getGroupPath() { - return membership.getGroupPath(); - } - - @Override public String getMemberId() { - return membership.getMemberId(); - } - - @Override public String getMemberPath() { - return membership.getMemberPath(); - } - - @Override public byte[] updateMemberData() throws UpdateException { - return membership.updateMemberData(); - } - - @Override public void cancel() throws JoinException { - membership.cancel(); - } - }; - } - - @VisibleForTesting GroupChangeListener createGroupChangeListener(final Membership membership) { - return new GroupChangeListener() { - @Override public void onGroupChange(Iterable<String> memberIds) { - List<String> members = Ordering.natural().sortedCopy(memberIds); - int newSize = members.size(); - int newIndex = members.indexOf(membership.getMemberId()); - - LOG.info(String.format("Rebuilding group %s:%s [%d:%d]->[%d:%d]", - membership.getGroupPath(), members, groupSize, groupIndex, newSize, newIndex)); - - groupSize = newSize; - groupIndex = newIndex; - } - }; - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java index 2b99268..18aff9f 100644 --- a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java +++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java @@ -18,14 +18,8 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.net.InetSocketAddress; import java.util.Map; -import java.util.Set; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; - -import org.apache.zookeeper.data.ACL; import org.apache.aurora.common.base.Function; import org.apache.aurora.common.base.MorePreconditions; @@ -33,6 +27,7 @@ import org.apache.aurora.common.io.Codec; import org.apache.aurora.common.thrift.Endpoint; import org.apache.aurora.common.thrift.ServiceInstance; import org.apache.aurora.common.thrift.Status; +import org.apache.zookeeper.data.ACL; /** * Common ServerSet related functions @@ -63,32 +58,11 @@ public class ServerSets { * @return A server set that registers at {@code zkPath}. */ public static ServerSet create(ZooKeeperClient zkClient, Iterable<ACL> acl, String zkPath) { - return create(zkClient, acl, ImmutableSet.of(zkPath)); - } - - /** - * Creates a server set that registers at one or multiple paths applying the given ACL to all - * nodes created in the paths. - * - * @param zkClient ZooKeeper client to register with. - * @param acl The ACL to apply to the {@code zkPath} nodes the ServerSet creates. - * @param zkPaths Paths to register at, must be non-empty. - * @return A server set that registers at the given {@code zkPath}s. - */ - public static ServerSet create(ZooKeeperClient zkClient, Iterable<ACL> acl, Set<String> zkPaths) { Preconditions.checkNotNull(zkClient); MorePreconditions.checkNotBlank(acl); - MorePreconditions.checkNotBlank(zkPaths); + MorePreconditions.checkNotBlank(zkPath); - if (zkPaths.size() == 1) { - return new ServerSetImpl(zkClient, acl, Iterables.getOnlyElement(zkPaths)); - } else { - ImmutableList.Builder<ServerSet> builder = ImmutableList.builder(); - for (String path : zkPaths) { - builder.add(new ServerSetImpl(zkClient, acl, path)); - } - return new CompoundServerSet(builder.build()); - } + return new ServerSetImpl(zkClient, acl, zkPath); } /** http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/zookeeper/StaticServerSet.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/StaticServerSet.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/StaticServerSet.java deleted file mode 100644 index 99c290e..0000000 --- a/commons/src/main/java/org/apache/aurora/common/zookeeper/StaticServerSet.java +++ /dev/null @@ -1,145 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.zookeeper; - -import java.net.InetSocketAddress; -import java.util.Map; -import java.util.Set; -import java.util.logging.Level; -import java.util.logging.Logger; - -import com.google.common.base.Function; -import com.google.common.base.Optional; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; -import com.google.common.collect.Maps; - -import org.apache.aurora.common.base.Command; -import org.apache.aurora.common.base.Commands; -import org.apache.aurora.common.zookeeper.Group.JoinException; -import org.apache.aurora.common.thrift.Endpoint; -import org.apache.aurora.common.thrift.ServiceInstance; -import org.apache.aurora.common.thrift.Status; - -/** - * A server set that represents a fixed set of hosts. - * This may be composed under {@link CompoundServerSet} to ensure a minimum set of hosts is - * present. - * A static server set does not support joining, but will allow normal join calls and status update - * calls to be made. - */ -public class StaticServerSet implements ServerSet { - - private static final Logger LOG = Logger.getLogger(StaticServerSet.class.getName()); - - private static final Function<Endpoint, ServiceInstance> ENDPOINT_TO_INSTANCE = - new Function<Endpoint, ServiceInstance>() { - @Override public ServiceInstance apply(Endpoint endpoint) { - return new ServiceInstance(endpoint, ImmutableMap.<String, Endpoint>of(), Status.ALIVE); - } - }; - - private final ImmutableSet<ServiceInstance> hosts; - - /** - * Creates a static server set that will reply to monitor calls immediately and exactly once with - * the provided service instances. - * - * @param hosts Hosts in the static set. - */ - public StaticServerSet(Set<ServiceInstance> hosts) { - this.hosts = ImmutableSet.copyOf(hosts); - } - - /** - * Creates a static server set containing the provided endpoints (and no auxiliary ports) which - * will all be in the {@link Status#ALIVE} state. - * - * @param endpoints Endpoints in the static set. - * @return A static server set that will advertise the provided endpoints. - */ - public static StaticServerSet fromEndpoints(Set<Endpoint> endpoints) { - return new StaticServerSet( - ImmutableSet.copyOf(Iterables.transform(endpoints, ENDPOINT_TO_INSTANCE))); - } - - private EndpointStatus join( - InetSocketAddress endpoint, - Map<String, InetSocketAddress> auxEndpoints, - Optional<Integer> shardId) { - - LOG.warning("Attempt to join fixed server set ignored."); - ServiceInstance joining = new ServiceInstance( - ServerSets.toEndpoint(endpoint), - Maps.transformValues(auxEndpoints, ServerSets.TO_ENDPOINT), - Status.ALIVE); - if (shardId.isPresent()) { - joining.setShard(shardId.get()); - } - if (!hosts.contains(joining)) { - LOG.log(Level.SEVERE, - "Joining instance " + joining + " does not match any member of the static set."); - } - - return new EndpointStatus() { - @Override public void leave() throws UpdateException { - LOG.warning("Attempt to adjust state of fixed server set ignored."); - } - - @Override public void update(Status status) throws UpdateException { - LOG.warning("Attempt to adjust state of fixed server set ignored."); - } - }; - } - - @Override - public EndpointStatus join( - InetSocketAddress endpoint, - Map<String, InetSocketAddress> auxEndpoints, - Status status) { - - LOG.warning("This method is deprecated. Please do not specify a status field."); - return join(endpoint, auxEndpoints, Optional.<Integer>absent()); - } - - @Override - public EndpointStatus join( - InetSocketAddress endpoint, - Map<String, InetSocketAddress> auxEndpoints) { - - LOG.warning("Joining a ServerSet without a shard ID is deprecated and will soon break."); - return join(endpoint, auxEndpoints, Optional.<Integer>absent()); - } - - @Override - public EndpointStatus join( - InetSocketAddress endpoint, - Map<String, InetSocketAddress> auxEndpoints, - int shardId) throws JoinException, InterruptedException { - - return join(endpoint, auxEndpoints, Optional.of(shardId)); - } - - @Override - public Command watch(HostChangeMonitor<ServiceInstance> monitor) { - monitor.onChange(hosts); - return Commands.NOOP; - } - - @Override - public void monitor(HostChangeMonitor<ServiceInstance> monitor) throws MonitorException { - watch(monitor); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperMap.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperMap.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperMap.java deleted file mode 100644 index 29db55a..0000000 --- a/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperMap.java +++ /dev/null @@ -1,411 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.zookeeper; - -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.logging.Level; -import java.util.logging.Logger; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.base.Functions; -import com.google.common.base.Preconditions; -import com.google.common.collect.ForwardingMap; -import com.google.common.collect.Sets; - -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; - -import org.apache.aurora.common.base.Command; -import org.apache.aurora.common.base.ExceptionalSupplier; -import org.apache.aurora.common.base.MorePreconditions; -import org.apache.aurora.common.util.BackoffHelper; -import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException; - -/** - * A ZooKeeper backed {@link Map}. Initialized with a node path, this map represents child nodes - * under that path as keys, with the data in those nodes as values. This map is readonly from - * clients of this class, and only can be modified via direct zookeeper operations. - * - * Note that instances of this class maintain a zookeeper watch for each zookeeper node under the - * parent, as well as on the parent itself. Instances of this class should be created via the - * {@link #create} factory method. - * - * As of ZooKeeper Version 3.1, the maximum allowable size of a data node is 1 MB. A single - * client should be able to hold up to maintain several thousand watches, but this depends on rate - * of data change as well. - * - * Talk to your zookeeper cluster administrator if you expect number of map entries times number - * of live clients to exceed a thousand, as a zookeeper cluster is limited by total number of - * server-side watches enabled. - * - * For an example of a set of tools to maintain one of these maps, please see - * src/scripts/HenAccess.py in the hen repository. - * - * @param <V> the type of values this map stores - */ -public class ZooKeeperMap<V> extends ForwardingMap<String, V> { - - /** - * An optional listener which can be supplied and triggered when entries in a ZooKeeperMap - * are added, changed or removed. For a ZooKeeperMap of type <V>, the listener will fire a - * "nodeChanged" event with the name of the ZNode that changed, and its resulting value as - * interpreted by the provided deserializer. Removal of child nodes triggers the "nodeRemoved" - * method indicating the name of the ZNode which is no longer present in the map. - */ - public interface Listener<V> { - - /** - * Fired when a node is added to the ZooKeeperMap or changed. - * - * @param nodeName indicates the name of the ZNode that was added or changed. - * @param value is the new value of the node after passing through your supplied deserializer. - */ - void nodeChanged(String nodeName, V value); - - /** - * Fired when a node is removed from the ZooKeeperMap. - * - * @param nodeName indicates the name of the ZNode that was removed from the ZooKeeperMap. - */ - void nodeRemoved(String nodeName); - } - - /** - * Default deserializer for the constructor if you want to simply store the zookeeper byte[] data - * in this map. - */ - public static final Function<byte[], byte[]> BYTE_ARRAY_VALUES = Functions.identity(); - - /** - * A listener that ignores all events. - */ - public static <T> Listener<T> noopListener() { - return new Listener<T>() { - @Override public void nodeChanged(String nodeName, T value) { } - @Override public void nodeRemoved(String nodeName) { } - }; - } - - private static final Logger LOG = Logger.getLogger(ZooKeeperMap.class.getName()); - - private final ZooKeeperClient zkClient; - private final String nodePath; - private final Function<byte[], V> deserializer; - - private final ConcurrentMap<String, V> localMap; - private final Map<String, V> unmodifiableLocalMap; - private final BackoffHelper backoffHelper; - - private final Listener<V> mapListener; - - // Whether it's safe to re-establish watches if our zookeeper session has expired. - private final Object safeToRewatchLock; - private volatile boolean safeToRewatch; - - /** - * Returns an initialized ZooKeeperMap. The given path must exist at the time of - * creation or a {@link KeeperException} will be thrown. - * - * @param zkClient a zookeeper client - * @param nodePath path to a node whose data will be watched - * @param deserializer a function that converts byte[] data from a zk node to this map's - * value type V - * @param listener is a Listener which fires when values are added, changed, or removed. - * - * @throws InterruptedException if the underlying zookeeper server transaction is interrupted - * @throws KeeperException.NoNodeException if the given nodePath doesn't exist - * @throws KeeperException if the server signals an error - * @throws ZooKeeperConnectionException if there was a problem connecting to the zookeeper - * cluster - */ - public static <V> ZooKeeperMap<V> create( - ZooKeeperClient zkClient, - String nodePath, - Function<byte[], V> deserializer, - Listener<V> listener) - throws InterruptedException, KeeperException, ZooKeeperConnectionException { - - ZooKeeperMap<V> zkMap = new ZooKeeperMap<V>(zkClient, nodePath, deserializer, listener); - zkMap.init(); - return zkMap; - } - - - /** - * Returns an initialized ZooKeeperMap. The given path must exist at the time of - * creation or a {@link KeeperException} will be thrown. - * - * @param zkClient a zookeeper client - * @param nodePath path to a node whose data will be watched - * @param deserializer a function that converts byte[] data from a zk node to this map's - * value type V - * - * @throws InterruptedException if the underlying zookeeper server transaction is interrupted - * @throws KeeperException.NoNodeException if the given nodePath doesn't exist - * @throws KeeperException if the server signals an error - * @throws ZooKeeperConnectionException if there was a problem connecting to the zookeeper - * cluster - */ - public static <V> ZooKeeperMap<V> create( - ZooKeeperClient zkClient, - String nodePath, - Function<byte[], V> deserializer) - throws InterruptedException, KeeperException, ZooKeeperConnectionException { - - return ZooKeeperMap.create(zkClient, nodePath, deserializer, ZooKeeperMap.<V>noopListener()); - } - - /** - * Initializes a ZooKeeperMap. The given path must exist at the time of object creation or - * a {@link KeeperException} will be thrown. - * - * Please note that this object will not track any remote zookeeper data until {@link #init()} - * is successfully called. After construction and before that call, this {@link Map} will - * be empty. - * - * @param zkClient a zookeeper client - * @param nodePath top-level node path under which the map data lives - * @param deserializer a function that converts byte[] data from a zk node to this map's - * value type V - * @param mapListener is a Listener which fires when values are added, changed, or removed. - * - * @throws InterruptedException if the underlying zookeeper server transaction is interrupted - * @throws KeeperException.NoNodeException if the given nodePath doesn't exist - * @throws KeeperException if the server signals an error - * @throws ZooKeeperConnectionException if there was a problem connecting to the zookeeper - * cluster - */ - @VisibleForTesting - ZooKeeperMap( - ZooKeeperClient zkClient, - String nodePath, - Function<byte[], V> deserializer, - Listener<V> mapListener) - throws InterruptedException, KeeperException, ZooKeeperConnectionException { - - super(); - - this.mapListener = Preconditions.checkNotNull(mapListener); - this.zkClient = Preconditions.checkNotNull(zkClient); - this.nodePath = MorePreconditions.checkNotBlank(nodePath); - this.deserializer = Preconditions.checkNotNull(deserializer); - - localMap = new ConcurrentHashMap<String, V>(); - unmodifiableLocalMap = Collections.unmodifiableMap(localMap); - backoffHelper = new BackoffHelper(); - safeToRewatchLock = new Object(); - safeToRewatch = false; - - if (zkClient.get().exists(nodePath, null) == null) { - throw new KeeperException.NoNodeException(); - } - } - - /** - * Initialize zookeeper tracking for this {@link Map}. Once this call returns, this object - * will be tracking data in zookeeper. - * - * @throws InterruptedException if the underlying zookeeper server transaction is interrupted - * @throws KeeperException if the server signals an error - * @throws ZooKeeperConnectionException if there was a problem connecting to the zookeeper - * cluster - */ - @VisibleForTesting - void init() throws InterruptedException, KeeperException, ZooKeeperConnectionException { - Watcher watcher = zkClient.registerExpirationHandler(new Command() { - @Override public void execute() { - /* - * First rewatch all of our locally cached children. Some of them may not exist anymore, - * which will lead to caught KeeperException.NoNode whereafter we'll remove that child - * from the cached map. - * - * Next, we'll establish our top level child watch and add any new nodes that might exist. - */ - try { - synchronized (safeToRewatchLock) { - if (safeToRewatch) { - rewatchDataNodes(); - tryWatchChildren(); - } - } - } catch (InterruptedException e) { - LOG.log(Level.WARNING, "Interrupted while trying to re-establish watch.", e); - Thread.currentThread().interrupt(); - } - } - }); - - try { - // Synchronize to prevent the race of watchChildren completing and then the session expiring - // before we update safeToRewatch. - synchronized (safeToRewatchLock) { - watchChildren(); - safeToRewatch = true; - } - } catch (InterruptedException e) { - zkClient.unregister(watcher); - throw e; - } catch (KeeperException e) { - zkClient.unregister(watcher); - throw e; - } catch (ZooKeeperConnectionException e) { - zkClient.unregister(watcher); - throw e; - } - } - - @Override - protected Map<String, V> delegate() { - return unmodifiableLocalMap; - } - - private void tryWatchChildren() throws InterruptedException { - backoffHelper.doUntilSuccess(new ExceptionalSupplier<Boolean, InterruptedException>() { - @Override public Boolean get() throws InterruptedException { - try { - watchChildren(); - return true; - } catch (KeeperException e) { - return false; - } catch (ZooKeeperConnectionException e) { - return false; - } - } - }); - } - - private synchronized void watchChildren() - throws InterruptedException, KeeperException, ZooKeeperConnectionException { - - /* - * Add a watch on the parent node itself, and attempt to rewatch if it - * gets deleted - */ - zkClient.get().exists(nodePath, new Watcher() { - @Override public void process(WatchedEvent event) { - if (event.getType() == Watcher.Event.EventType.NodeDeleted) { - // If the parent node no longer exists - localMap.clear(); - try { - tryWatchChildren(); - } catch (InterruptedException e) { - LOG.log(Level.WARNING, "Interrupted while trying to watch children.", e); - Thread.currentThread().interrupt(); - } - } - }}); - - final Watcher childWatcher = new Watcher() { - @Override - public void process(WatchedEvent event) { - if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) { - try { - tryWatchChildren(); - } catch (InterruptedException e) { - LOG.log(Level.WARNING, "Interrupted while trying to watch children.", e); - Thread.currentThread().interrupt(); - } - } - } - }; - - List<String> children = zkClient.get().getChildren(nodePath, childWatcher); - updateChildren(Sets.newHashSet(children)); - } - - private void tryAddChild(final String child) throws InterruptedException { - backoffHelper.doUntilSuccess(new ExceptionalSupplier<Boolean, InterruptedException>() { - @Override public Boolean get() throws InterruptedException { - try { - addChild(child); - return true; - } catch (KeeperException e) { - return false; - } catch (ZooKeeperConnectionException e) { - return false; - } - } - }); - } - - // TODO(Adam Samet) - Make this use the ZooKeeperNode class. - private void addChild(final String child) - throws InterruptedException, KeeperException, ZooKeeperConnectionException { - - final Watcher nodeWatcher = new Watcher() { - @Override - public void process(WatchedEvent event) { - if (event.getType() == Watcher.Event.EventType.NodeDataChanged) { - try { - tryAddChild(child); - } catch (InterruptedException e) { - LOG.log(Level.WARNING, "Interrupted while trying to add a child.", e); - Thread.currentThread().interrupt(); - } - } else if (event.getType() == Watcher.Event.EventType.NodeDeleted) { - removeEntry(child); - } - } - }; - - try { - V value = deserializer.apply(zkClient.get().getData(makePath(child), nodeWatcher, null)); - putEntry(child, value); - } catch (KeeperException.NoNodeException e) { - // This node doesn't exist anymore, remove it from the map and we're done. - removeEntry(child); - } - } - - @VisibleForTesting - void removeEntry(String key) { - localMap.remove(key); - mapListener.nodeRemoved(key); - } - - @VisibleForTesting - void putEntry(String key, V value) { - localMap.put(key, value); - mapListener.nodeChanged(key, value); - } - - private void rewatchDataNodes() throws InterruptedException { - for (String child : keySet()) { - tryAddChild(child); - } - } - - private String makePath(final String child) { - return nodePath + "/" + child; - } - - private void updateChildren(Set<String> zkChildren) throws InterruptedException { - Set<String> addedChildren = Sets.difference(zkChildren, keySet()); - Set<String> removedChildren = Sets.difference(keySet(), zkChildren); - for (String child : addedChildren) { - tryAddChild(child); - } - for (String child : removedChildren) { - removeEntry(child); - } - } -} - http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperNode.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperNode.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperNode.java deleted file mode 100644 index 3829ca7..0000000 --- a/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperNode.java +++ /dev/null @@ -1,349 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.zookeeper; - -import java.util.logging.Level; -import java.util.logging.Logger; - -import javax.annotation.Nullable; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.base.Functions; -import com.google.common.base.Preconditions; -import com.google.common.base.Supplier; - -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.Watcher.Event.KeeperState; -import org.apache.zookeeper.data.Stat; - -import org.apache.aurora.common.base.Closure; -import org.apache.aurora.common.base.Closures; -import org.apache.aurora.common.base.Command; -import org.apache.aurora.common.base.ExceptionalSupplier; -import org.apache.aurora.common.base.MorePreconditions; -import org.apache.aurora.common.util.BackoffHelper; -import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException; - -/** - * An implementation of {@link Supplier} that offers a readonly view of a - * zookeeper data node. This class is thread-safe. - * - * Instances of this class each maintain a zookeeper watch for the remote data node. Instances - * of this class should be created via the {@link #create} factory method. - * - * Please see zookeeper documentation and talk to your cluster administrator for guidance on - * appropriate node size and total number of nodes you should be using. - * - * @param <T> the type of data this node stores - */ -public class ZooKeeperNode<T> implements Supplier<T> { - /** - * Deserializer for the constructor if you want to simply store the zookeeper byte[] data - * as-is. - */ - public static final Function<byte[], byte[]> BYTE_ARRAY_VALUE = Functions.identity(); - - private static final Logger LOG = Logger.getLogger(ZooKeeperNode.class.getName()); - - private final ZooKeeperClient zkClient; - private final String nodePath; - private final NodeDeserializer<T> deserializer; - - private final BackoffHelper backoffHelper; - - // Whether it's safe to re-establish watches if our zookeeper session has expired. - private final Object safeToRewatchLock; - private volatile boolean safeToRewatch; - - private final T NO_DATA = null; - @Nullable private volatile T nodeData; - private final Closure<T> dataUpdateListener; - - /** - * When a call to ZooKeeper.getData is made, the Watcher is added to a Set before the the network - * request is made and if the request fails, the Watcher remains. There's a problem where Watcher - * can accumulate when there are failed requests, so they are set to instance fields and reused. - */ - private final Watcher nodeWatcher; - private final Watcher existenceWatcher; - - /** - * Returns an initialized ZooKeeperNode. The given node must exist at the time of object - * creation or a {@link KeeperException} will be thrown. - * - * @param zkClient a zookeeper client - * @param nodePath path to a node whose data will be watched - * @param deserializer a function that converts byte[] data from a zk node to this supplier's - * type T - * - * @throws InterruptedException if the underlying zookeeper server transaction is interrupted - * @throws KeeperException.NoNodeException if the given nodePath doesn't exist - * @throws KeeperException if the server signals an error - * @throws ZooKeeperConnectionException if there was a problem connecting to the zookeeper - * cluster - */ - public static <T> ZooKeeperNode<T> create(ZooKeeperClient zkClient, String nodePath, - Function<byte[], T> deserializer) throws InterruptedException, KeeperException, - ZooKeeperConnectionException { - return create(zkClient, nodePath, deserializer, Closures.<T>noop()); - } - - /** - * Like the above, but optionally takes in a {@link Closure} that will get notified - * whenever the data is updated from the remote node. - * - * @param dataUpdateListener a {@link Closure} to receive data update notifications. - */ - public static <T> ZooKeeperNode<T> create(ZooKeeperClient zkClient, String nodePath, - Function<byte[], T> deserializer, Closure<T> dataUpdateListener) throws InterruptedException, - KeeperException, ZooKeeperConnectionException { - return create(zkClient, nodePath, new FunctionWrapper<T>(deserializer), dataUpdateListener); - } - - /** - * Returns an initialized ZooKeeperNode. The given node must exist at the time of object - * creation or a {@link KeeperException} will be thrown. - * - * @param zkClient a zookeeper client - * @param nodePath path to a node whose data will be watched - * @param deserializer an implentation of {@link NodeDeserializer} that converts a byte[] from a - * zk node to this supplier's type T. Also supplies a {@link Stat} object which is useful for - * doing versioned updates. - * - * @throws InterruptedException if the underlying zookeeper server transaction is interrupted - * @throws KeeperException.NoNodeException if the given nodePath doesn't exist - * @throws KeeperException if the server signals an error - * @throws ZooKeeperConnectionException if there was a problem connecting to the zookeeper - * cluster - */ - public static <T> ZooKeeperNode<T> create(ZooKeeperClient zkClient, String nodePath, - NodeDeserializer<T> deserializer) throws InterruptedException, KeeperException, - ZooKeeperConnectionException { - return create(zkClient, nodePath, deserializer, Closures.<T>noop()); - } - - /** - * Like the above, but optionally takes in a {@link Closure} that will get notified - * whenever the data is updated from the remote node. - * - * @param dataUpdateListener a {@link Closure} to receive data update notifications. - */ - public static <T> ZooKeeperNode<T> create(ZooKeeperClient zkClient, String nodePath, - NodeDeserializer<T> deserializer, Closure<T> dataUpdateListener) - throws InterruptedException, KeeperException, ZooKeeperConnectionException { - ZooKeeperNode<T> zkNode = - new ZooKeeperNode<T>(zkClient, nodePath, deserializer, dataUpdateListener); - zkNode.init(); - return zkNode; - } - - /** - * Initializes a ZooKeeperNode. The given node must exist at the time of object creation or - * a {@link KeeperException} will be thrown. - * - * Please note that this object will not track any remote zookeeper data until {@link #init()} - * is successfully called. After construction and before that call, this {@link Supplier} will - * return null. - * - * @param zkClient a zookeeper client - * @param nodePath path to a node whose data will be watched - * @param deserializer an implementation of {@link NodeDeserializer} that converts byte[] data - * from a zk node to this supplier's type T - * @param dataUpdateListener a {@link Closure} to receive data update notifications. - */ - @VisibleForTesting - ZooKeeperNode(ZooKeeperClient zkClient, String nodePath, - NodeDeserializer<T> deserializer, Closure<T> dataUpdateListener) { - this.zkClient = Preconditions.checkNotNull(zkClient); - this.nodePath = MorePreconditions.checkNotBlank(nodePath); - this.deserializer = Preconditions.checkNotNull(deserializer); - this.dataUpdateListener = Preconditions.checkNotNull(dataUpdateListener); - - backoffHelper = new BackoffHelper(); - safeToRewatchLock = new Object(); - safeToRewatch = false; - nodeData = NO_DATA; - - nodeWatcher = new Watcher() { - @Override public void process(WatchedEvent event) { - if (event.getState() == KeeperState.SyncConnected) { - try { - tryWatchDataNode(); - } catch (InterruptedException e) { - LOG.log(Level.WARNING, "Interrupted while trying to watch a data node.", e); - Thread.currentThread().interrupt(); - } - } else { - LOG.info("Ignoring watcher event " + event); - } - } - }; - - existenceWatcher = new Watcher() { - @Override public void process(WatchedEvent event) { - if (event.getType() == Watcher.Event.EventType.NodeCreated) { - try { - tryWatchDataNode(); - } catch (InterruptedException e) { - LOG.log(Level.WARNING, "Interrupted while trying to watch a data node.", e); - Thread.currentThread().interrupt(); - } - } - } - }; - } - - /** - * Initialize zookeeper tracking for this {@link Supplier}. Once this call returns, this object - * will be tracking data in zookeeper. - * - * @throws InterruptedException if the underlying zookeeper server transaction is interrupted - * @throws KeeperException if the server signals an error - * @throws ZooKeeperConnectionException if there was a problem connecting to the zookeeper - * cluster - */ - @VisibleForTesting - void init() throws InterruptedException, KeeperException, - ZooKeeperConnectionException { - Watcher watcher = zkClient.registerExpirationHandler(new Command() { - @Override public void execute() { - try { - synchronized (safeToRewatchLock) { - if (safeToRewatch) { - tryWatchDataNode(); - } - } - } catch (InterruptedException e) { - LOG.log(Level.WARNING, "Interrupted while trying to re-establish watch.", e); - Thread.currentThread().interrupt(); - } - } - }); - - try { - /* - * Synchronize to prevent the race of watchDataNode completing and then the session expiring - * before we update safeToRewatch. - */ - synchronized (safeToRewatchLock) { - watchDataNode(); - safeToRewatch = true; - } - } catch (InterruptedException e) { - zkClient.unregister(watcher); - throw e; - } catch (KeeperException e) { - zkClient.unregister(watcher); - throw e; - } catch (ZooKeeperConnectionException e) { - zkClient.unregister(watcher); - throw e; - } - } - - /** - * Returns the data corresponding to a byte array in a remote zookeeper node. This data is - * cached locally and updated in the background on watch notifications. - * - * @return the data currently cached locally or null if {@link #init()} hasn't been called - * or the backing node has no data or does not exist anymore. - */ - @Override - public @Nullable T get() { - return nodeData; - } - - @VisibleForTesting - void updateData(@Nullable T newData) { - nodeData = newData; - dataUpdateListener.execute(newData); - } - - private void tryWatchDataNode() throws InterruptedException { - backoffHelper.doUntilSuccess(new ExceptionalSupplier<Boolean, InterruptedException>() { - @Override public Boolean get() throws InterruptedException { - try { - watchDataNode(); - return true; - } catch (KeeperException e) { - return false; - } catch (ZooKeeperConnectionException e) { - return false; - } - } - }); - } - - private void watchDataNode() throws InterruptedException, KeeperException, - ZooKeeperConnectionException { - try { - Stat stat = new Stat(); - byte[] rawData = zkClient.get().getData(nodePath, nodeWatcher, stat); - T newData = deserializer.deserialize(rawData, stat); - updateData(newData); - } catch (KeeperException.NoNodeException e) { - /* - * This node doesn't exist right now, reflect that locally and then create a watch to wait - * for its recreation. - */ - updateData(NO_DATA); - watchForExistence(); - } - } - - private void watchForExistence() throws InterruptedException, KeeperException, - ZooKeeperConnectionException { - /* - * If the node was created between the getData call and this call, just try watching it. - * We'll have an extra exists watch on it that goes off on its next deletion, which will - * be a no-op. - * Otherwise, just let the exists watch wait for its creation. - */ - if (zkClient.get().exists(nodePath, existenceWatcher) != null) { - tryWatchDataNode(); - } - } - - /** - * Interface for defining zookeeper node data deserialization. - * - * @param <T> the type of data associated with this node - */ - public interface NodeDeserializer<T> { - /** - * @param data the byte array returned from ZooKeeper when a watch is triggered. - * @param stat a ZooKeeper {@link Stat} object. Populated by - * {@link org.apache.zookeeper.ZooKeeper#getData(String, boolean, Stat)}. - */ - T deserialize(byte[] data, Stat stat); - } - - // wrapper for backwards compatibility with older create() methods with Function parameter - private static final class FunctionWrapper<T> implements NodeDeserializer<T> { - private final Function<byte[], T> func; - private FunctionWrapper(Function<byte[], T> func) { - Preconditions.checkNotNull(func); - this.func = func; - } - - public T deserialize(byte[] rawData, Stat stat) { - return func.apply(rawData); - } - } - -} -
