Repository: aurora
Updated Branches:
  refs/heads/master 4b9c759cf -> a7b95d95b


Making preemptor asynchronous. Part 3(final) - background service.

Bugs closed: AURORA-1158

Reviewed at https://reviews.apache.org/r/32352/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/a7b95d95
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/a7b95d95
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/a7b95d95

Branch: refs/heads/master
Commit: a7b95d95bf99b2698ec5ca89f504c825d10b1754
Parents: 4b9c759
Author: Maxim Khutornenko <[email protected]>
Authored: Thu Apr 2 14:29:39 2015 -0700
Committer: Maxim Khutornenko <[email protected]>
Committed: Thu Apr 2 14:29:39 2015 -0700

----------------------------------------------------------------------
 .../aurora/benchmark/SchedulingBenchmarks.java  |  10 +-
 .../aurora/scheduler/async/TaskScheduler.java   |  52 ++---
 .../async/preemptor/PendingTaskProcessor.java   | 147 ++++++++++++
 .../async/preemptor/PreemptionSlotCache.java    |   2 +-
 .../async/preemptor/PreemptionSlotFinder.java   |   2 +
 .../scheduler/async/preemptor/Preemptor.java    |  90 +++++++-
 .../async/preemptor/PreemptorImpl.java          | 226 -------------------
 .../async/preemptor/PreemptorMetrics.java       |  10 +-
 .../async/preemptor/PreemptorModule.java        |  84 +++++--
 .../scheduler/filter/AttributeAggregate.java    |  26 +++
 .../scheduler/async/TaskSchedulerImplTest.java  |  32 +--
 .../scheduler/async/TaskSchedulerTest.java      |  19 +-
 .../preemptor/PendingTaskProcessorTest.java     | 169 ++++++++++++++
 .../async/preemptor/PreemptorImplTest.java      | 159 +++----------
 .../async/preemptor/PreemptorModuleTest.java    |  12 +-
 15 files changed, 592 insertions(+), 448 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/a7b95d95/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java 
