Repository: aurora Updated Branches: refs/heads/master 3b29a4b79 -> 1dc11fb1a
Generalizing preemption reservation pool. Bugs closed: AURORA-1219 Reviewed at https://reviews.apache.org/r/32907/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/1dc11fb1 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/1dc11fb1 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/1dc11fb1 Branch: refs/heads/master Commit: 1dc11fb1a92bb85ee629971d3f07f79e26a31a59 Parents: 3b29a4b Author: Maxim Khutornenko <[email protected]> Authored: Tue Apr 14 13:01:07 2015 -0700 Committer: Maxim Khutornenko <[email protected]> Committed: Tue Apr 14 13:01:07 2015 -0700 ---------------------------------------------------------------------- .../aurora/scheduler/async/AsyncModule.java | 41 ++--- .../aurora/scheduler/async/TaskScheduler.java | 87 ++------- .../scheduler/async/preemptor/BiCache.java | 140 ++++++++++++++ .../async/preemptor/PendingTaskProcessor.java | 11 +- .../async/preemptor/PreemptionSlotCache.java | 99 ---------- .../scheduler/async/preemptor/Preemptor.java | 28 +-- .../async/preemptor/PreemptorModule.java | 10 +- .../scheduler/async/TaskSchedulerImplTest.java | 184 +++++++++---------- .../scheduler/async/TaskSchedulerTest.java | 59 +++--- .../scheduler/async/preemptor/BiCacheTest.java | 108 +++++++++++ .../preemptor/PendingTaskProcessorTest.java | 31 ++-- .../preemptor/PreemptionSlotCacheTest.java | 66 ------- .../async/preemptor/PreemptorImplTest.java | 24 ++- 13 files changed, 458 insertions(+), 430 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/1dc11fb1/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java index e87dda4..35c7e43 100644 --- a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java +++ b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java @@ -30,7 +30,6 @@ import com.google.common.base.Supplier; import com.google.common.util.concurrent.AbstractIdleService; import com.google.common.util.concurrent.RateLimiter; import com.google.inject.AbstractModule; -import com.google.inject.Binder; import com.google.inject.PrivateModule; import com.google.inject.TypeLiteral; import com.twitter.common.args.Arg; @@ -53,7 +52,10 @@ import org.apache.aurora.scheduler.async.RescheduleCalculator.RescheduleCalculat import org.apache.aurora.scheduler.async.TaskGroups.TaskGroupsSettings; import org.apache.aurora.scheduler.async.TaskHistoryPruner.HistoryPrunnerSettings; import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl; +import org.apache.aurora.scheduler.async.preemptor.BiCache; +import org.apache.aurora.scheduler.async.preemptor.BiCache.BiCacheSettings; import org.apache.aurora.scheduler.base.AsyncUtil; +import org.apache.aurora.scheduler.base.TaskGroupKey; import org.apache.aurora.scheduler.events.PubsubEventModule; import static java.lang.annotation.ElementType.FIELD; @@ -62,8 +64,6 @@ import static java.lang.annotation.ElementType.PARAMETER; import static java.lang.annotation.RetentionPolicy.RUNTIME; import static java.util.Objects.requireNonNull; -import static org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl.ReservationDuration; - /** * Binding module for async task management. */ @@ -237,12 +237,24 @@ public class AsyncModule extends AbstractModule { expose(TaskGroups.class); } }); - bindTaskScheduler(binder(), RESERVATION_DURATION.get()); PubsubEventModule.bindSubscriber(binder(), TaskGroups.class); install(new PrivateModule() { @Override protected void configure() { + bind(new TypeLiteral<BiCache<String, TaskGroupKey>>() { }).in(Singleton.class); + bind(BiCacheSettings.class).toInstance( + new BiCacheSettings(RESERVATION_DURATION.get(), "reservation_cache_size")); + bind(TaskScheduler.class).to(TaskSchedulerImpl.class); + bind(TaskSchedulerImpl.class).in(Singleton.class); + expose(TaskScheduler.class); + } + }); + PubsubEventModule.bindSubscriber(binder(), TaskScheduler.class); + + install(new PrivateModule() { + @Override + protected void configure() { bind(OfferReturnDelay.class).toInstance( new RandomJitterReturnDelay( MIN_OFFER_HOLD_TIME.get().as(Time.MILLISECONDS), @@ -331,27 +343,6 @@ public class AsyncModule extends AbstractModule { PubsubEventModule.bindSubscriber(binder(), KillRetry.class); } - /** - * This method exists because we want to test the wiring up of TaskSchedulerImpl class to the - * PubSub system in the TaskSchedulerImplTest class. The method has a complex signature because - * the binding of the TaskScheduler and friends occurs in a PrivateModule which does not interact - * well with the MultiBinder that backs the PubSub system. - */ - @VisibleForTesting - static void bindTaskScheduler(Binder binder, final Amount<Long, Time> reservationDuration) { - binder.install(new PrivateModule() { - @Override - protected void configure() { - bind(new TypeLiteral<Amount<Long, Time>>() { }).annotatedWith(ReservationDuration.class) - .toInstance(reservationDuration); - bind(TaskScheduler.class).to(TaskSchedulerImpl.class); - bind(TaskSchedulerImpl.class).in(Singleton.class); - expose(TaskScheduler.class); - } - }); - PubsubEventModule.bindSubscriber(binder, TaskScheduler.class); - } - static class RegisterGauges extends AbstractIdleService { private final StatsProvider statsProvider; private final ScheduledThreadPoolExecutor executor; http://git-wip-us.apache.org/repos/asf/aurora/blob/1dc11fb1/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java index ebc520e..6f169e8 100644 --- a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java +++ b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java @@ -15,7 +15,6 @@ package org.apache.aurora.scheduler.async; import java.lang.annotation.Retention; import java.lang.annotation.Target; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; @@ -26,20 +25,13 @@ import javax.inject.Qualifier; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Optional; -import com.google.common.base.Supplier; -import com.google.common.base.Ticker; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; import com.google.common.collect.Iterables; import com.google.common.eventbus.Subscribe; import com.twitter.common.inject.TimedInterceptor.Timed; -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; import com.twitter.common.stats.Stats; -import com.twitter.common.stats.StatsProvider; -import com.twitter.common.util.Clock; import org.apache.aurora.scheduler.HostOffer; +import org.apache.aurora.scheduler.async.preemptor.BiCache; import org.apache.aurora.scheduler.async.preemptor.Preemptor; import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.base.TaskGroupKey; @@ -56,7 +48,6 @@ import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; import org.apache.aurora.scheduler.storage.Storage.MutateWork; import org.apache.aurora.scheduler.storage.entities.IAssignedTask; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; -import org.apache.mesos.Protos.SlaveID; import static java.lang.annotation.ElementType.FIELD; import static java.lang.annotation.ElementType.METHOD; @@ -103,7 +94,7 @@ public interface TaskScheduler extends EventSubscriber { private final TaskAssigner assigner; private final OfferManager offerManager; private final Preemptor preemptor; - private final Reservations reservations; + private final BiCache<String, TaskGroupKey> reservations; private final AtomicLong attemptsFired = Stats.exportLong("schedule_attempts_fired"); private final AtomicLong attemptsFailed = Stats.exportLong("schedule_attempts_failed"); @@ -116,16 +107,14 @@ public interface TaskScheduler extends EventSubscriber { TaskAssigner assigner, OfferManager offerManager, Preemptor preemptor, - @ReservationDuration Amount<Long, Time> reservationDuration, - final Clock clock, - StatsProvider statsProvider) { + BiCache<String, TaskGroupKey> reservations) { this.storage = requireNonNull(storage); this.stateManager = requireNonNull(stateManager); this.assigner = requireNonNull(assigner); this.offerManager = requireNonNull(offerManager); this.preemptor = requireNonNull(preemptor); - this.reservations = new Reservations(statsProvider, reservationDuration, clock); + this.reservations = requireNonNull(reservations); } private Function<HostOffer, Assignment> getAssignerFunction( @@ -138,11 +127,12 @@ public interface TaskScheduler extends EventSubscriber { return new Function<HostOffer, Assignment>() { @Override public Assignment apply(HostOffer offer) { - Optional<String> reservedTaskId = - reservations.getSlaveReservation(offer.getOffer().getSlaveId()); - if (reservedTaskId.isPresent()) { - if (resourceRequest.getTaskId().equals(reservedTaskId.get())) { - // Slave is reserved to satisfy this task. + Optional<TaskGroupKey> reservation = + reservations.get(offer.getOffer().getSlaveId().getValue()); + + if (reservation.isPresent()) { + if (TaskGroupKey.from(resourceRequest.getTask()).equals(reservation.get())) { + // Slave is reserved to satisfy this task group. return assigner.maybeAssign(storeProvider, offer, resourceRequest); } else { // Slave is reserved for another task. @@ -234,67 +224,22 @@ public interface TaskScheduler extends EventSubscriber { AttributeAggregate jobState, MutableStoreProvider storeProvider) { - if (reservations.hasReservationForTask(task.getTaskId())) { + if (!reservations.getByValue(TaskGroupKey.from(task.getTask())).isEmpty()) { return; } Optional<String> slaveId = preemptor.attemptPreemptionFor(task, jobState, storeProvider); if (slaveId.isPresent()) { - reservations.add(SlaveID.newBuilder().setValue(slaveId.get()).build(), task.getTaskId()); + reservations.put(slaveId.get(), TaskGroupKey.from(task.getTask())); } } @Subscribe public void taskChanged(final TaskStateChange stateChangeEvent) { if (Optional.of(PENDING).equals(stateChangeEvent.getOldState())) { - reservations.invalidateTask(stateChangeEvent.getTaskId()); - } - } - - @VisibleForTesting - static final String RESERVATIONS_CACHE_SIZE_STAT = "reservation_cache_size"; - - private static class Reservations { - private final Cache<SlaveID, String> reservations; - - Reservations( - StatsProvider statsProvider, - Amount<Long, Time> duration, - final Clock clock) { - requireNonNull(duration); - requireNonNull(clock); - this.reservations = CacheBuilder.newBuilder() - .expireAfterWrite(duration.as(Time.MINUTES), TimeUnit.MINUTES) - .ticker(new Ticker() { - @Override - public long read() { - return clock.nowNanos(); - } - }) - .build(); - statsProvider.makeGauge( - RESERVATIONS_CACHE_SIZE_STAT, - new Supplier<Long>() { - @Override - public Long get() { - return reservations.size(); - } - }); - } - - private synchronized void add(SlaveID slaveId, String taskId) { - reservations.put(slaveId, taskId); - } - - private synchronized boolean hasReservationForTask(String taskId) { - return reservations.asMap().containsValue(taskId); - } - - private synchronized Optional<String> getSlaveReservation(SlaveID slaveID) { - return Optional.fromNullable(reservations.getIfPresent(slaveID)); - } - - private synchronized void invalidateTask(String taskId) { - reservations.asMap().values().remove(taskId); + IAssignedTask assigned = stateChangeEvent.getTask().getAssignedTask(); + if (assigned.getSlaveId() != null) { + reservations.remove(assigned.getSlaveId(), TaskGroupKey.from(assigned.getTask())); + } } } } http://git-wip-us.apache.org/repos/asf/aurora/blob/1dc11fb1/src/main/java/org/apache/aurora/scheduler/async/preemptor/BiCache.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/BiCache.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/BiCache.java new file mode 100644 index 0000000..f5a1833 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/BiCache.java @@ -0,0 +1,140 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.async.preemptor; + +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import javax.inject.Inject; + +import com.google.common.base.Optional; +import com.google.common.base.Supplier; +import com.google.common.base.Ticker; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Multimap; + +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.stats.StatsProvider; +import com.twitter.common.util.Clock; + +import static java.util.Objects.requireNonNull; + +/** + * A bi-directional cache of items. Entries are purged from cache after + * {@link BiCacheSettings#expireAfter}. + * + * @param <K> Key type. + * @param <V> Value type. + */ +public class BiCache<K, V> { + + public static class BiCacheSettings { + private final Amount<Long, Time> expireAfter; + private final String cacheSizeStatName; + + public BiCacheSettings(Amount<Long, Time> expireAfter, String cacheSizeStatName) { + this.expireAfter = requireNonNull(expireAfter); + this.cacheSizeStatName = requireNonNull(cacheSizeStatName); + } + } + + private final Cache<K, V> cache; + private final Multimap<V, K> inverse = HashMultimap.create(); + + @Inject + public BiCache( + StatsProvider statsProvider, + BiCacheSettings settings, + final Clock clock) { + + requireNonNull(clock); + this.cache = CacheBuilder.newBuilder() + .expireAfterWrite(settings.expireAfter.as(Time.MINUTES), TimeUnit.MINUTES) + .ticker(new Ticker() { + @Override + public long read() { + return clock.nowNanos(); + } + }) + .removalListener(new RemovalListener<K, V>() { + @Override + public void onRemoval(RemovalNotification<K, V> notification) { + inverse.remove(notification.getValue(), notification.getKey()); + } + }) + .build(); + + statsProvider.makeGauge( + settings.cacheSizeStatName, + new Supplier<Long>() { + @Override + public Long get() { + return cache.size(); + } + }); + } + + /** + * Puts a new key/value pair. + * + * @param key Key to add. + * @param value Value to add. + */ + public synchronized void put(K key, V value) { + requireNonNull(key); + requireNonNull(value); + cache.put(key, value); + inverse.put(value, key); + } + + /** + * Gets a cached value by key. + * + * @param key Key to get value for. + * @return Optional of value. + */ + public synchronized Optional<V> get(K key) { + return Optional.fromNullable(cache.getIfPresent(key)); + } + + /** + * Gets a set of keys for a given value. + * + * @param value Value to get all keys for. + * @return An {@link Iterable} of keys or empty if value does not exist. + */ + public synchronized Set<K> getByValue(V value) { + // Cache items are lazily removed by routine maintenance operations during get/write access. + // Forcing cleanup here to ensure proper data integrity. + cache.cleanUp(); + return ImmutableSet.copyOf(inverse.get(value)); + } + + /** + * Removes a key/value pair from cache. + * + * @param key Key to remove. + * @param value Value to remove. + */ + public synchronized void remove(K key, V value) { + inverse.remove(value, key); + cache.invalidate(key); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/1dc11fb1/src/main/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessor.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessor.java index 67ad5d7..00919b7 100644 --- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessor.java +++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessor.java @@ -32,6 +32,7 @@ import com.twitter.common.util.Clock; import org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.PreemptionSlot; import org.apache.aurora.scheduler.base.Query; +import org.apache.aurora.scheduler.base.TaskGroupKey; import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.filter.AttributeAggregate; import org.apache.aurora.scheduler.storage.Storage; @@ -58,7 +59,7 @@ class PendingTaskProcessor implements Runnable { private final PreemptionSlotFinder preemptionSlotFinder; private final PreemptorMetrics metrics; private final Amount<Long, Time> preemptionCandidacyDelay; - private final PreemptionSlotCache slotCache; + private final BiCache<PreemptionSlot, TaskGroupKey> slotCache; private final Clock clock; /** @@ -78,7 +79,7 @@ class PendingTaskProcessor implements Runnable { PreemptionSlotFinder preemptionSlotFinder, PreemptorMetrics metrics, @PreemptionDelay Amount<Long, Time> preemptionCandidacyDelay, - PreemptionSlotCache slotCache, + BiCache<PreemptionSlot, TaskGroupKey> slotCache, Clock clock) { this.storage = requireNonNull(storage); @@ -112,7 +113,7 @@ class PendingTaskProcessor implements Runnable { metrics.recordSlotSearchResult(slot, task); if (slot.isPresent()) { - slotCache.add(pendingTask.getTaskId(), slot.get()); + slotCache.put(slot.get(), TaskGroupKey.from(task)); } } } @@ -132,8 +133,8 @@ class PendingTaskProcessor implements Runnable { private final Predicate<IScheduledTask> hasCachedSlot = new Predicate<IScheduledTask>() { @Override - public boolean apply(IScheduledTask input) { - return slotCache.get(input.getAssignedTask().getTaskId()).isPresent(); + public boolean apply(IScheduledTask task) { + return !slotCache.getByValue(TaskGroupKey.from(task.getAssignedTask().getTask())).isEmpty(); } }; http://git-wip-us.apache.org/repos/asf/aurora/blob/1dc11fb1/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotCache.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotCache.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotCache.java deleted file mode 100644 index b5a42a0..0000000 --- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotCache.java +++ /dev/null @@ -1,99 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.async.preemptor; - -import java.lang.annotation.Retention; -import java.lang.annotation.Target; -import java.util.concurrent.TimeUnit; - -import javax.inject.Inject; -import javax.inject.Qualifier; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; -import com.google.common.base.Supplier; -import com.google.common.base.Ticker; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; - -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; -import com.twitter.common.stats.StatsProvider; -import com.twitter.common.util.Clock; - -import org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.PreemptionSlot; - -import static java.lang.annotation.ElementType.FIELD; -import static java.lang.annotation.ElementType.METHOD; -import static java.lang.annotation.ElementType.PARAMETER; -import static java.lang.annotation.RetentionPolicy.RUNTIME; -import static java.util.Objects.requireNonNull; - -/** - * Caches preemption slots found for candidate tasks. Entries are purged from cache after #duration. - */ -class PreemptionSlotCache { - - @VisibleForTesting - @Qualifier - @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) - @interface PreemptionSlotHoldDuration { } - - @VisibleForTesting - static final String PREEMPTION_SLOT_CACHE_SIZE_STAT = "preemption_slot_cache_size"; - - private final Cache<String, PreemptionSlot> slots; - - @Inject - PreemptionSlotCache( - StatsProvider statsProvider, - @PreemptionSlotHoldDuration Amount<Long, Time> duration, - final Clock clock) { - - requireNonNull(duration); - requireNonNull(clock); - this.slots = CacheBuilder.newBuilder() - .expireAfterWrite(duration.as(Time.MINUTES), TimeUnit.MINUTES) - .ticker(new Ticker() { - @Override - public long read() { - return clock.nowNanos(); - } - }) - .build(); - - statsProvider.makeGauge( - PREEMPTION_SLOT_CACHE_SIZE_STAT, - new Supplier<Long>() { - @Override - public Long get() { - return slots.size(); - } - }); - } - - void add(String taskId, PreemptionSlot preemptionSlot) { - requireNonNull(taskId); - requireNonNull(preemptionSlot); - slots.put(taskId, preemptionSlot); - } - - Optional<PreemptionSlot> get(String taskId) { - return Optional.fromNullable(slots.getIfPresent(taskId)); - } - - void remove(String taskId) { - slots.invalidate(taskId); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/1dc11fb1/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java index 77617ec..5200811 100644 --- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java +++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java @@ -13,6 +13,8 @@ */ package org.apache.aurora.scheduler.async.preemptor; +import java.util.Set; + import javax.inject.Inject; import com.google.common.base.Optional; @@ -20,6 +22,7 @@ import com.google.common.collect.ImmutableSet; import org.apache.aurora.gen.ScheduleStatus; import org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.PreemptionSlot; +import org.apache.aurora.scheduler.base.TaskGroupKey; import org.apache.aurora.scheduler.filter.AttributeAggregate; import org.apache.aurora.scheduler.state.StateManager; import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; @@ -51,14 +54,14 @@ public interface Preemptor { private final StateManager stateManager; private final PreemptionSlotFinder preemptionSlotFinder; private final PreemptorMetrics metrics; - private final PreemptionSlotCache slotCache; + private final BiCache<PreemptionSlot, TaskGroupKey> slotCache; @Inject PreemptorImpl( StateManager stateManager, PreemptionSlotFinder preemptionSlotFinder, PreemptorMetrics metrics, - PreemptionSlotCache slotCache) { + BiCache<PreemptionSlot, TaskGroupKey> slotCache) { this.stateManager = requireNonNull(stateManager); this.preemptionSlotFinder = requireNonNull(preemptionSlotFinder); @@ -70,21 +73,20 @@ public interface Preemptor { public Optional<String> attemptPreemptionFor( IAssignedTask pendingTask, AttributeAggregate jobState, - MutableStoreProvider storeProvider) { + MutableStoreProvider store) { - final Optional<PreemptionSlot> preemptionSlot = slotCache.get(pendingTask.getTaskId()); + TaskGroupKey groupKey = TaskGroupKey.from(pendingTask.getTask()); + Set<PreemptionSlot> preemptionSlots = slotCache.getByValue(groupKey); // A preemption slot is available -> attempt to preempt tasks. - if (preemptionSlot.isPresent()) { - slotCache.remove(pendingTask.getTaskId()); + if (!preemptionSlots.isEmpty()) { + // Get the next available preemption slot. + PreemptionSlot slot = preemptionSlots.iterator().next(); + slotCache.remove(slot, groupKey); // Validate a PreemptionSlot is still valid for the given task. Optional<ImmutableSet<PreemptionVictim>> validatedVictims = - preemptionSlotFinder.validatePreemptionSlotFor( - pendingTask, - jobState, - preemptionSlot.get(), - storeProvider); + preemptionSlotFinder.validatePreemptionSlotFor(pendingTask, jobState, slot, store); metrics.recordSlotValidationResult(validatedVictims); if (!validatedVictims.isPresent()) { @@ -95,13 +97,13 @@ public interface Preemptor { for (PreemptionVictim toPreempt : validatedVictims.get()) { metrics.recordTaskPreemption(toPreempt); stateManager.changeState( - storeProvider, + store, toPreempt.getTaskId(), Optional.<ScheduleStatus>absent(), PREEMPTING, Optional.of("Preempting in favor of " + pendingTask.getTaskId())); } - return Optional.of(preemptionSlot.get().getSlaveId()); + return Optional.of(slot.getSlaveId()); } return Optional.absent(); http://git-wip-us.apache.org/repos/asf/aurora/blob/1dc11fb1/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java index 1092c05..7cea881 100644 --- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java +++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java @@ -30,6 +30,9 @@ import com.twitter.common.quantity.Amount; import com.twitter.common.quantity.Time; import org.apache.aurora.scheduler.SchedulerServicesModule; +import org.apache.aurora.scheduler.async.preemptor.BiCache.BiCacheSettings; +import org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.PreemptionSlot; +import org.apache.aurora.scheduler.base.TaskGroupKey; import org.apache.aurora.scheduler.events.PubsubEventModule; import org.apache.aurora.scheduler.filter.AttributeAggregate; import org.apache.aurora.scheduler.storage.Storage; @@ -95,10 +98,9 @@ public class PreemptorModule extends AbstractModule { bind(new TypeLiteral<Amount<Long, Time>>() { }) .annotatedWith(PendingTaskProcessor.PreemptionDelay.class) .toInstance(preemptionDelay); - bind(new TypeLiteral<Amount<Long, Time>>() { }) - .annotatedWith(PreemptionSlotCache.PreemptionSlotHoldDuration.class) - .toInstance(PREEMPTION_SLOT_HOLD_TIME.get()); - bind(PreemptionSlotCache.class).in(Singleton.class); + bind(BiCacheSettings.class).toInstance( + new BiCacheSettings(PREEMPTION_SLOT_HOLD_TIME.get(), "preemption_slot_cache_size")); + bind(new TypeLiteral<BiCache<PreemptionSlot, TaskGroupKey>>() { }).in(Singleton.class); bind(PendingTaskProcessor.class).in(Singleton.class); bind(ClusterState.class).to(ClusterStateImpl.class); bind(ClusterStateImpl.class).in(Singleton.class); http://git-wip-us.apache.org/repos/asf/aurora/blob/1dc11fb1/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java index b61abf9..b0cced7 100644 --- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java @@ -19,13 +19,11 @@ import com.google.common.collect.ImmutableSet; import com.google.inject.AbstractModule; import com.google.inject.Guice; import com.google.inject.Injector; -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; -import com.twitter.common.stats.Stats; +import com.google.inject.TypeLiteral; + import com.twitter.common.stats.StatsProvider; import com.twitter.common.testing.easymock.EasyMockTest; import com.twitter.common.util.Clock; -import com.twitter.common.util.testing.FakeClock; import org.apache.aurora.gen.AssignedTask; import org.apache.aurora.gen.HostAttributes; @@ -35,6 +33,8 @@ import org.apache.aurora.gen.MaintenanceMode; import org.apache.aurora.gen.ScheduledTask; import org.apache.aurora.gen.TaskConfig; import org.apache.aurora.scheduler.HostOffer; +import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl; +import org.apache.aurora.scheduler.async.preemptor.BiCache; import org.apache.aurora.scheduler.async.preemptor.Preemptor; import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.base.TaskGroupKey; @@ -58,10 +58,12 @@ import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; import org.apache.mesos.Protos.TaskInfo; import org.easymock.Capture; import org.easymock.EasyMock; +import org.easymock.IExpectationSetters; import org.junit.Before; import org.junit.Test; import static org.apache.aurora.gen.ScheduleStatus.PENDING; +import static org.apache.aurora.gen.ScheduleStatus.RUNNING; import static org.apache.aurora.gen.ScheduleStatus.THROTTLED; import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY; import static org.easymock.EasyMock.capture; @@ -89,10 +91,8 @@ public class TaskSchedulerImplTest extends EasyMockTest { private TaskAssigner assigner; private OfferManager offerManager; private TaskScheduler scheduler; - private FakeClock clock; private Preemptor preemptor; - private Amount<Long, Time> reservationDuration; - private Amount<Long, Time> halfReservationDuration; + private BiCache<String, TaskGroupKey> reservations; private EventSink eventSink; @Before @@ -101,11 +101,8 @@ public class TaskSchedulerImplTest extends EasyMockTest { stateManager = createMock(StateManager.class); assigner = createMock(TaskAssigner.class); offerManager = createMock(OfferManager.class); - reservationDuration = Amount.of(2L, Time.MINUTES); - halfReservationDuration = Amount.of(1L, Time.MINUTES); - clock = new FakeClock(); - clock.setNowMillis(0); preemptor = createMock(Preemptor.class); + reservations = createMock(new Clazz<BiCache<String, TaskGroupKey>>() { }); Injector injector = getInjector(storageUtil.storage); scheduler = injector.getInstance(TaskScheduler.class); @@ -118,14 +115,16 @@ public class TaskSchedulerImplTest extends EasyMockTest { new AbstractModule() { @Override protected void configure() { + bind(new TypeLiteral<BiCache<String, TaskGroupKey>>() { }).toInstance(reservations); + bind(TaskScheduler.class).to(TaskSchedulerImpl.class); bind(Preemptor.class).toInstance(preemptor); - AsyncModule.bindTaskScheduler(binder(), reservationDuration); bind(OfferManager.class).toInstance(offerManager); bind(StateManager.class).toInstance(stateManager); bind(TaskAssigner.class).toInstance(assigner); - bind(Clock.class).toInstance(clock); + bind(Clock.class).toInstance(createMock(Clock.class)); + bind(StatsProvider.class).toInstance(createMock(StatsProvider.class)); bind(Storage.class).toInstance(storageImpl); - bind(StatsProvider.class).toInstance(Stats.STATS_PROVIDER); + PubsubEventModule.bindSubscriber(binder(), TaskScheduler.class); } }); } @@ -145,143 +144,99 @@ public class TaskSchedulerImplTest extends EasyMockTest { } @Test - public void testReservationsDeniesTasksForTimePeriod() throws Exception { + public void testReservation() throws Exception { storageUtil.expectOperations(); expectTaskStillPendingQuery(TASK_A); expectActiveJobFetch(TASK_A); expectLaunchAttempt(false); // Reserve "a" with offerA + expectReservationCheck(TASK_A); expectPreemptorCall(TASK_A, Optional.of(SLAVE_ID)); + expectAddReservation(SLAVE_ID, TASK_A); - expectTaskStillPendingQuery(TASK_B); - expectActiveJobFetch(TASK_B); - AssignmentCapture firstAssignment = expectLaunchAttempt(false); - expectPreemptorCall(TASK_B, Optional.<String>absent()); - - expectTaskStillPendingQuery(TASK_B); - expectActiveJobFetch(TASK_B); - AssignmentCapture secondAssignment = expectLaunchAttempt(true); - expectAssigned(TASK_B); + // Use previously created reservation. + expectTaskStillPendingQuery(TASK_A); + expectActiveJobFetch(TASK_A); + expectGetReservation(SLAVE_ID, TASK_A); + expectAssigned(TASK_A); + AssignmentCapture assignment = expectLaunchAttempt(true); control.replay(); assertFalse(scheduler.schedule("a")); - assertFalse(scheduler.schedule("b")); - - assignAndAssert(Result.FAILURE, GROUP_B, OFFER, firstAssignment); - - clock.advance(reservationDuration); - - assertTrue(scheduler.schedule("b")); - - assignAndAssert(Result.SUCCESS, GROUP_B, OFFER, secondAssignment); + assertTrue(scheduler.schedule("a")); + assignAndAssert(Result.SUCCESS, GROUP_A, OFFER, assignment); } @Test - public void testReservationsExpireAfterAccepted() throws Exception { + public void testReservationExpires() throws Exception { storageUtil.expectOperations(); expectTaskStillPendingQuery(TASK_A); expectActiveJobFetch(TASK_A); expectLaunchAttempt(false); // Reserve "a" with offerA + expectReservationCheck(TASK_A); expectPreemptorCall(TASK_A, Optional.of(SLAVE_ID)); + expectAddReservation(SLAVE_ID, TASK_A); - expectTaskStillPendingQuery(TASK_A); - expectActiveJobFetch(TASK_A); - AssignmentCapture firstAssignment = expectLaunchAttempt(true); - expectAssigned(TASK_A); - + // First attempt -> reservation is active. expectTaskStillPendingQuery(TASK_B); expectActiveJobFetch(TASK_B); + AssignmentCapture firstAssignment = expectLaunchAttempt(false); + expectGetReservation(SLAVE_ID, TASK_A); + expectReservationCheck(TASK_B); + expectPreemptorCall(TASK_B, Optional.<String>absent()); - AssignmentCapture secondAssignment = expectLaunchAttempt(true); + // Status changed -> reservation removed. + reservations.remove(SLAVE_ID, TaskGroupKey.from(TASK_A.getAssignedTask().getTask())); - expect(assigner.maybeAssign( - storageUtil.mutableStoreProvider, - OFFER, - new ResourceRequest(TASK_B.getAssignedTask().getTask(), Tasks.id(TASK_B), EMPTY))) - .andReturn(Assignment.success(TaskInfo.getDefaultInstance())); + // Second attempt -> reservation expires. + expectGetNoReservation(SLAVE_ID); + expectTaskStillPendingQuery(TASK_B); + expectActiveJobFetch(TASK_B); + AssignmentCapture secondAssignment = expectLaunchAttempt(true); + expectAssigned(TASK_B); control.replay(); + assertFalse(scheduler.schedule("a")); - assertTrue(scheduler.schedule("a")); - assignAndAssert(Result.SUCCESS, GROUP_A, OFFER, firstAssignment); - eventSink.post(TaskStateChange.transition(TASK_A, PENDING)); - clock.advance(halfReservationDuration); + assertFalse(scheduler.schedule("b")); + assignAndAssert(Result.FAILURE, GROUP_B, OFFER, firstAssignment); + + eventSink.post(TaskStateChange.transition(assign(TASK_A, SLAVE_ID), PENDING)); assertTrue(scheduler.schedule("b")); assignAndAssert(Result.SUCCESS, GROUP_B, OFFER, secondAssignment); } @Test - public void testReservationsAcceptsWithInTimePeriod() throws Exception { + public void testReservationUnusable() throws Exception { storageUtil.expectOperations(); - expectTaskStillPendingQuery(TASK_A); - expectActiveJobFetch(TASK_A); - expectLaunchAttempt(false); - // Reserve "a" with offerA - expectPreemptorCall(TASK_A, Optional.of(SLAVE_ID)); expectTaskStillPendingQuery(TASK_A); - expectActiveJobFetch(TASK_A); - AssignmentCapture assignment = expectLaunchAttempt(true); - expectAssigned(TASK_A); + expectLaunchAttempt(false); + expect(reservations.getByValue(TaskGroupKey.from(TASK_A.getAssignedTask().getTask()))) + .andReturn(ImmutableSet.of(SLAVE_ID)); control.replay(); - assertFalse(scheduler.schedule("a")); - clock.advance(halfReservationDuration); - assertTrue(scheduler.schedule("a")); - assignAndAssert(Result.SUCCESS, GROUP_A, OFFER, assignment); + assertFalse(scheduler.schedule("a")); } @Test - public void testReservationsCancellation() throws Exception { - storageUtil.expectOperations(); - - expectTaskStillPendingQuery(TASK_A); - expectActiveJobFetch(TASK_A); - expectLaunchAttempt(false); - - // Reserve "a" with offerA - expectPreemptorCall(TASK_A, Optional.of(SLAVE_ID)); - - expectTaskStillPendingQuery(TASK_B); - expectActiveJobFetch(TASK_B); - AssignmentCapture assignment = expectLaunchAttempt(true); - expectAssigned(TASK_B); - + public void testNonPendingIgnored() throws Exception { control.replay(); - assertFalse(scheduler.schedule("a")); - clock.advance(halfReservationDuration); - // Task is killed by user before it is scheduled - eventSink.post(TaskStateChange.transition(TASK_A, PENDING)); - assertTrue(scheduler.schedule("b")); - assignAndAssert(Result.SUCCESS, GROUP_B, OFFER, assignment); + + eventSink.post(TaskStateChange.transition(TASK_A, RUNNING)); } @Test - public void testReservationsExpire() throws Exception { - storageUtil.expectOperations(); - - expectTaskStillPendingQuery(TASK_B); - expectActiveJobFetch(TASK_B); - expectLaunchAttempt(false); - // Reserve "b" with offer1 - expectPreemptorCall(TASK_B, Optional.of(SLAVE_ID)); - - expectTaskStillPendingQuery(TASK_A); - expectActiveJobFetch(TASK_A); - AssignmentCapture assignment = expectLaunchAttempt(true); - expectAssigned(TASK_A); - + public void testPendingDeletedHandled() throws Exception { control.replay(); - assertFalse(scheduler.schedule("b")); - // We don't act on the reservation made by b because we want to see timeout behaviour. - clock.advance(reservationDuration); - assertTrue(scheduler.schedule("a")); - assignAndAssert(Result.SUCCESS, GROUP_A, OFFER, assignment); + + IScheduledTask task = IScheduledTask.build(TASK_A.newBuilder().setStatus(PENDING)); + eventSink.post(TaskStateChange.transition(task, PENDING)); } @Test @@ -307,6 +262,7 @@ public class TaskSchedulerImplTest extends EasyMockTest { } }); + expectGetNoReservation(SLAVE_ID); AssignmentCapture assignment = expectLaunchAttempt(true); expect(assigner.maybeAssign( EasyMock.<MutableStoreProvider>anyObject(), @@ -353,6 +309,12 @@ public class TaskSchedulerImplTest extends EasyMockTest { return capture; } + private IScheduledTask assign(IScheduledTask task, String slaveId) { + ScheduledTask result = task.newBuilder(); + result.getAssignedTask().setSlaveId(slaveId); + return IScheduledTask.build(result); + } + private void assignAndAssert( Result result, TaskGroupKey groupKey, @@ -369,4 +331,22 @@ public class TaskSchedulerImplTest extends EasyMockTest { .byStatus(Tasks.SLAVE_ASSIGNED_STATES), ImmutableSet.<IScheduledTask>of()); } + + private void expectAddReservation(String slaveId, IScheduledTask task) { + reservations.put(slaveId, TaskGroupKey.from(task.getAssignedTask().getTask())); + } + + private IExpectationSetters<?> expectGetReservation(String slaveId, IScheduledTask task) { + return expect(reservations.get(slaveId)) + .andReturn(Optional.of(TaskGroupKey.from(task.getAssignedTask().getTask()))); + } + + private IExpectationSetters<?> expectGetNoReservation(String slaveId) { + return expect(reservations.get(slaveId)).andReturn(Optional.<TaskGroupKey>absent()); + } + + private IExpectationSetters<?> expectReservationCheck(IScheduledTask task) { + return expect(reservations.getByValue(TaskGroupKey.from(task.getAssignedTask().getTask()))) + .andReturn(ImmutableSet.<String>of()); + } } http://git-wip-us.apache.org/repos/asf/aurora/blob/1dc11fb1/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java index 9c47a76..34cbd19 100644 --- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java +++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java @@ -19,17 +19,13 @@ import java.util.concurrent.TimeUnit; import com.google.common.base.Function; import com.google.common.base.Optional; -import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.RateLimiter; import com.twitter.common.quantity.Amount; import com.twitter.common.quantity.Time; -import com.twitter.common.stats.Stat; -import com.twitter.common.stats.StatsProvider; import com.twitter.common.testing.easymock.EasyMockTest; import com.twitter.common.util.BackoffStrategy; -import com.twitter.common.util.testing.FakeClock; import org.apache.aurora.gen.AssignedTask; import org.apache.aurora.gen.Attribute; @@ -44,6 +40,7 @@ import org.apache.aurora.scheduler.HostOffer; import org.apache.aurora.scheduler.async.OfferManager.OfferManagerImpl; import org.apache.aurora.scheduler.async.OfferManager.OfferReturnDelay; import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl; +import org.apache.aurora.scheduler.async.preemptor.BiCache; import org.apache.aurora.scheduler.async.preemptor.Preemptor; import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.base.TaskGroupKey; @@ -107,6 +104,9 @@ public class TaskSchedulerTest extends EasyMockTest { private static final HostOffer OFFER_B = makeOffer("OFFER_B", "HOST_B", SCHEDULED); private static final HostOffer OFFER_C = makeOffer("OFFER_C", "HOST_C", DRAINING); private static final HostOffer OFFER_D = makeOffer("OFFER_D", "HOST_D", DRAINED); + private static final String SLAVE_A = OFFER_A.getOffer().getSlaveId().getValue(); + private static final String SLAVE_B = OFFER_B.getOffer().getSlaveId().getValue(); + private static final String SLAVE_C = OFFER_C.getOffer().getSlaveId().getValue(); private Storage storage; @@ -120,11 +120,9 @@ public class TaskSchedulerTest extends EasyMockTest { private OfferReturnDelay returnDelay; private OfferManager offerManager; private TaskGroups taskGroups; - private FakeClock clock; - private StatsProvider statsProvider; private RescheduleCalculator rescheduleCalculator; private Preemptor preemptor; - private Amount<Long, Time> reservationDuration = Amount.of(1L, Time.MINUTES); + private BiCache<String, TaskGroupKey> reservations; @Before public void setUp() { @@ -137,20 +135,12 @@ public class TaskSchedulerTest extends EasyMockTest { executor = createMock(ScheduledExecutorService.class); future = createMock(ScheduledFuture.class); returnDelay = createMock(OfferReturnDelay.class); - clock = new FakeClock(); - clock.setNowMillis(0); - statsProvider = createMock(StatsProvider.class); rescheduleCalculator = createMock(RescheduleCalculator.class); preemptor = createMock(Preemptor.class); + reservations = createMock(new Clazz<BiCache<String, TaskGroupKey>>() { }); } private void replayAndCreateScheduler() { - Capture<Supplier<Long>> cacheSizeSupplier = createCapture(); - Stat<Long> stat = createMock(new Clazz<Stat<Long>>() { }); - expect(statsProvider.makeGauge( - EasyMock.eq(TaskSchedulerImpl.RESERVATIONS_CACHE_SIZE_STAT), - capture(cacheSizeSupplier))).andReturn(stat); - control.replay(); offerManager = new OfferManagerImpl(driver, returnDelay, executor); TaskScheduler scheduler = new TaskSchedulerImpl(storage, @@ -158,9 +148,7 @@ public class TaskSchedulerTest extends EasyMockTest { assigner, offerManager, preemptor, - reservationDuration, - clock, - statsProvider); + reservations); taskGroups = new TaskGroups( executor, Amount.of(FIRST_SCHEDULE_DELAY_MS, Time.MILLISECONDS), @@ -168,7 +156,6 @@ public class TaskSchedulerTest extends EasyMockTest { RateLimiter.create(100), scheduler, rescheduleCalculator); - assertEquals(0L, (long) cacheSizeSupplier.getValue().get()); } private Capture<Runnable> expectOffer() { @@ -233,7 +220,9 @@ public class TaskSchedulerTest extends EasyMockTest { public void testNoOffers() { Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10); - expectPreemptorCall(makeTask("a")); + IScheduledTask task = makeTask("a"); + expectPreemptorCall(task); + expectReservationCheck(task); replayAndCreateScheduler(); @@ -301,6 +290,15 @@ public class TaskSchedulerTest extends EasyMockTest { eq(new ResourceRequest(task.getAssignedTask().getTask(), Tasks.id(task), jobAggregate)))); } + private IExpectationSetters<?> expectNoReservation(String slaveId) { + return expect(reservations.get(slaveId)).andReturn(Optional.<TaskGroupKey>absent()); + } + + private IExpectationSetters<?> expectReservationCheck(IScheduledTask task) { + return expect(reservations.getByValue(TaskGroupKey.from(task.getAssignedTask().getTask()))) + .andReturn(ImmutableSet.<String>of()); + } + @Test public void testTaskAssigned() { expectAnyMaintenanceCalls(); @@ -310,6 +308,8 @@ public class TaskSchedulerTest extends EasyMockTest { TaskInfo mesosTask = makeTaskInfo(task); Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); + expectNoReservation(SLAVE_A).times(2); + expectReservationCheck(task); expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.failure()); expectPreemptorCall(task); @@ -319,7 +319,9 @@ public class TaskSchedulerTest extends EasyMockTest { Capture<Runnable> timeoutCapture3 = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10); - expectPreemptorCall(makeTask("b")); + IScheduledTask taskB = makeTask("b"); + expectReservationCheck(taskB); + expectPreemptorCall(taskB); replayAndCreateScheduler(); @@ -345,6 +347,7 @@ public class TaskSchedulerTest extends EasyMockTest { Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); expectAnyMaintenanceCalls(); expectOfferDeclineIn(10); + expectNoReservation(SLAVE_A); expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.success(mesosTask)); driver.launchTask(OFFER_A.getOffer().getId(), mesosTask); expectLastCall().andThrow(new IllegalStateException("Driver not ready.")); @@ -375,6 +378,7 @@ public class TaskSchedulerTest extends EasyMockTest { Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); expectAnyMaintenanceCalls(); expectOfferDeclineIn(10); + expectNoReservation(SLAVE_A).times(2); expectMaybeAssign(OFFER_A, task, EMPTY).andThrow(new StorageException("Injected failure.")); Capture<Runnable> timeoutCapture2 = expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10); @@ -397,6 +401,8 @@ public class TaskSchedulerTest extends EasyMockTest { Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); Capture<Runnable> offerExpirationCapture = expectOfferDeclineIn(10); expectAnyMaintenanceCalls(); + expectNoReservation(SLAVE_A); + expectReservationCheck(task).times(2); expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.failure()); Capture<Runnable> timeoutCapture2 = expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10); expectPreemptorCall(task); @@ -459,12 +465,14 @@ public class TaskSchedulerTest extends EasyMockTest { IScheduledTask taskA = makeTask("A", PENDING); TaskInfo mesosTaskA = makeTaskInfo(taskA); + expectNoReservation(SLAVE_A); expectMaybeAssign(OFFER_A, taskA, EMPTY).andReturn(Assignment.success(mesosTaskA)); driver.launchTask(OFFER_A.getOffer().getId(), mesosTaskA); Capture<Runnable> captureA = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); IScheduledTask taskB = makeTask("B", PENDING); TaskInfo mesosTaskB = makeTaskInfo(taskB); + expectNoReservation(SLAVE_B); expectMaybeAssign(OFFER_B, taskB, EMPTY).andReturn(Assignment.success(mesosTaskB)); driver.launchTask(OFFER_B.getOffer().getId(), mesosTaskB); Capture<Runnable> captureB = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); @@ -491,6 +499,7 @@ public class TaskSchedulerTest extends EasyMockTest { IScheduledTask taskA = makeTask("A", PENDING); TaskInfo mesosTaskA = makeTaskInfo(taskA); + expectNoReservation(SLAVE_B); expectMaybeAssign(OFFER_B, taskA, EMPTY).andReturn(Assignment.success(mesosTaskA)); driver.launchTask(OFFER_B.getOffer().getId(), mesosTaskA); Capture<Runnable> captureA = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); @@ -500,6 +509,7 @@ public class TaskSchedulerTest extends EasyMockTest { HostOffer updatedOfferC = new HostOffer( OFFER_C.getOffer(), IHostAttributes.build(OFFER_C.getAttributes().newBuilder().setMode(NONE))); + expectNoReservation(SLAVE_C); expectMaybeAssign(updatedOfferC, taskB, EMPTY).andReturn(Assignment.success(mesosTaskB)); driver.launchTask(OFFER_C.getOffer().getId(), mesosTaskB); Capture<Runnable> captureB = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); @@ -556,6 +566,9 @@ public class TaskSchedulerTest extends EasyMockTest { IScheduledTask jobB0 = makeTask("b0", PENDING); + expectNoReservation(SLAVE_A); + expectNoReservation(SLAVE_B); + expectOfferDeclineIn(10); expectOfferDeclineIn(10); expectOfferDeclineIn(10); @@ -597,8 +610,10 @@ public class TaskSchedulerTest extends EasyMockTest { final IScheduledTask task = makeTask("a", PENDING); Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); + expectNoReservation(SLAVE_A); expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.failure()); expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 20); + expectReservationCheck(task); expectPreemptorCall(task); replayAndCreateScheduler(); http://git-wip-us.apache.org/repos/asf/aurora/blob/1dc11fb1/src/test/java/org/apache/aurora/scheduler/async/preemptor/BiCacheTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/BiCacheTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/BiCacheTest.java new file mode 100644 index 0000000..4734776 --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/async/preemptor/BiCacheTest.java @@ -0,0 +1,108 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.async.preemptor; + +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableSet; + +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.util.testing.FakeClock; + +import org.apache.aurora.scheduler.async.preemptor.BiCache.BiCacheSettings; +import org.apache.aurora.scheduler.testing.FakeStatsProvider; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class BiCacheTest { + private static final Amount<Long, Time> HOLD_DURATION = Amount.of(1L, Time.MINUTES); + private static final String STAT_NAME = "cache_size_stat"; + private static final String KEY_1 = "Key 1"; + private static final String KEY_2 = "Key 2"; + private static final Optional<Integer> NO_VALUE = Optional.absent(); + + private FakeStatsProvider statsProvider; + private FakeClock clock; + private BiCache<String, Integer> biCache; + + @Before + public void setUp() { + statsProvider = new FakeStatsProvider(); + clock = new FakeClock(); + biCache = new BiCache<>(statsProvider, new BiCacheSettings(HOLD_DURATION, STAT_NAME), clock); + } + + @Test + public void testExpiration() { + biCache.put(KEY_1, 1); + assertEquals(Optional.of(1), biCache.get(KEY_1)); + assertEquals(1L, statsProvider.getLongValue(STAT_NAME)); + + clock.advance(HOLD_DURATION); + + assertEquals(NO_VALUE, biCache.get(KEY_1)); + assertEquals(ImmutableSet.of(), biCache.getByValue(1)); + assertEquals(0L, statsProvider.getLongValue(STAT_NAME)); + } + + @Test + public void testRemoval() { + biCache.put(KEY_1, 1); + assertEquals(1L, statsProvider.getLongValue(STAT_NAME)); + assertEquals(Optional.of(1), biCache.get(KEY_1)); + biCache.remove(KEY_1, 1); + assertEquals(NO_VALUE, biCache.get(KEY_1)); + assertEquals(0L, statsProvider.getLongValue(STAT_NAME)); + } + + @Test(expected = NullPointerException.class) + public void testRemovalWithNullKey() { + biCache.remove(null, 1); + } + + @Test + public void testDifferentKeysIdenticalValues() { + biCache.put(KEY_1, 1); + biCache.put(KEY_2, 1); + assertEquals(2L, statsProvider.getLongValue(STAT_NAME)); + + assertEquals(Optional.of(1), biCache.get(KEY_1)); + assertEquals(Optional.of(1), biCache.get(KEY_2)); + assertEquals(ImmutableSet.of(KEY_1, KEY_2), biCache.getByValue(1)); + + biCache.remove(KEY_1, 1); + assertEquals(NO_VALUE, biCache.get(KEY_1)); + assertEquals(Optional.of(1), biCache.get(KEY_2)); + assertEquals(ImmutableSet.of(KEY_2), biCache.getByValue(1)); + assertEquals(1L, statsProvider.getLongValue(STAT_NAME)); + + clock.advance(HOLD_DURATION); + assertEquals(NO_VALUE, biCache.get(KEY_1)); + assertEquals(NO_VALUE, biCache.get(KEY_2)); + assertEquals(ImmutableSet.of(), biCache.getByValue(1)); + assertEquals(0L, statsProvider.getLongValue(STAT_NAME)); + } + + @Test + public void testIdenticalKeysDifferentValues() { + biCache.put(KEY_1, 1); + biCache.put(KEY_1, 2); + assertEquals(Optional.of(2), biCache.get(KEY_1)); + assertEquals(ImmutableSet.of(), biCache.getByValue(1)); + assertEquals(ImmutableSet.of(KEY_1), biCache.getByValue(2)); + assertEquals(1L, statsProvider.getLongValue(STAT_NAME)); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/1dc11fb1/src/test/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessorTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessorTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessorTest.java index 75fc16d..8a9a3b7 100644 --- a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessorTest.java +++ b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessorTest.java @@ -14,6 +14,7 @@ package org.apache.aurora.scheduler.async.preemptor; import java.util.Arrays; +import java.util.Set; import com.google.common.base.Optional; import com.google.common.collect.ImmutableSet; @@ -29,12 +30,14 @@ import org.apache.aurora.gen.TaskConfig; import org.apache.aurora.gen.TaskEvent; import org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.PreemptionSlot; import org.apache.aurora.scheduler.base.Query; +import org.apache.aurora.scheduler.base.TaskGroupKey; import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.filter.AttributeAggregate; import org.apache.aurora.scheduler.stats.CachedCounters; import org.apache.aurora.scheduler.storage.entities.IAssignedTask; import org.apache.aurora.scheduler.storage.entities.IJobKey; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.apache.aurora.scheduler.storage.entities.ITaskConfig; import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; import org.apache.aurora.scheduler.testing.FakeStatsProvider; import org.easymock.IExpectationSetters; @@ -49,24 +52,26 @@ import static org.easymock.EasyMock.expect; import static org.junit.Assert.assertEquals; public class PendingTaskProcessorTest extends EasyMockTest { - private static final String TASK_ID_A = "task_a"; - private static final String TASK_ID_B = "task_b"; - private static final ScheduledTask TASK_A = makeTask(TASK_ID_A); - private static final ScheduledTask TASK_B = makeTask(TASK_ID_B); + private static final ScheduledTask TASK_A = makeTask("task_a"); + private static final ScheduledTask TASK_B = makeTask("task_b"); private static final PreemptionSlot SLOT_A = createPreemptionSlot(TASK_A); private static final PreemptionSlot SLOT_B = createPreemptionSlot(TASK_B); + private static final TaskGroupKey GROUP_A = + TaskGroupKey.from(ITaskConfig.build(TASK_A.getAssignedTask().getTask())); + private static final TaskGroupKey GROUP_B = + TaskGroupKey.from(ITaskConfig.build(TASK_B.getAssignedTask().getTask())); private static final String SLAVE_ID = "slave_id"; private static final IJobKey JOB_KEY = IJobKey.build(TASK_A.getAssignedTask().getTask().getJob()); private static final Amount<Long, Time> PREEMPTION_DELAY = Amount.of(30L, Time.SECONDS); - private static final Optional<PreemptionSlot> EMPTY_SLOT = Optional.absent(); + private static final Set<PreemptionSlot> NO_SLOTS = ImmutableSet.of(); private StorageTestUtil storageUtil; private FakeStatsProvider statsProvider; private PreemptionSlotFinder preemptionSlotFinder; private PendingTaskProcessor slotFinder; - private PreemptionSlotCache slotCache; + private BiCache<PreemptionSlot, TaskGroupKey> slotCache; private FakeClock clock; @Before @@ -74,7 +79,7 @@ public class PendingTaskProcessorTest extends EasyMockTest { storageUtil = new StorageTestUtil(this); storageUtil.expectOperations(); preemptionSlotFinder = createMock(PreemptionSlotFinder.class); - slotCache = createMock(PreemptionSlotCache.class); + slotCache = createMock(new Clazz<BiCache<PreemptionSlot, TaskGroupKey>>() { }); statsProvider = new FakeStatsProvider(); clock = new FakeClock(); @@ -88,14 +93,14 @@ public class PendingTaskProcessorTest extends EasyMockTest { } @Test public void testSearchSlotSuccessful() throws Exception { - expect(slotCache.get(TASK_ID_A)).andReturn(EMPTY_SLOT); - expect(slotCache.get(TASK_ID_B)).andReturn(EMPTY_SLOT); + expect(slotCache.getByValue(GROUP_A)).andReturn(NO_SLOTS); + expect(slotCache.getByValue(GROUP_B)).andReturn(NO_SLOTS); expectGetPendingTasks(TASK_A, TASK_B); expectAttributeAggegateFetchTasks(); expectSlotSearch(TASK_A, Optional.of(SLOT_A)); expectSlotSearch(TASK_B, Optional.of(SLOT_B)); - slotCache.add(TASK_ID_A, SLOT_A); - slotCache.add(TASK_ID_B, SLOT_B); + slotCache.put(SLOT_A, GROUP_A); + slotCache.put(SLOT_B, GROUP_B); control.replay(); @@ -110,10 +115,10 @@ public class PendingTaskProcessorTest extends EasyMockTest { @Test public void testSearchSlotFailed() throws Exception { - expect(slotCache.get(TASK_ID_A)).andReturn(EMPTY_SLOT); + expect(slotCache.getByValue(GROUP_A)).andReturn(NO_SLOTS); expectGetPendingTasks(TASK_A); expectAttributeAggegateFetchTasks(); - expectSlotSearch(TASK_A, EMPTY_SLOT); + expectSlotSearch(TASK_A, Optional.<PreemptionSlot>absent()); control.replay(); http://git-wip-us.apache.org/repos/asf/aurora/blob/1dc11fb1/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotCacheTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotCacheTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotCacheTest.java deleted file mode 100644 index 80bd13a..0000000 --- a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotCacheTest.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.async.preemptor; - -import com.google.common.base.Optional; -import com.google.common.collect.ImmutableSet; - -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; -import com.twitter.common.util.testing.FakeClock; - -import org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.PreemptionSlot; -import org.apache.aurora.scheduler.testing.FakeStatsProvider; -import org.junit.Before; -import org.junit.Test; - -import static org.junit.Assert.assertEquals; - -public class PreemptionSlotCacheTest { - private static final Amount<Long, Time> HOLD_DURATION = Amount.of(1L, Time.MINUTES); - private static final String TASK_ID = "task_id"; - private static final PreemptionSlot SLOT = - new PreemptionSlot(ImmutableSet.<PreemptionVictim>of(), "slave_id"); - - private FakeStatsProvider statsProvider; - private FakeClock clock; - private PreemptionSlotCache slotCache; - - @Before - public void setUp() { - statsProvider = new FakeStatsProvider(); - clock = new FakeClock(); - slotCache = new PreemptionSlotCache(statsProvider, HOLD_DURATION, clock); - } - - @Test - public void testExpiration() { - slotCache.add(TASK_ID, SLOT); - assertEquals(Optional.of(SLOT), slotCache.get(TASK_ID)); - assertEquals(1L, statsProvider.getLongValue( - PreemptionSlotCache.PREEMPTION_SLOT_CACHE_SIZE_STAT)); - - clock.advance(HOLD_DURATION); - - assertEquals(Optional.<PreemptionSlot>absent(), slotCache.get(TASK_ID)); - } - - @Test - public void testRemoval() { - slotCache.add(TASK_ID, SLOT); - assertEquals(Optional.of(SLOT), slotCache.get(TASK_ID)); - slotCache.remove(TASK_ID); - assertEquals(Optional.<PreemptionSlot>absent(), slotCache.get(TASK_ID)); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/1dc11fb1/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java index 97d6087..64283fa 100644 --- a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java @@ -13,6 +13,8 @@ */ package org.apache.aurora.scheduler.async.preemptor; +import java.util.Set; + import com.google.common.base.Optional; import com.google.common.collect.ImmutableSet; import com.twitter.common.testing.easymock.EasyMockTest; @@ -25,12 +27,14 @@ import org.apache.aurora.gen.TaskConfig; import org.apache.aurora.gen.TaskEvent; import org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.PreemptionSlot; import org.apache.aurora.scheduler.async.preemptor.Preemptor.PreemptorImpl; +import org.apache.aurora.scheduler.base.TaskGroupKey; import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.state.StateManager; import org.apache.aurora.scheduler.stats.CachedCounters; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.entities.IAssignedTask; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.apache.aurora.scheduler.storage.entities.ITaskConfig; import org.apache.aurora.scheduler.testing.FakeStatsProvider; import org.easymock.EasyMock; import org.junit.Before; @@ -46,19 +50,20 @@ import static org.easymock.EasyMock.expect; import static org.junit.Assert.assertEquals; public class PreemptorImplTest extends EasyMockTest { - private static final String TASK_ID = "task_a"; private static final String SLAVE_ID = "slave_id"; private static final IScheduledTask TASK = IScheduledTask.build(makeTask()); private static final PreemptionSlot SLOT = createPreemptionSlot(TASK); + private static final TaskGroupKey GROUP_KEY = + TaskGroupKey.from(ITaskConfig.build(makeTask().getAssignedTask().getTask())); - private static final Optional<PreemptionSlot> EMPTY_SLOT = Optional.absent(); + private static final Set<PreemptionSlot> NO_SLOTS = ImmutableSet.of(); private static final Optional<String> EMPTY_RESULT = Optional.absent(); private StateManager stateManager; private FakeStatsProvider statsProvider; private PreemptionSlotFinder preemptionSlotFinder; private PreemptorImpl preemptor; - private PreemptionSlotCache slotCache; + private BiCache<PreemptionSlot, TaskGroupKey> slotCache; private Storage.MutableStoreProvider storeProvider; @Before @@ -66,7 +71,7 @@ public class PreemptorImplTest extends EasyMockTest { storeProvider = createMock(Storage.MutableStoreProvider.class); stateManager = createMock(StateManager.class); preemptionSlotFinder = createMock(PreemptionSlotFinder.class); - slotCache = createMock(PreemptionSlotCache.class); + slotCache = createMock(new Clazz<BiCache<PreemptionSlot, TaskGroupKey>>() { }); statsProvider = new FakeStatsProvider(); preemptor = new PreemptorImpl( stateManager, @@ -77,8 +82,8 @@ public class PreemptorImplTest extends EasyMockTest { @Test public void testPreemptTasksSuccessful() throws Exception { - expect(slotCache.get(TASK_ID)).andReturn(Optional.of(SLOT)); - slotCache.remove(TASK_ID); + expect(slotCache.getByValue(GROUP_KEY)).andReturn(ImmutableSet.of(SLOT)); + slotCache.remove(SLOT, GROUP_KEY); expectSlotValidation(Optional.of(ImmutableSet.of( PreemptionVictim.fromTask(TASK.getAssignedTask())))); @@ -93,8 +98,8 @@ public class PreemptorImplTest extends EasyMockTest { @Test public void testPreemptTasksValidationFailed() throws Exception { - expect(slotCache.get(TASK_ID)).andReturn(Optional.of(SLOT)); - slotCache.remove(TASK_ID); + expect(slotCache.getByValue(GROUP_KEY)).andReturn(ImmutableSet.of(SLOT)); + slotCache.remove(SLOT, GROUP_KEY); expectSlotValidation(Optional.<ImmutableSet<PreemptionVictim>>absent()); control.replay(); @@ -106,7 +111,7 @@ public class PreemptorImplTest extends EasyMockTest { @Test public void testNoCachedSlot() throws Exception { - expect(slotCache.get(TASK_ID)).andReturn(EMPTY_SLOT); + expect(slotCache.getByValue(GROUP_KEY)).andReturn(NO_SLOTS); control.replay(); @@ -145,7 +150,6 @@ public class PreemptorImplTest extends EasyMockTest { private static ScheduledTask makeTask() { ScheduledTask task = new ScheduledTask() .setAssignedTask(new AssignedTask() - .setTaskId(TASK_ID) .setTask(new TaskConfig() .setPriority(1) .setProduction(true)
