Repository: aurora Updated Branches: refs/heads/master 1c0086ffc -> fb0325065
http://git-wip-us.apache.org/repos/asf/aurora/blob/fb032506/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerTest.java deleted file mode 100644 index 9a91e63..0000000 --- a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerTest.java +++ /dev/null @@ -1,671 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.scheduling; - -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - -import javax.annotation.Nullable; - -import com.google.common.base.Function; -import com.google.common.base.Optional; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; -import com.google.common.util.concurrent.RateLimiter; -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; -import com.twitter.common.testing.easymock.EasyMockTest; -import com.twitter.common.util.BackoffStrategy; - -import org.apache.aurora.gen.HostAttributes; -import org.apache.aurora.gen.MaintenanceMode; -import org.apache.aurora.gen.ScheduleStatus; -import org.apache.aurora.gen.ScheduledTask; -import org.apache.aurora.gen.TaskConfig; -import org.apache.aurora.scheduler.HostOffer; -import org.apache.aurora.scheduler.base.JobKeys; -import org.apache.aurora.scheduler.base.Query; -import org.apache.aurora.scheduler.base.TaskGroupKey; -import org.apache.aurora.scheduler.base.TaskTestUtil; -import org.apache.aurora.scheduler.base.Tasks; -import org.apache.aurora.scheduler.events.PubsubEvent; -import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; -import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted; -import org.apache.aurora.scheduler.filter.AttributeAggregate; -import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; -import org.apache.aurora.scheduler.mesos.Driver; -import org.apache.aurora.scheduler.offers.OfferManager; -import org.apache.aurora.scheduler.offers.OfferManager.OfferManagerImpl; -import org.apache.aurora.scheduler.offers.OfferManager.OfferReturnDelay; -import org.apache.aurora.scheduler.offers.Offers; -import org.apache.aurora.scheduler.preemptor.BiCache; -import org.apache.aurora.scheduler.preemptor.Preemptor; -import org.apache.aurora.scheduler.scheduling.TaskScheduler.TaskSchedulerImpl; -import org.apache.aurora.scheduler.state.MaintenanceController; -import org.apache.aurora.scheduler.state.StateChangeResult; -import org.apache.aurora.scheduler.state.StateManager; -import org.apache.aurora.scheduler.state.TaskAssigner; -import org.apache.aurora.scheduler.state.TaskAssigner.Assignment; -import org.apache.aurora.scheduler.storage.Storage; -import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; -import org.apache.aurora.scheduler.storage.Storage.MutateWork; -import org.apache.aurora.scheduler.storage.Storage.StorageException; -import org.apache.aurora.scheduler.storage.TaskStore; -import org.apache.aurora.scheduler.storage.db.DbUtil; -import org.apache.aurora.scheduler.storage.entities.IAssignedTask; -import org.apache.aurora.scheduler.storage.entities.IHostAttributes; -import org.apache.aurora.scheduler.storage.entities.IScheduledTask; -import org.apache.aurora.scheduler.storage.entities.ITaskConfig; -import org.apache.mesos.Protos.SlaveID; -import org.apache.mesos.Protos.TaskID; -import org.apache.mesos.Protos.TaskInfo; -import org.easymock.Capture; -import org.easymock.EasyMock; -import org.easymock.IExpectationSetters; -import org.junit.Before; -import org.junit.Test; - -import static org.apache.aurora.gen.MaintenanceMode.DRAINED; -import static org.apache.aurora.gen.MaintenanceMode.DRAINING; -import static org.apache.aurora.gen.MaintenanceMode.NONE; -import static org.apache.aurora.gen.MaintenanceMode.SCHEDULED; -import static org.apache.aurora.gen.ScheduleStatus.FINISHED; -import static org.apache.aurora.gen.ScheduleStatus.INIT; -import static org.apache.aurora.gen.ScheduleStatus.KILLED; -import static org.apache.aurora.gen.ScheduleStatus.LOST; -import static org.apache.aurora.gen.ScheduleStatus.PENDING; -import static org.apache.aurora.gen.ScheduleStatus.RUNNING; -import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask; -import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY; -import static org.apache.mesos.Protos.Offer; -import static org.easymock.EasyMock.capture; -import static org.easymock.EasyMock.eq; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.expectLastCall; -import static org.easymock.EasyMock.isA; -import static org.junit.Assert.assertEquals; - -/** - * TODO(wfarner): Break this test up to independently test TaskSchedulerImpl and OfferQueueImpl. - */ -public class TaskSchedulerTest extends EasyMockTest { - - private static final long FIRST_SCHEDULE_DELAY_MS = 1L; - - private static final HostOffer OFFER_A = makeOffer("OFFER_A", "HOST_A", NONE); - private static final HostOffer OFFER_B = makeOffer("OFFER_B", "HOST_B", SCHEDULED); - private static final HostOffer OFFER_C = makeOffer("OFFER_C", "HOST_C", DRAINING); - private static final HostOffer OFFER_D = makeOffer("OFFER_D", "HOST_D", DRAINED); - private static final String SLAVE_A = OFFER_A.getOffer().getSlaveId().getValue(); - private static final String SLAVE_B = OFFER_B.getOffer().getSlaveId().getValue(); - private static final String SLAVE_C = OFFER_C.getOffer().getSlaveId().getValue(); - - private Storage storage; - - private MaintenanceController maintenance; - private StateManager stateManager; - private TaskAssigner assigner; - private BackoffStrategy retryStrategy; - private Driver driver; - private ScheduledExecutorService executor; - private ScheduledFuture<?> future; - private OfferReturnDelay returnDelay; - private OfferManager offerManager; - private TaskGroups taskGroups; - private RescheduleCalculator rescheduleCalculator; - private Preemptor preemptor; - private BiCache<String, TaskGroupKey> reservations; - - @Before - public void setUp() { - storage = DbUtil.createStorage(); - maintenance = createMock(MaintenanceController.class); - stateManager = createMock(StateManager.class); - assigner = createMock(TaskAssigner.class); - retryStrategy = createMock(BackoffStrategy.class); - driver = createMock(Driver.class); - executor = createMock(ScheduledExecutorService.class); - future = createMock(ScheduledFuture.class); - returnDelay = createMock(OfferReturnDelay.class); - rescheduleCalculator = createMock(RescheduleCalculator.class); - preemptor = createMock(Preemptor.class); - reservations = createMock(new Clazz<BiCache<String, TaskGroupKey>>() { }); - } - - private void replayAndCreateScheduler() { - control.replay(); - offerManager = new OfferManagerImpl(driver, returnDelay, executor); - TaskScheduler scheduler = new TaskSchedulerImpl(storage, - stateManager, - assigner, - offerManager, - preemptor, - reservations); - taskGroups = new TaskGroups( - executor, - Amount.of(FIRST_SCHEDULE_DELAY_MS, Time.MILLISECONDS), - retryStrategy, - RateLimiter.create(100), - scheduler, - rescheduleCalculator); - } - - private Capture<Runnable> expectOffer() { - return expectOfferDeclineIn(10); - } - - private Capture<Runnable> expectOfferDeclineIn(long delayMillis) { - expect(returnDelay.get()).andReturn(Amount.of(delayMillis, Time.MILLISECONDS)); - Capture<Runnable> runnable = createCapture(); - executor.schedule(capture(runnable), eq(delayMillis), eq(TimeUnit.MILLISECONDS)); - expectLastCall().andReturn(createMock(ScheduledFuture.class)); - return runnable; - } - - private void changeState( - IScheduledTask task, - ScheduleStatus oldState, - ScheduleStatus newState) { - - final IScheduledTask copy = IScheduledTask.build(task.newBuilder().setStatus(newState)); - // Insert the task if it doesn't already exist. - storage.write(new MutateWork.NoResult.Quiet() { - @Override - public void execute(MutableStoreProvider storeProvider) { - TaskStore.Mutable taskStore = storeProvider.getUnsafeTaskStore(); - if (Iterables.isEmpty(taskStore.fetchTasks(Query.taskScoped(Tasks.id(copy))))) { - taskStore.saveTasks(ImmutableSet.of(copy)); - } - } - }); - taskGroups.taskChangedState(TaskStateChange.transition(copy, oldState)); - } - - private Capture<Runnable> expectTaskRetryIn(long penaltyMs) { - Capture<Runnable> capture = createCapture(); - executor.schedule( - capture(capture), - eq(penaltyMs), - eq(TimeUnit.MILLISECONDS)); - expectLastCall().andReturn(future); - return capture; - } - - private Capture<Runnable> expectTaskGroupBackoff(long previousPenaltyMs, long nextPenaltyMs) { - expect(retryStrategy.calculateBackoffMs(previousPenaltyMs)).andReturn(nextPenaltyMs); - return expectTaskRetryIn(nextPenaltyMs); - } - - @Test - public void testNoTasks() { - expectAnyMaintenanceCalls(); - expectOfferDeclineIn(10); - expectOfferDeclineIn(10); - - replayAndCreateScheduler(); - - offerManager.addOffer(OFFER_A); - offerManager.addOffer(OFFER_B); - } - - @Test - public void testNoOffers() { - Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); - expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10); - IScheduledTask task = createTask("a"); - expectPreemptorCall(task.getAssignedTask()); - expectReservationCheck(task); - - replayAndCreateScheduler(); - - changeState(task, INIT, PENDING); - timeoutCapture.getValue().run(); - } - - private IScheduledTask createTask(String taskId) { - return createTask(taskId, null); - } - - private IScheduledTask createTask(String taskId, @Nullable ScheduleStatus status) { - return setStatus(makeTask(taskId, TaskTestUtil.JOB), status); - } - - private IScheduledTask setStatus(IScheduledTask task, @Nullable ScheduleStatus status) { - return IScheduledTask.build(task.newBuilder().setStatus(status)); - } - - @Test - public void testLoadFromStorage() { - final IScheduledTask a = createTask("a", KILLED); - final IScheduledTask b = createTask("b", PENDING); - final IScheduledTask c = createTask("c", RUNNING); - - expect(rescheduleCalculator.getStartupScheduleDelayMs(b)).andReturn(10L); - expectTaskRetryIn(10); - - replayAndCreateScheduler(); - - storage.write(new MutateWork.NoResult.Quiet() { - @Override - public void execute(MutableStoreProvider store) { - store.getUnsafeTaskStore().saveTasks(ImmutableSet.of(a, b, c)); - } - }); - for (IScheduledTask task : ImmutableList.of(a, b, c)) { - taskGroups.taskChangedState(TaskStateChange.initialized(task)); - } - changeState(c, RUNNING, FINISHED); - } - - @Test - public void testTaskMissing() { - Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); - - replayAndCreateScheduler(); - - taskGroups.taskChangedState(TaskStateChange.transition(createTask("a", PENDING), INIT)); - timeoutCapture.getValue().run(); - } - - private IExpectationSetters<Assignment> expectMaybeAssign( - HostOffer offer, - IScheduledTask task, - AttributeAggregate jobAggregate) { - - return expect(assigner.maybeAssign( - EasyMock.anyObject(), - eq(offer), - eq(new ResourceRequest(task.getAssignedTask().getTask(), jobAggregate)), - eq(Tasks.id(task)))); - } - - private IExpectationSetters<?> expectNoReservation(String slaveId) { - return expect(reservations.get(slaveId)).andReturn(Optional.absent()); - } - - private IExpectationSetters<?> expectReservationCheck(IScheduledTask task) { - return expect(reservations.getByValue(TaskGroupKey.from(task.getAssignedTask().getTask()))) - .andReturn(ImmutableSet.of()); - } - - @Test - public void testTaskAssigned() { - expectAnyMaintenanceCalls(); - expectOfferDeclineIn(10); - - IScheduledTask taskA = createTask("a", PENDING); - TaskInfo mesosTask = makeTaskInfo(taskA); - - Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); - expectNoReservation(SLAVE_A).times(2); - expectReservationCheck(taskA); - expectMaybeAssign(OFFER_A, taskA, EMPTY).andReturn(Assignment.failure()); - expectPreemptorCall(taskA.getAssignedTask()); - - Capture<Runnable> timeoutCapture2 = expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10); - expectMaybeAssign(OFFER_A, taskA, EMPTY).andReturn(Assignment.success(mesosTask)); - driver.launchTask(OFFER_A.getOffer().getId(), mesosTask); - - Capture<Runnable> timeoutCapture3 = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); - expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10); - IScheduledTask taskB = createTask("b"); - expectReservationCheck(taskB); - expectPreemptorCall(taskB.getAssignedTask()); - - replayAndCreateScheduler(); - - offerManager.addOffer(OFFER_A); - changeState(taskA, INIT, PENDING); - timeoutCapture.getValue().run(); - timeoutCapture2.getValue().run(); - - // Ensure the offer was consumed. - changeState(taskB, INIT, PENDING); - timeoutCapture3.getValue().run(); - } - - @Test - public void testDriverNotReady() { - IScheduledTask task = createTask("a", PENDING); - TaskInfo mesosTask = TaskInfo.newBuilder() - .setName(Tasks.id(task)) - .setTaskId(TaskID.newBuilder().setValue(Tasks.id(task))) - .setSlaveId(SlaveID.newBuilder().setValue("slaveId")) - .build(); - - Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); - expectAnyMaintenanceCalls(); - expectOfferDeclineIn(10); - expectNoReservation(SLAVE_A); - expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.success(mesosTask)); - driver.launchTask(OFFER_A.getOffer().getId(), mesosTask); - expectLastCall().andThrow(new IllegalStateException("Driver not ready.")); - expect(stateManager.changeState( - EasyMock.anyObject(), - eq("a"), - eq(Optional.of(PENDING)), - eq(LOST), - eq(TaskSchedulerImpl.LAUNCH_FAILED_MSG))) - .andReturn(StateChangeResult.SUCCESS); - - replayAndCreateScheduler(); - - changeState(task, INIT, PENDING); - offerManager.addOffer(OFFER_A); - timeoutCapture.getValue().run(); - } - - @Test - public void testStorageException() { - IScheduledTask task = createTask("a", PENDING); - TaskInfo mesosTask = TaskInfo.newBuilder() - .setName(Tasks.id(task)) - .setTaskId(TaskID.newBuilder().setValue(Tasks.id(task))) - .setSlaveId(SlaveID.newBuilder().setValue("slaveId")) - .build(); - - Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); - expectAnyMaintenanceCalls(); - expectOfferDeclineIn(10); - expectNoReservation(SLAVE_A).times(2); - expectMaybeAssign(OFFER_A, task, EMPTY).andThrow(new StorageException("Injected failure.")); - - Capture<Runnable> timeoutCapture2 = expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10); - expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.success(mesosTask)); - driver.launchTask(OFFER_A.getOffer().getId(), mesosTask); - expectLastCall(); - - replayAndCreateScheduler(); - - changeState(task, INIT, PENDING); - offerManager.addOffer(OFFER_A); - timeoutCapture.getValue().run(); - timeoutCapture2.getValue().run(); - } - - @Test - public void testExpiration() { - IScheduledTask task = createTask("a", PENDING); - - Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); - Capture<Runnable> offerExpirationCapture = expectOfferDeclineIn(10); - expectAnyMaintenanceCalls(); - expectNoReservation(SLAVE_A); - expectReservationCheck(task).times(2); - expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.failure()); - Capture<Runnable> timeoutCapture2 = expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10); - expectPreemptorCall(task.getAssignedTask()); - driver.declineOffer(OFFER_A.getOffer().getId()); - expectTaskGroupBackoff(10, 20); - expectPreemptorCall(task.getAssignedTask()); - - replayAndCreateScheduler(); - - changeState(task, INIT, PENDING); - offerManager.addOffer(OFFER_A); - timeoutCapture.getValue().run(); - offerExpirationCapture.getValue().run(); - timeoutCapture2.getValue().run(); - } - - @Test - public void testOneOfferPerSlave() { - expectAnyMaintenanceCalls(); - Capture<Runnable> offerExpirationCapture = expectOfferDeclineIn(10); - - HostOffer offerAB = new HostOffer( - Offers.makeOffer("OFFER_B").toBuilder().setSlaveId(OFFER_A.getOffer().getSlaveId()).build(), - IHostAttributes.build(new HostAttributes())); - - driver.declineOffer(OFFER_A.getOffer().getId()); - driver.declineOffer(offerAB.getOffer().getId()); - - replayAndCreateScheduler(); - - offerManager.addOffer(OFFER_A); - offerManager.addOffer(offerAB); - offerExpirationCapture.getValue().run(); - } - - @Test - public void testDontDeclineAcceptedOffer() throws OfferManager.LaunchException { - expectAnyMaintenanceCalls(); - Capture<Runnable> offerExpirationCapture = expectOfferDeclineIn(10); - - Function<HostOffer, Assignment> offerAcceptor = - createMock(new Clazz<Function<HostOffer, Assignment>>() { }); - final TaskInfo taskInfo = TaskInfo.getDefaultInstance(); - expect(offerAcceptor.apply(OFFER_A)).andReturn(Assignment.success(taskInfo)); - driver.launchTask(OFFER_A.getOffer().getId(), taskInfo); - - replayAndCreateScheduler(); - - offerManager.addOffer(OFFER_A); - offerManager.launchFirst(offerAcceptor, TaskGroupKey.from(ITaskConfig.build(new TaskConfig()))); - offerExpirationCapture.getValue().run(); - } - - @Test - public void testBasicMaintenancePreferences() { - expectOffer(); - expectOffer(); - expectOffer(); - expectOffer(); - - IScheduledTask taskA = createTask("A", PENDING); - TaskInfo mesosTaskA = makeTaskInfo(taskA); - expectNoReservation(SLAVE_A); - expectMaybeAssign(OFFER_A, taskA, EMPTY).andReturn(Assignment.success(mesosTaskA)); - driver.launchTask(OFFER_A.getOffer().getId(), mesosTaskA); - Capture<Runnable> captureA = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); - - IScheduledTask taskB = createTask("B", PENDING); - TaskInfo mesosTaskB = makeTaskInfo(taskB); - expectNoReservation(SLAVE_B); - expectMaybeAssign(OFFER_B, taskB, EMPTY).andReturn(Assignment.success(mesosTaskB)); - driver.launchTask(OFFER_B.getOffer().getId(), mesosTaskB); - Capture<Runnable> captureB = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); - - replayAndCreateScheduler(); - - offerManager.addOffer(OFFER_D); - offerManager.addOffer(OFFER_C); - offerManager.addOffer(OFFER_B); - offerManager.addOffer(OFFER_A); - - changeState(taskA, INIT, PENDING); - captureA.getValue().run(); - - changeState(taskB, INIT, PENDING); - captureB.getValue().run(); - } - - @Test - public void testChangingMaintenancePreferences() { - expectOffer(); - expectOffer(); - expectOffer(); - - IScheduledTask taskA = createTask("A", PENDING); - TaskInfo mesosTaskA = makeTaskInfo(taskA); - expectNoReservation(SLAVE_B); - expectMaybeAssign(OFFER_B, taskA, EMPTY).andReturn(Assignment.success(mesosTaskA)); - driver.launchTask(OFFER_B.getOffer().getId(), mesosTaskA); - Capture<Runnable> captureA = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); - - IScheduledTask taskB = createTask("B", PENDING); - TaskInfo mesosTaskB = makeTaskInfo(taskB); - HostOffer updatedOfferC = new HostOffer( - OFFER_C.getOffer(), - IHostAttributes.build(OFFER_C.getAttributes().newBuilder().setMode(NONE))); - expectNoReservation(SLAVE_C); - expectMaybeAssign(updatedOfferC, taskB, EMPTY).andReturn(Assignment.success(mesosTaskB)); - driver.launchTask(OFFER_C.getOffer().getId(), mesosTaskB); - Capture<Runnable> captureB = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); - - replayAndCreateScheduler(); - - offerManager.addOffer(OFFER_A); - offerManager.addOffer(OFFER_B); - offerManager.addOffer(OFFER_C); - - // Initially, we'd expect the offers to be consumed in order (A, B), with (C) unschedulable - - // Expected order now (B), with (C, A) unschedulable - changeHostMaintenanceState(OFFER_A.getAttributes(), DRAINING); - changeState(taskA, INIT, PENDING); - captureA.getValue().run(); - - // Expected order now (C), with (A) unschedulable and (B) already consumed - changeHostMaintenanceState(OFFER_C.getAttributes(), NONE); - changeState(taskB, INIT, PENDING); - captureB.getValue().run(); - } - - private Capture<String> expectTaskScheduled(IScheduledTask task) { - TaskInfo mesosTask = makeTaskInfo(task); - Capture<String> taskId = createCapture(); - expect(assigner.maybeAssign( - EasyMock.anyObject(), - EasyMock.anyObject(), - EasyMock.anyObject(), - capture(taskId))).andReturn(Assignment.success(mesosTask)); - driver.launchTask(EasyMock.anyObject(), eq(mesosTask)); - return taskId; - } - - @Test - public void testResistsStarvation() { - // TODO(wfarner): This test requires intimate knowledge of the way futures are used inside - // TaskScheduler. It's time to test using a real ScheduledExecutorService. - - expectAnyMaintenanceCalls(); - - IScheduledTask jobA0 = setStatus(makeTask("a0", JobKeys.from("a", "b", "c")), PENDING); - - ScheduledTask jobA1Builder = jobA0.newBuilder(); - jobA1Builder.getAssignedTask().setTaskId("a1"); - jobA1Builder.getAssignedTask().setInstanceId(1); - IScheduledTask jobA1 = IScheduledTask.build(jobA1Builder); - - ScheduledTask jobA2Builder = jobA0.newBuilder(); - jobA2Builder.getAssignedTask().setTaskId("a2"); - jobA2Builder.getAssignedTask().setInstanceId(2); - IScheduledTask jobA2 = IScheduledTask.build(jobA2Builder); - - IScheduledTask jobB0 = setStatus(makeTask("b0", JobKeys.from("d", "e", "f")), PENDING); - - expectNoReservation(SLAVE_A); - expectNoReservation(SLAVE_B); - - expectOfferDeclineIn(10); - expectOfferDeclineIn(10); - expectOfferDeclineIn(10); - expectOfferDeclineIn(10); - - Capture<Runnable> timeoutA = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); - Capture<Runnable> timeoutB = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); - - Capture<String> firstScheduled = expectTaskScheduled(jobA0); - Capture<String> secondScheduled = expectTaskScheduled(jobB0); - - // Expect another watch of the task group for job A. - expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); - - replayAndCreateScheduler(); - - offerManager.addOffer(OFFER_A); - offerManager.addOffer(OFFER_B); - offerManager.addOffer(OFFER_C); - offerManager.addOffer(OFFER_D); - changeState(jobA0, INIT, PENDING); - changeState(jobA1, INIT, PENDING); - changeState(jobA2, INIT, PENDING); - changeState(jobB0, INIT, PENDING); - timeoutA.getValue().run(); - timeoutB.getValue().run(); - assertEquals( - ImmutableSet.of(Tasks.id(jobA0), Tasks.id(jobB0)), - ImmutableSet.of(firstScheduled.getValue(), secondScheduled.getValue())); - } - - @Test - public void testTaskDeleted() { - expectAnyMaintenanceCalls(); - expectOfferDeclineIn(10); - - final IScheduledTask task = createTask("a", PENDING); - - Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); - expectNoReservation(SLAVE_A); - expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.failure()); - expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 20); - expectReservationCheck(task); - expectPreemptorCall(task.getAssignedTask()); - - replayAndCreateScheduler(); - - offerManager.addOffer(OFFER_A); - changeState(task, INIT, PENDING); - timeoutCapture.getValue().run(); - - // Ensure the offer was consumed. - changeState(task, INIT, PENDING); - storage.write(new MutateWork.NoResult.Quiet() { - @Override - public void execute(MutableStoreProvider storeProvider) { - storeProvider.getUnsafeTaskStore().deleteTasks(Tasks.ids(task)); - } - }); - taskGroups.tasksDeleted(new TasksDeleted(ImmutableSet.of(task))); - timeoutCapture.getValue().run(); - } - - private TaskInfo makeTaskInfo(IScheduledTask task) { - return TaskInfo.newBuilder() - .setName(Tasks.id(task)) - .setTaskId(TaskID.newBuilder().setValue(Tasks.id(task))) - .setSlaveId(SlaveID.newBuilder().setValue("slave-id" + task.toString())) - .build(); - } - - private void expectAnyMaintenanceCalls() { - expect(maintenance.getMode(isA(String.class))).andReturn(NONE).anyTimes(); - } - - private void changeHostMaintenanceState(IHostAttributes attributes, MaintenanceMode mode) { - offerManager.hostAttributesChanged(new PubsubEvent.HostAttributesChanged( - IHostAttributes.build(attributes.newBuilder().setMode(mode)))); - } - - private static HostOffer makeOffer(String offerId, String hostName, MaintenanceMode mode) { - Offer offer = Offers.makeOffer(offerId, hostName); - return new HostOffer( - offer, - IHostAttributes.build(new HostAttributes() - .setHost(hostName) - .setSlaveId(offer.getSlaveId().getValue()) - .setAttributes(ImmutableSet.of()) - .setMode(mode))); - } - - private void expectPreemptorCall(IAssignedTask task) { - expect(preemptor.attemptPreemptionFor( - eq(task), - eq(EMPTY), - EasyMock.anyObject())).andReturn(Optional.absent()); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/fb032506/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java index f98818f..c9c6f5d 100644 --- a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java @@ -13,6 +13,7 @@ */ package org.apache.aurora.scheduler.state; +import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.twitter.common.testing.easymock.EasyMockTest; @@ -20,17 +21,19 @@ import com.twitter.common.testing.easymock.EasyMockTest; import org.apache.aurora.gen.AssignedTask; import org.apache.aurora.gen.ExecutorConfig; import org.apache.aurora.gen.HostAttributes; +import org.apache.aurora.gen.JobKey; import org.apache.aurora.gen.ScheduledTask; import org.apache.aurora.gen.TaskConfig; import org.apache.aurora.scheduler.HostOffer; import org.apache.aurora.scheduler.ResourceSlot; +import org.apache.aurora.scheduler.base.TaskGroupKey; import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.filter.SchedulingFilter; import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource; import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto; import org.apache.aurora.scheduler.mesos.MesosTaskFactory; -import org.apache.aurora.scheduler.state.TaskAssigner.Assignment; +import org.apache.aurora.scheduler.offers.OfferManager; import org.apache.aurora.scheduler.state.TaskAssigner.TaskAssignerImpl; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; @@ -46,11 +49,16 @@ import org.apache.mesos.Protos.Value.Type; import org.junit.Before; import org.junit.Test; +import static org.apache.aurora.gen.ScheduleStatus.LOST; +import static org.apache.aurora.gen.ScheduleStatus.PENDING; import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY; +import static org.apache.aurora.scheduler.state.TaskAssigner.TaskAssignerImpl.LAUNCH_FAILED_MSG; import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; import static org.apache.mesos.Protos.Offer; import static org.easymock.EasyMock.expect; -import static org.junit.Assert.assertEquals; +import static org.easymock.EasyMock.expectLastCall; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; public class TaskAssignerImplTest extends EasyMockTest { @@ -74,8 +82,10 @@ public class TaskAssignerImplTest extends EasyMockTest { .setAssignedTask(new AssignedTask() .setTaskId("taskId") .setTask(new TaskConfig() + .setJob(new JobKey("r", "e", "n")) .setExecutorConfig(new ExecutorConfig().setData("opaque data")) .setRequestedPorts(ImmutableSet.of(PORT_NAME))))); + private static final TaskGroupKey GROUP_KEY = TaskGroupKey.from(TASK.getAssignedTask().getTask()); private static final TaskInfo TASK_INFO = TaskInfo.newBuilder() .setName("taskName") .setTaskId(TaskID.newBuilder().setValue(Tasks.id(TASK))) @@ -86,19 +96,23 @@ public class TaskAssignerImplTest extends EasyMockTest { private StateManager stateManager; private SchedulingFilter filter; private MesosTaskFactory taskFactory; + private OfferManager offerManager; private TaskAssigner assigner; @Before public void setUp() throws Exception { storeProvider = createMock(MutableStoreProvider.class); - stateManager = createMock(StateManager.class); filter = createMock(SchedulingFilter.class); taskFactory = createMock(MesosTaskFactory.class); - assigner = new TaskAssignerImpl(stateManager, filter, taskFactory); + stateManager = createMock(StateManager.class); + offerManager = createMock(OfferManager.class); + assigner = new TaskAssignerImpl(stateManager, filter, taskFactory, offerManager); } @Test - public void testAssignNoVetoes() { + public void testAssignNoVetoes() throws Exception { + expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER)); + offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO); expect(filter.filter( new UnusedResource(ResourceSlot.from(MESOS_OFFER), OFFER.getAttributes()), new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY))) @@ -115,17 +129,18 @@ public class TaskAssignerImplTest extends EasyMockTest { control.replay(); - assertEquals( - Assignment.success(TASK_INFO), - assigner.maybeAssign( - storeProvider, - OFFER, - new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY), - Tasks.id(TASK))); + assertTrue(assigner.maybeAssign( + storeProvider, + new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY), + TaskGroupKey.from(TASK.getAssignedTask().getTask()), + Tasks.id(TASK), + Optional.of(MESOS_OFFER.getSlaveId().getValue()))); } @Test - public void testAssignVetoes() { + public void testAssignVetoesWithStaticBan() throws Exception { + expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER)); + offerManager.banOffer(MESOS_OFFER.getId(), GROUP_KEY); expect(filter.filter( new UnusedResource(ResourceSlot.from(MESOS_OFFER), OFFER.getAttributes()), new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY))) @@ -133,12 +148,79 @@ public class TaskAssignerImplTest extends EasyMockTest { control.replay(); - assertEquals( - Assignment.failure(ImmutableSet.of(Veto.constraintMismatch("denied"))), - assigner.maybeAssign( - storeProvider, - OFFER, - new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY), - Tasks.id(TASK))); + assertFalse(assigner.maybeAssign( + storeProvider, + new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY), + TaskGroupKey.from(TASK.getAssignedTask().getTask()), + Tasks.id(TASK), + Optional.<String>absent())); + } + + @Test + public void testAssignVetoesWithNoStaticBan() throws Exception { + expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER)); + expect(filter.filter( + new UnusedResource(ResourceSlot.from(MESOS_OFFER), OFFER.getAttributes()), + new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY))) + .andReturn(ImmutableSet.of(Veto.unsatisfiedLimit("limit"))); + + control.replay(); + + assertFalse(assigner.maybeAssign( + storeProvider, + new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY), + TaskGroupKey.from(TASK.getAssignedTask().getTask()), + Tasks.id(TASK), + Optional.<String>absent())); + } + + @Test + public void testAssignmentClearedOnError() throws Exception { + expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER)); + offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO); + expectLastCall().andThrow(new OfferManager.LaunchException("expected")); + expect(filter.filter( + new UnusedResource(ResourceSlot.from(MESOS_OFFER), OFFER.getAttributes()), + new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY))) + .andReturn(ImmutableSet.of()); + expect(stateManager.assignTask( + storeProvider, + Tasks.id(TASK), + MESOS_OFFER.getHostname(), + MESOS_OFFER.getSlaveId(), + ImmutableMap.of(PORT_NAME, PORT))) + .andReturn(TASK.getAssignedTask()); + expect(stateManager.changeState( + storeProvider, + Tasks.id(TASK), + Optional.of(PENDING), + LOST, + LAUNCH_FAILED_MSG)) + .andReturn(StateChangeResult.SUCCESS); + expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER.getSlaveId())) + .andReturn(TASK_INFO); + + control.replay(); + + assertFalse(assigner.maybeAssign( + storeProvider, + new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY), + TaskGroupKey.from(TASK.getAssignedTask().getTask()), + Tasks.id(TASK), + Optional.<String>absent())); + } + + @Test + public void testAssignmentSkippedForReservedSlave() throws Exception { + expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER)); + + control.replay(); + + assertFalse(assigner.maybeAssign( + storeProvider, + new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY), + TaskGroupKey.from(TASK.getAssignedTask().getTask()), + Tasks.id(TASK), + Optional.of("invalid"))); } }
