http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/preemptor/ClusterStateImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/ClusterStateImplTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/ClusterStateImplTest.java
new file mode 100644
index 0000000..a1ac922
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/preemptor/ClusterStateImplTest.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.preemptor;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableMultimap;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.JobKey;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
+import static org.apache.aurora.gen.ScheduleStatus.FAILED;
+import static org.apache.aurora.gen.ScheduleStatus.FINISHED;
+import static org.apache.aurora.gen.ScheduleStatus.KILLED;
+import static org.apache.aurora.gen.ScheduleStatus.KILLING;
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Exercises {@link ClusterStateImpl} by feeding it task state changes and checking the
+ * slave-to-task mapping it exposes through {@code getSlavesToActiveTasks()}.
+ */
+public class ClusterStateImplTest {
+
+  private ClusterStateImpl state;
+
+  @Before
+  public void setUp() {
+    state = new ClusterStateImpl();
+  }
+
+  /** Clearing the exposed mapping is expected to be rejected. */
+  @Test(expected = UnsupportedOperationException.class)
+  public void testImmutable() {
+    state.getSlavesToActiveTasks().clear();
+  }
+
+  /**
+   * Walks one task through its states: it is not listed while THROTTLED or PENDING, it is listed
+   * while ASSIGNED, RUNNING or KILLING, and it is dropped again once FINISHED.
+   */
+  @Test
+  public void testTaskLifecycle() {
+    IAssignedTask task = makeTask("a", "s1");
+
+    // Nothing has been reported yet.
+    assertVictims();
+
+    changeState(task, THROTTLED);
+    assertVictims();
+
+    changeState(task, PENDING);
+    assertVictims();
+
+    changeState(task, ASSIGNED);
+    assertVictims(task);
+
+    changeState(task, RUNNING);
+    assertVictims(task);
+
+    changeState(task, KILLING);
+    assertVictims(task);
+
+    changeState(task, FINISHED);
+    assertVictims();
+  }
+
+  /** The same task ID reported on two slaves ends up listed under both of them. */
+  @Test
+  public void testTaskChangesSlaves() {
+    // An external failure that makes one task ID show up on a different slave is a case
+    // we do not intend to handle.
+    IAssignedTask onFirstSlave = makeTask("a", "s1");
+    IAssignedTask onSecondSlave = makeTask("a", "s2");
+
+    changeState(onFirstSlave, RUNNING);
+    changeState(onSecondSlave, RUNNING);
+
+    assertVictims(onFirstSlave, onSecondSlave);
+  }
+
+  /** Tracks several tasks spread over three slaves as they start and terminate. */
+  @Test
+  public void testMultipleTasks() {
+    IAssignedTask taskA = makeTask("a", "s1");
+    IAssignedTask taskB = makeTask("b", "s1");
+    IAssignedTask taskC = makeTask("c", "s2");
+    IAssignedTask taskD = makeTask("d", "s3");
+    IAssignedTask taskE = makeTask("e", "s3");
+    IAssignedTask taskF = makeTask("f", "s1");
+
+    // Start the first five tasks one at a time.
+    changeState(taskA, RUNNING);
+    assertVictims(taskA);
+    changeState(taskB, RUNNING);
+    assertVictims(taskA, taskB);
+    changeState(taskC, RUNNING);
+    assertVictims(taskA, taskB, taskC);
+    changeState(taskD, RUNNING);
+    assertVictims(taskA, taskB, taskC, taskD);
+    changeState(taskE, RUNNING);
+    assertVictims(taskA, taskB, taskC, taskD, taskE);
+
+    // Terminal states remove tasks again.
+    changeState(taskC, FINISHED);
+    assertVictims(taskA, taskB, taskD, taskE);
+    changeState(taskA, FAILED);
+    changeState(taskE, KILLED);
+    assertVictims(taskB, taskD);
+
+    // A late arrival on a slave that already lost a task.
+    changeState(taskF, RUNNING);
+    assertVictims(taskB, taskD, taskF);
+  }
+
+  /**
+   * Asserts that the cluster state holds exactly the given tasks, each keyed by its slave ID.
+   *
+   * @param tasks Tasks expected to be listed; none means the mapping must be empty.
+   */
+  private void assertVictims(IAssignedTask... tasks) {
+    ImmutableMultimap.Builder<String, PreemptionVictim> bySlave = ImmutableMultimap.builder();
+    for (IAssignedTask expectedTask : tasks) {
+      PreemptionVictim victim = PreemptionVictim.fromTask(expectedTask);
+      bySlave.put(expectedTask.getSlaveId(), victim);
+    }
+
+    HashMultimap<String, PreemptionVictim> expected = HashMultimap.create(bySlave.build());
+    assertEquals(expected, state.getSlavesToActiveTasks());
+  }
+
+  /**
+   * Builds an assigned task for a fixed job key ("role", "env", "job").
+   *
+   * @param taskId ID of the task.
+   * @param slaveId ID of the slave; the slave host is this value with "host" appended.
+   * @return The immutable assigned task.
+   */
+  private IAssignedTask makeTask(String taskId, String slaveId) {
+    TaskConfig config = new TaskConfig().setJob(new JobKey("role", "env", "job"));
+    AssignedTask builder = new AssignedTask()
+        .setTaskId(taskId)
+        .setSlaveId(slaveId)
+        .setSlaveHost(slaveId + "host")
+        .setTask(config);
+    return IAssignedTask.build(builder);
+  }
+
+  /**
+   * Notifies the cluster state that the task now has the given status.
+   *
+   * @param assignedTask Task whose state changes.
+   * @param status Status carried by the scheduled task in the event.
+   */
+  private void changeState(IAssignedTask assignedTask, ScheduleStatus status) {
+    ScheduledTask builder = new ScheduledTask()
+        .setStatus(status)
+        .setAssignedTask(assignedTask.newBuilder());
+    // NOTE(review): the second argument is presumably the previous state - confirm against
+    // TaskStateChange.transition().
+    TaskStateChange event =
+        TaskStateChange.transition(IScheduledTask.build(builder), ScheduleStatus.INIT);
+    state.taskChangedState(event);
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessorTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessorTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessorTest.java
new file mode 100644
index 0000000..b9cb5bf
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessorTest.java
@@ -0,0 +1,285 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.preemptor;
+
+import java.util.Arrays;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.testing.easymock.EasyMockTest;
+import com.twitter.common.util.testing.FakeClock;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.HostAttributes;
+import org.apache.aurora.gen.JobKey;
+import org.apache.aurora.gen.MaintenanceMode;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.gen.TaskEvent;
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.TaskGroupKey;
+import org.apache.aurora.scheduler.filter.AttributeAggregate;
+import org.apache.aurora.scheduler.offers.OfferManager;
+import org.apache.aurora.scheduler.stats.CachedCounters;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
+import org.apache.mesos.Protos;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.scheduler.preemptor.PreemptorMetrics.TASK_PROCESSOR_RUN_NAME;
+import static org.apache.aurora.scheduler.preemptor.PreemptorMetrics.attemptsStatName;
+import static org.apache.aurora.scheduler.preemptor.PreemptorMetrics.slotSearchStatName;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests {@link PendingTaskProcessor#run()} against mocked storage, offers, cluster state and
+ * victim filter, checking the recorded stats and the contents of the slot cache.
+ */
+public class PendingTaskProcessorTest extends EasyMockTest {
+  private static final String CACHE_STAT = "cache_size";
+  private static final String SLAVE_ID_1 = "slave_id_1";
+  private static final String SLAVE_ID_2 = "slave_id_2";
+  private static final JobKey JOB_A = new JobKey("role_a", "env", "job_a");
+  private static final JobKey JOB_B = new JobKey("role_b", "env", "job_b");
+  private static final IScheduledTask TASK_A = makeTask(JOB_A, SLAVE_ID_1, "id1");
+  private static final IScheduledTask TASK_B = makeTask(JOB_B, SLAVE_ID_2, "id2");
+  private static final PreemptionProposal SLOT_A = createPreemptionProposal(TASK_A, SLAVE_ID_1);
+  private static final Amount<Long, Time> PREEMPTION_DELAY = Amount.of(30L, Time.SECONDS);
+  private static final Amount<Long, Time> EXPIRATION = Amount.of(10L, Time.MINUTES);
+
+  private StorageTestUtil storageUtil;
+  private OfferManager offerManager;
+  private FakeStatsProvider statsProvider;
+  private PreemptionVictimFilter preemptionVictimFilter;
+  private PendingTaskProcessor slotFinder;
+  private BiCache<PreemptionProposal, TaskGroupKey> slotCache;
+  private ClusterState clusterState;
+  private FakeClock clock;
+
+  /** Wires the processor under test to mocks, a fake clock and a real {@link BiCache}. */
+  @Before
+  public void setUp() {
+    storageUtil = new StorageTestUtil(this);
+    storageUtil.expectOperations();
+    offerManager = createMock(OfferManager.class);
+    preemptionVictimFilter = createMock(PreemptionVictimFilter.class);
+    statsProvider = new FakeStatsProvider();
+    clusterState = createMock(ClusterState.class);
+    clock = new FakeClock();
+    slotCache = new BiCache<>(
+        statsProvider,
+        new BiCache.BiCacheSettings(EXPIRATION, CACHE_STAT),
+        clock);
+
+    slotFinder = new PendingTaskProcessor(
+        storageUtil.storage,
+        offerManager,
+        preemptionVictimFilter,
+        new PreemptorMetrics(new CachedCounters(statsProvider)),
+        PREEMPTION_DELAY,
+        slotCache,
+        clusterState,
+        clock);
+  }
+
+  /** Both pending tasks get a victim from the filter, so two slots are found and cached. */
+  @Test
+  public void testSearchSlotSuccessful() throws Exception {
+    expectGetPendingTasks(TASK_A, TASK_B);
+    expectGetClusterState(TASK_A, TASK_B);
+    HostOffer offer1 = makeOffer(SLAVE_ID_1);
+    HostOffer offer2 = makeOffer(SLAVE_ID_2);
+    expectOffers(offer1, offer2);
+    expectSlotSearch(TASK_A.getAssignedTask().getTask(), TASK_A);
+    expectSlotSearch(TASK_B.getAssignedTask().getTask(), TASK_B);
+
+    control.replay();
+
+    clock.advance(PREEMPTION_DELAY);
+
+    slotFinder.run();
+    assertEquals(1L, statsProvider.getLongValue(TASK_PROCESSOR_RUN_NAME));
+    assertEquals(2L, statsProvider.getLongValue(attemptsStatName(true)));
+    assertEquals(2L, statsProvider.getLongValue(slotSearchStatName(true, true)));
+    assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(false, true)));
+    assertEquals(2L, statsProvider.getLongValue(CACHE_STAT));
+  }
+
+  /** The filter returns no victims, so the single attempt is counted as a failed search. */
+  @Test
+  public void testSearchSlotFailed() throws Exception {
+    expectGetPendingTasks(TASK_A);
+    expectGetClusterState(TASK_A);
+    HostOffer offer1 = makeOffer(SLAVE_ID_1);
+    expectOffers(offer1);
+    expectSlotSearch(TASK_A.getAssignedTask().getTask());
+
+    control.replay();
+
+    clock.advance(PREEMPTION_DELAY);
+
+    slotFinder.run();
+    assertEquals(1L, statsProvider.getLongValue(TASK_PROCESSOR_RUN_NAME));
+    assertEquals(1L, statsProvider.getLongValue(attemptsStatName(true)));
+    assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(true, true)));
+    assertEquals(1L, statsProvider.getLongValue(slotSearchStatName(false, true)));
+  }
+
+  /** With a slot already cached for the task's group, the run records no attempt or search. */
+  @Test
+  public void testHasCachedSlots() throws Exception {
+    slotCache.put(SLOT_A, group(TASK_A));
+    expectGetPendingTasks(TASK_A);
+    expectGetClusterState(TASK_A);
+    HostOffer offer1 = makeOffer(SLAVE_ID_1);
+    expectOffers(offer1);
+
+    control.replay();
+
+    clock.advance(PREEMPTION_DELAY);
+
+    slotFinder.run();
+    assertEquals(1L, statsProvider.getLongValue(TASK_PROCESSOR_RUN_NAME));
+    assertEquals(0L, statsProvider.getLongValue(attemptsStatName(true)));
+    assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(true, true)));
+    assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(false, true)));
+  }
+
+  /**
+   * Five pending tasks from two jobs: the filter finds nothing for the JOB_A config and returns
+   * TASK_B as victim for the JOB_B config, so both cached proposals belong to the JOB_B group.
+   */
+  @Test
+  public void testMultipleTaskGroups() throws Exception {
+    IScheduledTask task1 = makeTask(JOB_A, "1");
+    IScheduledTask task2 = makeTask(JOB_A, "2");
+    IScheduledTask task3 = makeTask(JOB_A, "3");
+    IScheduledTask task4 = makeTask(JOB_B, "4");
+    IScheduledTask task5 = makeTask(JOB_B, "5");
+
+    expectGetPendingTasks(task1, task4, task2, task5, task3);
+    expectGetClusterState(TASK_A, TASK_B);
+
+    HostOffer offer1 = makeOffer(SLAVE_ID_1);
+    HostOffer offer2 = makeOffer(SLAVE_ID_2);
+    expectOffers(offer1, offer2);
+    expectSlotSearch(task1.getAssignedTask().getTask());
+    expectSlotSearch(task4.getAssignedTask().getTask(), TASK_B);
+    PreemptionProposal proposal1 = createPreemptionProposal(TASK_B, SLAVE_ID_1);
+    PreemptionProposal proposal2 = createPreemptionProposal(TASK_B, SLAVE_ID_2);
+
+    control.replay();
+
+    clock.advance(PREEMPTION_DELAY);
+
+    slotFinder.run();
+    // Expected value first, actual second, so a failure message labels them correctly.
+    assertEquals(Optional.of(group(task4)), slotCache.get(proposal1));
+    assertEquals(Optional.of(group(task5)), slotCache.get(proposal2));
+    assertEquals(1L, statsProvider.getLongValue(TASK_PROCESSOR_RUN_NAME));
+    assertEquals(3L, statsProvider.getLongValue(attemptsStatName(true)));
+    assertEquals(2L, statsProvider.getLongValue(slotSearchStatName(true, true)));
+    assertEquals(2L, statsProvider.getLongValue(slotSearchStatName(false, true)));
+    assertEquals(2L, statsProvider.getLongValue(CACHE_STAT));
+  }
+
+  /** An empty cluster state yields a counted run with no attempts and no slot searches. */
+  @Test
+  public void testNoVictims() throws Exception {
+    expectGetClusterState();
+    control.replay();
+
+    clock.advance(PREEMPTION_DELAY);
+
+    slotFinder.run();
+    assertEquals(1L, statsProvider.getLongValue(TASK_PROCESSOR_RUN_NAME));
+    assertEquals(0L, statsProvider.getLongValue(attemptsStatName(true)));
+    assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(true, true)));
+    assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(false, true)));
+  }
+
+  /** Indexes the tasks by slave ID and converts each one to a {@link PreemptionVictim}. */
+  private Multimap<String, PreemptionVictim> getVictims(IScheduledTask... tasks) {
+    return Multimaps.transformValues(
+        Multimaps.index(Arrays.asList(tasks), task -> task.getAssignedTask().getSlaveId()),
+        task -> PreemptionVictim.fromTask(task.getAssignedTask())
+    );
+  }
+
+  /**
+   * Builds an offer with no resources whose slave ID and hostname are both {@code slaveId}.
+   * Every offer built here has the same offer ID ("id").
+   */
+  private HostOffer makeOffer(String slaveId) {
+    Protos.Offer.Builder builder = Protos.Offer.newBuilder();
+    builder.getIdBuilder().setValue("id");
+    builder.getFrameworkIdBuilder().setValue("framework-id");
+    builder.getSlaveIdBuilder().setValue(slaveId);
+    builder.setHostname(slaveId);
+    return new HostOffer(
+        builder.build(),
+        IHostAttributes.build(new HostAttributes().setMode(MaintenanceMode.NONE)));
+  }
+
+  /** Expects one {@code getOffers()} call returning the given offers. */
+  private void expectOffers(HostOffer... offers) {
+    expect(offerManager.getOffers()).andReturn(ImmutableSet.copyOf(offers));
+  }
+
+  /** Expects one cluster state query returning the given tasks as victims keyed by slave ID. */
+  private void expectGetClusterState(IScheduledTask... returnedTasks) {
+    expect(clusterState.getSlavesToActiveTasks()).andReturn(getVictims(returnedTasks));
+  }
+
+  /**
+   * Expects any number of victim-filter calls for {@code config}.
+   *
+   * @param config Pending task config the call must be made with.
+   * @param victims Victims to return; when none are given the filter returns absent.
+   */
+  private void expectSlotSearch(ITaskConfig config, IScheduledTask... victims) {
+    expect(preemptionVictimFilter.filterPreemptionVictims(
+        eq(config),
+        EasyMock.anyObject(),
+        anyObject(AttributeAggregate.class),
+        EasyMock.anyObject(),
+        eq(storageUtil.storeProvider)));
+    // The return value is attached to the call recorded just above.
+    expectLastCall().andReturn(
+        victims.length == 0
+            ? Optional.absent()
+            : Optional.of(ImmutableSet.copyOf(getVictims(victims).values())))
+        .anyTimes();
+  }
+
+  /** Creates a proposal to preempt the single given task on the given slave. */
+  private static PreemptionProposal createPreemptionProposal(IScheduledTask task, String slaveId) {
+    return new PreemptionProposal(
+        ImmutableSet.of(PreemptionVictim.fromTask(task.getAssignedTask())),
+        slaveId);
+  }
+
+  /** Creates a task that has no slave ID. */
+  private static IScheduledTask makeTask(JobKey key, String taskId) {
+    return makeTask(key, null, taskId);
+  }
+
+  /** Returns the task group key derived from the task's config. */
+  private static TaskGroupKey group(IScheduledTask task) {
+    return TaskGroupKey.from(task.getAssignedTask().getTask());
+  }
+
+  /**
+   * Creates a production task with priority 1 and a single PENDING event at timestamp 0.
+   *
+   * @param key Job the task belongs to.
+   * @param slaveId Slave the task is placed on, or {@code null} for none.
+   * @param taskId ID of the task.
+   */
+  private static IScheduledTask makeTask(JobKey key, @Nullable String slaveId, String taskId) {
+    ScheduledTask task = new ScheduledTask()
+        .setAssignedTask(new AssignedTask()
+            .setSlaveId(slaveId)
+            .setTaskId(taskId)
+            .setTask(new TaskConfig()
+                .setPriority(1)
+                .setProduction(true)
+                .setJob(key)));
+    task.addToTaskEvents(new TaskEvent(0, PENDING));
+    return IScheduledTask.build(task);
+  }
+
+  /** Expects a fetch of PENDING-status tasks from storage that returns the given tasks. */
+  private void expectGetPendingTasks(IScheduledTask... returnedTasks) {
+    storageUtil.expectTaskFetch(Query.statusScoped(PENDING), ImmutableSet.copyOf(returnedTasks));
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java
new file mode 100644
index 0000000..997d326
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java
@@ -0,0 +1,512 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.preemptor;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableSet;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Data;
+import com.twitter.common.testing.easymock.EasyMockTest;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.Attribute;
+import org.apache.aurora.gen.Constraint;
+import org.apache.aurora.gen.HostAttributes;
+import org.apache.aurora.gen.JobKey;
+import org.apache.aurora.gen.MaintenanceMode;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.gen.TaskEvent;
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.configuration.Resources;
+import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
+import org.apache.aurora.scheduler.filter.SchedulingFilterImpl;
+import org.apache.aurora.scheduler.mesos.TaskExecutors;
+import org.apache.aurora.scheduler.stats.CachedCounters;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.easymock.IExpectationSetters;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.MaintenanceMode.NONE;
+import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY;
+import static org.apache.aurora.scheduler.preemptor.PreemptorMetrics.MISSING_ATTRIBUTES_NAME;
+import static org.apache.mesos.Protos.Offer;
+import static org.apache.mesos.Protos.Resource;
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests {@link PreemptionVictimFilter.PreemptionVictimFilterImpl}: which running tasks on a
+ * single host are picked as preemption victims for a pending task, using either a mocked
+ * {@link SchedulingFilter} or the real {@link SchedulingFilterImpl}.
+ */
+public class PreemptionVictimFilterTest extends EasyMockTest {
+  private static final String USER_A = "user_a";
+  private static final String USER_B = "user_b";
+  private static final String USER_C = "user_c";
+  private static final String JOB_A = "job_a";
+  private static final String JOB_B = "job_b";
+  private static final String JOB_C = "job_c";
+  private static final String TASK_ID_A = "task_a";
+  private static final String TASK_ID_B = "task_b";
+  private static final String TASK_ID_C = "task_c";
+  private static final String TASK_ID_D = "task_d";
+  private static final String HOST = "host";
+  private static final String RACK = "rack";
+  private static final String SLAVE_ID = HOST + "_id";
+  private static final String RACK_ATTRIBUTE = "rack";
+  private static final String HOST_ATTRIBUTE = "host";
+  private static final String OFFER = "offer";
+  private static final Optional<HostOffer> NO_OFFER = Optional.absent();
+
+  private StorageTestUtil storageUtil;
+  // Assigned by each test before runFilter() is called: either a mock or the real filter.
+  private SchedulingFilter schedulingFilter;
+  private FakeStatsProvider statsProvider;
+  private PreemptorMetrics preemptorMetrics;
+
+  @Before
+  public void setUp() {
+    storageUtil = new StorageTestUtil(this);
+    storageUtil.expectOperations();
+    statsProvider = new FakeStatsProvider();
+    preemptorMetrics = new PreemptorMetrics(new CachedCounters(statsProvider));
+  }
+
+  /**
+   * Builds a filter around the current {@code schedulingFilter} and runs it.
+   *
+   * @param pendingTask Task looking for a slot; only its task config is passed on.
+   * @param offer Optional offer available on the host.
+   * @param victims Running tasks offered to the filter as possible victims.
+   * @return The victims chosen by the filter, or absent if preemption is not possible.
+   */
+  private Optional<ImmutableSet<PreemptionVictim>> runFilter(
+      ScheduledTask pendingTask,
+      Optional<HostOffer> offer,
+      ScheduledTask... victims) {
+
+    PreemptionVictimFilter.PreemptionVictimFilterImpl filter =
+        new PreemptionVictimFilter.PreemptionVictimFilterImpl(
+            schedulingFilter,
+            TaskExecutors.NO_OVERHEAD_EXECUTOR,
+            preemptorMetrics);
+
+    return filter.filterPreemptionVictims(
+        ITaskConfig.build(pendingTask.getAssignedTask().getTask()),
+        preemptionVictims(victims),
+        EMPTY,
+        offer,
+        storageUtil.mutableStoreProvider);
+  }
+
+  /** A priority-100 pending task preempts a default-priority running task of the same job. */
+  @Test
+  public void testPreempted() throws Exception {
+    setUpHost();
+
+    schedulingFilter = createMock(SchedulingFilter.class);
+    ScheduledTask lowPriority = makeTask(USER_A, JOB_A, TASK_ID_A);
+    assignToHost(lowPriority);
+
+    ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_B, 100);
+
+    expectFiltering();
+
+    control.replay();
+    assertVictims(runFilter(highPriority, NO_OFFER, lowPriority), lowPriority);
+  }
+
+  /** A priority-100 pending task preempts the priority-1 running task. */
+  @Test
+  public void testLowestPriorityPreempted() throws Exception {
+    setUpHost();
+
+    schedulingFilter = createMock(SchedulingFilter.class);
+    // NOTE(review): lowPriority is assigned to the host but never passed to runFilter(), so
+    // only lowerPriority is offered as a candidate - confirm this is intended.
+    ScheduledTask lowPriority = makeTask(USER_A, JOB_A, TASK_ID_A, 10);
+    assignToHost(lowPriority);
+
+    ScheduledTask lowerPriority = makeTask(USER_A, JOB_A, TASK_ID_B, 1);
+    assignToHost(lowerPriority);
+
+    ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_C, 100);
+
+    expectFiltering();
+
+    control.replay();
+    assertVictims(runFilter(highPriority, NO_OFFER, lowerPriority), lowerPriority);
+  }
+
+  /** Of running tasks with priorities 100, 99 and 1, only the last is a victim for priority 98. */
+  @Test
+  public void testOnePreemptableTask() throws Exception {
+    setUpHost();
+
+    schedulingFilter = createMock(SchedulingFilter.class);
+    ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_A, 100);
+    assignToHost(highPriority);
+
+    ScheduledTask lowerPriority = makeTask(USER_A, JOB_A, TASK_ID_B, 99);
+    assignToHost(lowerPriority);
+
+    ScheduledTask lowestPriority = makeTask(USER_A, JOB_A, TASK_ID_C, 1);
+    assignToHost(lowestPriority);
+
+    ScheduledTask pendingPriority = makeTask(USER_A, JOB_A, TASK_ID_D, 98);
+
+    expectFiltering();
+
+    control.replay();
+    assertVictims(
+        runFilter(pendingPriority, NO_OFFER, highPriority, lowerPriority, lowestPriority),
+        lowestPriority);
+  }
+
+  /** A default-priority pending task does not preempt a priority-100 running task. */
+  @Test
+  public void testHigherPriorityRunning() throws Exception {
+    schedulingFilter = createMock(SchedulingFilter.class);
+    ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_B, 100);
+    assignToHost(highPriority);
+
+    ScheduledTask task = makeTask(USER_A, JOB_A, TASK_ID_A);
+
+    control.replay();
+    assertNoVictims(runFilter(task, NO_OFFER, highPriority));
+  }
+
+  /** A production task preempts a non-production task of the same role. */
+  @Test
+  public void testProductionPreemptingNonproduction() throws Exception {
+    setUpHost();
+
+    schedulingFilter = createMock(SchedulingFilter.class);
+    // Use a very low priority for the production task to show that priority is irrelevant.
+    ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", -1000);
+    ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_B + "_a1", 100);
+    assignToHost(a1);
+
+    expectFiltering();
+
+    control.replay();
+    assertVictims(runFilter(p1, NO_OFFER, a1), a1);
+  }
+
+  /** A production task preempts a non-production task that belongs to a different role. */
+  @Test
+  public void testProductionPreemptingNonproductionAcrossUsers() throws Exception {
+    setUpHost();
+
+    schedulingFilter = createMock(SchedulingFilter.class);
+    // Use a very low priority for the production task to show that priority is irrelevant.
+    ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", -1000);
+    ScheduledTask a1 = makeTask(USER_B, JOB_A, TASK_ID_B + "_a1", 100);
+    assignToHost(a1);
+
+    expectFiltering();
+
+    control.replay();
+    assertVictims(runFilter(p1, NO_OFFER, a1), a1);
+  }
+
+  /** A production task of one role does not preempt a production task of another role. */
+  @Test
+  public void testProductionUsersDoNotPreemptEachOther() throws Exception {
+    schedulingFilter = createMock(SchedulingFilter.class);
+    ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", 1000);
+    ScheduledTask a1 = makeProductionTask(USER_B, JOB_A, TASK_ID_B + "_a1", 0);
+    assignToHost(a1);
+
+    control.replay();
+    assertNoVictims(runFilter(p1, NO_OFFER, a1));
+  }
+
+  // Ensures a production task can preempt 2 tasks on the same host.
+  @Test
+  public void testProductionPreemptingManyNonProduction() throws Exception {
+    schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR);
+    ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
+    a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+
+    ScheduledTask b1 = makeTask(USER_B, JOB_B, TASK_ID_B + "_b1");
+    b1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+
+    setUpHost();
+
+    assignToHost(a1);
+    assignToHost(b1);
+
+    ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
+    p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024);
+
+    control.replay();
+    assertVictims(runFilter(p1, NO_OFFER, a1, b1), a1, b1);
+  }
+
+  // Ensures we select the minimal number of tasks to preempt
+  @Test
+  public void testMinimalSetPreempted() throws Exception {
+    schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR);
+    ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
+    a1.getAssignedTask().getTask().setNumCpus(4).setRamMb(4096);
+
+    ScheduledTask b1 = makeTask(USER_B, JOB_B, TASK_ID_B + "_b1");
+    b1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+
+    ScheduledTask b2 = makeTask(USER_B, JOB_B, TASK_ID_B + "_b2");
+    b2.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+
+    setUpHost();
+
+    assignToHost(a1);
+    assignToHost(b1);
+    assignToHost(b2);
+
+    ScheduledTask p1 = makeProductionTask(USER_C, JOB_C, TASK_ID_C + "_p1");
+    p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024);
+
+    control.replay();
+    assertVictims(runFilter(p1, NO_OFFER, b1, b2, a1), a1);
+  }
+
+  // Ensures a production task *never* preempts a production task from another job.
+  @Test
+  public void testProductionJobNeverPreemptsProductionJob() throws Exception {
+    schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR);
+    ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1");
+    p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024);
+
+    setUpHost();
+
+    assignToHost(p1);
+
+    ScheduledTask p2 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p2");
+    p2.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+
+    control.replay();
+    assertNoVictims(runFilter(p2, NO_OFFER, p1));
+  }
+
+  // Ensures that we can preempt if a task + offer can satisfy a pending task.
+  @Test
+  public void testPreemptWithOfferAndTask() throws Exception {
+    schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR);
+
+    setUpHost();
+
+    ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
+    a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+    assignToHost(a1);
+
+    ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
+    p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024);
+
+    control.replay();
+    assertVictims(
+        runFilter(p1, makeOffer(OFFER, 1, Amount.of(512L, Data.MB), Amount.of(1L, Data.MB), 1), a1),
+        a1);
+  }
+
+  // Ensures we can preempt if two tasks and an offer can satisfy a pending task.
+  @Test
+  public void testPreemptWithOfferAndMultipleTasks() throws Exception {
+    schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR);
+
+    setUpHost();
+
+    ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
+    a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+    assignToHost(a1);
+
+    ScheduledTask a2 = makeTask(USER_A, JOB_B, TASK_ID_A + "_a2");
+    a2.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+    assignToHost(a2);
+
+    ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1");
+    p1.getAssignedTask().getTask().setNumCpus(4).setRamMb(2048);
+
+    control.replay();
+    Optional<HostOffer> offer =
+        makeOffer(OFFER, 2, Amount.of(1024L, Data.MB), Amount.of(1L, Data.MB), 1);
+    assertVictims(runFilter(p1, offer, a1, a2), a1, a2);
+  }
+
+  /** With no candidate victims at all, the filter returns absent. */
+  @Test
+  public void testNoPreemptionVictims() {
+    schedulingFilter = createMock(SchedulingFilter.class);
+    ScheduledTask task = makeProductionTask(USER_A, JOB_A, TASK_ID_A);
+
+    control.replay();
+
+    assertNoVictims(runFilter(task, NO_OFFER));
+  }
+
+  /** Missing host attributes yield no victims and bump the missing-attributes stat to 1. */
+  @Test
+  public void testMissingAttributes() {
+    schedulingFilter = createMock(SchedulingFilter.class);
+    ScheduledTask task = makeProductionTask(USER_A, JOB_A, TASK_ID_A);
+    assignToHost(task);
+
+    ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
+    a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+    assignToHost(a1);
+
+    expect(storageUtil.attributeStore.getHostAttributes(HOST)).andReturn(Optional.absent());
+
+    control.replay();
+
+    assertNoVictims(runFilter(task, NO_OFFER, a1));
+    assertEquals(1L, statsProvider.getLongValue(MISSING_ATTRIBUTES_NAME));
+  }
+
+  /** When the scheduling filter vetoes the only candidate, the filter returns absent. */
+  @Test
+  public void testAllVictimsVetoed() {
+    schedulingFilter = createMock(SchedulingFilter.class);
+    ScheduledTask task = makeProductionTask(USER_A, JOB_A, TASK_ID_A);
+    assignToHost(task);
+
+    ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
+    a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
+    assignToHost(a1);
+
+    setUpHost();
+    expectFiltering(Optional.of(Veto.constraintMismatch("ban")));
+
+    control.replay();
+
+    assertNoVictims(runFilter(task, NO_OFFER, a1));
+  }
+
+  /** Converts the tasks' assigned-task parts to a set of {@link PreemptionVictim}s. */
+  private static ImmutableSet<PreemptionVictim> preemptionVictims(ScheduledTask... tasks) {
+    return FluentIterable.from(ImmutableSet.copyOf(tasks))
+        .transform(
+            new Function<ScheduledTask, PreemptionVictim>() {
+              @Override
+              public PreemptionVictim apply(ScheduledTask task) {
+                return PreemptionVictim.fromTask(IAssignedTask.build(task.getAssignedTask()));
+              }
+            }).toSet();
+  }
+
+  /** Asserts that the filter result is present and holds exactly the expected tasks. */
+  private static void assertVictims(
+      Optional<ImmutableSet<PreemptionVictim>> actual,
+      ScheduledTask... expected) {
+
+    assertEquals(Optional.of(preemptionVictims(expected)), actual);
+  }
+
+  /** Asserts that the filter result is absent. */
+  private static void assertNoVictims(Optional<ImmutableSet<PreemptionVictim>> actual) {
+    assertEquals(Optional.<ImmutableSet<PreemptionVictim>>absent(), actual);
+  }
+
+  /**
+   * Builds an offer on {@code HOST} / {@code SLAVE_ID} carrying the given resources.
+   *
+   * @param offerId ID of the offer.
+   * @param cpu CPU amount passed to {@link Resources}.
+   * @param ram RAM amount passed to {@link Resources}.
+   * @param disk Disk amount passed to {@link Resources}.
+   * @param numPorts Port count passed to {@link Resources}.
+   * @return The offer with maintenance mode NONE, wrapped in a present optional.
+   */
+  private Optional<HostOffer> makeOffer(
+      String offerId,
+      double cpu,
+      Amount<Long, Data> ram,
+      Amount<Long, Data> disk,
+      int numPorts) {
+
+    List<Resource> resources = new Resources(cpu, ram, disk, numPorts).toResourceList();
+    Offer.Builder builder = Offer.newBuilder();
+    builder.getIdBuilder().setValue(offerId);
+    builder.getFrameworkIdBuilder().setValue("framework-id");
+    builder.getSlaveIdBuilder().setValue(SLAVE_ID);
+    builder.setHostname(HOST);
+    for (Resource r: resources) {
+      builder.addResources(r);
+    }
+    return Optional.of(new HostOffer(
+        builder.build(),
+        IHostAttributes.build(new HostAttributes().setMode(MaintenanceMode.NONE))));
+  }
+
+  /** Expects one call to the mocked scheduling filter that returns no vetoes. */
+  private IExpectationSetters<Set<SchedulingFilter.Veto>> expectFiltering() {
+    return expectFiltering(Optional.absent());
+  }
+
+  /**
+   * Expects one call to the mocked scheduling filter, with any arguments.
+   *
+   * @param veto Veto to answer with; absent makes the filter return an empty set.
+   * @return The expectation setters, so a caller can adjust the expectation further.
+   */
+  private IExpectationSetters<Set<SchedulingFilter.Veto>> expectFiltering(
+      final Optional<Veto> veto) {
+
+    return expect(schedulingFilter.filter(
+        EasyMock.anyObject(),
+        EasyMock.anyObject()))
+        .andAnswer(
+            new IAnswer<Set<SchedulingFilter.Veto>>() {
+              @Override
+              public Set<SchedulingFilter.Veto> answer() {
+                return veto.asSet();
+              }
+            });
+  }
+
+  /**
+   * Builds a scheduled task with no status, no slave and an empty constraint set.
+   *
+   * @param role Role part of the job key.
+   * @param job Job name, used both in the job key and as the config's job name.
+   * @param taskId ID of the task.
+   * @param priority Priority of the task config.
+   * @param env Environment, used both in the job key and in the config.
+   * @param production Whether the task config is flagged as production.
+   */
+  static ScheduledTask makeTask(
+      String role,
+      String job,
+      String taskId,
+      int priority,
+      String env,
+      boolean production) {
+
+    AssignedTask assignedTask = new AssignedTask()
+        .setTaskId(taskId)
+        .setTask(new TaskConfig()
+            .setJob(new JobKey(role, env, job))
+            .setPriority(priority)
+            .setProduction(production)
+            .setJobName(job)
+            .setEnvironment(env)
+            .setConstraints(new HashSet<Constraint>()));
+    return new ScheduledTask().setAssignedTask(assignedTask);
+  }
+
+  /** Builds a non-production task with priority 0 in the "dev" environment. */
+  static ScheduledTask makeTask(String role, String job, String taskId) {
+    return makeTask(role, job, taskId, 0, "dev", false);
+  }
+
+  /** Appends a task event with timestamp 0 and the given status. */
+  static void addEvent(ScheduledTask task, ScheduleStatus status) {
+    task.addToTaskEvents(new TaskEvent(0, status));
+  }
+
+  /** Builds a production task with priority 0 in the "prod" environment. */
+  private ScheduledTask makeProductionTask(String role, String job, String taskId) {
+    return makeTask(role, job, taskId, 0, "prod", true);
+  }
+
+  /** Builds a production task with the given priority in the "prod" environment. */
+  private ScheduledTask makeProductionTask(String role, String job, String taskId, int priority) {
+    return makeTask(role, job, taskId, priority, "prod", true);
+  }
+
+  /** Builds a non-production task with the given priority in the "dev" environment. */
+  private ScheduledTask makeTask(String role, String job, String taskId, int priority) {
+    return makeTask(role, job, taskId, priority, "dev", false);
+  }
+
+  /** Marks the task RUNNING, with a matching event, on {@code HOST} / {@code SLAVE_ID}. */
+  private void assignToHost(ScheduledTask task) {
+    task.setStatus(RUNNING);
+    addEvent(task, RUNNING);
+    task.getAssignedTask().setSlaveHost(HOST);
+    task.getAssignedTask().setSlaveId(SLAVE_ID);
+  }
+
+  /** Creates a "host" attribute with the single given value. */
+  private Attribute host(String host) {
+    return new Attribute(HOST_ATTRIBUTE, ImmutableSet.of(host));
+  }
+
+  /** Creates a "rack" attribute with the single given value. */
+  private Attribute rack(String rack) {
+    return new Attribute(RACK_ATTRIBUTE, ImmutableSet.of(rack));
+  }
+
+  // Sets up a normal host, no dedicated hosts and no maintenance.
+  // The attribute store may be queried for HOST any number of times.
+  private void setUpHost() {
+    // The host attribute carries the host name; it was previously built from the rack value.
+    // SLAVE_ID is the same value the tasks and offers in this class are assigned to.
+    IHostAttributes hostAttrs = IHostAttributes.build(
+        new HostAttributes().setHost(HOST).setSlaveId(SLAVE_ID)
+            .setMode(NONE).setAttributes(ImmutableSet.of(rack(RACK), host(HOST))));
+
+    expect(storageUtil.attributeStore.getHostAttributes(HOST))
+        .andReturn(Optional.of(hostAttrs)).anyTimes();
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimTest.java
new file mode 100644
index 0000000..09380f9
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimTest.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.aurora.scheduler.preemptor; + +import com.google.common.collect.ImmutableSet; + +import org.apache.aurora.gen.AssignedTask; +import org.apache.aurora.gen.JobKey; +import org.apache.aurora.gen.TaskConfig; +import org.apache.aurora.scheduler.storage.entities.IAssignedTask; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +public class PreemptionVictimTest { + + @Test + public void testBeanMethods() { + PreemptionVictim a = makeVictim("a"); + PreemptionVictim a1 = makeVictim("a"); + PreemptionVictim b = makeVictim("b"); + assertEquals(a, a1); + assertEquals(a.hashCode(), a1.hashCode()); + assertEquals(a.toString(), a1.toString()); + assertNotEquals(a, b); + assertNotEquals(a.toString(), b.toString()); + assertEquals(ImmutableSet.of(a, b), ImmutableSet.of(a, a1, b)); + } + + private PreemptionVictim makeVictim(String taskId) { + return PreemptionVictim.fromTask(IAssignedTask.build(new AssignedTask() + .setTaskId(taskId) + .setSlaveId(taskId + "slave") + .setSlaveHost(taskId + "host") + .setTask(new TaskConfig().setJob(new JobKey("role", "env", "job"))))); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java new file mode 100644 index 0000000..b07ff7b --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java @@ -0,0 +1,177 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.preemptor; + +import java.util.Set; + +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableSet; +import com.twitter.common.testing.easymock.EasyMockTest; + +import org.apache.aurora.gen.AssignedTask; +import org.apache.aurora.gen.HostAttributes; +import org.apache.aurora.gen.JobKey; +import org.apache.aurora.gen.ScheduleStatus; +import org.apache.aurora.gen.ScheduledTask; +import org.apache.aurora.gen.TaskConfig; +import org.apache.aurora.gen.TaskEvent; +import org.apache.aurora.scheduler.HostOffer; +import org.apache.aurora.scheduler.base.TaskGroupKey; +import org.apache.aurora.scheduler.base.Tasks; +import org.apache.aurora.scheduler.offers.OfferManager; +import org.apache.aurora.scheduler.preemptor.Preemptor.PreemptorImpl; +import org.apache.aurora.scheduler.state.StateChangeResult; +import org.apache.aurora.scheduler.state.StateManager; +import org.apache.aurora.scheduler.stats.CachedCounters; +import org.apache.aurora.scheduler.storage.Storage; +import org.apache.aurora.scheduler.storage.entities.IAssignedTask; +import org.apache.aurora.scheduler.storage.entities.IHostAttributes; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.apache.aurora.scheduler.storage.entities.ITaskConfig; +import org.apache.aurora.scheduler.testing.FakeStatsProvider; +import org.apache.mesos.Protos; +import org.easymock.EasyMock; +import org.junit.Before; +import org.junit.Test; + +import static org.apache.aurora.gen.ScheduleStatus.PENDING; +import static 
org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY; +import static org.apache.aurora.scheduler.preemptor.PreemptorMetrics.slotValidationStatName; +import static org.apache.aurora.scheduler.preemptor.PreemptorMetrics.successStatName; +import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.eq; +import static org.easymock.EasyMock.expect; +import static org.junit.Assert.assertEquals; + +public class PreemptorImplTest extends EasyMockTest { + private static final String SLAVE_ID = "slave_id"; + private static final IScheduledTask TASK = IScheduledTask.build(makeTask()); + private static final PreemptionProposal PROPOSAL = createPreemptionProposal(TASK); + private static final TaskGroupKey GROUP_KEY = + TaskGroupKey.from(ITaskConfig.build(makeTask().getAssignedTask().getTask())); + + private static final Set<PreemptionProposal> NO_SLOTS = ImmutableSet.of(); + private static final Optional<String> EMPTY_RESULT = Optional.absent(); + private static final HostOffer OFFER = + new HostOffer(Protos.Offer.getDefaultInstance(), IHostAttributes.build(new HostAttributes())); + + private StateManager stateManager; + private FakeStatsProvider statsProvider; + private PreemptionVictimFilter preemptionVictimFilter; + private PreemptorImpl preemptor; + private BiCache<PreemptionProposal, TaskGroupKey> slotCache; + private Storage.MutableStoreProvider storeProvider; + + @Before + public void setUp() { + storeProvider = createMock(Storage.MutableStoreProvider.class); + stateManager = createMock(StateManager.class); + preemptionVictimFilter = createMock(PreemptionVictimFilter.class); + slotCache = createMock(new Clazz<BiCache<PreemptionProposal, TaskGroupKey>>() { }); + statsProvider = new FakeStatsProvider(); + OfferManager offerManager = createMock(OfferManager.class); + expect(offerManager.getOffer(anyObject(Protos.SlaveID.class))) + .andReturn(Optional.of(OFFER)) + .anyTimes(); + + preemptor = new PreemptorImpl( + stateManager, + offerManager, + 
preemptionVictimFilter, + new PreemptorMetrics(new CachedCounters(statsProvider)), + slotCache); + } + + @Test + public void testPreemptTasksSuccessful() throws Exception { + expect(slotCache.getByValue(GROUP_KEY)).andReturn(ImmutableSet.of(PROPOSAL)); + slotCache.remove(PROPOSAL, GROUP_KEY); + expectSlotValidation(PROPOSAL, Optional.of(ImmutableSet.of( + PreemptionVictim.fromTask(TASK.getAssignedTask())))); + + expectPreempted(TASK); + + control.replay(); + + assertEquals(Optional.of(SLAVE_ID), callPreemptor()); + assertEquals(1L, statsProvider.getLongValue(slotValidationStatName(true))); + assertEquals(1L, statsProvider.getLongValue(successStatName(true))); + } + + @Test + public void testPreemptTasksValidationFailed() throws Exception { + expect(slotCache.getByValue(GROUP_KEY)).andReturn(ImmutableSet.of(PROPOSAL)); + slotCache.remove(PROPOSAL, GROUP_KEY); + expectSlotValidation(PROPOSAL, Optional.absent()); + + control.replay(); + + assertEquals(EMPTY_RESULT, callPreemptor()); + assertEquals(1L, statsProvider.getLongValue(slotValidationStatName(false))); + assertEquals(0L, statsProvider.getLongValue(successStatName(true))); + } + + @Test + public void testNoCachedSlot() throws Exception { + expect(slotCache.getByValue(GROUP_KEY)).andReturn(NO_SLOTS); + + control.replay(); + + assertEquals(EMPTY_RESULT, callPreemptor()); + assertEquals(0L, statsProvider.getLongValue(slotValidationStatName(false))); + assertEquals(0L, statsProvider.getLongValue(successStatName(true))); + } + + private Optional<String> callPreemptor() { + return preemptor.attemptPreemptionFor(TASK.getAssignedTask(), EMPTY, storeProvider); + } + + private void expectSlotValidation( + PreemptionProposal slot, + Optional<ImmutableSet<PreemptionVictim>> victims) { + + expect(preemptionVictimFilter.filterPreemptionVictims( + TASK.getAssignedTask().getTask(), + slot.getVictims(), + EMPTY, + Optional.of(OFFER), + storeProvider)).andReturn(victims); + } + + private void expectPreempted(IScheduledTask 
preempted) throws Exception { + expect(stateManager.changeState( + anyObject(Storage.MutableStoreProvider.class), + eq(Tasks.id(preempted)), + eq(Optional.absent()), + eq(ScheduleStatus.PREEMPTING), + EasyMock.anyObject())) + .andReturn(StateChangeResult.SUCCESS); + } + + private static PreemptionProposal createPreemptionProposal(IScheduledTask task) { + IAssignedTask assigned = task.getAssignedTask(); + return new PreemptionProposal(ImmutableSet.of(PreemptionVictim.fromTask(assigned)), SLAVE_ID); + } + + private static ScheduledTask makeTask() { + ScheduledTask task = new ScheduledTask() + .setAssignedTask(new AssignedTask() + .setTask(new TaskConfig() + .setPriority(1) + .setProduction(true) + .setJob(new JobKey("role", "env", "name")))); + task.addToTaskEvents(new TaskEvent(0, PENDING)); + return task; + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java new file mode 100644 index 0000000..ea76639 --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java @@ -0,0 +1,91 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.preemptor; + +import com.google.common.base.Optional; +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Key; +import com.google.inject.Module; +import com.twitter.common.application.StartupStage; +import com.twitter.common.application.modules.LifecycleModule; +import com.twitter.common.base.ExceptionalCommand; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.testing.easymock.EasyMockTest; + +import org.apache.aurora.gen.AssignedTask; +import org.apache.aurora.scheduler.filter.AttributeAggregate; +import org.apache.aurora.scheduler.filter.SchedulingFilter; +import org.apache.aurora.scheduler.state.StateManager; +import org.apache.aurora.scheduler.state.TaskAssigner; +import org.apache.aurora.scheduler.storage.Storage; +import org.apache.aurora.scheduler.storage.entities.IAssignedTask; +import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class PreemptorModuleTest extends EasyMockTest { + + private StorageTestUtil storageUtil; + + @Before + public void setUp() { + storageUtil = new StorageTestUtil(this); + storageUtil.expectOperations(); + } + + private Injector createInjector(Module module) { + return Guice.createInjector( + module, + new LifecycleModule(), + new AbstractModule() { + private <T> void bindMock(Class<T> clazz) { + bind(clazz).toInstance(createMock(clazz)); + } + + @Override + protected void configure() { + bindMock(SchedulingFilter.class); + bindMock(StateManager.class); + bindMock(TaskAssigner.class); + bindMock(Thread.UncaughtExceptionHandler.class); + bind(Storage.class).toInstance(storageUtil.storage); + } + }); + } + + @Test + public void testPreemptorDisabled() throws Exception { + Injector injector = createInjector(new 
PreemptorModule( + false, + Amount.of(0L, Time.SECONDS), + Amount.of(0L, Time.SECONDS))); + + control.replay(); + + injector.getInstance(Key.get(ExceptionalCommand.class, StartupStage.class)).execute(); + + injector.getBindings(); + assertEquals( + Optional.absent(), + injector.getInstance(Preemptor.class).attemptPreemptionFor( + IAssignedTask.build(new AssignedTask()), + AttributeAggregate.EMPTY, + storageUtil.mutableStoreProvider)); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPrunerTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPrunerTest.java b/src/test/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPrunerTest.java new file mode 100644 index 0000000..814edef --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPrunerTest.java @@ -0,0 +1,69 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.pruning; + +import java.util.concurrent.ScheduledExecutorService; + +import com.google.common.collect.ImmutableSet; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.testing.easymock.EasyMockTest; +import com.twitter.common.util.Clock; + +import org.apache.aurora.gen.JobKey; +import org.apache.aurora.gen.JobUpdateKey; +import org.apache.aurora.scheduler.pruning.JobUpdateHistoryPruner.HistoryPrunerSettings; +import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey; +import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; +import org.apache.aurora.scheduler.testing.FakeScheduledExecutor; +import org.junit.Test; + +import static org.easymock.EasyMock.expect; + +public class JobUpdateHistoryPrunerTest extends EasyMockTest { + @Test + public void testExecution() throws Exception { + StorageTestUtil storageUtil = new StorageTestUtil(this); + storageUtil.expectOperations(); + + final ScheduledExecutorService executor = createMock(ScheduledExecutorService.class); + FakeScheduledExecutor executorClock = + FakeScheduledExecutor.scheduleAtFixedRateExecutor(executor, 2); + + Clock mockClock = createMock(Clock.class); + expect(mockClock.nowMillis()).andReturn(2L).times(2); + + expect(storageUtil.jobUpdateStore.pruneHistory(1, 1)) + .andReturn(ImmutableSet.of( + IJobUpdateKey.build( + new JobUpdateKey().setJob(new JobKey("role", "env", "job")).setId("id1")))); + expect(storageUtil.jobUpdateStore.pruneHistory(1, 1)).andReturn(ImmutableSet.of()); + + control.replay(); + + executorClock.assertEmpty(); + JobUpdateHistoryPruner pruner = new JobUpdateHistoryPruner( + mockClock, + executor, + storageUtil.storage, + new HistoryPrunerSettings( + Amount.of(1L, Time.MILLISECONDS), + Amount.of(1L, Time.MILLISECONDS), + 1)); + + pruner.startAsync().awaitRunning(); + executorClock.advance(Amount.of(1L, Time.MILLISECONDS)); + executorClock.advance(Amount.of(1L, 
Time.MILLISECONDS)); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java b/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java new file mode 100644 index 0000000..461c4d0 --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java @@ -0,0 +1,398 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.pruning; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.twitter.common.base.Command; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.testing.easymock.EasyMockTest; +import com.twitter.common.util.testing.FakeClock; + +import org.apache.aurora.gen.AssignedTask; +import org.apache.aurora.gen.ExecutorConfig; +import org.apache.aurora.gen.Identity; +import org.apache.aurora.gen.JobKey; +import org.apache.aurora.gen.ScheduleStatus; +import org.apache.aurora.gen.ScheduledTask; +import org.apache.aurora.gen.TaskConfig; +import org.apache.aurora.gen.TaskEvent; +import org.apache.aurora.scheduler.base.Tasks; +import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; +import org.apache.aurora.scheduler.pruning.TaskHistoryPruner.HistoryPrunnerSettings; +import org.apache.aurora.scheduler.state.StateManager; +import org.apache.aurora.scheduler.storage.entities.IJobKey; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; +import org.easymock.Capture; +import org.easymock.EasyMock; +import org.easymock.IAnswer; +import org.junit.Before; +import org.junit.Test; + +import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED; +import static org.apache.aurora.gen.ScheduleStatus.FINISHED; +import static org.apache.aurora.gen.ScheduleStatus.KILLED; +import static org.apache.aurora.gen.ScheduleStatus.LOST; 
+import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.gen.ScheduleStatus.STARTING;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link TaskHistoryPruner}, driven by a mocked executor, a mocked state manager, mocked
+ * storage and a {@link FakeClock}.
+ */
+public class TaskHistoryPrunerTest extends EasyMockTest {
+  private static final String JOB_A = "job-a";
+  private static final String TASK_ID = "task_id";
+  private static final String SLAVE_HOST = "HOST_A";
+  private static final Amount<Long, Time> ONE_MS = Amount.of(1L, Time.MILLISECONDS);
+  private static final Amount<Long, Time> ONE_MINUTE = Amount.of(1L, Time.MINUTES);
+  private static final Amount<Long, Time> ONE_DAY = Amount.of(1L, Time.DAYS);
+  private static final Amount<Long, Time> ONE_HOUR = Amount.of(1L, Time.HOURS);
+  private static final int PER_JOB_HISTORY = 2;
+
+  private ScheduledFuture<?> future;
+  private ScheduledExecutorService executor;
+  private FakeClock clock;
+  private StateManager stateManager;
+  private StorageTestUtil storageUtil;
+  private TaskHistoryPruner pruner;
+
+  /** Creates the mocks and a pruner wired to the mocked executor and the fake clock. */
+  @Before
+  public void setUp() {
+    future = createMock(new Clazz<ScheduledFuture<?>>() { });
+    executor = createMock(ScheduledExecutorService.class);
+    clock = new FakeClock();
+    stateManager = createMock(StateManager.class);
+    storageUtil = new StorageTestUtil(this);
+    storageUtil.expectOperations();
+    // NOTE(review): settings arguments are presumably (retention period, minimum retention
+    // threshold, per-job history goal) -- confirm against HistoryPrunnerSettings.
+    pruner = new TaskHistoryPruner(
+        executor,
+        stateManager,
+        clock,
+        new HistoryPrunnerSettings(ONE_DAY, ONE_MINUTE, PER_JOB_HISTORY),
+        storageUtil.storage);
+  }
+
+  /** Two terminal tasks in one job: each gets a delayed prune scheduled and nothing is deleted. */
+  @Test
+  public void testNoPruning() {
+    long taskATimestamp = clock.nowMillis();
+    IScheduledTask a = makeTask("a", FINISHED);
+
+    clock.advance(ONE_MS);
+    long taskBTimestamp = clock.nowMillis();
+    IScheduledTask b = makeTask("b", LOST);
+
+    expectNoImmediatePrune(ImmutableSet.of(a));
+    expectOneDelayedPrune(taskATimestamp);
+    expectNoImmediatePrune(ImmutableSet.of(a, b));
+    expectOneDelayedPrune(taskBTimestamp);
+
+    control.replay();
+
+    pruner.recordStateChange(TaskStateChange.initialized(a));
+    pruner.recordStateChange(TaskStateChange.initialized(b));
+  }
+
+  /**
+   * Replays five initialized terminal tasks (four in {@code JOB_A}, one in "job-x") and expects
+   * tasks "a" and then "b" to be deleted immediately as later tasks of the same job arrive.
+   */
+  @Test
+  public void testStorageStartedWithPruning() {
+    long taskATimestamp = clock.nowMillis();
+    IScheduledTask a = makeTask("a", FINISHED);
+
+    clock.advance(ONE_MINUTE);
+    long taskBTimestamp = clock.nowMillis();
+    IScheduledTask b = makeTask("b", LOST);
+
+    clock.advance(ONE_MINUTE);
+    long taskCTimestamp = clock.nowMillis();
+    IScheduledTask c = makeTask("c", FINISHED);
+
+    clock.advance(ONE_MINUTE);
+    IScheduledTask d = makeTask("d", FINISHED);
+    IScheduledTask e = makeTask("job-x", "e", FINISHED);
+
+    expectNoImmediatePrune(ImmutableSet.of(a));
+    expectOneDelayedPrune(taskATimestamp);
+    expectNoImmediatePrune(ImmutableSet.of(a, b));
+    expectOneDelayedPrune(taskBTimestamp);
+    expectImmediatePrune(ImmutableSet.of(a, b, c), a);
+    expectOneDelayedPrune(taskCTimestamp);
+    expectImmediatePrune(ImmutableSet.of(b, c, d), b);
+    expectDefaultDelayedPrune();
+    expectNoImmediatePrune(ImmutableSet.of(e));
+    expectDefaultDelayedPrune();
+
+    control.replay();
+
+    for (IScheduledTask task : ImmutableList.of(a, b, c, d, e)) {
+      pruner.recordStateChange(TaskStateChange.initialized(task));
+    }
+  }
+
+  /** Only the transition into a terminal state (KILLED) is expected to schedule pruning work. */
+  @Test
+  public void testStateChange() {
+    IScheduledTask starting = makeTask("a", STARTING);
+    IScheduledTask running = copy(starting, RUNNING);
+    IScheduledTask killed = copy(starting, KILLED);
+
+    expectNoImmediatePrune(ImmutableSet.of(killed));
+    expectDefaultDelayedPrune();
+
+    control.replay();
+
+    // No future set for non-terminal state transition.
+    changeState(starting, running);
+
+    // Future set for terminal state transition.
+    changeState(running, killed);
+  }
+
+  /** Captures the delayed prune runnable and runs it by hand, expecting task "a" to be deleted. */
+  @Test
+  public void testActivateFutureAndExceedHistoryGoal() {
+    IScheduledTask running = makeTask("a", RUNNING);
+    IScheduledTask killed = copy(running, KILLED);
+    expectNoImmediatePrune(ImmutableSet.of(running));
+    Capture<Runnable> delayedDelete = expectDefaultDelayedPrune();
+
+    // Expect task "a" to be pruned when future is activated.
+    expectDeleteTasks("a");
+
+    control.replay();
+
+    // Capture future for inactive task "a"
+    changeState(running, killed);
+    clock.advance(ONE_HOUR);
+    // Execute future to prune task "a" from the system.
+    delayedDelete.getValue().run();
+  }
+
+  /**
+   * Four tasks of one job reach terminal states; the first three transitions delete nothing and
+   * the fourth expects tasks "a" and "b" to be deleted together.
+   */
+  @Test
+  public void testJobHistoryExceeded() {
+    IScheduledTask a = makeTask("a", RUNNING);
+    clock.advance(ONE_MS);
+    IScheduledTask aKilled = copy(a, KILLED);
+
+    IScheduledTask b = makeTask("b", RUNNING);
+    clock.advance(ONE_MS);
+    IScheduledTask bKilled = copy(b, KILLED);
+
+    IScheduledTask c = makeTask("c", RUNNING);
+    clock.advance(ONE_MS);
+    IScheduledTask cLost = copy(c, LOST);
+
+    IScheduledTask d = makeTask("d", RUNNING);
+    clock.advance(ONE_MS);
+    IScheduledTask dLost = copy(d, LOST);
+
+    expectNoImmediatePrune(ImmutableSet.of(a));
+    expectDefaultDelayedPrune();
+    expectNoImmediatePrune(ImmutableSet.of(a, b));
+    expectDefaultDelayedPrune();
+    expectNoImmediatePrune(ImmutableSet.of(a, b)); // no pruning yet due to min threshold
+    expectDefaultDelayedPrune();
+    // The fake clock is advanced while expectations are still being recorded, i.e. before any
+    // state change below is delivered to the pruner.
+    clock.advance(ONE_HOUR);
+    expectImmediatePrune(ImmutableSet.of(a, b, c, d), a, b); // now prune 2 tasks
+    expectDefaultDelayedPrune();
+
+    control.replay();
+
+    changeState(a, aKilled);
+    changeState(b, bKilled);
+    changeState(c, cLost);
+    changeState(d, dLost);
+  }
+
+  // TODO(William Farner): Consider removing the thread safety tests. Now that intrinsic locks
+  // are not used, it is rather awkward to test this.
+  @Test
+  public void testThreadSafeStateChangeEvent() throws Exception {
+    // This tests against regression where an executor pruning a task holds an intrinsic lock and
+    // an unrelated task state change in the scheduler fires an event that requires this intrinsic
+    // lock. This causes a deadlock when the executor tries to acquire a lock held by the event
+    // fired.
+
+    pruner = prunerWithRealExecutor();
+    Command onDeleted = new Command() {
+      @Override
+      public void execute() {
+        // The goal is to verify that the call does not deadlock. We do not care about the outcome.
+        IScheduledTask b = makeTask("b", ASSIGNED);
+
+        changeState(b, STARTING);
+      }
+    };
+    CountDownLatch taskDeleted = expectTaskDeleted(onDeleted, TASK_ID);
+
+    control.replay();
+
+    // Change the task to a terminal state and wait for it to be pruned.
+    changeState(makeTask(TASK_ID, RUNNING), KILLED);
+    taskDeleted.await();
+  }
+
+  /**
+   * Builds a pruner backed by a real single-thread daemon scheduled executor and one-millisecond
+   * settings, instead of the mocked executor created in {@link #setUp()}.
+   */
+  private TaskHistoryPruner prunerWithRealExecutor() {
+    ScheduledExecutorService realExecutor = Executors.newScheduledThreadPool(1,
+        new ThreadFactoryBuilder()
+            .setDaemon(true)
+            .setNameFormat("testThreadSafeEvents-executor")
+            .build());
+    return new TaskHistoryPruner(
+        realExecutor,
+        stateManager,
+        clock,
+        new HistoryPrunnerSettings(Amount.of(1L, Time.MILLISECONDS), ONE_MS, PER_JOB_HISTORY),
+        storageUtil.storage);
+  }
+
+  /**
+   * Expects a delete of {@code taskId} and wires up a handshake between two latches: the delete
+   * answer releases a helper thread, which runs {@code onDelete} and then releases the answer.
+   *
+   * @return the latch that is counted down once {@code onDelete} has finished.
+   */
+  private CountDownLatch expectTaskDeleted(final Command onDelete, String taskId) {
+    final CountDownLatch deleteCalled = new CountDownLatch(1);
+    final CountDownLatch eventDelivered = new CountDownLatch(1);
+
+    // Helper thread: waits until the delete call is in progress, then fires the callback.
+    Thread eventDispatch = new Thread() {
+      @Override
+      public void run() {
+        try {
+          deleteCalled.await();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          // NOTE(review): fail() throws on this helper thread, not on the thread running the
+          // test method, so on its own it does not mark the test as failed.
+          fail("Interrupted while awaiting for delete call.");
+          return;
+        }
+        onDelete.execute();
+        eventDelivered.countDown();
+      }
+    };
+    eventDispatch.setDaemon(true);
+    eventDispatch.setName(getClass().getName() + "-EventDispatch");
+    eventDispatch.start();
+
+    stateManager.deleteTasks(storageUtil.mutableStoreProvider, ImmutableSet.of(taskId));
+    expectLastCall().andAnswer(new IAnswer<Void>() {
+      @Override
+      public Void answer() {
+        // Signal the helper thread, then block inside the delete until its callback completed.
+        deleteCalled.countDown();
+        try {
+          eventDelivered.await();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          fail("Interrupted while awaiting for event delivery.");
+        }
+        return null;
+      }
+    });
+
+    return eventDelivered;
+  }
+
+  /** Expects a single delete call for exactly the given task IDs. */
+  private void expectDeleteTasks(String... tasks) {
+    stateManager.deleteTasks(storageUtil.mutableStoreProvider, ImmutableSet.copyOf(tasks));
+  }
+
+  /** Expects one delayed prune whose timeout is computed from {@code ONE_DAY} in milliseconds. */
+  private Capture<Runnable> expectDefaultDelayedPrune() {
+    return expectDelayedPrune(ONE_DAY.as(Time.MILLISECONDS), 1);
+  }
+
+  /** Expects one delayed prune whose timeout is computed from the given timestamp. */
+  private Capture<Runnable> expectOneDelayedPrune(long timestampMillis) {
+    return expectDelayedPrune(timestampMillis, 1);
+  }
+
+  /** Expects the immediate prune pass to run for the job but delete nothing. */
+  private void expectNoImmediatePrune(ImmutableSet<IScheduledTask> tasksInJob) {
+    expectImmediatePrune(tasksInJob);
+  }
+
+  /**
+   * Expects an immediate prune pass: work submitted to the executor is run inline, the job
+   * history query returns {@code tasksInJob}, and {@code pruned} (if any) are deleted.
+   * All tasks in {@code tasksInJob} must belong to the same job.
+   */
+  private void expectImmediatePrune(
+      ImmutableSet<IScheduledTask> tasksInJob,
+      IScheduledTask... pruned) {
+
+    // Expect a deferred prune operation when a new task is being watched.
+    executor.submit(EasyMock.<Runnable>anyObject());
+    expectLastCall().andAnswer(
+        new IAnswer<Future<?>>() {
+          @Override
+          public Future<?> answer() {
+            // Run the submitted work synchronously on the calling thread.
+            Runnable work = (Runnable) EasyMock.getCurrentArguments()[0];
+            work.run();
+            return null;
+          }
+        }
+    );
+
+    IJobKey jobKey = Iterables.getOnlyElement(
+        FluentIterable.from(tasksInJob).transform(Tasks.SCHEDULED_TO_JOB_KEY).toSet());
+    storageUtil.expectTaskFetch(TaskHistoryPruner.jobHistoryQuery(jobKey), tasksInJob);
+    if (pruned.length > 0) {
+      stateManager.deleteTasks(storageUtil.mutableStoreProvider, Tasks.ids(pruned));
+    }
+  }
+
+  /**
+   * Expects {@code count} schedule calls on the executor with the timeout the pruner computes for
+   * {@code timestampMillis}, capturing the scheduled runnable.
+   */
+  private Capture<Runnable> expectDelayedPrune(long timestampMillis, int count) {
+    Capture<Runnable> capture = createCapture();
+    executor.schedule(
+        EasyMock.capture(capture),
+        eq(pruner.calculateTimeout(timestampMillis)),
+        eq(TimeUnit.MILLISECONDS));
+    expectLastCall().andReturn(future).times(count);
+    return capture;
+  }
+
+  /** Delivers a transition event to the pruner from the old task's status to the new task. */
+  private void changeState(IScheduledTask oldStateTask, IScheduledTask newStateTask) {
+    pruner.recordStateChange(TaskStateChange.transition(newStateTask, oldStateTask.getStatus()));
+  }
+
+  /** Delivers a transition event for a copy of the task moved to {@code status}. */
+  private void changeState(IScheduledTask oldStateTask, ScheduleStatus status) {
+    pruner.recordStateChange(
+        TaskStateChange.transition(copy(oldStateTask, status), oldStateTask.getStatus()));
+  }
+
+  /** Returns a copy of the task with only its status replaced; task events are left as they were. */
+  private IScheduledTask copy(IScheduledTask task, ScheduleStatus status) {
+    return IScheduledTask.build(task.newBuilder().setStatus(status));
+  }
+
+  /** Builds a task in the given job with one event stamped with the fake clock's current time. */
+  private IScheduledTask makeTask(
+      String job,
+      String taskId,
+      ScheduleStatus status) {
+
+    return IScheduledTask.build(new ScheduledTask()
+        .setStatus(status)
+        .setTaskEvents(ImmutableList.of(new TaskEvent(clock.nowMillis(), status)))
+        .setAssignedTask(makeAssignedTask(job, taskId)));
+  }
+
+  /** Builds a task in {@code JOB_A}. */
+  private IScheduledTask makeTask(String taskId, ScheduleStatus status) {
+    return makeTask(JOB_A, taskId, status);
+  }
+
+  /** Builds the assigned-task portion: fixed role, environment and executor config. */
+  private AssignedTask makeAssignedTask(String job, String taskId) {
+    return new AssignedTask()
+        .setSlaveHost(SLAVE_HOST)
+        .setTaskId(taskId)
+        .setTask(new TaskConfig()
+            .setJob(new JobKey("role", "staging45", job))
+            .setOwner(new Identity().setRole("role").setUser("user"))
+            .setEnvironment("staging45")
+            .setJobName(job)
+            .setExecutorConfig(new ExecutorConfig("aurora", "config")));
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/reconciliation/KillRetryTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/reconciliation/KillRetryTest.java b/src/test/java/org/apache/aurora/scheduler/reconciliation/KillRetryTest.java
new file mode 100644
index 0000000..26f65fa
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/reconciliation/KillRetryTest.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.aurora.scheduler.reconciliation; + +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.concurrent.ScheduledExecutorService; + +import javax.inject.Singleton; + +import com.google.common.eventbus.EventBus; +import com.google.common.testing.TearDown; +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.twitter.common.application.modules.LifecycleModule; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.stats.StatsProvider; +import com.twitter.common.testing.easymock.EasyMockTest; +import com.twitter.common.util.BackoffStrategy; + +import org.apache.aurora.gen.AssignedTask; +import org.apache.aurora.gen.ScheduleStatus; +import org.apache.aurora.gen.ScheduledTask; +import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor; +import org.apache.aurora.scheduler.base.Query; +import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; +import org.apache.aurora.scheduler.events.PubsubEventModule; +import org.apache.aurora.scheduler.mesos.Driver; +import org.apache.aurora.scheduler.state.PubsubTestUtil; +import org.apache.aurora.scheduler.storage.Storage; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; +import org.apache.aurora.scheduler.testing.FakeScheduledExecutor; +import org.apache.aurora.scheduler.testing.FakeStatsProvider; +import org.junit.Before; +import org.junit.Test; + +import static org.apache.aurora.gen.ScheduleStatus.KILLING; +import static org.apache.aurora.gen.ScheduleStatus.RUNNING; +import static org.easymock.EasyMock.expect; +import static org.junit.Assert.assertEquals; + +public class KillRetryTest extends EasyMockTest { + + private Driver driver; + private StorageTestUtil storageUtil; + private BackoffStrategy backoffStrategy; + private FakeScheduledExecutor clock; + private 
EventBus eventBus; + private FakeStatsProvider statsProvider; + + @Before + public void setUp() throws Exception { + driver = createMock(Driver.class); + storageUtil = new StorageTestUtil(this); + storageUtil.expectOperations(); + backoffStrategy = createMock(BackoffStrategy.class); + final ScheduledExecutorService executorMock = createMock(ScheduledExecutorService.class); + clock = FakeScheduledExecutor.scheduleExecutor(executorMock); + addTearDown(new TearDown() { + @Override + public void tearDown() { + clock.assertEmpty(); + } + }); + statsProvider = new FakeStatsProvider(); + + Injector injector = Guice.createInjector( + new LifecycleModule(), + new PubsubEventModule(false), + new AbstractModule() { + @Override + protected void configure() { + bind(Driver.class).toInstance(driver); + bind(Storage.class).toInstance(storageUtil.storage); + bind(ScheduledExecutorService.class).annotatedWith(AsyncExecutor.class) + .toInstance(executorMock); + PubsubEventModule.bindSubscriber(binder(), KillRetry.class); + bind(KillRetry.class).in(Singleton.class); + bind(BackoffStrategy.class).toInstance(backoffStrategy); + bind(StatsProvider.class).toInstance(statsProvider); + bind(UncaughtExceptionHandler.class) + .toInstance(createMock(UncaughtExceptionHandler.class)); + } + } + ); + eventBus = injector.getInstance(EventBus.class); + PubsubTestUtil.startPubsub(injector); + } + + private static IScheduledTask makeTask(String id, ScheduleStatus status) { + return IScheduledTask.build(new ScheduledTask() + .setStatus(status) + .setAssignedTask(new AssignedTask().setTaskId(id))); + } + + private void moveToKilling(String taskId) { + eventBus.post(TaskStateChange.transition(makeTask(taskId, KILLING), RUNNING)); + } + + private static Query.Builder killingQuery(String taskId) { + return Query.taskScoped(taskId).byStatus(KILLING); + } + + private void expectGetRetryDelay(long prevRetryMs, long retryInMs) { + expect(backoffStrategy.calculateBackoffMs(prevRetryMs)).andReturn(retryInMs); 
+ } + + private void expectRetry(String taskId, long prevRetryMs, long nextRetryMs) { + storageUtil.expectTaskFetch(killingQuery(taskId), makeTask(taskId, KILLING)); + driver.killTask(taskId); + expectGetRetryDelay(prevRetryMs, nextRetryMs); + } + + @Test + public void testRetries() { + String taskId = "a"; + expectGetRetryDelay(0, 100); + expectRetry(taskId, 100, 1000); + expectRetry(taskId, 1000, 10000); + + // Signal that task has transitioned. + storageUtil.expectTaskFetch(killingQuery(taskId)); + + control.replay(); + + moveToKilling(taskId); + clock.advance(Amount.of(100L, Time.MILLISECONDS)); + clock.advance(Amount.of(1000L, Time.MILLISECONDS)); + clock.advance(Amount.of(10000L, Time.MILLISECONDS)); + assertEquals(2L, statsProvider.getLongValue(KillRetry.RETRIES_COUNTER)); + } + + @Test + public void testDoesNotRetry() { + String taskId = "a"; + expectGetRetryDelay(0, 100); + + storageUtil.expectTaskFetch(killingQuery(taskId)); + + control.replay(); + + moveToKilling(taskId); + clock.advance(Amount.of(100L, Time.MILLISECONDS)); + assertEquals(0L, statsProvider.getLongValue(KillRetry.RETRIES_COUNTER)); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskReconcilerTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskReconcilerTest.java b/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskReconcilerTest.java new file mode 100644 index 0000000..b980a4e --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskReconcilerTest.java @@ -0,0 +1,140 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.reconciliation; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicLong; + +import com.google.common.collect.ImmutableSet; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.stats.StatsProvider; +import com.twitter.common.testing.easymock.EasyMockTest; + +import org.apache.aurora.scheduler.base.Query; +import org.apache.aurora.scheduler.base.TaskTestUtil; +import org.apache.aurora.scheduler.base.Tasks; +import org.apache.aurora.scheduler.mesos.Driver; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; +import org.apache.aurora.scheduler.testing.FakeScheduledExecutor; +import org.junit.Before; +import org.junit.Test; + +import static com.twitter.common.quantity.Time.MINUTES; + +import static org.apache.aurora.scheduler.reconciliation.TaskReconciler.EXPLICIT_STAT_NAME; +import static org.apache.aurora.scheduler.reconciliation.TaskReconciler.IMPLICIT_STAT_NAME; +import static org.apache.aurora.scheduler.reconciliation.TaskReconciler.TASK_TO_PROTO; +import static org.apache.aurora.scheduler.reconciliation.TaskReconciler.TaskReconcilerSettings; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; +import static org.junit.Assert.assertEquals; + +public class TaskReconcilerTest extends EasyMockTest { + private static final Amount<Long, Time> INITIAL_DELAY = Amount.of(10L, 
MINUTES); + private static final Amount<Long, Time> EXPLICIT_SCHEDULE = Amount.of(60L, MINUTES); + private static final Amount<Long, Time> IMPLICT_SCHEDULE = Amount.of(180L, MINUTES); + private static final Amount<Long, Time> SPREAD = Amount.of(30L, MINUTES); + private static final TaskReconcilerSettings SETTINGS = new TaskReconcilerSettings( + INITIAL_DELAY, + EXPLICIT_SCHEDULE, + IMPLICT_SCHEDULE, + SPREAD); + + private StorageTestUtil storageUtil; + private StatsProvider statsProvider; + private Driver driver; + private ScheduledExecutorService executorService; + private FakeScheduledExecutor clock; + private AtomicLong explicitRuns; + private AtomicLong implicitRuns; + + @Before + public void setUp() { + storageUtil = new StorageTestUtil(this); + statsProvider = createMock(StatsProvider.class); + driver = createMock(Driver.class); + executorService = createMock(ScheduledExecutorService.class); + explicitRuns = new AtomicLong(); + implicitRuns = new AtomicLong(); + } + + @Test + public void testExecution() { + expect(statsProvider.makeCounter(EXPLICIT_STAT_NAME)).andReturn(explicitRuns); + expect(statsProvider.makeCounter(IMPLICIT_STAT_NAME)).andReturn(implicitRuns); + clock = FakeScheduledExecutor.scheduleAtFixedRateExecutor(executorService, 2, 5); + + IScheduledTask task = TaskTestUtil.makeTask("id1", TaskTestUtil.JOB); + storageUtil.expectOperations(); + storageUtil.expectTaskFetch(Query.unscoped().byStatus(Tasks.SLAVE_ASSIGNED_STATES), task) + .times(5); + + driver.reconcileTasks(ImmutableSet.of(TASK_TO_PROTO.apply(task))); + expectLastCall().times(5); + + driver.reconcileTasks(ImmutableSet.of()); + expectLastCall().times(2); + + control.replay(); + + TaskReconciler reconciler = new TaskReconciler( + SETTINGS, + storageUtil.storage, + driver, + executorService, + statsProvider); + + reconciler.startAsync().awaitRunning(); + + clock.advance(INITIAL_DELAY); + assertEquals(1L, explicitRuns.get()); + assertEquals(0L, implicitRuns.get()); + + 
clock.advance(SPREAD); + assertEquals(1L, explicitRuns.get()); + assertEquals(1L, implicitRuns.get()); + + clock.advance(EXPLICIT_SCHEDULE); + assertEquals(2L, explicitRuns.get()); + assertEquals(1L, implicitRuns.get()); + + clock.advance(IMPLICT_SCHEDULE); + assertEquals(5L, explicitRuns.get()); + assertEquals(2L, implicitRuns.get()); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidImplicitDelay() throws Exception { + control.replay(); + + new TaskReconcilerSettings( + INITIAL_DELAY, + EXPLICIT_SCHEDULE, + IMPLICT_SCHEDULE, + Amount.of(Long.MAX_VALUE, MINUTES)); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidExplicitDelay() throws Exception { + control.replay(); + + new TaskReconcilerSettings( + Amount.of(Long.MAX_VALUE, MINUTES), + EXPLICIT_SCHEDULE, + IMPLICT_SCHEDULE, + SPREAD); + } +}
