Repository: aurora
Updated Branches:
  refs/heads/master 6cab9c51f -> a45a8ee1a
Adding missing coverage in TaskGroups Reviewed at https://reviews.apache.org/r/36994/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/a45a8ee1 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/a45a8ee1 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/a45a8ee1 Branch: refs/heads/master Commit: a45a8ee1ae3d007a86978e71864de945753e1630 Parents: 6cab9c5 Author: Maxim Khutornenko <[email protected]> Authored: Fri Jul 31 15:55:33 2015 -0700 Committer: Maxim Khutornenko <[email protected]> Committed: Fri Jul 31 15:55:33 2015 -0700 ---------------------------------------------------------------------- .../scheduler/scheduling/TaskGroupsTest.java | 113 +++++++++++-------- 1 file changed, 67 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/a45a8ee1/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java index 10de372..55aad35 100644 --- a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java +++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java @@ -14,7 +14,6 @@ package org.apache.aurora.scheduler.scheduling; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.RateLimiter; @@ -24,89 +23,68 @@ import com.twitter.common.testing.easymock.EasyMockTest; import com.twitter.common.util.BackoffStrategy; import org.apache.aurora.gen.AssignedTask; -import org.apache.aurora.gen.Identity; +import org.apache.aurora.gen.JobKey; import org.apache.aurora.gen.ScheduleStatus; import 
org.apache.aurora.gen.ScheduledTask; import org.apache.aurora.gen.TaskConfig; import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted; +import org.apache.aurora.scheduler.storage.entities.IJobKey; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; -import org.easymock.Capture; -import org.easymock.EasyMock; +import org.apache.aurora.scheduler.testing.FakeScheduledExecutor; import org.easymock.IAnswer; import org.junit.Before; import org.junit.Test; -import static java.util.concurrent.TimeUnit.MILLISECONDS; - +import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED; import static org.apache.aurora.gen.ScheduleStatus.INIT; import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.expectLastCall; public class TaskGroupsTest extends EasyMockTest { + private static final Amount<Long, Time> FIRST_SCHEDULE_DELAY = Amount.of(1L, Time.MILLISECONDS); + private static final Amount<Long, Time> RESCHEDULE_DELAY = FIRST_SCHEDULE_DELAY; + private static final IJobKey JOB_A = IJobKey.build(new JobKey("role", "test", "jobA")); + private static final String TASK_A_ID = "a"; - private static final long FIRST_SCHEDULE_DELAY_MS = 1L; - - private ScheduledExecutorService executor; private BackoffStrategy backoffStrategy; private TaskScheduler taskScheduler; private RateLimiter rateLimiter; - + private FakeScheduledExecutor clock; + private RescheduleCalculator rescheduleCalculator; private TaskGroups taskGroups; @Before public void setUp() throws Exception { - executor = createMock(ScheduledExecutorService.class); + ScheduledExecutorService executor = createMock(ScheduledExecutorService.class); + clock = FakeScheduledExecutor.scheduleExecutor(executor); backoffStrategy = createMock(BackoffStrategy.class); taskScheduler = createMock(TaskScheduler.class); rateLimiter = createMock(RateLimiter.class); + 
rescheduleCalculator = createMock(RescheduleCalculator.class); taskGroups = new TaskGroups( executor, - Amount.of(FIRST_SCHEDULE_DELAY_MS, Time.MILLISECONDS), + FIRST_SCHEDULE_DELAY, backoffStrategy, rateLimiter, taskScheduler, - createMock(RescheduleCalculator.class)); + rescheduleCalculator); } @Test public void testEvaluatedAfterFirstSchedulePenalty() { - executor.schedule( - EasyMock.<Runnable>anyObject(), - EasyMock.eq(FIRST_SCHEDULE_DELAY_MS), - EasyMock.eq(MILLISECONDS)); - expectLastCall().andAnswer(new IAnswer<ScheduledFuture<Void>>() { - @Override - public ScheduledFuture<Void> answer() { - ((Runnable) EasyMock.getCurrentArguments()[0]).run(); - return null; - } - }); expect(rateLimiter.acquire()).andReturn(0D); - expect(taskScheduler.schedule("a")).andReturn(true); + expect(taskScheduler.schedule(TASK_A_ID)).andReturn(true); control.replay(); - taskGroups.taskChangedState(TaskStateChange.transition(makeTask("a"), INIT)); - } - - private Capture<Runnable> expectEvaluate() { - Capture<Runnable> capture = createCapture(); - executor.schedule( - EasyMock.capture(capture), - EasyMock.eq(FIRST_SCHEDULE_DELAY_MS), - EasyMock.eq(MILLISECONDS)); - expectLastCall().andReturn(null); - return capture; + taskGroups.taskChangedState(TaskStateChange.transition(makeTask(TASK_A_ID), INIT)); + clock.advance(FIRST_SCHEDULE_DELAY); } @Test public void testTaskDeletedBeforeEvaluating() { - final IScheduledTask task = makeTask("a"); - - Capture<Runnable> evaluate = expectEvaluate(); - + final IScheduledTask task = makeTask(TASK_A_ID); expect(rateLimiter.acquire()).andReturn(0D); expect(taskScheduler.schedule(Tasks.id(task))).andAnswer(new IAnswer<Boolean>() { @Override @@ -119,22 +97,65 @@ public class TaskGroupsTest extends EasyMockTest { return false; } }); - expect(backoffStrategy.calculateBackoffMs(FIRST_SCHEDULE_DELAY_MS)).andReturn(0L); + expect(backoffStrategy.calculateBackoffMs(FIRST_SCHEDULE_DELAY.as(Time.MILLISECONDS))) + .andReturn(0L); control.replay(); 
taskGroups.taskChangedState(TaskStateChange.transition(makeTask(Tasks.id(task)), INIT)); - evaluate.getValue().run(); + clock.advance(FIRST_SCHEDULE_DELAY); + } + + @Test + public void testEvaluatedOnStartup() { + expect(rateLimiter.acquire()).andReturn(0D); + expect(rescheduleCalculator.getStartupScheduleDelayMs(makeTask(TASK_A_ID))).andReturn(1L); + expect(taskScheduler.schedule(TASK_A_ID)).andReturn(true); + + control.replay(); + + taskGroups.taskChangedState(TaskStateChange.initialized(makeTask(TASK_A_ID))); + clock.advance(FIRST_SCHEDULE_DELAY); + clock.advance(RESCHEDULE_DELAY); + } + + @Test + public void testResistStarvation() { + expect(rateLimiter.acquire()).andReturn(0D).times(2); + expect(taskScheduler.schedule("a0")).andReturn(true); + expect(taskScheduler.schedule("b0")).andReturn(true); + + control.replay(); + + taskGroups.taskChangedState(TaskStateChange.transition(makeTask(JOB_A, "a0", 0), INIT)); + taskGroups.taskChangedState(TaskStateChange.transition(makeTask(JOB_A, "a1", 1), INIT)); + taskGroups.taskChangedState(TaskStateChange.transition(makeTask(JOB_A, "a2", 2), INIT)); + taskGroups.taskChangedState(TaskStateChange.transition( + makeTask(IJobKey.build(JOB_A.newBuilder().setName("jobB")), "b0", 0), INIT)); + + clock.advance(FIRST_SCHEDULE_DELAY); + } + + @Test + public void testNonPendingIgnored() { + control.replay(); + + IScheduledTask task = + IScheduledTask.build(makeTask(TASK_A_ID).newBuilder().setStatus(ASSIGNED)); + taskGroups.taskChangedState(TaskStateChange.initialized(task)); } private static IScheduledTask makeTask(String id) { + return makeTask(JOB_A, id, 0); + } + + private static IScheduledTask makeTask(IJobKey jobKey, String id, int instanceId) { return IScheduledTask.build(new ScheduledTask() .setStatus(ScheduleStatus.PENDING) .setAssignedTask(new AssignedTask() + .setInstanceId(instanceId) .setTaskId(id) .setTask(new TaskConfig() - .setOwner(new Identity("owner", "owner")) - .setEnvironment("test") - .setJobName("job")))); + 
.setJob(jobKey.newBuilder())))); } }
