Move common/zookeeper to the main aurora project. Remove unused code and restrict visibility where possible. Also fix up various warnings.
Bugs closed: AURORA-1669 Reviewed at https://reviews.apache.org/r/52594/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/b417be38 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/b417be38 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/b417be38 Branch: refs/heads/master Commit: b417be38fe1fcae6b85f7e91cea961ab272adf3f Parents: fdb536a Author: John Sirois <[email protected]> Authored: Thu Oct 6 11:44:07 2016 -0600 Committer: John Sirois <[email protected]> Committed: Thu Oct 6 11:44:07 2016 -0600 ---------------------------------------------------------------------- build.gradle | 17 +- .../aurora/common/zookeeper/Credentials.java | 90 ----- .../aurora/common/zookeeper/JsonCodec.java | 147 -------- .../common/zookeeper/SingletonService.java | 114 ------ .../common/zookeeper/ZooKeeperClient.java | 372 ------------------- .../aurora/common/zookeeper/ZooKeeperUtils.java | 167 --------- .../testing/BaseZooKeeperClientTest.java | 140 ------- .../zookeeper/testing/BaseZooKeeperTest.java | 46 --- .../zookeeper/testing/ZooKeeperTestServer.java | 121 ------ .../aurora/common/zookeeper/JsonCodecTest.java | 159 -------- .../common/zookeeper/ZooKeeperClientTest.java | 210 ----------- .../common/zookeeper/ZooKeeperUtilsTest.java | 139 ------- config/findbugs/excludeFilter.xml | 8 + docs/features/service-discovery.md | 2 +- .../aurora/scheduler/SchedulerLifecycle.java | 6 +- .../aurora/scheduler/app/SchedulerMain.java | 4 +- .../scheduler/app/ServiceGroupMonitor.java | 46 --- .../aurora/scheduler/discovery/Credentials.java | 98 +++++ .../CuratorServiceDiscoveryModule.java | 6 +- .../discovery/CuratorServiceGroupMonitor.java | 1 - .../discovery/CuratorSingletonService.java | 1 - .../discovery/FlaggedZooKeeperConfig.java | 8 +- .../aurora/scheduler/discovery/JsonCodec.java | 147 ++++++++ .../discovery/ServiceDiscoveryModule.java | 7 +- .../discovery/ServiceGroupMonitor.java | 46 +++ .../scheduler/discovery/SingletonService.java | 114 ++++++ .../scheduler/discovery/ZooKeeperConfig.java | 9 +- .../scheduler/discovery/ZooKeeperUtils.java | 51 +++ .../discovery/testing/BaseZooKeeperTest.java | 53 +++ .../discovery/testing/ZooKeeperTestServer.java | 101 +++++ .../scheduler/http/JettyServerModule.java | 2 +- .../aurora/scheduler/http/LeaderRedirect.java | 4 +- .../log/mesos/MesosLogStreamModule.java | 4 +- .../scheduler/SchedulerLifecycleTest.java | 4 +- .../aurora/scheduler/app/SchedulerIT.java | 7 +- .../discovery/BaseCuratorDiscoveryTest.java | 4 +- .../discovery/CuratorDiscoveryModuleTest.java | 6 +- .../discovery/CuratorSingletonServiceTest.java | 2 - .../scheduler/discovery/JsonCodecTest.java | 159 ++++++++ .../discovery/ZooKeeperConfigTest.java | 12 +- .../scheduler/http/AbstractJettyTest.java | 15 +- .../scheduler/http/LeaderRedirectTest.java | 4 +- .../aurora/scheduler/thrift/ThriftIT.java | 2 +- 43 files changed, 832 insertions(+), 1823 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/b417be38/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 07689f9..3cd083c 100644 --- a/build.gradle +++ b/build.gradle @@ -90,6 +90,7 @@ For more details, please see https://issues.apache.org/jira/browse/AURORA-1169 ext.jerseyRev = '1.19' ext.jsrRev = '3.0.1' ext.junitRev = '4.12' + ext.logbackRev = '1.1.3' ext.mybatisRev = '3.3.1' ext.protobufRev = '2.6.1' ext.servletRev = '3.1.0' @@ -163,6 +164,7 @@ project(':commons') { dependencies { compile project(':commons-args') + compile "ch.qos.logback:logback-classic:${logbackRev}" compile "com.google.code.findbugs:jsr305:${jsrRev}" compile "com.google.code.gson:gson:${gsonRev}" compile "com.google.guava:guava:${guavaRev}" @@ -173,17 +175,13 @@ project(':commons') { compile "javax.servlet:javax.servlet-api:${servletRev}" compile "joda-time:joda-time:2.9.1" compile "org.antlr:stringtemplate:${stringTemplateRev}" - compile "org.apache.zookeeper:zookeeper:${zookeeperRev}" compile "org.easymock:easymock:3.4" // There are a few testing support libs in the src/main/java trees that use junit - currently: - // src/main/java/org/apache/aurora/common/zookeeper/testing // src/main/java/org/apache/aurora/common/testing compile "junit:junit:${junitRev}" testCompile "junit:junit:${junitRev}" - testCompile "org.powermock:powermock-module-junit4:1.6.4" - testCompile "org.powermock:powermock-api-easymock:1.6.4" } } @@ -349,9 +347,11 @@ dependencies { compile project(':commons') compile project(':commons-args') + compile 'aopalliance:aopalliance:1.0' - compile 'ch.qos.logback:logback-classic:1.1.3' + compile "ch.qos.logback:logback-classic:${logbackRev}" compile "com.google.code.findbugs:jsr305:${jsrRev}" + compile "com.google.code.gson:gson:${gsonRev}" compile "com.google.inject:guice:${guiceRev}" compile "com.google.inject.extensions:guice-assistedinject:${guiceRev}" compile "com.google.protobuf:protobuf-java:${protobufRev}" @@ -385,8 +385,15 @@ dependencies { compile 'org.quartz-scheduler:quartz:2.2.2' compile "uno.perk:forward:1.0.0" + // There are a few testing support libs in the src/main/java trees that use junit - currently: + // src/main/java/org/apache/aurora/common/zookeeper/testing + compile "junit:junit:${junitRev}" + testCompile "com.sun.jersey:jersey-client:${jerseyRev}" testCompile "junit:junit:${junitRev}" + testCompile "org.powermock:powermock-module-junit4:1.6.4" + testCompile "org.powermock:powermock-api-easymock:1.6.4" + } // For normal developer builds, avoid running the often-time-consuming code quality checks. http://git-wip-us.apache.org/repos/asf/aurora/blob/b417be38/commons/src/main/java/org/apache/aurora/common/zookeeper/Credentials.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/Credentials.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/Credentials.java deleted file mode 100644 index 18319a3..0000000 --- a/commons/src/main/java/org/apache/aurora/common/zookeeper/Credentials.java +++ /dev/null @@ -1,90 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.zookeeper; - -import com.google.common.base.Objects; -import com.google.common.base.Preconditions; - -import org.apache.aurora.common.base.MorePreconditions; -import org.apache.commons.lang.builder.EqualsBuilder; - -import static java.util.Objects.requireNonNull; - -/** - * Encapsulates a user's ZooKeeper credentials. - */ -public final class Credentials { - - /** - * Creates a set of credentials for the ZooKeeper digest authentication mechanism. - * - * @param username the username to authenticate with - * @param password the password to authenticate with - * @return a set of credentials that can be used to authenticate the zoo keeper client - */ - public static Credentials digestCredentials(String username, String password) { - MorePreconditions.checkNotBlank(username); - Preconditions.checkNotNull(password); - - // TODO(John Sirois): DigestAuthenticationProvider is broken - uses platform default charset - // (on server) and so we just have to hope here that clients are deployed in compatible jvms. - // Consider writing and installing a version of DigestAuthenticationProvider that controls its - // Charset explicitly. - return new Credentials("digest", (username + ":" + password).getBytes()); - } - - private final String scheme; - private final byte[] authToken; - - public Credentials(String scheme, byte[] authToken) { - this.scheme = MorePreconditions.checkNotBlank(scheme); - this.authToken = requireNonNull(authToken); - } - - /** - * Returns the authentication scheme these credentials are for. - * - * @return the scheme these credentials are for. - */ - public String scheme() { - return scheme; - } - - /** - * Returns the authentication token. - * - * @return the authentication token. - */ - public byte[] authToken() { - return authToken; - } - - @Override - public boolean equals(Object o) { - if (!(o instanceof Credentials)) { - return false; - } - - Credentials other = (Credentials) o; - return new EqualsBuilder() - .append(scheme, other.scheme()) - .append(authToken, other.authToken()) - .isEquals(); - } - - @Override - public int hashCode() { - return Objects.hashCode(scheme, authToken); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/b417be38/commons/src/main/java/org/apache/aurora/common/zookeeper/JsonCodec.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/JsonCodec.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/JsonCodec.java deleted file mode 100644 index 45e789b..0000000 --- a/commons/src/main/java/org/apache/aurora/common/zookeeper/JsonCodec.java +++ /dev/null @@ -1,147 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.zookeeper; - -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.OutputStream; -import java.io.OutputStreamWriter; -import java.io.Writer; -import java.nio.charset.Charset; -import java.util.Map; - -import javax.annotation.Nullable; - -import com.google.common.base.Charsets; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Maps; -import com.google.gson.Gson; -import com.google.gson.JsonIOException; -import com.google.gson.JsonParseException; - -import org.apache.aurora.common.io.Codec; -import org.apache.aurora.common.thrift.Endpoint; -import org.apache.aurora.common.thrift.ServiceInstance; -import org.apache.aurora.common.thrift.Status; - -import static java.util.Objects.requireNonNull; - -/** - * Encodes a {@link ServiceInstance} as a JSON object. - */ -public class JsonCodec implements Codec<ServiceInstance> { - - private static void assertRequiredField(String fieldName, Object fieldValue) { - if (fieldValue == null) { - throw new JsonParseException(String.format("Field %s is required", fieldName)); - } - } - - private static class EndpointSchema { - private final String host; - private final Integer port; - - EndpointSchema(Endpoint endpoint) { - host = endpoint.getHost(); - port = endpoint.getPort(); - } - - Endpoint asEndpoint() { - assertRequiredField("host", host); - assertRequiredField("port", port); - - return new Endpoint(host, port); - } - } - - private static class ServiceInstanceSchema { - private final EndpointSchema serviceEndpoint; - private final Map<String, EndpointSchema> additionalEndpoints; - private final Status status; - private final @Nullable Integer shard; - - ServiceInstanceSchema(ServiceInstance instance) { - serviceEndpoint = new EndpointSchema(instance.getServiceEndpoint()); - if (instance.isSetAdditionalEndpoints()) { - additionalEndpoints = - Maps.transformValues(instance.getAdditionalEndpoints(), EndpointSchema::new); - } else { - additionalEndpoints = ImmutableMap.of(); - } - status = instance.getStatus(); - shard = instance.isSetShard() ? instance.getShard() : null; - } - - ServiceInstance asServiceInstance() { - assertRequiredField("serviceEndpoint", serviceEndpoint); - assertRequiredField("status", status); - - Map<String, EndpointSchema> extraEndpoints = - additionalEndpoints == null ? ImmutableMap.of() : additionalEndpoints; - - ServiceInstance instance = - new ServiceInstance( - serviceEndpoint.asEndpoint(), - Maps.transformValues(extraEndpoints, EndpointSchema::asEndpoint), - status); - if (shard != null) { - instance.setShard(shard); - } - return instance; - } - } - - /** - * The encoding for service instance data in ZooKeeper expected by Aurora clients. - */ - public static final Codec<ServiceInstance> INSTANCE = new JsonCodec(); - - private static final Charset ENCODING = Charsets.UTF_8; - - private final Gson gson; - - private JsonCodec() { - this(new Gson()); - } - - JsonCodec(Gson gson) { - this.gson = requireNonNull(gson); - } - - @Override - public void serialize(ServiceInstance instance, OutputStream sink) throws IOException { - Writer writer = new OutputStreamWriter(sink, ENCODING); - try { - gson.toJson(new ServiceInstanceSchema(instance), writer); - } catch (JsonIOException e) { - throw new IOException(String.format("Problem serializing %s to JSON", instance), e); - } - writer.flush(); - } - - @Override - public ServiceInstance deserialize(InputStream source) throws IOException { - InputStreamReader reader = new InputStreamReader(source, ENCODING); - try { - @Nullable ServiceInstanceSchema schema = gson.fromJson(reader, ServiceInstanceSchema.class); - if (schema == null) { - throw new IOException("JSON did not include a ServiceInstance object"); - } - return schema.asServiceInstance(); - } catch (JsonParseException e) { - throw new IOException("Problem parsing JSON ServiceInstance.", e); - } - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/b417be38/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java deleted file mode 100644 index 7f962eb..0000000 --- a/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java +++ /dev/null @@ -1,114 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.zookeeper; - -import java.net.InetSocketAddress; -import java.util.Map; - -/** - * A service that uses master election to only allow a single service instance to be active amongst - * a set of potential servers at a time. - */ -public interface SingletonService { - - /** - * Indicates an error attempting to lead a group of servers. - */ - class LeadException extends Exception { - public LeadException(String message, Throwable cause) { - super(message, cause); - } - } - - /** - * Indicates an error attempting to advertise leadership of a group of servers. - */ - class AdvertiseException extends Exception { - public AdvertiseException(String message) { - super(message); - } - - public AdvertiseException(String message, Throwable cause) { - super(message, cause); - } - } - - /** - * Indicates an error attempting to leave a group of servers, abdicating leadership of the group. - */ - class LeaveException extends Exception { - public LeaveException(String message, Throwable cause) { - super(message, cause); - } - } - - /** - * Attempts to lead the singleton service. - * - * @param endpoint The primary endpoint to register as a leader candidate in the service. - * @param additionalEndpoints Additional endpoints that are available on the host. - * @param listener Handler to call when the candidate is elected or defeated. - * @throws LeadException If there was a problem joining or watching the ZooKeeper group. - * @throws InterruptedException If the thread watching/joining the group was interrupted. - */ - void lead( - InetSocketAddress endpoint, - Map<String, InetSocketAddress> additionalEndpoints, - LeadershipListener listener) - throws LeadException, InterruptedException; - - /** - * A listener to be notified of changes in the leadership status. - * Implementers should be careful to avoid blocking operations in these callbacks. - */ - interface LeadershipListener { - - /** - * Notifies the listener that is is current leader. - * - * @param control A controller handle to advertise and/or leave advertised presence. - */ - void onLeading(LeaderControl control); - - /** - * Notifies the listener that it is no longer leader. - */ - void onDefeated(); - } - - /** - * A controller for the state of the leader. This will be provided to the leader upon election, - * which allows the leader to decide when to advertise as leader of the server set and terminate - * leadership at will. - */ - interface LeaderControl { - - /** - * Advertises the leader's server presence to clients. - * - * @throws AdvertiseException If there was an error advertising the singleton leader to clients - * of the server set. - * @throws InterruptedException If interrupted while advertising. - */ - void advertise() throws AdvertiseException, InterruptedException; - - /** - * Leaves candidacy for leadership, removing advertised server presence if applicable. - * - * @throws LeaveException If the leader's status could not be updated or there was an error - * abdicating server set leadership. - */ - void leave() throws LeaveException; - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/b417be38/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperClient.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperClient.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperClient.java deleted file mode 100644 index ce243fb..0000000 --- a/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperClient.java +++ /dev/null @@ -1,372 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.zookeeper; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Joiner; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; - -import org.apache.aurora.common.base.Command; -import org.apache.aurora.common.net.InetSocketAddressHelper; -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Time; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.KeeperException.SessionExpiredException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.Watcher.Event.EventType; -import org.apache.zookeeper.Watcher.Event.KeeperState; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.common.PathUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Manages a connection to a ZooKeeper cluster. - */ -public class ZooKeeperClient { - - /** - * Indicates an error connecting to a zookeeper cluster. - */ - public class ZooKeeperConnectionException extends Exception { - ZooKeeperConnectionException(String message, Throwable cause) { - super(message, cause); - } - } - - private final class SessionState { - private final long sessionId; - private final byte[] sessionPasswd; - - private SessionState(long sessionId, byte[] sessionPasswd) { - this.sessionId = sessionId; - this.sessionPasswd = sessionPasswd; - } - } - - private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperClient.class); - - private static final Amount<Long,Time> WAIT_FOREVER = Amount.of(0L, Time.MILLISECONDS); - - private final int sessionTimeoutMs; - private final Optional<Credentials> credentials; - private final String zooKeeperServers; - // GuardedBy "this", but still volatile for tests, where we want to be able to see writes - // made from within long synchronized blocks. - private volatile ZooKeeper zooKeeper; - private SessionState sessionState; - - private final Set<Watcher> watchers = new CopyOnWriteArraySet<Watcher>(); - private final BlockingQueue<WatchedEvent> eventQueue = new LinkedBlockingQueue<WatchedEvent>(); - - private static Iterable<InetSocketAddress> combine(InetSocketAddress address, - InetSocketAddress... addresses) { - return ImmutableSet.<InetSocketAddress>builder().add(address).add(addresses).build(); - } - - /** - * Creates an unconnected client that will lazily attempt to connect on the first call to - * {@link #get()}. - * - * @param sessionTimeout the ZK session timeout - * @param zooKeeperServer the first, required ZK server - * @param zooKeeperServers any additional servers forming the ZK cluster - */ - public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, InetSocketAddress zooKeeperServer, - InetSocketAddress... zooKeeperServers) { - this(sessionTimeout, combine(zooKeeperServer, zooKeeperServers)); - } - - /** - * Creates an unconnected client that will lazily attempt to connect on the first call to - * {@link #get}. - * - * @param sessionTimeout the ZK session timeout - * @param zooKeeperServers the set of servers forming the ZK cluster - */ - public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, - Iterable<InetSocketAddress> zooKeeperServers) { - this(sessionTimeout, Optional.absent(), Optional.absent(), zooKeeperServers); - } - - /** - * Creates an unconnected client that will lazily attempt to connect on the first call to - * {@link #get()}. All successful connections will be authenticated with the given - * {@code credentials}. - * - * @param sessionTimeout the ZK session timeout - * @param credentials the credentials to authenticate with - * @param zooKeeperServer the first, required ZK server - * @param zooKeeperServers any additional servers forming the ZK cluster - */ - public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, Credentials credentials, - InetSocketAddress zooKeeperServer, InetSocketAddress... zooKeeperServers) { - this(sessionTimeout, - Optional.of(credentials), - Optional.absent(), - combine(zooKeeperServer, zooKeeperServers)); - } - - /** - * Creates an unconnected client that will lazily attempt to connect on the first call to - * {@link #get}. All successful connections will be authenticated with the given - * {@code credentials}. - * - * @param sessionTimeout the ZK session timeout - * @param credentials the credentials to authenticate with - * @param zooKeeperServers the set of servers forming the ZK cluster - */ - public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, Credentials credentials, - Iterable<InetSocketAddress> zooKeeperServers) { - this(sessionTimeout, - Optional.of(credentials), - Optional.absent(), - zooKeeperServers); - } - - /** - * Creates an unconnected client that will lazily attempt to connect on the first call to - * {@link #get}. All successful connections will be authenticated with the given - * {@code credentials}. - * - * @param sessionTimeout the ZK session timeout - * @param credentials the credentials to authenticate with - * @param chrootPath an optional chroot path - * @param zooKeeperServers the set of servers forming the ZK cluster - */ - public ZooKeeperClient(Amount<Integer, Time> sessionTimeout, Optional<Credentials> credentials, - Optional<String> chrootPath, Iterable<InetSocketAddress> zooKeeperServers) { - this.sessionTimeoutMs = Preconditions.checkNotNull(sessionTimeout).as(Time.MILLISECONDS); - this.credentials = Preconditions.checkNotNull(credentials); - - if (chrootPath.isPresent()) { - PathUtils.validatePath(chrootPath.get()); - } - - Preconditions.checkNotNull(zooKeeperServers); - Preconditions.checkArgument(!Iterables.isEmpty(zooKeeperServers), - "Must present at least 1 ZK server"); - - Thread watcherProcessor = new Thread("ZookeeperClient-watcherProcessor") { - @Override - public void run() { - while (true) { - try { - WatchedEvent event = eventQueue.take(); - for (Watcher watcher : watchers) { - watcher.process(event); - } - } catch (InterruptedException e) { /* ignore */ } - } - } - }; - watcherProcessor.setDaemon(true); - watcherProcessor.start(); - - Iterable<String> servers = - Iterables.transform(ImmutableSet.copyOf(zooKeeperServers), - InetSocketAddressHelper::toString); - this.zooKeeperServers = Joiner.on(',').join(servers).concat(chrootPath.or("")); - } - - /** - * Returns the current active ZK connection or establishes a new one if none has yet been - * established or a previous connection was disconnected or had its session time out. This method - * will attempt to re-use sessions when possible. Equivalent to: - * <pre>get(Amount.of(0L, ...)</pre>. - * - * @return a connected ZooKeeper client - * @throws ZooKeeperConnectionException if there was a problem connecting to the ZK cluster - * @throws InterruptedException if interrupted while waiting for a connection to be established - */ - public synchronized ZooKeeper get() throws ZooKeeperConnectionException, InterruptedException { - try { - return get(WAIT_FOREVER); - } catch (TimeoutException e) { - InterruptedException interruptedException = - new InterruptedException("Got an unexpected TimeoutException for 0 wait"); - interruptedException.initCause(e); - throw interruptedException; - } - } - - /** - * Returns the current active ZK connection or establishes a new one if none has yet been - * established or a previous connection was disconnected or had its session time out. This - * method will attempt to re-use sessions when possible. - * - * @param connectionTimeout the maximum amount of time to wait for the connection to the ZK - * cluster to be established; 0 to wait forever - * @return a connected ZooKeeper client - * @throws ZooKeeperConnectionException if there was a problem connecting to the ZK cluster - * @throws InterruptedException if interrupted while waiting for a connection to be established - * @throws TimeoutException if a connection could not be established within the configured - * session timeout - */ - public synchronized ZooKeeper get(Amount<Long, Time> connectionTimeout) - throws ZooKeeperConnectionException, InterruptedException, TimeoutException { - - if (zooKeeper == null) { - final CountDownLatch connected = new CountDownLatch(1); - Watcher watcher = event -> { - switch (event.getType()) { - // Guard the None type since this watch may be used as the default watch on calls by - // the client outside our control. - case None: - switch (event.getState()) { - case Expired: - LOG.info("Zookeeper session expired. Event: " + event); - close(); - break; - case SyncConnected: - connected.countDown(); - break; - } - } - - eventQueue.offer(event); - }; - - try { - zooKeeper = (sessionState != null) - ? new ZooKeeper(zooKeeperServers, sessionTimeoutMs, watcher, sessionState.sessionId, - sessionState.sessionPasswd) - : new ZooKeeper(zooKeeperServers, sessionTimeoutMs, watcher); - } catch (IOException e) { - throw new ZooKeeperConnectionException( - "Problem connecting to servers: " + zooKeeperServers, e); - } - - if (connectionTimeout.getValue() > 0) { - if (!connected.await(connectionTimeout.as(Time.MILLISECONDS), TimeUnit.MILLISECONDS)) { - close(); - throw new TimeoutException("Timed out waiting for a ZK connection after " - + connectionTimeout); - } - } else { - try { - connected.await(); - } catch (InterruptedException ex) { - LOG.info("Interrupted while waiting to connect to zooKeeper"); - close(); - throw ex; - } - } - if (credentials.isPresent()) { - Credentials zkCredentials = credentials.get(); - zooKeeper.addAuthInfo(zkCredentials.scheme(), zkCredentials.authToken()); - } - - sessionState = new SessionState(zooKeeper.getSessionId(), zooKeeper.getSessionPasswd()); - } - return zooKeeper; - } - - /** - * Clients that need to re-establish state after session expiration can register an - * {@code onExpired} command to execute. - * - * @param onExpired the {@code Command} to register - * @return the new {@link Watcher} which can later be passed to {@link #unregister} for - * removal. - */ - public Watcher registerExpirationHandler(final Command onExpired) { - Watcher watcher = event -> { - if (event.getType() == EventType.None && event.getState() == KeeperState.Expired) { - onExpired.execute(); - } - }; - register(watcher); - return watcher; - } - - /** - * Clients that need to register a top-level {@code Watcher} should do so using this method. The - * registered {@code watcher} will remain registered across re-connects and session expiration - * events. - * - * @param watcher the {@code Watcher to register} - */ - public void register(Watcher watcher) { - watchers.add(watcher); - } - - /** - * Clients can attempt to unregister a top-level {@code Watcher} that has previously been - * registered. - * - * @param watcher the {@code Watcher} to unregister as a top-level, persistent watch - * @return whether the given {@code Watcher} was found and removed from the active set - */ - public boolean unregister(Watcher watcher) { - return watchers.remove(watcher); - } - - /** - * Checks to see if the client might reasonably re-try an operation given the exception thrown - * while attempting it. If the ZooKeeper session should be expired to enable the re-try to - * succeed this method will expire it as a side-effect. - * - * @param e the exception to test - * @return true if a retry can be attempted - */ - public boolean shouldRetry(KeeperException e) { - if (e instanceof SessionExpiredException) { - close(); - } - return ZooKeeperUtils.isRetryable(e); - } - - /** - * Closes the current connection if any expiring the current ZooKeeper session. Any subsequent - * calls to this method will no-op until the next successful {@link #get}. - */ - public synchronized void close() { - if (zooKeeper != null) { - try { - zooKeeper.close(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOG.warn("Interrupted trying to close zooKeeper"); - } finally { - zooKeeper = null; - sessionState = null; - } - } - } - - @VisibleForTesting - synchronized boolean isClosed() { - return zooKeeper == null; - } - - @VisibleForTesting - ZooKeeper getZooKeeperClientForTests() { - return zooKeeper; - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/b417be38/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperUtils.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperUtils.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperUtils.java deleted file mode 100644 index 2ada264..0000000 --- a/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperUtils.java +++ /dev/null @@ -1,167 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.zookeeper; - -import java.util.List; - -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; - -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Time; -import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.common.PathUtils; -import org.apache.zookeeper.data.ACL; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Utilities for dealing with zoo keeper. - */ -public final class ZooKeeperUtils { - - private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperUtils.class); - - /** - * An appropriate default session timeout for Twitter ZooKeeper clusters. - */ - public static final Amount<Integer,Time> DEFAULT_ZK_SESSION_TIMEOUT = Amount.of(4, Time.SECONDS); - - /** - * The magic version number that allows any mutation to always succeed regardless of actual - * version number. - */ - public static final int ANY_VERSION = -1; - - /** - * An ACL that gives all permissions any user authenticated or not. - */ - public static final ImmutableList<ACL> OPEN_ACL_UNSAFE = - ImmutableList.copyOf(Ids.OPEN_ACL_UNSAFE); - - /** - * An ACL that gives all permissions to node creators and read permissions only to everyone else. - */ - public static final ImmutableList<ACL> EVERYONE_READ_CREATOR_ALL = - ImmutableList.<ACL>builder() - .addAll(Ids.CREATOR_ALL_ACL) - .addAll(Ids.READ_ACL_UNSAFE) - .build(); - - /** - * Returns true if the given exception indicates an error that can be resolved by retrying the - * operation without modification. - * - * @param e the exception to check - * @return true if the causing operation is strictly retryable - */ - public static boolean isRetryable(KeeperException e) { - Preconditions.checkNotNull(e); - - switch (e.code()) { - case CONNECTIONLOSS: - case SESSIONEXPIRED: - case SESSIONMOVED: - case OPERATIONTIMEOUT: - return true; - - case RUNTIMEINCONSISTENCY: - case DATAINCONSISTENCY: - case MARSHALLINGERROR: - case BADARGUMENTS: - case NONODE: - case NOAUTH: - case BADVERSION: - case NOCHILDRENFOREPHEMERALS: - case NODEEXISTS: - case NOTEMPTY: - case INVALIDCALLBACK: - case INVALIDACL: - case AUTHFAILED: - case UNIMPLEMENTED: - - // These two should not be encountered - they are used internally by ZK to specify ranges - case SYSTEMERROR: - case APIERROR: - - case OK: // This is actually an invalid ZK exception code - - default: - return false; - } - } - - /** - * Ensures the given {@code path} exists in the ZK cluster accessed by {@code zkClient}. If the - * path already exists, nothing is done; however if any portion of the path is missing, it will be - * created with the given {@code acl} as a persistent zookeeper node. The given {@code path} must - * be a valid zookeeper absolute path. - * - * @param zkClient the client to use to access the ZK cluster - * @param acl the acl to use if creating path nodes - * @param path the path to ensure exists - * @throws ZooKeeperConnectionException if there was a problem accessing the ZK cluster - * @throws InterruptedException if we were interrupted attempting to connect to the ZK cluster - * @throws KeeperException if there was a problem in ZK - */ - public static void ensurePath(ZooKeeperClient zkClient, List<ACL> acl, String path) - throws ZooKeeperConnectionException, InterruptedException, KeeperException { - Preconditions.checkNotNull(zkClient); - Preconditions.checkNotNull(path); - Preconditions.checkArgument(path.startsWith("/")); - - ensurePathInternal(zkClient, acl, path); - } - - private static void ensurePathInternal(ZooKeeperClient zkClient, List<ACL> acl, String path) - throws ZooKeeperConnectionException, InterruptedException, KeeperException { - if (zkClient.get().exists(path, false) == null) { - // The current path does not exist; so back up a level and ensure the parent path exists - // unless we're already a root-level path. - int lastPathIndex = path.lastIndexOf('/'); - if (lastPathIndex > 0) { - ensurePathInternal(zkClient, acl, path.substring(0, lastPathIndex)); - } - - // We've ensured our parent path (if any) exists so we can proceed to create our path. - try { - zkClient.get().create(path, null, acl, CreateMode.PERSISTENT); - } catch (KeeperException.NodeExistsException e) { - // This ensures we don't die if a race condition was met between checking existence and - // trying to create the node. - LOG.info("Node existed when trying to ensure path " + path + ", somebody beat us to it?"); - } - } - } - - /** - * Validate and return a normalized zookeeper path which doesn't contain consecutive slashes and - * never ends with a slash (except for root path). - * - * @param path the path to be normalized - * @return normalized path string - */ - public static String normalizePath(String path) { - String normalizedPath = path.replaceAll("//+", "/").replaceFirst("(.+)/$", "$1"); - PathUtils.validatePath(normalizedPath); - return normalizedPath; - } - - private ZooKeeperUtils() { - // utility - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/b417be38/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperClientTest.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperClientTest.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperClientTest.java deleted file mode 100644 index ba09279..0000000 --- a/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperClientTest.java +++ /dev/null @@ -1,140 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.zookeeper.testing; - -import java.io.IOException; -import java.net.InetSocketAddress; - -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; - -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Time; -import org.apache.aurora.common.zookeeper.Credentials; -import org.apache.aurora.common.zookeeper.ZooKeeperClient; - -/** - * A base-class for tests that interact with ZooKeeper via the commons ZooKeeperClient. - */ -public abstract class BaseZooKeeperClientTest extends BaseZooKeeperTest { - - private final Amount<Integer, Time> defaultSessionTimeout; - - /** - * Creates a test case where the test server uses its - * {@link ZooKeeperTestServer#DEFAULT_SESSION_TIMEOUT} for clients created without an explicit - * session timeout. - */ - public BaseZooKeeperClientTest() { - this(ZooKeeperTestServer.DEFAULT_SESSION_TIMEOUT); - } - - /** - * Creates a test case where the test server uses the given {@code defaultSessionTimeout} for - * clients created without an explicit session timeout. - */ - public BaseZooKeeperClientTest(Amount<Integer, Time> defaultSessionTimeout) { - this.defaultSessionTimeout = Preconditions.checkNotNull(defaultSessionTimeout); - } - - - /** - * Starts zookeeper back up on the last used port. - */ - protected final void restartNetwork() throws IOException, InterruptedException { - getServer().restartNetwork(); - } - - /** - * Shuts down the in-process zookeeper network server. - */ - protected final void shutdownNetwork() { - getServer().shutdownNetwork(); - } - - /** - * Expires the active session for the given client. The client should be one returned from - * {@link #createZkClient}. - * - * @param zkClient the client to expire - * @throws ZooKeeperClient.ZooKeeperConnectionException if a problem is encountered connecting to - * the local zk server while trying to expire the session - * @throws InterruptedException if interrupted while requesting expiration - */ - protected final void expireSession(ZooKeeperClient zkClient) - throws ZooKeeperClient.ZooKeeperConnectionException, InterruptedException { - getServer().expireClientSession(zkClient.get().getSessionId()); - } - - /** - * Returns the current port to connect to the in-process zookeeper instance. - */ - protected final int getPort() { - return getServer().getPort(); - } - - /** - * Returns a new unauthenticated zookeeper client connected to the in-process zookeeper server - * with the default session timeout. - */ - protected final ZooKeeperClient createZkClient() { - return createZkClient(defaultSessionTimeout, Optional.absent(), Optional.absent()); - } - - /** - * Returns a new authenticated zookeeper client connected to the in-process zookeeper server with - * the default session timeout. - */ - protected final ZooKeeperClient createZkClient(Credentials credentials) { - return createZkClient(defaultSessionTimeout, Optional.of(credentials), Optional.absent()); - } - - /** - * Returns a new authenticated zookeeper client connected to the in-process zookeeper server with - * the default session timeout. The client is authenticated in the digest authentication scheme - * with the given {@code username} and {@code password}. - */ - protected final ZooKeeperClient createZkClient(String username, String password) { - return createZkClient(Credentials.digestCredentials(username, password)); - } - - /** - * Returns a new unauthenticated zookeeper client connected to the in-process zookeeper server - * with a custom {@code sessionTimeout}. - */ - protected final ZooKeeperClient createZkClient(Amount<Integer, Time> sessionTimeout) { - return createZkClient(sessionTimeout, Optional.absent(), Optional.absent()); - } - - /** - * Returns a new authenticated zookeeper client connected to the in-process zookeeper server with - * the default session timeout and the custom chroot path. - */ - protected final ZooKeeperClient createZkClient(String chrootPath) { - return createZkClient(defaultSessionTimeout, Optional.absent(), - Optional.of(chrootPath)); - } - - private ZooKeeperClient createZkClient( - Amount<Integer, Time> sessionTimeout, - Optional<Credentials> credentials, - Optional<String> chrootPath) { - - ZooKeeperClient client = new ZooKeeperClient(sessionTimeout, credentials, chrootPath, - ImmutableList.of(InetSocketAddress.createUnresolved("127.0.0.1", getPort()))); - addTearDown(client::close); - return client; - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/b417be38/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperTest.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperTest.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperTest.java deleted file mode 100644 index 0e68987..0000000 --- a/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperTest.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.zookeeper.testing; - -import org.apache.aurora.common.testing.TearDownTestCase; -import org.junit.Before; -import org.junit.Rule; -import org.junit.rules.TemporaryFolder; - -/** - * A base-class for in-process zookeeper tests. - */ -public abstract class BaseZooKeeperTest extends TearDownTestCase { - - private ZooKeeperTestServer zkTestServer; - - @Rule - public TemporaryFolder tmpFolder = new TemporaryFolder(); - - @Before - public final void setUp() throws Exception { - zkTestServer = new ZooKeeperTestServer(tmpFolder.newFolder(), tmpFolder.newFolder()); - addTearDown(zkTestServer::stop); - zkTestServer.startNetwork(); - } - - /** - * Returns the running in-process ZooKeeper server. - * - * @return The in-process ZooKeeper server. - */ - protected final ZooKeeperTestServer getServer() { - return zkTestServer; - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/b417be38/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java deleted file mode 100644 index 50acaeb..0000000 --- a/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java +++ /dev/null @@ -1,121 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.zookeeper.testing; - -import java.io.File; -import java.io.IOException; -import java.net.InetSocketAddress; - -import com.google.common.base.Preconditions; - -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Time; -import org.apache.zookeeper.server.NIOServerCnxnFactory; -import org.apache.zookeeper.server.ServerCnxnFactory; -import org.apache.zookeeper.server.ZooKeeperServer; -import org.apache.zookeeper.server.ZooKeeperServer.BasicDataTreeBuilder; -import org.apache.zookeeper.server.persistence.FileTxnSnapLog; - -/** - * A helper class for starting in-process ZooKeeper server and clients. - * - * <p>This is ONLY meant to be used for testing. - */ -public class ZooKeeperTestServer { - - static final Amount<Integer, Time> DEFAULT_SESSION_TIMEOUT = Amount.of(100, Time.MILLISECONDS); - - private final File dataDir; - private final File snapDir; - - private ZooKeeperServer zooKeeperServer; - private ServerCnxnFactory connectionFactory; - private int port; - - public ZooKeeperTestServer(File dataDir, File snapDir) { - this.dataDir = Preconditions.checkNotNull(dataDir); - this.snapDir = Preconditions.checkNotNull(snapDir); - } - - /** - * Starts zookeeper up on an ephemeral port. - */ - public void startNetwork() throws IOException, InterruptedException { - zooKeeperServer = - new ZooKeeperServer( - new FileTxnSnapLog(dataDir, snapDir), - new BasicDataTreeBuilder()) { - - // TODO(John Sirois): Introduce a builder to configure the in-process server if and when - // some folks need JMX for in-process tests. - @Override protected void registerJMX() { - // noop - } - }; - - connectionFactory = new NIOServerCnxnFactory(); - connectionFactory.configure( - new InetSocketAddress(port), - 60 /* Semi-arbitrary, max 60 connections is the default used by NIOServerCnxnFactory */); - connectionFactory.startup(zooKeeperServer); - port = zooKeeperServer.getClientPort(); - } - - /** - * Stops the zookeeper server. - */ - public void stop() { - shutdownNetwork(); - } - - /** - * Starts zookeeper back up on the last used port. - */ - final void restartNetwork() throws IOException, InterruptedException { - checkEphemeralPortAssigned(); - Preconditions.checkState(connectionFactory == null); - startNetwork(); - } - - /** - * Shuts down the in-process zookeeper network server. - */ - final void shutdownNetwork() { - if (connectionFactory != null) { - connectionFactory.shutdown(); // Also shuts down zooKeeperServer. - connectionFactory = null; - } - } - - /** - * Expires the client session with the given {@code sessionId}. - * - * @param sessionId The id of the client session to expire. - */ - public final void expireClientSession(long sessionId) { - zooKeeperServer.closeSession(sessionId); - } - - /** - * Returns the current port to connect to the in-process zookeeper instance. - */ - public final int getPort() { - checkEphemeralPortAssigned(); - return port; - } - - private void checkEphemeralPortAssigned() { - Preconditions.checkState(port > 0, "startNetwork must be called first"); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/b417be38/commons/src/test/java/org/apache/aurora/common/zookeeper/JsonCodecTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/JsonCodecTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/JsonCodecTest.java deleted file mode 100644 index a37808c..0000000 --- a/commons/src/test/java/org/apache/aurora/common/zookeeper/JsonCodecTest.java +++ /dev/null @@ -1,159 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.zookeeper; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; - -import com.google.common.base.Charsets; -import com.google.common.collect.ImmutableMap; -import com.google.gson.Gson; -import com.google.gson.JsonIOException; - -import org.apache.aurora.common.thrift.Endpoint; -import org.apache.aurora.common.thrift.ServiceInstance; -import org.apache.aurora.common.thrift.Status; -import org.easymock.EasyMock; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.powermock.api.easymock.PowerMock; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -@RunWith(PowerMockRunner.class) -@PrepareForTest(Gson.class) -public class JsonCodecTest { - - private static byte[] serializeServiceInstance(ServiceInstance serviceInstance) - throws IOException { - - ByteArrayOutputStream output = new ByteArrayOutputStream(); - JsonCodec.INSTANCE.serialize(serviceInstance, output); - return output.toByteArray(); - } - - private static ServiceInstance deserializeServiceInstance(byte[] data) throws IOException { - return JsonCodec.INSTANCE.deserialize(new ByteArrayInputStream(data)); - } - - @Test - public void testJsonCodecRoundtrip() throws Exception { - ServiceInstance instance1 = new ServiceInstance( - new Endpoint("foo", 1000), - ImmutableMap.of("http", new Endpoint("foo", 8080)), - Status.ALIVE) - .setShard(0); - byte[] data = serializeServiceInstance(instance1); - assertTrue(deserializeServiceInstance(data).getServiceEndpoint().isSetPort()); - assertTrue(deserializeServiceInstance(data).isSetShard()); - - ServiceInstance instance2 = new ServiceInstance( - new Endpoint("foo", 1000), - ImmutableMap.of("http-admin1", new Endpoint("foo", 8080)), - Status.ALIVE); - data = serializeServiceInstance(instance2); - assertTrue(deserializeServiceInstance(data).getServiceEndpoint().isSetPort()); - assertFalse(deserializeServiceInstance(data).isSetShard()); - - ServiceInstance instance3 = new ServiceInstance( - new Endpoint("foo", 1000), - ImmutableMap.<String, Endpoint>of(), - Status.ALIVE); - data = serializeServiceInstance(instance3); - assertTrue(deserializeServiceInstance(data).getServiceEndpoint().isSetPort()); - assertFalse(deserializeServiceInstance(data).isSetShard()); - } - - @Test - public void testJsonCompatibility() throws IOException { - ServiceInstance instance = new ServiceInstance( - new Endpoint("foo", 1000), - ImmutableMap.of("http", new Endpoint("foo", 8080)), - Status.ALIVE).setShard(42); - - ByteArrayOutputStream results = new ByteArrayOutputStream(); - JsonCodec.INSTANCE.serialize(instance, results); - assertEquals( - "{\"serviceEndpoint\":{\"host\":\"foo\",\"port\":1000}," - + "\"additionalEndpoints\":{\"http\":{\"host\":\"foo\",\"port\":8080}}," - + "\"status\":\"ALIVE\"," - + "\"shard\":42}", - results.toString()); - } - - @Test - public void testInvalidSerialize() { - // Gson is final so we need to call on PowerMock here. - Gson gson = PowerMock.createMock(Gson.class); - gson.toJson(EasyMock.isA(Object.class), EasyMock.isA(Appendable.class)); - EasyMock.expectLastCall().andThrow(new JsonIOException("error")); - PowerMock.replay(gson); - - ServiceInstance instance = - new ServiceInstance(new Endpoint("foo", 1000), ImmutableMap.of(), Status.ALIVE); - - try { - new JsonCodec(gson).serialize(instance, new ByteArrayOutputStream()); - fail(); - } catch (IOException e) { - // Expected. - } - - PowerMock.verify(gson); - } - - @Test - public void testDeserializeMinimal() throws IOException { - String minimal = "{\"serviceEndpoint\":{\"host\":\"foo\",\"port\":1000},\"status\":\"ALIVE\"}"; - ByteArrayInputStream source = new ByteArrayInputStream(minimal.getBytes(Charsets.UTF_8)); - ServiceInstance actual = JsonCodec.INSTANCE.deserialize(source); - ServiceInstance expected = - new ServiceInstance(new Endpoint("foo", 1000), ImmutableMap.of(), Status.ALIVE); - assertEquals(expected, actual); - } - - @Test - public void testInvalidDeserialize() { - // Not JSON. - assertInvalidDeserialize(new byte[] {0xC, 0xA, 0xF, 0xE}); - - // No JSON object. - assertInvalidDeserialize(""); - assertInvalidDeserialize("[]"); - - // Missing required fields. - assertInvalidDeserialize("{}"); - assertInvalidDeserialize("{\"serviceEndpoint\":{\"host\":\"foo\",\"port\":1000}}"); - assertInvalidDeserialize("{\"status\":\"ALIVE\"}"); - } - - private void assertInvalidDeserialize(String data) { - assertInvalidDeserialize(data.getBytes(Charsets.UTF_8)); - } - - private void assertInvalidDeserialize(byte[] data) { - try { - JsonCodec.INSTANCE.deserialize(new ByteArrayInputStream(data)); - fail(); - } catch (IOException e) { - // Expected. - } - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/b417be38/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperClientTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperClientTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperClientTest.java deleted file mode 100644 index 5eee235..0000000 --- a/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperClientTest.java +++ /dev/null @@ -1,210 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.zookeeper; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Time; -import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException; -import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException.ConnectionLossException; -import org.apache.zookeeper.KeeperException.NoAuthException; -import org.apache.zookeeper.ZooKeeper; -import org.junit.Test; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -/** - * @author John Sirois - */ -public class ZooKeeperClientTest extends BaseZooKeeperClientTest { - - public ZooKeeperClientTest() { - super(Amount.of(1, Time.DAYS)); - } - - @Test - public void testGet() throws Exception { - final ZooKeeperClient zkClient = createZkClient(); - shutdownNetwork(); - try { - zkClient.get(Amount.of(50L, Time.MILLISECONDS)); - fail("Expected client connection to timeout while network down"); - } catch (TimeoutException e) { - assertTrue(zkClient.isClosed()); - } - assertNull(zkClient.getZooKeeperClientForTests()); - - final CountDownLatch blockingGetComplete = new CountDownLatch(1); - final AtomicReference<ZooKeeper> client = new AtomicReference<ZooKeeper>(); - new Thread(() -> { - try { - client.set(zkClient.get()); - } catch (ZooKeeperConnectionException e) { - throw new RuntimeException(e); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } finally { - blockingGetComplete.countDown(); - } - }).start(); - - restartNetwork(); - - // Hung blocking connects should succeed when server connection comes up - blockingGetComplete.await(); - assertNotNull(client.get()); - - // New connections should succeed now that network is back up - long sessionId = zkClient.get().getSessionId(); - - // While connected the same client should be reused (no new connections while healthy) - assertSame(client.get(), zkClient.get()); - - shutdownNetwork(); - // Our client doesn't know the network is down yet so we should be able to get() - ZooKeeper zooKeeper = zkClient.get(); - try { - zooKeeper.exists("/", false); - fail("Expected client operation to fail while network down"); - } catch (ConnectionLossException e) { - // expected - } - - restartNetwork(); - assertEquals("Expected connection to be re-established with existing session", - sessionId, zkClient.get().getSessionId()); - } - - /** - * Test that if a blocking get() call gets interrupted, after a connection has been created - * but before it's connected, the zk connection gets closed. - */ - @Test - public void testGetInterrupted() throws Exception { - final ZooKeeperClient zkClient = createZkClient(); - shutdownNetwork(); - - final CountDownLatch blockingGetComplete = new CountDownLatch(1); - final AtomicBoolean interrupted = new AtomicBoolean(); - final AtomicReference<ZooKeeper> client = new AtomicReference<ZooKeeper>(); - Thread getThread = new Thread(() -> { - try { - client.set(zkClient.get()); - } catch (ZooKeeperConnectionException e) { - throw new RuntimeException(e); - } catch (InterruptedException e) { - interrupted.set(true); - throw new RuntimeException(e); - } finally { - blockingGetComplete.countDown(); - } - }); - getThread.start(); - - while (zkClient.getZooKeeperClientForTests() == null) { - Thread.sleep(100); - } - - getThread.interrupt(); - blockingGetComplete.await(); - - assertNull("The zk connection should have been closed", zkClient.getZooKeeperClientForTests()); - assertTrue("The waiter thread should have been interrupted", interrupted.get()); - assertTrue(zkClient.isClosed()); - } - - @Test - public void testClose() throws Exception { - ZooKeeperClient zkClient = createZkClient(); - zkClient.close(); - - // Close should be idempotent - zkClient.close(); - - long firstSessionId = zkClient.get().getSessionId(); - - // Close on an open client should force session re-establishment - zkClient.close(); - - assertNotEquals(firstSessionId, zkClient.get().getSessionId()); - } - - @Test - public void testCredentials() throws Exception { - String path = "/test"; - ZooKeeperClient authenticatedClient = createZkClient("creator", "creator"); - assertEquals(path, - authenticatedClient.get().create(path, "42".getBytes(), - ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, CreateMode.PERSISTENT)); - - ZooKeeperClient unauthenticatedClient = createZkClient(); - assertEquals("42", getData(unauthenticatedClient, path)); - try { - setData(unauthenticatedClient, path, "37"); - fail("Expected unauthenticated write attempt to fail"); - } catch (NoAuthException e) { - assertEquals("42", getData(unauthenticatedClient, path)); - } - - ZooKeeperClient nonOwnerClient = createZkClient("nonowner", "nonowner"); - assertEquals("42", getData(nonOwnerClient, path)); - try { - setData(nonOwnerClient, path, "37"); - fail("Expected non owner write attempt to fail"); - } catch (NoAuthException e) { - assertEquals("42", getData(nonOwnerClient, path)); - } - - ZooKeeperClient authenticatedClient2 = createZkClient("creator", "creator"); - setData(authenticatedClient2, path, "37"); - assertEquals("37", getData(authenticatedClient2, path)); - } - - @Test - public void testChrootPath() throws Exception { - ZooKeeperClient rootClient = createZkClient(); - String rootPath = "/test"; - String subPath = "/test/subtest"; - assertEquals(rootPath, - rootClient.get().create(rootPath, "42".getBytes(), - ZooKeeperUtils.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); - assertEquals(subPath, - rootClient.get().create(subPath, "37".getBytes(), - ZooKeeperUtils.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); - - ZooKeeperClient chrootedClient = createZkClient(rootPath); - assertArrayEquals("37".getBytes(), chrootedClient.get().getData("/subtest", false, null)); - } - - private void setData(ZooKeeperClient zkClient, String path, String data) throws Exception { - zkClient.get().setData(path, data.getBytes(), ZooKeeperUtils.ANY_VERSION); - } - - private String getData(ZooKeeperClient zkClient, String path) throws Exception { - return new String(zkClient.get().getData(path, false, null)); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/b417be38/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperUtilsTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperUtilsTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperUtilsTest.java deleted file mode 100644 index 9e482a6..0000000 --- a/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperUtilsTest.java +++ /dev/null @@ -1,139 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.zookeeper; - -import com.google.common.base.Charsets; - -import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException.BadVersionException; -import org.apache.zookeeper.KeeperException.NoAuthException; -import org.apache.zookeeper.ZooDefs; -import org.apache.zookeeper.data.Stat; -import org.junit.Test; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -/** - * @author John Sirois - */ -public class ZooKeeperUtilsTest extends BaseZooKeeperClientTest { - @Test - public void testEnsurePath() throws Exception { - ZooKeeperClient zkClient = createZkClient(); - zkClient.get().addAuthInfo("digest", "client1:boo".getBytes(Charsets.UTF_8)); - - assertNull(zkClient.get().exists("/foo", false)); - ZooKeeperUtils.ensurePath(zkClient, ZooDefs.Ids.CREATOR_ALL_ACL, "/foo/bar/baz"); - - zkClient = createZkClient(); - zkClient.get().addAuthInfo("digest", "client2:bap".getBytes(Charsets.UTF_8)); - - // Anyone can check for existence in ZK - assertNotNull(zkClient.get().exists("/foo", false)); - assertNotNull(zkClient.get().exists("/foo/bar", false)); - assertNotNull(zkClient.get().exists("/foo/bar/baz", false)); - - try { - zkClient.get().delete("/foo/bar/baz", -1 /* delete no matter what */); - fail("Expected CREATOR_ALL_ACL to be applied to created path and client2 mutations to be " - + "rejected"); - } catch (NoAuthException e) { - // expected - } - } - - @Test - public void testMagicVersionNumberAllowsUnconditionalUpdate() throws Exception { - String nodePath = "/foo"; - ZooKeeperClient zkClient = createZkClient(); - - zkClient.get().create(nodePath, "init".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - - Stat initStat = new Stat(); - byte[] initialData = zkClient.get().getData(nodePath, false, initStat); - assertArrayEquals("init".getBytes(), initialData); - - // bump the version - Stat rev1Stat = zkClient.get().setData(nodePath, "rev1".getBytes(), initStat.getVersion()); - - try { - zkClient.get().setData(nodePath, "rev2".getBytes(), initStat.getVersion()); - fail("expected correct version to be required"); - } catch (BadVersionException e) { - // expected - } - - // expect using the correct version to work - Stat rev2Stat = zkClient.get().setData(nodePath, "rev2".getBytes(), rev1Stat.getVersion()); - assertNotEquals(ZooKeeperUtils.ANY_VERSION, rev2Stat.getVersion()); - - zkClient.get().setData(nodePath, "force-write".getBytes(), ZooKeeperUtils.ANY_VERSION); - Stat forceWriteStat = new Stat(); - byte[] forceWriteData = zkClient.get().getData(nodePath, false, forceWriteStat); - assertArrayEquals("force-write".getBytes(), forceWriteData); - - assertTrue(forceWriteStat.getVersion() > rev2Stat.getVersion()); - assertNotEquals(ZooKeeperUtils.ANY_VERSION, forceWriteStat.getVersion()); - } - - @Test - public void testNormalizingPath() throws Exception { - assertEquals("/", ZooKeeperUtils.normalizePath("/")); - assertEquals("/foo", ZooKeeperUtils.normalizePath("/foo/")); - assertEquals("/foo/bar", ZooKeeperUtils.normalizePath("/foo//bar")); - assertEquals("/foo/bar", ZooKeeperUtils.normalizePath("//foo/bar")); - assertEquals("/foo/bar", ZooKeeperUtils.normalizePath("/foo/bar/")); - assertEquals("/foo/bar", ZooKeeperUtils.normalizePath("/foo/bar//")); - assertEquals("/foo/bar", ZooKeeperUtils.normalizePath("/foo/bar")); - } - - @Test - public void testLenientPaths() { - assertEquals("/", ZooKeeperUtils.normalizePath("///")); - assertEquals("/a/group", ZooKeeperUtils.normalizePath("/a/group")); - assertEquals("/a/group", ZooKeeperUtils.normalizePath("/a/group/")); - assertEquals("/a/group", ZooKeeperUtils.normalizePath("/a//group")); - assertEquals("/a/group", ZooKeeperUtils.normalizePath("/a//group//")); - - try { - ZooKeeperUtils.normalizePath("a/group"); - fail("Relative paths should not be allowed."); - } catch (IllegalArgumentException e) { - // expected - } - - try { - ZooKeeperUtils.normalizePath("/a/./group"); - fail("Relative paths should not be allowed."); - } catch (IllegalArgumentException e) { - // expected - } - - try { - ZooKeeperUtils.normalizePath("/a/../group"); - fail("Relative paths should not be allowed."); - } catch (IllegalArgumentException e) { - // expected - } - } - -} http://git-wip-us.apache.org/repos/asf/aurora/blob/b417be38/config/findbugs/excludeFilter.xml ---------------------------------------------------------------------- diff --git a/config/findbugs/excludeFilter.xml b/config/findbugs/excludeFilter.xml index fe3f4ca..1c311d3 100644 --- a/config/findbugs/excludeFilter.xml +++ b/config/findbugs/excludeFilter.xml @@ -73,6 +73,14 @@ limitations under the License. <Bug pattern="UMAC_UNCALLABLE_METHOD_OF_ANONYMOUS_CLASS" /> </Match> + <!-- We're forced to use the platform default encoding to generate a byte array from digest + credentials because the underlying ZooKeeper API dictates this - also noted in the + offending code. --> + <Match> + <Class name="org.apache.aurora.scheduler.discovery.Credentials" /> + <Bug pattern="DM_DEFAULT_ENCODING" /> + </Match> + <!-- Technical debt. --> <Match> <Class name="org.apache.aurora.scheduler.log.mesos.MesosLog$LogStream" /> http://git-wip-us.apache.org/repos/asf/aurora/blob/b417be38/docs/features/service-discovery.md ---------------------------------------------------------------------- diff --git a/docs/features/service-discovery.md b/docs/features/service-discovery.md index 36823c8..511c96d 100644 --- a/docs/features/service-discovery.md +++ b/docs/features/service-discovery.md @@ -6,7 +6,7 @@ the purpose of service discovery. ServerSets use the Zookeeper [group membershi of which there are several reference implementations: - [C++](https://github.com/apache/mesos/blob/master/src/zookeeper/group.cpp) - - [Java](https://github.com/twitter/commons/blob/master/src/java/com/twitter/common/zookeeper/ServerSetImpl.java#L221) + - [Java](http://curator.apache.org/curator-recipes/group-member.html) - [Python](https://github.com/twitter/commons/blob/master/src/python/twitter/common/zookeeper/serverset/serverset.py#L51) These can also be used natively in Finagle using the [ZookeeperServerSetCluster](https://github.com/twitter/finagle/blob/master/finagle-serversets/src/main/scala/com/twitter/finagle/zookeeper/ZookeeperServerSetCluster.scala). http://git-wip-us.apache.org/repos/asf/aurora/blob/b417be38/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java b/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java index 195ab91..5ac5f25 100644 --- a/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java +++ b/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java @@ -47,8 +47,8 @@ import org.apache.aurora.common.quantity.Time; import org.apache.aurora.common.stats.StatsProvider; import org.apache.aurora.common.util.StateMachine; import org.apache.aurora.common.util.StateMachine.Transition; -import org.apache.aurora.common.zookeeper.SingletonService; -import org.apache.aurora.common.zookeeper.SingletonService.LeaderControl; +import org.apache.aurora.scheduler.discovery.SingletonService; +import org.apache.aurora.scheduler.discovery.SingletonService.LeaderControl; import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered; import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber; import org.apache.aurora.scheduler.mesos.Driver; @@ -58,7 +58,7 @@ import org.slf4j.LoggerFactory; import static java.util.Objects.requireNonNull; -import static org.apache.aurora.common.zookeeper.SingletonService.LeadershipListener; +import static org.apache.aurora.scheduler.discovery.SingletonService.LeadershipListener; /** * The central driver of the scheduler runtime lifecycle. Handles the transitions from startup and http://git-wip-us.apache.org/repos/asf/aurora/blob/b417be38/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java index 94c1a29..0f92a3c 100644 --- a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java +++ b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java @@ -42,8 +42,6 @@ import org.apache.aurora.common.args.constraints.NotEmpty; import org.apache.aurora.common.args.constraints.NotNull; import org.apache.aurora.common.inject.Bindings; import org.apache.aurora.common.stats.Stats; -import org.apache.aurora.common.zookeeper.SingletonService; -import org.apache.aurora.common.zookeeper.SingletonService.LeadershipListener; import org.apache.aurora.gen.ServerInfo; import org.apache.aurora.scheduler.AppStartup; import org.apache.aurora.scheduler.SchedulerLifecycle; @@ -52,6 +50,8 @@ import org.apache.aurora.scheduler.configuration.executor.ExecutorModule; import org.apache.aurora.scheduler.cron.quartz.CronModule; import org.apache.aurora.scheduler.discovery.FlaggedZooKeeperConfig; import org.apache.aurora.scheduler.discovery.ServiceDiscoveryModule; +import org.apache.aurora.scheduler.discovery.SingletonService; +import org.apache.aurora.scheduler.discovery.SingletonService.LeadershipListener; import org.apache.aurora.scheduler.events.WebhookModule; import org.apache.aurora.scheduler.http.HttpService; import org.apache.aurora.scheduler.log.mesos.MesosLogStreamModule; http://git-wip-us.apache.org/repos/asf/aurora/blob/b417be38/src/main/java/org/apache/aurora/scheduler/app/ServiceGroupMonitor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/app/ServiceGroupMonitor.java b/src/main/java/org/apache/aurora/scheduler/app/ServiceGroupMonitor.java deleted file mode 100644 index a1329fd..0000000 --- a/src/main/java/org/apache/aurora/scheduler/app/ServiceGroupMonitor.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.app; - -import java.io.Closeable; -import java.util.function.Supplier; - -import com.google.common.collect.ImmutableSet; - -import org.apache.aurora.common.thrift.ServiceInstance; - -/** - * Monitors a service group's membership and supplies a live view of the most recent set. - */ -public interface ServiceGroupMonitor extends Supplier<ImmutableSet<ServiceInstance>>, Closeable { - - /** - * Indicates a problem initiating monitoring of a service group. - */ - class MonitorException extends Exception { - public MonitorException(String message, Throwable cause) { - super(message, cause); - } - } - - /** - * Starts monitoring the service group. - * - * When the service group membership no longer needs to be maintained, this monitor should be - * {@link #close() closed}. - * - * @throws MonitorException if there is a problem initiating monitoring of the service group. - */ - void start() throws MonitorException; -} http://git-wip-us.apache.org/repos/asf/aurora/blob/b417be38/src/main/java/org/apache/aurora/scheduler/discovery/Credentials.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/Credentials.java b/src/main/java/org/apache/aurora/scheduler/discovery/Credentials.java new file mode 100644 index 0000000..75d58e7 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/discovery/Credentials.java @@ -0,0 +1,98 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.discovery; + +import java.util.Arrays; + +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; + +import org.apache.aurora.common.base.MorePreconditions; +import org.apache.commons.lang.builder.EqualsBuilder; + +import static java.util.Objects.requireNonNull; + +/** + * Encapsulates a user's ZooKeeper credentials. + */ +public final class Credentials { + + /** + * Creates a set of credentials for the ZooKeeper digest authentication mechanism. + * + * @param username the username to authenticate with + * @param password the password to authenticate with + * @return a set of credentials that can be used to authenticate the zoo keeper client + */ + public static Credentials digestCredentials(String username, String password) { + MorePreconditions.checkNotBlank(username); + Preconditions.checkNotNull(password); + + // TODO(John Sirois): DigestAuthenticationProvider is broken - uses platform default charset + // (on server) and so we just have to hope here that clients are deployed in compatible jvms. + // Consider writing and installing a version of DigestAuthenticationProvider that controls its + // Charset explicitly. + return new Credentials("digest", (username + ":" + password).getBytes()); + } + + private final String authScheme; + private final byte[] authToken; + + /** + * Creates a new set of credentials for the given ZooKeeper authentication scheme. + * + * @param scheme The name of the authentication scheme the {@code token} is valid in. + * @param token The authentication token for the given {@code scheme}. + */ + public Credentials(String scheme, byte[] token) { + authScheme = MorePreconditions.checkNotBlank(scheme); + authToken = requireNonNull(token); + } + + /** + * Returns the authentication scheme these credentials are for. + * + * @return the scheme these credentials are for. + */ + public String scheme() { + return authScheme; + } + + /** + * Returns the authentication token. + * + * @return the authentication token. + */ + public byte[] token() { + return Arrays.copyOf(authToken, authToken.length); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof Credentials)) { + return false; + } + + Credentials other = (Credentials) o; + return new EqualsBuilder() + .append(authScheme, other.scheme()) + .append(authToken, other.token()) + .isEquals(); + } + + @Override + public int hashCode() { + return Objects.hashCode(authScheme, authToken); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/b417be38/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java index 6ccfef4..e690d14 100644 --- a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java +++ b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java @@ -33,10 +33,6 @@ import org.apache.aurora.common.net.InetSocketAddressHelper; import org.apache.aurora.common.quantity.Amount; import org.apache.aurora.common.quantity.Time; import org.apache.aurora.common.thrift.ServiceInstance; -import org.apache.aurora.common.zookeeper.Credentials; -import org.apache.aurora.common.zookeeper.JsonCodec; -import org.apache.aurora.common.zookeeper.SingletonService; -import org.apache.aurora.scheduler.app.ServiceGroupMonitor; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; @@ -106,7 +102,7 @@ class CuratorServiceDiscoveryModule extends PrivateModule { if (zooKeeperConfig.getCredentials().isPresent()) { Credentials credentials = zooKeeperConfig.getCredentials().get(); - builder.authorization(credentials.scheme(), credentials.authToken()); + builder.authorization(credentials.scheme(), credentials.token()); } CuratorFramework curatorFramework = builder.build(); http://git-wip-us.apache.org/repos/asf/aurora/blob/b417be38/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitor.java b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitor.java index 0b86fb6..db886df 100644 --- a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitor.java +++ b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitor.java @@ -24,7 +24,6 @@ import org.apache.aurora.GuavaUtils; import org.apache.aurora.common.io.Codec; import org.apache.aurora.common.thrift.ServiceInstance; import org.apache.aurora.scheduler.app.SchedulerMain; -import org.apache.aurora.scheduler.app.ServiceGroupMonitor; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.utils.ZKPaths; http://git-wip-us.apache.org/repos/asf/aurora/blob/b417be38/src/main/java/org/apache/aurora/scheduler/discovery/CuratorSingletonService.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorSingletonService.java b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorSingletonService.java index c9bd1eb..4040067 100644 --- a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorSingletonService.java +++ b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorSingletonService.java @@ -27,7 +27,6 @@ import org.apache.aurora.common.io.Codec; import org.apache.aurora.common.thrift.Endpoint; import org.apache.aurora.common.thrift.ServiceInstance; import org.apache.aurora.common.thrift.Status; -import org.apache.aurora.common.zookeeper.SingletonService; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.leader.LeaderLatch; import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
