http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java deleted file mode 100644 index b98a8d7..0000000 --- a/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java +++ /dev/null @@ -1,244 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.async; - -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.atomic.AtomicLong; - -import com.google.common.base.Optional; -import com.google.common.collect.ImmutableList; -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; -import com.twitter.common.stats.StatsProvider; -import com.twitter.common.testing.easymock.EasyMockTest; -import com.twitter.common.util.testing.FakeClock; - -import org.apache.aurora.gen.AssignedTask; -import org.apache.aurora.gen.ScheduleStatus; -import org.apache.aurora.gen.ScheduledTask; -import org.apache.aurora.gen.TaskConfig; -import org.apache.aurora.gen.TaskEvent; -import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; -import org.apache.aurora.scheduler.state.StateChangeResult; -import org.apache.aurora.scheduler.state.StateManager; -import org.apache.aurora.scheduler.storage.entities.IScheduledTask; -import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; -import org.easymock.Capture; -import org.easymock.EasyMock; -import org.junit.Before; -import org.junit.Test; - -import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED; -import static org.apache.aurora.gen.ScheduleStatus.FINISHED; -import static org.apache.aurora.gen.ScheduleStatus.INIT; -import static org.apache.aurora.gen.ScheduleStatus.KILLED; -import static org.apache.aurora.gen.ScheduleStatus.KILLING; -import static org.apache.aurora.gen.ScheduleStatus.LOST; -import static org.apache.aurora.gen.ScheduleStatus.PENDING; -import static org.apache.aurora.gen.ScheduleStatus.PREEMPTING; -import static org.apache.aurora.gen.ScheduleStatus.RUNNING; -import static org.apache.aurora.gen.ScheduleStatus.STARTING; -import static org.easymock.EasyMock.eq; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.expectLastCall; -import static org.junit.Assert.assertEquals; - -public class TaskTimeoutTest extends EasyMockTest { - - private static final String TASK_ID = "task_id"; - private static final Amount<Long, Time> TIMEOUT = Amount.of(1L, Time.MINUTES); - - private AtomicLong timedOutTaskCounter; - private ScheduledExecutorService executor; - private StorageTestUtil storageUtil; - private ScheduledFuture<?> future; - private StateManager stateManager; - private FakeClock clock; - private TaskTimeout timeout; - private StatsProvider statsProvider; - - @Before - public void setUp() { - executor = createMock(ScheduledExecutorService.class); - storageUtil = new StorageTestUtil(this); - storageUtil.expectOperations(); - future = createMock(new Clazz<ScheduledFuture<?>>() { }); - stateManager = createMock(StateManager.class); - clock = new FakeClock(); - statsProvider = createMock(StatsProvider.class); - timedOutTaskCounter = new AtomicLong(); - expect(statsProvider.makeCounter(TaskTimeout.TIMED_OUT_TASKS_COUNTER)) - .andReturn(timedOutTaskCounter); - } - - private void replayAndCreate() { - control.replay(); - timeout = new TaskTimeout( - executor, - storageUtil.storage, - stateManager, - TIMEOUT, - statsProvider); - timeout.startAsync().awaitRunning(); - } - - private Capture<Runnable> expectTaskWatch(Amount<Long, Time> expireIn) { - Capture<Runnable> capture = createCapture(); - executor.schedule( - EasyMock.capture(capture), - eq((long) expireIn.getValue()), - eq(expireIn.getUnit().getTimeUnit())); - expectLastCall().andReturn(future); - return capture; - } - - private Capture<Runnable> expectTaskWatch() { - return expectTaskWatch(TIMEOUT); - } - - private void changeState(String taskId, ScheduleStatus from, ScheduleStatus to) { - IScheduledTask task = IScheduledTask.build(new ScheduledTask() - .setStatus(to) - .setAssignedTask(new AssignedTask().setTaskId(taskId))); - timeout.recordStateChange(TaskStateChange.transition(task, from)); - } - - private void changeState(ScheduleStatus from, ScheduleStatus to) { - changeState(TASK_ID, from, to); - } - - @Test - public void testNormalTransitions() { - expectTaskWatch(); - expectTaskWatch(); - - replayAndCreate(); - - changeState(INIT, PENDING); - changeState(PENDING, ASSIGNED); - changeState(ASSIGNED, STARTING); - changeState(STARTING, RUNNING); - changeState(RUNNING, KILLING); - changeState(KILLING, KILLED); - } - - @Test - public void testTransientToTransient() { - expectTaskWatch(); - Capture<Runnable> killingTimeout = expectTaskWatch(); - expect(stateManager.changeState( - storageUtil.mutableStoreProvider, - TASK_ID, - Optional.of(KILLING), - LOST, - TaskTimeout.TIMEOUT_MESSAGE)) - .andReturn(StateChangeResult.SUCCESS); - - replayAndCreate(); - - changeState(PENDING, ASSIGNED); - changeState(ASSIGNED, KILLING); - killingTimeout.getValue().run(); - } - - @Test - public void testTimeout() throws Exception { - Capture<Runnable> assignedTimeout = expectTaskWatch(); - expect(stateManager.changeState( - storageUtil.mutableStoreProvider, - TASK_ID, - Optional.of(ASSIGNED), - LOST, - TaskTimeout.TIMEOUT_MESSAGE)) - .andReturn(StateChangeResult.SUCCESS); - - replayAndCreate(); - - changeState(INIT, PENDING); - changeState(PENDING, ASSIGNED); - assignedTimeout.getValue().run(); - assertEquals(timedOutTaskCounter.intValue(), 1); - } - - @Test - public void testTaskDeleted() throws Exception { - Capture<Runnable> assignedTimeout = expectTaskWatch(); - expect(stateManager.changeState( - storageUtil.mutableStoreProvider, - TASK_ID, - Optional.of(KILLING), - LOST, - TaskTimeout.TIMEOUT_MESSAGE)) - .andReturn(StateChangeResult.ILLEGAL); - - replayAndCreate(); - - changeState(INIT, PENDING); - changeState(PENDING, KILLING); - assignedTimeout.getValue().run(); - assertEquals(timedOutTaskCounter.intValue(), 0); - } - - private static IScheduledTask makeTask( - String taskId, - ScheduleStatus status, - long stateEnteredMs) { - - return IScheduledTask.build(new ScheduledTask() - .setStatus(status) - .setTaskEvents(ImmutableList.of(new TaskEvent(stateEnteredMs, status))) - .setAssignedTask(new AssignedTask() - .setTaskId(taskId) - .setTask(new TaskConfig()))); - } - - @Test - public void testStorageStart() { - expectTaskWatch(TIMEOUT); - expectTaskWatch(TIMEOUT); - expectTaskWatch(TIMEOUT); - - replayAndCreate(); - - clock.setNowMillis(TIMEOUT.as(Time.MILLISECONDS) * 2); - for (IScheduledTask task : ImmutableList.of( - makeTask("a", ASSIGNED, 0), - makeTask("b", KILLING, TIMEOUT.as(Time.MILLISECONDS)), - makeTask("c", PREEMPTING, clock.nowMillis() + TIMEOUT.as(Time.MILLISECONDS)))) { - - timeout.recordStateChange(TaskStateChange.initialized(task)); - } - - changeState("a", ASSIGNED, RUNNING); - changeState("b", KILLING, KILLED); - changeState("c", PREEMPTING, FINISHED); - } - - @Test - public void testTimeoutWhileNotStarted() throws Exception { - // Since the timeout is never instructed to start, it should not attempt to transition tasks, - // but it should try again later. - Capture<Runnable> assignedTimeout = expectTaskWatch(); - expectTaskWatch(TaskTimeout.NOT_STARTED_RETRY); - - control.replay(); - timeout = new TaskTimeout(executor, storageUtil.storage, stateManager, TIMEOUT, statsProvider); - - changeState(INIT, PENDING); - changeState(PENDING, ASSIGNED); - assignedTimeout.getValue().run(); - assertEquals(timedOutTaskCounter.intValue(), 0); - } -}
http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/async/preemptor/BiCacheTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/BiCacheTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/BiCacheTest.java deleted file mode 100644 index babc17f..0000000 --- a/src/test/java/org/apache/aurora/scheduler/async/preemptor/BiCacheTest.java +++ /dev/null @@ -1,107 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.async.preemptor; - -import com.google.common.base.Optional; -import com.google.common.collect.ImmutableSet; -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; -import com.twitter.common.util.testing.FakeClock; - -import org.apache.aurora.scheduler.async.preemptor.BiCache.BiCacheSettings; -import org.apache.aurora.scheduler.testing.FakeStatsProvider; -import org.junit.Before; -import org.junit.Test; - -import static org.junit.Assert.assertEquals; - -public class BiCacheTest { - private static final Amount<Long, Time> HOLD_DURATION = Amount.of(1L, Time.MINUTES); - private static final String STAT_NAME = "cache_size_stat"; - private static final String KEY_1 = "Key 1"; - private static final String KEY_2 = "Key 2"; - private static final Optional<Integer> NO_VALUE = Optional.absent(); - - private FakeStatsProvider statsProvider; - private FakeClock clock; - private BiCache<String, Integer> biCache; - - @Before - public void setUp() { - statsProvider = new FakeStatsProvider(); - clock = new FakeClock(); - biCache = new BiCache<>(statsProvider, new BiCacheSettings(HOLD_DURATION, STAT_NAME), clock); - } - - @Test - public void testExpiration() { - biCache.put(KEY_1, 1); - assertEquals(Optional.of(1), biCache.get(KEY_1)); - assertEquals(1L, statsProvider.getLongValue(STAT_NAME)); - - clock.advance(HOLD_DURATION); - - assertEquals(NO_VALUE, biCache.get(KEY_1)); - assertEquals(ImmutableSet.of(), biCache.getByValue(1)); - assertEquals(0L, statsProvider.getLongValue(STAT_NAME)); - } - - @Test - public void testRemoval() { - biCache.put(KEY_1, 1); - assertEquals(1L, statsProvider.getLongValue(STAT_NAME)); - assertEquals(Optional.of(1), biCache.get(KEY_1)); - biCache.remove(KEY_1, 1); - assertEquals(NO_VALUE, biCache.get(KEY_1)); - assertEquals(0L, statsProvider.getLongValue(STAT_NAME)); - } - - @Test(expected = NullPointerException.class) - public void testRemovalWithNullKey() { - biCache.remove(null, 1); - } - - @Test - public void testDifferentKeysIdenticalValues() { - biCache.put(KEY_1, 1); - biCache.put(KEY_2, 1); - assertEquals(2L, statsProvider.getLongValue(STAT_NAME)); - - assertEquals(Optional.of(1), biCache.get(KEY_1)); - assertEquals(Optional.of(1), biCache.get(KEY_2)); - assertEquals(ImmutableSet.of(KEY_1, KEY_2), biCache.getByValue(1)); - - biCache.remove(KEY_1, 1); - assertEquals(NO_VALUE, biCache.get(KEY_1)); - assertEquals(Optional.of(1), biCache.get(KEY_2)); - assertEquals(ImmutableSet.of(KEY_2), biCache.getByValue(1)); - assertEquals(1L, statsProvider.getLongValue(STAT_NAME)); - - clock.advance(HOLD_DURATION); - assertEquals(NO_VALUE, biCache.get(KEY_1)); - assertEquals(NO_VALUE, biCache.get(KEY_2)); - assertEquals(ImmutableSet.of(), biCache.getByValue(1)); - assertEquals(0L, statsProvider.getLongValue(STAT_NAME)); - } - - @Test - public void testIdenticalKeysDifferentValues() { - biCache.put(KEY_1, 1); - biCache.put(KEY_1, 2); - assertEquals(Optional.of(2), biCache.get(KEY_1)); - assertEquals(ImmutableSet.of(), biCache.getByValue(1)); - assertEquals(ImmutableSet.of(KEY_1), biCache.getByValue(2)); - assertEquals(1L, statsProvider.getLongValue(STAT_NAME)); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/async/preemptor/ClusterStateImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/ClusterStateImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/ClusterStateImplTest.java deleted file mode 100644 index 1572a08..0000000 --- a/src/test/java/org/apache/aurora/scheduler/async/preemptor/ClusterStateImplTest.java +++ /dev/null @@ -1,133 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.async.preemptor; - -import com.google.common.collect.HashMultimap; -import com.google.common.collect.ImmutableMultimap; - -import org.apache.aurora.gen.AssignedTask; -import org.apache.aurora.gen.JobKey; -import org.apache.aurora.gen.ScheduleStatus; -import org.apache.aurora.gen.ScheduledTask; -import org.apache.aurora.gen.TaskConfig; -import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; -import org.apache.aurora.scheduler.storage.entities.IAssignedTask; -import org.apache.aurora.scheduler.storage.entities.IScheduledTask; -import org.junit.Before; -import org.junit.Test; - -import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED; -import static org.apache.aurora.gen.ScheduleStatus.FAILED; -import static org.apache.aurora.gen.ScheduleStatus.FINISHED; -import static org.apache.aurora.gen.ScheduleStatus.KILLED; -import static org.apache.aurora.gen.ScheduleStatus.KILLING; -import static org.apache.aurora.gen.ScheduleStatus.PENDING; -import static org.apache.aurora.gen.ScheduleStatus.RUNNING; -import static org.apache.aurora.gen.ScheduleStatus.THROTTLED; -import static org.junit.Assert.assertEquals; - -public class ClusterStateImplTest { - - private ClusterStateImpl state; - - @Before - public void setUp() { - state = new ClusterStateImpl(); - } - - @Test(expected = UnsupportedOperationException.class) - public void testImmutable() { - state.getSlavesToActiveTasks().clear(); - } - - @Test - public void testTaskLifecycle() { - IAssignedTask a = makeTask("a", "s1"); - - assertVictims(); - changeState(a, THROTTLED); - assertVictims(); - changeState(a, PENDING); - assertVictims(); - changeState(a, ASSIGNED); - assertVictims(a); - changeState(a, RUNNING); - assertVictims(a); - changeState(a, KILLING); - assertVictims(a); - changeState(a, FINISHED); - assertVictims(); - } - - @Test - public void testTaskChangesSlaves() { - // We do not intend to handle the case of an external failure leading to the same task ID - // on a different slave. - IAssignedTask a = makeTask("a", "s1"); - IAssignedTask a1 = makeTask("a", "s2"); - changeState(a, RUNNING); - changeState(a1, RUNNING); - assertVictims(a, a1); - } - - @Test - public void testMultipleTasks() { - IAssignedTask a = makeTask("a", "s1"); - IAssignedTask b = makeTask("b", "s1"); - IAssignedTask c = makeTask("c", "s2"); - IAssignedTask d = makeTask("d", "s3"); - IAssignedTask e = makeTask("e", "s3"); - IAssignedTask f = makeTask("f", "s1"); - changeState(a, RUNNING); - assertVictims(a); - changeState(b, RUNNING); - assertVictims(a, b); - changeState(c, RUNNING); - assertVictims(a, b, c); - changeState(d, RUNNING); - assertVictims(a, b, c, d); - changeState(e, RUNNING); - assertVictims(a, b, c, d, e); - changeState(c, FINISHED); - assertVictims(a, b, d, e); - changeState(a, FAILED); - changeState(e, KILLED); - assertVictims(b, d); - changeState(f, RUNNING); - assertVictims(b, d, f); - } - - private void assertVictims(IAssignedTask... tasks) { - ImmutableMultimap.Builder<String, PreemptionVictim> victims = ImmutableMultimap.builder(); - for (IAssignedTask task : tasks) { - victims.put(task.getSlaveId(), PreemptionVictim.fromTask(task)); - } - assertEquals(HashMultimap.create(victims.build()), state.getSlavesToActiveTasks()); - } - - private IAssignedTask makeTask(String taskId, String slaveId) { - return IAssignedTask.build(new AssignedTask() - .setTaskId(taskId) - .setSlaveId(slaveId) - .setSlaveHost(slaveId + "host") - .setTask(new TaskConfig().setJob(new JobKey("role", "env", "job")))); - } - - private void changeState(IAssignedTask assignedTask, ScheduleStatus status) { - IScheduledTask task = IScheduledTask.build(new ScheduledTask() - .setStatus(status) - .setAssignedTask(assignedTask.newBuilder())); - state.taskChangedState(TaskStateChange.transition(task, ScheduleStatus.INIT)); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessorTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessorTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessorTest.java deleted file mode 100644 index a0dbb25..0000000 --- a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessorTest.java +++ /dev/null @@ -1,285 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.async.preemptor; - -import java.util.Arrays; - -import javax.annotation.Nullable; - -import com.google.common.base.Optional; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Multimap; -import com.google.common.collect.Multimaps; -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; -import com.twitter.common.testing.easymock.EasyMockTest; -import com.twitter.common.util.testing.FakeClock; - -import org.apache.aurora.gen.AssignedTask; -import org.apache.aurora.gen.HostAttributes; -import org.apache.aurora.gen.JobKey; -import org.apache.aurora.gen.MaintenanceMode; -import org.apache.aurora.gen.ScheduledTask; -import org.apache.aurora.gen.TaskConfig; -import org.apache.aurora.gen.TaskEvent; -import org.apache.aurora.scheduler.HostOffer; -import org.apache.aurora.scheduler.async.OfferManager; -import org.apache.aurora.scheduler.base.Query; -import org.apache.aurora.scheduler.base.TaskGroupKey; -import org.apache.aurora.scheduler.filter.AttributeAggregate; -import org.apache.aurora.scheduler.stats.CachedCounters; -import org.apache.aurora.scheduler.storage.entities.IHostAttributes; -import org.apache.aurora.scheduler.storage.entities.IScheduledTask; -import org.apache.aurora.scheduler.storage.entities.ITaskConfig; -import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; -import org.apache.aurora.scheduler.testing.FakeStatsProvider; -import org.apache.mesos.Protos; -import org.easymock.EasyMock; -import org.junit.Before; -import org.junit.Test; - -import static org.apache.aurora.gen.ScheduleStatus.PENDING; -import static org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.TASK_PROCESSOR_RUN_NAME; -import static org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.attemptsStatName; -import static org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.slotSearchStatName; -import static org.easymock.EasyMock.anyObject; -import static org.easymock.EasyMock.eq; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.expectLastCall; -import static org.junit.Assert.assertEquals; - -public class PendingTaskProcessorTest extends EasyMockTest { - private static final String CACHE_STAT = "cache_size"; - private static final String SLAVE_ID_1 = "slave_id_1"; - private static final String SLAVE_ID_2 = "slave_id_2"; - private static final JobKey JOB_A = new JobKey("role_a", "env", "job_a"); - private static final JobKey JOB_B = new JobKey("role_b", "env", "job_b"); - private static final IScheduledTask TASK_A = makeTask(JOB_A, SLAVE_ID_1, "id1"); - private static final IScheduledTask TASK_B = makeTask(JOB_B, SLAVE_ID_2, "id2"); - private static final PreemptionProposal SLOT_A = createPreemptionProposal(TASK_A, SLAVE_ID_1); - private static final Amount<Long, Time> PREEMPTION_DELAY = Amount.of(30L, Time.SECONDS); - private static final Amount<Long, Time> EXPIRATION = Amount.of(10L, Time.MINUTES); - - private StorageTestUtil storageUtil; - private OfferManager offerManager; - private FakeStatsProvider statsProvider; - private PreemptionVictimFilter preemptionVictimFilter; - private PendingTaskProcessor slotFinder; - private BiCache<PreemptionProposal, TaskGroupKey> slotCache; - private ClusterState clusterState; - private FakeClock clock; - - @Before - public void setUp() { - storageUtil = new StorageTestUtil(this); - storageUtil.expectOperations(); - offerManager = createMock(OfferManager.class); - preemptionVictimFilter = createMock(PreemptionVictimFilter.class); - statsProvider = new FakeStatsProvider(); - clusterState = createMock(ClusterState.class); - clock = new FakeClock(); - slotCache = new BiCache<>( - statsProvider, - new BiCache.BiCacheSettings(EXPIRATION, CACHE_STAT), - clock); - - slotFinder = new PendingTaskProcessor( - storageUtil.storage, - offerManager, - preemptionVictimFilter, - new PreemptorMetrics(new CachedCounters(statsProvider)), - PREEMPTION_DELAY, - slotCache, - clusterState, - clock); - } - - @Test - public void testSearchSlotSuccessful() throws Exception { - expectGetPendingTasks(TASK_A, TASK_B); - expectGetClusterState(TASK_A, TASK_B); - HostOffer offer1 = makeOffer(SLAVE_ID_1); - HostOffer offer2 = makeOffer(SLAVE_ID_2); - expectOffers(offer1, offer2); - expectSlotSearch(TASK_A.getAssignedTask().getTask(), TASK_A); - expectSlotSearch(TASK_B.getAssignedTask().getTask(), TASK_B); - - control.replay(); - - clock.advance(PREEMPTION_DELAY); - - slotFinder.run(); - assertEquals(1L, statsProvider.getLongValue(TASK_PROCESSOR_RUN_NAME)); - assertEquals(2L, statsProvider.getLongValue(attemptsStatName(true))); - assertEquals(2L, statsProvider.getLongValue(slotSearchStatName(true, true))); - assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(false, true))); - assertEquals(2L, statsProvider.getLongValue(CACHE_STAT)); - } - - @Test - public void testSearchSlotFailed() throws Exception { - expectGetPendingTasks(TASK_A); - expectGetClusterState(TASK_A); - HostOffer offer1 = makeOffer(SLAVE_ID_1); - expectOffers(offer1); - expectSlotSearch(TASK_A.getAssignedTask().getTask()); - - control.replay(); - - clock.advance(PREEMPTION_DELAY); - - slotFinder.run(); - assertEquals(1L, statsProvider.getLongValue(TASK_PROCESSOR_RUN_NAME)); - assertEquals(1L, statsProvider.getLongValue(attemptsStatName(true))); - assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(true, true))); - assertEquals(1L, statsProvider.getLongValue(slotSearchStatName(false, true))); - } - - @Test - public void testHasCachedSlots() throws Exception { - slotCache.put(SLOT_A, group(TASK_A)); - expectGetPendingTasks(TASK_A); - expectGetClusterState(TASK_A); - HostOffer offer1 = makeOffer(SLAVE_ID_1); - expectOffers(offer1); - - control.replay(); - - clock.advance(PREEMPTION_DELAY); - - slotFinder.run(); - assertEquals(1L, statsProvider.getLongValue(TASK_PROCESSOR_RUN_NAME)); - assertEquals(0L, statsProvider.getLongValue(attemptsStatName(true))); - assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(true, true))); - assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(false, true))); - } - - @Test - public void testMultipleTaskGroups() throws Exception { - IScheduledTask task1 = makeTask(JOB_A, "1"); - IScheduledTask task2 = makeTask(JOB_A, "2"); - IScheduledTask task3 = makeTask(JOB_A, "3"); - IScheduledTask task4 = makeTask(JOB_B, "4"); - IScheduledTask task5 = makeTask(JOB_B, "5"); - - expectGetPendingTasks(task1, task4, task2, task5, task3); - expectGetClusterState(TASK_A, TASK_B); - - HostOffer offer1 = makeOffer(SLAVE_ID_1); - HostOffer offer2 = makeOffer(SLAVE_ID_2); - expectOffers(offer1, offer2); - expectSlotSearch(task1.getAssignedTask().getTask()); - expectSlotSearch(task4.getAssignedTask().getTask(), TASK_B); - PreemptionProposal proposal1 = createPreemptionProposal(TASK_B, SLAVE_ID_1); - PreemptionProposal proposal2 = createPreemptionProposal(TASK_B, SLAVE_ID_2); - - control.replay(); - - clock.advance(PREEMPTION_DELAY); - - slotFinder.run(); - assertEquals(slotCache.get(proposal1), Optional.of(group(task4))); - assertEquals(slotCache.get(proposal2), Optional.of(group(task5))); - assertEquals(1L, statsProvider.getLongValue(TASK_PROCESSOR_RUN_NAME)); - assertEquals(3L, statsProvider.getLongValue(attemptsStatName(true))); - assertEquals(2L, statsProvider.getLongValue(slotSearchStatName(true, true))); - assertEquals(2L, statsProvider.getLongValue(slotSearchStatName(false, true))); - assertEquals(2L, statsProvider.getLongValue(CACHE_STAT)); - } - - @Test - public void testNoVictims() throws Exception { - expectGetClusterState(); - control.replay(); - - clock.advance(PREEMPTION_DELAY); - - slotFinder.run(); - assertEquals(1L, statsProvider.getLongValue(TASK_PROCESSOR_RUN_NAME)); - assertEquals(0L, statsProvider.getLongValue(attemptsStatName(true))); - assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(true, true))); - assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(false, true))); - } - - private Multimap<String, PreemptionVictim> getVictims(IScheduledTask... tasks) { - return Multimaps.transformValues( - Multimaps.index(Arrays.asList(tasks), task -> task.getAssignedTask().getSlaveId()), - task -> PreemptionVictim.fromTask(task.getAssignedTask()) - ); - } - - private HostOffer makeOffer(String slaveId) { - Protos.Offer.Builder builder = Protos.Offer.newBuilder(); - builder.getIdBuilder().setValue("id"); - builder.getFrameworkIdBuilder().setValue("framework-id"); - builder.getSlaveIdBuilder().setValue(slaveId); - builder.setHostname(slaveId); - return new HostOffer( - builder.build(), - IHostAttributes.build(new HostAttributes().setMode(MaintenanceMode.NONE))); - } - - private void expectOffers(HostOffer... offers) { - expect(offerManager.getOffers()).andReturn(ImmutableSet.copyOf(offers)); - } - - private void expectGetClusterState(IScheduledTask... returnedTasks) { - expect(clusterState.getSlavesToActiveTasks()).andReturn(getVictims(returnedTasks)); - } - - private void expectSlotSearch(ITaskConfig config, IScheduledTask... victims) { - expect(preemptionVictimFilter.filterPreemptionVictims( - eq(config), - EasyMock.anyObject(), - anyObject(AttributeAggregate.class), - EasyMock.anyObject(), - eq(storageUtil.storeProvider))); - expectLastCall().andReturn( - victims.length == 0 - ? Optional.absent() - : Optional.of(ImmutableSet.copyOf(getVictims(victims).values()))) - .anyTimes(); - } - - private static PreemptionProposal createPreemptionProposal(IScheduledTask task, String slaveId) { - return new PreemptionProposal( - ImmutableSet.of(PreemptionVictim.fromTask(task.getAssignedTask())), - slaveId); - } - - private static IScheduledTask makeTask(JobKey key, String taskId) { - return makeTask(key, null, taskId); - } - - private static TaskGroupKey group(IScheduledTask task) { - return TaskGroupKey.from(task.getAssignedTask().getTask()); - } - - private static IScheduledTask makeTask(JobKey key, @Nullable String slaveId, String taskId) { - ScheduledTask task = new ScheduledTask() - .setAssignedTask(new AssignedTask() - .setSlaveId(slaveId) - .setTaskId(taskId) - .setTask(new TaskConfig() - .setPriority(1) - .setProduction(true) - .setJob(key))); - task.addToTaskEvents(new TaskEvent(0, PENDING)); - return IScheduledTask.build(task); - } - - private void expectGetPendingTasks(IScheduledTask... returnedTasks) { - storageUtil.expectTaskFetch(Query.statusScoped(PENDING), ImmutableSet.copyOf(returnedTasks)); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictimFilterTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictimFilterTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictimFilterTest.java deleted file mode 100644 index 5fe8e2e..0000000 --- a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictimFilterTest.java +++ /dev/null @@ -1,512 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.async.preemptor; - -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -import com.google.common.base.Function; -import com.google.common.base.Optional; -import com.google.common.collect.FluentIterable; -import com.google.common.collect.ImmutableSet; -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Data; -import com.twitter.common.testing.easymock.EasyMockTest; - -import org.apache.aurora.gen.AssignedTask; -import org.apache.aurora.gen.Attribute; -import org.apache.aurora.gen.Constraint; -import org.apache.aurora.gen.HostAttributes; -import org.apache.aurora.gen.JobKey; -import org.apache.aurora.gen.MaintenanceMode; -import org.apache.aurora.gen.ScheduleStatus; -import org.apache.aurora.gen.ScheduledTask; -import org.apache.aurora.gen.TaskConfig; -import org.apache.aurora.gen.TaskEvent; -import org.apache.aurora.scheduler.HostOffer; -import org.apache.aurora.scheduler.configuration.Resources; -import org.apache.aurora.scheduler.filter.SchedulingFilter; -import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto; -import org.apache.aurora.scheduler.filter.SchedulingFilterImpl; -import org.apache.aurora.scheduler.mesos.TaskExecutors; -import org.apache.aurora.scheduler.stats.CachedCounters; -import org.apache.aurora.scheduler.storage.entities.IAssignedTask; -import org.apache.aurora.scheduler.storage.entities.IHostAttributes; -import org.apache.aurora.scheduler.storage.entities.ITaskConfig; -import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; -import org.apache.aurora.scheduler.testing.FakeStatsProvider; -import org.easymock.EasyMock; -import org.easymock.IAnswer; -import org.easymock.IExpectationSetters; -import org.junit.Before; -import org.junit.Test; - -import static org.apache.aurora.gen.MaintenanceMode.NONE; -import static org.apache.aurora.gen.ScheduleStatus.RUNNING; -import static org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.MISSING_ATTRIBUTES_NAME; -import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY; -import static org.apache.mesos.Protos.Offer; -import static org.apache.mesos.Protos.Resource; -import static org.easymock.EasyMock.expect; -import static org.junit.Assert.assertEquals; - -public class PreemptionVictimFilterTest extends EasyMockTest { - private static final String USER_A = "user_a"; - private static final String USER_B = "user_b"; - private static final String USER_C = "user_c"; - private static final String JOB_A = "job_a"; - private static final String JOB_B = "job_b"; - private static final String JOB_C = "job_c"; - private static final String TASK_ID_A = "task_a"; - private static final String TASK_ID_B = "task_b"; - private static final String TASK_ID_C = "task_c"; - private static final String TASK_ID_D = "task_d"; - private static final String HOST = "host"; - private static final String RACK = "rack"; - private static final String SLAVE_ID = HOST + "_id"; - private static final String RACK_ATTRIBUTE = "rack"; - private static final String HOST_ATTRIBUTE = "host"; - private static final String OFFER = "offer"; - private static final Optional<HostOffer> NO_OFFER = Optional.absent(); - - private StorageTestUtil storageUtil; - private SchedulingFilter schedulingFilter; - private FakeStatsProvider statsProvider; - private PreemptorMetrics preemptorMetrics; - - @Before - public void setUp() { - storageUtil = new StorageTestUtil(this); - storageUtil.expectOperations(); - statsProvider = new FakeStatsProvider(); - preemptorMetrics = new PreemptorMetrics(new CachedCounters(statsProvider)); - } - - private Optional<ImmutableSet<PreemptionVictim>> runFilter( - ScheduledTask pendingTask, - Optional<HostOffer> offer, - ScheduledTask... victims) { - - PreemptionVictimFilter.PreemptionVictimFilterImpl filter = - new PreemptionVictimFilter.PreemptionVictimFilterImpl( - schedulingFilter, - TaskExecutors.NO_OVERHEAD_EXECUTOR, - preemptorMetrics); - - return filter.filterPreemptionVictims( - ITaskConfig.build(pendingTask.getAssignedTask().getTask()), - preemptionVictims(victims), - EMPTY, - offer, - storageUtil.mutableStoreProvider); - } - - @Test - public void testPreempted() throws Exception { - setUpHost(); - - schedulingFilter = createMock(SchedulingFilter.class); - ScheduledTask lowPriority = makeTask(USER_A, JOB_A, TASK_ID_A); - assignToHost(lowPriority); - - ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_B, 100); - - expectFiltering(); - - control.replay(); - assertVictims(runFilter(highPriority, NO_OFFER, lowPriority), lowPriority); - } - - @Test - public void testLowestPriorityPreempted() throws Exception { - setUpHost(); - - schedulingFilter = createMock(SchedulingFilter.class); - ScheduledTask lowPriority = makeTask(USER_A, JOB_A, TASK_ID_A, 10); - assignToHost(lowPriority); - - ScheduledTask lowerPriority = makeTask(USER_A, JOB_A, TASK_ID_B, 1); - assignToHost(lowerPriority); - - ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_C, 100); - - expectFiltering(); - - control.replay(); - assertVictims(runFilter(highPriority, NO_OFFER, lowerPriority), lowerPriority); - } - - @Test - public void testOnePreemptableTask() throws Exception { - setUpHost(); - - schedulingFilter = createMock(SchedulingFilter.class); - ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_A, 100); - assignToHost(highPriority); - - ScheduledTask lowerPriority = makeTask(USER_A, JOB_A, TASK_ID_B, 99); - assignToHost(lowerPriority); - - ScheduledTask lowestPriority = makeTask(USER_A, JOB_A, TASK_ID_C, 1); - assignToHost(lowestPriority); - - ScheduledTask pendingPriority = makeTask(USER_A, JOB_A, TASK_ID_D, 98); - - expectFiltering(); - - control.replay(); - assertVictims( - runFilter(pendingPriority, NO_OFFER, highPriority, lowerPriority, lowestPriority), - lowestPriority); - } - - @Test - public void testHigherPriorityRunning() throws Exception { - schedulingFilter = createMock(SchedulingFilter.class); - ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_B, 100); - assignToHost(highPriority); - - ScheduledTask task = makeTask(USER_A, JOB_A, TASK_ID_A); - - control.replay(); - assertNoVictims(runFilter(task, NO_OFFER, highPriority)); - } - - @Test - public void testProductionPreemptingNonproduction() throws Exception { - setUpHost(); - - schedulingFilter = createMock(SchedulingFilter.class); - // Use a very low priority for the production task to show that priority is irrelevant. - ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", -1000); - ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_B + "_a1", 100); - assignToHost(a1); - - expectFiltering(); - - control.replay(); - assertVictims(runFilter(p1, NO_OFFER, a1), a1); - } - - @Test - public void testProductionPreemptingNonproductionAcrossUsers() throws Exception { - setUpHost(); - - schedulingFilter = createMock(SchedulingFilter.class); - // Use a very low priority for the production task to show that priority is irrelevant. - ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", -1000); - ScheduledTask a1 = makeTask(USER_B, JOB_A, TASK_ID_B + "_a1", 100); - assignToHost(a1); - - expectFiltering(); - - control.replay(); - assertVictims(runFilter(p1, NO_OFFER, a1), a1); - } - - @Test - public void testProductionUsersDoNotPreemptEachOther() throws Exception { - schedulingFilter = createMock(SchedulingFilter.class); - ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", 1000); - ScheduledTask a1 = makeProductionTask(USER_B, JOB_A, TASK_ID_B + "_a1", 0); - assignToHost(a1); - - control.replay(); - assertNoVictims(runFilter(p1, NO_OFFER, a1)); - } - - // Ensures a production task can preempt 2 tasks on the same host. - @Test - public void testProductionPreemptingManyNonProduction() throws Exception { - schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR); - ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1"); - a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512); - - ScheduledTask b1 = makeTask(USER_B, JOB_B, TASK_ID_B + "_b1"); - b1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512); - - setUpHost(); - - assignToHost(a1); - assignToHost(b1); - - ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1"); - p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024); - - control.replay(); - assertVictims(runFilter(p1, NO_OFFER, a1, b1), a1, b1); - } - - // Ensures we select the minimal number of tasks to preempt - @Test - public void testMinimalSetPreempted() throws Exception { - schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR); - ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1"); - a1.getAssignedTask().getTask().setNumCpus(4).setRamMb(4096); - - ScheduledTask b1 = makeTask(USER_B, JOB_B, TASK_ID_B + "_b1"); - b1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512); - - ScheduledTask b2 = makeTask(USER_B, JOB_B, TASK_ID_B + "_b2"); - b2.getAssignedTask().getTask().setNumCpus(1).setRamMb(512); - - setUpHost(); - - assignToHost(a1); - assignToHost(b1); - assignToHost(b2); - - ScheduledTask p1 = makeProductionTask(USER_C, JOB_C, TASK_ID_C + "_p1"); - p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024); - - control.replay(); - assertVictims(runFilter(p1, NO_OFFER, b1, b2, a1), a1); - } - - // Ensures a production task *never* preempts a production task from another job. - @Test - public void testProductionJobNeverPreemptsProductionJob() throws Exception { - schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR); - ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1"); - p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024); - - setUpHost(); - - assignToHost(p1); - - ScheduledTask p2 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p2"); - p2.getAssignedTask().getTask().setNumCpus(1).setRamMb(512); - - control.replay(); - assertNoVictims(runFilter(p2, NO_OFFER, p1)); - } - - // Ensures that we can preempt if a task + offer can satisfy a pending task. - @Test - public void testPreemptWithOfferAndTask() throws Exception { - schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR); - - setUpHost(); - - ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1"); - a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512); - assignToHost(a1); - - ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1"); - p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024); - - control.replay(); - assertVictims( - runFilter(p1, makeOffer(OFFER, 1, Amount.of(512L, Data.MB), Amount.of(1L, Data.MB), 1), a1), - a1); - } - - // Ensures we can preempt if two tasks and an offer can satisfy a pending task. - @Test - public void testPreemptWithOfferAndMultipleTasks() throws Exception { - schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR); - - setUpHost(); - - ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1"); - a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512); - assignToHost(a1); - - ScheduledTask a2 = makeTask(USER_A, JOB_B, TASK_ID_A + "_a2"); - a2.getAssignedTask().getTask().setNumCpus(1).setRamMb(512); - assignToHost(a2); - - ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1"); - p1.getAssignedTask().getTask().setNumCpus(4).setRamMb(2048); - - control.replay(); - Optional<HostOffer> offer = - makeOffer(OFFER, 2, Amount.of(1024L, Data.MB), Amount.of(1L, Data.MB), 1); - assertVictims(runFilter(p1, offer, a1, a2), a1, a2); - } - - @Test - public void testNoPreemptionVictims() { - schedulingFilter = createMock(SchedulingFilter.class); - ScheduledTask task = makeProductionTask(USER_A, JOB_A, TASK_ID_A); - - control.replay(); - - assertNoVictims(runFilter(task, NO_OFFER)); - } - - @Test - public void testMissingAttributes() { - schedulingFilter = createMock(SchedulingFilter.class); - ScheduledTask task = makeProductionTask(USER_A, JOB_A, TASK_ID_A); - assignToHost(task); - - ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1"); - a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512); - assignToHost(a1); - - expect(storageUtil.attributeStore.getHostAttributes(HOST)).andReturn(Optional.absent()); - - control.replay(); - - assertNoVictims(runFilter(task, NO_OFFER, a1)); - assertEquals(1L, statsProvider.getLongValue(MISSING_ATTRIBUTES_NAME)); - } - - @Test - public void testAllVictimsVetoed() { - schedulingFilter = createMock(SchedulingFilter.class); - ScheduledTask task = makeProductionTask(USER_A, JOB_A, TASK_ID_A); - assignToHost(task); - - ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1"); - a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512); - assignToHost(a1); - - setUpHost(); - expectFiltering(Optional.of(Veto.constraintMismatch("ban"))); - - control.replay(); - - assertNoVictims(runFilter(task, NO_OFFER, a1)); - } - - private static ImmutableSet<PreemptionVictim> preemptionVictims(ScheduledTask... tasks) { - return FluentIterable.from(ImmutableSet.copyOf(tasks)) - .transform( - new Function<ScheduledTask, PreemptionVictim>() { - @Override - public PreemptionVictim apply(ScheduledTask task) { - return PreemptionVictim.fromTask(IAssignedTask.build(task.getAssignedTask())); - } - }).toSet(); - } - - private static void assertVictims( - Optional<ImmutableSet<PreemptionVictim>> actual, - ScheduledTask... expected) { - - assertEquals(Optional.of(preemptionVictims(expected)), actual); - } - - private static void assertNoVictims(Optional<ImmutableSet<PreemptionVictim>> actual) { - assertEquals(Optional.<ImmutableSet<PreemptionVictim>>absent(), actual); - } - - private Optional<HostOffer> makeOffer( - String offerId, - double cpu, - Amount<Long, Data> ram, - Amount<Long, Data> disk, - int numPorts) { - - List<Resource> resources = new Resources(cpu, ram, disk, numPorts).toResourceList(); - Offer.Builder builder = Offer.newBuilder(); - builder.getIdBuilder().setValue(offerId); - builder.getFrameworkIdBuilder().setValue("framework-id"); - builder.getSlaveIdBuilder().setValue(SLAVE_ID); - builder.setHostname(HOST); - for (Resource r: resources) { - builder.addResources(r); - } - return Optional.of(new HostOffer( - builder.build(), - IHostAttributes.build(new HostAttributes().setMode(MaintenanceMode.NONE)))); - } - - private IExpectationSetters<Set<SchedulingFilter.Veto>> expectFiltering() { - return expectFiltering(Optional.absent()); - } - - private IExpectationSetters<Set<SchedulingFilter.Veto>> expectFiltering( - final Optional<Veto> veto) { - - return expect(schedulingFilter.filter( - EasyMock.anyObject(), - EasyMock.anyObject())) - .andAnswer( - new IAnswer<Set<SchedulingFilter.Veto>>() { - @Override - public Set<SchedulingFilter.Veto> answer() { - return veto.asSet(); - } - }); - } - - static ScheduledTask makeTask( - String role, - String job, - String taskId, - int priority, - String env, - boolean production) { - - AssignedTask assignedTask = new AssignedTask() - .setTaskId(taskId) - .setTask(new TaskConfig() - .setJob(new JobKey(role, env, job)) - .setPriority(priority) - .setProduction(production) - .setJobName(job) - .setEnvironment(env) - .setConstraints(new HashSet<Constraint>())); - return new ScheduledTask().setAssignedTask(assignedTask); - } - - static ScheduledTask makeTask(String role, String job, String taskId) { - return makeTask(role, job, taskId, 0, "dev", false); - } - - static void addEvent(ScheduledTask task, ScheduleStatus status) { - task.addToTaskEvents(new TaskEvent(0, status)); - } - - private ScheduledTask makeProductionTask(String role, String job, String taskId) { - return makeTask(role, job, taskId, 0, "prod", true); - } - - private ScheduledTask makeProductionTask(String role, String job, String taskId, int priority) { - return makeTask(role, job, taskId, priority, "prod", true); - } - - private ScheduledTask makeTask(String role, String job, String taskId, int priority) { - return makeTask(role, job, taskId, priority, "dev", false); - } - - private void assignToHost(ScheduledTask task) { - task.setStatus(RUNNING); - addEvent(task, RUNNING); - task.getAssignedTask().setSlaveHost(HOST); - task.getAssignedTask().setSlaveId(SLAVE_ID); - } - - private Attribute host(String host) { - return new Attribute(HOST_ATTRIBUTE, ImmutableSet.of(host)); - } - - private Attribute rack(String rack) { - return new Attribute(RACK_ATTRIBUTE, ImmutableSet.of(rack)); - } - - // Sets up a normal host, no dedicated hosts and no maintenance. - private void setUpHost() { - IHostAttributes hostAttrs = IHostAttributes.build( - new HostAttributes().setHost(HOST).setSlaveId(HOST + "_id") - .setMode(NONE).setAttributes(ImmutableSet.of(rack(RACK), host(RACK)))); - - expect(storageUtil.attributeStore.getHostAttributes(HOST)) - .andReturn(Optional.of(hostAttrs)).anyTimes(); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictimTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictimTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictimTest.java deleted file mode 100644 index bb93b63..0000000 --- a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictimTest.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.async.preemptor; - -import com.google.common.collect.ImmutableSet; - -import org.apache.aurora.gen.AssignedTask; -import org.apache.aurora.gen.JobKey; -import org.apache.aurora.gen.TaskConfig; -import org.apache.aurora.scheduler.storage.entities.IAssignedTask; -import org.junit.Test; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; - -public class PreemptionVictimTest { - - @Test - public void testBeanMethods() { - PreemptionVictim a = makeVictim("a"); - PreemptionVictim a1 = makeVictim("a"); - PreemptionVictim b = makeVictim("b"); - assertEquals(a, a1); - assertEquals(a.hashCode(), a1.hashCode()); - assertEquals(a.toString(), a1.toString()); - assertNotEquals(a, b); - assertNotEquals(a.toString(), b.toString()); - assertEquals(ImmutableSet.of(a, b), ImmutableSet.of(a, a1, b)); - } - - private PreemptionVictim makeVictim(String taskId) { - return PreemptionVictim.fromTask(IAssignedTask.build(new AssignedTask() - .setTaskId(taskId) - .setSlaveId(taskId + "slave") - .setSlaveHost(taskId + "host") - .setTask(new TaskConfig().setJob(new JobKey("role", "env", "job"))))); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java deleted file mode 100644 index d36499f..0000000 --- a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java +++ /dev/null @@ -1,177 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.async.preemptor; - -import java.util.Set; - -import com.google.common.base.Optional; -import com.google.common.collect.ImmutableSet; -import com.twitter.common.testing.easymock.EasyMockTest; - -import org.apache.aurora.gen.AssignedTask; -import org.apache.aurora.gen.HostAttributes; -import org.apache.aurora.gen.JobKey; -import org.apache.aurora.gen.ScheduleStatus; -import org.apache.aurora.gen.ScheduledTask; -import org.apache.aurora.gen.TaskConfig; -import org.apache.aurora.gen.TaskEvent; -import org.apache.aurora.scheduler.HostOffer; -import org.apache.aurora.scheduler.async.OfferManager; -import org.apache.aurora.scheduler.async.preemptor.Preemptor.PreemptorImpl; -import org.apache.aurora.scheduler.base.TaskGroupKey; -import org.apache.aurora.scheduler.base.Tasks; -import org.apache.aurora.scheduler.state.StateChangeResult; -import org.apache.aurora.scheduler.state.StateManager; -import org.apache.aurora.scheduler.stats.CachedCounters; -import org.apache.aurora.scheduler.storage.Storage; -import org.apache.aurora.scheduler.storage.entities.IAssignedTask; -import org.apache.aurora.scheduler.storage.entities.IHostAttributes; -import org.apache.aurora.scheduler.storage.entities.IScheduledTask; -import org.apache.aurora.scheduler.storage.entities.ITaskConfig; -import org.apache.aurora.scheduler.testing.FakeStatsProvider; -import org.apache.mesos.Protos; -import org.easymock.EasyMock; -import org.junit.Before; -import org.junit.Test; - -import static org.apache.aurora.gen.ScheduleStatus.PENDING; -import static org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.slotValidationStatName; -import static org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.successStatName; -import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY; -import static org.easymock.EasyMock.anyObject; -import static org.easymock.EasyMock.eq; -import static org.easymock.EasyMock.expect; -import static org.junit.Assert.assertEquals; - -public class PreemptorImplTest extends EasyMockTest { - private static final String SLAVE_ID = "slave_id"; - private static final IScheduledTask TASK = IScheduledTask.build(makeTask()); - private static final PreemptionProposal PROPOSAL = createPreemptionProposal(TASK); - private static final TaskGroupKey GROUP_KEY = - TaskGroupKey.from(ITaskConfig.build(makeTask().getAssignedTask().getTask())); - - private static final Set<PreemptionProposal> NO_SLOTS = ImmutableSet.of(); - private static final Optional<String> EMPTY_RESULT = Optional.absent(); - private static final HostOffer OFFER = - new HostOffer(Protos.Offer.getDefaultInstance(), IHostAttributes.build(new HostAttributes())); - - private StateManager stateManager; - private FakeStatsProvider statsProvider; - private PreemptionVictimFilter preemptionVictimFilter; - private PreemptorImpl preemptor; - private BiCache<PreemptionProposal, TaskGroupKey> slotCache; - private Storage.MutableStoreProvider storeProvider; - - @Before - public void setUp() { - storeProvider = createMock(Storage.MutableStoreProvider.class); - stateManager = createMock(StateManager.class); - preemptionVictimFilter = createMock(PreemptionVictimFilter.class); - slotCache = createMock(new Clazz<BiCache<PreemptionProposal, TaskGroupKey>>() { }); - statsProvider = new FakeStatsProvider(); - OfferManager offerManager = createMock(OfferManager.class); - expect(offerManager.getOffer(anyObject(Protos.SlaveID.class))) - .andReturn(Optional.of(OFFER)) - .anyTimes(); - - preemptor = new PreemptorImpl( - stateManager, - offerManager, - preemptionVictimFilter, - new PreemptorMetrics(new CachedCounters(statsProvider)), - slotCache); - } - - @Test - public void testPreemptTasksSuccessful() throws Exception { - expect(slotCache.getByValue(GROUP_KEY)).andReturn(ImmutableSet.of(PROPOSAL)); - slotCache.remove(PROPOSAL, GROUP_KEY); - expectSlotValidation(PROPOSAL, Optional.of(ImmutableSet.of( - PreemptionVictim.fromTask(TASK.getAssignedTask())))); - - expectPreempted(TASK); - - control.replay(); - - assertEquals(Optional.of(SLAVE_ID), callPreemptor()); - assertEquals(1L, statsProvider.getLongValue(slotValidationStatName(true))); - assertEquals(1L, statsProvider.getLongValue(successStatName(true))); - } - - @Test - public void testPreemptTasksValidationFailed() throws Exception { - expect(slotCache.getByValue(GROUP_KEY)).andReturn(ImmutableSet.of(PROPOSAL)); - slotCache.remove(PROPOSAL, GROUP_KEY); - expectSlotValidation(PROPOSAL, Optional.absent()); - - control.replay(); - - assertEquals(EMPTY_RESULT, callPreemptor()); - assertEquals(1L, statsProvider.getLongValue(slotValidationStatName(false))); - assertEquals(0L, statsProvider.getLongValue(successStatName(true))); - } - - @Test - public void testNoCachedSlot() throws Exception { - expect(slotCache.getByValue(GROUP_KEY)).andReturn(NO_SLOTS); - - control.replay(); - - assertEquals(EMPTY_RESULT, callPreemptor()); - assertEquals(0L, statsProvider.getLongValue(slotValidationStatName(false))); - assertEquals(0L, statsProvider.getLongValue(successStatName(true))); - } - - private Optional<String> callPreemptor() { - return preemptor.attemptPreemptionFor(TASK.getAssignedTask(), EMPTY, storeProvider); - } - - private void expectSlotValidation( - PreemptionProposal slot, - Optional<ImmutableSet<PreemptionVictim>> victims) { - - expect(preemptionVictimFilter.filterPreemptionVictims( - TASK.getAssignedTask().getTask(), - slot.getVictims(), - EMPTY, - Optional.of(OFFER), - storeProvider)).andReturn(victims); - } - - private void expectPreempted(IScheduledTask preempted) throws Exception { - expect(stateManager.changeState( - anyObject(Storage.MutableStoreProvider.class), - eq(Tasks.id(preempted)), - eq(Optional.absent()), - eq(ScheduleStatus.PREEMPTING), - EasyMock.anyObject())) - .andReturn(StateChangeResult.SUCCESS); - } - - private static PreemptionProposal createPreemptionProposal(IScheduledTask task) { - IAssignedTask assigned = task.getAssignedTask(); - return new PreemptionProposal(ImmutableSet.of(PreemptionVictim.fromTask(assigned)), SLAVE_ID); - } - - private static ScheduledTask makeTask() { - ScheduledTask task = new ScheduledTask() - .setAssignedTask(new AssignedTask() - .setTask(new TaskConfig() - .setPriority(1) - .setProduction(true) - .setJob(new JobKey("role", "env", "name")))); - task.addToTaskEvents(new TaskEvent(0, PENDING)); - return task; - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModuleTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModuleTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModuleTest.java deleted file mode 100644 index 2c20571..0000000 --- a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModuleTest.java +++ /dev/null @@ -1,91 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.async.preemptor; - -import com.google.common.base.Optional; -import com.google.inject.AbstractModule; -import com.google.inject.Guice; -import com.google.inject.Injector; -import com.google.inject.Key; -import com.google.inject.Module; -import com.twitter.common.application.StartupStage; -import com.twitter.common.application.modules.LifecycleModule; -import com.twitter.common.base.ExceptionalCommand; -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; -import com.twitter.common.testing.easymock.EasyMockTest; - -import org.apache.aurora.gen.AssignedTask; -import org.apache.aurora.scheduler.filter.AttributeAggregate; -import org.apache.aurora.scheduler.filter.SchedulingFilter; -import org.apache.aurora.scheduler.state.StateManager; -import org.apache.aurora.scheduler.state.TaskAssigner; -import org.apache.aurora.scheduler.storage.Storage; -import org.apache.aurora.scheduler.storage.entities.IAssignedTask; -import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; -import org.junit.Before; -import org.junit.Test; - -import static org.junit.Assert.assertEquals; - -public class PreemptorModuleTest extends EasyMockTest { - - private StorageTestUtil storageUtil; - - @Before - public void setUp() { - storageUtil = new StorageTestUtil(this); - storageUtil.expectOperations(); - } - - private Injector createInjector(Module module) { - return Guice.createInjector( - module, - new LifecycleModule(), - new AbstractModule() { - private <T> void bindMock(Class<T> clazz) { - bind(clazz).toInstance(createMock(clazz)); - } - - @Override - protected void configure() { - bindMock(SchedulingFilter.class); - bindMock(StateManager.class); - bindMock(TaskAssigner.class); - bindMock(Thread.UncaughtExceptionHandler.class); - bind(Storage.class).toInstance(storageUtil.storage); - } - }); - } - - @Test - public void testPreemptorDisabled() throws Exception { - Injector injector = createInjector(new PreemptorModule( - false, - Amount.of(0L, Time.SECONDS), - Amount.of(0L, Time.SECONDS))); - - control.replay(); - - injector.getInstance(Key.get(ExceptionalCommand.class, StartupStage.class)).execute(); - - injector.getBindings(); - assertEquals( - Optional.absent(), - injector.getInstance(Preemptor.class).attemptPreemptionFor( - IAssignedTask.build(new AssignedTask()), - AttributeAggregate.EMPTY, - storageUtil.mutableStoreProvider)); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/http/JettyServerModuleTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/http/JettyServerModuleTest.java b/src/test/java/org/apache/aurora/scheduler/http/JettyServerModuleTest.java index 0d9aeff..91b91bc 100644 --- a/src/test/java/org/apache/aurora/scheduler/http/JettyServerModuleTest.java +++ b/src/test/java/org/apache/aurora/scheduler/http/JettyServerModuleTest.java @@ -47,12 +47,12 @@ import com.twitter.thrift.ServiceInstance; import org.apache.aurora.gen.ServerInfo; import org.apache.aurora.scheduler.SchedulerServicesModule; -import org.apache.aurora.scheduler.async.OfferManager; -import org.apache.aurora.scheduler.async.RescheduleCalculator; -import org.apache.aurora.scheduler.async.TaskGroups.TaskGroupsSettings; -import org.apache.aurora.scheduler.async.TaskScheduler; import org.apache.aurora.scheduler.cron.CronJobManager; import org.apache.aurora.scheduler.http.api.GsonMessageBodyHandler; +import org.apache.aurora.scheduler.offers.OfferManager; +import org.apache.aurora.scheduler.scheduling.RescheduleCalculator; +import org.apache.aurora.scheduler.scheduling.TaskGroups.TaskGroupsSettings; +import org.apache.aurora.scheduler.scheduling.TaskScheduler; import org.apache.aurora.scheduler.state.LockManager; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.entities.IServerInfo; http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java index 49af15b..050a654 100644 --- a/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java @@ -32,13 +32,13 @@ import org.apache.aurora.gen.HostAttributes; import org.apache.aurora.gen.MaintenanceMode; import org.apache.aurora.scheduler.HostOffer; import org.apache.aurora.scheduler.TaskStatusHandler; -import org.apache.aurora.scheduler.async.OfferManager; import org.apache.aurora.scheduler.base.Conversions; import org.apache.aurora.scheduler.base.SchedulerException; import org.apache.aurora.scheduler.events.EventSink; import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected; import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered; import org.apache.aurora.scheduler.events.PubsubEvent.TaskStatusReceived; +import org.apache.aurora.scheduler.offers.OfferManager; import org.apache.aurora.scheduler.stats.CachedCounters; import org.apache.aurora.scheduler.storage.Storage.StorageException; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java new file mode 100644 index 0000000..04be32e --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java @@ -0,0 +1,234 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.offers; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.logging.Level; + +import com.google.common.base.Function; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.testing.TearDown; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.testing.easymock.EasyMockTest; + +import org.apache.aurora.gen.HostAttributes; +import org.apache.aurora.gen.JobKey; +import org.apache.aurora.gen.MaintenanceMode; +import org.apache.aurora.gen.TaskConfig; +import org.apache.aurora.scheduler.HostOffer; +import org.apache.aurora.scheduler.base.TaskGroupKey; +import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected; +import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto; +import org.apache.aurora.scheduler.mesos.Driver; +import org.apache.aurora.scheduler.offers.OfferManager.OfferManagerImpl; +import org.apache.aurora.scheduler.offers.OfferManager.OfferReturnDelay; +import org.apache.aurora.scheduler.state.TaskAssigner.Assignment; +import org.apache.aurora.scheduler.storage.entities.IHostAttributes; +import org.apache.aurora.scheduler.storage.entities.ITaskConfig; +import org.apache.aurora.scheduler.testing.FakeScheduledExecutor; +import org.apache.mesos.Protos.TaskInfo; +import org.junit.Before; +import org.junit.Test; + +import static org.apache.aurora.gen.MaintenanceMode.DRAINING; +import static org.apache.aurora.gen.MaintenanceMode.NONE; +import static org.easymock.EasyMock.expect; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class OfferManagerImplTest extends EasyMockTest { + + private static final Amount<Long, Time> RETURN_DELAY = Amount.of(1L, Time.DAYS); + private static final String HOST_A = "HOST_A"; + private static final HostOffer OFFER_A = new HostOffer( + Offers.makeOffer("OFFER_A", HOST_A), + IHostAttributes.build(new HostAttributes().setMode(NONE))); + private static final String HOST_B = "HOST_B"; + private static final HostOffer OFFER_B = new HostOffer( + Offers.makeOffer("OFFER_B", HOST_B), + IHostAttributes.build(new HostAttributes().setMode(NONE))); + private static final String HOST_C = "HOST_C"; + private static final HostOffer OFFER_C = new HostOffer( + Offers.makeOffer("OFFER_C", HOST_C), + IHostAttributes.build(new HostAttributes().setMode(NONE))); + private static final TaskGroupKey GROUP_KEY = TaskGroupKey.from( + ITaskConfig.build(new TaskConfig().setJob(new JobKey("role", "env", "name")))); + + private Driver driver; + private FakeScheduledExecutor clock; + private Function<HostOffer, Assignment> offerAcceptor; + private OfferManagerImpl offerManager; + + @Before + public void setUp() { + offerManager.LOG.setLevel(Level.FINE); + addTearDown(new TearDown() { + @Override + public void tearDown() throws Exception { + offerManager.LOG.setLevel(Level.INFO); + } + }); + driver = createMock(Driver.class); + ScheduledExecutorService executorMock = createMock(ScheduledExecutorService.class); + clock = FakeScheduledExecutor.scheduleExecutor(executorMock); + + addTearDown(new TearDown() { + @Override + public void tearDown() throws Exception { + clock.assertEmpty(); + } + }); + offerAcceptor = createMock(new Clazz<Function<HostOffer, Assignment>>() { }); + OfferReturnDelay returnDelay = new OfferReturnDelay() { + @Override + public Amount<Long, Time> get() { + return RETURN_DELAY; + } + }; + offerManager = new OfferManagerImpl(driver, returnDelay, executorMock); + } + + @Test + public void testOffersSorted() throws Exception { + // Ensures that non-DRAINING offers are preferred - the DRAINING offer would be tried last. + + HostOffer offerA = setMode(OFFER_A, DRAINING); + HostOffer offerC = setMode(OFFER_C, DRAINING); + + TaskInfo task = TaskInfo.getDefaultInstance(); + expect(offerAcceptor.apply(OFFER_B)).andReturn(Assignment.success(task)); + driver.launchTask(OFFER_B.getOffer().getId(), task); + + driver.declineOffer(offerA.getOffer().getId()); + driver.declineOffer(offerC.getOffer().getId()); + + control.replay(); + + offerManager.addOffer(offerA); + offerManager.addOffer(OFFER_B); + offerManager.addOffer(offerC); + assertTrue(offerManager.launchFirst(offerAcceptor, GROUP_KEY)); + clock.advance(RETURN_DELAY); + } + + @Test + public void testGetOffersReturnsAllOffers() throws Exception { + expect(offerAcceptor.apply(OFFER_A)) + .andReturn(Assignment.failure(ImmutableSet.of(Veto.constraintMismatch("denied")))); + + control.replay(); + + offerManager.addOffer(OFFER_A); + assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY)); + assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers())); + + offerManager.cancelOffer(OFFER_A.getOffer().getId()); + assertTrue(Iterables.isEmpty(offerManager.getOffers())); + + clock.advance(RETURN_DELAY); + } + + @Test + public void testOfferFilteringDueToStaticBan() throws Exception { + expect(offerAcceptor.apply(OFFER_A)) + .andReturn(Assignment.failure(ImmutableSet.of(Veto.constraintMismatch("denied")))); + + TaskInfo task = TaskInfo.getDefaultInstance(); + expect(offerAcceptor.apply(OFFER_B)).andReturn(Assignment.success(task)); + driver.launchTask(OFFER_B.getOffer().getId(), task); + + driver.declineOffer(OFFER_A.getOffer().getId()); + + control.replay(); + + offerManager.addOffer(OFFER_A); + assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY)); + // Run again to make sure all offers are banned (via no expectations set). + assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY)); + + // Add a new offer to accept the task previously banned for OFFER_A. + offerManager.addOffer(OFFER_B); + assertTrue(offerManager.launchFirst(offerAcceptor, GROUP_KEY)); + + clock.advance(RETURN_DELAY); + } + + @Test + public void testStaticBanIsCleared() throws Exception { + expect(offerAcceptor.apply(OFFER_A)) + .andReturn(Assignment.failure(ImmutableSet.of(Veto.insufficientResources("ram", 100)))); + + TaskInfo task = TaskInfo.getDefaultInstance(); + expect(offerAcceptor.apply(OFFER_A)).andReturn(Assignment.success(task)); + driver.launchTask(OFFER_A.getOffer().getId(), task); + + expect(offerAcceptor.apply(OFFER_A)) + .andReturn(Assignment.failure(ImmutableSet.of(Veto.maintenance("draining")))); + + expect(offerAcceptor.apply(OFFER_A)).andReturn(Assignment.success(task)); + driver.launchTask(OFFER_A.getOffer().getId(), task); + + driver.declineOffer(OFFER_A.getOffer().getId()); + + control.replay(); + + offerManager.addOffer(OFFER_A); + assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY)); + + // Make sure the static ban is cleared when the offers are returned. + clock.advance(RETURN_DELAY); + offerManager.addOffer(OFFER_A); + assertTrue(offerManager.launchFirst(offerAcceptor, GROUP_KEY)); + + offerManager.addOffer(OFFER_A); + assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY)); + + // Make sure the static ban is cleared when driver is disconnected. + offerManager.driverDisconnected(new DriverDisconnected()); + offerManager.addOffer(OFFER_A); + assertTrue(offerManager.launchFirst(offerAcceptor, GROUP_KEY)); + + clock.advance(RETURN_DELAY); + } + + @Test + public void testFlushOffers() throws Exception { + control.replay(); + + offerManager.addOffer(OFFER_A); + offerManager.addOffer(OFFER_B); + offerManager.driverDisconnected(new DriverDisconnected()); + assertFalse(offerManager.launchFirst(offerAcceptor, GROUP_KEY)); + clock.advance(RETURN_DELAY); + } + + @Test + public void testDeclineOffer() throws Exception { + driver.declineOffer(OFFER_A.getOffer().getId()); + + control.replay(); + + offerManager.addOffer(OFFER_A); + clock.advance(RETURN_DELAY); + } + + private static HostOffer setMode(HostOffer offer, MaintenanceMode mode) { + return new HostOffer( + offer.getOffer(), + IHostAttributes.build(offer.getAttributes().newBuilder().setMode(mode))); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/offers/Offers.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/offers/Offers.java b/src/test/java/org/apache/aurora/scheduler/offers/Offers.java new file mode 100644 index 0000000..c0899b0 --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/offers/Offers.java @@ -0,0 +1,43 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.offers; + +import org.apache.mesos.Protos.FrameworkID; +import org.apache.mesos.Protos.Offer; +import org.apache.mesos.Protos.OfferID; +import org.apache.mesos.Protos.SlaveID; + +/** + * Utility class for creating resource offers in unit tests. + */ +public final class Offers { + private Offers() { + // Utility class. + } + + public static final String DEFAULT_HOST = "hostname"; + + public static Offer makeOffer(String offerId) { + return Offers.makeOffer(offerId, DEFAULT_HOST); + } + + public static Offer makeOffer(String offerId, String hostName) { + return Offer.newBuilder() + .setId(OfferID.newBuilder().setValue(offerId)) + .setFrameworkId(FrameworkID.newBuilder().setValue("framework_id")) + .setSlaveId(SlaveID.newBuilder().setValue("slave_id-" + offerId)) + .setHostname(hostName) + .build(); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/offers/RandomJitterReturnDelayTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/offers/RandomJitterReturnDelayTest.java b/src/test/java/org/apache/aurora/scheduler/offers/RandomJitterReturnDelayTest.java new file mode 100644 index 0000000..be21cd6 --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/offers/RandomJitterReturnDelayTest.java @@ -0,0 +1,77 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.offers; + +import com.twitter.common.testing.easymock.EasyMockTest; +import com.twitter.common.util.Random; + +import org.junit.Test; + +import static org.easymock.EasyMock.expect; +import static org.junit.Assert.assertEquals; + +public class RandomJitterReturnDelayTest extends EasyMockTest { + private void assertRandomJitterReturnDelay( + int minHoldTimeMs, + int jitterWindowMs, + boolean shouldThrow) { + + int randomValue = 123; + + Random mockRandom = control.createMock(Random.class); + + if (!shouldThrow) { + expect(mockRandom.nextInt(jitterWindowMs)).andReturn(randomValue); + } + + control.replay(); + + assertEquals( + minHoldTimeMs + randomValue, + new RandomJitterReturnDelay( + minHoldTimeMs, + jitterWindowMs, + mockRandom).get().getValue().intValue()); + } + + @Test + public void testRandomJitterReturnDelay() throws Exception { + assertRandomJitterReturnDelay(100, 200, false); + } + + @Test(expected = IllegalArgumentException.class) + public void testNegativeHoldTimeThrowsIllegalArgumentException() throws Exception { + assertRandomJitterReturnDelay(-1, 200, true); + } + + @Test(expected = IllegalArgumentException.class) + public void testNegativeWindowThrowsIllegalArgumentException() throws Exception { + assertRandomJitterReturnDelay(100, -1, true); + } + + @Test + public void testZeroHoldTime() throws Exception { + assertRandomJitterReturnDelay(0, 200, false); + } + + @Test + public void testZeroWindow() throws Exception { + assertRandomJitterReturnDelay(100, 0, false); + } + + @Test + public void testZeroHoldTimeZeroWindow() throws Exception { + assertRandomJitterReturnDelay(0, 0, false); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/test/java/org/apache/aurora/scheduler/preemptor/BiCacheTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/BiCacheTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/BiCacheTest.java new file mode 100644 index 0000000..7312091 --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/preemptor/BiCacheTest.java @@ -0,0 +1,107 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.preemptor; + +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableSet; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.util.testing.FakeClock; + +import org.apache.aurora.scheduler.preemptor.BiCache.BiCacheSettings; +import org.apache.aurora.scheduler.testing.FakeStatsProvider; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class BiCacheTest { + private static final Amount<Long, Time> HOLD_DURATION = Amount.of(1L, Time.MINUTES); + private static final String STAT_NAME = "cache_size_stat"; + private static final String KEY_1 = "Key 1"; + private static final String KEY_2 = "Key 2"; + private static final Optional<Integer> NO_VALUE = Optional.absent(); + + private FakeStatsProvider statsProvider; + private FakeClock clock; + private BiCache<String, Integer> biCache; + + @Before + public void setUp() { + statsProvider = new FakeStatsProvider(); + clock = new FakeClock(); + biCache = new BiCache<>(statsProvider, new BiCacheSettings(HOLD_DURATION, STAT_NAME), clock); + } + + @Test + public void testExpiration() { + biCache.put(KEY_1, 1); + assertEquals(Optional.of(1), biCache.get(KEY_1)); + assertEquals(1L, statsProvider.getLongValue(STAT_NAME)); + + clock.advance(HOLD_DURATION); + + assertEquals(NO_VALUE, biCache.get(KEY_1)); + assertEquals(ImmutableSet.of(), biCache.getByValue(1)); + assertEquals(0L, statsProvider.getLongValue(STAT_NAME)); + } + + @Test + public void testRemoval() { + biCache.put(KEY_1, 1); + assertEquals(1L, statsProvider.getLongValue(STAT_NAME)); + assertEquals(Optional.of(1), biCache.get(KEY_1)); + biCache.remove(KEY_1, 1); + assertEquals(NO_VALUE, biCache.get(KEY_1)); + assertEquals(0L, statsProvider.getLongValue(STAT_NAME)); + } + + @Test(expected = NullPointerException.class) + public void testRemovalWithNullKey() { + biCache.remove(null, 1); + } + + @Test + public void testDifferentKeysIdenticalValues() { + biCache.put(KEY_1, 1); + biCache.put(KEY_2, 1); + assertEquals(2L, statsProvider.getLongValue(STAT_NAME)); + + assertEquals(Optional.of(1), biCache.get(KEY_1)); + assertEquals(Optional.of(1), biCache.get(KEY_2)); + assertEquals(ImmutableSet.of(KEY_1, KEY_2), biCache.getByValue(1)); + + biCache.remove(KEY_1, 1); + assertEquals(NO_VALUE, biCache.get(KEY_1)); + assertEquals(Optional.of(1), biCache.get(KEY_2)); + assertEquals(ImmutableSet.of(KEY_2), biCache.getByValue(1)); + assertEquals(1L, statsProvider.getLongValue(STAT_NAME)); + + clock.advance(HOLD_DURATION); + assertEquals(NO_VALUE, biCache.get(KEY_1)); + assertEquals(NO_VALUE, biCache.get(KEY_2)); + assertEquals(ImmutableSet.of(), biCache.getByValue(1)); + assertEquals(0L, statsProvider.getLongValue(STAT_NAME)); + } + + @Test + public void testIdenticalKeysDifferentValues() { + biCache.put(KEY_1, 1); + biCache.put(KEY_1, 2); + assertEquals(Optional.of(2), biCache.get(KEY_1)); + assertEquals(ImmutableSet.of(), biCache.getByValue(1)); + assertEquals(ImmutableSet.of(KEY_1), biCache.getByValue(2)); + assertEquals(1L, statsProvider.getLongValue(STAT_NAME)); + } +}
