Repository: aurora Updated Branches: refs/heads/master 2652fe02a -> 705dbc7cd
http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java index c599fe3..b132cde 100644 --- a/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java @@ -14,33 +14,14 @@ package org.apache.aurora.scheduler.mesos; import java.nio.charset.StandardCharsets; -import java.util.concurrent.atomic.AtomicBoolean; -import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.net.InetAddresses; -import org.apache.aurora.common.application.Lifecycle; -import org.apache.aurora.common.base.Command; import org.apache.aurora.common.testing.easymock.EasyMockTest; -import org.apache.aurora.gen.HostAttributes; -import org.apache.aurora.scheduler.HostOffer; -import org.apache.aurora.scheduler.TaskStatusHandler; -import org.apache.aurora.scheduler.base.Conversions; -import org.apache.aurora.scheduler.base.SchedulerException; -import org.apache.aurora.scheduler.events.EventSink; -import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected; -import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered; -import org.apache.aurora.scheduler.events.PubsubEvent.TaskStatusReceived; -import org.apache.aurora.scheduler.offers.OfferManager; -import org.apache.aurora.scheduler.stats.CachedCounters; -import org.apache.aurora.scheduler.storage.Storage.StorageException; -import org.apache.aurora.scheduler.storage.entities.IHostAttributes; -import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; -import org.apache.aurora.scheduler.testing.FakeStatsProvider; import org.apache.mesos.Protos; import org.apache.mesos.SchedulerDriver; +import org.apache.mesos.v1.Protos.ExecutorID; import org.apache.mesos.v1.Protos.FrameworkID; import org.apache.mesos.v1.Protos.OfferID; import org.apache.mesos.v1.Protos.TaskID; @@ -48,64 +29,36 @@ import org.apache.mesos.v1.Protos.TaskState; import org.apache.mesos.v1.Protos.TaskStatus; import org.apache.mesos.v1.Protos.TaskStatus.Reason; import org.apache.mesos.v1.Protos.TaskStatus.Source; -import org.easymock.EasyMock; import org.junit.Before; import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import static org.apache.aurora.gen.MaintenanceMode.DRAINING; -import static org.apache.aurora.gen.MaintenanceMode.NONE; -import static org.easymock.EasyMock.anyString; -import static org.easymock.EasyMock.expect; +import static org.apache.aurora.scheduler.mesos.ProtosConversion.convert; import static org.easymock.EasyMock.expectLastCall; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; public class MesosSchedulerImplTest extends EasyMockTest { private static final String FRAMEWORK_ID = "framework-id"; private static final FrameworkID FRAMEWORK = FrameworkID.newBuilder().setValue(FRAMEWORK_ID).build(); - + private static final String MASTER_ID = "master-id"; + private static final Protos.MasterInfo MASTER = Protos.MasterInfo.newBuilder() + .setId(MASTER_ID) + .setIp(InetAddresses.coerceToInteger(InetAddresses.forString("1.2.3.4"))) //NOPMD + .setPort(5050).build(); private static final String SLAVE_HOST = "slave-hostname"; private static final Protos.SlaveID SLAVE_ID = Protos.SlaveID.newBuilder().setValue("slave-id").build(); - private static final String SLAVE_HOST_2 = "slave-hostname-2"; - private static final Protos.SlaveID SLAVE_ID_2 = - Protos.SlaveID.newBuilder().setValue("slave-id-2").build(); - private static final Protos.ExecutorID EXECUTOR_ID = - Protos.ExecutorID.newBuilder().setValue("executor-id").build(); private static final OfferID OFFER_ID = OfferID.newBuilder().setValue("offer-id").build(); private static final Protos.Offer OFFER = Protos.Offer.newBuilder() - .setFrameworkId(ProtosConversion.convert(FRAMEWORK)) + .setFrameworkId(convert(FRAMEWORK)) .setSlaveId(SLAVE_ID) .setHostname(SLAVE_HOST) - .setId(ProtosConversion.convert(OFFER_ID)) - .build(); - private static final HostOffer HOST_OFFER = new HostOffer( - ProtosConversion.convert(OFFER), - IHostAttributes.build( - new HostAttributes() - .setHost(SLAVE_HOST) - .setSlaveId(SLAVE_ID.getValue()) - .setMode(NONE) - .setAttributes(ImmutableSet.of()))); - private static final OfferID OFFER_ID_2 = OfferID.newBuilder().setValue("offer-id-2").build(); - private static final Protos.Offer OFFER_2 = Protos.Offer.newBuilder(OFFER) - .setSlaveId(SLAVE_ID_2) - .setHostname(SLAVE_HOST_2) - .setId(ProtosConversion.convert(OFFER_ID_2)) + .setId(convert(OFFER_ID)) .build(); - private static final HostOffer HOST_OFFER_2 = new HostOffer( - ProtosConversion.convert(OFFER_2), - IHostAttributes.build( - new HostAttributes() - .setHost(SLAVE_HOST_2) - .setSlaveId(SLAVE_ID_2.getValue()) - .setMode(NONE) - .setAttributes(ImmutableSet.of()))); + + private static final ExecutorID EXECUTOR_ID = + ExecutorID.newBuilder().setValue("executor-id").build(); private static final TaskStatus STATUS_NO_REASON = TaskStatus.newBuilder() .setState(TaskState.TASK_RUNNING) @@ -121,358 +74,129 @@ public class MesosSchedulerImplTest extends EasyMockTest { .setReason(Reason.REASON_COMMAND_EXECUTOR_FAILED) .build(); - private static final TaskStatus STATUS_RECONCILIATION = STATUS_NO_REASON - .toBuilder() - .setReason(Reason.REASON_RECONCILIATION) - .build(); - - private static final TaskStatusReceived PUBSUB_RECONCILIATION_EVENT = new TaskStatusReceived( - STATUS_RECONCILIATION.getState(), - Optional.of(STATUS_RECONCILIATION.getSource()), - Optional.of(STATUS_RECONCILIATION.getReason()), - Optional.of(1000000L) - ); - - private StorageTestUtil storageUtil; - private Command shutdownCommand; - private TaskStatusHandler statusHandler; - private OfferManager offerManager; + private MesosCallbackHandler handler; private SchedulerDriver driver; - private EventSink eventSink; - private FakeStatsProvider statsProvider; private MesosSchedulerImpl scheduler; @Before public void setUp() { - Logger log = LoggerFactory.getLogger(""); - initializeScheduler(log); - } - - private void initializeScheduler(Logger logger) { - storageUtil = new StorageTestUtil(this); - shutdownCommand = createMock(Command.class); - statusHandler = createMock(TaskStatusHandler.class); - offerManager = createMock(OfferManager.class); - eventSink = createMock(EventSink.class); - statsProvider = new FakeStatsProvider(); - - scheduler = new MesosSchedulerImpl( - storageUtil.storage, - new Lifecycle(shutdownCommand), - statusHandler, - offerManager, - eventSink, - MoreExecutors.sameThreadExecutor(), - new CachedCounters(statsProvider), - logger, - statsProvider); + handler = createMock(MesosCallbackHandler.class); driver = createMock(SchedulerDriver.class); + scheduler = new MesosSchedulerImpl(handler); } - @Test(expected = IllegalStateException.class) - public void testBadOrdering() { - control.replay(); + @Test + public void testSlaveLost() { + handler.handleLostAgent(convert(SLAVE_ID)); + expectLastCall().once(); - // Should fail since the scheduler is not yet registered. - scheduler.resourceOffers(driver, ImmutableList.of()); - } + control.replay(); - @Test - public void testNoOffers() { - new AbstractRegisteredTest() { - @Override - void test() { - scheduler.resourceOffers(driver, ImmutableList.of()); - } - }.run(); + scheduler.slaveLost(driver, SLAVE_ID); } @Test - public void testAcceptOffer() { - new AbstractOfferTest() { - @Override - void respondToOffer() { - expectOfferAttributesSaved(HOST_OFFER); - offerManager.addOffer(HOST_OFFER); - } - }.run(); - } + public void testRegistered() { + handler.handleRegistration(FRAMEWORK, convert(MASTER)); + expectLastCall().once(); - @Test - public void testAcceptOfferDebugLogging() { - Logger mockLogger = createMock(Logger.class); - mockLogger.info(anyString()); - mockLogger.debug(anyString(), EasyMock.<Object>anyObject()); - initializeScheduler(mockLogger); - - new AbstractOfferTest() { - @Override - void respondToOffer() { - expectOfferAttributesSaved(HOST_OFFER); - offerManager.addOffer(HOST_OFFER); - } - }.run(); - } + control.replay(); - @Test - public void testAttributesModePreserved() { - new AbstractOfferTest() { - @Override - void respondToOffer() { - IHostAttributes draining = - IHostAttributes.build(HOST_OFFER.getAttributes().newBuilder().setMode(DRAINING)); - expect(storageUtil.attributeStore.getHostAttributes(HOST_OFFER.getOffer().getHostname())) - .andReturn(Optional.of(draining)); - IHostAttributes saved = IHostAttributes.build( - Conversions.getAttributes(HOST_OFFER.getOffer()).newBuilder().setMode(DRAINING)); - expect(storageUtil.attributeStore.saveHostAttributes(saved)).andReturn(true); - - HostOffer offer = new HostOffer(HOST_OFFER.getOffer(), draining); - offerManager.addOffer(offer); - } - }.run(); + scheduler.registered(driver, convert(FRAMEWORK), MASTER); } @Test - public void testStatusUpdate() { - // Test multiple variations of fields in TaskStatus to cover all branches. - new StatusUpdater(STATUS).run(); - control.verify(); - control.reset(); - new StatusUpdater(STATUS.toBuilder().clearSource().build()).run(); - control.verify(); - control.reset(); - new StatusUpdater(STATUS.toBuilder().clearReason().build()).run(); - control.verify(); - control.reset(); - new StatusUpdater(STATUS.toBuilder().clearMessage().build()).run(); - } + public void testDisconnected() { + handler.handleDisconnection(); + expectLastCall().once(); - @Test(expected = SchedulerException.class) - public void testStatusUpdateFails() { - new AbstractStatusTest() { - @Override - void expectations() { - eventSink.post(new TaskStatusReceived( - STATUS.getState(), - Optional.of(STATUS.getSource()), - Optional.of(STATUS.getReason()), - Optional.of(1000000L) - )); - statusHandler.statusUpdate(status); - expectLastCall().andThrow(new StorageException("Injected.")); - } - }.run(); - } + control.replay(); - @Test - public void testMultipleOffers() { - new AbstractRegisteredTest() { - @Override - void expectations() { - expectOfferAttributesSaved(HOST_OFFER); - expectOfferAttributesSaved(HOST_OFFER_2); - offerManager.addOffer(HOST_OFFER); - offerManager.addOffer(HOST_OFFER_2); - } - - @Override - void test() { - scheduler.resourceOffers(driver, - ImmutableList.of( - ProtosConversion.convert(HOST_OFFER.getOffer()), - ProtosConversion.convert(HOST_OFFER_2.getOffer()))); - } - }.run(); + scheduler.disconnected(driver); } @Test - public void testDisconnected() { - new AbstractRegisteredTest() { - @Override - void expectations() { - eventSink.post(new DriverDisconnected()); - } - - @Override - void test() { - scheduler.disconnected(driver); - assertEquals(1L, statsProvider.getLongValue("scheduler_framework_disconnects")); - } - }.run(); - } + public void testReRegistered() { + handler.handleReregistration(convert(MASTER)); + expectLastCall().once(); - @Test - public void testFrameworkMessageIgnored() { control.replay(); - scheduler.frameworkMessage( - driver, - EXECUTOR_ID, - SLAVE_ID, - "hello".getBytes(StandardCharsets.UTF_8)); + scheduler.reregistered(driver, MASTER); } @Test - public void testSlaveLost() { + public void testResourceOffers() { + handler.handleRegistration(FRAMEWORK, convert(MASTER)); + expectLastCall().once(); + handler.handleOffers(ImmutableList.of(convert(OFFER))); + expectLastCall().once(); + control.replay(); - scheduler.slaveLost(driver, SLAVE_ID); - assertEquals(1L, statsProvider.getLongValue("slaves_lost")); + scheduler.registered(driver, convert(FRAMEWORK), MASTER); + scheduler.resourceOffers(driver, ImmutableList.of(OFFER)); } - @Test - public void testReregistered() { + @Test(expected = IllegalStateException.class) + public void testBadOrdering() { control.replay(); - scheduler.reregistered(driver, Protos.MasterInfo.getDefaultInstance()); + // Should fail since the scheduler is not yet registered. + scheduler.resourceOffers(driver, ImmutableList.of()); } @Test public void testOfferRescinded() { - offerManager.cancelOffer(OFFER_ID); + handler.handleRescind(OFFER_ID); + expectLastCall().once(); control.replay(); - scheduler.offerRescinded(driver, ProtosConversion.convert(OFFER_ID)); - assertEquals(1L, statsProvider.getLongValue("offers_rescinded")); + scheduler.offerRescinded(driver, convert(OFFER_ID)); } @Test - public void testError() { - shutdownCommand.execute(); + public void testStatusUpdate() { + handler.handleUpdate(STATUS); + expectLastCall().once(); control.replay(); - scheduler.error(driver, "error"); + scheduler.statusUpdate(driver, convert(STATUS)); } @Test - public void testExecutorLost() { + public void testError() { + handler.handleError("Oh No!"); + expectLastCall().once(); + control.replay(); - scheduler.executorLost(driver, EXECUTOR_ID, SLAVE_ID, 1); + scheduler.error(driver, "Oh No!"); } @Test - public void testStatusReconciliationAcceptsDebugLogging() { - Logger mockLogger = createMock(Logger.class); - mockLogger.info(anyString()); - mockLogger.debug(anyString()); - initializeScheduler(mockLogger); - - new AbstractStatusReconciliationTest() { - @Override - void expectations() { - eventSink.post(PUBSUB_RECONCILIATION_EVENT); - statusHandler.statusUpdate(status); - } - }.run(); - } - - private class StatusUpdater extends AbstractStatusTest { - StatusUpdater(TaskStatus status) { - super(status); - } - - @Override - void expectations() { - eventSink.post(new TaskStatusReceived( - status.getState(), - Optional.fromNullable(status.getSource()), - status.hasReason() ? Optional.of(status.getReason()) : Optional.absent(), - Optional.of(1000000L) - )); - statusHandler.statusUpdate(status); - } - } - - private void expectOfferAttributesSaved(HostOffer offer) { - expect(storageUtil.attributeStore.getHostAttributes(offer.getOffer().getHostname())) - .andReturn(Optional.absent()); - IHostAttributes defaultMode = IHostAttributes.build( - Conversions.getAttributes(offer.getOffer()).newBuilder().setMode(NONE)); - expect(storageUtil.attributeStore.saveHostAttributes(defaultMode)).andReturn(true); - } - - private abstract class AbstractRegisteredTest { - private final AtomicBoolean runCalled = new AtomicBoolean(false); - - AbstractRegisteredTest() { - // Prevent otherwise silent noop tests that forget to call run(). - addTearDown(new TearDown() { - @Override - public void tearDown() { - assertTrue(runCalled.get()); - } - }); - } - - void run() { - runCalled.set(true); - eventSink.post(new DriverRegistered()); - storageUtil.expectOperations(); - storageUtil.schedulerStore.saveFrameworkId(FRAMEWORK_ID); - expectations(); - - control.replay(); - - scheduler.registered( - driver, - ProtosConversion.convert(FRAMEWORK), - Protos.MasterInfo.getDefaultInstance()); - test(); - } - - void expectations() { - // Default no-op, subclasses may override. - } - - abstract void test(); - } + public void testFrameworkMessage() { + handler.handleMessage(EXECUTOR_ID, convert(SLAVE_ID)); + expectLastCall().once(); - private abstract class AbstractOfferTest extends AbstractRegisteredTest { - AbstractOfferTest() { - super(); - } - - abstract void respondToOffer(); - - @Override - void expectations() { - respondToOffer(); - } + control.replay(); - @Override - void test() { - scheduler.resourceOffers( - driver, - ImmutableList.of(ProtosConversion.convert(HOST_OFFER.getOffer()))); - } + scheduler.frameworkMessage( + driver, + convert(EXECUTOR_ID), + SLAVE_ID, + "message".getBytes(StandardCharsets.UTF_8)); } - private abstract class AbstractStatusTest extends AbstractRegisteredTest { - protected final TaskStatus status; - - AbstractStatusTest() { - this(STATUS); - } - - AbstractStatusTest(TaskStatus status) { - super(); - this.status = status; - } + @Test + public void testExecutorLost() { + handler.handleLostExecutor(EXECUTOR_ID, convert(SLAVE_ID), 1); - @Override - void test() { - scheduler.statusUpdate(driver, ProtosConversion.convert(status)); - } - } + control.replay(); - private abstract class AbstractStatusReconciliationTest extends AbstractStatusTest { - AbstractStatusReconciliationTest() { - super(STATUS_RECONCILIATION); - } + scheduler.executorLost(driver, convert(EXECUTOR_ID), SLAVE_ID, 1); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/test/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImplTest.java new file mode 100644 index 0000000..988ec50 --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImplTest.java @@ -0,0 +1,275 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.mesos; + +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; +import com.google.common.net.InetAddresses; +import com.google.protobuf.ByteString; + +import org.apache.aurora.common.testing.easymock.EasyMockTest; +import org.apache.aurora.scheduler.stats.CachedCounters; +import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; +import org.apache.aurora.scheduler.testing.FakeStatsProvider; +import org.apache.mesos.v1.Protos.AgentID; +import org.apache.mesos.v1.Protos.ExecutorID; +import org.apache.mesos.v1.Protos.FrameworkID; +import org.apache.mesos.v1.Protos.FrameworkInfo; +import org.apache.mesos.v1.Protos.MasterInfo; +import org.apache.mesos.v1.Protos.Offer; +import org.apache.mesos.v1.Protos.OfferID; +import org.apache.mesos.v1.Protos.TaskID; +import org.apache.mesos.v1.Protos.TaskState; +import org.apache.mesos.v1.Protos.TaskStatus; +import org.apache.mesos.v1.Protos.TaskStatus.Source; +import org.apache.mesos.v1.scheduler.Mesos; +import org.apache.mesos.v1.scheduler.Protos.Call; +import org.apache.mesos.v1.scheduler.Protos.Event; +import org.easymock.Capture; +import org.junit.Before; +import org.junit.Test; + +import static org.easymock.EasyMock.capture; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class VersionedMesosSchedulerImplTest extends EasyMockTest { + + private MesosCallbackHandler handler; + private StorageTestUtil storageUtil; + private Mesos driver; + private FakeStatsProvider statsProvider; + private DriverSettings driverSettings; + + private VersionedMesosSchedulerImpl scheduler; + + private static final String AGENT_HOST = "slave-hostname"; + private static final AgentID AGENT_ID = + AgentID.newBuilder().setValue("slave-id").build(); + + private static final String FRAMEWORK_ID = "framework-id"; + private static final FrameworkID FRAMEWORK = + FrameworkID.newBuilder().setValue(FRAMEWORK_ID).build(); + private static final FrameworkInfo FRAMEWORK_INFO = FrameworkInfo.newBuilder() + .setName("name") + .setUser("user") + .setCheckpoint(true) + .setFailoverTimeout(1000) + .build(); + + private static final String MASTER_ID = "master-id"; + private static final MasterInfo MASTER = MasterInfo.newBuilder() + .setId(MASTER_ID) + .setIp(InetAddresses.coerceToInteger(InetAddresses.forString("1.2.3.4"))) //NOPMD + .setPort(5050).build(); + + private static final OfferID OFFER_ID = OfferID.newBuilder().setValue("offer-id").build(); + private static final Offer OFFER = Offer.newBuilder() + .setFrameworkId(FRAMEWORK) + .setAgentId(AGENT_ID) + .setHostname(AGENT_HOST) + .setId(OFFER_ID) + .build(); + + private static final TaskStatus STATUS = TaskStatus.newBuilder() + .setState(TaskState.TASK_RUNNING) + .setSource(Source.SOURCE_AGENT) + .setMessage("message") + .setTimestamp(1D) + .setTaskId(TaskID.newBuilder().setValue("task-id").build()) + .build(); + + private static final ExecutorID EXECUTOR_ID = + ExecutorID.newBuilder().setValue("executor-id").build(); + + private static final Event OFFER_EVENT = Event.newBuilder() + .setType(Event.Type.OFFERS) + .setOffers(Event.Offers.newBuilder() + .addOffers(OFFER)) + .build(); + + private static final Event SUBSCRIBED_EVENT = Event.newBuilder() + .setType(Event.Type.SUBSCRIBED) + .setSubscribed(Event.Subscribed.newBuilder() + .setFrameworkId(FRAMEWORK) + .setHeartbeatIntervalSeconds(15) + .setMasterInfo(MASTER)) + .build(); + + private static final Event UPDATE_EVENT = Event.newBuilder() + .setType(Event.Type.UPDATE) + .setUpdate(Event.Update.newBuilder() + .setStatus(STATUS)) + .build(); + + private static final Event RESCIND_EVENT = Event.newBuilder() + .setType(Event.Type.RESCIND) + .setRescind(Event.Rescind.newBuilder().setOfferId(OFFER_ID)) + .build(); + + private static final Event MESSAGE_EVENT = Event.newBuilder() + .setType(Event.Type.MESSAGE) + .setMessage(Event.Message.newBuilder() + .setAgentId(AGENT_ID) + .setExecutorId(EXECUTOR_ID) + .setData(ByteString.copyFromUtf8("message"))) + .build(); + + private static final Event ERROR_EVENT = Event.newBuilder() + .setType(Event.Type.ERROR) + .setError(Event.Error.newBuilder().setMessage("Oh no!")) + .build(); + + private static final Event FAILED_AGENT_EVENT = Event.newBuilder() + .setType(Event.Type.FAILURE) + .setFailure(Event.Failure.newBuilder().setAgentId(AGENT_ID)) + .build(); + + @Before + public void setUp() { + handler = createMock(MesosCallbackHandler.class); + storageUtil = new StorageTestUtil(this); + driver = createMock(Mesos.class); + statsProvider = new FakeStatsProvider(); + driverSettings = createMock(DriverSettings.class); + + scheduler = new VersionedMesosSchedulerImpl( + handler, + new CachedCounters(statsProvider), + storageUtil.storage, + driverSettings); + } + + @Test + public void testConnected() { + // Once the V1 driver has connected, we need to establish a subscription to get events + + storageUtil.expectOperations(); + expect(storageUtil.schedulerStore.fetchFrameworkId()).andReturn(Optional.of(FRAMEWORK_ID)); + expect(driverSettings.getFrameworkInfo()).andReturn(FRAMEWORK_INFO); + + Capture<Call> subscribeCapture = createCapture(); + + driver.send(capture(subscribeCapture)); + expectLastCall().once(); + + control.replay(); + + scheduler.connected(driver); + + assertTrue(subscribeCapture.hasCaptured()); + + Call subscribe = subscribeCapture.getValue(); + + assertEquals(subscribe.getType(), Call.Type.SUBSCRIBE); + assertEquals(subscribe.getFrameworkId(), FRAMEWORK); + assertEquals( + subscribe.getSubscribe().getFrameworkInfo(), + FRAMEWORK_INFO.toBuilder().setId(FRAMEWORK).build()); + } + + @Test + public void testDisconnected() { + handler.handleDisconnection(); + + control.replay(); + + scheduler.disconnected(driver); + } + + @Test + public void testSubscription() { + handler.handleRegistration(FRAMEWORK, MASTER); + + control.replay(); + + scheduler.received(driver, SUBSCRIBED_EVENT); + + assertEquals(1L, statsProvider.getLongValue("mesos_scheduler_event_SUBSCRIBED")); + } + + @Test + public void testOffers() { + handler.handleRegistration(FRAMEWORK, MASTER); + handler.handleOffers(ImmutableList.of(OFFER)); + + control.replay(); + + scheduler.received(driver, SUBSCRIBED_EVENT); + scheduler.received(driver, OFFER_EVENT); + assertEquals(1L, statsProvider.getLongValue("mesos_scheduler_event_OFFERS")); + } + + @Test + public void testRescind() { + handler.handleRescind(OFFER_ID); + + control.replay(); + + scheduler.received(driver, RESCIND_EVENT); + assertEquals(1L, statsProvider.getLongValue("mesos_scheduler_event_RESCIND")); + } + + @Test + public void testUpdate() { + handler.handleUpdate(STATUS); + + control.replay(); + + scheduler.received(driver, UPDATE_EVENT); + + assertEquals(1L, statsProvider.getLongValue("mesos_scheduler_event_UPDATE")); + } + + @Test(expected = IllegalStateException.class) + public void testBadOrdering() { + // get an offer before the driver has registered + + control.replay(); + + scheduler.received(driver, OFFER_EVENT); + } + + @Test + public void testMessage() { + handler.handleMessage(EXECUTOR_ID, AGENT_ID); + + control.replay(); + + scheduler.received(driver, MESSAGE_EVENT); + assertEquals(1L, statsProvider.getLongValue("mesos_scheduler_event_MESSAGE")); + } + + @Test + public void testError() { + handler.handleError("Oh no!"); + + control.replay(); + + scheduler.received(driver, ERROR_EVENT); + assertEquals(1L, statsProvider.getLongValue("mesos_scheduler_event_ERROR")); + } + + @Test + public void testFailedAgent() { + handler.handleLostAgent(AGENT_ID); + + control.replay(); + + scheduler.received(driver, FAILED_AGENT_EVENT); + assertEquals(1L, statsProvider.getLongValue("mesos_scheduler_event_FAILURE")); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/test/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverServiceTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverServiceTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverServiceTest.java new file mode 100644 index 0000000..72aede8 --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverServiceTest.java @@ -0,0 +1,194 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.mesos; + +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; + +import org.apache.aurora.common.testing.easymock.EasyMockTest; +import org.apache.aurora.scheduler.events.PubsubEvent; +import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; +import org.apache.mesos.v1.Protos; +import org.apache.mesos.v1.scheduler.Mesos; +import org.apache.mesos.v1.scheduler.Protos.Call; +import org.apache.mesos.v1.scheduler.Scheduler; +import org.easymock.Capture; +import org.junit.Before; +import org.junit.Test; + +import static org.easymock.EasyMock.capture; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class VersionedSchedulerDriverServiceTest extends EasyMockTest { + + private static final String FRAMEWORK_ID = "framework_id"; + private static final Protos.OfferID OFFER_ID = + Protos.OfferID.newBuilder().setValue("offer-id").build(); + private static final Protos.Filters FILTER = + Protos.Filters.newBuilder().setRefuseSeconds(10).build(); + private static final DriverSettings SETTINGS = new DriverSettings( + "fakemaster", + Optional.absent(), + Protos.FrameworkInfo.newBuilder() + .setUser("framework user") + .setName("test framework") + .build()); + + private StorageTestUtil storage; + private Scheduler scheduler; + private VersionedSchedulerDriverService driverService; + private VersionedDriverFactory driverFactory; + private Mesos mesos; + + @Before + public void setUp() { + scheduler = createMock(Scheduler.class); + storage = new StorageTestUtil(this); + driverFactory = createMock(VersionedDriverFactory.class); + mesos = createMock(Mesos.class); + driverService = new VersionedSchedulerDriverService( + storage.storage, + SETTINGS, + scheduler, + driverFactory); + } + + @Test + public void testNoopStop() { + control.replay(); + + driverService.stopAsync().awaitTerminated(); + } + + @Test + public void testStop() { + expectStart(); + control.replay(); + + driverService.startAsync().awaitRunning(); + driverService.stopAsync().awaitTerminated(); + } + + @Test(expected = IllegalStateException.class) + public void testExceptionBeforeStart() { + control.replay(); + driverService.killTask("task-id"); + } + + @Test + public void testBlockingBeforeRegistered() throws InterruptedException { + expectStart(); + control.replay(); + driverService.startAsync().awaitRunning(); + + Thread killRunner = new Thread(() -> { + driverService.killTask("task-id"); + }); + + killRunner.start(); + + // A hack to ensure the thread actually executes the method + Thread.sleep(1000L); + assertEquals(Thread.State.WAITING, killRunner.getState()); + + killRunner.interrupt(); + } + + @Test + public void testKill() { + expectStart(); + expect(storage.schedulerStore.fetchFrameworkId()).andReturn(Optional.of(FRAMEWORK_ID)); + + Capture<Call> killCapture = createCapture(); + mesos.send(capture(killCapture)); + expectLastCall().once(); + + control.replay(); + driverService.startAsync().awaitRunning(); + driverService.registered(new PubsubEvent.DriverRegistered()); + + driverService.killTask("task-id"); + + assertTrue(killCapture.hasCaptured()); + Call kill = killCapture.getValue(); + assertEquals(kill.getFrameworkId().getValue(), FRAMEWORK_ID); + assertEquals(kill.getType(), Call.Type.KILL); + assertEquals(kill.getKill().getTaskId().getValue(), "task-id"); + } + + @Test + public void testDecline() { + expectStart(); + expect(storage.schedulerStore.fetchFrameworkId()).andReturn(Optional.of(FRAMEWORK_ID)); + + Capture<Call> declineCapture = createCapture(); + mesos.send(capture(declineCapture)); + expectLastCall().once(); + + control.replay(); + driverService.startAsync().awaitRunning(); + driverService.registered(new PubsubEvent.DriverRegistered()); + + driverService.declineOffer(OFFER_ID, FILTER); + + assertTrue(declineCapture.hasCaptured()); + Call decline = declineCapture.getValue(); + assertEquals(decline.getFrameworkId().getValue(), FRAMEWORK_ID); + assertEquals(decline.getType(), Call.Type.DECLINE); + assertEquals(decline.getDecline().getOfferIdsList(), ImmutableList.of(OFFER_ID)); + assertEquals(decline.getDecline().getFilters(), FILTER); + } + + @Test + public void testAccept() { + expectStart(); + expect(storage.schedulerStore.fetchFrameworkId()).andReturn(Optional.of(FRAMEWORK_ID)); + + Capture<Call> acceptCapture = createCapture(); + mesos.send(capture(acceptCapture)); + expectLastCall().once(); + + control.replay(); + driverService.startAsync().awaitRunning(); + driverService.registered(new PubsubEvent.DriverRegistered()); + + driverService.acceptOffers(OFFER_ID, ImmutableList.of(), FILTER); + + assertTrue(acceptCapture.hasCaptured()); + Call accept = acceptCapture.getValue(); + assertEquals(accept.getFrameworkId().getValue(), FRAMEWORK_ID); + assertEquals(accept.getType(), Call.Type.ACCEPT); + assertEquals(accept.getAccept().getFilters(), FILTER); + assertEquals(accept.getAccept().getOfferIdsList(), ImmutableList.of(OFFER_ID)); + assertEquals(accept.getAccept().getOperationsList(), ImmutableList.of()); + } + + private void expectStart() { + storage.expectOperations(); + expect(storage.schedulerStore.fetchFrameworkId()).andReturn(Optional.of(FRAMEWORK_ID)); + + Protos.FrameworkInfo.Builder builder = SETTINGS.getFrameworkInfo().toBuilder(); + builder.setId(Protos.FrameworkID.newBuilder().setValue(FRAMEWORK_ID)); + + expect(driverFactory.create( + scheduler, + builder.build(), + SETTINGS.getMasterUri(), + SETTINGS.getCredentials() + )).andReturn(mesos); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java b/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java index f2275c7..1421821 100644 --- a/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java +++ b/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java @@ -67,6 +67,7 @@ import org.apache.shiro.subject.Subject; import org.junit.Test; import static org.apache.aurora.gen.ResponseCode.OK; +import static org.apache.aurora.scheduler.app.SchedulerMain.DriverKind.SCHEDULER_DRIVER; import static org.junit.Assert.assertEquals; public class ThriftIT extends EasyMockTest { @@ -97,7 +98,7 @@ public class ThriftIT extends EasyMockTest { install(new TierModule(TaskTestUtil.TIER_CONFIG)); bind(ExecutorSettings.class).toInstance(TestExecutorSettings.THERMOS_EXECUTOR); - install(new AppModule(configurationManagerSettings)); + install(new AppModule(configurationManagerSettings, SCHEDULER_DRIVER)); bind(NonVolatileStorage.class).to(FakeNonVolatileStorage.class);
