http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperUtilsTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperUtilsTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperUtilsTest.java new file mode 100644 index 0000000..9e482a6 --- /dev/null +++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperUtilsTest.java @@ -0,0 +1,139 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.zookeeper; + +import com.google.common.base.Charsets; + +import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException.BadVersionException; +import org.apache.zookeeper.KeeperException.NoAuthException; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.Stat; +import org.junit.Test; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * @author John Sirois + */ +public class ZooKeeperUtilsTest extends BaseZooKeeperClientTest { + @Test + public void testEnsurePath() throws Exception { + ZooKeeperClient zkClient = createZkClient(); + zkClient.get().addAuthInfo("digest", "client1:boo".getBytes(Charsets.UTF_8)); + + assertNull(zkClient.get().exists("/foo", false)); + ZooKeeperUtils.ensurePath(zkClient, ZooDefs.Ids.CREATOR_ALL_ACL, "/foo/bar/baz"); + + zkClient = createZkClient(); + zkClient.get().addAuthInfo("digest", "client2:bap".getBytes(Charsets.UTF_8)); + + // Anyone can check for existence in ZK + assertNotNull(zkClient.get().exists("/foo", false)); + assertNotNull(zkClient.get().exists("/foo/bar", false)); + assertNotNull(zkClient.get().exists("/foo/bar/baz", false)); + + try { + zkClient.get().delete("/foo/bar/baz", -1 /* delete no matter what */); + fail("Expected CREATOR_ALL_ACL to be applied to created path and client2 mutations to be " + + "rejected"); + } catch (NoAuthException e) { + // expected + } + } + + @Test + public void testMagicVersionNumberAllowsUnconditionalUpdate() throws Exception { + String nodePath = "/foo"; + ZooKeeperClient zkClient = createZkClient(); + + zkClient.get().create(nodePath, "init".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + + Stat initStat = new Stat(); + byte[] initialData = zkClient.get().getData(nodePath, false, initStat); + assertArrayEquals("init".getBytes(), initialData); + + // bump the version + Stat rev1Stat = zkClient.get().setData(nodePath, "rev1".getBytes(), initStat.getVersion()); + + try { + zkClient.get().setData(nodePath, "rev2".getBytes(), initStat.getVersion()); + fail("expected correct version to be required"); + } catch (BadVersionException e) { + // expected + } + + // expect using the correct version to work + Stat rev2Stat = zkClient.get().setData(nodePath, "rev2".getBytes(), rev1Stat.getVersion()); + assertNotEquals(ZooKeeperUtils.ANY_VERSION, rev2Stat.getVersion()); + + zkClient.get().setData(nodePath, "force-write".getBytes(), ZooKeeperUtils.ANY_VERSION); + Stat forceWriteStat = new Stat(); + byte[] forceWriteData = zkClient.get().getData(nodePath, false, forceWriteStat); + assertArrayEquals("force-write".getBytes(), forceWriteData); + + assertTrue(forceWriteStat.getVersion() > rev2Stat.getVersion()); + assertNotEquals(ZooKeeperUtils.ANY_VERSION, forceWriteStat.getVersion()); + } + + @Test + public void testNormalizingPath() throws Exception { + assertEquals("/", ZooKeeperUtils.normalizePath("/")); + assertEquals("/foo", ZooKeeperUtils.normalizePath("/foo/")); + assertEquals("/foo/bar", ZooKeeperUtils.normalizePath("/foo//bar")); + assertEquals("/foo/bar", ZooKeeperUtils.normalizePath("//foo/bar")); + assertEquals("/foo/bar", ZooKeeperUtils.normalizePath("/foo/bar/")); + assertEquals("/foo/bar", ZooKeeperUtils.normalizePath("/foo/bar//")); + assertEquals("/foo/bar", ZooKeeperUtils.normalizePath("/foo/bar")); + } + + @Test + public void testLenientPaths() { + assertEquals("/", ZooKeeperUtils.normalizePath("///")); + assertEquals("/a/group", ZooKeeperUtils.normalizePath("/a/group")); + assertEquals("/a/group", ZooKeeperUtils.normalizePath("/a/group/")); + assertEquals("/a/group", ZooKeeperUtils.normalizePath("/a//group")); + assertEquals("/a/group", ZooKeeperUtils.normalizePath("/a//group//")); + + try { + ZooKeeperUtils.normalizePath("a/group"); + fail("Relative paths should not be allowed."); + } catch (IllegalArgumentException e) { + // expected + } + + try { + ZooKeeperUtils.normalizePath("/a/./group"); + fail("Relative paths should not be allowed."); + } catch (IllegalArgumentException e) { + // expected + } + + try { + ZooKeeperUtils.normalizePath("/a/../group"); + fail("Relative paths should not be allowed."); + } catch (IllegalArgumentException e) { + // expected + } + } + +}
http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/config/findbugs/excludeFilter.xml ---------------------------------------------------------------------- diff --git a/config/findbugs/excludeFilter.xml b/config/findbugs/excludeFilter.xml index 5eaa11a..f7d5ae0 100644 --- a/config/findbugs/excludeFilter.xml +++ b/config/findbugs/excludeFilter.xml @@ -74,14 +74,6 @@ limitations under the License. <Bug pattern="UMAC_UNCALLABLE_METHOD_OF_ANONYMOUS_CLASS" /> </Match> - <!-- We're forced to use the platform default encoding to generate a byte array from digest - credentials because the underlying ZooKeeper API dictates this - also noted in the - offending code. --> - <Match> - <Class name="org.apache.aurora.scheduler.discovery.Credentials" /> - <Bug pattern="DM_DEFAULT_ENCODING" /> - </Match> - <!-- Technical debt. --> <Match> <Class name="org.apache.aurora.scheduler.log.mesos.MesosLog$LogStream" /> http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/docs/features/service-discovery.md ---------------------------------------------------------------------- diff --git a/docs/features/service-discovery.md b/docs/features/service-discovery.md index 511c96d..36823c8 100644 --- a/docs/features/service-discovery.md +++ b/docs/features/service-discovery.md @@ -6,7 +6,7 @@ the purpose of service discovery. ServerSets use the Zookeeper [group membershi of which there are several reference implementations: - [C++](https://github.com/apache/mesos/blob/master/src/zookeeper/group.cpp) - - [Java](http://curator.apache.org/curator-recipes/group-member.html) + - [Java](https://github.com/twitter/commons/blob/master/src/java/com/twitter/common/zookeeper/ServerSetImpl.java#L221) - [Python](https://github.com/twitter/commons/blob/master/src/python/twitter/common/zookeeper/serverset/serverset.py#L51) These can also be used natively in Finagle using the [ZookeeperServerSetCluster](https://github.com/twitter/finagle/blob/master/finagle-serversets/src/main/scala/com/twitter/finagle/zookeeper/ZookeeperServerSetCluster.scala). http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/docs/reference/scheduler-configuration.md ---------------------------------------------------------------------- diff --git a/docs/reference/scheduler-configuration.md b/docs/reference/scheduler-configuration.md index 8955653..d4e0a9a 100644 --- a/docs/reference/scheduler-configuration.md +++ b/docs/reference/scheduler-configuration.md @@ -252,5 +252,7 @@ Optional flags: Launches an embedded zookeeper server for local testing causing -zk_endpoints to be ignored if specified. -zk_session_timeout (default (4, secs)) The ZooKeeper session timeout. +-zk_use_curator (default true) + DEPRECATED: Uses Apache Curator as the zookeeper client; otherwise a copy of Twitter commons/zookeeper (the legacy library) is used. ------------------------------------------------------------------------- ``` http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java b/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java index 76209b1..4e354b6 100644 --- a/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java +++ b/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java @@ -46,8 +46,8 @@ import org.apache.aurora.common.quantity.Time; import org.apache.aurora.common.stats.StatsProvider; import org.apache.aurora.common.util.StateMachine; import org.apache.aurora.common.util.StateMachine.Transition; -import org.apache.aurora.scheduler.discovery.SingletonService; -import org.apache.aurora.scheduler.discovery.SingletonService.LeaderControl; +import org.apache.aurora.common.zookeeper.SingletonService; +import org.apache.aurora.common.zookeeper.SingletonService.LeaderControl; import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered; import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber; import org.apache.aurora.scheduler.mesos.Driver; @@ -57,7 +57,7 @@ import org.slf4j.LoggerFactory; import static java.util.Objects.requireNonNull; -import static org.apache.aurora.scheduler.discovery.SingletonService.LeadershipListener; +import static org.apache.aurora.common.zookeeper.SingletonService.LeadershipListener; /** * The central driver of the scheduler runtime lifecycle. Handles the transitions from startup and http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java index e0d32de..43cc5b4 100644 --- a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java +++ b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java @@ -42,6 +42,8 @@ import org.apache.aurora.common.args.constraints.NotEmpty; import org.apache.aurora.common.args.constraints.NotNull; import org.apache.aurora.common.inject.Bindings; import org.apache.aurora.common.stats.Stats; +import org.apache.aurora.common.zookeeper.SingletonService; +import org.apache.aurora.common.zookeeper.SingletonService.LeadershipListener; import org.apache.aurora.gen.ServerInfo; import org.apache.aurora.scheduler.AppStartup; import org.apache.aurora.scheduler.SchedulerLifecycle; @@ -50,8 +52,6 @@ import org.apache.aurora.scheduler.configuration.executor.ExecutorModule; import org.apache.aurora.scheduler.cron.quartz.CronModule; import org.apache.aurora.scheduler.discovery.FlaggedZooKeeperConfig; import org.apache.aurora.scheduler.discovery.ServiceDiscoveryModule; -import org.apache.aurora.scheduler.discovery.SingletonService; -import org.apache.aurora.scheduler.discovery.SingletonService.LeadershipListener; import org.apache.aurora.scheduler.events.WebhookModule; import org.apache.aurora.scheduler.http.HttpService; import org.apache.aurora.scheduler.log.mesos.MesosLogStreamModule; http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/app/ServiceGroupMonitor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/app/ServiceGroupMonitor.java b/src/main/java/org/apache/aurora/scheduler/app/ServiceGroupMonitor.java new file mode 100644 index 0000000..a1329fd --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/app/ServiceGroupMonitor.java @@ -0,0 +1,46 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.app; + +import java.io.Closeable; +import java.util.function.Supplier; + +import com.google.common.collect.ImmutableSet; + +import org.apache.aurora.common.thrift.ServiceInstance; + +/** + * Monitors a service group's membership and supplies a live view of the most recent set. + */ +public interface ServiceGroupMonitor extends Supplier<ImmutableSet<ServiceInstance>>, Closeable { + + /** + * Indicates a problem initiating monitoring of a service group. + */ + class MonitorException extends Exception { + public MonitorException(String message, Throwable cause) { + super(message, cause); + } + } + + /** + * Starts monitoring the service group. + * + * When the service group membership no longer needs to be maintained, this monitor should be + * {@link #close() closed}. + * + * @throws MonitorException if there is a problem initiating monitoring of the service group. + */ + void start() throws MonitorException; +} http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceDiscoveryModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceDiscoveryModule.java b/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceDiscoveryModule.java new file mode 100644 index 0000000..339f63b --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceDiscoveryModule.java @@ -0,0 +1,102 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.discovery; + +import java.net.InetSocketAddress; +import java.util.List; + +import javax.inject.Singleton; + +import com.google.inject.Exposed; +import com.google.inject.PrivateModule; +import com.google.inject.Provides; + +import org.apache.aurora.common.net.pool.DynamicHostSet; +import org.apache.aurora.common.thrift.ServiceInstance; +import org.apache.aurora.common.zookeeper.ServerSetImpl; +import org.apache.aurora.common.zookeeper.SingletonService; +import org.apache.aurora.common.zookeeper.SingletonServiceImpl; +import org.apache.aurora.common.zookeeper.ZooKeeperClient; +import org.apache.aurora.common.zookeeper.ZooKeeperUtils; +import org.apache.aurora.scheduler.app.ServiceGroupMonitor; +import org.apache.zookeeper.data.ACL; + +import static java.util.Objects.requireNonNull; + +/** + * Binding module for utilities to advertise the network presence of the scheduler. + * + * Uses a fork of Twitter commons/zookeeper. + */ +class CommonsServiceDiscoveryModule extends PrivateModule { + + private final String discoveryPath; + private final ZooKeeperConfig zooKeeperConfig; + + CommonsServiceDiscoveryModule(String discoveryPath, ZooKeeperConfig zooKeeperConfig) { + this.discoveryPath = ZooKeeperUtils.normalizePath(discoveryPath); + this.zooKeeperConfig = requireNonNull(zooKeeperConfig); + } + + @Override + protected void configure() { + requireBinding(ServiceDiscoveryBindings.ZOO_KEEPER_CLUSTER_KEY); + requireBinding(ServiceDiscoveryBindings.ZOO_KEEPER_ACL_KEY); + + bind(ServiceGroupMonitor.class).to(CommonsServiceGroupMonitor.class).in(Singleton.class); + expose(ServiceGroupMonitor.class); + } + + @Provides + @Singleton + ZooKeeperClient provideZooKeeperClient( + @ServiceDiscoveryBindings.ZooKeeper Iterable<InetSocketAddress> zooKeeperCluster) { + + return new ZooKeeperClient( + zooKeeperConfig.getSessionTimeout(), + zooKeeperConfig.getCredentials(), + zooKeeperConfig.getChrootPath(), + zooKeeperCluster); + } + + @Provides + @Singleton + ServerSetImpl provideServerSet( + ZooKeeperClient client, + @ServiceDiscoveryBindings.ZooKeeper List<ACL> zooKeeperAcls) { + + return new ServerSetImpl(client, zooKeeperAcls, discoveryPath); + } + + @Provides + @Singleton + DynamicHostSet<ServiceInstance> provideServerSet(ServerSetImpl serverSet) { + // Used for a type re-binding of the server set. + return serverSet; + } + + // NB: We only take a ServerSetImpl instead of a ServerSet here to simplify binding. + @Provides + @Singleton + @Exposed + SingletonService provideSingletonService( + ZooKeeperClient client, + ServerSetImpl serverSet, + @ServiceDiscoveryBindings.ZooKeeper List<ACL> zookeeperAcls) { + + return new SingletonServiceImpl( + serverSet, + SingletonServiceImpl.createSingletonCandidate(client, discoveryPath, zookeeperAcls)); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitor.java b/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitor.java new file mode 100644 index 0000000..9161455 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitor.java @@ -0,0 +1,59 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.discovery; + +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; + +import javax.inject.Inject; + +import com.google.common.collect.ImmutableSet; + +import org.apache.aurora.common.base.Command; +import org.apache.aurora.common.net.pool.DynamicHostSet; +import org.apache.aurora.common.thrift.ServiceInstance; +import org.apache.aurora.scheduler.app.ServiceGroupMonitor; + +import static java.util.Objects.requireNonNull; + +class CommonsServiceGroupMonitor implements ServiceGroupMonitor { + private Optional<Command> closeCommand = Optional.empty(); + private final DynamicHostSet<ServiceInstance> serverSet; + private final AtomicReference<ImmutableSet<ServiceInstance>> services = + new AtomicReference<>(ImmutableSet.of()); + + @Inject + CommonsServiceGroupMonitor(DynamicHostSet<ServiceInstance> serverSet) { + this.serverSet = requireNonNull(serverSet); + } + + @Override + public void start() throws MonitorException { + try { + closeCommand = Optional.of(serverSet.watch(services::set)); + } catch (DynamicHostSet.MonitorException e) { + throw new MonitorException("Unable to watch scheduler host set.", e); + } + } + + @Override + public void close() { + closeCommand.ifPresent(Command::execute); + } + + @Override + public ImmutableSet<ServiceInstance> get() { + return services.get(); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/Credentials.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/Credentials.java b/src/main/java/org/apache/aurora/scheduler/discovery/Credentials.java deleted file mode 100644 index 75d58e7..0000000 --- a/src/main/java/org/apache/aurora/scheduler/discovery/Credentials.java +++ /dev/null @@ -1,98 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.discovery; - -import java.util.Arrays; - -import com.google.common.base.Objects; -import com.google.common.base.Preconditions; - -import org.apache.aurora.common.base.MorePreconditions; -import org.apache.commons.lang.builder.EqualsBuilder; - -import static java.util.Objects.requireNonNull; - -/** - * Encapsulates a user's ZooKeeper credentials. - */ -public final class Credentials { - - /** - * Creates a set of credentials for the ZooKeeper digest authentication mechanism. - * - * @param username the username to authenticate with - * @param password the password to authenticate with - * @return a set of credentials that can be used to authenticate the zoo keeper client - */ - public static Credentials digestCredentials(String username, String password) { - MorePreconditions.checkNotBlank(username); - Preconditions.checkNotNull(password); - - // TODO(John Sirois): DigestAuthenticationProvider is broken - uses platform default charset - // (on server) and so we just have to hope here that clients are deployed in compatible jvms. - // Consider writing and installing a version of DigestAuthenticationProvider that controls its - // Charset explicitly. - return new Credentials("digest", (username + ":" + password).getBytes()); - } - - private final String authScheme; - private final byte[] authToken; - - /** - * Creates a new set of credentials for the given ZooKeeper authentication scheme. - * - * @param scheme The name of the authentication scheme the {@code token} is valid in. - * @param token The authentication token for the given {@code scheme}. - */ - public Credentials(String scheme, byte[] token) { - authScheme = MorePreconditions.checkNotBlank(scheme); - authToken = requireNonNull(token); - } - - /** - * Returns the authentication scheme these credentials are for. - * - * @return the scheme these credentials are for. - */ - public String scheme() { - return authScheme; - } - - /** - * Returns the authentication token. - * - * @return the authentication token. - */ - public byte[] token() { - return Arrays.copyOf(authToken, authToken.length); - } - - @Override - public boolean equals(Object o) { - if (!(o instanceof Credentials)) { - return false; - } - - Credentials other = (Credentials) o; - return new EqualsBuilder() - .append(authScheme, other.scheme()) - .append(authToken, other.token()) - .isEquals(); - } - - @Override - public int hashCode() { - return Objects.hashCode(authScheme, authToken); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java index e690d14..999a542 100644 --- a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java +++ b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java @@ -33,6 +33,10 @@ import org.apache.aurora.common.net.InetSocketAddressHelper; import org.apache.aurora.common.quantity.Amount; import org.apache.aurora.common.quantity.Time; import org.apache.aurora.common.thrift.ServiceInstance; +import org.apache.aurora.common.zookeeper.Credentials; +import org.apache.aurora.common.zookeeper.ServerSet; +import org.apache.aurora.common.zookeeper.SingletonService; +import org.apache.aurora.scheduler.app.ServiceGroupMonitor; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; @@ -64,7 +68,7 @@ class CuratorServiceDiscoveryModule extends PrivateModule { requireBinding(ServiceDiscoveryBindings.ZOO_KEEPER_CLUSTER_KEY); requireBinding(ServiceDiscoveryBindings.ZOO_KEEPER_ACL_KEY); - bind(new TypeLiteral<Codec<ServiceInstance>>() { }).toInstance(JsonCodec.INSTANCE); + bind(new TypeLiteral<Codec<ServiceInstance>>() { }).toInstance(ServerSet.JSON_CODEC); } @Provides @@ -102,7 +106,7 @@ class CuratorServiceDiscoveryModule extends PrivateModule { if (zooKeeperConfig.getCredentials().isPresent()) { Credentials credentials = zooKeeperConfig.getCredentials().get(); - builder.authorization(credentials.scheme(), credentials.token()); + builder.authorization(credentials.scheme(), credentials.authToken()); } CuratorFramework curatorFramework = builder.build(); http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitor.java b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitor.java index db886df..0b86fb6 100644 --- a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitor.java +++ b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceGroupMonitor.java @@ -24,6 +24,7 @@ import org.apache.aurora.GuavaUtils; import org.apache.aurora.common.io.Codec; import org.apache.aurora.common.thrift.ServiceInstance; import org.apache.aurora.scheduler.app.SchedulerMain; +import org.apache.aurora.scheduler.app.ServiceGroupMonitor; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.utils.ZKPaths; http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/CuratorSingletonService.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorSingletonService.java b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorSingletonService.java index 321bbb3..c378172 100644 --- a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorSingletonService.java +++ b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorSingletonService.java @@ -27,6 +27,7 @@ import org.apache.aurora.common.io.Codec; import org.apache.aurora.common.thrift.Endpoint; import org.apache.aurora.common.thrift.ServiceInstance; import org.apache.aurora.common.thrift.Status; +import org.apache.aurora.common.zookeeper.SingletonService; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.leader.LeaderLatch; import org.apache.curator.framework.recipes.leader.LeaderLatchListener; http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java b/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java index d5019bf..e8aafe4 100644 --- a/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java +++ b/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java @@ -15,10 +15,10 @@ package org.apache.aurora.scheduler.discovery; import java.net.InetSocketAddress; import java.util.List; -import java.util.Optional; import javax.annotation.Nullable; +import com.google.common.base.Optional; import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; @@ -27,12 +27,23 @@ import org.apache.aurora.common.args.CmdLine; import org.apache.aurora.common.args.constraints.NotEmpty; import org.apache.aurora.common.quantity.Amount; import org.apache.aurora.common.quantity.Time; +import org.apache.aurora.common.zookeeper.Credentials; +import org.apache.aurora.common.zookeeper.ZooKeeperUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A factory that creates a {@link ZooKeeperConfig} instance based on command line argument * values. */ public final class FlaggedZooKeeperConfig { + private static final Logger LOG = LoggerFactory.getLogger(FlaggedZooKeeperConfig.class); + + @CmdLine(name = "zk_use_curator", + help = "DEPRECATED: Uses Apache Curator as the zookeeper client; otherwise a copy of Twitter " + + "commons/zookeeper (the legacy library) is used.") + private static final Arg<Boolean> USE_CURATOR = Arg.create(true); + @CmdLine(name = "zk_in_proc", help = "Launches an embedded zookeeper server for local testing causing -zk_endpoints " + "to be ignored if specified.") @@ -63,9 +74,13 @@ public final class FlaggedZooKeeperConfig { * @return Configuration instance. */ public static ZooKeeperConfig create() { + if (USE_CURATOR.hasAppliedValue()) { + LOG.warn("The -zk_use_curator flag is deprecated and will be removed in a future release."); + } return new ZooKeeperConfig( + USE_CURATOR.get(), ZK_ENDPOINTS.get(), - Optional.ofNullable(CHROOT_PATH.get()), + Optional.fromNullable(CHROOT_PATH.get()), IN_PROCESS.get(), SESSION_TIMEOUT.get(), getCredentials(DIGEST_CREDENTIALS.get())); @@ -73,7 +88,7 @@ public final class FlaggedZooKeeperConfig { private static Optional<Credentials> getCredentials(@Nullable String userAndPass) { if (userAndPass == null) { - return Optional.empty(); + return Optional.absent(); } List<String> parts = ImmutableList.copyOf(Splitter.on(":").split(userAndPass)); http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/JsonCodec.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/JsonCodec.java b/src/main/java/org/apache/aurora/scheduler/discovery/JsonCodec.java deleted file mode 100644 index 9d22b76..0000000 --- a/src/main/java/org/apache/aurora/scheduler/discovery/JsonCodec.java +++ /dev/null @@ -1,147 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.discovery; - -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.OutputStream; -import java.io.OutputStreamWriter; -import java.io.Writer; -import java.nio.charset.Charset; -import java.util.Map; - -import javax.annotation.Nullable; - -import com.google.common.base.Charsets; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Maps; -import com.google.gson.Gson; -import com.google.gson.JsonIOException; -import com.google.gson.JsonParseException; - -import org.apache.aurora.common.io.Codec; -import org.apache.aurora.common.thrift.Endpoint; -import org.apache.aurora.common.thrift.ServiceInstance; -import org.apache.aurora.common.thrift.Status; - -import static java.util.Objects.requireNonNull; - -/** - * Encodes a {@link ServiceInstance} as a JSON object. - */ -class JsonCodec implements Codec<ServiceInstance> { - - private static void assertRequiredField(String fieldName, Object fieldValue) { - if (fieldValue == null) { - throw new JsonParseException(String.format("Field %s is required", fieldName)); - } - } - - private static class EndpointSchema { - private final String host; - private final Integer port; - - EndpointSchema(Endpoint endpoint) { - host = endpoint.getHost(); - port = endpoint.getPort(); - } - - Endpoint asEndpoint() { - assertRequiredField("host", host); - assertRequiredField("port", port); - - return new Endpoint(host, port); - } - } - - private static class ServiceInstanceSchema { - private final EndpointSchema serviceEndpoint; - private final Map<String, EndpointSchema> additionalEndpoints; - private final Status status; - @Nullable private final Integer shard; - - ServiceInstanceSchema(ServiceInstance instance) { - serviceEndpoint = new EndpointSchema(instance.getServiceEndpoint()); - if (instance.isSetAdditionalEndpoints()) { - additionalEndpoints = - Maps.transformValues(instance.getAdditionalEndpoints(), EndpointSchema::new); - } else { - additionalEndpoints = ImmutableMap.of(); - } - status = instance.getStatus(); - shard = instance.isSetShard() ? instance.getShard() : null; - } - - ServiceInstance asServiceInstance() { - assertRequiredField("serviceEndpoint", serviceEndpoint); - assertRequiredField("status", status); - - Map<String, EndpointSchema> extraEndpoints = - additionalEndpoints == null ? ImmutableMap.of() : additionalEndpoints; - - ServiceInstance instance = - new ServiceInstance( - serviceEndpoint.asEndpoint(), - Maps.transformValues(extraEndpoints, EndpointSchema::asEndpoint), - status); - if (shard != null) { - instance.setShard(shard); - } - return instance; - } - } - - /** - * The encoding for service instance data in ZooKeeper expected by Aurora clients. - */ - static final Codec<ServiceInstance> INSTANCE = new JsonCodec(); - - private static final Charset ENCODING = Charsets.UTF_8; - - private final Gson gson; - - private JsonCodec() { - this(new Gson()); - } - - JsonCodec(Gson gson) { - this.gson = requireNonNull(gson); - } - - @Override - public void serialize(ServiceInstance instance, OutputStream sink) throws IOException { - Writer writer = new OutputStreamWriter(sink, ENCODING); - try { - gson.toJson(new ServiceInstanceSchema(instance), writer); - } catch (JsonIOException e) { - throw new IOException(String.format("Problem serializing %s to JSON", instance), e); - } - writer.flush(); - } - - @Override - public ServiceInstance deserialize(InputStream source) throws IOException { - InputStreamReader reader = new InputStreamReader(source, ENCODING); - try { - @Nullable ServiceInstanceSchema schema = gson.fromJson(reader, ServiceInstanceSchema.class); - if (schema == null) { - throw new IOException("JSON did not include a ServiceInstance object"); - } - return schema.asServiceInstance(); - } catch (JsonParseException e) { - throw new IOException("Problem parsing JSON ServiceInstance.", e); - } - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java b/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java index b7ca62c..07a6302 100644 --- a/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java +++ b/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java @@ -25,14 +25,16 @@ import com.google.common.io.Files; import com.google.common.util.concurrent.AbstractIdleService; import com.google.inject.AbstractModule; import com.google.inject.Inject; +import com.google.inject.Module; import com.google.inject.Provider; import com.google.inject.Provides; import com.google.inject.binder.LinkedBindingBuilder; import org.apache.aurora.common.application.ShutdownRegistry; import org.apache.aurora.common.base.MorePreconditions; +import org.apache.aurora.common.zookeeper.ZooKeeperUtils; +import org.apache.aurora.common.zookeeper.testing.ZooKeeperTestServer; import org.apache.aurora.scheduler.SchedulerServicesModule; -import org.apache.aurora.scheduler.discovery.testing.ZooKeeperTestServer; import org.apache.zookeeper.data.ACL; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,15 +46,15 @@ import static java.util.Objects.requireNonNull; */ public class ServiceDiscoveryModule extends AbstractModule { - private static final Logger LOG = LoggerFactory.getLogger(ServiceDiscoveryModule.class); + private static final Logger LOG = LoggerFactory.getLogger(CommonsServiceDiscoveryModule.class); private final ZooKeeperConfig zooKeeperConfig; private final String discoveryPath; /** * Creates a Guice module that will bind a - * {@link SingletonService} for scheduler leader election and a - * {@link ServiceGroupMonitor} that can be used to find the + * {@link org.apache.aurora.common.zookeeper.SingletonService} for scheduler leader election and a + * {@link org.apache.aurora.scheduler.app.ServiceGroupMonitor} that can be used to find the * leading scheduler. * * @param zooKeeperConfig The ZooKeeper client configuration to use to interact with ZooKeeper. @@ -80,7 +82,15 @@ public class ServiceDiscoveryModule extends AbstractModule { clusterBinder.toInstance(zooKeeperConfig.getServers()); } - install(new CuratorServiceDiscoveryModule(discoveryPath, zooKeeperConfig)); + install(discoveryModule()); + } + + private Module discoveryModule() { + if (zooKeeperConfig.isUseCurator()) { + return new CuratorServiceDiscoveryModule(discoveryPath, zooKeeperConfig); + } else { + return new CommonsServiceDiscoveryModule(discoveryPath, zooKeeperConfig); + } } @Provides http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/ServiceGroupMonitor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/ServiceGroupMonitor.java b/src/main/java/org/apache/aurora/scheduler/discovery/ServiceGroupMonitor.java deleted file mode 100644 index fea896c..0000000 --- a/src/main/java/org/apache/aurora/scheduler/discovery/ServiceGroupMonitor.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.discovery; - -import java.io.Closeable; -import java.util.function.Supplier; - -import com.google.common.collect.ImmutableSet; - -import org.apache.aurora.common.thrift.ServiceInstance; - -/** - * Monitors a service group's membership and supplies a live view of the most recent set. - */ -public interface ServiceGroupMonitor extends Supplier<ImmutableSet<ServiceInstance>>, Closeable { - - /** - * Indicates a problem initiating monitoring of a service group. - */ - class MonitorException extends Exception { - MonitorException(String message, Throwable cause) { - super(message, cause); - } - } - - /** - * Starts monitoring the service group. - * - * When the service group membership no longer needs to be maintained, this monitor should be - * {@link #close() closed}. - * - * @throws MonitorException if there is a problem initiating monitoring of the service group. - */ - void start() throws MonitorException; -} http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/SingletonService.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/SingletonService.java b/src/main/java/org/apache/aurora/scheduler/discovery/SingletonService.java deleted file mode 100644 index adbc318..0000000 --- a/src/main/java/org/apache/aurora/scheduler/discovery/SingletonService.java +++ /dev/null @@ -1,114 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.discovery; - -import java.net.InetSocketAddress; -import java.util.Map; - -/** - * A service that uses master election to only allow a single service instance to be active amongst - * a set of potential servers at a time. - */ -public interface SingletonService { - - /** - * Indicates an error attempting to lead a group of servers. - */ - class LeadException extends Exception { - LeadException(String message, Throwable cause) { - super(message, cause); - } - } - - /** - * Indicates an error attempting to advertise leadership of a group of servers. - */ - class AdvertiseException extends Exception { - AdvertiseException(String message) { - super(message); - } - - AdvertiseException(String message, Throwable cause) { - super(message, cause); - } - } - - /** - * Indicates an error attempting to leave a group of servers, abdicating leadership of the group. - */ - class LeaveException extends Exception { - LeaveException(String message, Throwable cause) { - super(message, cause); - } - } - - /** - * Attempts to lead the singleton service. - * - * @param endpoint The primary endpoint to register as a leader candidate in the service. - * @param additionalEndpoints Additional endpoints that are available on the host. - * @param listener Handler to call when the candidate is elected or defeated. - * @throws LeadException If there was a problem joining or watching the ZooKeeper group. - * @throws InterruptedException If the thread watching/joining the group was interrupted. - */ - void lead( - InetSocketAddress endpoint, - Map<String, InetSocketAddress> additionalEndpoints, - LeadershipListener listener) - throws LeadException, InterruptedException; - - /** - * A listener to be notified of changes in the leadership status. - * Implementers should be careful to avoid blocking operations in these callbacks. - */ - interface LeadershipListener { - - /** - * Notifies the listener that is is current leader. - * - * @param control A controller handle to advertise and/or leave advertised presence. - */ - void onLeading(LeaderControl control); - - /** - * Notifies the listener that it is no longer leader. - */ - void onDefeated(); - } - - /** - * A controller for the state of the leader. This will be provided to the leader upon election, - * which allows the leader to decide when to advertise as leader of the server set and terminate - * leadership at will. - */ - interface LeaderControl { - - /** - * Advertises the leader's server presence to clients. - * - * @throws AdvertiseException If there was an error advertising the singleton leader to clients - * of the server set. - * @throws InterruptedException If interrupted while advertising. - */ - void advertise() throws AdvertiseException, InterruptedException; - - /** - * Leaves candidacy for leadership, removing advertised server presence if applicable. - * - * @throws LeaveException If the leader's status could not be updated or there was an error - * abdicating server set leadership. - */ - void leave() throws LeaveException; - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java b/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java index acb7905..3f32a62 100644 --- a/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java +++ b/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java @@ -14,11 +14,14 @@ package org.apache.aurora.scheduler.discovery; import java.net.InetSocketAddress; -import java.util.Optional; + +import com.google.common.base.Optional; import org.apache.aurora.common.base.MorePreconditions; import org.apache.aurora.common.quantity.Amount; import org.apache.aurora.common.quantity.Time; +import org.apache.aurora.common.zookeeper.Credentials; +import org.apache.aurora.common.zookeeper.ZooKeeperUtils; import static java.util.Objects.requireNonNull; @@ -32,18 +35,21 @@ public class ZooKeeperConfig { /** * Creates a new client configuration with defaults for the session timeout and credentials. * + * @param useCurator {@code true} to use Apache Curator; otherwise commons/zookeeper is used. * @param servers ZooKeeper server addresses. * @return A new configuration. */ - public static ZooKeeperConfig create(Iterable<InetSocketAddress> servers) { + public static ZooKeeperConfig create(boolean useCurator, Iterable<InetSocketAddress> servers) { return new ZooKeeperConfig( + useCurator, servers, - Optional.empty(), // chrootPath + Optional.absent(), // chrootPath false, ZooKeeperUtils.DEFAULT_ZK_SESSION_TIMEOUT, - Optional.empty()); // credentials + Optional.absent()); // credentials } + private final boolean useCurator; private final Iterable<InetSocketAddress> servers; private final boolean inProcess; private final Amount<Integer, Time> sessionTimeout; @@ -60,12 +66,14 @@ public class ZooKeeperConfig { * @param credentials ZooKeeper authentication credentials. */ ZooKeeperConfig( + boolean useCurator, Iterable<InetSocketAddress> servers, Optional<String> chrootPath, boolean inProcess, Amount<Integer, Time> sessionTimeout, Optional<Credentials> credentials) { + this.useCurator = useCurator; this.servers = MorePreconditions.checkNotBlank(servers); this.chrootPath = requireNonNull(chrootPath); this.inProcess = inProcess; @@ -82,6 +90,7 @@ public class ZooKeeperConfig { */ public ZooKeeperConfig withCredentials(Credentials newCredentials) { return new ZooKeeperConfig( + useCurator, servers, chrootPath, inProcess, @@ -89,6 +98,10 @@ public class ZooKeeperConfig { Optional.of(newCredentials)); } + boolean isUseCurator() { + return useCurator; + } + public Iterable<InetSocketAddress> getServers() { return servers; } http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperUtils.java b/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperUtils.java deleted file mode 100644 index 211aa50..0000000 --- a/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperUtils.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.discovery; - -import com.google.common.collect.ImmutableList; - -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Time; -import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.data.ACL; - -/** - * Utilities for dealing with ZooKeeper. - */ -final class ZooKeeperUtils { - - /** - * An appropriate default session timeout for ZooKeeper clusters. - */ - static final Amount<Integer, Time> DEFAULT_ZK_SESSION_TIMEOUT = Amount.of(4, Time.SECONDS); - - /** - * An ACL that gives all permissions any user authenticated or not. - */ - static final ImmutableList<ACL> OPEN_ACL_UNSAFE = - ImmutableList.copyOf(Ids.OPEN_ACL_UNSAFE); - - /** - * An ACL that gives all permissions to node creators and read permissions only to everyone else. - */ - static final ImmutableList<ACL> EVERYONE_READ_CREATOR_ALL = - ImmutableList.<ACL>builder() - .addAll(Ids.CREATOR_ALL_ACL) - .addAll(Ids.READ_ACL_UNSAFE) - .build(); - - private ZooKeeperUtils() { - // utility - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/testing/BaseZooKeeperTest.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/testing/BaseZooKeeperTest.java b/src/main/java/org/apache/aurora/scheduler/discovery/testing/BaseZooKeeperTest.java deleted file mode 100644 index d84037e..0000000 --- a/src/main/java/org/apache/aurora/scheduler/discovery/testing/BaseZooKeeperTest.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.discovery.testing; - -import org.apache.aurora.common.testing.TearDownTestCase; -import org.junit.Before; -import org.junit.Rule; -import org.junit.rules.TemporaryFolder; - -/** - * A base-class for in-process zookeeper tests. - */ -public abstract class BaseZooKeeperTest extends TearDownTestCase { - - private ZooKeeperTestServer zkTestServer; - - @Rule - public TemporaryFolder tmpFolder = new TemporaryFolder(); - - @Before - public final void setUp() throws Exception { - zkTestServer = new ZooKeeperTestServer(tmpFolder.newFolder(), tmpFolder.newFolder()); - addTearDown(zkTestServer::stop); - zkTestServer.startNetwork(); - } - - /** - * Returns the running in-process ZooKeeper server. - * - * @return The in-process ZooKeeper server. - */ - protected final ZooKeeperTestServer getServer() { - return zkTestServer; - } - - /** - * Returns the current port to connect to the in-process zookeeper instance. - */ - protected final int getPort() { - return getServer().getPort(); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/discovery/testing/ZooKeeperTestServer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/testing/ZooKeeperTestServer.java b/src/main/java/org/apache/aurora/scheduler/discovery/testing/ZooKeeperTestServer.java deleted file mode 100644 index a7bb48b..0000000 --- a/src/main/java/org/apache/aurora/scheduler/discovery/testing/ZooKeeperTestServer.java +++ /dev/null @@ -1,101 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.discovery.testing; - -import java.io.File; -import java.io.IOException; -import java.net.InetSocketAddress; - -import com.google.common.base.Preconditions; - -import org.apache.zookeeper.server.NIOServerCnxnFactory; -import org.apache.zookeeper.server.ServerCnxnFactory; -import org.apache.zookeeper.server.ZooKeeperServer; -import org.apache.zookeeper.server.ZooKeeperServer.BasicDataTreeBuilder; -import org.apache.zookeeper.server.persistence.FileTxnSnapLog; - -/** - * A helper class for starting in-process ZooKeeper server and clients. - * - * <p>This is ONLY meant to be used for testing. - */ -public class ZooKeeperTestServer { - - private final File dataDir; - private final File snapDir; - - private ZooKeeperServer zooKeeperServer; - private ServerCnxnFactory connectionFactory; - private int port; - - public ZooKeeperTestServer(File dataDir, File snapDir) { - this.dataDir = Preconditions.checkNotNull(dataDir); - this.snapDir = Preconditions.checkNotNull(snapDir); - } - - /** - * Starts zookeeper up on an ephemeral port. - */ - public void startNetwork() throws IOException, InterruptedException { - zooKeeperServer = - new ZooKeeperServer( - new FileTxnSnapLog(dataDir, snapDir), - new BasicDataTreeBuilder()) { - - // TODO(John Sirois): Introduce a builder to configure the in-process server if and when - // some folks need JMX for in-process tests. - @Override protected void registerJMX() { - // noop - } - }; - - connectionFactory = new NIOServerCnxnFactory(); - connectionFactory.configure( - new InetSocketAddress(port), - 60 /* Semi-arbitrary, max 60 connections is the default used by NIOServerCnxnFactory */); - connectionFactory.startup(zooKeeperServer); - port = zooKeeperServer.getClientPort(); - } - - /** - * Stops the zookeeper server. - */ - public void stop() { - if (connectionFactory != null) { - connectionFactory.shutdown(); // Also shuts down zooKeeperServer. - connectionFactory = null; - } - } - - /** - * Expires the client session with the given {@code sessionId}. - * - * @param sessionId The id of the client session to expire. - */ - public final void expireClientSession(long sessionId) { - zooKeeperServer.closeSession(sessionId); - } - - /** - * Returns the current port to connect to the in-process zookeeper instance. - */ - public final int getPort() { - checkEphemeralPortAssigned(); - return port; - } - - private void checkEphemeralPortAssigned() { - Preconditions.checkState(port > 0, "startNetwork must be called first"); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/http/JettyServerModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/http/JettyServerModule.java b/src/main/java/org/apache/aurora/scheduler/http/JettyServerModule.java index af8567f..53ebc0b 100644 --- a/src/main/java/org/apache/aurora/scheduler/http/JettyServerModule.java +++ b/src/main/java/org/apache/aurora/scheduler/http/JettyServerModule.java @@ -64,7 +64,7 @@ import org.apache.aurora.common.net.http.handlers.TimeSeriesDataSource; import org.apache.aurora.common.net.http.handlers.VarsHandler; import org.apache.aurora.common.net.http.handlers.VarsJsonHandler; import org.apache.aurora.scheduler.SchedulerServicesModule; -import org.apache.aurora.scheduler.discovery.ServiceGroupMonitor.MonitorException; +import org.apache.aurora.scheduler.app.ServiceGroupMonitor.MonitorException; import org.apache.aurora.scheduler.http.api.ApiModule; import org.apache.aurora.scheduler.http.api.security.HttpSecurityModule; import org.apache.aurora.scheduler.thrift.ThriftModule; http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/http/LeaderRedirect.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/http/LeaderRedirect.java b/src/main/java/org/apache/aurora/scheduler/http/LeaderRedirect.java index 662d6d5..bc0e2a8 100644 --- a/src/main/java/org/apache/aurora/scheduler/http/LeaderRedirect.java +++ b/src/main/java/org/apache/aurora/scheduler/http/LeaderRedirect.java @@ -27,8 +27,8 @@ import com.google.common.net.HostAndPort; import org.apache.aurora.common.thrift.Endpoint; import org.apache.aurora.common.thrift.ServiceInstance; -import org.apache.aurora.scheduler.discovery.ServiceGroupMonitor; -import org.apache.aurora.scheduler.discovery.ServiceGroupMonitor.MonitorException; +import org.apache.aurora.scheduler.app.ServiceGroupMonitor; +import org.apache.aurora.scheduler.app.ServiceGroupMonitor.MonitorException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLogStreamModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLogStreamModule.java b/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLogStreamModule.java index c7c0387..6704a32 100644 --- a/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLogStreamModule.java +++ b/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLogStreamModule.java @@ -33,8 +33,8 @@ import org.apache.aurora.common.args.CmdLine; import org.apache.aurora.common.net.InetSocketAddressHelper; import org.apache.aurora.common.quantity.Amount; import org.apache.aurora.common.quantity.Time; +import org.apache.aurora.common.zookeeper.Credentials; import org.apache.aurora.gen.storage.LogEntry; -import org.apache.aurora.scheduler.discovery.Credentials; import org.apache.aurora.scheduler.discovery.ZooKeeperConfig; import org.apache.aurora.scheduler.log.mesos.LogInterface.ReaderInterface; import org.apache.aurora.scheduler.log.mesos.LogInterface.WriterInterface; @@ -157,7 +157,7 @@ public class MesosLogStreamModule extends PrivateModule { zkClientConfig.getSessionTimeout().getUnit().getTimeUnit(), zkLogGroupPath, zkCredentials.scheme(), - zkCredentials.token()); + zkCredentials.authToken()); } else { return new Log( QUORUM_SIZE.get(), http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java b/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java index 4324ea9..051c520 100644 --- a/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java +++ b/src/test/java/org/apache/aurora/scheduler/SchedulerLifecycleTest.java @@ -21,9 +21,9 @@ import org.apache.aurora.common.application.ShutdownRegistry; import org.apache.aurora.common.base.Command; import org.apache.aurora.common.base.ExceptionalCommand; import org.apache.aurora.common.testing.easymock.EasyMockTest; +import org.apache.aurora.common.zookeeper.SingletonService.LeaderControl; +import org.apache.aurora.common.zookeeper.SingletonService.LeadershipListener; import org.apache.aurora.scheduler.SchedulerLifecycle.DelayedActions; -import org.apache.aurora.scheduler.discovery.SingletonService.LeaderControl; -import org.apache.aurora.scheduler.discovery.SingletonService.LeadershipListener; import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered; import org.apache.aurora.scheduler.mesos.Driver; import org.apache.aurora.scheduler.storage.Storage.StorageException; http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java index 84d7753..29a3b4a 100644 --- a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java +++ b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java @@ -17,7 +17,6 @@ import java.io.File; import java.net.InetSocketAddress; import java.util.Arrays; import java.util.Set; -import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -44,6 +43,10 @@ import org.apache.aurora.GuavaUtils; import org.apache.aurora.codec.ThriftBinaryCodec.CodingException; import org.apache.aurora.common.application.Lifecycle; import org.apache.aurora.common.stats.Stats; +import org.apache.aurora.common.zookeeper.Credentials; +import org.apache.aurora.common.zookeeper.ServerSetImpl; +import org.apache.aurora.common.zookeeper.ZooKeeperClient; +import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest; import org.apache.aurora.gen.HostAttributes; import org.apache.aurora.gen.MaintenanceMode; import org.apache.aurora.gen.ScheduleStatus; @@ -60,11 +63,8 @@ import org.apache.aurora.scheduler.AppStartup; import org.apache.aurora.scheduler.TierModule; import org.apache.aurora.scheduler.base.TaskTestUtil; import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings; -import org.apache.aurora.scheduler.discovery.Credentials; import org.apache.aurora.scheduler.discovery.ServiceDiscoveryModule; -import org.apache.aurora.scheduler.discovery.ServiceGroupMonitor; import org.apache.aurora.scheduler.discovery.ZooKeeperConfig; -import org.apache.aurora.scheduler.discovery.testing.BaseZooKeeperTest; import org.apache.aurora.scheduler.log.Log; import org.apache.aurora.scheduler.log.Log.Entry; import org.apache.aurora.scheduler.log.Log.Position; @@ -107,9 +107,10 @@ import static org.easymock.EasyMock.createControl; import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expect; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -public class SchedulerIT extends BaseZooKeeperTest { +public class SchedulerIT extends BaseZooKeeperClientTest { private static final Logger LOG = LoggerFactory.getLogger(SchedulerIT.class); @@ -144,6 +145,7 @@ public class SchedulerIT extends BaseZooKeeperTest { private Stream logStream; private StreamMatcher streamMatcher; private EntrySerializer entrySerializer; + private ZooKeeperClient zkClient; private File backupDir; @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); @@ -171,9 +173,11 @@ public class SchedulerIT extends BaseZooKeeperTest { entrySerializer = new EntrySerializer.EntrySerializerImpl( LogStorageModule.MAX_LOG_ENTRY_SIZE.get(), Hashing.md5()); + + zkClient = createZkClient(); } - private Callable<Void> startScheduler() throws Exception { + private void startScheduler() throws Exception { // TODO(wfarner): Try to accomplish all this by subclassing SchedulerMain and actually using // AppLauncher. Module testModule = new AbstractModule() { @@ -198,8 +202,10 @@ public class SchedulerIT extends BaseZooKeeperTest { }; ZooKeeperConfig zkClientConfig = ZooKeeperConfig.create( + true, // useCurator ImmutableList.of(InetSocketAddress.createUnresolved("localhost", getPort()))) .withCredentials(Credentials.digestCredentials("mesos", "mesos")); + SchedulerMain main = SchedulerMain.class.newInstance(); Injector injector = Guice.createInjector( ImmutableList.<Module>builder() .add(SchedulerMain.getUniversalModule()) @@ -209,8 +215,8 @@ public class SchedulerIT extends BaseZooKeeperTest { .add(testModule) .build() ); - SchedulerMain main = new SchedulerMain(); injector.injectMembers(main); + Lifecycle lifecycle = injector.getInstance(Lifecycle.class); executor.submit(() -> { try { @@ -220,32 +226,28 @@ public class SchedulerIT extends BaseZooKeeperTest { executor.shutdownNow(); } }); - - Lifecycle lifecycle = injector.getInstance(Lifecycle.class); addTearDown(() -> { lifecycle.shutdown(); MoreExecutors.shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS); }); - injector.getInstance(Key.get(GuavaUtils.ServiceManagerIface.class, AppStartup.class)) .awaitHealthy(); + } - ServiceGroupMonitor schedulerMonitor = injector.getInstance(ServiceGroupMonitor.class); - CountDownLatch schedulerReady = new CountDownLatch(1); + private void awaitSchedulerReady() throws Exception { executor.submit(() -> { - while (schedulerMonitor.get().isEmpty()) { - try { - Thread.sleep(10); - } catch (InterruptedException e) { - throw new RuntimeException(e); + ServerSetImpl schedulerService = new ServerSetImpl(zkClient, SERVERSET_PATH); + final CountDownLatch schedulerReady = new CountDownLatch(1); + schedulerService.watch(hostSet -> { + if (!hostSet.isEmpty()) { + schedulerReady.countDown(); } - } - schedulerReady.countDown(); - }); - return () -> { - schedulerReady.await(); + }); + // A timeout is used because certain types of assertion errors (mocks) will not surface + // until the main test thread exits this body of code. + assertTrue(schedulerReady.await(5L, TimeUnit.MINUTES)); return null; - }; + }).get(); } private final AtomicInteger curPosition = new AtomicInteger(); @@ -330,14 +332,14 @@ public class SchedulerIT extends BaseZooKeeperTest { expect(driver.stop(true)).andReturn(Status.DRIVER_STOPPED).anyTimes(); control.replay(); - Callable<Void> awaitSchedulerReady = startScheduler(); + startScheduler(); driverStarted.await(); scheduler.getValue().registered(driver, FrameworkID.newBuilder().setValue(FRAMEWORK_ID).build(), MasterInfo.getDefaultInstance()); - awaitSchedulerReady.call(); + awaitSchedulerReady(); assertEquals(0L, Stats.<Long>getVariable("task_store_PENDING").read().longValue()); assertEquals(1L, Stats.<Long>getVariable("task_store_ASSIGNED").read().longValue()); http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/test/java/org/apache/aurora/scheduler/discovery/AbstractDiscoveryModuleTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/AbstractDiscoveryModuleTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/AbstractDiscoveryModuleTest.java new file mode 100644 index 0000000..d90192b --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/discovery/AbstractDiscoveryModuleTest.java @@ -0,0 +1,77 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.discovery; + +import java.net.InetSocketAddress; + +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; +import com.google.inject.AbstractModule; +import com.google.inject.Binder; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Module; + +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; +import org.apache.aurora.common.testing.TearDownTestCase; +import org.apache.aurora.common.zookeeper.Credentials; +import org.apache.aurora.common.zookeeper.SingletonService; +import org.apache.aurora.common.zookeeper.ZooKeeperUtils; +import org.apache.aurora.scheduler.app.ServiceGroupMonitor; +import org.junit.Test; + +import static org.junit.Assert.assertNotNull; + +abstract class AbstractDiscoveryModuleTest extends TearDownTestCase { + + @Test + public void testBindingContract() { + ZooKeeperConfig zooKeeperConfig = + new ZooKeeperConfig( + isCurator(), + ImmutableList.of(InetSocketAddress.createUnresolved("localhost", 42)), + Optional.of("/chroot"), + false, // inProcess + Amount.of(1, Time.DAYS), + Optional.of(Credentials.digestCredentials("test", "user"))); + + Injector injector = + Guice.createInjector( + new AbstractModule() { + @Override + protected void configure() { + bind(ServiceDiscoveryBindings.ZOO_KEEPER_CLUSTER_KEY) + .toInstance( + ImmutableList.of(InetSocketAddress.createUnresolved("localhost", 42))); + bind(ServiceDiscoveryBindings.ZOO_KEEPER_ACL_KEY) + .toInstance(ZooKeeperUtils.OPEN_ACL_UNSAFE); + + bindExtraRequirements(binder()); + } + }, + createModule("/discovery/path", zooKeeperConfig)); + + assertNotNull(injector.getBinding(SingletonService.class).getProvider().get()); + assertNotNull(injector.getBinding(ServiceGroupMonitor.class).getProvider().get()); + } + + void bindExtraRequirements(Binder binder) { + // Noop. + } + + abstract Module createModule(String discoveryPath, ZooKeeperConfig zooKeeperConfig); + + abstract boolean isCurator(); +} http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java index 9f86add..a2b4125 100644 --- a/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java +++ b/src/test/java/org/apache/aurora/scheduler/discovery/BaseCuratorDiscoveryTest.java @@ -21,10 +21,13 @@ import java.util.function.Predicate; import com.google.common.collect.ImmutableMap; +import org.apache.aurora.common.io.Codec; import org.apache.aurora.common.thrift.Endpoint; import org.apache.aurora.common.thrift.ServiceInstance; import org.apache.aurora.common.thrift.Status; -import org.apache.aurora.scheduler.discovery.testing.BaseZooKeeperTest; +import org.apache.aurora.common.zookeeper.ServerSet; +import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperTest; +import org.apache.aurora.scheduler.app.ServiceGroupMonitor; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.PathChildrenCache; @@ -35,6 +38,7 @@ class BaseCuratorDiscoveryTest extends BaseZooKeeperTest { static final String GROUP_PATH = "/group/root"; static final String MEMBER_TOKEN = "member_"; + static final Codec<ServiceInstance> CODEC = ServerSet.JSON_CODEC; static final int PRIMARY_PORT = 42; private CuratorFramework client; @@ -51,7 +55,7 @@ class BaseCuratorDiscoveryTest extends BaseZooKeeperTest { groupCache.getListenable().addListener((c, event) -> groupEvents.put(event)); Predicate<String> memberSelector = name -> name.contains(MEMBER_TOKEN); - groupMonitor = new CuratorServiceGroupMonitor(groupCache, memberSelector, JsonCodec.INSTANCE); + groupMonitor = new CuratorServiceGroupMonitor(groupCache, memberSelector, ServerSet.JSON_CODEC); } final CuratorFramework startNewClient() { @@ -97,7 +101,7 @@ class BaseCuratorDiscoveryTest extends BaseZooKeeperTest { final byte[] serialize(ServiceInstance serviceInstance) throws IOException { ByteArrayOutputStream sink = new ByteArrayOutputStream(); - JsonCodec.INSTANCE.serialize(serviceInstance, sink); + CODEC.serialize(serviceInstance, sink); return sink.toByteArray(); } http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/test/java/org/apache/aurora/scheduler/discovery/CommonsDiscoveryModuleTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/CommonsDiscoveryModuleTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/CommonsDiscoveryModuleTest.java new file mode 100644 index 0000000..7a4c4dd --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/discovery/CommonsDiscoveryModuleTest.java @@ -0,0 +1,29 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.discovery; + +import com.google.inject.Module; + +public class CommonsDiscoveryModuleTest extends AbstractDiscoveryModuleTest { + + @Override + Module createModule(String discoveryPath, ZooKeeperConfig zooKeeperConfig) { + return new CommonsServiceDiscoveryModule(discoveryPath, zooKeeperConfig); + } + + @Override + boolean isCurator() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/test/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitorTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitorTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitorTest.java new file mode 100644 index 0000000..42a2224 --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitorTest.java @@ -0,0 +1,137 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.discovery; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; + +import org.apache.aurora.common.base.Command; +import org.apache.aurora.common.net.pool.DynamicHostSet; +import org.apache.aurora.common.net.pool.DynamicHostSet.HostChangeMonitor; +import org.apache.aurora.common.testing.easymock.EasyMockTest; +import org.apache.aurora.common.thrift.Endpoint; +import org.apache.aurora.common.thrift.ServiceInstance; +import org.apache.aurora.common.thrift.Status; +import org.apache.aurora.scheduler.app.ServiceGroupMonitor; +import org.easymock.Capture; +import org.junit.Before; +import org.junit.Test; + +import static org.easymock.EasyMock.capture; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class CommonsServiceGroupMonitorTest extends EasyMockTest { + + private DynamicHostSet<ServiceInstance> serverSet; + private Capture<HostChangeMonitor<ServiceInstance>> hostChangeMonitorCapture; + private Command stopCommand; + + @Before + public void setUp() throws Exception { + serverSet = createMock(new Clazz<DynamicHostSet<ServiceInstance>>() { }); + hostChangeMonitorCapture = createCapture(); + stopCommand = createMock(Command.class); + } + + private void expectSuccessfulWatch() throws Exception { + expect(serverSet.watch(capture(hostChangeMonitorCapture))).andReturn(stopCommand); + } + + private void expectFailedWatch() throws Exception { + DynamicHostSet.MonitorException watchError = + new DynamicHostSet.MonitorException( + "Problem watching service group", + new RuntimeException()); + expect(serverSet.watch(capture(hostChangeMonitorCapture))).andThrow(watchError); + } + + @Test + public void testNominalLifecycle() throws Exception { + expectSuccessfulWatch(); + + stopCommand.execute(); + expectLastCall(); + + control.replay(); + + CommonsServiceGroupMonitor groupMonitor = new CommonsServiceGroupMonitor(serverSet); + groupMonitor.start(); + groupMonitor.close(); + } + + @Test + public void testExceptionalLifecycle() throws Exception { + expectFailedWatch(); + control.replay(); + + CommonsServiceGroupMonitor groupMonitor = new CommonsServiceGroupMonitor(serverSet); + try { + groupMonitor.start(); + fail(); + } catch (ServiceGroupMonitor.MonitorException e) { + // expected + } + + // Close on a non-started monitor should be allowed. + groupMonitor.close(); + } + + @Test + public void testNoHosts() throws Exception { + expectSuccessfulWatch(); + control.replay(); + + CommonsServiceGroupMonitor groupMonitor = new CommonsServiceGroupMonitor(serverSet); + assertEquals(ImmutableSet.of(), groupMonitor.get()); + + groupMonitor.start(); + assertEquals(ImmutableSet.of(), groupMonitor.get()); + + hostChangeMonitorCapture.getValue().onChange(ImmutableSet.of()); + assertEquals(ImmutableSet.of(), groupMonitor.get()); + } + + @Test + public void testHostUpdates() throws Exception { + expectSuccessfulWatch(); + control.replay(); + + CommonsServiceGroupMonitor groupMonitor = new CommonsServiceGroupMonitor(serverSet); + groupMonitor.start(); + + ImmutableSet<ServiceInstance> twoHosts = + ImmutableSet.of(serviceInstance("one"), serviceInstance("two")); + hostChangeMonitorCapture.getValue().onChange(twoHosts); + assertEquals(twoHosts, groupMonitor.get()); + + ImmutableSet<ServiceInstance> oneHost = ImmutableSet.of(serviceInstance("one")); + hostChangeMonitorCapture.getValue().onChange(oneHost); + assertEquals(oneHost, groupMonitor.get()); + + ImmutableSet<ServiceInstance> anotherHost = ImmutableSet.of(serviceInstance("three")); + hostChangeMonitorCapture.getValue().onChange(anotherHost); + assertEquals(anotherHost, groupMonitor.get()); + + ImmutableSet<ServiceInstance> noHosts = ImmutableSet.of(); + hostChangeMonitorCapture.getValue().onChange(noHosts); + assertEquals(noHosts, groupMonitor.get()); + } + + private ServiceInstance serviceInstance(String hostName) { + return new ServiceInstance(new Endpoint(hostName, 42), ImmutableMap.of(), Status.ALIVE); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/test/java/org/apache/aurora/scheduler/discovery/CuratorDiscoveryModuleTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/CuratorDiscoveryModuleTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/CuratorDiscoveryModuleTest.java index 4ebda5e..f1a02e4 100644 --- a/src/test/java/org/apache/aurora/scheduler/discovery/CuratorDiscoveryModuleTest.java +++ b/src/test/java/org/apache/aurora/scheduler/discovery/CuratorDiscoveryModuleTest.java @@ -13,27 +13,37 @@ */ package org.apache.aurora.scheduler.discovery; -import java.net.InetSocketAddress; -import java.util.Optional; - import com.google.common.collect.ImmutableList; -import com.google.inject.AbstractModule; -import com.google.inject.Guice; -import com.google.inject.Injector; +import com.google.inject.Binder; +import com.google.inject.Module; import org.apache.aurora.common.application.ShutdownRegistry; import org.apache.aurora.common.application.ShutdownRegistry.ShutdownRegistryImpl; -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Time; -import org.apache.aurora.common.testing.TearDownTestCase; +import org.apache.aurora.common.zookeeper.ZooKeeperUtils; import org.apache.curator.framework.api.ACLProvider; import org.apache.zookeeper.data.ACL; import org.junit.Test; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -public class CuratorDiscoveryModuleTest extends TearDownTestCase { +public class CuratorDiscoveryModuleTest extends AbstractDiscoveryModuleTest { + + @Override + void bindExtraRequirements(Binder binder) { + ShutdownRegistryImpl shutdownRegistry = new ShutdownRegistryImpl(); + binder.bind(ShutdownRegistry.class).toInstance(shutdownRegistry); + addTearDown(shutdownRegistry::execute); + } + + @Override + Module createModule(String discoveryPath, ZooKeeperConfig zooKeeperConfig) { + return new CuratorServiceDiscoveryModule(discoveryPath, zooKeeperConfig); + } + + @Override + boolean isCurator() { + return false; + } @Test public void testSingleACLProvider() { @@ -54,36 +64,4 @@ public class CuratorDiscoveryModuleTest extends TearDownTestCase { public void testSingleACLProviderEmpty() { new CuratorServiceDiscoveryModule.SingleACLProvider(ImmutableList.of()); } - - @Test - public void testBindingContract() { - ZooKeeperConfig zooKeeperConfig = - new ZooKeeperConfig( - ImmutableList.of(InetSocketAddress.createUnresolved("localhost", 42)), - Optional.of("/chroot"), - false, // inProcess - Amount.of(1, Time.DAYS), - Optional.of(Credentials.digestCredentials("test", "user"))); - - Injector injector = - Guice.createInjector( - new AbstractModule() { - @Override - protected void configure() { - bind(ServiceDiscoveryBindings.ZOO_KEEPER_CLUSTER_KEY) - .toInstance( - ImmutableList.of(InetSocketAddress.createUnresolved("localhost", 42))); - bind(ServiceDiscoveryBindings.ZOO_KEEPER_ACL_KEY) - .toInstance(ZooKeeperUtils.OPEN_ACL_UNSAFE); - - ShutdownRegistryImpl shutdownRegistry = new ShutdownRegistryImpl(); - binder().bind(ShutdownRegistry.class).toInstance(shutdownRegistry); - addTearDown(shutdownRegistry::execute); - } - }, - new CuratorServiceDiscoveryModule("/discovery/path", zooKeeperConfig)); - - assertNotNull(injector.getBinding(SingletonService.class).getProvider().get()); - assertNotNull(injector.getBinding(ServiceGroupMonitor.class).getProvider().get()); - } } http://git-wip-us.apache.org/repos/asf/aurora/blob/16e4651d/src/test/java/org/apache/aurora/scheduler/discovery/CuratorSingletonServiceTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/CuratorSingletonServiceTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/CuratorSingletonServiceTest.java index bb3d080..6ea49b0 100644 --- a/src/test/java/org/apache/aurora/scheduler/discovery/CuratorSingletonServiceTest.java +++ b/src/test/java/org/apache/aurora/scheduler/discovery/CuratorSingletonServiceTest.java @@ -19,6 +19,7 @@ import java.util.concurrent.CountDownLatch; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import org.apache.aurora.common.zookeeper.SingletonService; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.easymock.Capture; @@ -56,7 +57,7 @@ public class CuratorSingletonServiceTest extends BaseCuratorDiscoveryTest { throws Exception { CuratorSingletonService singletonService = - new CuratorSingletonService(client, GROUP_PATH, MEMBER_TOKEN, JsonCodec.INSTANCE); + new CuratorSingletonService(client, GROUP_PATH, MEMBER_TOKEN, CODEC); InetSocketAddress leaderEndpoint = InetSocketAddress.createUnresolved(hostName, PRIMARY_PORT); singletonService.lead(leaderEndpoint, ImmutableMap.of(), listener); }
