Repository: aurora Updated Branches: refs/heads/master ba174ba38 -> 103dae687
Slim the `ServerSet` interface. This makes the path to a Curator implementation to satisfy the contract with `LeaderRedirect` simpler by introducing `ServiceGroupMonitor` which hides server set change events, just exposing the current active set via a query method. This is all the `LeaderRedirect` needs since it is driven by user-generated events (HTTP requests). Bugs closed: AURORA-1468 Reviewed at https://reviews.apache.org/r/45770/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/103dae68 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/103dae68 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/103dae68 Branch: refs/heads/master Commit: 103dae6871eaa76914ab7fe17adaa174e93f537a Parents: ba174ba Author: John Sirois <[email protected]> Authored: Wed Apr 6 13:11:25 2016 -0600 Committer: John Sirois <[email protected]> Committed: Wed Apr 6 13:11:25 2016 -0600 ---------------------------------------------------------------------- .../aurora/common/net/pool/DynamicHostSet.java | 6 +- .../aurora/common/zookeeper/ServerSet.java | 29 +------ .../aurora/common/zookeeper/ServerSetImpl.java | 38 ++------- .../common/zookeeper/ServerSetImplTest.java | 25 ++---- .../scheduler/app/ServiceDiscoveryModule.java | 57 ++++++++++--- .../scheduler/app/ServiceGroupMonitor.java | 46 +++++++++++ .../scheduler/http/JettyServerModule.java | 7 +- .../aurora/scheduler/http/LeaderRedirect.java | 87 ++++++++------------ .../aurora/scheduler/app/SchedulerIT.java | 3 +- .../scheduler/http/AbstractJettyTest.java | 30 +++---- .../scheduler/http/LeaderRedirectTest.java | 58 +++++++------ .../aurora/scheduler/thrift/ThriftIT.java | 10 +-- 12 files changed, 203 insertions(+), 193 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/103dae68/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSet.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSet.java b/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSet.java index 837d15c..df469ef 100644 --- a/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSet.java +++ b/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSet.java @@ -34,7 +34,7 @@ public interface DynamicHostSet<T> { * @return A command which, when executed, will stop monitoring the host set. * @throws MonitorException if there is a problem monitoring the host set */ - Command watch(final HostChangeMonitor<T> monitor) throws MonitorException; + Command watch(HostChangeMonitor<T> monitor) throws MonitorException; /** * An interface to an object that is interested in receiving notification whenever the host set @@ -52,10 +52,6 @@ public interface DynamicHostSet<T> { } class MonitorException extends Exception { - public MonitorException(String msg) { - super(msg); - } - public MonitorException(String msg, Throwable cause) { super(msg, cause); } http://git-wip-us.apache.org/repos/asf/aurora/blob/103dae68/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java index fe6229e..6e32083 100644 --- a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java +++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java @@ -16,17 +16,13 @@ package org.apache.aurora.common.zookeeper; import java.net.InetSocketAddress; import java.util.Map; -import org.apache.aurora.common.net.pool.DynamicHostSet; -import org.apache.aurora.common.thrift.ServiceInstance; import org.apache.aurora.common.zookeeper.Group.JoinException; /** - * A logical set of servers registered in ZooKeeper. Intended to be used by both servers in a - * common service and their clients. - * - * TODO(William Farner): Explore decoupling this from thrift. + * A logical set of servers registered in ZooKeeper. Intended to be used by servers in a + * common service to advertise their presence to server-set protocol-aware clients. */ -public interface ServerSet extends DynamicHostSet<ServiceInstance> { +public interface ServerSet { /** * Attempts to join a server set for this logical service group. * @@ -42,21 +38,6 @@ public interface ServerSet extends DynamicHostSet<ServiceInstance> { throws JoinException, InterruptedException; /** - * Attempts to join a server set for this logical service group. - * - * @param endpoint the primary service endpoint - * @param additionalEndpoints and additional endpoints keyed by their logical name - * @param shardId Unique shard identifier for this member of the service. - * @return an EndpointStatus object that allows the endpoint to adjust its status - * @throws JoinException if there was a problem joining the server set - * @throws InterruptedException if interrupted while waiting to join the server set - */ - EndpointStatus join( - InetSocketAddress endpoint, - Map<String, InetSocketAddress> additionalEndpoints, - int shardId) throws JoinException, InterruptedException; - - /** * A handle to a service endpoint's status data that allows updating it to track current events. */ interface EndpointStatus { @@ -75,9 +56,5 @@ public interface ServerSet extends DynamicHostSet<ServiceInstance> { public UpdateException(String message, Throwable cause) { super(message, cause); } - - public UpdateException(String message) { - super(message); - } } } http://git-wip-us.apache.org/repos/asf/aurora/blob/103dae68/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java index eca1351..8b385b8 100644 --- a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java +++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java @@ -30,7 +30,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Charsets; import com.google.common.base.Function; import com.google.common.base.Joiner; -import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Predicates; import com.google.common.base.Supplier; @@ -50,6 +49,7 @@ import com.google.gson.Gson; import org.apache.aurora.common.base.Command; import org.apache.aurora.common.io.Codec; +import org.apache.aurora.common.net.pool.DynamicHostSet; import org.apache.aurora.common.thrift.Endpoint; import org.apache.aurora.common.thrift.ServiceInstance; import org.apache.aurora.common.thrift.Status; @@ -65,9 +65,9 @@ import org.slf4j.LoggerFactory; import static com.google.common.base.Preconditions.checkNotNull; /** - * ZooKeeper-backed implementation of {@link ServerSet}. + * ZooKeeper-backed implementation of {@link ServerSet} and {@link DynamicHostSet}. */ -public class ServerSetImpl implements ServerSet { +public class ServerSetImpl implements ServerSet, DynamicHostSet<ServiceInstance> { private static final Logger LOG = LoggerFactory.getLogger(ServerSetImpl.class); private final ZooKeeperClient zkClient; @@ -134,31 +134,12 @@ public class ServerSetImpl implements ServerSet { Map<String, InetSocketAddress> additionalEndpoints) throws Group.JoinException, InterruptedException { - LOG.warn("Joining a ServerSet without a shard ID is deprecated and will soon break."); - return join(endpoint, additionalEndpoints, Optional.<Integer>absent()); - } - - @Override - public EndpointStatus join( - InetSocketAddress endpoint, - Map<String, InetSocketAddress> additionalEndpoints, - int shardId) throws Group.JoinException, InterruptedException { - - return join(endpoint, additionalEndpoints, Optional.of(shardId)); - } - - private EndpointStatus join( - InetSocketAddress endpoint, - Map<String, InetSocketAddress> additionalEndpoints, - Optional<Integer> shardId) throws Group.JoinException, InterruptedException { - checkNotNull(endpoint); checkNotNull(additionalEndpoints); - final MemberStatus memberStatus = - new MemberStatus(endpoint, additionalEndpoints, shardId); + MemberStatus memberStatus = new MemberStatus(endpoint, additionalEndpoints); Supplier<byte[]> serviceInstanceSupplier = memberStatus::serializeServiceInstance; - final Group.Membership membership = group.join(serviceInstanceSupplier); + Group.Membership membership = group.join(serviceInstanceSupplier); return () -> memberStatus.leave(membership); } @@ -178,16 +159,13 @@ public class ServerSetImpl implements ServerSet { private class MemberStatus { private final InetSocketAddress endpoint; private final Map<String, InetSocketAddress> additionalEndpoints; - private final Optional<Integer> shardId; private MemberStatus( InetSocketAddress endpoint, - Map<String, InetSocketAddress> additionalEndpoints, - Optional<Integer> shardId) { + Map<String, InetSocketAddress> additionalEndpoints) { this.endpoint = endpoint; this.additionalEndpoints = additionalEndpoints; - this.shardId = shardId; } synchronized void leave(Group.Membership membership) throws UpdateException { @@ -205,10 +183,6 @@ public class ServerSetImpl implements ServerSet { Maps.transformValues(additionalEndpoints, ServerSets.TO_ENDPOINT), Status.ALIVE); - if (shardId.isPresent()) { - serviceInstance.setShard(shardId.get()); - } - LOG.debug("updating endpoint data to:\n\t" + serviceInstance); try { return ServerSets.serializeServiceInstance(serviceInstance, codec); http://git-wip-us.apache.org/repos/asf/aurora/blob/103dae68/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java index 56cc32d..37be70b 100644 --- a/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java +++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java @@ -77,13 +77,12 @@ public class ServerSetImplTest extends BaseZooKeeperTest { ServerSetImpl server = createServerSet(); ServerSet.EndpointStatus status = server.join( - InetSocketAddress.createUnresolved("foo", 1234), makePortMap("http-admin", 8080), 0); + InetSocketAddress.createUnresolved("foo", 1234), makePortMap("http-admin", 8080)); ServiceInstance serviceInstance = new ServiceInstance( new Endpoint("foo", 1234), ImmutableMap.of("http-admin", new Endpoint("foo", 8080)), - Status.ALIVE) - .setShard(0); + Status.ALIVE); assertChangeFired(serviceInstance); @@ -166,34 +165,26 @@ public class ServerSetImplTest extends BaseZooKeeperTest { ServiceInstance instance1 = new ServiceInstance( new Endpoint("foo", 1000), ImmutableMap.of("http-admin1", new Endpoint("foo", 8080)), - Status.ALIVE) - .setShard(0); + Status.ALIVE); ServiceInstance instance2 = new ServiceInstance( new Endpoint("foo", 1001), ImmutableMap.of("http-admin2", new Endpoint("foo", 8081)), - Status.ALIVE) - .setShard(1); + Status.ALIVE); ServiceInstance instance3 = new ServiceInstance( new Endpoint("foo", 1002), ImmutableMap.of("http-admin3", new Endpoint("foo", 8082)), - Status.ALIVE) - .setShard(2); + Status.ALIVE); - server1.join( - InetSocketAddress.createUnresolved("foo", 1000), - server1Ports, - 0); + server1.join(InetSocketAddress.createUnresolved("foo", 1000), server1Ports); assertEquals(ImmutableList.of(instance1), ImmutableList.copyOf(serverSetBuffer.take())); ServerSet.EndpointStatus status2 = server2.join( InetSocketAddress.createUnresolved("foo", 1001), - server2Ports, - 1); + server2Ports); assertEquals(ImmutableList.of(instance1, instance2), ImmutableList.copyOf(serverSetBuffer.take())); - server3.join( - InetSocketAddress.createUnresolved("foo", 1002), server3Ports, 2); + server3.join(InetSocketAddress.createUnresolved("foo", 1002), server3Ports); assertEquals(ImmutableList.of(instance1, instance2, instance3), ImmutableList.copyOf(serverSetBuffer.take())); http://git-wip-us.apache.org/repos/asf/aurora/blob/103dae68/src/main/java/org/apache/aurora/scheduler/app/ServiceDiscoveryModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/app/ServiceDiscoveryModule.java b/src/main/java/org/apache/aurora/scheduler/app/ServiceDiscoveryModule.java index 240164f..73695cd 100644 --- a/src/main/java/org/apache/aurora/scheduler/app/ServiceDiscoveryModule.java +++ b/src/main/java/org/apache/aurora/scheduler/app/ServiceDiscoveryModule.java @@ -14,15 +14,19 @@ package org.apache.aurora.scheduler.app; import java.util.List; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; +import javax.inject.Inject; import javax.inject.Singleton; +import com.google.common.collect.ImmutableSet; import com.google.inject.AbstractModule; import com.google.inject.Provides; +import org.apache.aurora.common.base.Command; import org.apache.aurora.common.net.pool.DynamicHostSet; import org.apache.aurora.common.thrift.ServiceInstance; -import org.apache.aurora.common.zookeeper.ServerSet; import org.apache.aurora.common.zookeeper.ServerSetImpl; import org.apache.aurora.common.zookeeper.SingletonService; import org.apache.aurora.common.zookeeper.SingletonServiceImpl; @@ -38,21 +42,54 @@ import static java.util.Objects.requireNonNull; /** * Binding module for utilities to advertise the network presence of the scheduler. */ -public class ServiceDiscoveryModule extends AbstractModule { +class ServiceDiscoveryModule extends AbstractModule { + + private static class ServerSetMonitor implements ServiceGroupMonitor { + private Optional<Command> closeCommand = Optional.empty(); + private final DynamicHostSet<ServiceInstance> serverSet; + private final AtomicReference<ImmutableSet<ServiceInstance>> services = + new AtomicReference<>(ImmutableSet.of()); + + // NB: We only take a ServerSetImpl instead of a DynamicHostSet<ServiceInstance> here to + // simplify binding. + @Inject + ServerSetMonitor(ServerSetImpl serverSet) { + this.serverSet = requireNonNull(serverSet); + } + + @Override + public void start() throws MonitorException { + try { + closeCommand = Optional.of(serverSet.watch(services::set)); + } catch (DynamicHostSet.MonitorException e) { + throw new MonitorException("Unable to watch scheduler host set.", e); + } + } + + @Override + public void close() { + closeCommand.ifPresent(Command::execute); + } + + @Override + public ImmutableSet<ServiceInstance> get() { + return services.get(); + } + } private static final Logger LOG = LoggerFactory.getLogger(ServiceDiscoveryModule.class); private final String serverSetPath; private final Credentials zkCredentials; - public ServiceDiscoveryModule(String serverSetPath, Credentials zkCredentials) { + ServiceDiscoveryModule(String serverSetPath, Credentials zkCredentials) { this.serverSetPath = requireNonNull(serverSetPath); this.zkCredentials = requireNonNull(zkCredentials); } @Override protected void configure() { - // provider-only module. + bind(ServiceGroupMonitor.class).to(ServerSetMonitor.class).in(Singleton.class); } @Provides @@ -68,22 +105,16 @@ public class ServiceDiscoveryModule extends AbstractModule { @Provides @Singleton - ServerSet provideServerSet(ZooKeeperClient client, List<ACL> zooKeeperAcls) { + ServerSetImpl provideServerSet(ZooKeeperClient client, List<ACL> zooKeeperAcls) { return new ServerSetImpl(client, zooKeeperAcls, serverSetPath); } - @Provides - @Singleton - DynamicHostSet<ServiceInstance> provideSchedulerHostSet(ServerSet serverSet) { - // Used for a type re-binding of the serverset. - return serverSet; - } - + // NB: We only take a ServerSetImpl instead of a ServerSet here to simplify binding. @Provides @Singleton SingletonService provideSingletonService( ZooKeeperClient client, - ServerSet serverSet, + ServerSetImpl serverSet, List<ACL> zookeeperAcls) { return new SingletonServiceImpl( http://git-wip-us.apache.org/repos/asf/aurora/blob/103dae68/src/main/java/org/apache/aurora/scheduler/app/ServiceGroupMonitor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/app/ServiceGroupMonitor.java b/src/main/java/org/apache/aurora/scheduler/app/ServiceGroupMonitor.java new file mode 100644 index 0000000..a1329fd --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/app/ServiceGroupMonitor.java @@ -0,0 +1,46 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.app; + +import java.io.Closeable; +import java.util.function.Supplier; + +import com.google.common.collect.ImmutableSet; + +import org.apache.aurora.common.thrift.ServiceInstance; + +/** + * Monitors a service group's membership and supplies a live view of the most recent set. + */ +public interface ServiceGroupMonitor extends Supplier<ImmutableSet<ServiceInstance>>, Closeable { + + /** + * Indicates a problem initiating monitoring of a service group. + */ + class MonitorException extends Exception { + public MonitorException(String message, Throwable cause) { + super(message, cause); + } + } + + /** + * Starts monitoring the service group. + * + * When the service group membership no longer needs to be maintained, this monitor should be + * {@link #close() closed}. + * + * @throws MonitorException if there is a problem initiating monitoring of the service group. + */ + void start() throws MonitorException; +} http://git-wip-us.apache.org/repos/asf/aurora/blob/103dae68/src/main/java/org/apache/aurora/scheduler/http/JettyServerModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/http/JettyServerModule.java b/src/main/java/org/apache/aurora/scheduler/http/JettyServerModule.java index 5b5cde5..60667fc 100644 --- a/src/main/java/org/apache/aurora/scheduler/http/JettyServerModule.java +++ b/src/main/java/org/apache/aurora/scheduler/http/JettyServerModule.java @@ -13,6 +13,7 @@ */ package org.apache.aurora.scheduler.http; +import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.EnumSet; @@ -64,8 +65,8 @@ import org.apache.aurora.common.net.http.handlers.ThreadStackPrinter; import org.apache.aurora.common.net.http.handlers.TimeSeriesDataSource; import org.apache.aurora.common.net.http.handlers.VarsHandler; import org.apache.aurora.common.net.http.handlers.VarsJsonHandler; -import org.apache.aurora.common.net.pool.DynamicHostSet.MonitorException; import org.apache.aurora.scheduler.SchedulerServicesModule; +import org.apache.aurora.scheduler.app.ServiceGroupMonitor.MonitorException; import org.apache.aurora.scheduler.http.api.ApiModule; import org.apache.aurora.scheduler.http.api.security.HttpSecurityModule; import org.apache.aurora.scheduler.thrift.ThriftModule; @@ -295,8 +296,8 @@ public class JettyServerModule extends AbstractModule { } @Override - protected void shutDown() { - // Nothing to do here - we await VM shutdown. + protected void shutDown() throws IOException { + redirector.close(); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/103dae68/src/main/java/org/apache/aurora/scheduler/http/LeaderRedirect.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/http/LeaderRedirect.java b/src/main/java/org/apache/aurora/scheduler/http/LeaderRedirect.java index ef71290..3847fb8 100644 --- a/src/main/java/org/apache/aurora/scheduler/http/LeaderRedirect.java +++ b/src/main/java/org/apache/aurora/scheduler/http/LeaderRedirect.java @@ -13,7 +13,8 @@ */ package org.apache.aurora.scheduler.http; -import java.util.concurrent.atomic.AtomicReference; +import java.io.Closeable; +import java.io.IOException; import javax.inject.Inject; import javax.servlet.http.HttpServletRequest; @@ -23,13 +24,11 @@ import com.google.common.base.Optional; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.net.HostAndPort; -import com.google.common.util.concurrent.Atomics; -import org.apache.aurora.common.net.pool.DynamicHostSet; -import org.apache.aurora.common.net.pool.DynamicHostSet.HostChangeMonitor; -import org.apache.aurora.common.net.pool.DynamicHostSet.MonitorException; import org.apache.aurora.common.thrift.Endpoint; import org.apache.aurora.common.thrift.ServiceInstance; +import org.apache.aurora.scheduler.app.ServiceGroupMonitor; +import org.apache.aurora.scheduler.app.ServiceGroupMonitor.MonitorException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,8 +38,9 @@ import static java.util.Objects.requireNonNull; * Redirect logic for finding the leading scheduler in the event that this process is not the * leader. */ -public class LeaderRedirect { - public enum LeaderStatus { +class LeaderRedirect implements Closeable { + + enum LeaderStatus { /** * This instance is currently the leading scheduler. */ @@ -57,22 +57,15 @@ public class LeaderRedirect { NOT_LEADING, } - // TODO(wfarner): Should we tie this directly to the producer of the node (HttpModule? It seems - // like the right thing to do, but would introduce an otherwise unnecessary dependency. - @VisibleForTesting - static final String HTTP_PORT_NAME = "http"; - private static final Logger LOG = LoggerFactory.getLogger(LeaderRedirect.class); private final HttpService httpService; - private final DynamicHostSet<ServiceInstance> schedulers; - - private final AtomicReference<ServiceInstance> leader = Atomics.newReference(); + private final ServiceGroupMonitor serviceGroupMonitor; @Inject - LeaderRedirect(HttpService httpService, DynamicHostSet<ServiceInstance> schedulers) { + LeaderRedirect(HttpService httpService, ServiceGroupMonitor serviceGroupMonitor) { this.httpService = requireNonNull(httpService); - this.schedulers = requireNonNull(schedulers); + this.serviceGroupMonitor = requireNonNull(serviceGroupMonitor); } /** @@ -81,17 +74,19 @@ public class LeaderRedirect { * @throws MonitorException If monitoring failed to initialize. */ public void monitor() throws MonitorException { - schedulers.watch(new SchedulerMonitor()); + serviceGroupMonitor.start(); + } + + @Override + public void close() throws IOException { + serviceGroupMonitor.close(); } private Optional<HostAndPort> getLeaderHttp() { - ServiceInstance leadingScheduler = leader.get(); - if (leadingScheduler == null) { - return Optional.absent(); - } + Optional<ServiceInstance> leadingScheduler = getLeader(); - if (leadingScheduler.isSetServiceEndpoint()) { - Endpoint leaderHttp = leadingScheduler.getServiceEndpoint(); + if (leadingScheduler.isPresent() && leadingScheduler.get().isSetServiceEndpoint()) { + Endpoint leaderHttp = leadingScheduler.get().getServiceEndpoint(); if (leaderHttp != null && leaderHttp.isSetHost() && leaderHttp.isSetPort()) { return Optional.of(HostAndPort.fromParts(leaderHttp.getHost(), leaderHttp.getPort())); } @@ -136,13 +131,13 @@ public class LeaderRedirect { * @return a {@code LeaderStatus} indicating whether there is an elected leader (and if so, if * this instance is the leader). */ - public LeaderStatus getLeaderStatus() { - ServiceInstance leadingScheduler = leader.get(); - if (leadingScheduler == null) { + LeaderStatus getLeaderStatus() { + Optional<ServiceInstance> leadingScheduler = getLeader(); + if (!leadingScheduler.isPresent()) { return LeaderStatus.NO_LEADER; } - if (!leadingScheduler.isSetServiceEndpoint()) { + if (!leadingScheduler.get().isSetServiceEndpoint()) { LOG.warn("Leader service instance seems to be incomplete: " + leadingScheduler); return LeaderStatus.NO_LEADER; } @@ -164,7 +159,7 @@ public class LeaderRedirect { * @param req HTTP request. * @return An optional redirect destination to route the request to the leading scheduler. */ - public Optional<String> getRedirectTarget(HttpServletRequest req) { + Optional<String> getRedirectTarget(HttpServletRequest req) { Optional<HostAndPort> redirectTarget = getRedirect(); if (redirectTarget.isPresent()) { HostAndPort target = redirectTarget.get(); @@ -192,28 +187,18 @@ public class LeaderRedirect { } } - /** - * Monitor to track scheduler leader changes. - */ - private class SchedulerMonitor implements HostChangeMonitor<ServiceInstance> { - @Override - public void onChange(ImmutableSet<ServiceInstance> hostSet) { - switch (hostSet.size()) { - case 0: - LOG.warn("No schedulers in host set, will not redirect despite not being leader."); - leader.set(null); - break; - - case 1: - LOG.info("Found leader scheduler at " + hostSet); - leader.set(Iterables.getOnlyElement(hostSet)); - break; - - default: - LOG.error("Multiple schedulers detected, will not redirect: " + hostSet); - leader.set(null); - break; - } + private Optional<ServiceInstance> getLeader() { + ImmutableSet<ServiceInstance> hostSet = serviceGroupMonitor.get(); + switch (hostSet.size()) { + case 0: + LOG.warn("No serviceGroupMonitor in host set, will not redirect despite not being leader."); + return Optional.absent(); + case 1: + LOG.info("Found leader scheduler at " + hostSet); + return Optional.of(Iterables.getOnlyElement(hostSet)); + default: + LOG.error("Multiple serviceGroupMonitor detected, will not redirect: " + hostSet); + return Optional.absent(); } } } http://git-wip-us.apache.org/repos/asf/aurora/blob/103dae68/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java index 918a3da..5b77750 100644 --- a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java +++ b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java @@ -44,7 +44,6 @@ import org.apache.aurora.common.application.Lifecycle; import org.apache.aurora.common.quantity.Amount; import org.apache.aurora.common.quantity.Data; import org.apache.aurora.common.stats.Stats; -import org.apache.aurora.common.zookeeper.ServerSet; import org.apache.aurora.common.zookeeper.ServerSetImpl; import org.apache.aurora.common.zookeeper.ZooKeeperClient; import org.apache.aurora.common.zookeeper.ZooKeeperClient.Credentials; @@ -229,7 +228,7 @@ public class SchedulerIT extends BaseZooKeeperTest { private void awaitSchedulerReady() throws Exception { executor.submit(() -> { - ServerSet schedulerService = new ServerSetImpl(zkClient, SERVERSET_PATH); + ServerSetImpl schedulerService = new ServerSetImpl(zkClient, SERVERSET_PATH); final CountDownLatch schedulerReady = new CountDownLatch(1); schedulerService.watch(hostSet -> { if (!hostSet.isEmpty()) { http://git-wip-us.apache.org/repos/asf/aurora/blob/103dae68/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java b/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java index 19c8a1f..561b134 100644 --- a/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java +++ b/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java @@ -14,6 +14,7 @@ package org.apache.aurora.scheduler.http; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import javax.servlet.ServletContextListener; import javax.ws.rs.core.MediaType; @@ -27,7 +28,6 @@ import com.google.inject.Guice; import com.google.inject.Injector; import com.google.inject.Key; import com.google.inject.Module; -import com.google.inject.TypeLiteral; import com.google.inject.util.Modules; import com.sun.jersey.api.client.Client; import com.sun.jersey.api.client.WebResource; @@ -36,9 +36,6 @@ import com.sun.jersey.api.client.config.DefaultClientConfig; import com.sun.jersey.api.json.JSONConfiguration; import org.apache.aurora.GuavaUtils.ServiceManagerIface; -import org.apache.aurora.common.base.Command; -import org.apache.aurora.common.net.pool.DynamicHostSet; -import org.apache.aurora.common.net.pool.DynamicHostSet.HostChangeMonitor; import org.apache.aurora.common.quantity.Amount; import org.apache.aurora.common.quantity.Time; import org.apache.aurora.common.stats.StatsProvider; @@ -50,6 +47,7 @@ import org.apache.aurora.gen.ServerInfo; import org.apache.aurora.scheduler.AppStartup; import org.apache.aurora.scheduler.SchedulerServicesModule; import org.apache.aurora.scheduler.app.LifecycleModule; +import org.apache.aurora.scheduler.app.ServiceGroupMonitor; import org.apache.aurora.scheduler.async.AsyncModule; import org.apache.aurora.scheduler.cron.CronJobManager; import org.apache.aurora.scheduler.http.api.GsonMessageBodyHandler; @@ -63,12 +61,11 @@ import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.entities.IServerInfo; import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; import org.apache.aurora.scheduler.testing.FakeStatsProvider; -import org.easymock.Capture; import org.junit.Before; import static org.apache.aurora.scheduler.http.JettyServerModule.makeServletContextListener; -import static org.easymock.EasyMock.capture; import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; import static org.junit.Assert.assertNotNull; /** @@ -81,7 +78,7 @@ public abstract class AbstractJettyTest extends EasyMockTest { private Injector injector; protected StorageTestUtil storage; protected HostAndPort httpServer; - private Capture<HostChangeMonitor<ServiceInstance>> schedulerWatcher; + private AtomicReference<ImmutableSet<ServiceInstance>> schedulers; /** * Subclasses should override with a module that configures the servlets they are testing. @@ -95,8 +92,8 @@ public abstract class AbstractJettyTest extends EasyMockTest { @Before public void setUpBase() throws Exception { storage = new StorageTestUtil(this); - DynamicHostSet<ServiceInstance> schedulers = - createMock(new Clazz<DynamicHostSet<ServiceInstance>>() { }); + + ServiceGroupMonitor serviceGroupMonitor = createMock(ServiceGroupMonitor.class); injector = Guice.createInjector( new StatsModule(), @@ -122,7 +119,7 @@ public abstract class AbstractJettyTest extends EasyMockTest { Amount.of(1L, Time.MILLISECONDS), bindMock(BackoffStrategy.class), RateLimiter.create(1000))); - bind(new TypeLiteral<DynamicHostSet<ServiceInstance>>() { }).toInstance(schedulers); + bind(ServiceGroupMonitor.class).toInstance(serviceGroupMonitor); bindMock(CronJobManager.class); bindMock(LockManager.class); bindMock(OfferManager.class); @@ -136,17 +133,22 @@ public abstract class AbstractJettyTest extends EasyMockTest { } }, new JettyServerModule(false)); - schedulerWatcher = createCapture(); - expect(schedulers.watch(capture(schedulerWatcher))).andReturn(createMock(Command.class)); + + schedulers = new AtomicReference<>(ImmutableSet.of()); + + serviceGroupMonitor.start(); + expectLastCall(); + + expect(serviceGroupMonitor.get()).andAnswer(schedulers::get).anyTimes(); } protected void setLeadingScheduler(String host, int port) { - schedulerWatcher.getValue().onChange( + schedulers.set( ImmutableSet.of(new ServiceInstance().setServiceEndpoint(new Endpoint(host, port)))); } protected void unsetLeadingSchduler() { - schedulerWatcher.getValue().onChange(ImmutableSet.of()); + schedulers.set(ImmutableSet.of()); } protected void replayAndStart() { http://git-wip-us.apache.org/repos/asf/aurora/blob/103dae68/src/test/java/org/apache/aurora/scheduler/http/LeaderRedirectTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/http/LeaderRedirectTest.java b/src/test/java/org/apache/aurora/scheduler/http/LeaderRedirectTest.java index 3678266..a16058f 100644 --- a/src/test/java/org/apache/aurora/scheduler/http/LeaderRedirectTest.java +++ b/src/test/java/org/apache/aurora/scheduler/http/LeaderRedirectTest.java @@ -14,6 +14,7 @@ package org.apache.aurora.scheduler.http; import java.util.Arrays; +import java.util.concurrent.atomic.AtomicReference; import javax.servlet.http.HttpServletRequest; @@ -23,19 +24,17 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.net.HostAndPort; -import org.apache.aurora.common.net.pool.DynamicHostSet; -import org.apache.aurora.common.net.pool.DynamicHostSet.HostChangeMonitor; -import org.apache.aurora.common.net.pool.DynamicHostSet.MonitorException; import org.apache.aurora.common.testing.easymock.EasyMockTest; import org.apache.aurora.common.thrift.Endpoint; import org.apache.aurora.common.thrift.ServiceInstance; -import org.easymock.Capture; +import org.apache.aurora.scheduler.app.ServiceGroupMonitor; +import org.apache.aurora.scheduler.app.ServiceGroupMonitor.MonitorException; import org.junit.Before; import org.junit.Test; import static org.apache.aurora.scheduler.http.LeaderRedirect.LeaderStatus; -import static org.easymock.EasyMock.capture; import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; import static org.junit.Assert.assertEquals; public class LeaderRedirectTest extends EasyMockTest { @@ -46,63 +45,74 @@ public class LeaderRedirectTest extends EasyMockTest { endpoint -> new ServiceInstance() .setServiceEndpoint(new Endpoint(endpoint.getHostText(), endpoint.getPort())); - private Capture<HostChangeMonitor<ServiceInstance>> monitorCapture; - + private AtomicReference<ImmutableSet<ServiceInstance>> schedulers; + private ServiceGroupMonitor serviceGroupMonitor; private LeaderRedirect leaderRedirector; @Before public void setUp() throws MonitorException { - DynamicHostSet<ServiceInstance> schedulers = - createMock(new Clazz<DynamicHostSet<ServiceInstance>>() { }); + schedulers = new AtomicReference<>(ImmutableSet.of()); + serviceGroupMonitor = createMock(ServiceGroupMonitor.class); HttpService http = createMock(HttpService.class); expect(http.getAddress()).andStubReturn(HostAndPort.fromParts("localhost", HTTP_PORT)); - leaderRedirector = new LeaderRedirect(http, schedulers); - - monitorCapture = new Capture<>(); - expect(schedulers.watch(capture(monitorCapture))).andReturn(null); + leaderRedirector = new LeaderRedirect(http, serviceGroupMonitor); } - private void replayAndMonitor() throws Exception { + private void replayAndMonitor(int expectedGetCalls) throws Exception { + serviceGroupMonitor.start(); + expectLastCall(); + + expect(serviceGroupMonitor.get()).andAnswer(() -> schedulers.get()).times(expectedGetCalls); + control.replay(); leaderRedirector.monitor(); } @Test public void testLeader() throws Exception { - replayAndMonitor(); + replayAndMonitor(3); publishSchedulers(localPort(HTTP_PORT)); assertEquals(Optional.absent(), leaderRedirector.getRedirect()); + + // NB: LEADING takes 2 tests of the server group membership to calculate; thus we expect 3 + // server group get calls, 1 for the getRedirect() above and 2 here. assertEquals(LeaderStatus.LEADING, leaderRedirector.getLeaderStatus()); } @Test public void testNotLeader() throws Exception { - replayAndMonitor(); + replayAndMonitor(3); HostAndPort remote = HostAndPort.fromParts("foobar", HTTP_PORT); publishSchedulers(remote); assertEquals(Optional.of(remote), leaderRedirector.getRedirect()); + + // NB: NOT_LEADING takes 2 tests of the server group membership to calculate; thus we expect 3 + // server group get calls, 1 for the getRedirect() above and 2 here. assertEquals(LeaderStatus.NOT_LEADING, leaderRedirector.getLeaderStatus()); } @Test public void testLeaderOnSameHost() throws Exception { - replayAndMonitor(); + replayAndMonitor(3); HostAndPort local = localPort(555); publishSchedulers(local); assertEquals(Optional.of(local), leaderRedirector.getRedirect()); + + // NB: NOT_LEADING takes 2 tests of the server group membership to calculate; thus we expect 3 + // server group get calls, 1 for the getRedirect() above and 2 here. assertEquals(LeaderStatus.NOT_LEADING, leaderRedirector.getLeaderStatus()); } @Test public void testNoLeaders() throws Exception { - replayAndMonitor(); + replayAndMonitor(2); assertEquals(Optional.absent(), leaderRedirector.getRedirect()); assertEquals(LeaderStatus.NO_LEADER, leaderRedirector.getLeaderStatus()); @@ -110,7 +120,7 @@ public class LeaderRedirectTest extends EasyMockTest { @Test public void testMultipleLeaders() throws Exception { - replayAndMonitor(); + replayAndMonitor(2); publishSchedulers(HostAndPort.fromParts("foobar", 500), HostAndPort.fromParts("baz", 800)); @@ -120,7 +130,7 @@ public class LeaderRedirectTest extends EasyMockTest { @Test public void testBadServiceInstance() throws Exception { - replayAndMonitor(); + replayAndMonitor(2); publishSchedulers(ImmutableSet.of(new ServiceInstance())); @@ -143,7 +153,7 @@ public class LeaderRedirectTest extends EasyMockTest { public void testRedirectTargetNoAttribute() throws Exception { HttpServletRequest mockRequest = mockRequest(null, null); - replayAndMonitor(); + replayAndMonitor(1); HostAndPort remote = HostAndPort.fromParts("foobar", HTTP_PORT); publishSchedulers(remote); @@ -157,7 +167,7 @@ public class LeaderRedirectTest extends EasyMockTest { public void testRedirectTargetWithAttribute() throws Exception { HttpServletRequest mockRequest = mockRequest("/the/original/path", null); - replayAndMonitor(); + replayAndMonitor(1); HostAndPort remote = HostAndPort.fromParts("foobar", HTTP_PORT); publishSchedulers(remote); @@ -171,7 +181,7 @@ public class LeaderRedirectTest extends EasyMockTest { public void testRedirectTargetQueryString() throws Exception { HttpServletRequest mockRequest = mockRequest(null, "bar=baz"); - replayAndMonitor(); + replayAndMonitor(1); HostAndPort remote = HostAndPort.fromParts("foobar", HTTP_PORT); publishSchedulers(remote); @@ -187,7 +197,7 @@ public class LeaderRedirectTest extends EasyMockTest { } private void publishSchedulers(ImmutableSet<ServiceInstance> instances) { - monitorCapture.getValue().onChange(instances); + schedulers.set(instances); } private static HostAndPort localPort(int port) { http://git-wip-us.apache.org/repos/asf/aurora/blob/103dae68/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java b/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java index a39226c..d5648c9 100644 --- a/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java +++ b/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java @@ -24,13 +24,10 @@ import com.google.inject.Guice; import com.google.inject.Injector; import com.google.inject.Key; import com.google.inject.Provides; -import com.google.inject.TypeLiteral; import org.apache.aurora.common.application.ShutdownStage; import org.apache.aurora.common.base.Command; -import org.apache.aurora.common.net.pool.DynamicHostSet; import org.apache.aurora.common.testing.easymock.EasyMockTest; -import org.apache.aurora.common.thrift.ServiceInstance; import org.apache.aurora.gen.AuroraAdmin; import org.apache.aurora.gen.Container; import org.apache.aurora.gen.Container._Fields; @@ -46,6 +43,7 @@ import org.apache.aurora.gen.TaskQuery; import org.apache.aurora.scheduler.TierModule; import org.apache.aurora.scheduler.app.AppModule; import org.apache.aurora.scheduler.app.LifecycleModule; +import org.apache.aurora.scheduler.app.ServiceGroupMonitor; import org.apache.aurora.scheduler.app.local.FakeNonVolatileStorage; import org.apache.aurora.scheduler.base.TaskTestUtil; import org.apache.aurora.scheduler.configuration.ConfigurationManager.ConfigurationManagerSettings; @@ -104,9 +102,9 @@ public class ThriftIT extends EasyMockTest { bind(NonVolatileStorage.class).to(FakeNonVolatileStorage.class); - DynamicHostSet<ServiceInstance> schedulers = - createMock(new Clazz<DynamicHostSet<ServiceInstance>>() { }); - bind(new TypeLiteral<DynamicHostSet<ServiceInstance>>() { }).toInstance(schedulers); + ServiceGroupMonitor schedulers = createMock(ServiceGroupMonitor.class); + bind(ServiceGroupMonitor.class).toInstance(schedulers); + bindMock(DriverFactory.class); bind(DriverSettings.class).toInstance(new DriverSettings( "fakemaster",
