Repository: aurora
Updated Branches:
  refs/heads/master e36e07720 -> 75129b694


AURORA-1876 Expose stats on scheduler rate limiter

This patch exposes stats on `rateLimiter.acquire()` blocking events in
`TaskGroups`, providing visibility into whether the scheduling rate is
above/below `MAX_SCHEDULE_ATTEMPTS_PER_SEC`.

Bugs closed: AURORA-1876

Reviewed at https://reviews.apache.org/r/55471/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/75129b69
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/75129b69
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/75129b69

Branch: refs/heads/master
Commit: 75129b694dabde5cfd673c6760dc4fa22e6314fc
Parents: e36e077
Author: Mehrdad Nurolahzade <[email protected]>
Authored: Mon Jan 23 14:58:19 2017 -0600
Committer: Joshua Cohen <[email protected]>
Committed: Mon Jan 23 14:58:19 2017 -0600

----------------------------------------------------------------------
 .../aurora/scheduler/scheduling/TaskGroups.java    | 13 +++++++++++--
 .../scheduler/scheduling/TaskGroupsTest.java       | 17 +++++++++++++----
 2 files changed, 24 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/75129b69/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java 
b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java
index 2d548b0..cea8d0f 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java
@@ -19,6 +19,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
 
 import javax.inject.Inject;
 import javax.inject.Qualifier;
