Repository: aurora
Updated Branches:
  refs/heads/master 3b29a4b79 -> 1dc11fb1a


Generalizing preemption reservation pool.

Bugs closed: AURORA-1219

Reviewed at https://reviews.apache.org/r/32907/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/1dc11fb1
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/1dc11fb1
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/1dc11fb1

Branch: refs/heads/master
Commit: 1dc11fb1a92bb85ee629971d3f07f79e26a31a59
Parents: 3b29a4b
Author: Maxim Khutornenko <[email protected]>
Authored: Tue Apr 14 13:01:07 2015 -0700
Committer: Maxim Khutornenko <[email protected]>
Committed: Tue Apr 14 13:01:07 2015 -0700

----------------------------------------------------------------------
 .../aurora/scheduler/async/AsyncModule.java     |  41 ++---
 .../aurora/scheduler/async/TaskScheduler.java   |  87 ++-------
 .../scheduler/async/preemptor/BiCache.java      | 140 ++++++++++++++
 .../async/preemptor/PendingTaskProcessor.java   |  11 +-
 .../async/preemptor/PreemptionSlotCache.java    |  99 ----------
 .../scheduler/async/preemptor/Preemptor.java    |  28 +--
 .../async/preemptor/PreemptorModule.java        |  10 +-
 .../scheduler/async/TaskSchedulerImplTest.java  | 184 +++++++++----------
 .../scheduler/async/TaskSchedulerTest.java      |  59 +++---
 .../scheduler/async/preemptor/BiCacheTest.java  | 108 +++++++++++
 .../preemptor/PendingTaskProcessorTest.java     |  31 ++--
 .../preemptor/PreemptionSlotCacheTest.java      |  66 -------
 .../async/preemptor/PreemptorImplTest.java      |  24 ++-
 13 files changed, 458 insertions(+), 430 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/1dc11fb1/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
index e87dda4..35c7e43 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
@@ -30,7 +30,6 @@ import com.google.common.base.Supplier;
 import com.google.common.util.concurrent.AbstractIdleService;
 import com.google.common.util.concurrent.RateLimiter;
 import com.google.inject.AbstractModule;
-import com.google.inject.Binder;
 import com.google.inject.PrivateModule;
 import com.google.inject.TypeLiteral;
 import com.twitter.common.args.Arg;
@@ -53,7 +52,10 @@ import 
org.apache.aurora.scheduler.async.RescheduleCalculator.RescheduleCalculat
 import org.apache.aurora.scheduler.async.TaskGroups.TaskGroupsSettings;
 import 
org.apache.aurora.scheduler.async.TaskHistoryPruner.HistoryPrunnerSettings;
 import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl;
+import org.apache.aurora.scheduler.async.preemptor.BiCache;
+import org.apache.aurora.scheduler.async.preemptor.BiCache.BiCacheSettings;
 import org.apache.aurora.scheduler.base.AsyncUtil;
+import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.events.PubsubEventModule;
 
 import static java.lang.annotation.ElementType.FIELD;
@@ -62,8 +64,6 @@ import static java.lang.annotation.ElementType.PARAMETER;
 import static java.lang.annotation.RetentionPolicy.RUNTIME;
 import static java.util.Objects.requireNonNull;
 
-import static 
org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl.ReservationDuration;
-
 /**
  * Binding module for async task management.
  */
@@ -237,12 +237,24 @@ public class AsyncModule extends AbstractModule {
         expose(TaskGroups.class);
       }
     });
