Repository: aurora Updated Branches: refs/heads/master e36e07720 -> 75129b694
AURORA-1876 Expose stats on scheduler rate limiter This patch exposes stats on `rateLimiter.acquire()` blocking events in `TaskGroups`, providing visibility into whether the scheduling rate is above or below `MAX_SCHEDULE_ATTEMPTS_PER_SEC`. Bugs closed: AURORA-1876 Reviewed at https://reviews.apache.org/r/55471/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/75129b69 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/75129b69 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/75129b69 Branch: refs/heads/master Commit: 75129b694dabde5cfd673c6760dc4fa22e6314fc Parents: e36e077 Author: Mehrdad Nurolahzade <[email protected]> Authored: Mon Jan 23 14:58:19 2017 -0600 Committer: Joshua Cohen <[email protected]> Committed: Mon Jan 23 14:58:19 2017 -0600 ---------------------------------------------------------------------- .../aurora/scheduler/scheduling/TaskGroups.java | 13 +++++++++++-- .../scheduler/scheduling/TaskGroupsTest.java | 17 +++++++++++++---- 2 files changed, 24 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/75129b69/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java index 2d548b0..cea8d0f 100644 --- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java +++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java @@ -19,6 +19,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicLong; import javax.inject.Inject; import javax.inject.Qualifier; @@ -68,6 
+69,9 @@ import static org.apache.aurora.gen.ScheduleStatus.PENDING; */ public class TaskGroups implements EventSubscriber { + @VisibleForTesting + static final String SCHEDULE_ATTEMPTS_BLOCKS = "schedule_attempts_blocks"; + private final ConcurrentMap<TaskGroupKey, TaskGroup> groups = Maps.newConcurrentMap(); private final DelayExecutor executor; private final TaskGroupsSettings settings; @@ -79,6 +83,7 @@ public class TaskGroups implements EventSubscriber { // may influence the selection of a different backoff strategy. private final SlidingStats scheduledTaskPenalties = new SlidingStats("scheduled_task_penalty", "ms"); + private final AtomicLong scheduleAttemptsBlocks; /** * Annotation for the max scheduling batch size. @@ -133,13 +138,15 @@ public class TaskGroups implements EventSubscriber { TaskGroupsSettings settings, TaskScheduler taskScheduler, RescheduleCalculator rescheduleCalculator, - TaskGroupBatchWorker batchWorker) { + TaskGroupBatchWorker batchWorker, + StatsProvider statsProvider) { this.executor = requireNonNull(executor); this.settings = requireNonNull(settings); this.taskScheduler = requireNonNull(taskScheduler); this.rescheduleCalculator = requireNonNull(rescheduleCalculator); this.batchWorker = requireNonNull(batchWorker); + this.scheduleAttemptsBlocks = statsProvider.makeCounter(SCHEDULE_ATTEMPTS_BLOCKS); } private synchronized void evaluateGroupLater(Runnable evaluate, TaskGroup group) { @@ -159,7 +166,9 @@ public class TaskGroups implements EventSubscriber { final Set<String> taskIds = group.peek(settings.maxTasksPerSchedule); long penaltyMs = 0; if (!taskIds.isEmpty()) { - settings.rateLimiter.acquire(); + if (settings.rateLimiter.acquire() > 0) { + scheduleAttemptsBlocks.incrementAndGet(); + } CompletableFuture<Set<String>> result = batchWorker.execute(storeProvider -> taskScheduler.schedule(storeProvider, taskIds)); 
http://git-wip-us.apache.org/repos/asf/aurora/blob/75129b69/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java index 566e0d9..b88d5f1 100644 --- a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java +++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java @@ -37,6 +37,7 @@ import org.apache.aurora.scheduler.storage.entities.IJobKey; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; import org.apache.aurora.scheduler.testing.FakeScheduledExecutor; +import org.apache.aurora.scheduler.testing.FakeStatsProvider; import org.junit.Before; import org.junit.Test; @@ -46,6 +47,7 @@ import static org.apache.aurora.scheduler.testing.BatchWorkerUtil.expectBatchExe import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expect; +import static org.junit.Assert.assertEquals; public class TaskGroupsTest extends EasyMockTest { private static final Amount<Long, Time> FIRST_SCHEDULE_DELAY = Amount.of(1L, Time.MILLISECONDS); @@ -62,6 +64,7 @@ public class TaskGroupsTest extends EasyMockTest { private TaskGroups taskGroups; private TaskGroupBatchWorker batchWorker; private StorageTestUtil storageUtil; + private FakeStatsProvider statsProvider; @Before public void setUp() throws Exception { @@ -74,12 +77,14 @@ public class TaskGroupsTest extends EasyMockTest { rateLimiter = createMock(RateLimiter.class); rescheduleCalculator = createMock(RescheduleCalculator.class); batchWorker = createMock(TaskGroupBatchWorker.class); + statsProvider = new FakeStatsProvider(); taskGroups = new TaskGroups( executor, new TaskGroupsSettings(FIRST_SCHEDULE_DELAY, 
backoffStrategy, rateLimiter, 2), taskScheduler, rescheduleCalculator, - batchWorker); + batchWorker, + statsProvider); } @Test @@ -94,12 +99,13 @@ public class TaskGroupsTest extends EasyMockTest { taskGroups.taskChangedState(TaskStateChange.transition(makeTask(TASK_A_ID), INIT)); clock.advance(FIRST_SCHEDULE_DELAY); + assertEquals(0L, statsProvider.getLongValue(TaskGroups.SCHEDULE_ATTEMPTS_BLOCKS)); } @Test public void testTaskDeletedBeforeEvaluating() throws Exception { final IScheduledTask task = makeTask(TASK_A_ID); - expect(rateLimiter.acquire()).andReturn(0D); + expect(rateLimiter.acquire()).andReturn(0.5D); expect(taskScheduler.schedule(anyObject(), eq(ImmutableSet.of(TASK_A_ID)))) .andAnswer(() -> { // Test a corner case where a task is deleted while it is being evaluated by the task @@ -118,11 +124,12 @@ public class TaskGroupsTest extends EasyMockTest { taskGroups.taskChangedState(TaskStateChange.transition(makeTask(Tasks.id(task)), INIT)); clock.advance(FIRST_SCHEDULE_DELAY); + assertEquals(1L, statsProvider.getLongValue(TaskGroups.SCHEDULE_ATTEMPTS_BLOCKS)); } @Test public void testEvaluatedOnStartup() throws Exception { - expect(rateLimiter.acquire()).andReturn(0D); + expect(rateLimiter.acquire()).andReturn(0.000000001D); expect(rescheduleCalculator.getStartupScheduleDelayMs(makeTask(TASK_A_ID))).andReturn(1L); expect(taskScheduler.schedule(anyObject(), eq(ImmutableSet.of(TASK_A_ID)))) .andReturn(ImmutableSet.of(TASK_A_ID)); @@ -134,11 +141,12 @@ public class TaskGroupsTest extends EasyMockTest { taskGroups.taskChangedState(TaskStateChange.initialized(makeTask(TASK_A_ID))); clock.advance(FIRST_SCHEDULE_DELAY); clock.advance(RESCHEDULE_DELAY); + assertEquals(1L, statsProvider.getLongValue(TaskGroups.SCHEDULE_ATTEMPTS_BLOCKS)); } @Test public void testMultipleTasksAndResistStarvation() throws Exception { - expect(rateLimiter.acquire()).andReturn(0D).times(2); + expect(rateLimiter.acquire()).andReturn(0.001D).times(2); 
expect(taskScheduler.schedule(anyObject(), eq(ImmutableSet.of("a0", "a1")))) .andReturn(ImmutableSet.of("a0", "a1")); expect(taskScheduler.schedule(anyObject(), eq(ImmutableSet.of("b0")))) @@ -160,6 +168,7 @@ public class TaskGroupsTest extends EasyMockTest { makeTask(IJobKey.build(JOB_A.newBuilder().setName("jobB")), "b0", 0), INIT)); clock.advance(FIRST_SCHEDULE_DELAY); + assertEquals(2L, statsProvider.getLongValue(TaskGroups.SCHEDULE_ATTEMPTS_BLOCKS)); } @Test
