Batching writes - Part 3 (of 3): Converting TaskScheduler to use BatchWorker.

Reviewed at https://reviews.apache.org/r/51765/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/496397aa
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/496397aa
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/496397aa

Branch: refs/heads/master
Commit: 496397aa5a9534d04bcc273e50a6b9de204ae133
Parents: 2cb43d6
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Fri Sep 16 14:17:48 2016 -0700
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Fri Sep 16 14:17:48 2016 -0700

----------------------------------------------------------------------
 .../aurora/benchmark/SchedulingBenchmarks.java  | 20 ++++---
 .../scheduler/scheduling/SchedulingModule.java  | 13 ++++
 .../aurora/scheduler/scheduling/TaskGroups.java | 62 ++++++++++++++++++--
 .../scheduler/scheduling/TaskScheduler.java     | 17 ++----
 .../scheduler/http/AbstractJettyTest.java       |  2 +
 .../scheduler/scheduling/TaskGroupsTest.java    | 35 +++++++----
 .../scheduling/TaskSchedulerImplTest.java       | 19 +++---
 7 files changed, 125 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/496397aa/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java 
b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
index 9d0d40b..6f1cbfb 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
@@ -190,9 +190,11 @@ public class SchedulingBenchmarks {
     private void fillUpCluster(int numOffers) {
       Set<IScheduledTask> tasksToAssign = buildClusterTasks(numOffers);
       saveTasks(tasksToAssign);
-      for (IScheduledTask scheduledTask : tasksToAssign) {
-        taskScheduler.schedule(scheduledTask.getAssignedTask().getTaskId());
-      }
+      storage.write((NoResult.Quiet) store -> {
+        for (IScheduledTask scheduledTask : tasksToAssign) {
+          taskScheduler.schedule(store, 
scheduledTask.getAssignedTask().getTaskId());
+        }
+      });
     }
 
     private void saveTasks(final Set<IScheduledTask> tasks) {
@@ -219,11 +221,13 @@ public class SchedulingBenchmarks {
      */
     @Benchmark
     public boolean runBenchmark() {
-      boolean result = false;
-      for (IScheduledTask task : settings.getTasks()) {
-        result = taskScheduler.schedule(task.getAssignedTask().getTaskId());
-      }
-      return result;
+      return storage.write((Storage.MutateWork.Quiet<Boolean>) store -> {
+        boolean result = false;
+        for (IScheduledTask task : settings.getTasks()) {
+          result = taskScheduler.schedule(store, 
task.getAssignedTask().getTaskId());
+        }
+        return result;
+      });
     }
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/496397aa/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java 
b/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java
index 11e8033..664bc6c 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java
@@ -31,6 +31,8 @@ import org.apache.aurora.scheduler.events.PubsubEventModule;
 import org.apache.aurora.scheduler.preemptor.BiCache;
 import 
org.apache.aurora.scheduler.scheduling.RescheduleCalculator.RescheduleCalculatorImpl;
 
+import static 
org.apache.aurora.scheduler.SchedulerServicesModule.addSchedulerActiveServiceBinding;
+
 /**
  * Binding module for task scheduling logic.
  */
@@ -83,6 +85,11 @@ public class SchedulingModule extends AbstractModule {
   private static final Arg<Amount<Long, Time>> RESERVATION_DURATION =
       Arg.create(Amount.of(3L, Time.MINUTES));
 
+  @Positive
+  @CmdLine(name = "scheduling_max_batch_size",
+      help = "The maximum number of scheduling attempts that can be processed 
in a batch.")
+  private static final Arg<Integer> SCHEDULING_MAX_BATCH_SIZE = Arg.create(3);
+
   @Override
   protected void configure() {
     install(new PrivateModule() {
@@ -109,6 +116,12 @@ public class SchedulingModule extends AbstractModule {
     });
     PubsubEventModule.bindSubscriber(binder(), TaskGroups.class);
 
+    bind(new TypeLiteral<Integer>() { })
+        .annotatedWith(TaskGroups.SchedulingMaxBatchSize.class)
+        .toInstance(SCHEDULING_MAX_BATCH_SIZE.get());
+    bind(TaskGroups.TaskGroupBatchWorker.class).in(Singleton.class);
+    
addSchedulerActiveServiceBinding(binder()).to(TaskGroups.TaskGroupBatchWorker.class);
+
     install(new PrivateModule() {
       @Override
       protected void configure() {

http://git-wip-us.apache.org/repos/asf/aurora/blob/496397aa/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java 
b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java
index c044ebe..d390c07 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java
@@ -13,12 +13,19 @@
  */
 package org.apache.aurora.scheduler.scheduling;
 
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
 
 import javax.inject.Inject;
+import javax.inject.Qualifier;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
@@ -28,7 +35,9 @@ import com.google.common.util.concurrent.RateLimiter;
 import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Time;
 import org.apache.aurora.common.stats.SlidingStats;
+import org.apache.aurora.common.stats.StatsProvider;
 import org.apache.aurora.common.util.BackoffStrategy;
+import org.apache.aurora.scheduler.BatchWorker;
 import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
 import org.apache.aurora.scheduler.async.DelayExecutor;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
@@ -36,9 +45,14 @@ import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
+import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
 import static java.util.Objects.requireNonNull;
 
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
@@ -61,12 +75,38 @@ public class TaskGroups implements EventSubscriber {
   private final long firstScheduleDelay;
   private final BackoffStrategy backoff;
   private final RescheduleCalculator rescheduleCalculator;
+  private final BatchWorker<Boolean> batchWorker;
 
   // Track the penalties of tasks at the time they were scheduled. This is to 
provide data that
   // may influence the selection of a different backoff strategy.
   private final SlidingStats scheduledTaskPenalties =
       new SlidingStats("scheduled_task_penalty", "ms");
 
+  /**
+   * Annotation for the max scheduling batch size.
+   */
+  @VisibleForTesting
+  @Qualifier
+  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+  public @interface SchedulingMaxBatchSize { }
+
+  @VisibleForTesting
+  public static class TaskGroupBatchWorker extends BatchWorker<Boolean> {
+    @Inject
+    TaskGroupBatchWorker(
+        Storage storage,
+        StatsProvider statsProvider,
+        @SchedulingMaxBatchSize int maxBatchSize) {
+
+      super(storage, statsProvider, maxBatchSize);
+    }
+
+    @Override
+    protected String serviceName() {
+      return "TaskGroupBatchWorker";
+    }
+  }
+
   public static class TaskGroupsSettings {
     private final Amount<Long, Time> firstScheduleDelay;
     private final BackoffStrategy taskGroupBackoff;
@@ -88,7 +128,8 @@ public class TaskGroups implements EventSubscriber {
       @AsyncExecutor DelayExecutor executor,
       TaskGroupsSettings settings,
       TaskScheduler taskScheduler,
-      RescheduleCalculator rescheduleCalculator) {
+      RescheduleCalculator rescheduleCalculator,
+      TaskGroupBatchWorker batchWorker) {
 
     requireNonNull(settings.firstScheduleDelay);
     Preconditions.checkArgument(settings.firstScheduleDelay.getValue() > 0);
@@ -99,10 +140,11 @@ public class TaskGroups implements EventSubscriber {
     this.firstScheduleDelay = 
settings.firstScheduleDelay.as(Time.MILLISECONDS);
     this.backoff = requireNonNull(settings.taskGroupBackoff);
     this.rescheduleCalculator = requireNonNull(rescheduleCalculator);
+    this.batchWorker = requireNonNull(batchWorker);
 
-    this.taskScheduler = taskId -> {
+    this.taskScheduler = (store, taskId) -> {
       settings.rateLimiter.acquire();
-      return taskScheduler.schedule(taskId);
+      return taskScheduler.schedule(store, taskId);
     };
   }
 
@@ -120,10 +162,20 @@ public class TaskGroups implements EventSubscriber {
     Runnable monitor = new Runnable() {
       @Override
       public void run() {
-        Optional<String> taskId = group.peek();
+        final Optional<String> taskId = group.peek();
         long penaltyMs = 0;
         if (taskId.isPresent()) {
-          if (taskScheduler.schedule(taskId.get())) {
+          CompletableFuture<Boolean> result = 
batchWorker.execute(storeProvider ->
+              taskScheduler.schedule(storeProvider, taskId.get()));
+          boolean isScheduled = false;
+          try {
+            isScheduled = result.get();
+          } catch (ExecutionException | InterruptedException e) {
+            Thread.currentThread().interrupt();
+            Throwables.propagate(e);
+          }
+
+          if (isScheduled) {
             scheduledTaskPenalties.accumulate(group.getPenaltyMs());
             group.remove(taskId.get());
             if (group.hasMore()) {

http://git-wip-us.apache.org/repos/asf/aurora/blob/496397aa/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java 
b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
index d266f6a..207d38d 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
@@ -38,7 +38,6 @@ import org.apache.aurora.scheduler.preemptor.BiCache;
 import org.apache.aurora.scheduler.preemptor.Preemptor;
 import org.apache.aurora.scheduler.resources.ResourceBag;
 import org.apache.aurora.scheduler.state.TaskAssigner;
-import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -63,11 +62,12 @@ public interface TaskScheduler extends EventSubscriber {
   /**
    * Attempts to schedule a task, possibly performing irreversible actions.
    *
+   * @param storeProvider {@code MutableStoreProvider} instance to access data 
store.
    * @param taskId The task to attempt to schedule.
    * @return {@code true} if the task was scheduled, {@code false} otherwise. 
The caller should
    *         call schedule again if {@code false} is returned.
    */
-  boolean schedule(String taskId);
+  boolean schedule(MutableStoreProvider storeProvider, String taskId);
 
   /**
    * An asynchronous task scheduler.  Scheduling of tasks is performed on a 
delay, where each task
@@ -86,7 +86,6 @@ public interface TaskScheduler extends EventSubscriber {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(TaskSchedulerImpl.class);
 
-    private final Storage storage;
     private final TaskAssigner assigner;
     private final Preemptor preemptor;
     private final ExecutorSettings executorSettings;
@@ -98,25 +97,22 @@ public interface TaskScheduler extends EventSubscriber {
 
     @Inject
     TaskSchedulerImpl(
-        Storage storage,
         TaskAssigner assigner,
         Preemptor preemptor,
         ExecutorSettings executorSettings,
         BiCache<String, TaskGroupKey> reservations) {
 
-      this.storage = requireNonNull(storage);
       this.assigner = requireNonNull(assigner);
       this.preemptor = requireNonNull(preemptor);
       this.executorSettings = requireNonNull(executorSettings);
       this.reservations = requireNonNull(reservations);
     }
 
-    @Timed("task_schedule_attempt")
-    @Override
-    public boolean schedule(final String taskId) {
+    @Timed ("task_schedule_attempt")
+    public boolean schedule(MutableStoreProvider store, String taskId) {
       attemptsFired.incrementAndGet();
       try {
-        return storage.write(store -> scheduleTask(store, taskId));
+        return scheduleTask(store, taskId);
       } catch (RuntimeException e) {
         // We catch the generic unchecked exception here to ensure tasks are 
not abandoned
         // if there is a transient issue resulting in an unchecked exception.
@@ -126,8 +122,7 @@ public interface TaskScheduler extends EventSubscriber {
       }
     }
 
-    @Timed("task_schedule_attempt_locked")
-    protected boolean scheduleTask(MutableStoreProvider store, String taskId) {
+    private boolean scheduleTask(MutableStoreProvider store, String taskId) {
       LOG.debug("Attempting to schedule task " + taskId);
       IAssignedTask assignedTask = Iterables.getOnlyElement(
           Iterables.transform(

http://git-wip-us.apache.org/repos/asf/aurora/blob/496397aa/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java 
b/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java
index c2ceb4e..c1c3eca 100644
--- a/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java
@@ -54,6 +54,7 @@ import org.apache.aurora.scheduler.cron.CronJobManager;
 import org.apache.aurora.scheduler.http.api.GsonMessageBodyHandler;
 import org.apache.aurora.scheduler.offers.OfferManager;
 import org.apache.aurora.scheduler.scheduling.RescheduleCalculator;
+import org.apache.aurora.scheduler.scheduling.TaskGroups;
 import org.apache.aurora.scheduler.scheduling.TaskGroups.TaskGroupsSettings;
 import org.apache.aurora.scheduler.scheduling.TaskScheduler;
 import org.apache.aurora.scheduler.state.LockManager;
@@ -128,6 +129,7 @@ public abstract class AbstractJettyTest extends 
EasyMockTest {
             bindMock(TaskScheduler.class);
             bindMock(TierManager.class);
             bindMock(Thread.UncaughtExceptionHandler.class);
+            bindMock(TaskGroups.TaskGroupBatchWorker.class);
 
             bind(ServletContextListener.class).toProvider(() -> {
               return makeServletContextListener(injector, 
getChildServletModule());

http://git-wip-us.apache.org/repos/asf/aurora/blob/496397aa/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java 
b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java
index 95cf25e..8872962 100644
--- a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java
@@ -29,15 +29,20 @@ import org.apache.aurora.scheduler.async.DelayExecutor;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
+import org.apache.aurora.scheduler.scheduling.TaskGroups.TaskGroupBatchWorker;
 import org.apache.aurora.scheduler.scheduling.TaskGroups.TaskGroupsSettings;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
 import org.apache.aurora.scheduler.testing.FakeScheduledExecutor;
 import org.junit.Before;
 import org.junit.Test;
 
 import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
 import static org.apache.aurora.gen.ScheduleStatus.INIT;
+import static 
org.apache.aurora.scheduler.testing.BatchWorkerUtil.expectBatchExecute;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
 
 public class TaskGroupsTest extends EasyMockTest {
@@ -52,26 +57,33 @@ public class TaskGroupsTest extends EasyMockTest {
   private FakeScheduledExecutor clock;
   private RescheduleCalculator rescheduleCalculator;
   private TaskGroups taskGroups;
+  private TaskGroupBatchWorker batchWorker;
+  private StorageTestUtil storageUtil;
 
   @Before
   public void setUp() throws Exception {
+    storageUtil = new StorageTestUtil(this);
+    storageUtil.expectOperations();
     DelayExecutor executor = createMock(DelayExecutor.class);
     clock = FakeScheduledExecutor.fromDelayExecutor(executor);
     backoffStrategy = createMock(BackoffStrategy.class);
     taskScheduler = createMock(TaskScheduler.class);
     rateLimiter = createMock(RateLimiter.class);
     rescheduleCalculator = createMock(RescheduleCalculator.class);
+    batchWorker = createMock(TaskGroupBatchWorker.class);
     taskGroups = new TaskGroups(
         executor,
         new TaskGroupsSettings(FIRST_SCHEDULE_DELAY, backoffStrategy, 
rateLimiter),
         taskScheduler,
-        rescheduleCalculator);
+        rescheduleCalculator,
+        batchWorker);
   }
 
   @Test
-  public void testEvaluatedAfterFirstSchedulePenalty() {
+  public void testEvaluatedAfterFirstSchedulePenalty() throws Exception {
     expect(rateLimiter.acquire()).andReturn(0D);
-    expect(taskScheduler.schedule(TASK_A_ID)).andReturn(true);
+    expect(taskScheduler.schedule(anyObject(), eq(TASK_A_ID))).andReturn(true);
+    expectBatchExecute(batchWorker, storageUtil.storage, control, 
true).anyTimes();
 
     control.replay();
 
@@ -80,10 +92,10 @@ public class TaskGroupsTest extends EasyMockTest {
   }
 
   @Test
-  public void testTaskDeletedBeforeEvaluating() {
+  public void testTaskDeletedBeforeEvaluating() throws Exception {
     final IScheduledTask task = makeTask(TASK_A_ID);
     expect(rateLimiter.acquire()).andReturn(0D);
-    expect(taskScheduler.schedule(Tasks.id(task))).andAnswer(() -> {
+    expect(taskScheduler.schedule(anyObject(), 
eq(Tasks.id(task)))).andAnswer(() -> {
       // Test a corner case where a task is deleted while it is being 
evaluated by the task
       // scheduler.  If not handled carefully, this could result in the 
scheduler trying again
       // later to satisfy the deleted task.
@@ -91,6 +103,7 @@ public class TaskGroupsTest extends EasyMockTest {
 
       return false;
     });
+    expectBatchExecute(batchWorker, storageUtil.storage, control, 
false).anyTimes();
     
expect(backoffStrategy.calculateBackoffMs(FIRST_SCHEDULE_DELAY.as(Time.MILLISECONDS)))
         .andReturn(0L);
 
@@ -101,10 +114,11 @@ public class TaskGroupsTest extends EasyMockTest {
   }
 
   @Test
-  public void testEvaluatedOnStartup() {
+  public void testEvaluatedOnStartup() throws Exception {
     expect(rateLimiter.acquire()).andReturn(0D);
     
expect(rescheduleCalculator.getStartupScheduleDelayMs(makeTask(TASK_A_ID))).andReturn(1L);
-    expect(taskScheduler.schedule(TASK_A_ID)).andReturn(true);
+    expect(taskScheduler.schedule(anyObject(), eq(TASK_A_ID))).andReturn(true);
+    expectBatchExecute(batchWorker, storageUtil.storage, control, 
true).anyTimes();
 
     control.replay();
 
@@ -114,10 +128,11 @@ public class TaskGroupsTest extends EasyMockTest {
   }
 
   @Test
-  public void testResistStarvation() {
+  public void testResistStarvation() throws Exception {
     expect(rateLimiter.acquire()).andReturn(0D).times(2);
-    expect(taskScheduler.schedule("a0")).andReturn(true);
-    expect(taskScheduler.schedule("b0")).andReturn(true);
+    expect(taskScheduler.schedule(anyObject(), eq("a0"))).andReturn(true);
+    expect(taskScheduler.schedule(anyObject(), eq("b0"))).andReturn(true);
+    expectBatchExecute(batchWorker, storageUtil.storage, control, 
true).anyTimes();
 
     control.replay();
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/496397aa/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
 
b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
index 72562e6..a4e87d2 100644
--- 
a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
+++ 
b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
@@ -157,7 +157,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
 
     control.replay();
 
-    assertTrue(scheduler.schedule("a"));
+    assertTrue(scheduler.schedule(storageUtil.mutableStoreProvider, "a"));
   }
 
   @Test
@@ -169,7 +169,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
 
     control.replay();
 
-    assertTrue(scheduler.schedule("a"));
+    assertTrue(scheduler.schedule(storageUtil.mutableStoreProvider, "a"));
   }
 
   @Test
@@ -201,9 +201,9 @@ public class TaskSchedulerImplTest extends EasyMockTest {
 
     control.replay();
 
-    assertFalse(scheduler.schedule("a"));
-    assertFalse(scheduler.schedule("a"));
-    assertTrue(scheduler.schedule("a"));
+    assertFalse(scheduler.schedule(storageUtil.mutableStoreProvider, "a"));
+    assertFalse(scheduler.schedule(storageUtil.mutableStoreProvider, "a"));
+    assertTrue(scheduler.schedule(storageUtil.mutableStoreProvider, "a"));
   }
 
   @Test
@@ -218,7 +218,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
 
     control.replay();
 
-    assertFalse(scheduler.schedule("a"));
+    assertFalse(scheduler.schedule(storageUtil.mutableStoreProvider, "a"));
   }
 
   @Test
@@ -233,7 +233,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
 
     control.replay();
 
-    assertFalse(scheduler.schedule("a"));
+    assertFalse(scheduler.schedule(storageUtil.mutableStoreProvider, "a"));
   }
 
   @Test
@@ -281,7 +281,8 @@ public class TaskSchedulerImplTest extends EasyMockTest {
 
     control.replay();
 
-    assertTrue(scheduler.schedule(Tasks.id(taskA)));
+    memStorage.write((NoResult.Quiet)
+        store -> assertTrue(scheduler.schedule(store, Tasks.id(taskA))));
   }
 
   @Test
@@ -295,7 +296,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
 
     control.replay();
 
-    assertFalse(scheduler.schedule("a"));
+    assertFalse(scheduler.schedule(storageUtil.mutableStoreProvider, "a"));
   }
 
   private void expectPreemptorCall(IScheduledTask task, Optional<String> 
result) {

Reply via email to