-    bindTaskScheduler(binder(), RESERVATION_DURATION.get());
     PubsubEventModule.bindSubscriber(binder(), TaskGroups.class);
 
     install(new PrivateModule() {
       @Override
       protected void configure() {
+        bind(new TypeLiteral<BiCache<String, TaskGroupKey>>() { 
}).in(Singleton.class);
+        bind(BiCacheSettings.class).toInstance(
+            new BiCacheSettings(RESERVATION_DURATION.get(), 
"reservation_cache_size"));
+        bind(TaskScheduler.class).to(TaskSchedulerImpl.class);
+        bind(TaskSchedulerImpl.class).in(Singleton.class);
+        expose(TaskScheduler.class);
+      }
+    });
+    PubsubEventModule.bindSubscriber(binder(), TaskScheduler.class);
+
+    install(new PrivateModule() {
+      @Override
+      protected void configure() {
         bind(OfferReturnDelay.class).toInstance(
             new RandomJitterReturnDelay(
                 MIN_OFFER_HOLD_TIME.get().as(Time.MILLISECONDS),
@@ -331,27 +343,6 @@ public class AsyncModule extends AbstractModule {
     PubsubEventModule.bindSubscriber(binder(), KillRetry.class);
   }
 
-  /**
-   * This method exists because we want to test the wiring up of 
TaskSchedulerImpl class to the
-   * PubSub system in the TaskSchedulerImplTest class. The method has a 
complex signature because
-   * the binding of the TaskScheduler and friends occurs in a PrivateModule 
which does not interact
-   * well with the MultiBinder that backs the PubSub system.
-   */
-  @VisibleForTesting
-  static void bindTaskScheduler(Binder binder, final Amount<Long, Time> 
reservationDuration) {
-    binder.install(new PrivateModule() {
-      @Override
-      protected void configure() {
-        bind(new TypeLiteral<Amount<Long, Time>>() { 
}).annotatedWith(ReservationDuration.class)
-            .toInstance(reservationDuration);
-        bind(TaskScheduler.class).to(TaskSchedulerImpl.class);
-        bind(TaskSchedulerImpl.class).in(Singleton.class);
-        expose(TaskScheduler.class);
-      }
-    });
-    PubsubEventModule.bindSubscriber(binder, TaskScheduler.class);
-  }
-
   static class RegisterGauges extends AbstractIdleService {
     private final StatsProvider statsProvider;
     private final ScheduledThreadPoolExecutor executor;

http://git-wip-us.apache.org/repos/asf/aurora/blob/1dc11fb1/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
index ebc520e..6f169e8 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
@@ -15,7 +15,6 @@ package org.apache.aurora.scheduler.async;
 
 import java.lang.annotation.Retention;
 import java.lang.annotation.Target;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -26,20 +25,13 @@ import javax.inject.Qualifier;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
-import com.google.common.base.Ticker;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.Iterables;
 import com.google.common.eventbus.Subscribe;
 import com.twitter.common.inject.TimedInterceptor.Timed;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
 import com.twitter.common.stats.Stats;
-import com.twitter.common.stats.StatsProvider;
-import com.twitter.common.util.Clock;
 
 import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.async.preemptor.BiCache;
 import org.apache.aurora.scheduler.async.preemptor.Preemptor;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
@@ -56,7 +48,6 @@ import 
org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.apache.mesos.Protos.SlaveID;
 
 import static java.lang.annotation.ElementType.FIELD;
 import static java.lang.annotation.ElementType.METHOD;
@@ -103,7 +94,7 @@ public interface TaskScheduler extends EventSubscriber {
     private final TaskAssigner assigner;
     private final OfferManager offerManager;
     private final Preemptor preemptor;
-    private final Reservations reservations;
+    private final BiCache<String, TaskGroupKey> reservations;
 
     private final AtomicLong attemptsFired = 
Stats.exportLong("schedule_attempts_fired");
     private final AtomicLong attemptsFailed = 
Stats.exportLong("schedule_attempts_failed");
@@ -116,16 +107,14 @@ public interface TaskScheduler extends EventSubscriber {
         TaskAssigner assigner,
         OfferManager offerManager,
         Preemptor preemptor,
-        @ReservationDuration Amount<Long, Time> reservationDuration,
-        final Clock clock,
-        StatsProvider statsProvider) {
+        BiCache<String, TaskGroupKey> reservations) {
 
       this.storage = requireNonNull(storage);
       this.stateManager = requireNonNull(stateManager);
       this.assigner = requireNonNull(assigner);
       this.offerManager = requireNonNull(offerManager);
       this.preemptor = requireNonNull(preemptor);
-      this.reservations = new Reservations(statsProvider, reservationDuration, 
clock);
+      this.reservations = requireNonNull(reservations);
     }
 
     private Function<HostOffer, Assignment> getAssignerFunction(
@@ -138,11 +127,12 @@ public interface TaskScheduler extends EventSubscriber {
       return new Function<HostOffer, Assignment>() {
         @Override
         public Assignment apply(HostOffer offer) {
-          Optional<String> reservedTaskId =
-              reservations.getSlaveReservation(offer.getOffer().getSlaveId());
-          if (reservedTaskId.isPresent()) {
-            if (resourceRequest.getTaskId().equals(reservedTaskId.get())) {
-              // Slave is reserved to satisfy this task.
+          Optional<TaskGroupKey> reservation =
+              reservations.get(offer.getOffer().getSlaveId().getValue());
+
+          if (reservation.isPresent()) {
+            if 
(TaskGroupKey.from(resourceRequest.getTask()).equals(reservation.get())) {
+              // Slave is reserved to satisfy this task group.
               return assigner.maybeAssign(storeProvider, offer, 
resourceRequest);
             } else {
               // Slave is reserved for another task.
@@ -234,67 +224,22 @@ public interface TaskScheduler extends EventSubscriber {
         AttributeAggregate jobState,
         MutableStoreProvider storeProvider) {
 
-      if (reservations.hasReservationForTask(task.getTaskId())) {
+      if 
(!reservations.getByValue(TaskGroupKey.from(task.getTask())).isEmpty()) {
         return;
       }
       Optional<String> slaveId = preemptor.attemptPreemptionFor(task, 
jobState, storeProvider);
       if (slaveId.isPresent()) {
-        reservations.add(SlaveID.newBuilder().setValue(slaveId.get()).build(), 
task.getTaskId());
+        reservations.put(slaveId.get(), TaskGroupKey.from(task.getTask()));
       }
     }
 
     @Subscribe
     public void taskChanged(final TaskStateChange stateChangeEvent) {
       if (Optional.of(PENDING).equals(stateChangeEvent.getOldState())) {
-        reservations.invalidateTask(stateChangeEvent.getTaskId());
-      }
-    }
-
-    @VisibleForTesting
-    static final String RESERVATIONS_CACHE_SIZE_STAT = 
"reservation_cache_size";
-
-    private static class Reservations {
-      private final Cache<SlaveID, String> reservations;
-
-      Reservations(
-          StatsProvider statsProvider,
-          Amount<Long, Time> duration,
-          final Clock clock) {
-        requireNonNull(duration);
-        requireNonNull(clock);
-        this.reservations = CacheBuilder.newBuilder()
-            .expireAfterWrite(duration.as(Time.MINUTES), TimeUnit.MINUTES)
-            .ticker(new Ticker() {
-              @Override
-              public long read() {
-                return clock.nowNanos();
-              }
-            })
-            .build();
-        statsProvider.makeGauge(
-            RESERVATIONS_CACHE_SIZE_STAT,
-            new Supplier<Long>() {
-              @Override
-              public Long get() {
-                return reservations.size();
-              }
-            });
-      }
-
-      private synchronized void add(SlaveID slaveId, String taskId) {
-        reservations.put(slaveId, taskId);
-      }
-
-      private synchronized boolean hasReservationForTask(String taskId) {
-        return reservations.asMap().containsValue(taskId);
-      }
-
-      private synchronized Optional<String> getSlaveReservation(SlaveID 
slaveID) {
-        return Optional.fromNullable(reservations.getIfPresent(slaveID));
-      }
-
-      private synchronized void invalidateTask(String taskId) {
-        reservations.asMap().values().remove(taskId);
+        IAssignedTask assigned = stateChangeEvent.getTask().getAssignedTask();
+        if (assigned.getSlaveId() != null) {
+          reservations.remove(assigned.getSlaveId(), 
TaskGroupKey.from(assigned.getTask()));
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/1dc11fb1/src/main/java/org/apache/aurora/scheduler/async/preemptor/BiCache.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/BiCache.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/BiCache.java
new file mode 100644
index 0000000..f5a1833
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/BiCache.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async.preemptor;
+
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import javax.inject.Inject;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Supplier;
+import com.google.common.base.Ticker;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Multimap;
+
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.StatsProvider;
+import com.twitter.common.util.Clock;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A bi-directional cache of items. Entries are purged from cache after
+ * {@link BiCacheSettings#expireAfter}.
+ *
+ * @param <K> Key type.
+ * @param <V> Value type.
+ */
+public class BiCache<K, V> {
+
+  public static class BiCacheSettings {
+    private final Amount<Long, Time> expireAfter;
+    private final String cacheSizeStatName;
+
+    public BiCacheSettings(Amount<Long, Time> expireAfter, String 
cacheSizeStatName) {
+      this.expireAfter = requireNonNull(expireAfter);
+      this.cacheSizeStatName = requireNonNull(cacheSizeStatName);
+    }
+  }
+
+  private final Cache<K, V> cache;
+  private final Multimap<V, K> inverse = HashMultimap.create();
+
+  @Inject
+  public BiCache(
+      StatsProvider statsProvider,
+      BiCacheSettings settings,
+      final Clock clock) {
+
+    requireNonNull(clock);
+    this.cache = CacheBuilder.newBuilder()
+        .expireAfterWrite(settings.expireAfter.as(Time.MINUTES), 
TimeUnit.MINUTES)
+        .ticker(new Ticker() {
+          @Override
+          public long read() {
+            return clock.nowNanos();
+          }
+        })
+        .removalListener(new RemovalListener<K, V>() {
+          @Override
+          public void onRemoval(RemovalNotification<K, V> notification) {
+            inverse.remove(notification.getValue(), notification.getKey());
+          }
+        })
+        .build();
+
+    statsProvider.makeGauge(
+        settings.cacheSizeStatName,
+        new Supplier<Long>() {
+          @Override
+          public Long get() {
+            return cache.size();
+          }
+        });
+  }
+
+  /**
+   * Puts a new key/value pair.
+   *
+   * @param key Key to add.
+   * @param value Value to add.
+   */
+  public synchronized void put(K key, V value) {
+    requireNonNull(key);
+    requireNonNull(value);
+    cache.put(key, value);
+    inverse.put(value, key);
+  }
+
+  /**
+   * Gets a cached value by key.
+   *
+   * @param key Key to get value for.
+   * @return Optional of value.
+   */
+  public synchronized Optional<V> get(K key) {
+    return Optional.fromNullable(cache.getIfPresent(key));
+  }
+
+  /**
+   * Gets a set of keys for a given value.
+   *
+   * @param value Value to get all keys for.
+   * @return A {@link Set} of keys, or an empty set if value does not exist.
+   */
+  public synchronized Set<K> getByValue(V value) {
+    // Cache items are lazily removed by routine maintenance operations during 
get/write access.
+    // Forcing cleanup here to ensure proper data integrity.
+    cache.cleanUp();
+    return ImmutableSet.copyOf(inverse.get(value));
+  }
+
+  /**
+   * Removes a key/value pair from cache.
+   *
+   * @param key Key to remove.
+   * @param value Value to remove.
+   */
+  public synchronized void remove(K key, V value) {
+    inverse.remove(value, key);
+    cache.invalidate(key);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/1dc11fb1/src/main/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessor.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessor.java
index 67ad5d7..00919b7 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessor.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessor.java
@@ -32,6 +32,7 @@ import com.twitter.common.util.Clock;
 
 import 
org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.PreemptionSlot;
 import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.storage.Storage;
@@ -58,7 +59,7 @@ class PendingTaskProcessor implements Runnable {
   private final PreemptionSlotFinder preemptionSlotFinder;
   private final PreemptorMetrics metrics;
   private final Amount<Long, Time> preemptionCandidacyDelay;
-  private final PreemptionSlotCache slotCache;
+  private final BiCache<PreemptionSlot, TaskGroupKey> slotCache;
   private final Clock clock;
 
   /**
@@ -78,7 +79,7 @@ class PendingTaskProcessor implements Runnable {
       PreemptionSlotFinder preemptionSlotFinder,
       PreemptorMetrics metrics,
       @PreemptionDelay Amount<Long, Time> preemptionCandidacyDelay,
-      PreemptionSlotCache slotCache,
+      BiCache<PreemptionSlot, TaskGroupKey> slotCache,
       Clock clock) {
 
     this.storage = requireNonNull(storage);
@@ -112,7 +113,7 @@ class PendingTaskProcessor implements Runnable {
             metrics.recordSlotSearchResult(slot, task);
 
             if (slot.isPresent()) {
-              slotCache.add(pendingTask.getTaskId(), slot.get());
+              slotCache.put(slot.get(), TaskGroupKey.from(task));
             }
           }
         }
@@ -132,8 +133,8 @@ class PendingTaskProcessor implements Runnable {
 
   private final Predicate<IScheduledTask> hasCachedSlot = new 
Predicate<IScheduledTask>() {
     @Override
-    public boolean apply(IScheduledTask input) {
-      return slotCache.get(input.getAssignedTask().getTaskId()).isPresent();
+    public boolean apply(IScheduledTask task) {
+      return 
!slotCache.getByValue(TaskGroupKey.from(task.getAssignedTask().getTask())).isEmpty();
     }
   };
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/1dc11fb1/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotCache.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotCache.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotCache.java
deleted file mode 100644
index b5a42a0..0000000
--- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotCache.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async.preemptor;
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-import java.util.concurrent.TimeUnit;
-
-import javax.inject.Inject;
-import javax.inject.Qualifier;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
-import com.google.common.base.Ticker;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.stats.StatsProvider;
-import com.twitter.common.util.Clock;
-
-import 
org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.PreemptionSlot;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-import static java.util.Objects.requireNonNull;
-
-/**
- * Caches preemption slots found for candidate tasks. Entries are purged from 
cache after #duration.
- */
-class PreemptionSlotCache {
-
-  @VisibleForTesting
-  @Qualifier
-  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
-  @interface PreemptionSlotHoldDuration { }
-
-  @VisibleForTesting
-  static final String PREEMPTION_SLOT_CACHE_SIZE_STAT = 
"preemption_slot_cache_size";
-
-  private final Cache<String, PreemptionSlot> slots;
-
-  @Inject
-  PreemptionSlotCache(
-      StatsProvider statsProvider,
-      @PreemptionSlotHoldDuration Amount<Long, Time> duration,
-      final Clock clock) {
-
-    requireNonNull(duration);
-    requireNonNull(clock);
-    this.slots = CacheBuilder.newBuilder()
-        .expireAfterWrite(duration.as(Time.MINUTES), TimeUnit.MINUTES)
-        .ticker(new Ticker() {
-          @Override
-          public long read() {
-            return clock.nowNanos();
-          }
-        })
-        .build();
-
-    statsProvider.makeGauge(
-        PREEMPTION_SLOT_CACHE_SIZE_STAT,
-        new Supplier<Long>() {
-          @Override
-          public Long get() {
-            return slots.size();
-          }
-        });
-  }
-
-  void add(String taskId, PreemptionSlot preemptionSlot) {
-    requireNonNull(taskId);
-    requireNonNull(preemptionSlot);
-    slots.put(taskId, preemptionSlot);
-  }
-
-  Optional<PreemptionSlot> get(String taskId) {
-    return Optional.fromNullable(slots.getIfPresent(taskId));
-  }
-
-  void remove(String taskId) {
-    slots.invalidate(taskId);
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/1dc11fb1/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java
index 77617ec..5200811 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java
@@ -13,6 +13,8 @@
  */
 package org.apache.aurora.scheduler.async.preemptor;
 
+import java.util.Set;
+
 import javax.inject.Inject;
 
 import com.google.common.base.Optional;
@@ -20,6 +22,7 @@ import com.google.common.collect.ImmutableSet;
 
 import org.apache.aurora.gen.ScheduleStatus;
 import 
org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.PreemptionSlot;
+import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
@@ -51,14 +54,14 @@ public interface Preemptor {
     private final StateManager stateManager;
     private final PreemptionSlotFinder preemptionSlotFinder;
     private final PreemptorMetrics metrics;
-    private final PreemptionSlotCache slotCache;
+    private final BiCache<PreemptionSlot, TaskGroupKey> slotCache;
 
     @Inject
     PreemptorImpl(
         StateManager stateManager,
         PreemptionSlotFinder preemptionSlotFinder,
         PreemptorMetrics metrics,
-        PreemptionSlotCache slotCache) {
+        BiCache<PreemptionSlot, TaskGroupKey> slotCache) {
 
       this.stateManager = requireNonNull(stateManager);
       this.preemptionSlotFinder = requireNonNull(preemptionSlotFinder);
@@ -70,21 +73,20 @@ public interface Preemptor {
     public Optional<String> attemptPreemptionFor(
         IAssignedTask pendingTask,
         AttributeAggregate jobState,
-        MutableStoreProvider storeProvider) {
+        MutableStoreProvider store) {
 
-      final Optional<PreemptionSlot> preemptionSlot = 
slotCache.get(pendingTask.getTaskId());
+      TaskGroupKey groupKey = TaskGroupKey.from(pendingTask.getTask());
+      Set<PreemptionSlot> preemptionSlots = slotCache.getByValue(groupKey);
 
       // A preemption slot is available -> attempt to preempt tasks.
-      if (preemptionSlot.isPresent()) {
-        slotCache.remove(pendingTask.getTaskId());
+      if (!preemptionSlots.isEmpty()) {
+        // Get the next available preemption slot.
+        PreemptionSlot slot = preemptionSlots.iterator().next();
+        slotCache.remove(slot, groupKey);
 
         // Validate a PreemptionSlot is still valid for the given task.
         Optional<ImmutableSet<PreemptionVictim>> validatedVictims =
-            preemptionSlotFinder.validatePreemptionSlotFor(
-                pendingTask,
-                jobState,
-                preemptionSlot.get(),
-                storeProvider);
+            preemptionSlotFinder.validatePreemptionSlotFor(pendingTask, 
jobState, slot, store);
 
         metrics.recordSlotValidationResult(validatedVictims);
         if (!validatedVictims.isPresent()) {
@@ -95,13 +97,13 @@ public interface Preemptor {
         for (PreemptionVictim toPreempt : validatedVictims.get()) {
           metrics.recordTaskPreemption(toPreempt);
           stateManager.changeState(
-              storeProvider,
+              store,
               toPreempt.getTaskId(),
               Optional.<ScheduleStatus>absent(),
               PREEMPTING,
               Optional.of("Preempting in favor of " + 
pendingTask.getTaskId()));
         }
-        return Optional.of(preemptionSlot.get().getSlaveId());
+        return Optional.of(slot.getSlaveId());
       }
 
       return Optional.absent();

http://git-wip-us.apache.org/repos/asf/aurora/blob/1dc11fb1/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java
index 1092c05..7cea881 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java
@@ -30,6 +30,9 @@ import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
 
 import org.apache.aurora.scheduler.SchedulerServicesModule;
+import org.apache.aurora.scheduler.async.preemptor.BiCache.BiCacheSettings;
+import 
org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.PreemptionSlot;
+import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.events.PubsubEventModule;
 import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.storage.Storage;
@@ -95,10 +98,9 @@ public class PreemptorModule extends AbstractModule {
           bind(new TypeLiteral<Amount<Long, Time>>() { })
               .annotatedWith(PendingTaskProcessor.PreemptionDelay.class)
               .toInstance(preemptionDelay);
-          bind(new TypeLiteral<Amount<Long, Time>>() { })
-              
.annotatedWith(PreemptionSlotCache.PreemptionSlotHoldDuration.class)
-              .toInstance(PREEMPTION_SLOT_HOLD_TIME.get());
-          bind(PreemptionSlotCache.class).in(Singleton.class);
+          bind(BiCacheSettings.class).toInstance(
+              new BiCacheSettings(PREEMPTION_SLOT_HOLD_TIME.get(), 
"preemption_slot_cache_size"));
+          bind(new TypeLiteral<BiCache<PreemptionSlot, TaskGroupKey>>() { 
}).in(Singleton.class);
           bind(PendingTaskProcessor.class).in(Singleton.class);
           bind(ClusterState.class).to(ClusterStateImpl.class);
           bind(ClusterStateImpl.class).in(Singleton.class);

http://git-wip-us.apache.org/repos/asf/aurora/blob/1dc11fb1/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
index b61abf9..b0cced7 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
@@ -19,13 +19,11 @@ import com.google.common.collect.ImmutableSet;
 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.stats.Stats;
+import com.google.inject.TypeLiteral;
+
 import com.twitter.common.stats.StatsProvider;
 import com.twitter.common.testing.easymock.EasyMockTest;
 import com.twitter.common.util.Clock;
-import com.twitter.common.util.testing.FakeClock;
 
 import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.gen.HostAttributes;
@@ -35,6 +33,8 @@ import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl;
+import org.apache.aurora.scheduler.async.preemptor.BiCache;
 import org.apache.aurora.scheduler.async.preemptor.Preemptor;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
@@ -58,10 +58,12 @@ import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
 import org.apache.mesos.Protos.TaskInfo;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
+import org.easymock.IExpectationSetters;
 import org.junit.Before;
 import org.junit.Test;
 
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
 import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
 import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY;
 import static org.easymock.EasyMock.capture;
@@ -89,10 +91,8 @@ public class TaskSchedulerImplTest extends EasyMockTest {
   private TaskAssigner assigner;
   private OfferManager offerManager;
   private TaskScheduler scheduler;
-  private FakeClock clock;
   private Preemptor preemptor;
-  private Amount<Long, Time> reservationDuration;
-  private Amount<Long, Time> halfReservationDuration;
+  private BiCache<String, TaskGroupKey> reservations;
   private EventSink eventSink;
 
   @Before
@@ -101,11 +101,8 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     stateManager = createMock(StateManager.class);
     assigner = createMock(TaskAssigner.class);
     offerManager = createMock(OfferManager.class);
-    reservationDuration = Amount.of(2L, Time.MINUTES);
-    halfReservationDuration = Amount.of(1L, Time.MINUTES);
-    clock = new FakeClock();
-    clock.setNowMillis(0);
     preemptor = createMock(Preemptor.class);
+    reservations = createMock(new Clazz<BiCache<String, TaskGroupKey>>() { });
 
     Injector injector = getInjector(storageUtil.storage);
     scheduler = injector.getInstance(TaskScheduler.class);
@@ -118,14 +115,16 @@ public class TaskSchedulerImplTest extends EasyMockTest {
         new AbstractModule() {
           @Override
           protected void configure() {
+            bind(new TypeLiteral<BiCache<String, TaskGroupKey>>() { 
}).toInstance(reservations);
+            bind(TaskScheduler.class).to(TaskSchedulerImpl.class);
             bind(Preemptor.class).toInstance(preemptor);
-            AsyncModule.bindTaskScheduler(binder(), reservationDuration);
             bind(OfferManager.class).toInstance(offerManager);
             bind(StateManager.class).toInstance(stateManager);
             bind(TaskAssigner.class).toInstance(assigner);
-            bind(Clock.class).toInstance(clock);
+            bind(Clock.class).toInstance(createMock(Clock.class));
+            
bind(StatsProvider.class).toInstance(createMock(StatsProvider.class));
             bind(Storage.class).toInstance(storageImpl);
-            bind(StatsProvider.class).toInstance(Stats.STATS_PROVIDER);
+            PubsubEventModule.bindSubscriber(binder(), TaskScheduler.class);
           }
         });
   }
@@ -145,143 +144,99 @@ public class TaskSchedulerImplTest extends EasyMockTest {
   }
 
   @Test
-  public void testReservationsDeniesTasksForTimePeriod() throws Exception {
+  public void testReservation() throws Exception {
     storageUtil.expectOperations();
 
     expectTaskStillPendingQuery(TASK_A);
     expectActiveJobFetch(TASK_A);
     expectLaunchAttempt(false);
     // Reserve "a" with offerA
+    expectReservationCheck(TASK_A);
     expectPreemptorCall(TASK_A, Optional.of(SLAVE_ID));
+    expectAddReservation(SLAVE_ID, TASK_A);
 
-    expectTaskStillPendingQuery(TASK_B);
-    expectActiveJobFetch(TASK_B);
-    AssignmentCapture firstAssignment = expectLaunchAttempt(false);
-    expectPreemptorCall(TASK_B, Optional.<String>absent());
-
-    expectTaskStillPendingQuery(TASK_B);
-    expectActiveJobFetch(TASK_B);
-    AssignmentCapture secondAssignment = expectLaunchAttempt(true);
-    expectAssigned(TASK_B);
+    // Use previously created reservation.
+    expectTaskStillPendingQuery(TASK_A);
+    expectActiveJobFetch(TASK_A);
+    expectGetReservation(SLAVE_ID, TASK_A);
+    expectAssigned(TASK_A);
+    AssignmentCapture assignment = expectLaunchAttempt(true);
 
     control.replay();
 
     assertFalse(scheduler.schedule("a"));
-    assertFalse(scheduler.schedule("b"));
-
-    assignAndAssert(Result.FAILURE, GROUP_B, OFFER, firstAssignment);
-
-    clock.advance(reservationDuration);
-
-    assertTrue(scheduler.schedule("b"));
-
-    assignAndAssert(Result.SUCCESS, GROUP_B, OFFER, secondAssignment);
+    assertTrue(scheduler.schedule("a"));
+    assignAndAssert(Result.SUCCESS, GROUP_A, OFFER, assignment);
   }
 
   @Test
-  public void testReservationsExpireAfterAccepted() throws Exception {
+  public void testReservationExpires() throws Exception {
     storageUtil.expectOperations();
 
     expectTaskStillPendingQuery(TASK_A);
     expectActiveJobFetch(TASK_A);
     expectLaunchAttempt(false);
     // Reserve "a" with offerA
+    expectReservationCheck(TASK_A);
     expectPreemptorCall(TASK_A, Optional.of(SLAVE_ID));
+    expectAddReservation(SLAVE_ID, TASK_A);
 
-    expectTaskStillPendingQuery(TASK_A);
-    expectActiveJobFetch(TASK_A);
-    AssignmentCapture firstAssignment = expectLaunchAttempt(true);
-    expectAssigned(TASK_A);
-
+    // First attempt -> reservation is active.
     expectTaskStillPendingQuery(TASK_B);
     expectActiveJobFetch(TASK_B);
+    AssignmentCapture firstAssignment = expectLaunchAttempt(false);
+    expectGetReservation(SLAVE_ID, TASK_A);
+    expectReservationCheck(TASK_B);
+    expectPreemptorCall(TASK_B, Optional.<String>absent());
 
-    AssignmentCapture secondAssignment = expectLaunchAttempt(true);
+    // Status changed -> reservation removed.
+    reservations.remove(SLAVE_ID, 
TaskGroupKey.from(TASK_A.getAssignedTask().getTask()));
 
-    expect(assigner.maybeAssign(
-        storageUtil.mutableStoreProvider,
-        OFFER,
-        new ResourceRequest(TASK_B.getAssignedTask().getTask(), 
Tasks.id(TASK_B), EMPTY)))
-        .andReturn(Assignment.success(TaskInfo.getDefaultInstance()));
+    // Second attempt -> reservation expires.
+    expectGetNoReservation(SLAVE_ID);
+    expectTaskStillPendingQuery(TASK_B);
+    expectActiveJobFetch(TASK_B);
+    AssignmentCapture secondAssignment = expectLaunchAttempt(true);
+    expectAssigned(TASK_B);
 
     control.replay();
+
     assertFalse(scheduler.schedule("a"));
-    assertTrue(scheduler.schedule("a"));
-    assignAndAssert(Result.SUCCESS, GROUP_A, OFFER, firstAssignment);
-    eventSink.post(TaskStateChange.transition(TASK_A, PENDING));
-    clock.advance(halfReservationDuration);
+    assertFalse(scheduler.schedule("b"));
+    assignAndAssert(Result.FAILURE, GROUP_B, OFFER, firstAssignment);
+
+    eventSink.post(TaskStateChange.transition(assign(TASK_A, SLAVE_ID), 
PENDING));
     assertTrue(scheduler.schedule("b"));
     assignAndAssert(Result.SUCCESS, GROUP_B, OFFER, secondAssignment);
   }
 
   @Test
-  public void testReservationsAcceptsWithInTimePeriod() throws Exception {
+  public void testReservationUnusable() throws Exception {
     storageUtil.expectOperations();
-    expectTaskStillPendingQuery(TASK_A);
-    expectActiveJobFetch(TASK_A);
-    expectLaunchAttempt(false);
-    // Reserve "a" with offerA
-    expectPreemptorCall(TASK_A, Optional.of(SLAVE_ID));
 
     expectTaskStillPendingQuery(TASK_A);
-    expectActiveJobFetch(TASK_A);
-    AssignmentCapture assignment = expectLaunchAttempt(true);
-    expectAssigned(TASK_A);
+    expectLaunchAttempt(false);
+    
expect(reservations.getByValue(TaskGroupKey.from(TASK_A.getAssignedTask().getTask())))
+        .andReturn(ImmutableSet.of(SLAVE_ID));
 
     control.replay();
-    assertFalse(scheduler.schedule("a"));
-    clock.advance(halfReservationDuration);
-    assertTrue(scheduler.schedule("a"));
 
-    assignAndAssert(Result.SUCCESS, GROUP_A, OFFER, assignment);
+    assertFalse(scheduler.schedule("a"));
   }
 
   @Test
-  public void testReservationsCancellation() throws Exception {
-    storageUtil.expectOperations();
-
-    expectTaskStillPendingQuery(TASK_A);
-    expectActiveJobFetch(TASK_A);
-    expectLaunchAttempt(false);
-
-    // Reserve "a" with offerA
-    expectPreemptorCall(TASK_A, Optional.of(SLAVE_ID));
-
-    expectTaskStillPendingQuery(TASK_B);
-    expectActiveJobFetch(TASK_B);
-    AssignmentCapture assignment = expectLaunchAttempt(true);
-    expectAssigned(TASK_B);
-
+  public void testNonPendingIgnored() throws Exception {
     control.replay();
-    assertFalse(scheduler.schedule("a"));
-    clock.advance(halfReservationDuration);
-    // Task is killed by user before it is scheduled
-    eventSink.post(TaskStateChange.transition(TASK_A, PENDING));
-    assertTrue(scheduler.schedule("b"));
-    assignAndAssert(Result.SUCCESS, GROUP_B, OFFER, assignment);
+
+    eventSink.post(TaskStateChange.transition(TASK_A, RUNNING));
   }
 
   @Test
-  public void testReservationsExpire() throws Exception {
-    storageUtil.expectOperations();
-
-    expectTaskStillPendingQuery(TASK_B);
-    expectActiveJobFetch(TASK_B);
-    expectLaunchAttempt(false);
-    // Reserve "b" with offer1
-    expectPreemptorCall(TASK_B, Optional.of(SLAVE_ID));
-
-    expectTaskStillPendingQuery(TASK_A);
-    expectActiveJobFetch(TASK_A);
-    AssignmentCapture assignment = expectLaunchAttempt(true);
-    expectAssigned(TASK_A);
-
+  public void testPendingDeletedHandled() throws Exception {
     control.replay();
-    assertFalse(scheduler.schedule("b"));
-    // We don't act on the reservation made by b because we want to see 
timeout behaviour.
-    clock.advance(reservationDuration);
-    assertTrue(scheduler.schedule("a"));
-    assignAndAssert(Result.SUCCESS, GROUP_A, OFFER, assignment);
+
+    IScheduledTask task = 
IScheduledTask.build(TASK_A.newBuilder().setStatus(PENDING));
+    eventSink.post(TaskStateChange.transition(task, PENDING));
   }
 
   @Test
@@ -307,6 +262,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
       }
     });
 
+    expectGetNoReservation(SLAVE_ID);
     AssignmentCapture assignment = expectLaunchAttempt(true);
     expect(assigner.maybeAssign(
         EasyMock.<MutableStoreProvider>anyObject(),
@@ -353,6 +309,12 @@ public class TaskSchedulerImplTest extends EasyMockTest {
     return capture;
   }
 
+  private IScheduledTask assign(IScheduledTask task, String slaveId) {
+    ScheduledTask result = task.newBuilder();
+    result.getAssignedTask().setSlaveId(slaveId);
+    return IScheduledTask.build(result);
+  }
+
   private void assignAndAssert(
       Result result,
       TaskGroupKey groupKey,
@@ -369,4 +331,22 @@ public class TaskSchedulerImplTest extends EasyMockTest {
             .byStatus(Tasks.SLAVE_ASSIGNED_STATES),
         ImmutableSet.<IScheduledTask>of());
   }
+
+  private void expectAddReservation(String slaveId, IScheduledTask task) {
+    reservations.put(slaveId, 
TaskGroupKey.from(task.getAssignedTask().getTask()));
+  }
+
+  private IExpectationSetters<?> expectGetReservation(String slaveId, 
IScheduledTask task) {
+    return expect(reservations.get(slaveId))
+        
.andReturn(Optional.of(TaskGroupKey.from(task.getAssignedTask().getTask())));
+  }
+
+  private IExpectationSetters<?> expectGetNoReservation(String slaveId) {
+    return 
expect(reservations.get(slaveId)).andReturn(Optional.<TaskGroupKey>absent());
+  }
+
+  private IExpectationSetters<?> expectReservationCheck(IScheduledTask task) {
+    return 
expect(reservations.getByValue(TaskGroupKey.from(task.getAssignedTask().getTask())))
+        .andReturn(ImmutableSet.<String>of());
+  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/1dc11fb1/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
index 9c47a76..34cbd19 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
@@ -19,17 +19,13 @@ import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.RateLimiter;
 import com.twitter.common.quantity.Amount;
 import com.twitter.common.quantity.Time;
-import com.twitter.common.stats.Stat;
-import com.twitter.common.stats.StatsProvider;
 import com.twitter.common.testing.easymock.EasyMockTest;
 import com.twitter.common.util.BackoffStrategy;
-import com.twitter.common.util.testing.FakeClock;
 
 import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.gen.Attribute;
@@ -44,6 +40,7 @@ import org.apache.aurora.scheduler.HostOffer;
 import org.apache.aurora.scheduler.async.OfferManager.OfferManagerImpl;
 import org.apache.aurora.scheduler.async.OfferManager.OfferReturnDelay;
 import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl;
+import org.apache.aurora.scheduler.async.preemptor.BiCache;
 import org.apache.aurora.scheduler.async.preemptor.Preemptor;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
@@ -107,6 +104,9 @@ public class TaskSchedulerTest extends EasyMockTest {
   private static final HostOffer OFFER_B = makeOffer("OFFER_B", "HOST_B", 
SCHEDULED);
   private static final HostOffer OFFER_C = makeOffer("OFFER_C", "HOST_C", 
DRAINING);
   private static final HostOffer OFFER_D = makeOffer("OFFER_D", "HOST_D", 
DRAINED);
+  private static final String SLAVE_A = 
OFFER_A.getOffer().getSlaveId().getValue();
+  private static final String SLAVE_B = 
OFFER_B.getOffer().getSlaveId().getValue();
+  private static final String SLAVE_C = 
OFFER_C.getOffer().getSlaveId().getValue();
 
   private Storage storage;
 
@@ -120,11 +120,9 @@ public class TaskSchedulerTest extends EasyMockTest {
   private OfferReturnDelay returnDelay;
   private OfferManager offerManager;
   private TaskGroups taskGroups;
-  private FakeClock clock;
-  private StatsProvider statsProvider;
   private RescheduleCalculator rescheduleCalculator;
   private Preemptor preemptor;
-  private Amount<Long, Time> reservationDuration = Amount.of(1L, Time.MINUTES);
+  private BiCache<String, TaskGroupKey> reservations;
 
   @Before
   public void setUp() {
@@ -137,20 +135,12 @@ public class TaskSchedulerTest extends EasyMockTest {
     executor = createMock(ScheduledExecutorService.class);
     future = createMock(ScheduledFuture.class);
     returnDelay = createMock(OfferReturnDelay.class);
-    clock = new FakeClock();
-    clock.setNowMillis(0);
-    statsProvider = createMock(StatsProvider.class);
     rescheduleCalculator = createMock(RescheduleCalculator.class);
     preemptor = createMock(Preemptor.class);
+    reservations = createMock(new Clazz<BiCache<String, TaskGroupKey>>() { });
   }
 
   private void replayAndCreateScheduler() {
-    Capture<Supplier<Long>> cacheSizeSupplier = createCapture();
-    Stat<Long> stat = createMock(new Clazz<Stat<Long>>() { });
-    expect(statsProvider.makeGauge(
-        EasyMock.eq(TaskSchedulerImpl.RESERVATIONS_CACHE_SIZE_STAT),
-        capture(cacheSizeSupplier))).andReturn(stat);
-
     control.replay();
     offerManager = new OfferManagerImpl(driver, returnDelay, executor);
     TaskScheduler scheduler = new TaskSchedulerImpl(storage,
@@ -158,9 +148,7 @@ public class TaskSchedulerTest extends EasyMockTest {
         assigner,
         offerManager,
         preemptor,
-        reservationDuration,
-        clock,
-        statsProvider);
+        reservations);
     taskGroups = new TaskGroups(
         executor,
         Amount.of(FIRST_SCHEDULE_DELAY_MS, Time.MILLISECONDS),
@@ -168,7 +156,6 @@ public class TaskSchedulerTest extends EasyMockTest {
         RateLimiter.create(100),
         scheduler,
         rescheduleCalculator);
-    assertEquals(0L, (long) cacheSizeSupplier.getValue().get());
   }
 
   private Capture<Runnable> expectOffer() {
@@ -233,7 +220,9 @@ public class TaskSchedulerTest extends EasyMockTest {
   public void testNoOffers() {
     Capture<Runnable> timeoutCapture = 
expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
     expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10);
-    expectPreemptorCall(makeTask("a"));
+    IScheduledTask task = makeTask("a");
+    expectPreemptorCall(task);
+    expectReservationCheck(task);
 
     replayAndCreateScheduler();
 
@@ -301,6 +290,15 @@ public class TaskSchedulerTest extends EasyMockTest {
         eq(new ResourceRequest(task.getAssignedTask().getTask(), 
Tasks.id(task), jobAggregate))));
   }
 
+  private IExpectationSetters<?> expectNoReservation(String slaveId) {
+    return 
expect(reservations.get(slaveId)).andReturn(Optional.<TaskGroupKey>absent());
+  }
+
+  private IExpectationSetters<?> expectReservationCheck(IScheduledTask task) {
+    return 
expect(reservations.getByValue(TaskGroupKey.from(task.getAssignedTask().getTask())))
+        .andReturn(ImmutableSet.<String>of());
+  }
+
   @Test
   public void testTaskAssigned() {
     expectAnyMaintenanceCalls();
@@ -310,6 +308,8 @@ public class TaskSchedulerTest extends EasyMockTest {
     TaskInfo mesosTask = makeTaskInfo(task);
 
     Capture<Runnable> timeoutCapture = 
expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
+    expectNoReservation(SLAVE_A).times(2);
+    expectReservationCheck(task);
     expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.failure());
     expectPreemptorCall(task);
 
@@ -319,7 +319,9 @@ public class TaskSchedulerTest extends EasyMockTest {
 
     Capture<Runnable> timeoutCapture3 = 
expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
     expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10);
-    expectPreemptorCall(makeTask("b"));
+    IScheduledTask taskB = makeTask("b");
+    expectReservationCheck(taskB);
+    expectPreemptorCall(taskB);
 
     replayAndCreateScheduler();
 
@@ -345,6 +347,7 @@ public class TaskSchedulerTest extends EasyMockTest {
     Capture<Runnable> timeoutCapture = 
expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
     expectAnyMaintenanceCalls();
     expectOfferDeclineIn(10);
+    expectNoReservation(SLAVE_A);
     expectMaybeAssign(OFFER_A, task, 
EMPTY).andReturn(Assignment.success(mesosTask));
     driver.launchTask(OFFER_A.getOffer().getId(), mesosTask);
     expectLastCall().andThrow(new IllegalStateException("Driver not ready."));
@@ -375,6 +378,7 @@ public class TaskSchedulerTest extends EasyMockTest {
     Capture<Runnable> timeoutCapture = 
expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
     expectAnyMaintenanceCalls();
     expectOfferDeclineIn(10);
+    expectNoReservation(SLAVE_A).times(2);
     expectMaybeAssign(OFFER_A, task, EMPTY).andThrow(new 
StorageException("Injected failure."));
 
     Capture<Runnable> timeoutCapture2 = 
expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10);
@@ -397,6 +401,8 @@ public class TaskSchedulerTest extends EasyMockTest {
     Capture<Runnable> timeoutCapture = 
expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
     Capture<Runnable> offerExpirationCapture = expectOfferDeclineIn(10);
     expectAnyMaintenanceCalls();
+    expectNoReservation(SLAVE_A);
+    expectReservationCheck(task).times(2);
     expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.failure());
     Capture<Runnable> timeoutCapture2 = 
expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10);
     expectPreemptorCall(task);
@@ -459,12 +465,14 @@ public class TaskSchedulerTest extends EasyMockTest {
 
     IScheduledTask taskA = makeTask("A", PENDING);
     TaskInfo mesosTaskA = makeTaskInfo(taskA);
+    expectNoReservation(SLAVE_A);
     expectMaybeAssign(OFFER_A, taskA, 
EMPTY).andReturn(Assignment.success(mesosTaskA));
     driver.launchTask(OFFER_A.getOffer().getId(), mesosTaskA);
     Capture<Runnable> captureA = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
 
     IScheduledTask taskB = makeTask("B", PENDING);
     TaskInfo mesosTaskB = makeTaskInfo(taskB);
+    expectNoReservation(SLAVE_B);
     expectMaybeAssign(OFFER_B, taskB, 
EMPTY).andReturn(Assignment.success(mesosTaskB));
     driver.launchTask(OFFER_B.getOffer().getId(), mesosTaskB);
     Capture<Runnable> captureB = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
@@ -491,6 +499,7 @@ public class TaskSchedulerTest extends EasyMockTest {
 
     IScheduledTask taskA = makeTask("A", PENDING);
     TaskInfo mesosTaskA = makeTaskInfo(taskA);
+    expectNoReservation(SLAVE_B);
     expectMaybeAssign(OFFER_B, taskA, 
EMPTY).andReturn(Assignment.success(mesosTaskA));
     driver.launchTask(OFFER_B.getOffer().getId(), mesosTaskA);
     Capture<Runnable> captureA = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
@@ -500,6 +509,7 @@ public class TaskSchedulerTest extends EasyMockTest {
     HostOffer updatedOfferC = new HostOffer(
         OFFER_C.getOffer(),
         
IHostAttributes.build(OFFER_C.getAttributes().newBuilder().setMode(NONE)));
+    expectNoReservation(SLAVE_C);
     expectMaybeAssign(updatedOfferC, taskB, 
EMPTY).andReturn(Assignment.success(mesosTaskB));
     driver.launchTask(OFFER_C.getOffer().getId(), mesosTaskB);
     Capture<Runnable> captureB = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
@@ -556,6 +566,9 @@ public class TaskSchedulerTest extends EasyMockTest {
 
     IScheduledTask jobB0 = makeTask("b0", PENDING);
 
+    expectNoReservation(SLAVE_A);
+    expectNoReservation(SLAVE_B);
+
     expectOfferDeclineIn(10);
     expectOfferDeclineIn(10);
     expectOfferDeclineIn(10);
@@ -597,8 +610,10 @@ public class TaskSchedulerTest extends EasyMockTest {
     final IScheduledTask task = makeTask("a", PENDING);
 
     Capture<Runnable> timeoutCapture = 
expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
+    expectNoReservation(SLAVE_A);
     expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.failure());
     expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 20);
+    expectReservationCheck(task);
     expectPreemptorCall(task);
 
     replayAndCreateScheduler();

http://git-wip-us.apache.org/repos/asf/aurora/blob/1dc11fb1/src/test/java/org/apache/aurora/scheduler/async/preemptor/BiCacheTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/BiCacheTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/BiCacheTest.java
new file mode 100644
index 0000000..4734776
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/async/preemptor/BiCacheTest.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async.preemptor;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.util.testing.FakeClock;
+
+import org.apache.aurora.scheduler.async.preemptor.BiCache.BiCacheSettings;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class BiCacheTest {
+  private static final Amount<Long, Time> HOLD_DURATION = Amount.of(1L, 
Time.MINUTES);
+  private static final String STAT_NAME = "cache_size_stat";
+  private static final String KEY_1 = "Key 1";
+  private static final String KEY_2 = "Key 2";
+  private static final Optional<Integer> NO_VALUE = Optional.absent();
+
+  private FakeStatsProvider statsProvider;
+  private FakeClock clock;
+  private BiCache<String, Integer> biCache;
+
+  @Before
+  public void setUp() {
+    statsProvider = new FakeStatsProvider();
+    clock = new FakeClock();
+    biCache = new BiCache<>(statsProvider, new BiCacheSettings(HOLD_DURATION, 
STAT_NAME), clock);
+  }
+
+  @Test
+  public void testExpiration() {
+    biCache.put(KEY_1, 1);
+    assertEquals(Optional.of(1), biCache.get(KEY_1));
+    assertEquals(1L, statsProvider.getLongValue(STAT_NAME));
+
+    clock.advance(HOLD_DURATION);
+
+    assertEquals(NO_VALUE, biCache.get(KEY_1));
+    assertEquals(ImmutableSet.of(), biCache.getByValue(1));
+    assertEquals(0L, statsProvider.getLongValue(STAT_NAME));
+  }
+
+  @Test
+  public void testRemoval() {
+    biCache.put(KEY_1, 1);
+    assertEquals(1L, statsProvider.getLongValue(STAT_NAME));
+    assertEquals(Optional.of(1), biCache.get(KEY_1));
+    biCache.remove(KEY_1, 1);
+    assertEquals(NO_VALUE, biCache.get(KEY_1));
+    assertEquals(0L, statsProvider.getLongValue(STAT_NAME));
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testRemovalWithNullKey() {
+    biCache.remove(null, 1);
+  }
+
+  @Test
+  public void testDifferentKeysIdenticalValues() {
+    biCache.put(KEY_1, 1);
+    biCache.put(KEY_2, 1);
+    assertEquals(2L, statsProvider.getLongValue(STAT_NAME));
+
+    assertEquals(Optional.of(1), biCache.get(KEY_1));
+    assertEquals(Optional.of(1), biCache.get(KEY_2));
+    assertEquals(ImmutableSet.of(KEY_1, KEY_2), biCache.getByValue(1));
+
+    biCache.remove(KEY_1, 1);
+    assertEquals(NO_VALUE, biCache.get(KEY_1));
+    assertEquals(Optional.of(1), biCache.get(KEY_2));
+    assertEquals(ImmutableSet.of(KEY_2), biCache.getByValue(1));
+    assertEquals(1L, statsProvider.getLongValue(STAT_NAME));
+
+    clock.advance(HOLD_DURATION);
+    assertEquals(NO_VALUE, biCache.get(KEY_1));
+    assertEquals(NO_VALUE, biCache.get(KEY_2));
+    assertEquals(ImmutableSet.of(), biCache.getByValue(1));
+    assertEquals(0L, statsProvider.getLongValue(STAT_NAME));
+  }
+
+  @Test
+  public void testIdenticalKeysDifferentValues() {
+    biCache.put(KEY_1, 1);
+    biCache.put(KEY_1, 2);
+    assertEquals(Optional.of(2), biCache.get(KEY_1));
+    assertEquals(ImmutableSet.of(), biCache.getByValue(1));
+    assertEquals(ImmutableSet.of(KEY_1), biCache.getByValue(2));
+    assertEquals(1L, statsProvider.getLongValue(STAT_NAME));
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/1dc11fb1/src/test/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessorTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessorTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessorTest.java
index 75fc16d..8a9a3b7 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessorTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessorTest.java
@@ -14,6 +14,7 @@
 package org.apache.aurora.scheduler.async.preemptor;
 
 import java.util.Arrays;
+import java.util.Set;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableSet;
@@ -29,12 +30,14 @@ import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.gen.TaskEvent;
 import 
org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.PreemptionSlot;
 import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.stats.CachedCounters;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
 import org.apache.aurora.scheduler.testing.FakeStatsProvider;
 import org.easymock.IExpectationSetters;
@@ -49,24 +52,26 @@ import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertEquals;
 
 public class PendingTaskProcessorTest extends EasyMockTest {
-  private static final String TASK_ID_A = "task_a";
-  private static final String TASK_ID_B = "task_b";
-  private static final ScheduledTask TASK_A = makeTask(TASK_ID_A);
-  private static final ScheduledTask TASK_B = makeTask(TASK_ID_B);
+  private static final ScheduledTask TASK_A = makeTask("task_a");
+  private static final ScheduledTask TASK_B = makeTask("task_b");
   private static final PreemptionSlot SLOT_A = createPreemptionSlot(TASK_A);
   private static final PreemptionSlot SLOT_B = createPreemptionSlot(TASK_B);
+  private static final TaskGroupKey GROUP_A =
+      TaskGroupKey.from(ITaskConfig.build(TASK_A.getAssignedTask().getTask()));
+  private static final TaskGroupKey GROUP_B =
+      TaskGroupKey.from(ITaskConfig.build(TASK_B.getAssignedTask().getTask()));
   private static final String SLAVE_ID = "slave_id";
   private static final IJobKey JOB_KEY = 
IJobKey.build(TASK_A.getAssignedTask().getTask().getJob());
 
   private static final Amount<Long, Time> PREEMPTION_DELAY = Amount.of(30L, 
Time.SECONDS);
 
-  private static final Optional<PreemptionSlot> EMPTY_SLOT = Optional.absent();
+  private static final Set<PreemptionSlot> NO_SLOTS = ImmutableSet.of();
 
   private StorageTestUtil storageUtil;
   private FakeStatsProvider statsProvider;
   private PreemptionSlotFinder preemptionSlotFinder;
   private PendingTaskProcessor slotFinder;
-  private PreemptionSlotCache slotCache;
+  private BiCache<PreemptionSlot, TaskGroupKey> slotCache;
   private FakeClock clock;
 
   @Before
@@ -74,7 +79,7 @@ public class PendingTaskProcessorTest extends EasyMockTest {
     storageUtil = new StorageTestUtil(this);
     storageUtil.expectOperations();
     preemptionSlotFinder = createMock(PreemptionSlotFinder.class);
-    slotCache = createMock(PreemptionSlotCache.class);
+    slotCache = createMock(new Clazz<BiCache<PreemptionSlot, TaskGroupKey>>() 
{ });
     statsProvider = new FakeStatsProvider();
     clock = new FakeClock();
 
@@ -88,14 +93,14 @@ public class PendingTaskProcessorTest extends EasyMockTest {
   }
   @Test
   public void testSearchSlotSuccessful() throws Exception {
-    expect(slotCache.get(TASK_ID_A)).andReturn(EMPTY_SLOT);
-    expect(slotCache.get(TASK_ID_B)).andReturn(EMPTY_SLOT);
+    expect(slotCache.getByValue(GROUP_A)).andReturn(NO_SLOTS);
+    expect(slotCache.getByValue(GROUP_B)).andReturn(NO_SLOTS);
     expectGetPendingTasks(TASK_A, TASK_B);
     expectAttributeAggegateFetchTasks();
     expectSlotSearch(TASK_A, Optional.of(SLOT_A));
     expectSlotSearch(TASK_B, Optional.of(SLOT_B));
-    slotCache.add(TASK_ID_A, SLOT_A);
-    slotCache.add(TASK_ID_B, SLOT_B);
+    slotCache.put(SLOT_A, GROUP_A);
+    slotCache.put(SLOT_B, GROUP_B);
 
     control.replay();
 
@@ -110,10 +115,10 @@ public class PendingTaskProcessorTest extends EasyMockTest {
 
   @Test
   public void testSearchSlotFailed() throws Exception {
-    expect(slotCache.get(TASK_ID_A)).andReturn(EMPTY_SLOT);
+    expect(slotCache.getByValue(GROUP_A)).andReturn(NO_SLOTS);
     expectGetPendingTasks(TASK_A);
     expectAttributeAggegateFetchTasks();
-    expectSlotSearch(TASK_A, EMPTY_SLOT);
+    expectSlotSearch(TASK_A, Optional.<PreemptionSlot>absent());
 
     control.replay();
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/1dc11fb1/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotCacheTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotCacheTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotCacheTest.java
deleted file mode 100644
index 80bd13a..0000000
--- a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotCacheTest.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async.preemptor;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableSet;
-
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.util.testing.FakeClock;
-
-import org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.PreemptionSlot;
-import org.apache.aurora.scheduler.testing.FakeStatsProvider;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class PreemptionSlotCacheTest {
-  private static final Amount<Long, Time> HOLD_DURATION = Amount.of(1L, Time.MINUTES);
-  private static final String TASK_ID = "task_id";
-  private static final PreemptionSlot SLOT =
-      new PreemptionSlot(ImmutableSet.<PreemptionVictim>of(), "slave_id");
-
-  private FakeStatsProvider statsProvider;
-  private FakeClock clock;
-  private PreemptionSlotCache slotCache;
-
-  @Before
-  public void setUp() {
-    statsProvider = new FakeStatsProvider();
-    clock = new FakeClock();
-    slotCache = new PreemptionSlotCache(statsProvider, HOLD_DURATION, clock);
-  }
-
-  @Test
-  public void testExpiration() {
-    slotCache.add(TASK_ID, SLOT);
-    assertEquals(Optional.of(SLOT), slotCache.get(TASK_ID));
-    assertEquals(1L, statsProvider.getLongValue(
-        PreemptionSlotCache.PREEMPTION_SLOT_CACHE_SIZE_STAT));
-
-    clock.advance(HOLD_DURATION);
-
-    assertEquals(Optional.<PreemptionSlot>absent(), slotCache.get(TASK_ID));
-  }
-
-  @Test
-  public void testRemoval() {
-    slotCache.add(TASK_ID, SLOT);
-    assertEquals(Optional.of(SLOT), slotCache.get(TASK_ID));
-    slotCache.remove(TASK_ID);
-    assertEquals(Optional.<PreemptionSlot>absent(), slotCache.get(TASK_ID));
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/1dc11fb1/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java
index 97d6087..64283fa 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java
@@ -13,6 +13,8 @@
  */
 package org.apache.aurora.scheduler.async.preemptor;
 
+import java.util.Set;
+
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableSet;
 import com.twitter.common.testing.easymock.EasyMockTest;
@@ -25,12 +27,14 @@ import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.gen.TaskEvent;
 import org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.PreemptionSlot;
 import org.apache.aurora.scheduler.async.preemptor.Preemptor.PreemptorImpl;
+import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.stats.CachedCounters;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 import org.apache.aurora.scheduler.testing.FakeStatsProvider;
 import org.easymock.EasyMock;
 import org.junit.Before;
@@ -46,19 +50,20 @@ import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertEquals;
 
 public class PreemptorImplTest extends EasyMockTest {
-  private static final String TASK_ID = "task_a";
   private static final String SLAVE_ID = "slave_id";
   private static final IScheduledTask TASK = IScheduledTask.build(makeTask());
   private static final PreemptionSlot SLOT = createPreemptionSlot(TASK);
+  private static final TaskGroupKey GROUP_KEY =
+      TaskGroupKey.from(ITaskConfig.build(makeTask().getAssignedTask().getTask()));
 
-  private static final Optional<PreemptionSlot> EMPTY_SLOT = Optional.absent();
+  private static final Set<PreemptionSlot> NO_SLOTS = ImmutableSet.of();
   private static final Optional<String> EMPTY_RESULT = Optional.absent();
 
   private StateManager stateManager;
   private FakeStatsProvider statsProvider;
   private PreemptionSlotFinder preemptionSlotFinder;
   private PreemptorImpl preemptor;
-  private PreemptionSlotCache slotCache;
+  private BiCache<PreemptionSlot, TaskGroupKey> slotCache;
   private Storage.MutableStoreProvider storeProvider;
 
   @Before
@@ -66,7 +71,7 @@ public class PreemptorImplTest extends EasyMockTest {
     storeProvider = createMock(Storage.MutableStoreProvider.class);
     stateManager = createMock(StateManager.class);
     preemptionSlotFinder = createMock(PreemptionSlotFinder.class);
-    slotCache = createMock(PreemptionSlotCache.class);
+    slotCache = createMock(new Clazz<BiCache<PreemptionSlot, TaskGroupKey>>() { });
     statsProvider = new FakeStatsProvider();
     preemptor = new PreemptorImpl(
         stateManager,
@@ -77,8 +82,8 @@ public class PreemptorImplTest extends EasyMockTest {
 
   @Test
   public void testPreemptTasksSuccessful() throws Exception {
-    expect(slotCache.get(TASK_ID)).andReturn(Optional.of(SLOT));
-    slotCache.remove(TASK_ID);
+    expect(slotCache.getByValue(GROUP_KEY)).andReturn(ImmutableSet.of(SLOT));
+    slotCache.remove(SLOT, GROUP_KEY);
     expectSlotValidation(Optional.of(ImmutableSet.of(
         PreemptionVictim.fromTask(TASK.getAssignedTask()))));
 
@@ -93,8 +98,8 @@ public class PreemptorImplTest extends EasyMockTest {
 
   @Test
   public void testPreemptTasksValidationFailed() throws Exception {
-    expect(slotCache.get(TASK_ID)).andReturn(Optional.of(SLOT));
-    slotCache.remove(TASK_ID);
+    expect(slotCache.getByValue(GROUP_KEY)).andReturn(ImmutableSet.of(SLOT));
+    slotCache.remove(SLOT, GROUP_KEY);
     expectSlotValidation(Optional.<ImmutableSet<PreemptionVictim>>absent());
 
     control.replay();
@@ -106,7 +111,7 @@ public class PreemptorImplTest extends EasyMockTest {
 
   @Test
   public void testNoCachedSlot() throws Exception {
-    expect(slotCache.get(TASK_ID)).andReturn(EMPTY_SLOT);
+    expect(slotCache.getByValue(GROUP_KEY)).andReturn(NO_SLOTS);
 
     control.replay();
 
@@ -145,7 +150,6 @@ public class PreemptorImplTest extends EasyMockTest {
   private static ScheduledTask makeTask() {
     ScheduledTask task = new ScheduledTask()
         .setAssignedTask(new AssignedTask()
-            .setTaskId(TASK_ID)
             .setTask(new TaskConfig()
                 .setPriority(1)
                 .setProduction(true)

Reply via email to