Revert removal of twitter/commons/zk based leadership code. See discussion here: https://issues.apache.org/jira/browse/AURORA-1840
Reviewed at https://reviews.apache.org/r/54250/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/16e4651d Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/16e4651d Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/16e4651d Branch: refs/heads/master Commit: 16e4651d5ff038dad0e9977edea7c57aeb37fe12 Parents: 8bcad84 Author: David McLaughlin <da...@dmclaughlin.com> Authored: Thu Dec 1 09:01:33 2016 -0800 Committer: David McLaughlin <dmclaugh...@twitter.com> Committed: Thu Dec 1 09:01:33 2016 -0800 ---------------------------------------------------------------------- RELEASE-NOTES.md | 1 + build.gradle | 16 +- .../aurora/common/zookeeper/Candidate.java | 78 +++ .../aurora/common/zookeeper/CandidateImpl.java | 127 ++++ .../aurora/common/zookeeper/Credentials.java | 90 +++ .../apache/aurora/common/zookeeper/Group.java | 674 +++++++++++++++++++ .../aurora/common/zookeeper/JsonCodec.java | 139 ++++ .../aurora/common/zookeeper/ServerSet.java | 74 ++ .../aurora/common/zookeeper/ServerSetImpl.java | 349 ++++++++++ .../aurora/common/zookeeper/ServerSets.java | 118 ++++ .../common/zookeeper/SingletonService.java | 114 ++++ .../common/zookeeper/SingletonServiceImpl.java | 122 ++++ .../common/zookeeper/ZooKeeperClient.java | 372 ++++++++++ .../aurora/common/zookeeper/ZooKeeperUtils.java | 167 +++++ .../testing/BaseZooKeeperClientTest.java | 140 ++++ .../zookeeper/testing/BaseZooKeeperTest.java | 46 ++ .../zookeeper/testing/ZooKeeperTestServer.java | 121 ++++ .../common/zookeeper/CandidateImplTest.java | 165 +++++ .../aurora/common/zookeeper/GroupTest.java | 321 +++++++++ .../aurora/common/zookeeper/JsonCodecTest.java | 151 +++++ .../common/zookeeper/ServerSetImplTest.java | 258 +++++++ .../aurora/common/zookeeper/ServerSetsTest.java | 44 ++ .../zookeeper/SingletonServiceImplTest.java | 243 +++++++ .../common/zookeeper/ZooKeeperClientTest.java | 210 ++++++ 
.../common/zookeeper/ZooKeeperUtilsTest.java | 139 ++++ config/findbugs/excludeFilter.xml | 8 - docs/features/service-discovery.md | 2 +- docs/reference/scheduler-configuration.md | 2 + .../aurora/scheduler/SchedulerLifecycle.java | 6 +- .../aurora/scheduler/app/SchedulerMain.java | 4 +- .../scheduler/app/ServiceGroupMonitor.java | 46 ++ .../CommonsServiceDiscoveryModule.java | 102 +++ .../discovery/CommonsServiceGroupMonitor.java | 59 ++ .../aurora/scheduler/discovery/Credentials.java | 98 --- .../CuratorServiceDiscoveryModule.java | 8 +- .../discovery/CuratorServiceGroupMonitor.java | 1 + .../discovery/CuratorSingletonService.java | 1 + .../discovery/FlaggedZooKeeperConfig.java | 21 +- .../aurora/scheduler/discovery/JsonCodec.java | 147 ---- .../discovery/ServiceDiscoveryModule.java | 20 +- .../discovery/ServiceGroupMonitor.java | 46 -- .../scheduler/discovery/SingletonService.java | 114 ---- .../scheduler/discovery/ZooKeeperConfig.java | 21 +- .../scheduler/discovery/ZooKeeperUtils.java | 51 -- .../discovery/testing/BaseZooKeeperTest.java | 53 -- .../discovery/testing/ZooKeeperTestServer.java | 101 --- .../scheduler/http/JettyServerModule.java | 2 +- .../aurora/scheduler/http/LeaderRedirect.java | 4 +- .../log/mesos/MesosLogStreamModule.java | 4 +- .../scheduler/SchedulerLifecycleTest.java | 4 +- .../aurora/scheduler/app/SchedulerIT.java | 52 +- .../discovery/AbstractDiscoveryModuleTest.java | 77 +++ .../discovery/BaseCuratorDiscoveryTest.java | 10 +- .../discovery/CommonsDiscoveryModuleTest.java | 29 + .../CommonsServiceGroupMonitorTest.java | 137 ++++ .../discovery/CuratorDiscoveryModuleTest.java | 64 +- .../discovery/CuratorSingletonServiceTest.java | 3 +- .../scheduler/discovery/JsonCodecTest.java | 159 ----- .../discovery/ZooKeeperConfigTest.java | 17 +- .../scheduler/http/AbstractJettyTest.java | 15 +- .../scheduler/http/LeaderRedirectTest.java | 4 +- .../aurora/scheduler/thrift/ThriftIT.java | 2 +- 62 files changed, 4871 insertions(+), 902 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/RELEASE-NOTES.md ---------------------------------------------------------------------- diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md index 96926f4..7a3d331 100644 --- a/RELEASE-NOTES.md +++ b/RELEASE-NOTES.md @@ -26,6 +26,7 @@ - The scheduler flag `-zk_use_curator` has been removed. If you have never set the flag and are upgrading you should take care as described in the [note](#zk_use_curator_upgrade) below. +======= 0.16.0 ====== http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index f257440..2f23b85 100644 --- a/build.gradle +++ b/build.gradle @@ -164,7 +164,6 @@ project(':commons') { dependencies { compile project(':commons-args') - compile "ch.qos.logback:logback-classic:${logbackRev}" compile "com.google.code.findbugs:jsr305:${jsrRev}" compile "com.google.code.gson:gson:${gsonRev}" compile "com.google.guava:guava:${guavaRev}" @@ -175,13 +174,17 @@ project(':commons') { compile "javax.servlet:javax.servlet-api:${servletRev}" compile "joda-time:joda-time:2.9.1" compile "org.antlr:stringtemplate:${stringTemplateRev}" + compile "org.apache.zookeeper:zookeeper:${zookeeperRev}" compile "org.easymock:easymock:3.4" // There are a few testing support libs in the src/main/java trees that use junit - currently: + // src/main/java/org/apache/aurora/common/zookeeper/testing // src/main/java/org/apache/aurora/common/testing compile "junit:junit:${junitRev}" testCompile "junit:junit:${junitRev}" + testCompile "org.powermock:powermock-module-junit4:1.6.4" + testCompile "org.powermock:powermock-api-easymock:1.6.4" } } @@ -347,11 +350,9 @@ dependencies { compile project(':commons') compile project(':commons-args') - compile 'aopalliance:aopalliance:1.0' - compile "ch.qos.logback:logback-classic:${logbackRev}" + 
compile 'ch.qos.logback:logback-classic:1.1.3' compile "com.google.code.findbugs:jsr305:${jsrRev}" - compile "com.google.code.gson:gson:${gsonRev}" compile "com.google.inject:guice:${guiceRev}" compile "com.google.inject.extensions:guice-assistedinject:${guiceRev}" compile "com.google.protobuf:protobuf-java:${protobufRev}" @@ -385,15 +386,8 @@ dependencies { compile 'org.quartz-scheduler:quartz:2.2.2' compile "uno.perk:forward:1.0.0" - // There are a few testing support libs in the src/main/java trees that use junit - currently: - // src/main/java/org/apache/aurora/common/zookeeper/testing - compile "junit:junit:${junitRev}" - testCompile "com.sun.jersey:jersey-client:${jerseyRev}" testCompile "junit:junit:${junitRev}" - testCompile "org.powermock:powermock-module-junit4:1.6.4" - testCompile "org.powermock:powermock-api-easymock:1.6.4" - } // For normal developer builds, avoid running the often-time-consuming code quality checks. http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/Candidate.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/Candidate.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/Candidate.java new file mode 100644 index 0000000..75c1b14 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/Candidate.java @@ -0,0 +1,78 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.zookeeper; + +import com.google.common.base.Optional; +import com.google.common.base.Supplier; + +import org.apache.aurora.common.base.ExceptionalCommand; +import org.apache.aurora.common.zookeeper.Group.JoinException; +import org.apache.aurora.common.zookeeper.Group.WatchException; +import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException; +import org.apache.zookeeper.KeeperException; + +/** + * Interface definition for becoming or querying for a ZooKeeper-based group leader. + */ +public interface Candidate { + + /** + * Returns the current group leader by querying ZooKeeper synchronously. + * + * @return the current group leader's identifying data or {@link Optional#absent()} if there is + * no leader + * @throws ZooKeeperConnectionException if there was a problem connecting to ZooKeeper + * @throws KeeperException if there was a problem reading the leader information + * @throws InterruptedException if this thread is interrupted getting the leader + */ + public Optional<byte[]> getLeaderData() + throws ZooKeeperConnectionException, KeeperException, InterruptedException; + + /** + * Encapsulates a leader that can be elected and subsequently defeated. + */ + interface Leader { + + /** + * Called when this leader has been elected. + * + * @param abdicate a command that can be used to abdicate leadership and force a new election + */ + void onElected(ExceptionalCommand<JoinException> abdicate); + + /** + * Called when the leader has been ousted. Can occur either if the leader abdicates or if an + * external event causes the leader to lose its leadership role (session expiration). + */ + void onDefeated(); + } + + /** + * Offers this candidate in leadership elections for as long as the current jvm process is alive. 
+ * Upon election, the {@code onElected} callback will be executed and a command that can be used + * to abdicate leadership will be passed in. If the elected leader jvm process dies or the + * elected leader successfully abdicates then a new leader will be elected. Leaders that + * successfully abdicate are removed from the group and will not be eligible for leadership + * election unless {@link #offerLeadership(Leader)} is called again. + * + * @param leader the leader to notify of election and defeat events + * @throws JoinException if there was a problem joining the group + * @throws WatchException if there is a problem generating the 1st group membership list + * @throws InterruptedException if interrupted waiting to join the group and determine initial + * election results + * @return a supplier that can be queried to find out if this leader is currently elected + */ + public Supplier<Boolean> offerLeadership(Leader leader) + throws JoinException, WatchException, InterruptedException; +} http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/CandidateImpl.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/CandidateImpl.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/CandidateImpl.java new file mode 100644 index 0000000..98b5ee4 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/CandidateImpl.java @@ -0,0 +1,127 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.zookeeper; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.annotation.Nullable; + +import com.google.common.base.Charsets; +import com.google.common.base.Function; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; +import com.google.common.collect.Iterables; +import com.google.common.collect.Ordering; + +import org.apache.aurora.common.zookeeper.Group.JoinException; +import org.apache.aurora.common.zookeeper.Group.Membership; +import org.apache.aurora.common.zookeeper.Group.WatchException; +import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implements leader election for small groups of candidates. This implementation is subject to the + * <a href="http://hadoop.apache.org/zookeeper/docs/r3.2.1/recipes.html#sc_leaderElection"> + * herd effect</a> for a given group and should only be used for small (~10 member) candidate pools. 
+ */ +public class CandidateImpl implements Candidate { + private static final Logger LOG = LoggerFactory.getLogger(CandidateImpl.class); + + private static final byte[] UNKNOWN_CANDIDATE_DATA = "<unknown>".getBytes(Charsets.UTF_8); + + private static final Supplier<byte[]> IP_ADDRESS_DATA_SUPPLIER = () -> { + try { + return InetAddress.getLocalHost().getHostAddress().getBytes(); + } catch (UnknownHostException e) { + LOG.warn("Failed to determine local address!", e); + return UNKNOWN_CANDIDATE_DATA; + } + }; + + private static final Function<Iterable<String>, String> MOST_RECENT_JUDGE = + candidates -> Ordering.natural().min(candidates); + + private final Group group; + + /** + * Creates a candidate that can be used to offer leadership for the given {@code group}. + */ + public CandidateImpl(Group group) { + this.group = Preconditions.checkNotNull(group); + } + + @Override + public Optional<byte[]> getLeaderData() + throws ZooKeeperConnectionException, KeeperException, InterruptedException { + + String leaderId = getLeader(group.getMemberIds()); + return leaderId == null + ? 
Optional.<byte[]>absent() + : Optional.of(group.getMemberData(leaderId)); + } + + @Override + public Supplier<Boolean> offerLeadership(final Leader leader) + throws JoinException, WatchException, InterruptedException { + + final Membership membership = group.join(IP_ADDRESS_DATA_SUPPLIER, leader::onDefeated); + + final AtomicBoolean elected = new AtomicBoolean(false); + final AtomicBoolean abdicated = new AtomicBoolean(false); + group.watch(memberIds -> { + boolean noCandidates = Iterables.isEmpty(memberIds); + String memberId = membership.getMemberId(); + + if (noCandidates) { + LOG.warn("All candidates have temporarily left the group: " + group); + } else if (!Iterables.contains(memberIds, memberId)) { + LOG.error( + "Current member ID {} is not a candidate for leader, current voting: {}", + memberId, memberIds); + } else { + boolean electedLeader = memberId.equals(getLeader(memberIds)); + boolean previouslyElected = elected.getAndSet(electedLeader); + + if (!previouslyElected && electedLeader) { + LOG.info("Candidate {} is now leader of group: {}", + membership.getMemberPath(), memberIds); + + leader.onElected(() -> { + membership.cancel(); + abdicated.set(true); + }); + } else if (!electedLeader) { + if (previouslyElected) { + leader.onDefeated(); + } + LOG.info( + "Candidate {} waiting for the next leader election, current voting: {}", + membership.getMemberPath(), memberIds); + } + } + }); + + return () -> !abdicated.get() && elected.get(); + } + + @Nullable + private String getLeader(Iterable<String> memberIds) { + return Iterables.isEmpty(memberIds) ? 
null : MOST_RECENT_JUDGE.apply(memberIds); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/Credentials.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/Credentials.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/Credentials.java new file mode 100644 index 0000000..18319a3 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/Credentials.java @@ -0,0 +1,90 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.zookeeper; + +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; + +import org.apache.aurora.common.base.MorePreconditions; +import org.apache.commons.lang.builder.EqualsBuilder; + +import static java.util.Objects.requireNonNull; + +/** + * Encapsulates a user's ZooKeeper credentials. + */ +public final class Credentials { + + /** + * Creates a set of credentials for the ZooKeeper digest authentication mechanism. 
+ * + * @param username the username to authenticate with + * @param password the password to authenticate with + * @return a set of credentials that can be used to authenticate the zoo keeper client + */ + public static Credentials digestCredentials(String username, String password) { + MorePreconditions.checkNotBlank(username); + Preconditions.checkNotNull(password); + + // TODO(John Sirois): DigestAuthenticationProvider is broken - uses platform default charset + // (on server) and so we just have to hope here that clients are deployed in compatible jvms. + // Consider writing and installing a version of DigestAuthenticationProvider that controls its + // Charset explicitly. + return new Credentials("digest", (username + ":" + password).getBytes()); + } + + private final String scheme; + private final byte[] authToken; + + public Credentials(String scheme, byte[] authToken) { + this.scheme = MorePreconditions.checkNotBlank(scheme); + this.authToken = requireNonNull(authToken); + } + + /** + * Returns the authentication scheme these credentials are for. + * + * @return the scheme these credentials are for. + */ + public String scheme() { + return scheme; + } + + /** + * Returns the authentication token. + * + * @return the authentication token. 
+ */ + public byte[] authToken() { + return authToken; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof Credentials)) { + return false; + } + + Credentials other = (Credentials) o; + return new EqualsBuilder() + .append(scheme, other.scheme()) + .append(authToken, other.authToken()) + .isEquals(); + } + + @Override + public int hashCode() { + return Objects.hashCode(scheme, authToken); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/Group.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/Group.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/Group.java new file mode 100644 index 0000000..2720dd1 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/Group.java @@ -0,0 +1,674 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.common.zookeeper; + +import java.util.List; +import java.util.Set; +import java.util.regex.Pattern; + +import javax.annotation.Nullable; + +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; + +import org.apache.aurora.common.base.Command; +import org.apache.aurora.common.base.Commands; +import org.apache.aurora.common.base.ExceptionalSupplier; +import org.apache.aurora.common.base.MorePreconditions; +import org.apache.aurora.common.util.BackoffHelper; +import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException; +import org.apache.commons.lang.ArrayUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.data.ACL; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class exposes methods for joining and monitoring distributed groups. The groups this class + * monitors are realized as persistent paths in ZooKeeper with ephemeral child nodes for + * each member of a group. 
+ */ +public class Group { + private static final Logger LOG = LoggerFactory.getLogger(Group.class); + + private static final Supplier<byte[]> NO_MEMBER_DATA = Suppliers.ofInstance(null); + private static final String DEFAULT_NODE_NAME_PREFIX = "member_"; + + private final ZooKeeperClient zkClient; + private final ImmutableList<ACL> acl; + private final String path; + + private final NodeScheme nodeScheme; + private final Predicate<String> nodeNameFilter; + + private final BackoffHelper backoffHelper; + + /** + * Creates a group rooted at the given {@code path}. Paths must be absolute and trailing or + * duplicate slashes will be normalized. For example, all the following paths would create a + * group at the normalized path /my/distributed/group: + * <ul> + * <li>/my/distributed/group + * <li>/my/distributed/group/ + * <li>/my/distributed//group + * </ul> + * + * @param zkClient the client to use for interactions with ZooKeeper + * @param acl the ACL to use for creating the persistent group path if it does not already exist + * @param path the absolute persistent path that represents this group + * @param nodeScheme the scheme that defines how nodes are created + */ + public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path, NodeScheme nodeScheme) { + this.zkClient = Preconditions.checkNotNull(zkClient); + this.acl = ImmutableList.copyOf(acl); + this.path = ZooKeeperUtils.normalizePath(Preconditions.checkNotNull(path)); + + this.nodeScheme = Preconditions.checkNotNull(nodeScheme); + nodeNameFilter = Group.this.nodeScheme::isMember; + + backoffHelper = new BackoffHelper(); + } + + /** + * Equivalent to {@link #Group(ZooKeeperClient, Iterable, String, String)} with a + * {@code namePrefix} of 'member_'. 
+ */ + public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path) { + this(zkClient, acl, path, DEFAULT_NODE_NAME_PREFIX); + } + + /** + * Equivalent to {@link #Group(ZooKeeperClient, Iterable, String, NodeScheme)} with a + * {@link DefaultScheme} using {@code namePrefix}. + */ + public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path, String namePrefix) { + this(zkClient, acl, path, new DefaultScheme(namePrefix)); + } + + public String getMemberPath(String memberId) { + return path + "/" + MorePreconditions.checkNotBlank(memberId); + } + + public String getPath() { + return path; + } + + public String getMemberId(String nodePath) { + MorePreconditions.checkNotBlank(nodePath); + Preconditions.checkArgument(nodePath.startsWith(path + "/"), + "Not a member of this group[%s]: %s", path, nodePath); + + String memberId = StringUtils.substringAfterLast(nodePath, "/"); + Preconditions.checkArgument(nodeScheme.isMember(memberId), + "Not a group member: %s", memberId); + return memberId; + } + + /** + * Returns the current list of group member ids by querying ZooKeeper synchronously. + * + * @return the ids of all the present members of this group + * @throws ZooKeeperConnectionException if there was a problem connecting to ZooKeeper + * @throws KeeperException if there was a problem reading this group's member ids + * @throws InterruptedException if this thread is interrupted listing the group members + */ + public Iterable<String> getMemberIds() + throws ZooKeeperConnectionException, KeeperException, InterruptedException { + return Iterables.filter(zkClient.get().getChildren(path, false), nodeNameFilter); + } + + /** + * Gets the data for one of this groups members by querying ZooKeeper synchronously. 
+ * + * @param memberId the id of the member whose data to retrieve + * @return the data associated with the {@code memberId} + * @throws ZooKeeperConnectionException if there was a problem connecting to ZooKeeper + * @throws KeeperException if there was a problem reading this member's data + * @throws InterruptedException if this thread is interrupted retrieving the member data + */ + public byte[] getMemberData(String memberId) + throws ZooKeeperConnectionException, KeeperException, InterruptedException { + return zkClient.get().getData(getMemberPath(memberId), false, null); + } + + /** + * Represents membership in a distributed group. + */ + public interface Membership { + + /** + * Returns the persistent ZooKeeper path that represents this group. + */ + String getGroupPath(); + + /** + * Returns the id (ZooKeeper node name) of this group member. May change over time if the + * ZooKeeper session expires. + */ + String getMemberId(); + + /** + * Returns the full ZooKeeper path to this group member. May change over time if the + * ZooKeeper session expires. + */ + String getMemberPath(); + + /** + * Updates the membership data synchronously using the {@code Supplier<byte[]>} passed to + * {@link Group#join()}. + * + * @return the new membership data + * @throws UpdateException if there was a problem updating the membership data + */ + byte[] updateMemberData() throws UpdateException; + + /** + * Cancels group membership by deleting the associated ZooKeeper member node. + * + * @throws JoinException if there is a problem deleting the node + */ + void cancel() throws JoinException; + } + + /** + * Indicates an error joining a group. + */ + public static class JoinException extends Exception { + public JoinException(String message, Throwable cause) { + super(message, cause); + } + } + + /** + * Indicates an error updating a group member's data. 
+ */ + public static class UpdateException extends Exception { + public UpdateException(String message, Throwable cause) { + super(message, cause); + } + } + + /** + * Equivalent to calling {@code join(null, null)}. + */ + public final Membership join() throws JoinException, InterruptedException { + return join(NO_MEMBER_DATA, null); + } + + /** + * Equivalent to calling {@code join(memberData, null)}. + */ + public final Membership join(Supplier<byte[]> memberData) + throws JoinException, InterruptedException { + + return join(memberData, null); + } + + /** + * Equivalent to calling {@code join(null, onLoseMembership)}. + */ + public final Membership join(@Nullable final Command onLoseMembership) + throws JoinException, InterruptedException { + + return join(NO_MEMBER_DATA, onLoseMembership); + } + + /** + * Joins this group and returns the resulting Membership when successful. Membership will be + * automatically cancelled when the current jvm process dies; however the returned Membership + * object can be used to cancel membership earlier. Unless + * {@link Group.Membership#cancel()} is called the membership will + * be maintained by re-establishing it silently in the background. + * + * <p>Any {@code memberData} given is persisted in the member node in ZooKeeper. If an + * {@code onLoseMembership} callback is supplied, it will be notified each time this member loses + * membership in the group. 
+ * + * @param memberData a supplier of the data to store in the member node + * @param onLoseMembership a callback to notify when membership is lost + * @return a Membership object with the member details + * @throws JoinException if there was a problem joining the group + * @throws InterruptedException if this thread is interrupted awaiting completion of the join + */ + public final Membership join(Supplier<byte[]> memberData, @Nullable Command onLoseMembership) + throws JoinException, InterruptedException { + + Preconditions.checkNotNull(memberData); + ensurePersistentGroupPath(); + + final ActiveMembership groupJoiner = new ActiveMembership(memberData, onLoseMembership); + return backoffHelper.doUntilResult(() -> { + try { + return groupJoiner.join(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new JoinException("Interrupted trying to join group at path: " + path, e); + } catch (ZooKeeperConnectionException e) { + LOG.warn("Temporary error trying to join group at path: " + path, e); + return null; + } catch (KeeperException e) { + if (zkClient.shouldRetry(e)) { + LOG.warn("Temporary error trying to join group at path: " + path, e); + return null; + } else { + throw new JoinException("Problem joining partition group at path: " + path, e); + } + } + }); + } + + private void ensurePersistentGroupPath() throws JoinException, InterruptedException { + backoffHelper.doUntilSuccess(() -> { + try { + ZooKeeperUtils.ensurePath(zkClient, acl, path); + return true; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new JoinException("Interrupted trying to ensure group at path: " + path, e); + } catch (ZooKeeperConnectionException e) { + LOG.warn("Problem connecting to ZooKeeper, retrying", e); + return false; + } catch (KeeperException e) { + if (zkClient.shouldRetry(e)) { + LOG.warn("Temporary error ensuring path: " + path, e); + return false; + } else { + throw new JoinException("Problem ensuring 
group at path: " + path, e); + } + } + }); + } + + private class ActiveMembership implements Membership { + private final Supplier<byte[]> memberData; + private final Command onLoseMembership; + private String nodePath; + private String memberId; + private volatile boolean cancelled; + private byte[] membershipData; + + public ActiveMembership(Supplier<byte[]> memberData, @Nullable Command onLoseMembership) { + this.memberData = memberData; + this.onLoseMembership = (onLoseMembership == null) ? Commands.NOOP : onLoseMembership; + } + + @Override + public String getGroupPath() { + return path; + } + + @Override + public synchronized String getMemberId() { + return memberId; + } + + @Override + public synchronized String getMemberPath() { + return nodePath; + } + + @Override + public synchronized byte[] updateMemberData() throws UpdateException { + byte[] membershipData = memberData.get(); + if (!ArrayUtils.isEquals(this.membershipData, membershipData)) { + try { + zkClient.get().setData(nodePath, membershipData, ZooKeeperUtils.ANY_VERSION); + this.membershipData = membershipData; + } catch (KeeperException e) { + throw new UpdateException("Problem updating membership data.", e); + } catch (InterruptedException e) { + throw new UpdateException("Interrupted attempting to update membership data.", e); + } catch (ZooKeeperConnectionException e) { + throw new UpdateException( + "Could not connect to the ZooKeeper cluster to update membership data.", e); + } + } + return membershipData; + } + + @Override + public synchronized void cancel() throws JoinException { + if (!cancelled) { + try { + backoffHelper.doUntilSuccess(() -> { + try { + zkClient.get().delete(nodePath, ZooKeeperUtils.ANY_VERSION); + return true; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new JoinException("Interrupted trying to cancel membership: " + nodePath, e); + } catch (ZooKeeperConnectionException e) { + LOG.warn("Problem connecting to ZooKeeper, retrying", 
e); + return false; + } catch (NoNodeException e) { + LOG.info("Membership already cancelled, node at path: " + nodePath + + " has been deleted"); + return true; + } catch (KeeperException e) { + if (zkClient.shouldRetry(e)) { + LOG.warn("Temporary error cancelling membership: " + nodePath, e); + return false; + } else { + throw new JoinException("Problem cancelling membership: " + nodePath, e); + } + } + }); + cancelled = true; // Prevent auto-re-join logic from undoing this cancel. + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new JoinException("Problem cancelling membership: " + nodePath, e); + } + } + } + + private class CancelledException extends IllegalStateException { /* marker */ } + + synchronized Membership join() + throws ZooKeeperConnectionException, InterruptedException, KeeperException { + + if (cancelled) { + throw new CancelledException(); + } + + if (nodePath == null) { + // Re-join if our ephemeral node goes away due to session expiry - only needs to be + // registered once. + zkClient.registerExpirationHandler(this::tryJoin); + } + + byte[] membershipData = memberData.get(); + String nodeName = nodeScheme.createName(membershipData); + CreateMode createMode = nodeScheme.isSequential() + ? CreateMode.EPHEMERAL_SEQUENTIAL + : CreateMode.EPHEMERAL; + nodePath = zkClient.get().create(path + "/" + nodeName, membershipData, acl, createMode); + memberId = Group.this.getMemberId(nodePath); + LOG.info("Set group member ID to " + memberId); + this.membershipData = membershipData; + + // Re-join if our ephemeral node goes away due to maliciousness. + zkClient.get().exists(nodePath, event -> { + if (event.getType() == EventType.NodeDeleted) { + tryJoin(); + } + }); + + return this; + } + + private final ExceptionalSupplier<Boolean, InterruptedException> tryJoin = + () -> { + try { + join(); + return true; + } catch (CancelledException e) { + // Lost a cancel race - that's ok. 
+ return true; + } catch (ZooKeeperConnectionException e) { + LOG.warn("Problem connecting to ZooKeeper, retrying", e); + return false; + } catch (KeeperException e) { + if (zkClient.shouldRetry(e)) { + LOG.warn("Temporary error re-joining group: " + path, e); + return false; + } else { + throw new IllegalStateException("Permanent problem re-joining group: " + path, e); + } + } + }; + + private synchronized void tryJoin() { + onLoseMembership.execute(); + try { + backoffHelper.doUntilSuccess(tryJoin); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException( + String.format("Interrupted while trying to re-join group: %s, giving up", path), e); + } + } + } + + /** + * An interface to an object that listens for changes to a group's membership. + */ + public interface GroupChangeListener { + + /** + * Called whenever group membership changes with the new list of member ids. + * + * @param memberIds the current member ids + */ + void onGroupChange(Iterable<String> memberIds); + } + + /** + * An interface that dictates the scheme to use for storing and filtering nodes that represent + * members of a distributed group. + */ + public interface NodeScheme { + /** + * Determines if a child node is a member of a group by examining the node's name. + * + * @param nodeName the name of a child node found in a group + * @return {@code true} if {@code nodeName} identifies a group member in this scheme + */ + boolean isMember(String nodeName); + + /** + * Generates a node name for the node representing this process in the distributed group. + * + * @param membershipData the data that will be stored in this node + * @return the name for the node that will represent this process in the group + */ + String createName(byte[] membershipData); + + /** + * Indicates whether this scheme needs ephemeral sequential nodes or just ephemeral nodes. 
+ * + * @return {@code true} if this scheme requires sequential node names; {@code false} otherwise + */ + boolean isSequential(); + } + + /** + * Indicates an error watching a group. + */ + public static class WatchException extends Exception { + public WatchException(String message, Throwable cause) { + super(message, cause); + } + } + + /** + * Watches this group for the lifetime of this jvm process. This method will block until the + * current group members are available, notify the {@code groupChangeListener} and then return. + * All further changes to the group membership will cause notifications on a background thread. + * + * @param groupChangeListener the listener to notify of group membership change events + * @return A command which, when executed, will stop watching the group. + * @throws WatchException if there is a problem generating the 1st group membership list + * @throws InterruptedException if interrupted waiting to gather the 1st group membership list + */ + public final Command watch(final GroupChangeListener groupChangeListener) + throws WatchException, InterruptedException { + Preconditions.checkNotNull(groupChangeListener); + + try { + ensurePersistentGroupPath(); + } catch (JoinException e) { + throw new WatchException("Failed to create group path: " + path, e); + } + + final GroupMonitor groupMonitor = new GroupMonitor(groupChangeListener); + backoffHelper.doUntilSuccess(() -> { + try { + groupMonitor.watchGroup(); + return true; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new WatchException("Interrupted trying to watch group at path: " + path, e); + } catch (ZooKeeperConnectionException e) { + LOG.warn("Temporary error trying to watch group at path: " + path, e); + return null; + } catch (KeeperException e) { + if (zkClient.shouldRetry(e)) { + LOG.warn("Temporary error trying to watch group at path: " + path, e); + return null; + } else { + throw new WatchException("Problem trying to watch group at 
path: " + path, e); + } + } + }); + return groupMonitor::stopWatching; + } + + /** + * Helps continuously monitor a group for membership changes. + */ + private class GroupMonitor { + private final GroupChangeListener groupChangeListener; + private volatile boolean stopped = false; + private Set<String> members; + + GroupMonitor(GroupChangeListener groupChangeListener) { + this.groupChangeListener = groupChangeListener; + } + + private final Watcher groupWatcher = event -> { + if (event.getType() == EventType.NodeChildrenChanged) { + tryWatchGroup(); + } + }; + + private final ExceptionalSupplier<Boolean, InterruptedException> tryWatchGroup = + () -> { + try { + watchGroup(); + return true; + } catch (ZooKeeperConnectionException e) { + LOG.warn("Problem connecting to ZooKeeper, retrying", e); + return false; + } catch (KeeperException e) { + if (zkClient.shouldRetry(e)) { + LOG.warn("Temporary error re-watching group: " + path, e); + return false; + } else { + throw new IllegalStateException("Permanent problem re-watching group: " + path, e); + } + } + }; + + private void tryWatchGroup() { + if (stopped) { + return; + } + + try { + backoffHelper.doUntilSuccess(tryWatchGroup); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException( + String.format("Interrupted while trying to re-watch group: %s, giving up", path), e); + } + } + + private void watchGroup() + throws ZooKeeperConnectionException, InterruptedException, KeeperException { + + if (stopped) { + return; + } + + List<String> children = zkClient.get().getChildren(path, groupWatcher); + setMembers(Iterables.filter(children, nodeNameFilter)); + } + + private void stopWatching() { + // TODO(William Farner): Cancel the watch when + // https://issues.apache.org/jira/browse/ZOOKEEPER-442 is resolved. 
+ LOG.info("Stopping watch on " + this); + stopped = true; + } + + synchronized void setMembers(Iterable<String> members) { + if (stopped) { + LOG.info("Suppressing membership update, no longer watching " + this); + return; + } + + if (this.members == null) { + // Reset our watch on the group if session expires - only needs to be registered once. + zkClient.registerExpirationHandler(this::tryWatchGroup); + } + + Set<String> membership = ImmutableSet.copyOf(members); + if (!membership.equals(this.members)) { + groupChangeListener.onGroupChange(members); + this.members = membership; + } + } + } + + /** + * Default naming scheme implementation. Stores nodes at [given path] + "/" + [given prefix] + + * ZooKeeper-generated member ID. For example, if the path is "/discovery/servicename", and the + * prefix is "member_", the node's full path will look something like + * {@code /discovery/servicename/member_0000000007}. + */ + public static class DefaultScheme implements NodeScheme { + private final String namePrefix; + private final Pattern namePattern; + + /** + * Creates a sequential node scheme based on the given node name prefix. 
+ * + * @param namePrefix the prefix for the names of the member nodes + */ + public DefaultScheme(String namePrefix) { + this.namePrefix = MorePreconditions.checkNotBlank(namePrefix); + namePattern = Pattern.compile("^" + Pattern.quote(namePrefix) + "-?[0-9]+$"); + } + + @Override + public boolean isMember(String nodeName) { + return namePattern.matcher(nodeName).matches(); + } + + @Override + public String createName(byte[] membershipData) { + return namePrefix; + } + + @Override + public boolean isSequential() { + return true; + } + } + + @Override + public String toString() { + return "Group " + path; + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/JsonCodec.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/JsonCodec.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/JsonCodec.java new file mode 100644 index 0000000..9d31608 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/JsonCodec.java @@ -0,0 +1,139 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.common.zookeeper; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.nio.charset.Charset; +import java.util.Map; + +import javax.annotation.Nullable; + +import com.google.common.base.Charsets; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import com.google.gson.Gson; +import com.google.gson.JsonIOException; +import com.google.gson.JsonParseException; + +import org.apache.aurora.common.io.Codec; +import org.apache.aurora.common.thrift.Endpoint; +import org.apache.aurora.common.thrift.ServiceInstance; +import org.apache.aurora.common.thrift.Status; + +import static java.util.Objects.requireNonNull; + +class JsonCodec implements Codec<ServiceInstance> { + + private static void assertRequiredField(String fieldName, Object fieldValue) { + if (fieldValue == null) { + throw new JsonParseException(String.format("Field %s is required", fieldName)); + } + } + + private static class EndpointSchema { + private final String host; + private final Integer port; + + EndpointSchema(Endpoint endpoint) { + host = endpoint.getHost(); + port = endpoint.getPort(); + } + + Endpoint asEndpoint() { + assertRequiredField("host", host); + assertRequiredField("port", port); + + return new Endpoint(host, port); + } + } + + private static class ServiceInstanceSchema { + private final EndpointSchema serviceEndpoint; + private final Map<String, EndpointSchema> additionalEndpoints; + private final Status status; + private final @Nullable Integer shard; + + ServiceInstanceSchema(ServiceInstance instance) { + serviceEndpoint = new EndpointSchema(instance.getServiceEndpoint()); + if (instance.isSetAdditionalEndpoints()) { + additionalEndpoints = + Maps.transformValues(instance.getAdditionalEndpoints(), EndpointSchema::new); + } else { + additionalEndpoints = ImmutableMap.of(); + } + 
status = instance.getStatus(); + shard = instance.isSetShard() ? instance.getShard() : null; + } + + ServiceInstance asServiceInstance() { + assertRequiredField("serviceEndpoint", serviceEndpoint); + assertRequiredField("status", status); + + Map<String, EndpointSchema> extraEndpoints = + additionalEndpoints == null ? ImmutableMap.of() : additionalEndpoints; + + ServiceInstance instance = + new ServiceInstance( + serviceEndpoint.asEndpoint(), + Maps.transformValues(extraEndpoints, EndpointSchema::asEndpoint), + status); + if (shard != null) { + instance.setShard(shard); + } + return instance; + } + } + + private static final Charset ENCODING = Charsets.UTF_8; + + private final Gson gson; + + JsonCodec() { + this(new Gson()); + } + + JsonCodec(Gson gson) { + this.gson = requireNonNull(gson); + } + + @Override + public void serialize(ServiceInstance instance, OutputStream sink) throws IOException { + Writer writer = new OutputStreamWriter(sink, ENCODING); + try { + gson.toJson(new ServiceInstanceSchema(instance), writer); + } catch (JsonIOException e) { + throw new IOException(String.format("Problem serializing %s to JSON", instance), e); + } + writer.flush(); + } + + @Override + public ServiceInstance deserialize(InputStream source) throws IOException { + InputStreamReader reader = new InputStreamReader(source, ENCODING); + try { + @Nullable ServiceInstanceSchema schema = gson.fromJson(reader, ServiceInstanceSchema.class); + if (schema == null) { + throw new IOException("JSON did not include a ServiceInstance object"); + } + return schema.asServiceInstance(); + } catch (JsonParseException e) { + throw new IOException("Problem parsing JSON ServiceInstance.", e); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java 
b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java new file mode 100644 index 0000000..aeea02d --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java @@ -0,0 +1,74 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.zookeeper; + +import java.net.InetSocketAddress; +import java.util.Map; + +import org.apache.aurora.common.io.Codec; +import org.apache.aurora.common.thrift.ServiceInstance; +import org.apache.aurora.common.zookeeper.Group.JoinException; + +/** + * A logical set of servers registered in ZooKeeper. Intended to be used by servers in a + * common service to advertise their presence to server-set protocol-aware clients. + * + * Standard implementations should use the {@link #JSON_CODEC} to serialize the service instance + * rendezvous data to zookeeper so that standard clients can interoperate. + */ +public interface ServerSet { + + /** + * Encodes a {@link ServiceInstance} as a JSON object. + * + * This is the default encoding for service instance data in ZooKeeper. + */ + Codec<ServiceInstance> JSON_CODEC = new JsonCodec(); + + /** + * Attempts to join a server set for this logical service group. 
+ * + * @param endpoint the primary service endpoint + * @param additionalEndpoints and additional endpoints keyed by their logical name + * @return an EndpointStatus object that allows the endpoint to adjust its status + * @throws JoinException if there was a problem joining the server set + * @throws InterruptedException if interrupted while waiting to join the server set + */ + EndpointStatus join( + InetSocketAddress endpoint, + Map<String, InetSocketAddress> additionalEndpoints) + throws JoinException, InterruptedException; + + /** + * A handle to a service endpoint's status data that allows updating it to track current events. + */ + interface EndpointStatus { + + /** + * Removes the endpoint from the server set. + * + * @throws UpdateException if there was a problem leaving the ServerSet. + */ + void leave() throws UpdateException; + } + + /** + * Indicates an error updating a service's status information. + */ + class UpdateException extends Exception { + public UpdateException(String message, Throwable cause) { + super(message, cause); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java new file mode 100644 index 0000000..ace4980 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java @@ -0,0 +1,349 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.zookeeper; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Map; +import java.util.Set; + +import javax.annotation.Nullable; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.base.Joiner; +import com.google.common.base.Predicates; +import com.google.common.base.Supplier; +import com.google.common.base.Throwables; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.ImmutableSortedSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.common.collect.Sets.SetView; +import com.google.common.util.concurrent.UncheckedExecutionException; + +import org.apache.aurora.common.base.Command; +import org.apache.aurora.common.io.Codec; +import org.apache.aurora.common.net.pool.DynamicHostSet; +import org.apache.aurora.common.thrift.ServiceInstance; +import org.apache.aurora.common.thrift.Status; +import org.apache.aurora.common.util.BackoffHelper; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static 
com.google.common.base.Preconditions.checkNotNull; + +/** + * ZooKeeper-backed implementation of {@link ServerSet} and {@link DynamicHostSet}. + */ +public class ServerSetImpl implements ServerSet, DynamicHostSet<ServiceInstance> { + private static final Logger LOG = LoggerFactory.getLogger(ServerSetImpl.class); + + private final ZooKeeperClient zkClient; + private final Group group; + private final Codec<ServiceInstance> codec; + private final BackoffHelper backoffHelper; + + /** + * Creates a new ServerSet using open ZooKeeper node ACLs. + * + * @param zkClient the client to use for interactions with ZooKeeper + * @param path the name-service path of the service to connect to + */ + public ServerSetImpl(ZooKeeperClient zkClient, String path) { + this(zkClient, ZooDefs.Ids.OPEN_ACL_UNSAFE, path); + } + + /** + * Creates a new ServerSet for the given service {@code path}. + * + * @param zkClient the client to use for interactions with ZooKeeper + * @param acl the ACL to use for creating the persistent group path if it does not already exist + * @param path the name-service path of the service to connect to + */ + public ServerSetImpl(ZooKeeperClient zkClient, Iterable<ACL> acl, String path) { + this(zkClient, new Group(zkClient, acl, path), JSON_CODEC); + } + + /** + * Creates a new ServerSet using the given service {@code group}. + * + * @param zkClient the client to use for interactions with ZooKeeper + * @param group the server group + */ + public ServerSetImpl(ZooKeeperClient zkClient, Group group) { + this(zkClient, group, JSON_CODEC); + } + + /** + * Creates a new ServerSet using the given service {@code group} and a custom {@code codec}. 
+ * + * @param zkClient the client to use for interactions with ZooKeeper + * @param group the server group + * @param codec a codec to use for serializing and de-serializing the ServiceInstance data to and + * from a byte array + */ + public ServerSetImpl(ZooKeeperClient zkClient, Group group, Codec<ServiceInstance> codec) { + this.zkClient = checkNotNull(zkClient); + this.group = checkNotNull(group); + this.codec = checkNotNull(codec); + + // TODO(John Sirois): Inject the helper so that backoff strategy can be configurable. + backoffHelper = new BackoffHelper(); + } + + @VisibleForTesting + ZooKeeperClient getZkClient() { + return zkClient; + } + + @Override + public EndpointStatus join( + InetSocketAddress endpoint, + Map<String, InetSocketAddress> additionalEndpoints) + throws Group.JoinException, InterruptedException { + + checkNotNull(endpoint); + checkNotNull(additionalEndpoints); + + MemberStatus memberStatus = new MemberStatus(endpoint, additionalEndpoints); + Supplier<byte[]> serviceInstanceSupplier = memberStatus::serializeServiceInstance; + Group.Membership membership = group.join(serviceInstanceSupplier); + + return () -> memberStatus.leave(membership); + } + + @Override + public Command watch(HostChangeMonitor<ServiceInstance> monitor) throws MonitorException { + ServerSetWatcher serverSetWatcher = new ServerSetWatcher(zkClient, monitor); + try { + return serverSetWatcher.watch(); + } catch (Group.WatchException e) { + throw new MonitorException("ZooKeeper watch failed.", e); + } catch (InterruptedException e) { + throw new MonitorException("Interrupted while watching ZooKeeper.", e); + } + } + + private class MemberStatus { + private final InetSocketAddress endpoint; + private final Map<String, InetSocketAddress> additionalEndpoints; + + private MemberStatus( + InetSocketAddress endpoint, + Map<String, InetSocketAddress> additionalEndpoints) { + + this.endpoint = endpoint; + this.additionalEndpoints = additionalEndpoints; + } + + synchronized void 
leave(Group.Membership membership) throws UpdateException { + try { + membership.cancel(); + } catch (Group.JoinException e) { + throw new UpdateException( + "Failed to auto-cancel group membership on transition to DEAD status", e); + } + } + + byte[] serializeServiceInstance() { + ServiceInstance serviceInstance = new ServiceInstance( + ServerSets.toEndpoint(endpoint), + Maps.transformValues(additionalEndpoints, ServerSets.TO_ENDPOINT), + Status.ALIVE); + + LOG.debug("updating endpoint data to:\n\t" + serviceInstance); + try { + return ServerSets.serializeServiceInstance(serviceInstance, codec); + } catch (IOException e) { + throw new IllegalStateException("Unexpected problem serializing thrift struct " + + serviceInstance + "to a byte[]", e); + } + } + } + + private static class ServiceInstanceFetchException extends RuntimeException { + ServiceInstanceFetchException(String message, Throwable cause) { + super(message, cause); + } + } + + private static class ServiceInstanceDeletedException extends RuntimeException { + ServiceInstanceDeletedException(String path) { + super(path); + } + } + + private class ServerSetWatcher { + private final ZooKeeperClient zkClient; + private final HostChangeMonitor<ServiceInstance> monitor; + @Nullable private ImmutableSet<ServiceInstance> serverSet; + + ServerSetWatcher(ZooKeeperClient zkClient, HostChangeMonitor<ServiceInstance> monitor) { + this.zkClient = zkClient; + this.monitor = monitor; + } + + public Command watch() throws Group.WatchException, InterruptedException { + Watcher onExpirationWatcher = zkClient.registerExpirationHandler(this::rebuildServerSet); + + try { + return group.watch(this::notifyGroupChange); + } catch (Group.WatchException e) { + zkClient.unregister(onExpirationWatcher); + throw e; + } catch (InterruptedException e) { + zkClient.unregister(onExpirationWatcher); + throw e; + } + } + + private ServiceInstance getServiceInstance(final String nodePath) { + try { + return backoffHelper.doUntilResult(() -> 
{ + try { + byte[] data = zkClient.get().getData(nodePath, false, null); + return ServerSets.deserializeServiceInstance(data, codec); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new ServiceInstanceFetchException( + "Interrupted updating service data for: " + nodePath, e); + } catch (ZooKeeperClient.ZooKeeperConnectionException e) { + LOG.warn("Temporary error trying to updating service data for: " + nodePath, e); + return null; + } catch (NoNodeException e) { + invalidateNodePath(nodePath); + throw new ServiceInstanceDeletedException(nodePath); + } catch (KeeperException e) { + if (zkClient.shouldRetry(e)) { + LOG.warn("Temporary error trying to update service data for: " + nodePath, e); + return null; + } else { + throw new ServiceInstanceFetchException( + "Failed to update service data for: " + nodePath, e); + } + } catch (IOException e) { + throw new ServiceInstanceFetchException( + "Failed to deserialize the ServiceInstance data for: " + nodePath, e); + } + }); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new ServiceInstanceFetchException( + "Interrupted trying to update service data for: " + nodePath, e); + } + } + + private final LoadingCache<String, ServiceInstance> servicesByMemberId = + CacheBuilder.newBuilder().build(new CacheLoader<String, ServiceInstance>() { + @Override public ServiceInstance load(String memberId) { + return getServiceInstance(group.getMemberPath(memberId)); + } + }); + + private void rebuildServerSet() { + Set<String> memberIds = ImmutableSet.copyOf(servicesByMemberId.asMap().keySet()); + servicesByMemberId.invalidateAll(); + notifyGroupChange(memberIds); + } + + private String invalidateNodePath(String deletedPath) { + String memberId = group.getMemberId(deletedPath); + servicesByMemberId.invalidate(memberId); + return memberId; + } + + private final Function<String, ServiceInstance> MAYBE_FETCH_NODE = + memberId -> { + // This get will trigger a fetch + try 
{ + return servicesByMemberId.getUnchecked(memberId); + } catch (UncheckedExecutionException e) { + Throwable cause = e.getCause(); + if (!(cause instanceof ServiceInstanceDeletedException)) { + Throwables.propagateIfInstanceOf(cause, ServiceInstanceFetchException.class); + throw new IllegalStateException( + "Unexpected error fetching member data for: " + memberId, e); + } + return null; + } + }; + + private synchronized void notifyGroupChange(Iterable<String> memberIds) { + ImmutableSet<String> newMemberIds = ImmutableSortedSet.copyOf(memberIds); + Set<String> existingMemberIds = servicesByMemberId.asMap().keySet(); + + // Ignore no-op state changes except for the 1st when we've seen no group yet. + if ((serverSet == null) || !newMemberIds.equals(existingMemberIds)) { + SetView<String> deletedMemberIds = Sets.difference(existingMemberIds, newMemberIds); + // Implicit removal from servicesByMemberId. + existingMemberIds.removeAll(ImmutableSet.copyOf(deletedMemberIds)); + + Iterable<ServiceInstance> serviceInstances = Iterables.filter( + Iterables.transform(newMemberIds, MAYBE_FETCH_NODE), Predicates.notNull()); + + notifyServerSetChange(ImmutableSet.copyOf(serviceInstances)); + } + } + + private void notifyServerSetChange(ImmutableSet<ServiceInstance> currentServerSet) { + // ZK nodes may have changed if there was a session expiry for a server in the server set, but + // if the server's status has not changed, we can skip any onChange updates. 
+ if (!currentServerSet.equals(serverSet)) { + if (currentServerSet.isEmpty()) { + LOG.warn("server set empty for path " + group.getPath()); + } else { + if (serverSet == null) { + LOG.info("received initial membership {}", currentServerSet); + } else { + logChange(currentServerSet); + } + } + serverSet = currentServerSet; + monitor.onChange(serverSet); + } + } + + private void logChange(ImmutableSet<ServiceInstance> newServerSet) { + StringBuilder message = new StringBuilder("server set " + group.getPath() + " change: "); + if (serverSet.size() != newServerSet.size()) { + message.append("from ").append(serverSet.size()) + .append(" members to ").append(newServerSet.size()); + } + + Joiner joiner = Joiner.on("\n\t\t"); + + SetView<ServiceInstance> left = Sets.difference(serverSet, newServerSet); + if (!left.isEmpty()) { + message.append("\n\tleft:\n\t\t").append(joiner.join(left)); + } + + SetView<ServiceInstance> joined = Sets.difference(newServerSet, serverSet); + if (!joined.isEmpty()) { + message.append("\n\tjoined:\n\t\t").append(joiner.join(joined)); + } + + LOG.info(message.toString()); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java new file mode 100644 index 0000000..01a54a5 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java @@ -0,0 +1,118 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.zookeeper; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Map; + +import com.google.common.base.Function; +import com.google.common.base.Preconditions; + +import org.apache.aurora.common.base.MorePreconditions; +import org.apache.aurora.common.io.Codec; +import org.apache.aurora.common.thrift.Endpoint; +import org.apache.aurora.common.thrift.ServiceInstance; +import org.apache.aurora.common.thrift.Status; +import org.apache.zookeeper.data.ACL; + +/** + * Common ServerSet related functions: creation, service instance (de)serialization and endpoint conversion. + */ +public class ServerSets { + + private ServerSets() { + // Utility class. + } + + /** + * A function that invokes {@link #toEndpoint(InetSocketAddress)}. + */ + public static final Function<InetSocketAddress, Endpoint> TO_ENDPOINT = + ServerSets::toEndpoint; + + /** + * Creates a server set that registers at a single path applying the given ACL to all nodes + * created in the path. + * + * @param zkClient ZooKeeper client to register with. + * @param acl The ACL to apply to the {@code zkPath} nodes the ServerSet creates. + * @param zkPath Path to register at; validated with {@code MorePreconditions.checkNotBlank}. + * @return A server set that registers at {@code zkPath}. 
+ */ + public static ServerSet create(ZooKeeperClient zkClient, Iterable<ACL> acl, String zkPath) { + Preconditions.checkNotNull(zkClient); + MorePreconditions.checkNotBlank(acl); + MorePreconditions.checkNotBlank(zkPath); + + return new ServerSetImpl(zkClient, acl, zkPath); + } + + /** + * Serializes the given Thrift service instance object into a byte array using the given codec. + * + * @param serviceInstance the Thrift service instance object to be serialized + * @param codec the codec to use to serialize a Thrift service instance object + * @return byte array that contains a serialized Thrift service instance + */ + public static byte[] serializeServiceInstance( + ServiceInstance serviceInstance, Codec<ServiceInstance> codec) throws IOException { + + ByteArrayOutputStream output = new ByteArrayOutputStream(); + codec.serialize(serviceInstance, output); + return output.toByteArray(); + } + + /** + * Builds a service instance from the given address, endpoints and status and serializes it with {@code codec}. + * @see #serializeServiceInstance(ServiceInstance, Codec) + * + * @param address the target address of the service instance + * @param additionalEndpoints additional endpoints of the service instance + * @param status service status + */ + public static byte[] serializeServiceInstance( + InetSocketAddress address, + Map<String, Endpoint> additionalEndpoints, + Status status, + Codec<ServiceInstance> codec) throws IOException { + + ServiceInstance serviceInstance = + new ServiceInstance(toEndpoint(address), additionalEndpoints, status); + return serializeServiceInstance(serviceInstance, codec); + } + + /** + * Creates a service instance object deserialized from a byte array. 
+ * + * @param data the byte array that contains a serialized Thrift service instance + * @param codec the codec to use to deserialize the byte array + */ + public static ServiceInstance deserializeServiceInstance( + byte[] data, Codec<ServiceInstance> codec) throws IOException { + + return codec.deserialize(new ByteArrayInputStream(data)); + } + + /** + * Creates an endpoint from the host name and port of the given InetSocketAddress. + * + * @param address the target address to create the endpoint for + */ + public static Endpoint toEndpoint(InetSocketAddress address) { + return new Endpoint(address.getHostName(), address.getPort()); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java new file mode 100644 index 0000000..7f962eb --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java @@ -0,0 +1,114 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.zookeeper; + +import java.net.InetSocketAddress; +import java.util.Map; + +/** + * A service that uses master election to only allow a single service instance to be active amongst + * a set of potential servers at a time. 
+ */ +public interface SingletonService { + + /** + * Indicates an error attempting to lead a group of servers. + */ + class LeadException extends Exception { + public LeadException(String message, Throwable cause) { + super(message, cause); + } + } + + /** + * Indicates an error attempting to advertise leadership of a group of servers. + */ + class AdvertiseException extends Exception { + public AdvertiseException(String message) { + super(message); + } + + public AdvertiseException(String message, Throwable cause) { + super(message, cause); + } + } + + /** + * Indicates an error attempting to leave a group of servers, abdicating leadership of the group. + */ + class LeaveException extends Exception { + public LeaveException(String message, Throwable cause) { + super(message, cause); + } + } + + /** + * Attempts to lead the singleton service. + * + * @param endpoint The primary endpoint to register as a leader candidate in the service. + * @param additionalEndpoints Additional endpoints that are available on the host. + * @param listener Handler to call when the candidate is elected or defeated. + * @throws LeadException If there was a problem joining or watching the ZooKeeper group. + * @throws InterruptedException If the thread watching/joining the group was interrupted. + */ + void lead( + InetSocketAddress endpoint, + Map<String, InetSocketAddress> additionalEndpoints, + LeadershipListener listener) + throws LeadException, InterruptedException; + + /** + * A listener to be notified of changes in the leadership status. + * Implementers should be careful to avoid blocking operations in these callbacks. + */ + interface LeadershipListener { + + /** + * Notifies the listener that it is the current leader. + * + * @param control A controller handle to advertise and/or leave advertised presence. + */ + void onLeading(LeaderControl control); + + /** + * Notifies the listener that it is no longer the leader. 
+ */ + void onDefeated(); + } + + /** + * A controller for the state of the leader. This will be provided to the leader upon election, + * which allows the leader to decide when to advertise as leader of the server set and terminate + * leadership at will. + */ + interface LeaderControl { + + /** + * Advertises the leader's server presence to clients. + * + * @throws AdvertiseException If there was an error advertising the singleton leader to clients + * of the server set. + * @throws InterruptedException If interrupted while advertising. + */ + void advertise() throws AdvertiseException, InterruptedException; + + /** + * Leaves candidacy for leadership, removing advertised server presence if applicable. + * + * @throws LeaveException If the leader's status could not be updated or there was an error + * abdicating server set leadership. + */ + void leave() throws LeaveException; + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonServiceImpl.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonServiceImpl.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonServiceImpl.java new file mode 100644 index 0000000..d9978a9 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonServiceImpl.java @@ -0,0 +1,122 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.zookeeper; + +import java.net.InetSocketAddress; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +import org.apache.aurora.common.base.ExceptionalCommand; +import org.apache.aurora.common.zookeeper.Candidate.Leader; +import org.apache.aurora.common.zookeeper.Group.JoinException; +import org.apache.zookeeper.data.ACL; + +public class SingletonServiceImpl implements SingletonService { + @VisibleForTesting + static final String LEADER_ELECT_NODE_PREFIX = "singleton_candidate_"; + + /** + * Creates a candidate that can be combined with an existing server set to form a singleton + * service using {@link #SingletonServiceImpl(ServerSet, Candidate)}. + * + * @param zkClient The ZooKeeper client to use. + * @param servicePath The path where service nodes live. + * @param acl The ACL handed to the {@link Group} that backs the returned candidate. + * @return A candidate that can be housed with a standard server set under a single zk path. + */ + public static Candidate createSingletonCandidate( + ZooKeeperClient zkClient, + String servicePath, + Iterable<ACL> acl) { + + return new CandidateImpl(new Group(zkClient, acl, servicePath, LEADER_ELECT_NODE_PREFIX)); + } + + private final ServerSet serverSet; + private final Candidate candidate; + + /** + * Creates a new singleton service that uses the supplied candidate to vie for leadership and then + * lets the elected leader advertise itself in the given server set via {@link LeaderControl#advertise()}. + * + * @param serverSet The server set to advertise in on election. + * @param candidate The candidacy to use to vie for election. 
+ */ + public SingletonServiceImpl(ServerSet serverSet, Candidate candidate) { + this.serverSet = Preconditions.checkNotNull(serverSet); + this.candidate = Preconditions.checkNotNull(candidate); + } + + @Override + public void lead(final InetSocketAddress endpoint, + final Map<String, InetSocketAddress> additionalEndpoints, + final LeadershipListener listener) + throws LeadException, InterruptedException { + + Preconditions.checkNotNull(listener); + + try { + candidate.offerLeadership(new Leader() { + @Override public void onElected(final ExceptionalCommand<JoinException> abdicate) { + listener.onLeading(new LeaderControl() { + ServerSet.EndpointStatus endpointStatus = null; + final AtomicBoolean left = new AtomicBoolean(false); + + // Methods are synchronized to prevent simultaneous invocations. NOTE(review): leave() flips left before endpointStatus.leave(), so if that throws, abdicate never runs and a retry is rejected. + @Override public synchronized void advertise() + throws AdvertiseException, InterruptedException { + + Preconditions.checkState(!left.get(), "Cannot advertise after leaving."); + Preconditions.checkState(endpointStatus == null, "Cannot advertise more than once."); + try { + endpointStatus = serverSet.join(endpoint, additionalEndpoints); + } catch (JoinException e) { + throw new AdvertiseException("Problem advertising endpoint " + endpoint, e); + } + } + + @Override public synchronized void leave() throws LeaveException { + Preconditions.checkState(left.compareAndSet(false, true), + "Cannot leave more than once."); + if (endpointStatus != null) { + try { + endpointStatus.leave(); + } catch (ServerSet.UpdateException e) { + throw new LeaveException("Problem updating endpoint status for abdicating leader " + + "at endpoint " + endpoint, e); + } + } + try { + abdicate.execute(); + } catch (JoinException e) { + throw new LeaveException("Problem abdicating leadership for endpoint " + endpoint, e); + } + } + }); + } + + @Override public void onDefeated() { + listener.onDefeated(); + } + }); + } catch (JoinException e) { + throw new LeadException("Problem joining leadership 
group for endpoint " + endpoint, e); + } catch (Group.WatchException e) { + throw new LeadException("Problem getting initial membership list for leadership group.", e); + } + } +}