Repository: aurora Updated Branches: refs/heads/master ce40eaa71 -> 14f975a4a
Factor out a discovery package. This will be the home for the Curator implementations. In the process, lift the `ServerSetMonitor` to a top-level class and add a test. Also tighten up the `ServiceDiscoveryModule` and make requirements and exports clear. Bugs closed: AURORA-1468 Reviewed at https://reviews.apache.org/r/45850/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/14f975a4 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/14f975a4 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/14f975a4 Branch: refs/heads/master Commit: 14f975a4a31758cf9d664497bc4dbf72824d6830 Parents: ce40eaa Author: John Sirois <[email protected]> Authored: Wed Apr 6 20:46:04 2016 -0600 Committer: John Sirois <[email protected]> Committed: Wed Apr 6 20:46:04 2016 -0600 ---------------------------------------------------------------------- .../aurora/scheduler/app/SchedulerMain.java | 1 + .../scheduler/app/ServiceDiscoveryModule.java | 124 ----------------- .../discovery/CommonsServerGroupMonitor.java | 59 ++++++++ .../discovery/ServiceDiscoveryModule.java | 99 ++++++++++++++ .../aurora/scheduler/app/SchedulerIT.java | 1 + .../CommonsServerGroupMonitorTest.java | 137 +++++++++++++++++++ 6 files changed, 297 insertions(+), 124 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/14f975a4/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java index 11f6ad1..ecdaa7e 100644 --- a/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java +++ b/src/main/java/org/apache/aurora/scheduler/app/SchedulerMain.java @@ -50,6 +50,7 @@ import org.apache.aurora.scheduler.SchedulerLifecycle; import org.apache.aurora.scheduler.TierModule; import org.apache.aurora.scheduler.configuration.executor.ExecutorModule; import org.apache.aurora.scheduler.cron.quartz.CronModule; +import org.apache.aurora.scheduler.discovery.ServiceDiscoveryModule; import org.apache.aurora.scheduler.http.HttpService; import org.apache.aurora.scheduler.log.mesos.MesosLogStreamModule; import org.apache.aurora.scheduler.mesos.CommandLineDriverSettingsModule; http://git-wip-us.apache.org/repos/asf/aurora/blob/14f975a4/src/main/java/org/apache/aurora/scheduler/app/ServiceDiscoveryModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/app/ServiceDiscoveryModule.java b/src/main/java/org/apache/aurora/scheduler/app/ServiceDiscoveryModule.java deleted file mode 100644 index 73695cd..0000000 --- a/src/main/java/org/apache/aurora/scheduler/app/ServiceDiscoveryModule.java +++ /dev/null @@ -1,124 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.app; - -import java.util.List; -import java.util.Optional; -import java.util.concurrent.atomic.AtomicReference; - -import javax.inject.Inject; -import javax.inject.Singleton; - -import com.google.common.collect.ImmutableSet; -import com.google.inject.AbstractModule; -import com.google.inject.Provides; - -import org.apache.aurora.common.base.Command; -import org.apache.aurora.common.net.pool.DynamicHostSet; -import org.apache.aurora.common.thrift.ServiceInstance; -import org.apache.aurora.common.zookeeper.ServerSetImpl; -import org.apache.aurora.common.zookeeper.SingletonService; -import org.apache.aurora.common.zookeeper.SingletonServiceImpl; -import org.apache.aurora.common.zookeeper.ZooKeeperClient; -import org.apache.aurora.common.zookeeper.ZooKeeperClient.Credentials; -import org.apache.aurora.common.zookeeper.ZooKeeperUtils; -import org.apache.zookeeper.data.ACL; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static java.util.Objects.requireNonNull; - -/** - * Binding module for utilities to advertise the network presence of the scheduler. - */ -class ServiceDiscoveryModule extends AbstractModule { - - private static class ServerSetMonitor implements ServiceGroupMonitor { - private Optional<Command> closeCommand = Optional.empty(); - private final DynamicHostSet<ServiceInstance> serverSet; - private final AtomicReference<ImmutableSet<ServiceInstance>> services = - new AtomicReference<>(ImmutableSet.of()); - - // NB: We only take a ServerSetImpl instead of a DynamicHostSet<ServiceInstance> here to - // simplify binding. - @Inject - ServerSetMonitor(ServerSetImpl serverSet) { - this.serverSet = requireNonNull(serverSet); - } - - @Override - public void start() throws MonitorException { - try { - closeCommand = Optional.of(serverSet.watch(services::set)); - } catch (DynamicHostSet.MonitorException e) { - throw new MonitorException("Unable to watch scheduler host set.", e); - } - } - - @Override - public void close() { - closeCommand.ifPresent(Command::execute); - } - - @Override - public ImmutableSet<ServiceInstance> get() { - return services.get(); - } - } - - private static final Logger LOG = LoggerFactory.getLogger(ServiceDiscoveryModule.class); - - private final String serverSetPath; - private final Credentials zkCredentials; - - ServiceDiscoveryModule(String serverSetPath, Credentials zkCredentials) { - this.serverSetPath = requireNonNull(serverSetPath); - this.zkCredentials = requireNonNull(zkCredentials); - } - - @Override - protected void configure() { - bind(ServiceGroupMonitor.class).to(ServerSetMonitor.class).in(Singleton.class); - } - - @Provides - @Singleton - List<ACL> provideAcls() { - if (zkCredentials == Credentials.NONE) { - LOG.warn("Running without ZooKeeper digest credentials. ZooKeeper ACLs are disabled."); - return ZooKeeperUtils.OPEN_ACL_UNSAFE; - } else { - return ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL; - } - } - - @Provides - @Singleton - ServerSetImpl provideServerSet(ZooKeeperClient client, List<ACL> zooKeeperAcls) { - return new ServerSetImpl(client, zooKeeperAcls, serverSetPath); - } - - // NB: We only take a ServerSetImpl instead of a ServerSet here to simplify binding. - @Provides - @Singleton - SingletonService provideSingletonService( - ZooKeeperClient client, - ServerSetImpl serverSet, - List<ACL> zookeeperAcls) { - - return new SingletonServiceImpl( - serverSet, - SingletonServiceImpl.createSingletonCandidate(client, serverSetPath, zookeeperAcls)); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/14f975a4/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServerGroupMonitor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServerGroupMonitor.java b/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServerGroupMonitor.java new file mode 100644 index 0000000..3336c87 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServerGroupMonitor.java @@ -0,0 +1,59 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.discovery; + +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; + +import javax.inject.Inject; + +import com.google.common.collect.ImmutableSet; + +import org.apache.aurora.common.base.Command; +import org.apache.aurora.common.net.pool.DynamicHostSet; +import org.apache.aurora.common.thrift.ServiceInstance; +import org.apache.aurora.scheduler.app.ServiceGroupMonitor; + +import static java.util.Objects.requireNonNull; + +class CommonsServerGroupMonitor implements ServiceGroupMonitor { + private Optional<Command> closeCommand = Optional.empty(); + private final DynamicHostSet<ServiceInstance> serverSet; + private final AtomicReference<ImmutableSet<ServiceInstance>> services = + new AtomicReference<>(ImmutableSet.of()); + + @Inject + CommonsServerGroupMonitor(DynamicHostSet<ServiceInstance> serverSet) { + this.serverSet = requireNonNull(serverSet); + } + + @Override + public void start() throws MonitorException { + try { + closeCommand = Optional.of(serverSet.watch(services::set)); + } catch (DynamicHostSet.MonitorException e) { + throw new MonitorException("Unable to watch scheduler host set.", e); + } + } + + @Override + public void close() { + closeCommand.ifPresent(Command::execute); + } + + @Override + public ImmutableSet<ServiceInstance> get() { + return services.get(); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/14f975a4/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java b/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java new file mode 100644 index 0000000..c14162f --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java @@ -0,0 +1,99 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.discovery; + +import java.util.List; + +import javax.inject.Singleton; + +import com.google.inject.Exposed; +import com.google.inject.PrivateModule; +import com.google.inject.Provides; + +import org.apache.aurora.common.net.pool.DynamicHostSet; +import org.apache.aurora.common.thrift.ServiceInstance; +import org.apache.aurora.common.zookeeper.ServerSetImpl; +import org.apache.aurora.common.zookeeper.SingletonService; +import org.apache.aurora.common.zookeeper.SingletonServiceImpl; +import org.apache.aurora.common.zookeeper.ZooKeeperClient; +import org.apache.aurora.common.zookeeper.ZooKeeperClient.Credentials; +import org.apache.aurora.common.zookeeper.ZooKeeperUtils; +import org.apache.aurora.scheduler.app.ServiceGroupMonitor; +import org.apache.zookeeper.data.ACL; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.util.Objects.requireNonNull; + +/** + * Binding module for utilities to advertise the network presence of the scheduler. + */ +public class ServiceDiscoveryModule extends PrivateModule { + + private static final Logger LOG = LoggerFactory.getLogger(ServiceDiscoveryModule.class); + + private final String serverSetPath; + private final Credentials zkCredentials; + + public ServiceDiscoveryModule(String serverSetPath, Credentials zkCredentials) { + this.serverSetPath = requireNonNull(serverSetPath); + this.zkCredentials = requireNonNull(zkCredentials); + } + + @Override + protected void configure() { + requireBinding(ZooKeeperClient.class); + + bind(ServiceGroupMonitor.class).to(CommonsServerGroupMonitor.class).in(Singleton.class); + expose(ServiceGroupMonitor.class); + } + + @Provides + @Singleton + List<ACL> provideAcls() { + if (zkCredentials == Credentials.NONE) { + LOG.warn("Running without ZooKeeper digest credentials. ZooKeeper ACLs are disabled."); + return ZooKeeperUtils.OPEN_ACL_UNSAFE; + } else { + return ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL; + } + } + + @Provides + @Singleton + ServerSetImpl provideServerSet(ZooKeeperClient client, List<ACL> zooKeeperAcls) { + return new ServerSetImpl(client, zooKeeperAcls, serverSetPath); + } + + @Provides + @Singleton + DynamicHostSet<ServiceInstance> provideServerSet(ServerSetImpl serverSet) { + // Used for a type re-binding of the server set. + return serverSet; + } + + // NB: We only take a ServerSetImpl instead of a ServerSet here to simplify binding. + @Provides + @Singleton + @Exposed + SingletonService provideSingletonService( + ZooKeeperClient client, + ServerSetImpl serverSet, + List<ACL> zookeeperAcls) { + + return new SingletonServiceImpl( + serverSet, + SingletonServiceImpl.createSingletonCandidate(client, serverSetPath, zookeeperAcls)); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/14f975a4/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java index 5b77750..b449827 100644 --- a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java +++ b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java @@ -63,6 +63,7 @@ import org.apache.aurora.scheduler.ResourceSlot; import org.apache.aurora.scheduler.TierModule; import org.apache.aurora.scheduler.base.TaskTestUtil; import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings; +import org.apache.aurora.scheduler.discovery.ServiceDiscoveryModule; import org.apache.aurora.scheduler.log.Log; import org.apache.aurora.scheduler.log.Log.Entry; import org.apache.aurora.scheduler.log.Log.Position; http://git-wip-us.apache.org/repos/asf/aurora/blob/14f975a4/src/test/java/org/apache/aurora/scheduler/discovery/CommonsServerGroupMonitorTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/discovery/CommonsServerGroupMonitorTest.java b/src/test/java/org/apache/aurora/scheduler/discovery/CommonsServerGroupMonitorTest.java new file mode 100644 index 0000000..b584780 --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/discovery/CommonsServerGroupMonitorTest.java @@ -0,0 +1,137 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.discovery; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; + +import org.apache.aurora.common.base.Command; +import org.apache.aurora.common.net.pool.DynamicHostSet; +import org.apache.aurora.common.net.pool.DynamicHostSet.HostChangeMonitor; +import org.apache.aurora.common.testing.easymock.EasyMockTest; +import org.apache.aurora.common.thrift.Endpoint; +import org.apache.aurora.common.thrift.ServiceInstance; +import org.apache.aurora.common.thrift.Status; +import org.apache.aurora.scheduler.app.ServiceGroupMonitor; +import org.easymock.Capture; +import org.junit.Before; +import org.junit.Test; + +import static org.easymock.EasyMock.capture; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class CommonsServerGroupMonitorTest extends EasyMockTest { + + private DynamicHostSet<ServiceInstance> serverSet; + private Capture<HostChangeMonitor<ServiceInstance>> hostChangeMonitorCapture; + private Command stopCommand; + + @Before + public void setUp() throws Exception { + serverSet = createMock(new Clazz<DynamicHostSet<ServiceInstance>>() { }); + hostChangeMonitorCapture = createCapture(); + stopCommand = createMock(Command.class); + } + + private void expectSuccessfulWatch() throws Exception { + expect(serverSet.watch(capture(hostChangeMonitorCapture))).andReturn(stopCommand); + } + + private void expectFailedWatch() throws Exception { + DynamicHostSet.MonitorException watchError = + new DynamicHostSet.MonitorException( + "Problem watching service group", + new RuntimeException()); + expect(serverSet.watch(capture(hostChangeMonitorCapture))).andThrow(watchError); + } + + @Test + public void testNominalLifecycle() throws Exception { + expectSuccessfulWatch(); + + stopCommand.execute(); + expectLastCall(); + + control.replay(); + + CommonsServerGroupMonitor groupMonitor = new CommonsServerGroupMonitor(serverSet); + groupMonitor.start(); + groupMonitor.close(); + } + + @Test + public void testExceptionalLifecycle() throws Exception { + expectFailedWatch(); + control.replay(); + + CommonsServerGroupMonitor groupMonitor = new CommonsServerGroupMonitor(serverSet); + try { + groupMonitor.start(); + fail(); + } catch (ServiceGroupMonitor.MonitorException e) { + // expected + } + + // Close on a non-started monitor should be allowed. + groupMonitor.close(); + } + + @Test + public void testNoHosts() throws Exception { + expectSuccessfulWatch(); + control.replay(); + + CommonsServerGroupMonitor groupMonitor = new CommonsServerGroupMonitor(serverSet); + assertEquals(ImmutableSet.of(), groupMonitor.get()); + + groupMonitor.start(); + assertEquals(ImmutableSet.of(), groupMonitor.get()); + + hostChangeMonitorCapture.getValue().onChange(ImmutableSet.of()); + assertEquals(ImmutableSet.of(), groupMonitor.get()); + } + + @Test + public void testHostUpdates() throws Exception { + expectSuccessfulWatch(); + control.replay(); + + CommonsServerGroupMonitor groupMonitor = new CommonsServerGroupMonitor(serverSet); + groupMonitor.start(); + + ImmutableSet<ServiceInstance> twoHosts = + ImmutableSet.of(serviceInstance("one"), serviceInstance("two")); + hostChangeMonitorCapture.getValue().onChange(twoHosts); + assertEquals(twoHosts, groupMonitor.get()); + + ImmutableSet<ServiceInstance> oneHost = ImmutableSet.of(serviceInstance("one")); + hostChangeMonitorCapture.getValue().onChange(oneHost); + assertEquals(oneHost, groupMonitor.get()); + + ImmutableSet<ServiceInstance> anotherHost = ImmutableSet.of(serviceInstance("three")); + hostChangeMonitorCapture.getValue().onChange(anotherHost); + assertEquals(anotherHost, groupMonitor.get()); + + ImmutableSet<ServiceInstance> noHosts = ImmutableSet.of(); + hostChangeMonitorCapture.getValue().onChange(noHosts); + assertEquals(noHosts, groupMonitor.get()); + } + + private ServiceInstance serviceInstance(String hostName) { + return new ServiceInstance(new Endpoint(hostName, 42), ImmutableMap.of(), Status.ALIVE); + } +}
