Remove `-zk_use_curator` and unused code. Some portions of the commons zookeeper package remain to be moved in a follow-up change.
Bugs closed: AURORA-1669 Reviewed at https://reviews.apache.org/r/52312/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/69cba786 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/69cba786 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/69cba786 Branch: refs/heads/master Commit: 69cba786efc2628eab566201dfea46836a1d9af5 Parents: f559e93 Author: John Sirois <[email protected]> Authored: Tue Sep 27 15:38:20 2016 -0600 Committer: John Sirois <[email protected]> Committed: Tue Sep 27 15:38:20 2016 -0600 ---------------------------------------------------------------------- RELEASE-NOTES.md | 8 + .../aurora/common/zookeeper/Candidate.java | 78 --- .../aurora/common/zookeeper/CandidateImpl.java | 127 ---- .../apache/aurora/common/zookeeper/Group.java | 674 ------------------- .../aurora/common/zookeeper/JsonCodec.java | 12 +- .../aurora/common/zookeeper/ServerSet.java | 74 -- .../aurora/common/zookeeper/ServerSetImpl.java | 349 ---------- .../aurora/common/zookeeper/ServerSets.java | 118 ---- .../common/zookeeper/SingletonServiceImpl.java | 122 ---- .../common/zookeeper/CandidateImplTest.java | 165 ----- .../aurora/common/zookeeper/GroupTest.java | 321 --------- .../aurora/common/zookeeper/JsonCodecTest.java | 38 +- .../common/zookeeper/ServerSetImplTest.java | 258 ------- .../aurora/common/zookeeper/ServerSetsTest.java | 44 -- .../zookeeper/SingletonServiceImplTest.java | 243 ------- docs/reference/scheduler-configuration.md | 2 - .../CommonsServiceDiscoveryModule.java | 102 --- .../discovery/CommonsServiceGroupMonitor.java | 59 -- .../CuratorServiceDiscoveryModule.java | 4 +- .../discovery/FlaggedZooKeeperConfig.java | 13 - .../discovery/ServiceDiscoveryModule.java | 13 +- .../scheduler/discovery/ZooKeeperConfig.java | 12 +- .../aurora/scheduler/app/SchedulerIT.java | 45 +- .../discovery/AbstractDiscoveryModuleTest.java | 77 --- .../discovery/BaseCuratorDiscoveryTest.java | 8 +- .../discovery/CommonsDiscoveryModuleTest.java | 29 - .../CommonsServiceGroupMonitorTest.java | 137 ---- .../discovery/CuratorDiscoveryModuleTest.java | 66 +- .../discovery/CuratorSingletonServiceTest.java | 3 +- .../discovery/ZooKeeperConfigTest.java | 5 +- 30 files changed, 119 insertions(+), 3087 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/69cba786/RELEASE-NOTES.md ---------------------------------------------------------------------- diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md index 82c9a1c..49c03e8 100644 --- a/RELEASE-NOTES.md +++ b/RELEASE-NOTES.md @@ -1,3 +1,11 @@ +0.17.0 (Not yet released) +========================= + +### Deprecations and removals: + +- The scheduler flag `-zk_use_curator` has been removed. If you have never set the flag and are + upgrading you should take care as described in the [note](#zk_use_curator_upgrade) below. + 0.16.0 ====== http://git-wip-us.apache.org/repos/asf/aurora/blob/69cba786/commons/src/main/java/org/apache/aurora/common/zookeeper/Candidate.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/Candidate.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/Candidate.java deleted file mode 100644 index 75c1b14..0000000 --- a/commons/src/main/java/org/apache/aurora/common/zookeeper/Candidate.java +++ /dev/null @@ -1,78 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.zookeeper; - -import com.google.common.base.Optional; -import com.google.common.base.Supplier; - -import org.apache.aurora.common.base.ExceptionalCommand; -import org.apache.aurora.common.zookeeper.Group.JoinException; -import org.apache.aurora.common.zookeeper.Group.WatchException; -import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException; -import org.apache.zookeeper.KeeperException; - -/** - * Interface definition for becoming or querying for a ZooKeeper-based group leader. - */ -public interface Candidate { - - /** - * Returns the current group leader by querying ZooKeeper synchronously. - * - * @return the current group leader's identifying data or {@link Optional#absent()} if there is - * no leader - * @throws ZooKeeperConnectionException if there was a problem connecting to ZooKeeper - * @throws KeeperException if there was a problem reading the leader information - * @throws InterruptedException if this thread is interrupted getting the leader - */ - public Optional<byte[]> getLeaderData() - throws ZooKeeperConnectionException, KeeperException, InterruptedException; - - /** - * Encapsulates a leader that can be elected and subsequently defeated. - */ - interface Leader { - - /** - * Called when this leader has been elected. - * - * @param abdicate a command that can be used to abdicate leadership and force a new election - */ - void onElected(ExceptionalCommand<JoinException> abdicate); - - /** - * Called when the leader has been ousted. Can occur either if the leader abdicates or if an - * external event causes the leader to lose its leadership role (session expiration). - */ - void onDefeated(); - } - - /** - * Offers this candidate in leadership elections for as long as the current jvm process is alive. - * Upon election, the {@code onElected} callback will be executed and a command that can be used - * to abdicate leadership will be passed in. If the elected leader jvm process dies or the - * elected leader successfully abdicates then a new leader will be elected. Leaders that - * successfully abdicate are removed from the group and will not be eligible for leadership - * election unless {@link #offerLeadership(Leader)} is called again. - * - * @param leader the leader to notify of election and defeat events - * @throws JoinException if there was a problem joining the group - * @throws WatchException if there is a problem generating the 1st group membership list - * @throws InterruptedException if interrupted waiting to join the group and determine initial - * election results - * @return a supplier that can be queried to find out if this leader is currently elected - */ - public Supplier<Boolean> offerLeadership(Leader leader) - throws JoinException, WatchException, InterruptedException; -} http://git-wip-us.apache.org/repos/asf/aurora/blob/69cba786/commons/src/main/java/org/apache/aurora/common/zookeeper/CandidateImpl.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/CandidateImpl.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/CandidateImpl.java deleted file mode 100644 index 98b5ee4..0000000 --- a/commons/src/main/java/org/apache/aurora/common/zookeeper/CandidateImpl.java +++ /dev/null @@ -1,127 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.zookeeper; - -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.concurrent.atomic.AtomicBoolean; - -import javax.annotation.Nullable; - -import com.google.common.base.Charsets; -import com.google.common.base.Function; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.base.Supplier; -import com.google.common.collect.Iterables; -import com.google.common.collect.Ordering; - -import org.apache.aurora.common.zookeeper.Group.JoinException; -import org.apache.aurora.common.zookeeper.Group.Membership; -import org.apache.aurora.common.zookeeper.Group.WatchException; -import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException; -import org.apache.zookeeper.KeeperException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Implements leader election for small groups of candidates. This implementation is subject to the - * <a href="http://hadoop.apache.org/zookeeper/docs/r3.2.1/recipes.html#sc_leaderElection"> - * herd effect</a> for a given group and should only be used for small (~10 member) candidate pools. - */ -public class CandidateImpl implements Candidate { - private static final Logger LOG = LoggerFactory.getLogger(CandidateImpl.class); - - private static final byte[] UNKNOWN_CANDIDATE_DATA = "<unknown>".getBytes(Charsets.UTF_8); - - private static final Supplier<byte[]> IP_ADDRESS_DATA_SUPPLIER = () -> { - try { - return InetAddress.getLocalHost().getHostAddress().getBytes(); - } catch (UnknownHostException e) { - LOG.warn("Failed to determine local address!", e); - return UNKNOWN_CANDIDATE_DATA; - } - }; - - private static final Function<Iterable<String>, String> MOST_RECENT_JUDGE = - candidates -> Ordering.natural().min(candidates); - - private final Group group; - - /** - * Creates a candidate that can be used to offer leadership for the given {@code group}. - */ - public CandidateImpl(Group group) { - this.group = Preconditions.checkNotNull(group); - } - - @Override - public Optional<byte[]> getLeaderData() - throws ZooKeeperConnectionException, KeeperException, InterruptedException { - - String leaderId = getLeader(group.getMemberIds()); - return leaderId == null - ? Optional.<byte[]>absent() - : Optional.of(group.getMemberData(leaderId)); - } - - @Override - public Supplier<Boolean> offerLeadership(final Leader leader) - throws JoinException, WatchException, InterruptedException { - - final Membership membership = group.join(IP_ADDRESS_DATA_SUPPLIER, leader::onDefeated); - - final AtomicBoolean elected = new AtomicBoolean(false); - final AtomicBoolean abdicated = new AtomicBoolean(false); - group.watch(memberIds -> { - boolean noCandidates = Iterables.isEmpty(memberIds); - String memberId = membership.getMemberId(); - - if (noCandidates) { - LOG.warn("All candidates have temporarily left the group: " + group); - } else if (!Iterables.contains(memberIds, memberId)) { - LOG.error( - "Current member ID {} is not a candidate for leader, current voting: {}", - memberId, memberIds); - } else { - boolean electedLeader = memberId.equals(getLeader(memberIds)); - boolean previouslyElected = elected.getAndSet(electedLeader); - - if (!previouslyElected && electedLeader) { - LOG.info("Candidate {} is now leader of group: {}", - membership.getMemberPath(), memberIds); - - leader.onElected(() -> { - membership.cancel(); - abdicated.set(true); - }); - } else if (!electedLeader) { - if (previouslyElected) { - leader.onDefeated(); - } - LOG.info( - "Candidate {} waiting for the next leader election, current voting: {}", - membership.getMemberPath(), memberIds); - } - } - }); - - return () -> !abdicated.get() && elected.get(); - } - - @Nullable - private String getLeader(Iterable<String> memberIds) { - return Iterables.isEmpty(memberIds) ? null : MOST_RECENT_JUDGE.apply(memberIds); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/69cba786/commons/src/main/java/org/apache/aurora/common/zookeeper/Group.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/Group.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/Group.java deleted file mode 100644 index 2720dd1..0000000 --- a/commons/src/main/java/org/apache/aurora/common/zookeeper/Group.java +++ /dev/null @@ -1,674 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.zookeeper; - -import java.util.List; -import java.util.Set; -import java.util.regex.Pattern; - -import javax.annotation.Nullable; - -import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import com.google.common.base.Supplier; -import com.google.common.base.Suppliers; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; - -import org.apache.aurora.common.base.Command; -import org.apache.aurora.common.base.Commands; -import org.apache.aurora.common.base.ExceptionalSupplier; -import org.apache.aurora.common.base.MorePreconditions; -import org.apache.aurora.common.util.BackoffHelper; -import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException; -import org.apache.commons.lang.ArrayUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.KeeperException.NoNodeException; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.Watcher.Event.EventType; -import org.apache.zookeeper.data.ACL; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This class exposes methods for joining and monitoring distributed groups. The groups this class - * monitors are realized as persistent paths in ZooKeeper with ephemeral child nodes for - * each member of a group. - */ -public class Group { - private static final Logger LOG = LoggerFactory.getLogger(Group.class); - - private static final Supplier<byte[]> NO_MEMBER_DATA = Suppliers.ofInstance(null); - private static final String DEFAULT_NODE_NAME_PREFIX = "member_"; - - private final ZooKeeperClient zkClient; - private final ImmutableList<ACL> acl; - private final String path; - - private final NodeScheme nodeScheme; - private final Predicate<String> nodeNameFilter; - - private final BackoffHelper backoffHelper; - - /** - * Creates a group rooted at the given {@code path}. Paths must be absolute and trailing or - * duplicate slashes will be normalized. For example, all the following paths would create a - * group at the normalized path /my/distributed/group: - * <ul> - * <li>/my/distributed/group - * <li>/my/distributed/group/ - * <li>/my/distributed//group - * </ul> - * - * @param zkClient the client to use for interactions with ZooKeeper - * @param acl the ACL to use for creating the persistent group path if it does not already exist - * @param path the absolute persistent path that represents this group - * @param nodeScheme the scheme that defines how nodes are created - */ - public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path, NodeScheme nodeScheme) { - this.zkClient = Preconditions.checkNotNull(zkClient); - this.acl = ImmutableList.copyOf(acl); - this.path = ZooKeeperUtils.normalizePath(Preconditions.checkNotNull(path)); - - this.nodeScheme = Preconditions.checkNotNull(nodeScheme); - nodeNameFilter = Group.this.nodeScheme::isMember; - - backoffHelper = new BackoffHelper(); - } - - /** - * Equivalent to {@link #Group(ZooKeeperClient, Iterable, String, String)} with a - * {@code namePrefix} of 'member_'. - */ - public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path) { - this(zkClient, acl, path, DEFAULT_NODE_NAME_PREFIX); - } - - /** - * Equivalent to {@link #Group(ZooKeeperClient, Iterable, String, NodeScheme)} with a - * {@link DefaultScheme} using {@code namePrefix}. - */ - public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path, String namePrefix) { - this(zkClient, acl, path, new DefaultScheme(namePrefix)); - } - - public String getMemberPath(String memberId) { - return path + "/" + MorePreconditions.checkNotBlank(memberId); - } - - public String getPath() { - return path; - } - - public String getMemberId(String nodePath) { - MorePreconditions.checkNotBlank(nodePath); - Preconditions.checkArgument(nodePath.startsWith(path + "/"), - "Not a member of this group[%s]: %s", path, nodePath); - - String memberId = StringUtils.substringAfterLast(nodePath, "/"); - Preconditions.checkArgument(nodeScheme.isMember(memberId), - "Not a group member: %s", memberId); - return memberId; - } - - /** - * Returns the current list of group member ids by querying ZooKeeper synchronously. - * - * @return the ids of all the present members of this group - * @throws ZooKeeperConnectionException if there was a problem connecting to ZooKeeper - * @throws KeeperException if there was a problem reading this group's member ids - * @throws InterruptedException if this thread is interrupted listing the group members - */ - public Iterable<String> getMemberIds() - throws ZooKeeperConnectionException, KeeperException, InterruptedException { - return Iterables.filter(zkClient.get().getChildren(path, false), nodeNameFilter); - } - - /** - * Gets the data for one of this groups members by querying ZooKeeper synchronously. - * - * @param memberId the id of the member whose data to retrieve - * @return the data associated with the {@code memberId} - * @throws ZooKeeperConnectionException if there was a problem connecting to ZooKeeper - * @throws KeeperException if there was a problem reading this member's data - * @throws InterruptedException if this thread is interrupted retrieving the member data - */ - public byte[] getMemberData(String memberId) - throws ZooKeeperConnectionException, KeeperException, InterruptedException { - return zkClient.get().getData(getMemberPath(memberId), false, null); - } - - /** - * Represents membership in a distributed group. - */ - public interface Membership { - - /** - * Returns the persistent ZooKeeper path that represents this group. - */ - String getGroupPath(); - - /** - * Returns the id (ZooKeeper node name) of this group member. May change over time if the - * ZooKeeper session expires. - */ - String getMemberId(); - - /** - * Returns the full ZooKeeper path to this group member. May change over time if the - * ZooKeeper session expires. - */ - String getMemberPath(); - - /** - * Updates the membership data synchronously using the {@code Supplier<byte[]>} passed to - * {@link Group#join()}. - * - * @return the new membership data - * @throws UpdateException if there was a problem updating the membership data - */ - byte[] updateMemberData() throws UpdateException; - - /** - * Cancels group membership by deleting the associated ZooKeeper member node. - * - * @throws JoinException if there is a problem deleting the node - */ - void cancel() throws JoinException; - } - - /** - * Indicates an error joining a group. - */ - public static class JoinException extends Exception { - public JoinException(String message, Throwable cause) { - super(message, cause); - } - } - - /** - * Indicates an error updating a group member's data. - */ - public static class UpdateException extends Exception { - public UpdateException(String message, Throwable cause) { - super(message, cause); - } - } - - /** - * Equivalent to calling {@code join(null, null)}. - */ - public final Membership join() throws JoinException, InterruptedException { - return join(NO_MEMBER_DATA, null); - } - - /** - * Equivalent to calling {@code join(memberData, null)}. - */ - public final Membership join(Supplier<byte[]> memberData) - throws JoinException, InterruptedException { - - return join(memberData, null); - } - - /** - * Equivalent to calling {@code join(null, onLoseMembership)}. - */ - public final Membership join(@Nullable final Command onLoseMembership) - throws JoinException, InterruptedException { - - return join(NO_MEMBER_DATA, onLoseMembership); - } - - /** - * Joins this group and returns the resulting Membership when successful. Membership will be - * automatically cancelled when the current jvm process dies; however the returned Membership - * object can be used to cancel membership earlier. Unless - * {@link Group.Membership#cancel()} is called the membership will - * be maintained by re-establishing it silently in the background. - * - * <p>Any {@code memberData} given is persisted in the member node in ZooKeeper. If an - * {@code onLoseMembership} callback is supplied, it will be notified each time this member loses - * membership in the group. - * - * @param memberData a supplier of the data to store in the member node - * @param onLoseMembership a callback to notify when membership is lost - * @return a Membership object with the member details - * @throws JoinException if there was a problem joining the group - * @throws InterruptedException if this thread is interrupted awaiting completion of the join - */ - public final Membership join(Supplier<byte[]> memberData, @Nullable Command onLoseMembership) - throws JoinException, InterruptedException { - - Preconditions.checkNotNull(memberData); - ensurePersistentGroupPath(); - - final ActiveMembership groupJoiner = new ActiveMembership(memberData, onLoseMembership); - return backoffHelper.doUntilResult(() -> { - try { - return groupJoiner.join(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new JoinException("Interrupted trying to join group at path: " + path, e); - } catch (ZooKeeperConnectionException e) { - LOG.warn("Temporary error trying to join group at path: " + path, e); - return null; - } catch (KeeperException e) { - if (zkClient.shouldRetry(e)) { - LOG.warn("Temporary error trying to join group at path: " + path, e); - return null; - } else { - throw new JoinException("Problem joining partition group at path: " + path, e); - } - } - }); - } - - private void ensurePersistentGroupPath() throws JoinException, InterruptedException { - backoffHelper.doUntilSuccess(() -> { - try { - ZooKeeperUtils.ensurePath(zkClient, acl, path); - return true; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new JoinException("Interrupted trying to ensure group at path: " + path, e); - } catch (ZooKeeperConnectionException e) { - LOG.warn("Problem connecting to ZooKeeper, retrying", e); - return false; - } catch (KeeperException e) { - if (zkClient.shouldRetry(e)) { - LOG.warn("Temporary error ensuring path: " + path, e); - return false; - } else { - throw new JoinException("Problem ensuring group at path: " + path, e); - } - } - }); - } - - private class ActiveMembership implements Membership { - private final Supplier<byte[]> memberData; - private final Command onLoseMembership; - private String nodePath; - private String memberId; - private volatile boolean cancelled; - private byte[] membershipData; - - public ActiveMembership(Supplier<byte[]> memberData, @Nullable Command onLoseMembership) { - this.memberData = memberData; - this.onLoseMembership = (onLoseMembership == null) ? Commands.NOOP : onLoseMembership; - } - - @Override - public String getGroupPath() { - return path; - } - - @Override - public synchronized String getMemberId() { - return memberId; - } - - @Override - public synchronized String getMemberPath() { - return nodePath; - } - - @Override - public synchronized byte[] updateMemberData() throws UpdateException { - byte[] membershipData = memberData.get(); - if (!ArrayUtils.isEquals(this.membershipData, membershipData)) { - try { - zkClient.get().setData(nodePath, membershipData, ZooKeeperUtils.ANY_VERSION); - this.membershipData = membershipData; - } catch (KeeperException e) { - throw new UpdateException("Problem updating membership data.", e); - } catch (InterruptedException e) { - throw new UpdateException("Interrupted attempting to update membership data.", e); - } catch (ZooKeeperConnectionException e) { - throw new UpdateException( - "Could not connect to the ZooKeeper cluster to update membership data.", e); - } - } - return membershipData; - } - - @Override - public synchronized void cancel() throws JoinException { - if (!cancelled) { - try { - backoffHelper.doUntilSuccess(() -> { - try { - zkClient.get().delete(nodePath, ZooKeeperUtils.ANY_VERSION); - return true; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new JoinException("Interrupted trying to cancel membership: " + nodePath, e); - } catch (ZooKeeperConnectionException e) { - LOG.warn("Problem connecting to ZooKeeper, retrying", e); - return false; - } catch (NoNodeException e) { - LOG.info("Membership already cancelled, node at path: " + nodePath + - " has been deleted"); - return true; - } catch (KeeperException e) { - if (zkClient.shouldRetry(e)) { - LOG.warn("Temporary error cancelling membership: " + nodePath, e); - return false; - } else { - throw new JoinException("Problem cancelling membership: " + nodePath, e); - } - } - }); - cancelled = true; // Prevent auto-re-join logic from undoing this cancel. - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new JoinException("Problem cancelling membership: " + nodePath, e); - } - } - } - - private class CancelledException extends IllegalStateException { /* marker */ } - - synchronized Membership join() - throws ZooKeeperConnectionException, InterruptedException, KeeperException { - - if (cancelled) { - throw new CancelledException(); - } - - if (nodePath == null) { - // Re-join if our ephemeral node goes away due to session expiry - only needs to be - // registered once. - zkClient.registerExpirationHandler(this::tryJoin); - } - - byte[] membershipData = memberData.get(); - String nodeName = nodeScheme.createName(membershipData); - CreateMode createMode = nodeScheme.isSequential() - ? CreateMode.EPHEMERAL_SEQUENTIAL - : CreateMode.EPHEMERAL; - nodePath = zkClient.get().create(path + "/" + nodeName, membershipData, acl, createMode); - memberId = Group.this.getMemberId(nodePath); - LOG.info("Set group member ID to " + memberId); - this.membershipData = membershipData; - - // Re-join if our ephemeral node goes away due to maliciousness. - zkClient.get().exists(nodePath, event -> { - if (event.getType() == EventType.NodeDeleted) { - tryJoin(); - } - }); - - return this; - } - - private final ExceptionalSupplier<Boolean, InterruptedException> tryJoin = - () -> { - try { - join(); - return true; - } catch (CancelledException e) { - // Lost a cancel race - that's ok. - return true; - } catch (ZooKeeperConnectionException e) { - LOG.warn("Problem connecting to ZooKeeper, retrying", e); - return false; - } catch (KeeperException e) { - if (zkClient.shouldRetry(e)) { - LOG.warn("Temporary error re-joining group: " + path, e); - return false; - } else { - throw new IllegalStateException("Permanent problem re-joining group: " + path, e); - } - } - }; - - private synchronized void tryJoin() { - onLoseMembership.execute(); - try { - backoffHelper.doUntilSuccess(tryJoin); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException( - String.format("Interrupted while trying to re-join group: %s, giving up", path), e); - } - } - } - - /** - * An interface to an object that listens for changes to a group's membership. - */ - public interface GroupChangeListener { - - /** - * Called whenever group membership changes with the new list of member ids. - * - * @param memberIds the current member ids - */ - void onGroupChange(Iterable<String> memberIds); - } - - /** - * An interface that dictates the scheme to use for storing and filtering nodes that represent - * members of a distributed group. - */ - public interface NodeScheme { - /** - * Determines if a child node is a member of a group by examining the node's name. - * - * @param nodeName the name of a child node found in a group - * @return {@code true} if {@code nodeName} identifies a group member in this scheme - */ - boolean isMember(String nodeName); - - /** - * Generates a node name for the node representing this process in the distributed group. - * - * @param membershipData the data that will be stored in this node - * @return the name for the node that will represent this process in the group - */ - String createName(byte[] membershipData); - - /** - * Indicates whether this scheme needs ephemeral sequential nodes or just ephemeral nodes. - * - * @return {@code true} if this scheme requires sequential node names; {@code false} otherwise - */ - boolean isSequential(); - } - - /** - * Indicates an error watching a group. - */ - public static class WatchException extends Exception { - public WatchException(String message, Throwable cause) { - super(message, cause); - } - } - - /** - * Watches this group for the lifetime of this jvm process. This method will block until the - * current group members are available, notify the {@code groupChangeListener} and then return. - * All further changes to the group membership will cause notifications on a background thread. - * - * @param groupChangeListener the listener to notify of group membership change events - * @return A command which, when executed, will stop watching the group. - * @throws WatchException if there is a problem generating the 1st group membership list - * @throws InterruptedException if interrupted waiting to gather the 1st group membership list - */ - public final Command watch(final GroupChangeListener groupChangeListener) - throws WatchException, InterruptedException { - Preconditions.checkNotNull(groupChangeListener); - - try { - ensurePersistentGroupPath(); - } catch (JoinException e) { - throw new WatchException("Failed to create group path: " + path, e); - } - - final GroupMonitor groupMonitor = new GroupMonitor(groupChangeListener); - backoffHelper.doUntilSuccess(() -> { - try { - groupMonitor.watchGroup(); - return true; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new WatchException("Interrupted trying to watch group at path: " + path, e); - } catch (ZooKeeperConnectionException e) { - LOG.warn("Temporary error trying to watch group at path: " + path, e); - return null; - } catch (KeeperException e) { - if (zkClient.shouldRetry(e)) { - LOG.warn("Temporary error trying to watch group at path: " + path, e); - return null; - } else { - throw new WatchException("Problem trying to watch group at path: " + path, e); - } - } - }); - return groupMonitor::stopWatching; - } - - /** - * Helps continuously monitor a group for membership changes. - */ - private class GroupMonitor { - private final GroupChangeListener groupChangeListener; - private volatile boolean stopped = false; - private Set<String> members; - - GroupMonitor(GroupChangeListener groupChangeListener) { - this.groupChangeListener = groupChangeListener; - } - - private final Watcher groupWatcher = event -> { - if (event.getType() == EventType.NodeChildrenChanged) { - tryWatchGroup(); - } - }; - - private final ExceptionalSupplier<Boolean, InterruptedException> tryWatchGroup = - () -> { - try { - watchGroup(); - return true; - } catch (ZooKeeperConnectionException e) { - LOG.warn("Problem connecting to ZooKeeper, retrying", e); - return false; - } catch (KeeperException e) { - if (zkClient.shouldRetry(e)) { - LOG.warn("Temporary error re-watching group: " + path, e); - return false; - } else { - throw new IllegalStateException("Permanent problem re-watching group: " + path, e); - } - } - }; - - private void tryWatchGroup() { - if (stopped) { - return; - } - - try { - backoffHelper.doUntilSuccess(tryWatchGroup); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException( - String.format("Interrupted while trying to re-watch group: %s, giving up", path), e); - } - } - - private void watchGroup() - throws ZooKeeperConnectionException, InterruptedException, KeeperException { - - if (stopped) { - return; - } - - List<String> children = zkClient.get().getChildren(path, groupWatcher); - setMembers(Iterables.filter(children, nodeNameFilter)); - } - - private void stopWatching() { - // TODO(William Farner): Cancel the watch when - // https://issues.apache.org/jira/browse/ZOOKEEPER-442 is resolved. - LOG.info("Stopping watch on " + this); - stopped = true; - } - - synchronized void setMembers(Iterable<String> members) { - if (stopped) { - LOG.info("Suppressing membership update, no longer watching " + this); - return; - } - - if (this.members == null) { - // Reset our watch on the group if session expires - only needs to be registered once. - zkClient.registerExpirationHandler(this::tryWatchGroup); - } - - Set<String> membership = ImmutableSet.copyOf(members); - if (!membership.equals(this.members)) { - groupChangeListener.onGroupChange(members); - this.members = membership; - } - } - } - - /** - * Default naming scheme implementation. Stores nodes at [given path] + "/" + [given prefix] + - * ZooKeeper-generated member ID. For example, if the path is "/discovery/servicename", and the - * prefix is "member_", the node's full path will look something like - * {@code /discovery/servicename/member_0000000007}. - */ - public static class DefaultScheme implements NodeScheme { - private final String namePrefix; - private final Pattern namePattern; - - /** - * Creates a sequential node scheme based on the given node name prefix. - * - * @param namePrefix the prefix for the names of the member nodes - */ - public DefaultScheme(String namePrefix) { - this.namePrefix = MorePreconditions.checkNotBlank(namePrefix); - namePattern = Pattern.compile("^" + Pattern.quote(namePrefix) + "-?[0-9]+$"); - } - - @Override - public boolean isMember(String nodeName) { - return namePattern.matcher(nodeName).matches(); - } - - @Override - public String createName(byte[] membershipData) { - return namePrefix; - } - - @Override - public boolean isSequential() { - return true; - } - } - - @Override - public String toString() { - return "Group " + path; - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/69cba786/commons/src/main/java/org/apache/aurora/common/zookeeper/JsonCodec.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/JsonCodec.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/JsonCodec.java index 9d31608..45e789b 100644 --- a/commons/src/main/java/org/apache/aurora/common/zookeeper/JsonCodec.java +++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/JsonCodec.java @@ -38,7 +38,10 @@ import org.apache.aurora.common.thrift.Status; import static java.util.Objects.requireNonNull; -class JsonCodec implements Codec<ServiceInstance> { +/** + * Encodes a {@link ServiceInstance} as a JSON object. + */ +public class JsonCodec implements Codec<ServiceInstance> { private static void assertRequiredField(String fieldName, Object fieldValue) { if (fieldValue == null) { @@ -100,11 +103,16 @@ class JsonCodec implements Codec<ServiceInstance> { } } + /** + * The encoding for service instance data in ZooKeeper expected by Aurora clients. + */ + public static final Codec<ServiceInstance> INSTANCE = new JsonCodec(); + private static final Charset ENCODING = Charsets.UTF_8; private final Gson gson; - JsonCodec() { + private JsonCodec() { this(new Gson()); } http://git-wip-us.apache.org/repos/asf/aurora/blob/69cba786/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java deleted file mode 100644 index aeea02d..0000000 --- a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java +++ /dev/null @@ -1,74 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.zookeeper; - -import java.net.InetSocketAddress; -import java.util.Map; - -import org.apache.aurora.common.io.Codec; -import org.apache.aurora.common.thrift.ServiceInstance; -import org.apache.aurora.common.zookeeper.Group.JoinException; - -/** - * A logical set of servers registered in ZooKeeper. Intended to be used by servers in a - * common service to advertise their presence to server-set protocol-aware clients. - * - * Standard implementations should use the {@link #JSON_CODEC} to serialize the service instance - * rendezvous data to zookeeper so that standard clients can interoperate. - */ -public interface ServerSet { - - /** - * Encodes a {@link ServiceInstance} as a JSON object. - * - * This is the default encoding for service instance data in ZooKeeper. - */ - Codec<ServiceInstance> JSON_CODEC = new JsonCodec(); - - /** - * Attempts to join a server set for this logical service group. - * - * @param endpoint the primary service endpoint - * @param additionalEndpoints and additional endpoints keyed by their logical name - * @return an EndpointStatus object that allows the endpoint to adjust its status - * @throws JoinException if there was a problem joining the server set - * @throws InterruptedException if interrupted while waiting to join the server set - */ - EndpointStatus join( - InetSocketAddress endpoint, - Map<String, InetSocketAddress> additionalEndpoints) - throws JoinException, InterruptedException; - - /** - * A handle to a service endpoint's status data that allows updating it to track current events. - */ - interface EndpointStatus { - - /** - * Removes the endpoint from the server set. - * - * @throws UpdateException if there was a problem leaving the ServerSet. - */ - void leave() throws UpdateException; - } - - /** - * Indicates an error updating a service's status information. - */ - class UpdateException extends Exception { - public UpdateException(String message, Throwable cause) { - super(message, cause); - } - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/69cba786/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java deleted file mode 100644 index ace4980..0000000 --- a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java +++ /dev/null @@ -1,349 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.zookeeper; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.Map; -import java.util.Set; - -import javax.annotation.Nullable; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.base.Joiner; -import com.google.common.base.Predicates; -import com.google.common.base.Supplier; -import com.google.common.base.Throwables; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.ImmutableSortedSet; -import com.google.common.collect.Iterables; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import com.google.common.collect.Sets.SetView; -import com.google.common.util.concurrent.UncheckedExecutionException; - -import org.apache.aurora.common.base.Command; -import org.apache.aurora.common.io.Codec; -import org.apache.aurora.common.net.pool.DynamicHostSet; -import org.apache.aurora.common.thrift.ServiceInstance; -import org.apache.aurora.common.thrift.Status; -import org.apache.aurora.common.util.BackoffHelper; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.KeeperException.NoNodeException; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooDefs; -import org.apache.zookeeper.data.ACL; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static com.google.common.base.Preconditions.checkNotNull; - -/** - * ZooKeeper-backed implementation of {@link ServerSet} and {@link DynamicHostSet}. - */ -public class ServerSetImpl implements ServerSet, DynamicHostSet<ServiceInstance> { - private static final Logger LOG = LoggerFactory.getLogger(ServerSetImpl.class); - - private final ZooKeeperClient zkClient; - private final Group group; - private final Codec<ServiceInstance> codec; - private final BackoffHelper backoffHelper; - - /** - * Creates a new ServerSet using open ZooKeeper node ACLs. - * - * @param zkClient the client to use for interactions with ZooKeeper - * @param path the name-service path of the service to connect to - */ - public ServerSetImpl(ZooKeeperClient zkClient, String path) { - this(zkClient, ZooDefs.Ids.OPEN_ACL_UNSAFE, path); - } - - /** - * Creates a new ServerSet for the given service {@code path}. - * - * @param zkClient the client to use for interactions with ZooKeeper - * @param acl the ACL to use for creating the persistent group path if it does not already exist - * @param path the name-service path of the service to connect to - */ - public ServerSetImpl(ZooKeeperClient zkClient, Iterable<ACL> acl, String path) { - this(zkClient, new Group(zkClient, acl, path), JSON_CODEC); - } - - /** - * Creates a new ServerSet using the given service {@code group}. - * - * @param zkClient the client to use for interactions with ZooKeeper - * @param group the server group - */ - public ServerSetImpl(ZooKeeperClient zkClient, Group group) { - this(zkClient, group, JSON_CODEC); - } - - /** - * Creates a new ServerSet using the given service {@code group} and a custom {@code codec}. - * - * @param zkClient the client to use for interactions with ZooKeeper - * @param group the server group - * @param codec a codec to use for serializing and de-serializing the ServiceInstance data to and - * from a byte array - */ - public ServerSetImpl(ZooKeeperClient zkClient, Group group, Codec<ServiceInstance> codec) { - this.zkClient = checkNotNull(zkClient); - this.group = checkNotNull(group); - this.codec = checkNotNull(codec); - - // TODO(John Sirois): Inject the helper so that backoff strategy can be configurable. - backoffHelper = new BackoffHelper(); - } - - @VisibleForTesting - ZooKeeperClient getZkClient() { - return zkClient; - } - - @Override - public EndpointStatus join( - InetSocketAddress endpoint, - Map<String, InetSocketAddress> additionalEndpoints) - throws Group.JoinException, InterruptedException { - - checkNotNull(endpoint); - checkNotNull(additionalEndpoints); - - MemberStatus memberStatus = new MemberStatus(endpoint, additionalEndpoints); - Supplier<byte[]> serviceInstanceSupplier = memberStatus::serializeServiceInstance; - Group.Membership membership = group.join(serviceInstanceSupplier); - - return () -> memberStatus.leave(membership); - } - - @Override - public Command watch(HostChangeMonitor<ServiceInstance> monitor) throws MonitorException { - ServerSetWatcher serverSetWatcher = new ServerSetWatcher(zkClient, monitor); - try { - return serverSetWatcher.watch(); - } catch (Group.WatchException e) { - throw new MonitorException("ZooKeeper watch failed.", e); - } catch (InterruptedException e) { - throw new MonitorException("Interrupted while watching ZooKeeper.", e); - } - } - - private class MemberStatus { - private final InetSocketAddress endpoint; - private final Map<String, InetSocketAddress> additionalEndpoints; - - private MemberStatus( - InetSocketAddress endpoint, - Map<String, InetSocketAddress> additionalEndpoints) { - - this.endpoint = endpoint; - this.additionalEndpoints = additionalEndpoints; - } - - synchronized void leave(Group.Membership membership) throws UpdateException { - try { - membership.cancel(); - } catch (Group.JoinException e) { - throw new UpdateException( - "Failed to auto-cancel group membership on transition to DEAD status", e); - } - } - - byte[] serializeServiceInstance() { - ServiceInstance serviceInstance = new ServiceInstance( - ServerSets.toEndpoint(endpoint), - Maps.transformValues(additionalEndpoints, ServerSets.TO_ENDPOINT), - Status.ALIVE); - - LOG.debug("updating endpoint data to:\n\t" + serviceInstance); - try { - return ServerSets.serializeServiceInstance(serviceInstance, codec); - } catch (IOException e) { - throw new IllegalStateException("Unexpected problem serializing thrift struct " + - serviceInstance + "to a byte[]", e); - } - } - } - - private static class ServiceInstanceFetchException extends RuntimeException { - ServiceInstanceFetchException(String message, Throwable cause) { - super(message, cause); - } - } - - private static class ServiceInstanceDeletedException extends RuntimeException { - ServiceInstanceDeletedException(String path) { - super(path); - } - } - - private class ServerSetWatcher { - private final ZooKeeperClient zkClient; - private final HostChangeMonitor<ServiceInstance> monitor; - @Nullable private ImmutableSet<ServiceInstance> serverSet; - - ServerSetWatcher(ZooKeeperClient zkClient, HostChangeMonitor<ServiceInstance> monitor) { - this.zkClient = zkClient; - this.monitor = monitor; - } - - public Command watch() throws Group.WatchException, InterruptedException { - Watcher onExpirationWatcher = zkClient.registerExpirationHandler(this::rebuildServerSet); - - try { - return group.watch(this::notifyGroupChange); - } catch (Group.WatchException e) { - zkClient.unregister(onExpirationWatcher); - throw e; - } catch (InterruptedException e) { - zkClient.unregister(onExpirationWatcher); - throw e; - } - } - - private ServiceInstance getServiceInstance(final String nodePath) { - try { - return backoffHelper.doUntilResult(() -> { - try { - byte[] data = zkClient.get().getData(nodePath, false, null); - return ServerSets.deserializeServiceInstance(data, codec); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new ServiceInstanceFetchException( - "Interrupted updating service data for: " + nodePath, e); - } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - LOG.warn("Temporary error trying to updating service data for: " + nodePath, e); - return null; - } catch (NoNodeException e) { - invalidateNodePath(nodePath); - throw new ServiceInstanceDeletedException(nodePath); - } catch (KeeperException e) { - if (zkClient.shouldRetry(e)) { - LOG.warn("Temporary error trying to update service data for: " + nodePath, e); - return null; - } else { - throw new ServiceInstanceFetchException( - "Failed to update service data for: " + nodePath, e); - } - } catch (IOException e) { - throw new ServiceInstanceFetchException( - "Failed to deserialize the ServiceInstance data for: " + nodePath, e); - } - }); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new ServiceInstanceFetchException( - "Interrupted trying to update service data for: " + nodePath, e); - } - } - - private final LoadingCache<String, ServiceInstance> servicesByMemberId = - CacheBuilder.newBuilder().build(new CacheLoader<String, ServiceInstance>() { - @Override public ServiceInstance load(String memberId) { - return getServiceInstance(group.getMemberPath(memberId)); - } - }); - - private void rebuildServerSet() { - Set<String> memberIds = ImmutableSet.copyOf(servicesByMemberId.asMap().keySet()); - servicesByMemberId.invalidateAll(); - notifyGroupChange(memberIds); - } - - private String invalidateNodePath(String deletedPath) { - String memberId = group.getMemberId(deletedPath); - servicesByMemberId.invalidate(memberId); - return memberId; - } - - private final Function<String, ServiceInstance> MAYBE_FETCH_NODE = - memberId -> { - // This get will trigger a fetch - try { - return servicesByMemberId.getUnchecked(memberId); - } catch (UncheckedExecutionException e) { - Throwable cause = e.getCause(); - if (!(cause instanceof ServiceInstanceDeletedException)) { - Throwables.propagateIfInstanceOf(cause, ServiceInstanceFetchException.class); - throw new IllegalStateException( - "Unexpected error fetching member data for: " + memberId, e); - } - return null; - } - }; - - private synchronized void notifyGroupChange(Iterable<String> memberIds) { - ImmutableSet<String> newMemberIds = ImmutableSortedSet.copyOf(memberIds); - Set<String> existingMemberIds = servicesByMemberId.asMap().keySet(); - - // Ignore no-op state changes except for the 1st when we've seen no group yet. - if ((serverSet == null) || !newMemberIds.equals(existingMemberIds)) { - SetView<String> deletedMemberIds = Sets.difference(existingMemberIds, newMemberIds); - // Implicit removal from servicesByMemberId. - existingMemberIds.removeAll(ImmutableSet.copyOf(deletedMemberIds)); - - Iterable<ServiceInstance> serviceInstances = Iterables.filter( - Iterables.transform(newMemberIds, MAYBE_FETCH_NODE), Predicates.notNull()); - - notifyServerSetChange(ImmutableSet.copyOf(serviceInstances)); - } - } - - private void notifyServerSetChange(ImmutableSet<ServiceInstance> currentServerSet) { - // ZK nodes may have changed if there was a session expiry for a server in the server set, but - // if the server's status has not changed, we can skip any onChange updates. - if (!currentServerSet.equals(serverSet)) { - if (currentServerSet.isEmpty()) { - LOG.warn("server set empty for path " + group.getPath()); - } else { - if (serverSet == null) { - LOG.info("received initial membership {}", currentServerSet); - } else { - logChange(currentServerSet); - } - } - serverSet = currentServerSet; - monitor.onChange(serverSet); - } - } - - private void logChange(ImmutableSet<ServiceInstance> newServerSet) { - StringBuilder message = new StringBuilder("server set " + group.getPath() + " change: "); - if (serverSet.size() != newServerSet.size()) { - message.append("from ").append(serverSet.size()) - .append(" members to ").append(newServerSet.size()); - } - - Joiner joiner = Joiner.on("\n\t\t"); - - SetView<ServiceInstance> left = Sets.difference(serverSet, newServerSet); - if (!left.isEmpty()) { - message.append("\n\tleft:\n\t\t").append(joiner.join(left)); - } - - SetView<ServiceInstance> joined = Sets.difference(newServerSet, serverSet); - if (!joined.isEmpty()) { - message.append("\n\tjoined:\n\t\t").append(joiner.join(joined)); - } - - LOG.info(message.toString()); - } - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/69cba786/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java deleted file mode 100644 index 01a54a5..0000000 --- a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java +++ /dev/null @@ -1,118 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.zookeeper; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.Map; - -import com.google.common.base.Function; -import com.google.common.base.Preconditions; - -import org.apache.aurora.common.base.MorePreconditions; -import org.apache.aurora.common.io.Codec; -import org.apache.aurora.common.thrift.Endpoint; -import org.apache.aurora.common.thrift.ServiceInstance; -import org.apache.aurora.common.thrift.Status; -import org.apache.zookeeper.data.ACL; - -/** - * Common ServerSet related functions - */ -public class ServerSets { - - private ServerSets() { - // Utility class. - } - - /** - * A function that invokes {@link #toEndpoint(InetSocketAddress)}. - */ - public static final Function<InetSocketAddress, Endpoint> TO_ENDPOINT = - ServerSets::toEndpoint; - - /** - * Creates a server set that registers at a single path applying the given ACL to all nodes - * created in the path. - * - * @param zkClient ZooKeeper client to register with. - * @param acl The ACL to apply to the {@code zkPath} nodes the ServerSet creates. - * @param zkPath Path to register at. @see #create(ZooKeeperClient, java.util.Set) - * @return A server set that registers at {@code zkPath}. - */ - public static ServerSet create(ZooKeeperClient zkClient, Iterable<ACL> acl, String zkPath) { - Preconditions.checkNotNull(zkClient); - MorePreconditions.checkNotBlank(acl); - MorePreconditions.checkNotBlank(zkPath); - - return new ServerSetImpl(zkClient, acl, zkPath); - } - - /** - * Returns a serialized Thrift service instance object, with given endpoints and codec. - * - * @param serviceInstance the Thrift service instance object to be serialized - * @param codec the codec to use to serialize a Thrift service instance object - * @return byte array that contains a serialized Thrift service instance - */ - public static byte[] serializeServiceInstance( - ServiceInstance serviceInstance, Codec<ServiceInstance> codec) throws IOException { - - ByteArrayOutputStream output = new ByteArrayOutputStream(); - codec.serialize(serviceInstance, output); - return output.toByteArray(); - } - - /** - * Serializes a service instance based on endpoints. - * @see #serializeServiceInstance(ServiceInstance, Codec) - * - * @param address the target address of the service instance - * @param additionalEndpoints additional endpoints of the service instance - * @param status service status - */ - public static byte[] serializeServiceInstance( - InetSocketAddress address, - Map<String, Endpoint> additionalEndpoints, - Status status, - Codec<ServiceInstance> codec) throws IOException { - - ServiceInstance serviceInstance = - new ServiceInstance(toEndpoint(address), additionalEndpoints, status); - return serializeServiceInstance(serviceInstance, codec); - } - - /** - * Creates a service instance object deserialized from byte array. - * - * @param data the byte array contains a serialized Thrift service instance - * @param codec the codec to use to deserialize the byte array - */ - public static ServiceInstance deserializeServiceInstance( - byte[] data, Codec<ServiceInstance> codec) throws IOException { - - return codec.deserialize(new ByteArrayInputStream(data)); - } - - /** - * Creates an endpoint for the given InetSocketAddress. - * - * @param address the target address to create the endpoint for - */ - public static Endpoint toEndpoint(InetSocketAddress address) { - return new Endpoint(address.getHostName(), address.getPort()); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/69cba786/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonServiceImpl.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonServiceImpl.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonServiceImpl.java deleted file mode 100644 index d9978a9..0000000 --- a/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonServiceImpl.java +++ /dev/null @@ -1,122 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.zookeeper; - -import java.net.InetSocketAddress; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; - -import org.apache.aurora.common.base.ExceptionalCommand; -import org.apache.aurora.common.zookeeper.Candidate.Leader; -import org.apache.aurora.common.zookeeper.Group.JoinException; -import org.apache.zookeeper.data.ACL; - -public class SingletonServiceImpl implements SingletonService { - @VisibleForTesting - static final String LEADER_ELECT_NODE_PREFIX = "singleton_candidate_"; - - /** - * Creates a candidate that can be combined with an existing server set to form a singleton - * service using {@link #SingletonServiceImpl(ServerSet, Candidate)}. - * - * @param zkClient The ZooKeeper client to use. - * @param servicePath The path where service nodes live. - * @param acl The acl to apply to newly created candidate nodes and serverset nodes. - * @return A candidate that can be housed with a standard server set under a single zk path. - */ - public static Candidate createSingletonCandidate( - ZooKeeperClient zkClient, - String servicePath, - Iterable<ACL> acl) { - - return new CandidateImpl(new Group(zkClient, acl, servicePath, LEADER_ELECT_NODE_PREFIX)); - } - - private final ServerSet serverSet; - private final Candidate candidate; - - /** - * Creates a new singleton service that uses the supplied candidate to vie for leadership and then - * advertises itself in the given server set once elected. - * - * @param serverSet The server set to advertise in on election. - * @param candidate The candidacy to use to vie for election. - */ - public SingletonServiceImpl(ServerSet serverSet, Candidate candidate) { - this.serverSet = Preconditions.checkNotNull(serverSet); - this.candidate = Preconditions.checkNotNull(candidate); - } - - @Override - public void lead(final InetSocketAddress endpoint, - final Map<String, InetSocketAddress> additionalEndpoints, - final LeadershipListener listener) - throws LeadException, InterruptedException { - - Preconditions.checkNotNull(listener); - - try { - candidate.offerLeadership(new Leader() { - @Override public void onElected(final ExceptionalCommand<JoinException> abdicate) { - listener.onLeading(new LeaderControl() { - ServerSet.EndpointStatus endpointStatus = null; - final AtomicBoolean left = new AtomicBoolean(false); - - // Methods are synchronized to prevent simultaneous invocations. - @Override public synchronized void advertise() - throws AdvertiseException, InterruptedException { - - Preconditions.checkState(!left.get(), "Cannot advertise after leaving."); - Preconditions.checkState(endpointStatus == null, "Cannot advertise more than once."); - try { - endpointStatus = serverSet.join(endpoint, additionalEndpoints); - } catch (JoinException e) { - throw new AdvertiseException("Problem advertising endpoint " + endpoint, e); - } - } - - @Override public synchronized void leave() throws LeaveException { - Preconditions.checkState(left.compareAndSet(false, true), - "Cannot leave more than once."); - if (endpointStatus != null) { - try { - endpointStatus.leave(); - } catch (ServerSet.UpdateException e) { - throw new LeaveException("Problem updating endpoint status for abdicating leader " + - "at endpoint " + endpoint, e); - } - } - try { - abdicate.execute(); - } catch (JoinException e) { - throw new LeaveException("Problem abdicating leadership for endpoint " + endpoint, e); - } - } - }); - } - - @Override public void onDefeated() { - listener.onDefeated(); - } - }); - } catch (JoinException e) { - throw new LeadException("Problem joining leadership group for endpoint " + endpoint, e); - } catch (Group.WatchException e) { - throw new LeadException("Problem getting initial membership list for leadership group.", e); - } - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/69cba786/commons/src/test/java/org/apache/aurora/common/zookeeper/CandidateImplTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/CandidateImplTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/CandidateImplTest.java deleted file mode 100644 index 9c0cebe..0000000 --- a/commons/src/test/java/org/apache/aurora/common/zookeeper/CandidateImplTest.java +++ /dev/null @@ -1,165 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.zookeeper; - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingDeque; - -import com.google.common.base.Preconditions; -import com.google.common.base.Supplier; - -import org.apache.aurora.common.base.ExceptionalCommand; -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Time; -import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest; -import org.apache.zookeeper.ZooDefs; -import org.apache.zookeeper.data.ACL; -import org.junit.Before; -import org.junit.Test; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; - -public class CandidateImplTest extends BaseZooKeeperClientTest { - private static final List<ACL> ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE; - private static final String SERVICE = "/twitter/services/puffin_linkhose/leader"; - private static final Amount<Integer, Time> TIMEOUT = Amount.of(1, Time.MINUTES); - - private LinkedBlockingDeque<CandidateImpl> candidateBuffer; - - @Before - public void mySetUp() throws IOException { - candidateBuffer = new LinkedBlockingDeque<>(); - } - - private Group createGroup(ZooKeeperClient zkClient) throws IOException { - return new Group(zkClient, ACL, SERVICE); - } - - private class Reign implements Candidate.Leader { - private ExceptionalCommand<Group.JoinException> abdicate; - private final CandidateImpl candidate; - private final String id; - private CountDownLatch defeated = new CountDownLatch(1); - - Reign(String id, CandidateImpl candidate) { - this.id = id; - this.candidate = candidate; - } - - @Override - public void onElected(ExceptionalCommand<Group.JoinException> abdicate) { - candidateBuffer.offerFirst(candidate); - this.abdicate = abdicate; - } - - @Override - public void onDefeated() { - defeated.countDown(); - } - - public void abdicate() throws Group.JoinException { - Preconditions.checkState(abdicate != null); - abdicate.execute(); - } - - public void expectDefeated() throws InterruptedException { - defeated.await(); - } - - @Override - public String toString() { - return id; - } - } - - @Test - public void testOfferLeadership() throws Exception { - ZooKeeperClient zkClient1 = createZkClient(TIMEOUT); - final CandidateImpl candidate1 = new CandidateImpl(createGroup(zkClient1)) { - @Override public String toString() { - return "Leader1"; - } - }; - ZooKeeperClient zkClient2 = createZkClient(TIMEOUT); - final CandidateImpl candidate2 = new CandidateImpl(createGroup(zkClient2)) { - @Override public String toString() { - return "Leader2"; - } - }; - ZooKeeperClient zkClient3 = createZkClient(TIMEOUT); - final CandidateImpl candidate3 = new CandidateImpl(createGroup(zkClient3)) { - @Override public String toString() { - return "Leader3"; - } - }; - - Reign candidate1Reign = new Reign("1", candidate1); - Reign candidate2Reign = new Reign("2", candidate2); - Reign candidate3Reign = new Reign("3", candidate3); - - Supplier<Boolean> candidate1Leader = candidate1.offerLeadership(candidate1Reign); - Supplier<Boolean> candidate2Leader = candidate2.offerLeadership(candidate2Reign); - Supplier<Boolean> candidate3Leader = candidate3.offerLeadership(candidate3Reign); - - assertTrue("Since initial group join is synchronous, candidate 1 should be the first leader", - candidate1Leader.get()); - - shutdownNetwork(); - restartNetwork(); - - assertTrue("A re-connect without a session expiration should leave the leader elected", - candidate1Leader.get()); - - candidate1Reign.abdicate(); - assertSame(candidate1, candidateBuffer.takeLast()); - assertFalse(candidate1Leader.get()); - // Active abdication should trigger defeat. - candidate1Reign.expectDefeated(); - - CandidateImpl secondCandidate = candidateBuffer.takeLast(); - assertTrue("exactly 1 remaining candidate should now be leader: " + secondCandidate + " " - + candidateBuffer, - candidate2Leader.get() ^ candidate3Leader.get()); - - if (secondCandidate == candidate2) { - expireSession(zkClient2); - assertSame(candidate3, candidateBuffer.takeLast()); - assertTrue(candidate3Leader.get()); - // Passive expiration should trigger defeat. - candidate2Reign.expectDefeated(); - } else { - expireSession(zkClient3); - assertSame(candidate2, candidateBuffer.takeLast()); - assertTrue(candidate2Leader.get()); - // Passive expiration should trigger defeat. - candidate3Reign.expectDefeated(); - } - } - - @Test - public void testEmptyMembership() throws Exception { - ZooKeeperClient zkClient1 = createZkClient(TIMEOUT); - final CandidateImpl candidate1 = new CandidateImpl(createGroup(zkClient1)); - Reign candidate1Reign = new Reign("1", candidate1); - - candidate1.offerLeadership(candidate1Reign); - assertSame(candidate1, candidateBuffer.takeLast()); - candidate1Reign.abdicate(); - assertFalse(candidate1.getLeaderData().isPresent()); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/69cba786/commons/src/test/java/org/apache/aurora/common/zookeeper/GroupTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/GroupTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/GroupTest.java deleted file mode 100644 index 97a42d1..0000000 --- a/commons/src/test/java/org/apache/aurora/common/zookeeper/GroupTest.java +++ /dev/null @@ -1,321 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.zookeeper; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingQueue; - -import com.google.common.base.Supplier; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; - -import org.apache.aurora.common.base.Command; -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Time; -import org.apache.aurora.common.testing.easymock.EasyMockTest; -import org.apache.aurora.common.zookeeper.Group.GroupChangeListener; -import org.apache.aurora.common.zookeeper.Group.JoinException; -import org.apache.aurora.common.zookeeper.Group.Membership; -import org.apache.aurora.common.zookeeper.Group.NodeScheme; -import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest; -import org.apache.zookeeper.ZooDefs.Ids; -import org.junit.Before; -import org.junit.Test; - -import static org.easymock.EasyMock.createMock; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.replay; -import static org.easymock.EasyMock.reset; -import static org.easymock.EasyMock.verify; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.fail; - -public class GroupTest extends BaseZooKeeperClientTest { - - private ZooKeeperClient zkClient; - private Group joinGroup; - private Group watchGroup; - private Command stopWatching; - private Command onLoseMembership; - - private RecordingListener listener; - - public GroupTest() { - super(Amount.of(1, Time.DAYS)); - } - - @Before - public void mySetUp() throws Exception { - onLoseMembership = createMock(Command.class); - - zkClient = createZkClient("group", "test"); - joinGroup = new Group(zkClient, ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, "/a/group"); - watchGroup = new Group(zkClient, ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, "/a/group"); - - listener = new RecordingListener(); - stopWatching = watchGroup.watch(listener); - } - - private static class RecordingListener implements GroupChangeListener { - private final LinkedBlockingQueue<Iterable<String>> membershipChanges = - new LinkedBlockingQueue<Iterable<String>>(); - - @Override - public void onGroupChange(Iterable<String> memberIds) { - membershipChanges.add(memberIds); - } - - public Iterable<String> take() throws InterruptedException { - return membershipChanges.take(); - } - - public void assertEmpty() { - assertEquals(ImmutableList.<Iterable<String>>of(), ImmutableList.copyOf(membershipChanges)); - } - - @Override - public String toString() { - return membershipChanges.toString(); - } - } - - private static class CustomScheme implements NodeScheme { - static final String NODE_NAME = "custom_name"; - - @Override - public boolean isMember(String nodeName) { - return NODE_NAME.equals(nodeName); - } - - @Override - public String createName(byte[] membershipData) { - return NODE_NAME; - } - - @Override - public boolean isSequential() { - return false; - } - } - - @Test - public void testSessionExpirationTriggersOnLoseMembership() throws Exception { - final CountDownLatch lostMembership = new CountDownLatch(1); - Command onLoseMembership = lostMembership::countDown; - assertEmptyMembershipObserved(); - - Membership membership = joinGroup.join(onLoseMembership); - assertMembershipObserved(membership.getMemberId()); - expireSession(zkClient); - - lostMembership.await(); // Will hang this test if onLoseMembership event is not propagated. - } - - @Test - public void testNodeDeleteTriggersOnLoseMembership() throws Exception { - final CountDownLatch lostMembership = new CountDownLatch(1); - Command onLoseMembership = lostMembership::countDown; - assertEmptyMembershipObserved(); - - Membership membership = joinGroup.join(onLoseMembership); - assertMembershipObserved(membership.getMemberId()); - membership.cancel(); - - lostMembership.await(); // Will hang this test if onLoseMembership event is not propagated. - } - - @Test - public void testJoinsAndWatchesSurviveDisconnect() throws Exception { - replay(onLoseMembership); - - assertEmptyMembershipObserved(); - - Membership membership = joinGroup.join(); - String originalMemberId = membership.getMemberId(); - assertMembershipObserved(originalMemberId); - - shutdownNetwork(); - restartNetwork(); - - // The member should still be present under existing ephemeral node since session did not - // expire. - watchGroup.watch(listener); - assertMembershipObserved(originalMemberId); - - membership.cancel(); - - assertEmptyMembershipObserved(); - assertEmptyMembershipObserved(); // and again for 2nd listener - - listener.assertEmpty(); - - verify(onLoseMembership); - reset(onLoseMembership); // Turn off expectations during ZK server shutdown. - } - - @Test - public void testJoinsAndWatchesSurviveExpiredSession() throws Exception { - onLoseMembership.execute(); - replay(onLoseMembership); - - assertEmptyMembershipObserved(); - - Membership membership = joinGroup.join(onLoseMembership); - String originalMemberId = membership.getMemberId(); - assertMembershipObserved(originalMemberId); - - expireSession(zkClient); - - // We should have lost our group membership and then re-gained it with a new ephemeral node. - // We may or may-not see the intermediate state change but we must see the final state - Iterable<String> members = listener.take(); - if (Iterables.isEmpty(members)) { - members = listener.take(); - } - assertEquals(1, Iterables.size(members)); - assertNotEquals(originalMemberId, Iterables.getOnlyElement(members)); - assertNotEquals(originalMemberId, membership.getMemberId()); - - listener.assertEmpty(); - - verify(onLoseMembership); - reset(onLoseMembership); // Turn off expectations during ZK server shutdown. - } - - @Test - public void testJoinCustomNamingScheme() throws Exception { - Group group = new Group(zkClient, ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, "/a/group", - new CustomScheme()); - - listener = new RecordingListener(); - group.watch(listener); - assertEmptyMembershipObserved(); - - Membership membership = group.join(); - String memberId = membership.getMemberId(); - - assertEquals("Wrong member ID.", CustomScheme.NODE_NAME, memberId); - assertMembershipObserved(memberId); - - expireSession(zkClient); - } - - @Test - public void testUpdateMembershipData() throws Exception { - Supplier<byte[]> dataSupplier = new EasyMockTest.Clazz<Supplier<byte[]>>() {}.createMock(); - - byte[] initial = "start".getBytes(); - expect(dataSupplier.get()).andReturn(initial); - - byte[] second = "update".getBytes(); - expect(dataSupplier.get()).andReturn(second); - - replay(dataSupplier); - - Membership membership = joinGroup.join(dataSupplier, onLoseMembership); - assertArrayEquals("Initial setting is incorrect.", initial, zkClient.get() - .getData(membership.getMemberPath(), false, null)); - - assertArrayEquals("Updating supplier should not change membership data", - initial, zkClient.get().getData(membership.getMemberPath(), false, null)); - - membership.updateMemberData(); - assertArrayEquals("Updating membership should change data", - second, zkClient.get().getData(membership.getMemberPath(), false, null)); - - verify(dataSupplier); - } - - @Test - public void testAcls() throws Exception { - Group securedMembership = - new Group(createZkClient("secured", "group"), ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, - "/secured/group/membership"); - - String memberId = securedMembership.join().getMemberId(); - - Group unauthenticatedObserver = - new Group(createZkClient(), - Ids.READ_ACL_UNSAFE, - "/secured/group/membership"); - RecordingListener unauthenticatedListener = new RecordingListener(); - unauthenticatedObserver.watch(unauthenticatedListener); - - assertMembershipObserved(unauthenticatedListener, memberId); - - try { - unauthenticatedObserver.join(); - fail("Expected join exception for unauthenticated observer"); - } catch (JoinException e) { - // expected - } - - Group unauthorizedObserver = - new Group(createZkClient("joe", "schmoe"), - Ids.READ_ACL_UNSAFE, - "/secured/group/membership"); - RecordingListener unauthorizedListener = new RecordingListener(); - unauthorizedObserver.watch(unauthorizedListener); - - assertMembershipObserved(unauthorizedListener, memberId); - - try { - unauthorizedObserver.join(); - fail("Expected join exception for unauthorized observer"); - } catch (JoinException e) { - // expected - } - } - - @Test - public void testStopWatching() throws Exception { - replay(onLoseMembership); - - assertEmptyMembershipObserved(); - - Membership member1 = joinGroup.join(); - String memberId1 = member1.getMemberId(); - assertMembershipObserved(memberId1); - - Membership member2 = joinGroup.join(); - String memberId2 = member2.getMemberId(); - assertMembershipObserved(memberId1, memberId2); - - stopWatching.execute(); - - member1.cancel(); - Membership member3 = joinGroup.join(); - member2.cancel(); - member3.cancel(); - - listener.assertEmpty(); - } - - private void assertEmptyMembershipObserved() throws InterruptedException { - assertMembershipObserved(); - } - - private void assertMembershipObserved(String... expectedMemberIds) throws InterruptedException { - assertMembershipObserved(listener, expectedMemberIds); - } - - private void assertMembershipObserved(RecordingListener listener, String... expectedMemberIds) - throws InterruptedException { - - assertEquals(ImmutableSet.copyOf(expectedMemberIds), ImmutableSet.copyOf(listener.take())); - } -}