b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
index 5309e81..75a67dc 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
@@ -79,6 +79,8 @@ public class SchedulingBenchmarks {
    */
   @State(Scope.Thread)
   public abstract static class AbstractBase {
+    private static final Amount<Long, Time> NO_DELAY = Amount.of(0L, 
Time.MILLISECONDS);
+    private static final Amount<Long, Time> DELAY_FOREVER = Amount.of(30L, 
Time.DAYS);
     protected Storage storage;
     protected Preemptor preemptor;
     protected ScheduledThreadPoolExecutor executor;
@@ -105,7 +107,7 @@ public class SchedulingBenchmarks {
       // TODO(maxim): Find a way to DRY it and reuse existing modules instead.
       Injector injector = Guice.createInjector(
           new StateModule(),
-          new PreemptorModule(true, Amount.of(0L, Time.MILLISECONDS), 
executor),
+          new PreemptorModule(true, NO_DELAY, NO_DELAY),
           new PrivateModule() {
             @Override
             protected void configure() {
@@ -116,7 +118,7 @@ public class SchedulingBenchmarks {
                   new OfferManager.OfferReturnDelay() {
                     @Override
                     public Amount<Long, Time> get() {
-                      return Amount.of(30L, Time.DAYS);
+                      return DELAY_FOREVER;
                     }
                   });
 
@@ -128,7 +130,7 @@ public class SchedulingBenchmarks {
             protected void configure() {
               bind(new TypeLiteral<Amount<Long, Time>>() { })
                   .annotatedWith(ReservationDuration.class)
-                  .toInstance(Amount.of(30L, Time.DAYS));
+                  .toInstance(DELAY_FOREVER);
               
bind(TaskScheduler.class).to(TaskScheduler.TaskSchedulerImpl.class);
               bind(TaskScheduler.TaskSchedulerImpl.class).in(Singleton.class);
               
bind(TaskIdGenerator.class).to(TaskIdGenerator.TaskIdGeneratorImpl.class);
@@ -333,7 +335,7 @@ public class SchedulingBenchmarks {
               new AttributeAggregate(taskSupplier, 
storeProvider.getAttributeStore());
 
           Optional<String> result =
-              preemptor.attemptPreemptionFor(assignedTask.getTaskId(), 
aggregate);
+              preemptor.attemptPreemptionFor(assignedTask, aggregate, 
storeProvider);
 
           while (executor.getActiveCount() > 0) {
             // Using a tight loop to wait for a search completion. This is 
executed on a benchmark

http://git-wip-us.apache.org/repos/asf/aurora/blob/a7b95d95/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java 
b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
index 1b9d741..ebc520e 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
@@ -27,11 +27,9 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
 import com.google.common.base.Ticker;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.eventbus.Subscribe;
 import com.twitter.common.inject.TimedInterceptor.Timed;
@@ -56,9 +54,7 @@ import 
org.apache.aurora.scheduler.state.TaskAssigner.Assignment;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork;
-import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.mesos.Protos.SlaveID;
 
@@ -164,25 +160,6 @@ public interface TaskScheduler extends EventSubscriber {
     static final Optional<String> LAUNCH_FAILED_MSG =
         Optional.of("Unknown exception attempting to schedule task.");
 
-    @VisibleForTesting
-    static Query.Builder activeJobStateQuery(IJobKey jobKey) {
-      return Query.jobScoped(jobKey).byStatus(Tasks.SLAVE_ASSIGNED_STATES);
-    }
-
-    private AttributeAggregate getJobState(
-        final StoreProvider storeProvider,
-        final IJobKey jobKey) {
-
-      Supplier<ImmutableSet<IScheduledTask>> taskSupplier = Suppliers.memoize(
-          new Supplier<ImmutableSet<IScheduledTask>>() {
-            @Override
-            public ImmutableSet<IScheduledTask> get() {
-              return 
storeProvider.getTaskStore().fetchTasks(activeJobStateQuery(jobKey));
-            }
-          });
-      return new AttributeAggregate(taskSupplier, 
storeProvider.getAttributeStore());
-    }
-
     @Timed("task_schedule_attempt")
     @Override
     public boolean schedule(final String taskId) {
@@ -206,15 +183,17 @@ public interface TaskScheduler extends EventSubscriber {
     @Timed("task_schedule_attempt_locked")
     protected boolean scheduleTask(MutableStoreProvider store, String taskId) {
       LOG.fine("Attempting to schedule task " + taskId);
-      final ITaskConfig task = Iterables.getOnlyElement(
+      IAssignedTask assignedTask = Iterables.getOnlyElement(
           Iterables.transform(
               
store.getTaskStore().fetchTasks(Query.taskScoped(taskId).byStatus(PENDING)),
-              Tasks.SCHEDULED_TO_INFO),
+              Tasks.SCHEDULED_TO_ASSIGNED),
           null);
-      if (task == null) {
+
+      if (assignedTask == null) {
         LOG.warning("Failed to look up task " + taskId + ", it may have been 
deleted.");
       } else {
-        AttributeAggregate aggregate = getJobState(store, task.getJob());
+        ITaskConfig task = assignedTask.getTask();
+        AttributeAggregate aggregate = 
AttributeAggregate.getJobActiveState(store, task.getJob());
         try {
           boolean launched = offerManager.launchFirst(
               getAssignerFunction(store, new ResourceRequest(task, taskId, 
aggregate)),
@@ -222,7 +201,10 @@ public interface TaskScheduler extends EventSubscriber {
 
           if (!launched) {
             // Task could not be scheduled.
-            maybePreemptFor(taskId, aggregate);
+            // TODO(maxim): Now that preemption slots are searched 
asynchronously, consider
+            // retrying a launch attempt within the current scheduling round 
IFF a reservation is
+            // available.
+            maybePreemptFor(assignedTask, aggregate, store);
             attemptsNoMatch.incrementAndGet();
             return false;
           }
@@ -247,13 +229,17 @@ public interface TaskScheduler extends EventSubscriber {
       return true;
     }
 
-    private void maybePreemptFor(String taskId, AttributeAggregate 
attributeAggregate) {
-      if (reservations.hasReservationForTask(taskId)) {
+    private void maybePreemptFor(
+        IAssignedTask task,
+        AttributeAggregate jobState,
+        MutableStoreProvider storeProvider) {
+
+      if (reservations.hasReservationForTask(task.getTaskId())) {
         return;
       }
-      Optional<String> slaveId = preemptor.attemptPreemptionFor(taskId, 
attributeAggregate);
+      Optional<String> slaveId = preemptor.attemptPreemptionFor(task, 
jobState, storeProvider);
       if (slaveId.isPresent()) {
-        
this.reservations.add(SlaveID.newBuilder().setValue(slaveId.get()).build(), 
taskId);
+        reservations.add(SlaveID.newBuilder().setValue(slaveId.get()).build(), 
task.getTaskId());
       }
     }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/a7b95d95/src/main/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessor.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessor.java
 
b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessor.java
new file mode 100644
index 0000000..67ad5d7
--- /dev/null
+++ 
b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessor.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async.preemptor;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import javax.inject.Inject;
+import javax.inject.Qualifier;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.util.Clock;
+
+import 
org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.PreemptionSlot;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.filter.AttributeAggregate;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+import static java.util.Objects.requireNonNull;
+
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.scheduler.base.Tasks.SCHEDULED_TO_ASSIGNED;
+
+/**
+ * Attempts to find preemption slots for all PENDING tasks eligible for 
preemption.
+ */
+class PendingTaskProcessor implements Runnable {
+  private final Storage storage;
+  private final PreemptionSlotFinder preemptionSlotFinder;
+  private final PreemptorMetrics metrics;
+  private final Amount<Long, Time> preemptionCandidacyDelay;
+  private final PreemptionSlotCache slotCache;
+  private final Clock clock;
+
+  /**
+   * Binding annotation for the time interval after which a pending task 
becomes eligible to
+   * preempt other tasks. To avoid excessive churn, the preemptor requires 
that a task is PENDING
+   * for a duration (dictated by {@link #preemptionCandidacyDelay}) before it 
becomes eligible
+   * to preempt other tasks.
+   */
+  @VisibleForTesting
+  @Qualifier
+  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+  public @interface PreemptionDelay { }
+
+  @Inject
+  PendingTaskProcessor(
+      Storage storage,
+      PreemptionSlotFinder preemptionSlotFinder,
+      PreemptorMetrics metrics,
+      @PreemptionDelay Amount<Long, Time> preemptionCandidacyDelay,
+      PreemptionSlotCache slotCache,
+      Clock clock) {
+
+    this.storage = requireNonNull(storage);
+    this.preemptionSlotFinder = requireNonNull(preemptionSlotFinder);
+    this.metrics = requireNonNull(metrics);
+    this.preemptionCandidacyDelay = requireNonNull(preemptionCandidacyDelay);
+    this.slotCache = requireNonNull(slotCache);
+    this.clock = requireNonNull(clock);
+  }
+
+  @Override
+  public void run() {
+    metrics.recordTaskProcessorRun();
+    storage.read(new Storage.Work.Quiet<Void>() {
+      @Override
+      public Void apply(StoreProvider storeProvider) {
+        Multimap<IJobKey, IAssignedTask> pendingTasks = 
fetchIdlePendingTasks(storeProvider);
+
+        for (IJobKey job : pendingTasks.keySet()) {
+          AttributeAggregate jobState = 
AttributeAggregate.getJobActiveState(storeProvider, job);
+
+          for (IAssignedTask pendingTask : pendingTasks.get(job)) {
+            ITaskConfig task = pendingTask.getTask();
+            metrics.recordPreemptionAttemptFor(task);
+
+            Optional<PreemptionSlot> slot = 
preemptionSlotFinder.findPreemptionSlotFor(
+                pendingTask,
+                jobState,
+                storeProvider);
+
+            metrics.recordSlotSearchResult(slot, task);
+
+            if (slot.isPresent()) {
+              slotCache.add(pendingTask.getTaskId(), slot.get());
+            }
+          }
+        }
+        return null;
+      }
+    });
+  }
+
+  private Multimap<IJobKey, IAssignedTask> fetchIdlePendingTasks(StoreProvider 
store) {
+    return Multimaps.index(
+        FluentIterable
+            .from(store.getTaskStore().fetchTasks(Query.statusScoped(PENDING)))
+            .filter(Predicates.and(isIdleTask, Predicates.not(hasCachedSlot)))
+            .transform(SCHEDULED_TO_ASSIGNED),
+        Tasks.ASSIGNED_TO_JOB_KEY);
+  }
+
+  private final Predicate<IScheduledTask> hasCachedSlot = new 
Predicate<IScheduledTask>() {
+    @Override
+    public boolean apply(IScheduledTask input) {
+      return slotCache.get(input.getAssignedTask().getTaskId()).isPresent();
+    }
+  };
+
+  private final Predicate<IScheduledTask> isIdleTask = new 
Predicate<IScheduledTask>() {
+    @Override
+    public boolean apply(IScheduledTask task) {
+      return (clock.nowMillis() - Tasks.getLatestEvent(task).getTimestamp())
+          >= preemptionCandidacyDelay.as(Time.MILLISECONDS);
+    }
+  };
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/a7b95d95/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotCache.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotCache.java
 
b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotCache.java
index 4ca36e5..b5a42a0 100644
--- 
a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotCache.java
+++ 
b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotCache.java
@@ -53,7 +53,7 @@ class PreemptionSlotCache {
   @VisibleForTesting
   static final String PREEMPTION_SLOT_CACHE_SIZE_STAT = 
"preemption_slot_cache_size";
 
-  private final Cache<String, PreemptionSlotFinder.PreemptionSlot> slots;
+  private final Cache<String, PreemptionSlot> slots;
 
   @Inject
   PreemptionSlotCache(

http://git-wip-us.apache.org/repos/asf/aurora/blob/a7b95d95/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotFinder.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotFinder.java
 
b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotFinder.java
index 84bcdc5..427c0de 100644
--- 
a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotFinder.java
+++ 
b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotFinder.java
@@ -202,6 +202,8 @@ public interface PreemptionSlotFinder {
           }
         };
 
+    // TODO(maxim): This should take pre-computed mappings (e.g. 
slaveToOffers) to avoid
+    // unnecessary repeated work.
     @Override
     public Optional<PreemptionSlot> findPreemptionSlotFor(
         final IAssignedTask pendingTask,

http://git-wip-us.apache.org/repos/asf/aurora/blob/a7b95d95/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java 
b/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java
index 84791a2..77617ec 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java
@@ -13,22 +13,98 @@
  */
 package org.apache.aurora.scheduler.async.preemptor;
 
+import javax.inject.Inject;
+
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
 
+import org.apache.aurora.gen.ScheduleStatus;
+import 
org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.PreemptionSlot;
 import org.apache.aurora.scheduler.filter.AttributeAggregate;
+import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+
+import static java.util.Objects.requireNonNull;
+
+import static org.apache.aurora.gen.ScheduleStatus.PREEMPTING;
 
 /**
- * Preempts active tasks in favor of higher priority tasks.
+ * Attempts to preempt active tasks in favor of the provided PENDING task in 
case a preemption
+ * slot has been previously found.
  */
 public interface Preemptor {
-
-  // TODO(maxim): Move AttributeAggregate creation into implementing class.
   /**
-   * Attempts to preempt active tasks in favor of the input task.
+   * Preempts victim tasks in case a valid preemption slot exists.
    *
-   * @param taskId ID of the preempting task.
-   * @param attributeAggregate Attribute information for tasks in the job 
containing {@code task}.
+   * @param task Preempting task.
+   * @param jobState Current job state aggregate.
+   * @param storeProvider Store provider to use for task preemption.
    * @return ID of the slave where preemption occurred.
    */
-  Optional<String> attemptPreemptionFor(String taskId, AttributeAggregate 
attributeAggregate);
+  Optional<String> attemptPreemptionFor(
+      IAssignedTask task,
+      AttributeAggregate jobState,
+      MutableStoreProvider storeProvider);
+
+  class PreemptorImpl implements Preemptor {
+    private final StateManager stateManager;
+    private final PreemptionSlotFinder preemptionSlotFinder;
+    private final PreemptorMetrics metrics;
+    private final PreemptionSlotCache slotCache;
+
+    @Inject
+    PreemptorImpl(
+        StateManager stateManager,
+        PreemptionSlotFinder preemptionSlotFinder,
+        PreemptorMetrics metrics,
+        PreemptionSlotCache slotCache) {
+
+      this.stateManager = requireNonNull(stateManager);
+      this.preemptionSlotFinder = requireNonNull(preemptionSlotFinder);
+      this.metrics = requireNonNull(metrics);
+      this.slotCache = requireNonNull(slotCache);
+    }
+
+    @Override
+    public Optional<String> attemptPreemptionFor(
+        IAssignedTask pendingTask,
+        AttributeAggregate jobState,
+        MutableStoreProvider storeProvider) {
+
+      final Optional<PreemptionSlot> preemptionSlot = 
slotCache.get(pendingTask.getTaskId());
+
+      // A preemption slot is available -> attempt to preempt tasks.
+      if (preemptionSlot.isPresent()) {
+        slotCache.remove(pendingTask.getTaskId());
+
+        // Validate a PreemptionSlot is still valid for the given task.
+        Optional<ImmutableSet<PreemptionVictim>> validatedVictims =
+            preemptionSlotFinder.validatePreemptionSlotFor(
+                pendingTask,
+                jobState,
+                preemptionSlot.get(),
+                storeProvider);
+
+        metrics.recordSlotValidationResult(validatedVictims);
+        if (!validatedVictims.isPresent()) {
+          // Previously found victims are no longer valid -> let the next run 
find a new slot.
+          return Optional.absent();
+        }
+
+        for (PreemptionVictim toPreempt : validatedVictims.get()) {
+          metrics.recordTaskPreemption(toPreempt);
+          stateManager.changeState(
+              storeProvider,
+              toPreempt.getTaskId(),
+              Optional.<ScheduleStatus>absent(),
+              PREEMPTING,
+              Optional.of("Preempting in favor of " + 
pendingTask.getTaskId()));
+        }
+        return Optional.of(preemptionSlot.get().getSlaveId());
+      }
+
+      return Optional.absent();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/a7b95d95/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImpl.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImpl.java 
b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImpl.java
deleted file mode 100644
index 18a2e60..0000000
--- 
a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImpl.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async.preemptor;
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-import java.util.concurrent.ScheduledExecutorService;
-
-import javax.inject.Inject;
-import javax.inject.Qualifier;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.util.Clock;
-
-import org.apache.aurora.gen.ScheduleStatus;
-import 
org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.PreemptionSlot;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.filter.AttributeAggregate;
-import org.apache.aurora.scheduler.state.StateManager;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
-import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-import static java.util.Objects.requireNonNull;
-
-import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static org.apache.aurora.gen.ScheduleStatus.PREEMPTING;
-import static org.apache.aurora.scheduler.base.Tasks.SCHEDULED_TO_ASSIGNED;
-
-/**
- * Coordinates preemption slot search for a PENDING tasks and triggers 
preemption if such
- * slot is found.
- */
-@VisibleForTesting
-public class PreemptorImpl implements Preemptor {
-
-  private final Storage storage;
-  private final StateManager stateManager;
-  private final PreemptionSlotFinder preemptionSlotFinder;
-  private final PreemptorMetrics metrics;
-  private final Amount<Long, Time> preemptionCandidacyDelay;
-  private final ScheduledExecutorService executor;
-  private final PreemptionSlotCache slotCache;
-  private final Clock clock;
-
-  /**
-   * Binding annotation for the time interval after which a pending task 
becomes eligible to
-   * preempt other tasks. To avoid excessive churn, the preemptor requires 
that a task is PENDING
-   * for a duration (dictated by {@link #preemptionCandidacyDelay}) before it 
becomes eligible
-   * to preempt other tasks.
-   */
-  @VisibleForTesting
-  @Qualifier
-  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
-  public @interface PreemptionDelay { }
-
-  @VisibleForTesting
-  @Qualifier
-  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
-  public @interface PreemptionExecutor { }
-
-  @Inject
-  PreemptorImpl(
-      Storage storage,
-      StateManager stateManager,
-      PreemptionSlotFinder preemptionSlotFinder,
-      PreemptorMetrics metrics,
-      @PreemptionDelay Amount<Long, Time> preemptionCandidacyDelay,
-      @PreemptionExecutor ScheduledExecutorService executor,
-      PreemptionSlotCache slotCache,
-      Clock clock) {
-
-    this.storage = requireNonNull(storage);
-    this.stateManager = requireNonNull(stateManager);
-    this.preemptionSlotFinder = requireNonNull(preemptionSlotFinder);
-    this.metrics = requireNonNull(metrics);
-    this.preemptionCandidacyDelay = requireNonNull(preemptionCandidacyDelay);
-    this.executor = requireNonNull(executor);
-    this.slotCache = requireNonNull(slotCache);
-    this.clock = requireNonNull(clock);
-  }
-
-  @Override
-  public synchronized Optional<String> attemptPreemptionFor(
-      final String taskId,
-      final AttributeAggregate attributeAggregate) {
-
-    final Optional<PreemptionSlot> preemptionSlot = slotCache.get(taskId);
-    if (preemptionSlot.isPresent()) {
-      // A preemption slot is available -> attempt to preempt tasks.
-      slotCache.remove(taskId);
-      return preemptTasks(taskId, preemptionSlot.get(), attributeAggregate);
-    } else {
-      // TODO(maxim): There is a potential race between preemption requests 
and async search.
-      // The side-effect of the race is benign as it only wastes CPU time and 
is unlikely to happen
-      // often given our schedule penalty >> slot search time. However, we may 
want to re-evaluate
-      // this when moving preemptor into background mode.
-      searchForPreemptionSlot(taskId, attributeAggregate);
-      return Optional.absent();
-    }
-  }
-
-  private Optional<String> preemptTasks(
-      final String taskId,
-      final PreemptionSlot preemptionSlot,
-      final AttributeAggregate attributeAggregate) {
-
-    return storage.write(new Storage.MutateWork.Quiet<Optional<String>>() {
-      @Override
-      public Optional<String> apply(Storage.MutableStoreProvider 
storeProvider) {
-        final Optional<IAssignedTask> pendingTask = 
fetchIdlePendingTask(taskId, storeProvider);
-
-        // Task is no longer PENDING no need to preempt.
-        if (!pendingTask.isPresent()) {
-            return Optional.absent();
-        }
-
-        // Validate a PreemptionSlot is still valid for the given task.
-        Optional<ImmutableSet<PreemptionVictim>> validatedVictims =
-            preemptionSlotFinder.validatePreemptionSlotFor(
-                pendingTask.get(),
-                attributeAggregate,
-                preemptionSlot,
-                storeProvider);
-
-        metrics.recordSlotValidationResult(validatedVictims);
-        if (!validatedVictims.isPresent()) {
-          // Previously found victims are no longer valid -> trigger a new 
search.
-          searchForPreemptionSlot(taskId, attributeAggregate);
-          return Optional.absent();
-        }
-
-        for (PreemptionVictim toPreempt : validatedVictims.get()) {
-          metrics.recordTaskPreemption(toPreempt);
-          stateManager.changeState(
-              storeProvider,
-              toPreempt.getTaskId(),
-              Optional.<ScheduleStatus>absent(),
-              PREEMPTING,
-              Optional.of("Preempting in favor of " + taskId));
-        }
-        return Optional.of(preemptionSlot.getSlaveId());
-      }
-    });
-  }
-
-  private void searchForPreemptionSlot(
-      final String taskId,
-      final AttributeAggregate attributeAggregate) {
-
-    executor.execute(new Runnable() {
-      @Override
-      public void run() {
-        Optional<PreemptionSlot> slot = storage.read(
-            new Storage.Work.Quiet<Optional<PreemptionSlot>>() {
-              @Override
-              public Optional<PreemptionSlot> apply(StoreProvider 
storeProvider) {
-                Optional<IAssignedTask> pendingTask = 
fetchIdlePendingTask(taskId, storeProvider);
-
-                // Task is no longer PENDING no need to search for preemption 
slot.
-                if (!pendingTask.isPresent()) {
-                  return Optional.absent();
-                }
-
-                ITaskConfig task = pendingTask.get().getTask();
-                metrics.recordPreemptionAttemptFor(task);
-
-                Optional<PreemptionSlot> result = 
preemptionSlotFinder.findPreemptionSlotFor(
-                    pendingTask.get(),
-                    attributeAggregate,
-                    storeProvider);
-
-                metrics.recordSlotSearchResult(result, task);
-                return result;
-              }
-            });
-
-        if (slot.isPresent()) {
-          slotCache.add(taskId, slot.get());
-        }
-      }
-    });
-  }
-
-  private Optional<IAssignedTask> fetchIdlePendingTask(String taskId, 
Storage.StoreProvider store) {
-    Query.Builder query = Query.taskScoped(taskId).byStatus(PENDING);
-    Iterable<IAssignedTask> result = FluentIterable
-        .from(store.getTaskStore().fetchTasks(query))
-        .filter(isIdleTask)
-        .transform(SCHEDULED_TO_ASSIGNED);
-    return Optional.fromNullable(Iterables.getOnlyElement(result, null));
-  }
-
-  private final Predicate<IScheduledTask> isIdleTask = new 
Predicate<IScheduledTask>() {
-    @Override
-    public boolean apply(IScheduledTask task) {
-      return (clock.nowMillis() - Tasks.getLatestEvent(task).getTimestamp())
-          >= preemptionCandidacyDelay.as(Time.MILLISECONDS);
-    }
-  };
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/a7b95d95/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorMetrics.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorMetrics.java
 
b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorMetrics.java
index 782e751..dc7eb44 100644
--- 
a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorMetrics.java
+++ 
b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorMetrics.java
@@ -34,6 +34,9 @@ public class PreemptorMetrics {
   @VisibleForTesting
   static final String MISSING_ATTRIBUTES_NAME = "preemptor_missing_attributes";
 
+  @VisibleForTesting
+  static final String PENDING_PROCESSOR_RUN_NAME = 
"preemptor_task_processor_runs";
+
   private volatile boolean exported = false;
   private final CachedCounters counters;
 
@@ -68,7 +71,8 @@ public class PreemptorMetrics {
         slotSearchStatName(false, true),
         slotValidationStatName(true),
         slotValidationStatName(false),
-        MISSING_ATTRIBUTES_NAME);
+        MISSING_ATTRIBUTES_NAME,
+        PENDING_PROCESSOR_RUN_NAME);
     for (String stat : allStats) {
       counters.get(stat);
     }
@@ -120,4 +124,8 @@ public class PreemptorMetrics {
   void recordMissingAttributes() {
     increment(MISSING_ATTRIBUTES_NAME);
   }
+
+  void recordTaskProcessorRun() {
+    increment(PENDING_PROCESSOR_RUN_NAME);
+  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/a7b95d95/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java
 
b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java
index 7034a07..1092c05 100644
--- 
a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java
+++ 
b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java
@@ -13,13 +13,14 @@
  */
 package org.apache.aurora.scheduler.async.preemptor;
 
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.logging.Logger;
 
+import javax.inject.Inject;
 import javax.inject.Singleton;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
+import com.google.common.util.concurrent.AbstractScheduledService;
 import com.google.inject.AbstractModule;
 import com.google.inject.PrivateModule;
 import com.google.inject.TypeLiteral;
@@ -28,14 +29,14 @@ import com.twitter.common.args.CmdLine;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
 
-import 
org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.PreemptionSlotFinderImpl;
+import org.apache.aurora.scheduler.SchedulerServicesModule;
 import org.apache.aurora.scheduler.events.PubsubEventModule;
 import org.apache.aurora.scheduler.filter.AttributeAggregate;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 
 import static java.util.Objects.requireNonNull;
 
-import static 
org.apache.aurora.scheduler.base.AsyncUtil.singleThreadLoggingScheduledExecutor;
-
 public class PreemptorModule extends AbstractModule {
 
   private static final Logger LOG = 
Logger.getLogger(PreemptorModule.class.getName());
@@ -47,33 +48,35 @@ public class PreemptorModule extends AbstractModule {
   @CmdLine(name = "preemption_delay",
       help = "Time interval after which a pending task becomes eligible to 
preempt other tasks")
   private static final Arg<Amount<Long, Time>> PREEMPTION_DELAY =
-      Arg.create(Amount.of(10L, Time.MINUTES));
+      Arg.create(Amount.of(3L, Time.MINUTES));
 
   @CmdLine(name = "preemption_slot_hold_time",
       help = "Time to hold a preemption slot found before it is discarded.")
   private static final Arg<Amount<Long, Time>> PREEMPTION_SLOT_HOLD_TIME =
-      Arg.create(Amount.of(3L, Time.MINUTES));
+      Arg.create(Amount.of(5L, Time.MINUTES));
+
+  @CmdLine(name = "preemption_slot_search_interval",
+      help = "Time interval between pending task preemption slot searches.")
+  private static final Arg<Amount<Long, Time>> PREEMPTION_SLOT_SEARCH_INTERVAL 
=
+      Arg.create(Amount.of(1L, Time.MINUTES));
 
   private final boolean enablePreemptor;
   private final Amount<Long, Time> preemptionDelay;
-  private final ScheduledExecutorService executor;
+  private final Amount<Long, Time> slotSearchInterval;
 
   @VisibleForTesting
   public PreemptorModule(
       boolean enablePreemptor,
       Amount<Long, Time> preemptionDelay,
-      ScheduledExecutorService executor) {
+      Amount<Long, Time> slotSearchInterval) {
 
     this.enablePreemptor = enablePreemptor;
     this.preemptionDelay = requireNonNull(preemptionDelay);
-    this.executor = requireNonNull(executor);
+    this.slotSearchInterval = requireNonNull(slotSearchInterval);
   }
 
   public PreemptorModule() {
-    this(
-        ENABLE_PREEMPTOR.get(),
-        PREEMPTION_DELAY.get(),
-        singleThreadLoggingScheduledExecutor("PreemptorProcessor-%d", LOG));
+    this(ENABLE_PREEMPTOR.get(), PREEMPTION_DELAY.get(), 
PREEMPTION_SLOT_SEARCH_INTERVAL.get());
   }
 
   @Override
@@ -83,29 +86,36 @@ public class PreemptorModule extends AbstractModule {
       protected void configure() {
         if (enablePreemptor) {
           LOG.info("Preemptor Enabled.");
-          bind(ScheduledExecutorService.class)
-              .annotatedWith(PreemptorImpl.PreemptionExecutor.class)
-              .toInstance(executor);
           bind(PreemptorMetrics.class).in(Singleton.class);
-          bind(PreemptionSlotFinder.class).to(PreemptionSlotFinderImpl.class);
-          bind(PreemptionSlotFinderImpl.class).in(Singleton.class);
-          bind(Preemptor.class).to(PreemptorImpl.class);
-          bind(PreemptorImpl.class).in(Singleton.class);
+          bind(PreemptionSlotFinder.class)
+              .to(PreemptionSlotFinder.PreemptionSlotFinderImpl.class);
+          
bind(PreemptionSlotFinder.PreemptionSlotFinderImpl.class).in(Singleton.class);
+          bind(Preemptor.class).to(Preemptor.PreemptorImpl.class);
+          bind(Preemptor.PreemptorImpl.class).in(Singleton.class);
           bind(new TypeLiteral<Amount<Long, Time>>() { })
-              .annotatedWith(PreemptorImpl.PreemptionDelay.class)
+              .annotatedWith(PendingTaskProcessor.PreemptionDelay.class)
               .toInstance(preemptionDelay);
           bind(new TypeLiteral<Amount<Long, Time>>() { })
               
.annotatedWith(PreemptionSlotCache.PreemptionSlotHoldDuration.class)
               .toInstance(PREEMPTION_SLOT_HOLD_TIME.get());
           bind(PreemptionSlotCache.class).in(Singleton.class);
+          bind(PendingTaskProcessor.class).in(Singleton.class);
           bind(ClusterState.class).to(ClusterStateImpl.class);
           bind(ClusterStateImpl.class).in(Singleton.class);
           expose(ClusterStateImpl.class);
+
+          bind(PreemptorService.class).in(Singleton.class);
+          bind(AbstractScheduledService.Scheduler.class).toInstance(
+              AbstractScheduledService.Scheduler.newFixedRateSchedule(
+                  0L,
+                  slotSearchInterval.getValue(),
+                  slotSearchInterval.getUnit().getTimeUnit()));
+
+          expose(PreemptorService.class);
         } else {
           bind(Preemptor.class).toInstance(NULL_PREEMPTOR);
           LOG.warning("Preemptor Disabled.");
         }
-
         expose(Preemptor.class);
       }
     });
@@ -114,13 +124,39 @@ public class PreemptorModule extends AbstractModule {
     // and private modules due to multiple injectors.  We accept the added 
complexity here to keep
     // the other bindings private.
     PubsubEventModule.bindSubscriber(binder(), ClusterStateImpl.class);
+    if (enablePreemptor) {
+      SchedulerServicesModule.addSchedulerActiveServiceBinding(binder())
+          .to(PreemptorService.class);
+    }
+  }
+
+  static class PreemptorService extends AbstractScheduledService {
+    private final PendingTaskProcessor slotFinder;
+    private final Scheduler schedule;
+
+    @Inject
+    PreemptorService(PendingTaskProcessor slotFinder, Scheduler schedule) {
+      this.slotFinder = requireNonNull(slotFinder);
+      this.schedule = requireNonNull(schedule);
+    }
+
+    @Override
+    protected void runOneIteration() {
+      slotFinder.run();
+    }
+
+    @Override
+    protected Scheduler scheduler() {
+      return schedule;
+    }
   }
 
   private static final Preemptor NULL_PREEMPTOR = new Preemptor() {
     @Override
     public Optional<String> attemptPreemptionFor(
-        String taskId,
-        AttributeAggregate attributeAggregate) {
+        IAssignedTask task,
+        AttributeAggregate jobState,
+        Storage.MutableStoreProvider storeProvider) {
 
       return Optional.absent();
     }

http://git-wip-us.apache.org/repos/asf/aurora/blob/a7b95d95/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java 
b/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java
index da7b662..ed82ae9 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java
@@ -26,8 +26,12 @@ import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.AtomicLongMap;
 import com.twitter.common.collections.Pair;
 
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.storage.AttributeStore;
+import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
 import org.apache.aurora.scheduler.storage.entities.IAttribute;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 
 import static java.util.Objects.requireNonNull;
@@ -51,6 +55,28 @@ public class AttributeAggregate {
   private final Supplier<Map<Pair<String, String>, Long>> aggregate;
 
   /**
+   * Initializes an {@link AttributeAggregate} instance from data store.
+   *
+   * @param storeProvider Store provider to get data from.
+   * @param jobKey Job key.
+   * @return An {@link AttributeAggregate} instance.
+   */
+  public static AttributeAggregate getJobActiveState(
+      final StoreProvider storeProvider,
+      final IJobKey jobKey) {
+
+    Supplier<ImmutableSet<IScheduledTask>> taskSupplier = Suppliers.memoize(
+        new Supplier<ImmutableSet<IScheduledTask>>() {
+          @Override
+          public ImmutableSet<IScheduledTask> get() {
+            return storeProvider.getTaskStore().fetchTasks(
+                Query.jobScoped(jobKey).byStatus(Tasks.SLAVE_ASSIGNED_STATES));
+          }
+        });
+    return new AttributeAggregate(taskSupplier, 
storeProvider.getAttributeStore());
+  }
+
+  /**
    * Creates a new attribute aggregate, which will be computed from the 
provided external state.
    *
    * @param activeTaskSupplier Supplier of active tasks within the aggregated 
scope.

http://git-wip-us.apache.org/repos/asf/aurora/blob/a7b95d95/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java 
b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
index 29fe156..c5643d9 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
@@ -36,7 +36,6 @@ import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.HostOffer;
-import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl;
 import org.apache.aurora.scheduler.async.preemptor.Preemptor;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
@@ -82,6 +81,8 @@ public class TaskSchedulerImplTest extends EasyMockTest {
       Offers.makeOffer("OFFER_A", "HOST_A"),
       IHostAttributes.build(new 
HostAttributes().setMode(MaintenanceMode.NONE)));
 
+  private static final String SLAVE_ID = 
OFFER.getOffer().getSlaveId().getValue();
+
   private static final TaskGroupKey GROUP_A = 
TaskGroupKey.from(TASK_A.getAssignedTask().getTask());
   private static final TaskGroupKey GROUP_B = 
TaskGroupKey.from(TASK_B.getAssignedTask().getTask());
 
@@ -157,13 +158,12 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     expectActiveJobFetch(TASK_A);
     expectLaunchAttempt(false);
     // Reserve "a" with offerA
-    expect(preemptor.attemptPreemptionFor("a", emptyJob))
-        .andReturn(Optional.of(OFFER.getOffer().getSlaveId().getValue()));
+    expectPreemptorCall(TASK_A, Optional.of(SLAVE_ID));
 
     expectTaskStillPendingQuery(TASK_B);
     expectActiveJobFetch(TASK_B);
     AssignmentCapture firstAssignment = expectLaunchAttempt(false);
-    expect(preemptor.attemptPreemptionFor("b", 
emptyJob)).andReturn(Optional.<String>absent());
+    expectPreemptorCall(TASK_B, Optional.<String>absent());
 
     expectTaskStillPendingQuery(TASK_B);
     expectActiveJobFetch(TASK_B);
@@ -192,8 +192,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     expectActiveJobFetch(TASK_A);
     expectLaunchAttempt(false);
     // Reserve "a" with offerA
-    expect(preemptor.attemptPreemptionFor("a", emptyJob))
-        .andReturn(Optional.of(OFFER.getOffer().getSlaveId().getValue()));
+    expectPreemptorCall(TASK_A, Optional.of(SLAVE_ID));
 
     expectTaskStillPendingQuery(TASK_A);
     expectActiveJobFetch(TASK_A);
@@ -228,8 +227,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     expectActiveJobFetch(TASK_A);
     expectLaunchAttempt(false);
     // Reserve "a" with offerA
-    expect(preemptor.attemptPreemptionFor("a", emptyJob))
-        .andReturn(Optional.of(OFFER.getOffer().getSlaveId().getValue()));
+    expectPreemptorCall(TASK_A, Optional.of(SLAVE_ID));
 
     expectTaskStillPendingQuery(TASK_A);
     expectActiveJobFetch(TASK_A);
@@ -253,8 +251,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     expectLaunchAttempt(false);
 
     // Reserve "a" with offerA
-    expect(preemptor.attemptPreemptionFor("a", emptyJob))
-        .andReturn(Optional.of(OFFER.getOffer().getSlaveId().getValue()));
+    expectPreemptorCall(TASK_A, Optional.of(SLAVE_ID));
 
     expectTaskStillPendingQuery(TASK_B);
     expectActiveJobFetch(TASK_B);
@@ -278,8 +275,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     expectActiveJobFetch(TASK_B);
     expectLaunchAttempt(false);
     // Reserve "b" with offer1
-    expect(preemptor.attemptPreemptionFor("b", emptyJob))
-        .andReturn(Optional.of(OFFER.getOffer().getSlaveId().getValue()));
+    expectPreemptorCall(TASK_B, Optional.of(SLAVE_ID));
 
     expectTaskStillPendingQuery(TASK_A);
     expectActiveJobFetch(TASK_A);
@@ -347,6 +343,13 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     public Capture<TaskGroupKey> groupKey = createCapture();
   }
 
+  private void expectPreemptorCall(IScheduledTask task, Optional<String> 
result) {
+    expect(preemptor.attemptPreemptionFor(
+        task.getAssignedTask(),
+        emptyJob,
+        storageUtil.mutableStoreProvider)).andReturn(result);
+  }
+
   private AssignmentCapture expectLaunchAttempt(boolean taskLaunched)
       throws OfferManager.LaunchException {
 
@@ -366,9 +369,10 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     assertEquals(groupKey, capture.groupKey.getValue());
   }
 
-  private void expectActiveJobFetch(IScheduledTask taskInJob) {
+  private void expectActiveJobFetch(IScheduledTask task) {
     storageUtil.expectTaskFetch(
-        
TaskSchedulerImpl.activeJobStateQuery(Tasks.SCHEDULED_TO_JOB_KEY.apply(taskInJob)),
+        Query.jobScoped(Tasks.SCHEDULED_TO_JOB_KEY.apply(task))
+            .byStatus(Tasks.SLAVE_ASSIGNED_STATES),
         ImmutableSet.<IScheduledTask>of());
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/a7b95d95/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java 
b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
index f5c2128..88c0163 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
@@ -238,7 +238,7 @@ public class TaskSchedulerTest extends EasyMockTest {
   public void testNoOffers() {
     Capture<Runnable> timeoutCapture = 
expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
     expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10);
-    expect(preemptor.attemptPreemptionFor("a", 
emptyJob)).andReturn(Optional.<String>absent());
+    expectPreemptorCall(makeTask("a"));
 
     replayAndCreateScheduler();
 
@@ -316,7 +316,7 @@ public class TaskSchedulerTest extends EasyMockTest {
 
     Capture<Runnable> timeoutCapture = 
expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
     expectMaybeAssign(OFFER_A, task, emptyJob).andReturn(Assignment.failure());
-    expect(preemptor.attemptPreemptionFor("a", 
emptyJob)).andReturn(Optional.<String>absent());
+    expectPreemptorCall(task);
 
     Capture<Runnable> timeoutCapture2 = 
expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10);
     expectMaybeAssign(OFFER_A, task, 
emptyJob).andReturn(Assignment.success(mesosTask));
@@ -324,7 +324,7 @@ public class TaskSchedulerTest extends EasyMockTest {
 
     Capture<Runnable> timeoutCapture3 = 
expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
     expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10);
-    expect(preemptor.attemptPreemptionFor("b", 
emptyJob)).andReturn(Optional.<String>absent());
+    expectPreemptorCall(makeTask("b"));
 
     replayAndCreateScheduler();
 
@@ -404,10 +404,10 @@ public class TaskSchedulerTest extends EasyMockTest {
     expectAnyMaintenanceCalls();
     expectMaybeAssign(OFFER_A, task, emptyJob).andReturn(Assignment.failure());
     Capture<Runnable> timeoutCapture2 = 
expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10);
-    expect(preemptor.attemptPreemptionFor("a", 
emptyJob)).andReturn(Optional.<String>absent());
+    expectPreemptorCall(task);
     driver.declineOffer(OFFER_A.getOffer().getId());
     expectTaskGroupBackoff(10, 20);
-    expect(preemptor.attemptPreemptionFor("a", 
emptyJob)).andReturn(Optional.<String>absent());
+    expectPreemptorCall(task);
 
     replayAndCreateScheduler();
 
@@ -604,7 +604,7 @@ public class TaskSchedulerTest extends EasyMockTest {
     Capture<Runnable> timeoutCapture = 
expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
     expectMaybeAssign(OFFER_A, task, emptyJob).andReturn(Assignment.failure());
     expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 20);
-    expect(preemptor.attemptPreemptionFor("a", 
emptyJob)).andReturn(Optional.<String>absent());
+    expectPreemptorCall(task);
 
     replayAndCreateScheduler();
 
@@ -651,4 +651,11 @@ public class TaskSchedulerTest extends EasyMockTest {
             .setAttributes(ImmutableSet.<Attribute>of())
             .setMode(mode)));
   }
+
+  private void expectPreemptorCall(IScheduledTask task) {
+    expect(preemptor.attemptPreemptionFor(
+        eq(task.getAssignedTask()),
+        eq(emptyJob),
+        
EasyMock.<MutableStoreProvider>anyObject())).andReturn(Optional.<String>absent());
+  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/a7b95d95/src/test/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessorTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessorTest.java
 
b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessorTest.java
new file mode 100644
index 0000000..bcd1b4e
--- /dev/null
+++ 
b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessorTest.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async.preemptor;
+
+import java.util.Arrays;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableSet;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.testing.easymock.EasyMockTest;
+import com.twitter.common.util.testing.FakeClock;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.JobKey;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.gen.TaskEvent;
+import 
org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.PreemptionSlot;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.filter.AttributeAggregate;
+import org.apache.aurora.scheduler.stats.CachedCounters;
+import org.apache.aurora.scheduler.storage.AttributeStore;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
+import org.easymock.IExpectationSetters;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static 
org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.PENDING_PROCESSOR_RUN_NAME;
+import static 
org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.attemptsStatName;
+import static 
org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.slotSearchStatName;
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
+
+public class PendingTaskProcessorTest extends EasyMockTest {
+  private static final String TASK_ID_A = "task_a";
+  private static final String TASK_ID_B = "task_b";
+  private static final ScheduledTask TASK_A = makeTask(TASK_ID_A);
+  private static final ScheduledTask TASK_B = makeTask(TASK_ID_B);
+  private static final PreemptionSlot SLOT_A = createPreemptionSlot(TASK_A);
+  private static final PreemptionSlot SLOT_B = createPreemptionSlot(TASK_B);
+  private static final String SLAVE_ID = "slave_id";
+  private static final IJobKey JOB_KEY = 
IJobKey.build(TASK_A.getAssignedTask().getTask().getJob());
+
+  private static final Amount<Long, Time> PREEMPTION_DELAY = Amount.of(30L, 
Time.SECONDS);
+
+  private static final Optional<PreemptionSlot> EMPTY_SLOT = Optional.absent();
+
+  private StorageTestUtil storageUtil;
+  private FakeStatsProvider statsProvider;
+  private PreemptionSlotFinder preemptionSlotFinder;
+  private PendingTaskProcessor slotFinder;
+  private AttributeAggregate attrAggregate;
+  private PreemptionSlotCache slotCache;
+  private FakeClock clock;
+
+  @Before
+  public void setUp() {
+    storageUtil = new StorageTestUtil(this);
+    storageUtil.expectOperations();
+    preemptionSlotFinder = createMock(PreemptionSlotFinder.class);
+    slotCache = createMock(PreemptionSlotCache.class);
+    statsProvider = new FakeStatsProvider();
+    clock = new FakeClock();
+    attrAggregate = new AttributeAggregate(
+        Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()),
+        createMock(AttributeStore.class));
+
+    slotFinder = new PendingTaskProcessor(
+        storageUtil.storage,
+        preemptionSlotFinder,
+        new PreemptorMetrics(new CachedCounters(statsProvider)),
+        PREEMPTION_DELAY,
+        slotCache,
+        clock);
+  }
+  @Test
+  public void testSearchSlotSuccessful() throws Exception {
+    expect(slotCache.get(TASK_ID_A)).andReturn(EMPTY_SLOT);
+    expect(slotCache.get(TASK_ID_B)).andReturn(EMPTY_SLOT);
+    expectGetPendingTasks(TASK_A, TASK_B);
+    expectAttributeAggegateFetchTasks();
+    expectSlotSearch(TASK_A, Optional.of(SLOT_A));
+    expectSlotSearch(TASK_B, Optional.of(SLOT_B));
+    slotCache.add(TASK_ID_A, SLOT_A);
+    slotCache.add(TASK_ID_B, SLOT_B);
+
+    control.replay();
+
+    clock.advance(PREEMPTION_DELAY);
+
+    slotFinder.run();
+    assertEquals(1L, statsProvider.getLongValue(PENDING_PROCESSOR_RUN_NAME));
+    assertEquals(2L, statsProvider.getLongValue(attemptsStatName(true)));
+    assertEquals(2L, statsProvider.getLongValue(slotSearchStatName(true, 
true)));
+    assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(false, 
true)));
+  }
+
+  @Test
+  public void testSearchSlotFailed() throws Exception {
+    expect(slotCache.get(TASK_ID_A)).andReturn(EMPTY_SLOT);
+    expectGetPendingTasks(TASK_A);
+    expectAttributeAggegateFetchTasks();
+    expectSlotSearch(TASK_A, EMPTY_SLOT);
+
+    control.replay();
+
+    clock.advance(PREEMPTION_DELAY);
+
+    slotFinder.run();
+    assertEquals(1L, statsProvider.getLongValue(PENDING_PROCESSOR_RUN_NAME));
+    assertEquals(1L, statsProvider.getLongValue(attemptsStatName(true)));
+    assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(true, 
true)));
+    assertEquals(1L, statsProvider.getLongValue(slotSearchStatName(false, 
true)));
+  }
+
+  private void expectSlotSearch(ScheduledTask task, Optional<PreemptionSlot> 
slot) {
+    expect(preemptionSlotFinder.findPreemptionSlotFor(
+        IAssignedTask.build(task.getAssignedTask()),
+        attrAggregate,
+        storageUtil.storeProvider)).andReturn(slot);
+  }
+
+  private static PreemptionSlot createPreemptionSlot(ScheduledTask task) {
+    IAssignedTask assigned = IAssignedTask.build(task.getAssignedTask());
+    return new 
PreemptionSlot(ImmutableSet.of(PreemptionVictim.fromTask(assigned)), SLAVE_ID);
+  }
+
+  private static ScheduledTask makeTask(String taskId) {
+    ScheduledTask task = new ScheduledTask()
+        .setAssignedTask(new AssignedTask()
+            .setTaskId(taskId)
+            .setTask(new TaskConfig()
+                .setPriority(1)
+                .setProduction(true)
+                .setJob(new JobKey("role", "env", "name"))));
+    task.addToTaskEvents(new TaskEvent(0, PENDING));
+    return task;
+  }
+
+  private IExpectationSetters<?> expectAttributeAggegateFetchTasks() {
+    return storageUtil.expectTaskFetch(
+        Query.jobScoped(JOB_KEY).byStatus(Tasks.SLAVE_ASSIGNED_STATES));
+  }
+
+  private void expectGetPendingTasks(ScheduledTask... returnedTasks) {
+    storageUtil.expectTaskFetch(
+        Query.statusScoped(PENDING),
+        IScheduledTask.setFromBuilders(Arrays.asList(returnedTasks)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/a7b95d95/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java
 
b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java
index d17c4fb..281f4e0 100644
--- 
a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java
+++ 
b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java
@@ -13,16 +13,10 @@
  */
 package org.apache.aurora.scheduler.async.preemptor;
 
-import java.util.Arrays;
-import java.util.concurrent.ScheduledExecutorService;
-
 import com.google.common.base.Optional;
 import com.google.common.base.Suppliers;
-import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableSet;
 
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
 import com.twitter.common.testing.easymock.EasyMockTest;
 
 import org.apache.aurora.gen.AssignedTask;
@@ -32,26 +26,24 @@ import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.gen.TaskEvent;
 import 
org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.PreemptionSlot;
-import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.async.preemptor.Preemptor.PreemptorImpl;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.stats.CachedCounters;
 import org.apache.aurora.scheduler.storage.AttributeStore;
+import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
-import org.apache.aurora.scheduler.testing.FakeScheduledExecutor;
 import org.apache.aurora.scheduler.testing.FakeStatsProvider;
 import org.easymock.EasyMock;
 import org.junit.Before;
 import org.junit.Test;
 
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static 
org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.attemptsStatName;
-import static 
org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.slotSearchStatName;
 import static 
org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.slotValidationStatName;
 import static 
org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.successStatName;
+import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertEquals;
@@ -59,171 +51,93 @@ import static org.junit.Assert.assertEquals;
 public class PreemptorImplTest extends EasyMockTest {
   private static final String TASK_ID = "task_a";
   private static final String SLAVE_ID = "slave_id";
-
-  private static final Amount<Long, Time> PREEMPTION_DELAY = Amount.of(30L, 
Time.SECONDS);
+  private static final IScheduledTask TASK = IScheduledTask.build(makeTask());
+  private static final PreemptionSlot SLOT = createPreemptionSlot(TASK);
 
   private static final Optional<PreemptionSlot> EMPTY_SLOT = Optional.absent();
   private static final Optional<String> EMPTY_RESULT = Optional.absent();
 
-  private StorageTestUtil storageUtil;
   private StateManager stateManager;
   private FakeStatsProvider statsProvider;
   private PreemptionSlotFinder preemptionSlotFinder;
   private PreemptorImpl preemptor;
   private AttributeAggregate attrAggregate;
   private PreemptionSlotCache slotCache;
-  private FakeScheduledExecutor clock;
+  private Storage.MutableStoreProvider storeProvider;
 
   @Before
   public void setUp() {
-    storageUtil = new StorageTestUtil(this);
-    storageUtil.expectOperations();
+    storeProvider = createMock(Storage.MutableStoreProvider.class);
     stateManager = createMock(StateManager.class);
     preemptionSlotFinder = createMock(PreemptionSlotFinder.class);
     slotCache = createMock(PreemptionSlotCache.class);
     statsProvider = new FakeStatsProvider();
-    ScheduledExecutorService executor = 
createMock(ScheduledExecutorService.class);
-    clock = FakeScheduledExecutor.scheduleExecutor(executor);
     attrAggregate = new AttributeAggregate(
         Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()),
         createMock(AttributeStore.class));
 
     preemptor = new PreemptorImpl(
-        storageUtil.storage,
         stateManager,
         preemptionSlotFinder,
         new PreemptorMetrics(new CachedCounters(statsProvider)),
-        PREEMPTION_DELAY,
-        executor,
-        slotCache,
-        clock);
-  }
-
-  @Test
-  public void testSearchSlotSuccessful() throws Exception {
-    ScheduledTask task = makeTask();
-    PreemptionSlot slot = createPreemptionSlot(task);
-
-    expect(slotCache.get(TASK_ID)).andReturn(EMPTY_SLOT);
-    expectGetPendingTasks(task);
-    expectSlotSearch(task, Optional.of(slot));
-    slotCache.add(TASK_ID, slot);
-
-    control.replay();
-
-    clock.advance(PREEMPTION_DELAY);
-
-    assertEquals(EMPTY_RESULT, preemptor.attemptPreemptionFor(TASK_ID, 
attrAggregate));
-    assertEquals(1L, statsProvider.getLongValue(attemptsStatName(true)));
-    assertEquals(1L, statsProvider.getLongValue(slotSearchStatName(true, 
true)));
-    assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(false, 
true)));
-  }
-
-  @Test
-  public void testSearchSlotFailed() throws Exception {
-    ScheduledTask task = makeTask();
-
-    expect(slotCache.get(TASK_ID)).andReturn(EMPTY_SLOT);
-    expectGetPendingTasks(task);
-    expectSlotSearch(task, EMPTY_SLOT);
-
-    control.replay();
-
-    clock.advance(PREEMPTION_DELAY);
-
-    assertEquals(EMPTY_RESULT, preemptor.attemptPreemptionFor(TASK_ID, 
attrAggregate));
-    assertEquals(1L, statsProvider.getLongValue(attemptsStatName(true)));
-    assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(true, 
true)));
-    assertEquals(1L, statsProvider.getLongValue(slotSearchStatName(false, 
true)));
-  }
-
-  @Test
-  public void testSearchSlotTaskNoLongerPending() throws Exception {
-    expect(slotCache.get(TASK_ID)).andReturn(EMPTY_SLOT);
-    storageUtil.expectTaskFetch(Query.statusScoped(PENDING).byId(TASK_ID));
-
-    control.replay();
-
-    assertEquals(EMPTY_RESULT, preemptor.attemptPreemptionFor(TASK_ID, 
attrAggregate));
+        slotCache);
   }
 
   @Test
   public void testPreemptTasksSuccessful() throws Exception {
-    ScheduledTask task = makeTask();
-    PreemptionSlot slot = createPreemptionSlot(task);
-
-    expect(slotCache.get(TASK_ID)).andReturn(Optional.of(slot));
+    expect(slotCache.get(TASK_ID)).andReturn(Optional.of(SLOT));
     slotCache.remove(TASK_ID);
-    expectGetPendingTasks(task);
-    expectSlotValidation(task, slot, Optional.of(ImmutableSet.of(
-        
PreemptionVictim.fromTask(IAssignedTask.build(task.getAssignedTask())))));
+    expectSlotValidation(Optional.of(ImmutableSet.of(
+        PreemptionVictim.fromTask(TASK.getAssignedTask()))));
 
-    expectPreempted(task);
+    expectPreempted(TASK);
 
     control.replay();
 
-    clock.advance(PREEMPTION_DELAY);
-
-    assertEquals(Optional.of(SLAVE_ID), 
preemptor.attemptPreemptionFor(TASK_ID, attrAggregate));
+    assertEquals(Optional.of(SLAVE_ID), callPreemptor());
     assertEquals(1L, statsProvider.getLongValue(slotValidationStatName(true)));
     assertEquals(1L, statsProvider.getLongValue(successStatName(true)));
   }
 
   @Test
   public void testPreemptTasksValidationFailed() throws Exception {
-    ScheduledTask task = makeTask();
-    PreemptionSlot slot = createPreemptionSlot(task);
-
-    expect(slotCache.get(TASK_ID)).andReturn(Optional.of(slot));
+    expect(slotCache.get(TASK_ID)).andReturn(Optional.of(SLOT));
     slotCache.remove(TASK_ID);
-    expectGetPendingTasks(task);
-    storageUtil.expectTaskFetch(Query.statusScoped(PENDING).byId(TASK_ID));
-    expectSlotValidation(task, slot, 
Optional.<ImmutableSet<PreemptionVictim>>absent());
+    expectSlotValidation(Optional.<ImmutableSet<PreemptionVictim>>absent());
 
     control.replay();
 
-    clock.advance(PREEMPTION_DELAY);
-
-    assertEquals(EMPTY_RESULT, preemptor.attemptPreemptionFor(TASK_ID, 
attrAggregate));
+    assertEquals(EMPTY_RESULT, callPreemptor());
     assertEquals(1L, 
statsProvider.getLongValue(slotValidationStatName(false)));
     assertEquals(0L, statsProvider.getLongValue(successStatName(true)));
   }
 
   @Test
-  public void testPreemptTaskNoLongerPending() throws Exception {
-    ScheduledTask task = makeTask();
-    PreemptionSlot slot = createPreemptionSlot(task);
-    expect(slotCache.get(TASK_ID)).andReturn(Optional.of(slot));
-    slotCache.remove(TASK_ID);
-    storageUtil.expectTaskFetch(Query.statusScoped(PENDING).byId(TASK_ID));
+  public void testNoCachedSlot() throws Exception {
+    expect(slotCache.get(TASK_ID)).andReturn(EMPTY_SLOT);
 
     control.replay();
 
-    assertEquals(EMPTY_RESULT, preemptor.attemptPreemptionFor(TASK_ID, 
attrAggregate));
+    assertEquals(EMPTY_RESULT, callPreemptor());
+    assertEquals(0L, 
statsProvider.getLongValue(slotValidationStatName(false)));
+    assertEquals(0L, statsProvider.getLongValue(successStatName(true)));
   }
 
-  private void expectSlotSearch(ScheduledTask task, Optional<PreemptionSlot> 
slot) {
-    expect(preemptionSlotFinder.findPreemptionSlotFor(
-        IAssignedTask.build(task.getAssignedTask()),
-        attrAggregate,
-        storageUtil.storeProvider)).andReturn(slot);
+  private Optional<String> callPreemptor() {
+    return preemptor.attemptPreemptionFor(TASK.getAssignedTask(), 
attrAggregate, storeProvider);
   }
 
-  private void expectSlotValidation(
-      ScheduledTask task,
-      PreemptionSlot slot,
-      Optional<ImmutableSet<PreemptionVictim>> victims) {
-
+  private void expectSlotValidation(Optional<ImmutableSet<PreemptionVictim>> 
victims) {
     expect(preemptionSlotFinder.validatePreemptionSlotFor(
-        IAssignedTask.build(task.getAssignedTask()),
-        attrAggregate,
-        slot,
-        storageUtil.mutableStoreProvider)).andReturn(victims);
+        eq(TASK.getAssignedTask()),
+        eq(attrAggregate),
+        eq(SLOT),
+        anyObject(Storage.MutableStoreProvider.class))).andReturn(victims);
   }
 
-  private void expectPreempted(ScheduledTask preempted) throws Exception {
+  private void expectPreempted(IScheduledTask preempted) throws Exception {
     expect(stateManager.changeState(
-        eq(storageUtil.mutableStoreProvider),
+        anyObject(Storage.MutableStoreProvider.class),
         eq(Tasks.id(preempted)),
         eq(Optional.<ScheduleStatus>absent()),
         eq(ScheduleStatus.PREEMPTING),
@@ -231,8 +145,8 @@ public class PreemptorImplTest extends EasyMockTest {
         .andReturn(true);
   }
 
-  private static PreemptionSlot createPreemptionSlot(ScheduledTask task) {
-    IAssignedTask assigned = IAssignedTask.build(task.getAssignedTask());
+  private static PreemptionSlot createPreemptionSlot(IScheduledTask task) {
+    IAssignedTask assigned = task.getAssignedTask();
     return new 
PreemptionSlot(ImmutableSet.of(PreemptionVictim.fromTask(assigned)), SLAVE_ID);
   }
 
@@ -247,13 +161,4 @@ public class PreemptorImplTest extends EasyMockTest {
     task.addToTaskEvents(new TaskEvent(0, PENDING));
     return task;
   }
-
-  private void expectGetPendingTasks(ScheduledTask... returnedTasks) {
-    Iterable<String> taskIds = 
FluentIterable.from(Arrays.asList(returnedTasks))
-        .transform(IScheduledTask.FROM_BUILDER)
-        .transform(Tasks.SCHEDULED_TO_ID);
-    storageUtil.expectTaskFetch(
-        Query.statusScoped(PENDING).byId(taskIds),
-        IScheduledTask.setFromBuilders(Arrays.asList(returnedTasks)));
-  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/a7b95d95/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModuleTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModuleTest.java
 
b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModuleTest.java
index 0e2e958..7e2d1c5 100644
--- 
a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModuleTest.java
+++ 
b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModuleTest.java
@@ -13,8 +13,6 @@
  */
 package org.apache.aurora.scheduler.async.preemptor;
 
-import java.util.concurrent.ScheduledExecutorService;
-
 import com.google.common.base.Optional;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableSet;
@@ -30,12 +28,14 @@ import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
 import com.twitter.common.testing.easymock.EasyMockTest;
 
+import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.state.TaskAssigner;
 import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
 import org.junit.Before;
@@ -78,7 +78,7 @@ public class PreemptorModuleTest extends EasyMockTest {
     Injector injector = createInjector(new PreemptorModule(
         false,
         Amount.of(0L, Time.SECONDS),
-        createMock(ScheduledExecutorService.class)));
+        Amount.of(0L, Time.SECONDS)));
 
     Supplier<ImmutableSet<IScheduledTask>> taskSupplier =
         createMock(new 
EasyMockTest.Clazz<Supplier<ImmutableSet<IScheduledTask>>>() { });
@@ -91,7 +91,9 @@ public class PreemptorModuleTest extends EasyMockTest {
     injector.getBindings();
     assertEquals(
         Optional.<String>absent(),
-        injector.getInstance(Preemptor.class)
-            .attemptPreemptionFor("a", new AttributeAggregate(taskSupplier, 
attributeStore)));
+        injector.getInstance(Preemptor.class).attemptPreemptionFor(
+            IAssignedTask.build(new AssignedTask()),
+            new AttributeAggregate(taskSupplier, attributeStore),
+            storageUtil.mutableStoreProvider));
   }
 }

Reply via email to