@@ -68,6 +69,9 @@ import static org.apache.aurora.gen.ScheduleStatus.PENDING;
  */
 public class TaskGroups implements EventSubscriber {
 
+  @VisibleForTesting
+  static final String SCHEDULE_ATTEMPTS_BLOCKS = "schedule_attempts_blocks";
+
   private final ConcurrentMap<TaskGroupKey, TaskGroup> groups = 
Maps.newConcurrentMap();
   private final DelayExecutor executor;
   private final TaskGroupsSettings settings;
@@ -79,6 +83,7 @@ public class TaskGroups implements EventSubscriber {
   // may influence the selection of a different backoff strategy.
   private final SlidingStats scheduledTaskPenalties =
       new SlidingStats("scheduled_task_penalty", "ms");
+  private final AtomicLong scheduleAttemptsBlocks;
 
   /**
    * Annotation for the max scheduling batch size.
@@ -133,13 +138,15 @@ public class TaskGroups implements EventSubscriber {
       TaskGroupsSettings settings,
       TaskScheduler taskScheduler,
       RescheduleCalculator rescheduleCalculator,
-      TaskGroupBatchWorker batchWorker) {
+      TaskGroupBatchWorker batchWorker,
+      StatsProvider statsProvider) {
 
     this.executor = requireNonNull(executor);
     this.settings = requireNonNull(settings);
     this.taskScheduler = requireNonNull(taskScheduler);
     this.rescheduleCalculator = requireNonNull(rescheduleCalculator);
     this.batchWorker = requireNonNull(batchWorker);
+    this.scheduleAttemptsBlocks = 
statsProvider.makeCounter(SCHEDULE_ATTEMPTS_BLOCKS);
   }
 
   private synchronized void evaluateGroupLater(Runnable evaluate, TaskGroup 
group) {
@@ -159,7 +166,9 @@ public class TaskGroups implements EventSubscriber {
         final Set<String> taskIds = group.peek(settings.maxTasksPerSchedule);
         long penaltyMs = 0;
         if (!taskIds.isEmpty()) {
-          settings.rateLimiter.acquire();
+          if (settings.rateLimiter.acquire() > 0) {
+            scheduleAttemptsBlocks.incrementAndGet();
+          }
           CompletableFuture<Set<String>> result = 
batchWorker.execute(storeProvider ->
               taskScheduler.schedule(storeProvider, taskIds));
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/75129b69/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java 
b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java
index 566e0d9..b88d5f1 100644
--- a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java
@@ -37,6 +37,7 @@ import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
 import org.apache.aurora.scheduler.testing.FakeScheduledExecutor;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -46,6 +47,7 @@ import static 
org.apache.aurora.scheduler.testing.BatchWorkerUtil.expectBatchExe
 import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
 
 public class TaskGroupsTest extends EasyMockTest {
   private static final Amount<Long, Time> FIRST_SCHEDULE_DELAY = Amount.of(1L, 
Time.MILLISECONDS);
@@ -62,6 +64,7 @@ public class TaskGroupsTest extends EasyMockTest {
   private TaskGroups taskGroups;
   private TaskGroupBatchWorker batchWorker;
   private StorageTestUtil storageUtil;
+  private FakeStatsProvider statsProvider;
 
   @Before
   public void setUp() throws Exception {
@@ -74,12 +77,14 @@ public class TaskGroupsTest extends EasyMockTest {
     rateLimiter = createMock(RateLimiter.class);
     rescheduleCalculator = createMock(RescheduleCalculator.class);
     batchWorker = createMock(TaskGroupBatchWorker.class);
+    statsProvider = new FakeStatsProvider();
     taskGroups = new TaskGroups(
         executor,
         new TaskGroupsSettings(FIRST_SCHEDULE_DELAY, backoffStrategy, 
rateLimiter, 2),
         taskScheduler,
         rescheduleCalculator,
-        batchWorker);
+        batchWorker,
+        statsProvider);
   }
 
   @Test
@@ -94,12 +99,13 @@ public class TaskGroupsTest extends EasyMockTest {
 
     
taskGroups.taskChangedState(TaskStateChange.transition(makeTask(TASK_A_ID), 
INIT));
     clock.advance(FIRST_SCHEDULE_DELAY);
+    assertEquals(0L, 
statsProvider.getLongValue(TaskGroups.SCHEDULE_ATTEMPTS_BLOCKS));
   }
 
   @Test
   public void testTaskDeletedBeforeEvaluating() throws Exception {
     final IScheduledTask task = makeTask(TASK_A_ID);
-    expect(rateLimiter.acquire()).andReturn(0D);
+    expect(rateLimiter.acquire()).andReturn(0.5D);
     expect(taskScheduler.schedule(anyObject(), eq(ImmutableSet.of(TASK_A_ID))))
         .andAnswer(() -> {
           // Test a corner case where a task is deleted while it is being 
evaluated by the task
@@ -118,11 +124,12 @@ public class TaskGroupsTest extends EasyMockTest {
 
     
taskGroups.taskChangedState(TaskStateChange.transition(makeTask(Tasks.id(task)),
 INIT));
     clock.advance(FIRST_SCHEDULE_DELAY);
+    assertEquals(1L, 
statsProvider.getLongValue(TaskGroups.SCHEDULE_ATTEMPTS_BLOCKS));
   }
 
   @Test
   public void testEvaluatedOnStartup() throws Exception {
-    expect(rateLimiter.acquire()).andReturn(0D);
+    expect(rateLimiter.acquire()).andReturn(0.000000001D);
     
expect(rescheduleCalculator.getStartupScheduleDelayMs(makeTask(TASK_A_ID))).andReturn(1L);
     expect(taskScheduler.schedule(anyObject(), eq(ImmutableSet.of(TASK_A_ID))))
         .andReturn(ImmutableSet.of(TASK_A_ID));
@@ -134,11 +141,12 @@ public class TaskGroupsTest extends EasyMockTest {
     
taskGroups.taskChangedState(TaskStateChange.initialized(makeTask(TASK_A_ID)));
     clock.advance(FIRST_SCHEDULE_DELAY);
     clock.advance(RESCHEDULE_DELAY);
+    assertEquals(1L, 
statsProvider.getLongValue(TaskGroups.SCHEDULE_ATTEMPTS_BLOCKS));
   }
 
   @Test
   public void testMultipleTasksAndResistStarvation() throws Exception {
-    expect(rateLimiter.acquire()).andReturn(0D).times(2);
+    expect(rateLimiter.acquire()).andReturn(0.001D).times(2);
     expect(taskScheduler.schedule(anyObject(), eq(ImmutableSet.of("a0", 
"a1"))))
         .andReturn(ImmutableSet.of("a0", "a1"));
     expect(taskScheduler.schedule(anyObject(), eq(ImmutableSet.of("b0"))))
@@ -160,6 +168,7 @@ public class TaskGroupsTest extends EasyMockTest {
         makeTask(IJobKey.build(JOB_A.newBuilder().setName("jobB")), "b0", 0), 
INIT));
 
     clock.advance(FIRST_SCHEDULE_DELAY);
+    assertEquals(2L, 
statsProvider.getLongValue(TaskGroups.SCHEDULE_ATTEMPTS_BLOCKS));
   }
 
   @Test

Reply via email to