Improving async preemptor efficiency. Bugs closed: AURORA-1219
Reviewed at https://reviews.apache.org/r/32597/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/8fd21a1a Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/8fd21a1a Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/8fd21a1a Branch: refs/heads/master Commit: 8fd21a1adf4df93166a64f1542120c1af6e77443 Parents: 8ba1b11 Author: Maxim Khutornenko <[email protected]> Authored: Tue Apr 21 17:24:45 2015 -0700 Committer: Maxim Khutornenko <[email protected]> Committed: Tue Apr 21 17:24:45 2015 -0700 ---------------------------------------------------------------------- .../async/preemptor/PendingTaskProcessor.java | 164 ++++- .../async/preemptor/PreemptionProposal.java | 66 ++ .../async/preemptor/PreemptionSlotFinder.java | 351 ---------- .../async/preemptor/PreemptionVictimFilter.java | 214 +++++++ .../scheduler/async/preemptor/Preemptor.java | 32 +- .../async/preemptor/PreemptorMetrics.java | 6 +- .../async/preemptor/PreemptorModule.java | 10 +- .../aurora/scheduler/base/TaskGroupKey.java | 9 + .../aurora/scheduler/app/SchedulerIT.java | 1 + .../preemptor/PendingTaskProcessorTest.java | 231 +++++-- .../preemptor/PreemptionVictimFilterTest.java | 514 +++++++++++++++ .../async/preemptor/PreemptorImplTest.java | 56 +- .../preemptor/PreemptorSlotFinderTest.java | 641 ------------------- 13 files changed, 1187 insertions(+), 1108 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/8fd21a1a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessor.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessor.java index 00919b7..4427115 100644 --- 
a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessor.java +++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessor.java @@ -15,22 +15,40 @@ package org.apache.aurora.scheduler.async.preemptor; import java.lang.annotation.Retention; import java.lang.annotation.Target; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; import javax.inject.Inject; import javax.inject.Qualifier; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.base.Functions; import com.google.common.base.Optional; import com.google.common.base.Predicate; import com.google.common.base.Predicates; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; import com.google.common.collect.FluentIterable; +import com.google.common.collect.HashMultiset; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Multimap; -import com.google.common.collect.Multimaps; +import com.google.common.collect.Multiset; +import com.google.common.collect.Sets; + import com.twitter.common.quantity.Amount; import com.twitter.common.quantity.Time; import com.twitter.common.util.Clock; -import org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.PreemptionSlot; +import org.apache.aurora.scheduler.HostOffer; +import org.apache.aurora.scheduler.async.OfferManager; import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.base.TaskGroupKey; import org.apache.aurora.scheduler.base.Tasks; @@ -56,10 +74,12 @@ import static org.apache.aurora.scheduler.base.Tasks.SCHEDULED_TO_ASSIGNED; */ class PendingTaskProcessor implements Runnable { private final 
Storage storage; - private final PreemptionSlotFinder preemptionSlotFinder; + private final OfferManager offerManager; + private final PreemptionVictimFilter preemptionVictimFilter; private final PreemptorMetrics metrics; private final Amount<Long, Time> preemptionCandidacyDelay; - private final BiCache<PreemptionSlot, TaskGroupKey> slotCache; + private final BiCache<PreemptionProposal, TaskGroupKey> slotCache; + private final ClusterState clusterState; private final Clock clock; /** @@ -76,17 +96,21 @@ class PendingTaskProcessor implements Runnable { @Inject PendingTaskProcessor( Storage storage, - PreemptionSlotFinder preemptionSlotFinder, + OfferManager offerManager, + PreemptionVictimFilter preemptionVictimFilter, PreemptorMetrics metrics, @PreemptionDelay Amount<Long, Time> preemptionCandidacyDelay, - BiCache<PreemptionSlot, TaskGroupKey> slotCache, + BiCache<PreemptionProposal, TaskGroupKey> slotCache, + ClusterState clusterState, Clock clock) { this.storage = requireNonNull(storage); - this.preemptionSlotFinder = requireNonNull(preemptionSlotFinder); + this.offerManager = requireNonNull(offerManager); + this.preemptionVictimFilter = requireNonNull(preemptionVictimFilter); this.metrics = requireNonNull(metrics); this.preemptionCandidacyDelay = requireNonNull(preemptionCandidacyDelay); this.slotCache = requireNonNull(slotCache); + this.clusterState = requireNonNull(clusterState); this.clock = requireNonNull(clock); } @@ -95,42 +119,120 @@ class PendingTaskProcessor implements Runnable { metrics.recordTaskProcessorRun(); storage.read(new Storage.Work.Quiet<Void>() { @Override - public Void apply(StoreProvider storeProvider) { - Multimap<IJobKey, IAssignedTask> pendingTasks = fetchIdlePendingTasks(storeProvider); + public Void apply(StoreProvider store) { + Multimap<String, PreemptionVictim> slavesToActiveTasks = + clusterState.getSlavesToActiveTasks(); + + if (slavesToActiveTasks.isEmpty()) { + // No preemption victims to consider. 
+ return null; + } - for (IJobKey job : pendingTasks.keySet()) { - AttributeAggregate jobState = AttributeAggregate.getJobActiveState(storeProvider, job); + // Group the offers by slave id so they can be paired with active tasks from the same slave. + Map<String, HostOffer> slavesToOffers = + Maps.uniqueIndex(offerManager.getOffers(), OFFER_TO_SLAVE_ID); - for (IAssignedTask pendingTask : pendingTasks.get(job)) { - ITaskConfig task = pendingTask.getTask(); - metrics.recordPreemptionAttemptFor(task); + Set<String> allSlaves = Sets.newHashSet(Iterables.concat( + slavesToOffers.keySet(), + slavesToActiveTasks.keySet())); - Optional<PreemptionSlot> slot = preemptionSlotFinder.findPreemptionSlotFor( - pendingTask, - jobState, - storeProvider); + // The algorithm below attempts to find a reservation for every task group by matching + // it against all available slaves until a preemption slot is found. Groups are evaluated + // in a round-robin fashion to ensure fairness (e.g.: G1, G2, G3, G1, G2). + // A slave is removed from further matching once a reservation is made. Similarly, all + // identical task group instances are removed from further iteration if none of the + // available slaves could yield a preemption proposal. A consuming iterator is used for + // task groups to ensure iteration order is preserved after a task group is removed. 
+ LoadingCache<IJobKey, AttributeAggregate> jobStates = attributeCache(store); + List<TaskGroupKey> pendingGroups = fetchIdlePendingGroups(store); + Iterator<TaskGroupKey> groups = Iterators.consumingIterator(pendingGroups.iterator()); + while (!pendingGroups.isEmpty()) { + boolean matched = false; + TaskGroupKey group = groups.next(); + ITaskConfig task = group.getTask(); - metrics.recordSlotSearchResult(slot, task); + metrics.recordPreemptionAttemptFor(task); + Iterator<String> slaveIterator = allSlaves.iterator(); + while (slaveIterator.hasNext()) { + String slaveId = slaveIterator.next(); + Optional<ImmutableSet<PreemptionVictim>> candidates = + preemptionVictimFilter.filterPreemptionVictims( + task, + slavesToActiveTasks.get(slaveId), + jobStates.getUnchecked(task.getJob()), + Optional.fromNullable(slavesToOffers.get(slaveId)), + store); - if (slot.isPresent()) { - slotCache.put(slot.get(), TaskGroupKey.from(task)); + metrics.recordSlotSearchResult(candidates, task); + if (candidates.isPresent()) { + // Slot found -> remove slave to avoid multiple task reservations. + slaveIterator.remove(); + slotCache.put(new PreemptionProposal(candidates.get(), slaveId), group); + matched = true; + break; } } + if (!matched) { + // No slot found for the group -> remove group and reset group iterator. 
+ pendingGroups.removeAll(ImmutableSet.of(group)); + groups = Iterators.consumingIterator(pendingGroups.iterator()); + } } return null; } }); } - private Multimap<IJobKey, IAssignedTask> fetchIdlePendingTasks(StoreProvider store) { - return Multimaps.index( - FluentIterable - .from(store.getTaskStore().fetchTasks(Query.statusScoped(PENDING))) + private List<TaskGroupKey> fetchIdlePendingGroups(StoreProvider store) { + Multiset<TaskGroupKey> taskGroupCounts = HashMultiset.create( + FluentIterable.from(store.getTaskStore().fetchTasks(Query.statusScoped(PENDING))) .filter(Predicates.and(isIdleTask, Predicates.not(hasCachedSlot))) - .transform(SCHEDULED_TO_ASSIGNED), - Tasks.ASSIGNED_TO_JOB_KEY); + .transform(Functions.compose(ASSIGNED_TO_GROUP_KEY, SCHEDULED_TO_ASSIGNED))); + + return getPreemptionSequence(taskGroupCounts); + } + + /** + * Creates execution sequence for pending task groups by interleaving their unique occurrences. + * For example: {G1, G1, G1, G2, G2} will be converted into {G1, G2, G1, G2, G1}. + * + * @param groups Multiset of task groups. + * @return A task group execution sequence. 
+ */ + private static List<TaskGroupKey> getPreemptionSequence(Multiset<TaskGroupKey> groups) { + Multiset<TaskGroupKey> mutableGroups = HashMultiset.create(groups); + List<TaskGroupKey> instructions = Lists.newLinkedList(); + Set<TaskGroupKey> keys = ImmutableSet.copyOf(groups.elementSet()); + while (!mutableGroups.isEmpty()) { + for (TaskGroupKey key : keys) { + if (mutableGroups.contains(key)) { + instructions.add(key); + mutableGroups.remove(key); + } + } + } + + return instructions; + } + + private LoadingCache<IJobKey, AttributeAggregate> attributeCache(final StoreProvider store) { + return CacheBuilder.newBuilder().build(CacheLoader.from( + new Function<IJobKey, AttributeAggregate>() { + @Override + public AttributeAggregate apply(IJobKey job) { + return AttributeAggregate.getJobActiveState(store, job); + } + })); } + private static final Function<IAssignedTask, TaskGroupKey> ASSIGNED_TO_GROUP_KEY = + new Function<IAssignedTask, TaskGroupKey>() { + @Override + public TaskGroupKey apply(IAssignedTask task) { + return TaskGroupKey.from(task.getTask()); + } + }; + private final Predicate<IScheduledTask> hasCachedSlot = new Predicate<IScheduledTask>() { @Override public boolean apply(IScheduledTask task) { @@ -145,4 +247,12 @@ class PendingTaskProcessor implements Runnable { >= preemptionCandidacyDelay.as(Time.MILLISECONDS); } }; + + private static final Function<HostOffer, String> OFFER_TO_SLAVE_ID = + new Function<HostOffer, String>() { + @Override + public String apply(HostOffer offer) { + return offer.getOffer().getSlaveId().getValue(); + } + }; } http://git-wip-us.apache.org/repos/asf/aurora/blob/8fd21a1a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionProposal.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionProposal.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionProposal.java new file mode 100644 index 
0000000..7a03168 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionProposal.java @@ -0,0 +1,66 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.async.preemptor; + +import java.util.Objects; +import java.util.Set; + +import com.google.common.collect.ImmutableSet; + +import static java.util.Objects.requireNonNull; + +/** + * A set of tasks proposed for preemption on a given slave. + */ +class PreemptionProposal { + private final Set<PreemptionVictim> victims; + private final String slaveId; + + PreemptionProposal(ImmutableSet<PreemptionVictim> victims, String slaveId) { + this.victims = requireNonNull(victims); + this.slaveId = requireNonNull(slaveId); + } + + Set<PreemptionVictim> getVictims() { + return victims; + } + + String getSlaveId() { + return slaveId; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof PreemptionProposal)) { + return false; + } + + PreemptionProposal other = (PreemptionProposal) o; + return Objects.equals(getVictims(), other.getVictims()) + && Objects.equals(getSlaveId(), other.getSlaveId()); + } + + @Override + public int hashCode() { + return Objects.hash(victims, slaveId); + } + + @Override + public String toString() { + return com.google.common.base.Objects.toStringHelper(this) + .add("victims", getVictims()) + .add("slaveId", getSlaveId()) + .toString(); + } +} 
http://git-wip-us.apache.org/repos/asf/aurora/blob/8fd21a1a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotFinder.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotFinder.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotFinder.java deleted file mode 100644 index f16f964..0000000 --- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionSlotFinder.java +++ /dev/null @@ -1,351 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.aurora.scheduler.async.preemptor; - -import java.util.Objects; -import java.util.Set; - -import javax.inject.Inject; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.base.Optional; -import com.google.common.base.Predicate; -import com.google.common.collect.FluentIterable; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; -import com.google.common.collect.Multimap; -import com.google.common.collect.Multimaps; -import com.google.common.collect.Ordering; -import com.google.common.collect.Sets; - -import org.apache.aurora.scheduler.HostOffer; -import org.apache.aurora.scheduler.ResourceSlot; -import org.apache.aurora.scheduler.async.OfferManager; -import org.apache.aurora.scheduler.filter.AttributeAggregate; -import org.apache.aurora.scheduler.filter.SchedulingFilter; -import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; -import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource; -import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto; -import org.apache.aurora.scheduler.mesos.ExecutorSettings; -import org.apache.aurora.scheduler.storage.Storage.StoreProvider; -import org.apache.aurora.scheduler.storage.entities.IAssignedTask; -import org.apache.aurora.scheduler.storage.entities.IHostAttributes; -import org.apache.aurora.scheduler.storage.entities.ITaskConfig; -import org.apache.mesos.Protos.SlaveID; - -import static java.util.Objects.requireNonNull; - -/** - * Tries to find a slave with a combination of active tasks (victims) and available offer - * (slack) resources that can accommodate a given task (candidate), provided victims are preempted. - * <p> - * A task may preempt another task if the following conditions hold true: - * <ol> - * <li>The resources reserved for a victim (or a set of victims) are sufficient to satisfy - * the candidate. 
- * </li> - * <li>Both candidate and victim are owned by the same user and the - * {@link ITaskConfig#getPriority} of a victim is lower OR a victim is non-production and the - * candidate is production. - * </li> - * </ol> - */ -public interface PreemptionSlotFinder { - - class PreemptionSlot { - private final Set<PreemptionVictim> victims; - private final String slaveId; - - @VisibleForTesting - PreemptionSlot(ImmutableSet<PreemptionVictim> victims, String slaveId) { - this.victims = requireNonNull(victims); - this.slaveId = requireNonNull(slaveId); - } - - Set<PreemptionVictim> getVictims() { - return victims; - } - - String getSlaveId() { - return slaveId; - } - - @Override - public boolean equals(Object o) { - if (!(o instanceof PreemptionSlot)) { - return false; - } - - PreemptionSlot other = (PreemptionSlot) o; - return Objects.equals(getVictims(), other.getVictims()) - && Objects.equals(getSlaveId(), other.getSlaveId()); - } - - @Override - public int hashCode() { - return Objects.hash(victims, slaveId); - } - - @Override - public String toString() { - return com.google.common.base.Objects.toStringHelper(this) - .add("victims", getVictims()) - .add("slaveId", getSlaveId()) - .toString(); - } - } - - /** - * Searches for a {@link PreemptionSlot} for a given {@code pendingTask}. - * - * @param pendingTask Task to search preemption slot for. - * @param attributeAggregate An {@link AttributeAggregate} instance for the task's job. - * @param storeProvider A store provider to access task data. - * @return An instance of {@link PreemptionSlot} if preemption is possible. - */ - Optional<PreemptionSlot> findPreemptionSlotFor( - IAssignedTask pendingTask, - AttributeAggregate attributeAggregate, - StoreProvider storeProvider); - - /** - * Validates that a previously-found {@code preemptionSlot} can still accommodate a given task. - * - * @param pendingTask Task to validate preemption slot for. 
- * @param attributeAggregate An {@link AttributeAggregate} instance for the task's job. - * @param preemptionSlot A previously found preemption slot to validate. - * @param storeProvider A store provider to access task data. - * @return A finalized set of {@code PreemptionVictim} instances to preempt for a given task. - */ - Optional<ImmutableSet<PreemptionVictim>> validatePreemptionSlotFor( - IAssignedTask pendingTask, - AttributeAggregate attributeAggregate, - PreemptionSlot preemptionSlot, - StoreProvider storeProvider); - - class PreemptionSlotFinderImpl implements PreemptionSlotFinder { - private final OfferManager offerManager; - private final ClusterState clusterState; - private final SchedulingFilter schedulingFilter; - private final ExecutorSettings executorSettings; - private final PreemptorMetrics metrics; - - @Inject - PreemptionSlotFinderImpl( - OfferManager offerManager, - ClusterState clusterState, - SchedulingFilter schedulingFilter, - ExecutorSettings executorSettings, - PreemptorMetrics metrics) { - - this.offerManager = requireNonNull(offerManager); - this.clusterState = requireNonNull(clusterState); - this.schedulingFilter = requireNonNull(schedulingFilter); - this.executorSettings = requireNonNull(executorSettings); - this.metrics = requireNonNull(metrics); - } - - private static final Function<HostOffer, ResourceSlot> OFFER_TO_RESOURCE_SLOT = - new Function<HostOffer, ResourceSlot>() { - @Override - public ResourceSlot apply(HostOffer offer) { - return ResourceSlot.from(offer.getOffer()); - } - }; - - private static final Function<HostOffer, String> OFFER_TO_HOST = - new Function<HostOffer, String>() { - @Override - public String apply(HostOffer offer) { - return offer.getOffer().getHostname(); - } - }; - - private static final Function<PreemptionVictim, String> VICTIM_TO_HOST = - new Function<PreemptionVictim, String>() { - @Override - public String apply(PreemptionVictim victim) { - return victim.getSlaveHost(); - } - }; - - private final 
Function<PreemptionVictim, ResourceSlot> victimToResources = - new Function<PreemptionVictim, ResourceSlot>() { - @Override - public ResourceSlot apply(PreemptionVictim victim) { - return ResourceSlot.from(victim, executorSettings); - } - }; - - // TODO(zmanji) Consider using Dominant Resource Fairness for ordering instead of the vector - // ordering - private final Ordering<PreemptionVictim> resourceOrder = - ResourceSlot.ORDER.onResultOf(victimToResources).reverse(); - - private static final Function<HostOffer, String> OFFER_TO_SLAVE_ID = - new Function<HostOffer, String>() { - @Override - public String apply(HostOffer offer) { - return offer.getOffer().getSlaveId().getValue(); - } - }; - - // TODO(maxim): This should take pre-computed mappings (e.g. slaveToOffers) to avoid - // unnecessary repeated work. - @Override - public Optional<PreemptionSlot> findPreemptionSlotFor( - final IAssignedTask pendingTask, - AttributeAggregate attributeAggregate, - StoreProvider storeProvider) { - - Multimap<String, PreemptionVictim> slavesToActiveTasks = - clusterState.getSlavesToActiveTasks(); - - if (slavesToActiveTasks.isEmpty()) { - return Optional.absent(); - } - - // Group the offers by slave id so they can be paired with active tasks from the same slave. 
- Multimap<String, HostOffer> slavesToOffers = - Multimaps.index(offerManager.getOffers(), OFFER_TO_SLAVE_ID); - - Set<String> allSlaves = ImmutableSet.<String>builder() - .addAll(slavesToOffers.keySet()) - .addAll(slavesToActiveTasks.keySet()) - .build(); - - for (String slaveId : allSlaves) { - final Optional<ImmutableSet<PreemptionVictim>> preemptionVictims = getTasksToPreempt( - slavesToActiveTasks.get(slaveId), - slavesToOffers.get(slaveId), - pendingTask, - attributeAggregate, - storeProvider); - - if (preemptionVictims.isPresent()) { - return Optional.of(new PreemptionSlot(preemptionVictims.get(), slaveId)); - } - } - - return Optional.absent(); - } - - @Override - public Optional<ImmutableSet<PreemptionVictim>> validatePreemptionSlotFor( - IAssignedTask pendingTask, - AttributeAggregate attributeAggregate, - PreemptionSlot preemptionSlot, - StoreProvider storeProvider) { - - Optional<HostOffer> offer = - offerManager.getOffer(SlaveID.newBuilder().setValue(preemptionSlot.getSlaveId()).build()); - - return getTasksToPreempt( - preemptionSlot.getVictims(), - offer.asSet(), - pendingTask, - attributeAggregate, - storeProvider); - } - - /** - * Optional.absent indicates that this slave does not have enough resources to satisfy the task. - * A set with elements indicates those tasks and the offers are enough. - */ - private Optional<ImmutableSet<PreemptionVictim>> getTasksToPreempt( - Iterable<PreemptionVictim> possibleVictims, - Iterable<HostOffer> offers, - IAssignedTask pendingTask, - AttributeAggregate jobState, - StoreProvider storeProvider) { - - // This enforces the precondition that all of the resources are from the same host. We need to - // get the host for the schedulingFilter. 
- Set<String> hosts = ImmutableSet.<String>builder() - .addAll(Iterables.transform(possibleVictims, VICTIM_TO_HOST)) - .addAll(Iterables.transform(offers, OFFER_TO_HOST)).build(); - - ResourceSlot slackResources = - ResourceSlot.sum(Iterables.transform(offers, OFFER_TO_RESOURCE_SLOT)); - - FluentIterable<PreemptionVictim> preemptableTasks = FluentIterable.from(possibleVictims) - .filter(preemptionFilter(pendingTask.getTask())); - - if (preemptableTasks.isEmpty()) { - return Optional.absent(); - } - - Set<PreemptionVictim> toPreemptTasks = Sets.newHashSet(); - - Iterable<PreemptionVictim> sortedVictims = - resourceOrder.immutableSortedCopy(preemptableTasks); - - Optional<IHostAttributes> attributes = - storeProvider.getAttributeStore().getHostAttributes(Iterables.getOnlyElement(hosts)); - - if (!attributes.isPresent()) { - metrics.recordMissingAttributes(); - return Optional.absent(); - } - - for (PreemptionVictim victim : sortedVictims) { - toPreemptTasks.add(victim); - - ResourceSlot totalResource = ResourceSlot.sum( - ResourceSlot.sum(Iterables.transform(toPreemptTasks, victimToResources)), - slackResources); - - Set<Veto> vetoes = schedulingFilter.filter( - new UnusedResource(totalResource, attributes.get()), - new ResourceRequest(pendingTask.getTask(), jobState)); - - if (vetoes.isEmpty()) { - return Optional.of(ImmutableSet.copyOf(toPreemptTasks)); - } - } - return Optional.absent(); - } - - /** - * Creates a filter that will find tasks that the provided {@code pendingTask} may preempt. - * - * @param pendingTask A task that is not scheduled to possibly preempt other tasks for. - * @return A filter that will compare the priorities and resources required by other tasks - * with {@code preemptableTask}. 
- */ - private static Predicate<PreemptionVictim> preemptionFilter(final ITaskConfig pendingTask) { - return new Predicate<PreemptionVictim>() { - @Override - public boolean apply(PreemptionVictim possibleVictim) { - boolean pendingIsProduction = pendingTask.isProduction(); - boolean victimIsProduction = possibleVictim.isProduction(); - - if (pendingIsProduction && !victimIsProduction) { - return true; - } else if (pendingIsProduction == victimIsProduction) { - // If production flags are equal, preemption is based on priority within the same role. - if (pendingTask.getJob().getRole().equals(possibleVictim.getRole())) { - return pendingTask.getPriority() > possibleVictim.getPriority(); - } else { - return false; - } - } else { - return false; - } - } - }; - } - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/8fd21a1a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictimFilter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictimFilter.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictimFilter.java new file mode 100644 index 0000000..75e2370 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictimFilter.java @@ -0,0 +1,214 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.async.preemptor; + +import java.util.Set; + +import javax.inject.Inject; + +import com.google.common.base.Function; +import com.google.common.base.Optional; +import com.google.common.base.Predicate; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Ordering; +import com.google.common.collect.Sets; + +import org.apache.aurora.scheduler.HostOffer; +import org.apache.aurora.scheduler.ResourceSlot; +import org.apache.aurora.scheduler.filter.AttributeAggregate; +import org.apache.aurora.scheduler.filter.SchedulingFilter; +import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; +import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource; +import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto; +import org.apache.aurora.scheduler.mesos.ExecutorSettings; +import org.apache.aurora.scheduler.storage.Storage.StoreProvider; +import org.apache.aurora.scheduler.storage.entities.IHostAttributes; +import org.apache.aurora.scheduler.storage.entities.ITaskConfig; + +import static java.util.Objects.requireNonNull; + +/** + * Filters active tasks (victims) and available offer (slack) resources that can accommodate a + * given task (candidate), provided victims are preempted. + * <p> + * A task may preempt another task if the following conditions hold true: + * <ol> + * <li>The resources reserved for a victim (or a set of victims) are sufficient to satisfy + * the candidate. + * </li> + * <li>Both candidate and victim belong to the same role and production tier, and the + * {@link ITaskConfig#getPriority} of a victim is lower OR a victim is non-production and the + * candidate is production. + * </li> + * </ol> + */ +public interface PreemptionVictimFilter { + /** + * Returns a set of {@link PreemptionVictim} that can accommodate a given task if preempted.
+ * + * @param pendingTask Task to search preemption slot for. + * @param victims Active tasks on a slave. + * @param attributeAggregate An {@link AttributeAggregate} instance for the task's job. + * @param offer A resource offer for a slave. + * @param storeProvider A store provider to access task data. + * @return A set of {@code PreemptionVictim} instances to preempt for a given task. + */ + Optional<ImmutableSet<PreemptionVictim>> filterPreemptionVictims( + ITaskConfig pendingTask, + Iterable<PreemptionVictim> victims, + AttributeAggregate attributeAggregate, + Optional<HostOffer> offer, + StoreProvider storeProvider); + + class PreemptionVictimFilterImpl implements PreemptionVictimFilter { + private final SchedulingFilter schedulingFilter; + private final ExecutorSettings executorSettings; + private final PreemptorMetrics metrics; + + @Inject + PreemptionVictimFilterImpl( + SchedulingFilter schedulingFilter, + ExecutorSettings executorSettings, + PreemptorMetrics metrics) { + + this.schedulingFilter = requireNonNull(schedulingFilter); + this.executorSettings = requireNonNull(executorSettings); + this.metrics = requireNonNull(metrics); + } + + private static final Function<HostOffer, ResourceSlot> OFFER_TO_RESOURCE_SLOT = + new Function<HostOffer, ResourceSlot>() { + @Override + public ResourceSlot apply(HostOffer offer) { + return ResourceSlot.from(offer.getOffer()); + } + }; + + private static final Function<HostOffer, String> OFFER_TO_HOST = + new Function<HostOffer, String>() { + @Override + public String apply(HostOffer offer) { + return offer.getOffer().getHostname(); + } + }; + + private static final Function<PreemptionVictim, String> VICTIM_TO_HOST = + new Function<PreemptionVictim, String>() { + @Override + public String apply(PreemptionVictim victim) { + return victim.getSlaveHost(); + } + }; + + private final Function<PreemptionVictim, ResourceSlot> victimToResources = + new Function<PreemptionVictim, ResourceSlot>() { + @Override + public 
ResourceSlot apply(PreemptionVictim victim) { + return ResourceSlot.from(victim, executorSettings); + } + }; + + // TODO(zmanji) Consider using Dominant Resource Fairness for ordering instead of the vector + // ordering + private final Ordering<PreemptionVictim> resourceOrder = + ResourceSlot.ORDER.onResultOf(victimToResources).reverse(); + + @Override + public Optional<ImmutableSet<PreemptionVictim>> filterPreemptionVictims( + ITaskConfig pendingTask, + Iterable<PreemptionVictim> possibleVictims, + AttributeAggregate jobState, + Optional<HostOffer> offer, + StoreProvider storeProvider) { + + // This enforces the precondition that all of the resources are from the same host. We need to + // get the host for the schedulingFilter. + Set<String> hosts = ImmutableSet.<String>builder() + .addAll(Iterables.transform(possibleVictims, VICTIM_TO_HOST)) + .addAll(Iterables.transform(offer.asSet(), OFFER_TO_HOST)).build(); + + ResourceSlot slackResources = + ResourceSlot.sum(Iterables.transform(offer.asSet(), OFFER_TO_RESOURCE_SLOT)); + + FluentIterable<PreemptionVictim> preemptableTasks = FluentIterable.from(possibleVictims) + .filter(preemptionFilter(pendingTask)); + + if (preemptableTasks.isEmpty()) { + return Optional.absent(); + } + + Set<PreemptionVictim> toPreemptTasks = Sets.newHashSet(); + + Iterable<PreemptionVictim> sortedVictims = + resourceOrder.immutableSortedCopy(preemptableTasks); + + Optional<IHostAttributes> attributes = + storeProvider.getAttributeStore().getHostAttributes(Iterables.getOnlyElement(hosts)); + + if (!attributes.isPresent()) { + metrics.recordMissingAttributes(); + return Optional.absent(); + } + + for (PreemptionVictim victim : sortedVictims) { + toPreemptTasks.add(victim); + + ResourceSlot totalResource = ResourceSlot.sum( + ResourceSlot.sum(Iterables.transform(toPreemptTasks, victimToResources)), + slackResources); + + Set<Veto> vetoes = schedulingFilter.filter( + new UnusedResource(totalResource, attributes.get()), + new 
ResourceRequest(pendingTask, jobState)); + + if (vetoes.isEmpty()) { + return Optional.of(ImmutableSet.copyOf(toPreemptTasks)); + } + } + return Optional.absent(); + } + + /** + * Creates a filter that will find tasks that the provided {@code pendingTask} may preempt. + * + * @param pendingTask A pending (not yet scheduled) task that may preempt other tasks. + * @return A filter that compares the production flag, role and priority of other tasks + * with those of {@code pendingTask}. + */ + private static Predicate<PreemptionVictim> preemptionFilter(final ITaskConfig pendingTask) { + return new Predicate<PreemptionVictim>() { + @Override + public boolean apply(PreemptionVictim possibleVictim) { + boolean pendingIsProduction = pendingTask.isProduction(); + boolean victimIsProduction = possibleVictim.isProduction(); + + if (pendingIsProduction && !victimIsProduction) { + return true; + } else if (pendingIsProduction == victimIsProduction) { + // If production flags are equal, preemption is based on priority within the same role.
+ if (pendingTask.getJob().getRole().equals(possibleVictim.getRole())) { + return pendingTask.getPriority() > possibleVictim.getPriority(); + } else { + return false; + } + } else { + return false; + } + } + }; + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/8fd21a1a/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java index 5200811..41591b8 100644 --- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java +++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java @@ -21,12 +21,13 @@ import com.google.common.base.Optional; import com.google.common.collect.ImmutableSet; import org.apache.aurora.gen.ScheduleStatus; -import org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.PreemptionSlot; +import org.apache.aurora.scheduler.async.OfferManager; import org.apache.aurora.scheduler.base.TaskGroupKey; import org.apache.aurora.scheduler.filter.AttributeAggregate; import org.apache.aurora.scheduler.state.StateManager; import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; import org.apache.aurora.scheduler.storage.entities.IAssignedTask; +import org.apache.mesos.Protos.SlaveID; import static java.util.Objects.requireNonNull; @@ -52,19 +53,22 @@ public interface Preemptor { class PreemptorImpl implements Preemptor { private final StateManager stateManager; - private final PreemptionSlotFinder preemptionSlotFinder; + private final OfferManager offerManager; + private final PreemptionVictimFilter preemptionVictimFilter; private final PreemptorMetrics metrics; - private final BiCache<PreemptionSlot, TaskGroupKey> slotCache; + private final BiCache<PreemptionProposal, TaskGroupKey> slotCache; @Inject PreemptorImpl( StateManager stateManager, - 
PreemptionSlotFinder preemptionSlotFinder, + OfferManager offerManager, + PreemptionVictimFilter preemptionVictimFilter, PreemptorMetrics metrics, - BiCache<PreemptionSlot, TaskGroupKey> slotCache) { + BiCache<PreemptionProposal, TaskGroupKey> slotCache) { this.stateManager = requireNonNull(stateManager); - this.preemptionSlotFinder = requireNonNull(preemptionSlotFinder); + this.offerManager = requireNonNull(offerManager); + this.preemptionVictimFilter = requireNonNull(preemptionVictimFilter); this.metrics = requireNonNull(metrics); this.slotCache = requireNonNull(slotCache); } @@ -76,17 +80,23 @@ public interface Preemptor { MutableStoreProvider store) { TaskGroupKey groupKey = TaskGroupKey.from(pendingTask.getTask()); - Set<PreemptionSlot> preemptionSlots = slotCache.getByValue(groupKey); + Set<PreemptionProposal> preemptionProposals = slotCache.getByValue(groupKey); // A preemption slot is available -> attempt to preempt tasks. - if (!preemptionSlots.isEmpty()) { + if (!preemptionProposals.isEmpty()) { // Get the next available preemption slot. - PreemptionSlot slot = preemptionSlots.iterator().next(); + PreemptionProposal slot = preemptionProposals.iterator().next(); slotCache.remove(slot, groupKey); - // Validate a PreemptionSlot is still valid for the given task. + // Validate PreemptionProposal is still valid for the given task. 
+ SlaveID slaveId = SlaveID.newBuilder().setValue(slot.getSlaveId()).build(); Optional<ImmutableSet<PreemptionVictim>> validatedVictims = - preemptionSlotFinder.validatePreemptionSlotFor(pendingTask, jobState, slot, store); + preemptionVictimFilter.filterPreemptionVictims( + pendingTask.getTask(), + slot.getVictims(), + jobState, + offerManager.getOffer(slaveId), + store); metrics.recordSlotValidationResult(validatedVictims); if (!validatedVictims.isPresent()) { http://git-wip-us.apache.org/repos/asf/aurora/blob/8fd21a1a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorMetrics.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorMetrics.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorMetrics.java index dc7eb44..22a1533 100644 --- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorMetrics.java +++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorMetrics.java @@ -35,7 +35,7 @@ public class PreemptorMetrics { static final String MISSING_ATTRIBUTES_NAME = "preemptor_missing_attributes"; @VisibleForTesting - static final String PENDING_PROCESSOR_RUN_NAME = "preemptor_task_processor_runs"; + static final String TASK_PROCESSOR_RUN_NAME = "preemptor_task_processor_runs"; private volatile boolean exported = false; private final CachedCounters counters; @@ -72,7 +72,7 @@ public class PreemptorMetrics { slotValidationStatName(true), slotValidationStatName(false), MISSING_ATTRIBUTES_NAME, - PENDING_PROCESSOR_RUN_NAME); + TASK_PROCESSOR_RUN_NAME); for (String stat : allStats) { counters.get(stat); } @@ -126,6 +126,6 @@ public class PreemptorMetrics { } void recordTaskProcessorRun() { - increment(PENDING_PROCESSOR_RUN_NAME); + increment(TASK_PROCESSOR_RUN_NAME); } } 
http://git-wip-us.apache.org/repos/asf/aurora/blob/8fd21a1a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java index 7cea881..156bac2 100644 --- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java +++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java @@ -31,7 +31,6 @@ import com.twitter.common.quantity.Time; import org.apache.aurora.scheduler.SchedulerServicesModule; import org.apache.aurora.scheduler.async.preemptor.BiCache.BiCacheSettings; -import org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.PreemptionSlot; import org.apache.aurora.scheduler.base.TaskGroupKey; import org.apache.aurora.scheduler.events.PubsubEventModule; import org.apache.aurora.scheduler.filter.AttributeAggregate; @@ -90,9 +89,9 @@ public class PreemptorModule extends AbstractModule { if (enablePreemptor) { LOG.info("Preemptor Enabled."); bind(PreemptorMetrics.class).in(Singleton.class); - bind(PreemptionSlotFinder.class) - .to(PreemptionSlotFinder.PreemptionSlotFinderImpl.class); - bind(PreemptionSlotFinder.PreemptionSlotFinderImpl.class).in(Singleton.class); + bind(PreemptionVictimFilter.class) + .to(PreemptionVictimFilter.PreemptionVictimFilterImpl.class); + bind(PreemptionVictimFilter.PreemptionVictimFilterImpl.class).in(Singleton.class); bind(Preemptor.class).to(Preemptor.PreemptorImpl.class); bind(Preemptor.PreemptorImpl.class).in(Singleton.class); bind(new TypeLiteral<Amount<Long, Time>>() { }) @@ -100,7 +99,8 @@ public class PreemptorModule extends AbstractModule { .toInstance(preemptionDelay); bind(BiCacheSettings.class).toInstance( new BiCacheSettings(PREEMPTION_SLOT_HOLD_TIME.get(), "preemption_slot_cache_size")); - bind(new 
TypeLiteral<BiCache<PreemptionSlot, TaskGroupKey>>() { }).in(Singleton.class); + bind(new TypeLiteral<BiCache<PreemptionProposal, TaskGroupKey>>() { }) + .in(Singleton.class); bind(PendingTaskProcessor.class).in(Singleton.class); bind(ClusterState.class).to(ClusterStateImpl.class); bind(ClusterStateImpl.class).in(Singleton.class); http://git-wip-us.apache.org/repos/asf/aurora/blob/8fd21a1a/src/main/java/org/apache/aurora/scheduler/base/TaskGroupKey.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/base/TaskGroupKey.java b/src/main/java/org/apache/aurora/scheduler/base/TaskGroupKey.java index 6af3949..47e4d48 100644 --- a/src/main/java/org/apache/aurora/scheduler/base/TaskGroupKey.java +++ b/src/main/java/org/apache/aurora/scheduler/base/TaskGroupKey.java @@ -40,6 +40,15 @@ public final class TaskGroupKey { return new TaskGroupKey(task); } + /** + * Gets the {@link ITaskConfig} this key was created from. + * + * @return The canonical task config held by this key.
+ */ + public ITaskConfig getTask() { + return canonicalTask; + } + @Override public int hashCode() { return Objects.hash(canonicalTask); http://git-wip-us.apache.org/repos/asf/aurora/blob/8fd21a1a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java index 7bb1e7a..975920a 100644 --- a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java +++ b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java @@ -312,6 +312,7 @@ public class SchedulerIT extends BaseZooKeeperTest { .setStatus(status) .setTaskEvents(ImmutableList.of(new TaskEvent(100, status))) .setAssignedTask(new AssignedTask() + .setSlaveId("slaveId") .setTaskId(id) .setTask(new TaskConfig() .setJob(new JobKey("role-" + id, "test", "job-" + id)) http://git-wip-us.apache.org/repos/asf/aurora/blob/8fd21a1a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessorTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessorTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessorTest.java index 8a9a3b7..218ae0d 100644 --- a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessorTest.java +++ b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessorTest.java @@ -14,152 +14,283 @@ package org.apache.aurora.scheduler.async.preemptor; import java.util.Arrays; -import java.util.Set; +import javax.annotation.Nullable; + +import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multimaps; + import 
com.twitter.common.quantity.Amount; import com.twitter.common.quantity.Time; import com.twitter.common.testing.easymock.EasyMockTest; import com.twitter.common.util.testing.FakeClock; import org.apache.aurora.gen.AssignedTask; +import org.apache.aurora.gen.HostAttributes; import org.apache.aurora.gen.JobKey; +import org.apache.aurora.gen.MaintenanceMode; import org.apache.aurora.gen.ScheduledTask; import org.apache.aurora.gen.TaskConfig; import org.apache.aurora.gen.TaskEvent; -import org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.PreemptionSlot; +import org.apache.aurora.scheduler.HostOffer; +import org.apache.aurora.scheduler.async.OfferManager; import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.base.TaskGroupKey; -import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.filter.AttributeAggregate; import org.apache.aurora.scheduler.stats.CachedCounters; import org.apache.aurora.scheduler.storage.entities.IAssignedTask; -import org.apache.aurora.scheduler.storage.entities.IJobKey; +import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; import org.apache.aurora.scheduler.testing.FakeStatsProvider; -import org.easymock.IExpectationSetters; +import org.apache.mesos.Protos; +import org.easymock.EasyMock; import org.junit.Before; import org.junit.Test; import static org.apache.aurora.gen.ScheduleStatus.PENDING; -import static org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.PENDING_PROCESSOR_RUN_NAME; +import static org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.TASK_PROCESSOR_RUN_NAME; import static org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.attemptsStatName; import static 
org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.slotSearchStatName; +import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expect; import static org.junit.Assert.assertEquals; public class PendingTaskProcessorTest extends EasyMockTest { - private static final ScheduledTask TASK_A = makeTask("task_a"); - private static final ScheduledTask TASK_B = makeTask("task_b"); - private static final PreemptionSlot SLOT_A = createPreemptionSlot(TASK_A); - private static final PreemptionSlot SLOT_B = createPreemptionSlot(TASK_B); - private static final TaskGroupKey GROUP_A = - TaskGroupKey.from(ITaskConfig.build(TASK_A.getAssignedTask().getTask())); - private static final TaskGroupKey GROUP_B = - TaskGroupKey.from(ITaskConfig.build(TASK_B.getAssignedTask().getTask())); - private static final String SLAVE_ID = "slave_id"; - private static final IJobKey JOB_KEY = IJobKey.build(TASK_A.getAssignedTask().getTask().getJob()); - + private static final String CACHE_STAT = "cache_size"; + private static final String SLAVE_ID_1 = "slave_id_1"; + private static final String SLAVE_ID_2 = "slave_id_2"; + private static final JobKey JOB_A = new JobKey("role_a", "env", "job_a"); + private static final JobKey JOB_B = new JobKey("role_b", "env", "job_b"); + private static final ScheduledTask TASK_A = makeTask(JOB_A, SLAVE_ID_1, "id1"); + private static final ScheduledTask TASK_B = makeTask(JOB_B, SLAVE_ID_2, "id2"); + private static final PreemptionProposal SLOT_A = createPreemptionProposal(TASK_A, SLAVE_ID_1); private static final Amount<Long, Time> PREEMPTION_DELAY = Amount.of(30L, Time.SECONDS); - - private static final Set<PreemptionSlot> NO_SLOTS = ImmutableSet.of(); + private static final Amount<Long, Time> EXPIRATION = Amount.of(10L, Time.MINUTES); private StorageTestUtil storageUtil; + private OfferManager offerManager; private FakeStatsProvider statsProvider; - private PreemptionSlotFinder 
preemptionSlotFinder; + private PreemptionVictimFilter preemptionVictimFilter; private PendingTaskProcessor slotFinder; - private BiCache<PreemptionSlot, TaskGroupKey> slotCache; + private BiCache<PreemptionProposal, TaskGroupKey> slotCache; + private ClusterState clusterState; private FakeClock clock; @Before public void setUp() { storageUtil = new StorageTestUtil(this); storageUtil.expectOperations(); - preemptionSlotFinder = createMock(PreemptionSlotFinder.class); - slotCache = createMock(new Clazz<BiCache<PreemptionSlot, TaskGroupKey>>() { }); + offerManager = createMock(OfferManager.class); + preemptionVictimFilter = createMock(PreemptionVictimFilter.class); statsProvider = new FakeStatsProvider(); + clusterState = createMock(ClusterState.class); clock = new FakeClock(); + slotCache = new BiCache<>( + statsProvider, + new BiCache.BiCacheSettings(EXPIRATION, CACHE_STAT), + clock); slotFinder = new PendingTaskProcessor( storageUtil.storage, - preemptionSlotFinder, + offerManager, + preemptionVictimFilter, new PreemptorMetrics(new CachedCounters(statsProvider)), PREEMPTION_DELAY, slotCache, + clusterState, clock); } @Test public void testSearchSlotSuccessful() throws Exception { - expect(slotCache.getByValue(GROUP_A)).andReturn(NO_SLOTS); - expect(slotCache.getByValue(GROUP_B)).andReturn(NO_SLOTS); expectGetPendingTasks(TASK_A, TASK_B); - expectAttributeAggegateFetchTasks(); - expectSlotSearch(TASK_A, Optional.of(SLOT_A)); - expectSlotSearch(TASK_B, Optional.of(SLOT_B)); - slotCache.put(SLOT_A, GROUP_A); - slotCache.put(SLOT_B, GROUP_B); + expectGetClusterState(TASK_A, TASK_B); + HostOffer offer1 = makeOffer(SLAVE_ID_1); + HostOffer offer2 = makeOffer(SLAVE_ID_2); + expectOffers(offer1, offer2); + expectSlotSearch(TASK_A, offer1, TASK_A); + expectSlotSearch(TASK_B, offer2, TASK_B); control.replay(); clock.advance(PREEMPTION_DELAY); slotFinder.run(); - assertEquals(1L, statsProvider.getLongValue(PENDING_PROCESSOR_RUN_NAME)); + assertEquals(1L, 
statsProvider.getLongValue(TASK_PROCESSOR_RUN_NAME)); assertEquals(2L, statsProvider.getLongValue(attemptsStatName(true))); assertEquals(2L, statsProvider.getLongValue(slotSearchStatName(true, true))); assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(false, true))); + assertEquals(2L, statsProvider.getLongValue(CACHE_STAT)); } @Test public void testSearchSlotFailed() throws Exception { - expect(slotCache.getByValue(GROUP_A)).andReturn(NO_SLOTS); expectGetPendingTasks(TASK_A); - expectAttributeAggegateFetchTasks(); - expectSlotSearch(TASK_A, Optional.<PreemptionSlot>absent()); + expectGetClusterState(TASK_A); + HostOffer offer1 = makeOffer(SLAVE_ID_1); + expectOffers(offer1); + expectSlotSearch(TASK_A, offer1); control.replay(); clock.advance(PREEMPTION_DELAY); slotFinder.run(); - assertEquals(1L, statsProvider.getLongValue(PENDING_PROCESSOR_RUN_NAME)); + assertEquals(1L, statsProvider.getLongValue(TASK_PROCESSOR_RUN_NAME)); assertEquals(1L, statsProvider.getLongValue(attemptsStatName(true))); assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(true, true))); assertEquals(1L, statsProvider.getLongValue(slotSearchStatName(false, true))); } - private void expectSlotSearch(ScheduledTask task, Optional<PreemptionSlot> slot) { - expect(preemptionSlotFinder.findPreemptionSlotFor( - IAssignedTask.build(task.getAssignedTask()), - AttributeAggregate.EMPTY, - storageUtil.storeProvider)).andReturn(slot); + @Test + public void testHasCachedSlots() throws Exception { + slotCache.put(SLOT_A, group(TASK_A)); + expectGetPendingTasks(TASK_A); + expectGetClusterState(TASK_A); + HostOffer offer1 = makeOffer(SLAVE_ID_1); + expectOffers(offer1); + + control.replay(); + + clock.advance(PREEMPTION_DELAY); + + slotFinder.run(); + assertEquals(1L, statsProvider.getLongValue(TASK_PROCESSOR_RUN_NAME)); + assertEquals(0L, statsProvider.getLongValue(attemptsStatName(true))); + assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(true, true))); + 
assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(false, true))); + } + + @Test + public void testMultipleTaskGroups() throws Exception { + ScheduledTask task1 = makeTask(JOB_A, "1"); + ScheduledTask task2 = makeTask(JOB_A, "2"); + ScheduledTask task3 = makeTask(JOB_A, "3"); + ScheduledTask task4 = makeTask(JOB_B, "4"); + ScheduledTask task5 = makeTask(JOB_B, "5"); + + expectGetPendingTasks(task1, task4, task2, task5, task3); + expectGetClusterState(TASK_A, TASK_B); + + HostOffer offer1 = makeOffer(SLAVE_ID_1); + HostOffer offer2 = makeOffer(SLAVE_ID_2); + expectOffers(offer1, offer2); + expectSlotSearch(task1, offer1); + expectSlotSearch(task4, offer1, TASK_B); + expectSlotSearch(task5, offer2, TASK_B); + PreemptionProposal proposal1 = createPreemptionProposal(TASK_B, SLAVE_ID_1); + PreemptionProposal proposal2 = createPreemptionProposal(TASK_B, SLAVE_ID_2); + + control.replay(); + + clock.advance(PREEMPTION_DELAY); + + slotFinder.run(); + assertEquals(slotCache.get(proposal1), Optional.of(group(task4))); + assertEquals(slotCache.get(proposal2), Optional.of(group(task5))); + assertEquals(1L, statsProvider.getLongValue(TASK_PROCESSOR_RUN_NAME)); + assertEquals(3L, statsProvider.getLongValue(attemptsStatName(true))); + assertEquals(2L, statsProvider.getLongValue(slotSearchStatName(true, true))); + assertEquals(1L, statsProvider.getLongValue(slotSearchStatName(false, true))); + assertEquals(2L, statsProvider.getLongValue(CACHE_STAT)); } - private static PreemptionSlot createPreemptionSlot(ScheduledTask task) { + @Test + public void testNoVictims() throws Exception { + expectGetClusterState(); + control.replay(); + + clock.advance(PREEMPTION_DELAY); + + slotFinder.run(); + assertEquals(1L, statsProvider.getLongValue(TASK_PROCESSOR_RUN_NAME)); + assertEquals(0L, statsProvider.getLongValue(attemptsStatName(true))); + assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(true, true))); + assertEquals(0L, 
statsProvider.getLongValue(slotSearchStatName(false, true))); + } + + private static final Function<ScheduledTask, String> GET_SLAVE_ID = + new Function<ScheduledTask, String>() { + @Override + public String apply(ScheduledTask task) { + return task.getAssignedTask().getSlaveId(); + } + }; + + private Multimap<String, PreemptionVictim> getVictims(ScheduledTask... tasks) { + return Multimaps.transformValues( + Multimaps.index(Arrays.asList(tasks), GET_SLAVE_ID), + new Function<ScheduledTask, PreemptionVictim>() { + @Override + public PreemptionVictim apply(ScheduledTask task) { + return PreemptionVictim.fromTask(IAssignedTask.build(task.getAssignedTask())); + } + } + ); + } + + private HostOffer makeOffer(String slaveId) { + Protos.Offer.Builder builder = Protos.Offer.newBuilder(); + builder.getIdBuilder().setValue("id"); + builder.getFrameworkIdBuilder().setValue("framework-id"); + builder.getSlaveIdBuilder().setValue(slaveId); + builder.setHostname(slaveId); + return new HostOffer( + builder.build(), + IHostAttributes.build(new HostAttributes().setMode(MaintenanceMode.NONE))); + } + + private void expectOffers(HostOffer... offers) { + expect(offerManager.getOffers()).andReturn(ImmutableSet.copyOf(offers)); + } + + private void expectGetClusterState(ScheduledTask... returnedTasks) { + expect(clusterState.getSlavesToActiveTasks()).andReturn(getVictims(returnedTasks)); + } + + private void expectSlotSearch(ScheduledTask task, HostOffer offer, ScheduledTask... victims) { + expect(preemptionVictimFilter.filterPreemptionVictims( + eq(ITaskConfig.build(task.getAssignedTask().getTask())), + EasyMock.<Iterable<PreemptionVictim>>anyObject(), + anyObject(AttributeAggregate.class), + eq(Optional.of(offer)), + eq(storageUtil.storeProvider))).andReturn( + victims.length == 0 + ? 
Optional.<ImmutableSet<PreemptionVictim>>absent() + : Optional.of(ImmutableSet.copyOf(getVictims(victims).values()))); + } + + private static PreemptionProposal createPreemptionProposal(ScheduledTask task, String slaveId) { IAssignedTask assigned = IAssignedTask.build(task.getAssignedTask()); - return new PreemptionSlot(ImmutableSet.of(PreemptionVictim.fromTask(assigned)), SLAVE_ID); + return new PreemptionProposal(ImmutableSet.of(PreemptionVictim.fromTask(assigned)), slaveId); } - private static ScheduledTask makeTask(String taskId) { + private static ScheduledTask makeTask(JobKey key, String taskId) { + return makeTask(key, null, taskId); + } + + private static TaskGroupKey group(ScheduledTask task) { + return TaskGroupKey.from(ITaskConfig.build(task.getAssignedTask().getTask())); + } + + private static ScheduledTask makeTask(JobKey key, @Nullable String slaveId, String taskId) { ScheduledTask task = new ScheduledTask() .setAssignedTask(new AssignedTask() + .setSlaveId(slaveId) .setTaskId(taskId) .setTask(new TaskConfig() .setPriority(1) .setProduction(true) - .setJob(new JobKey("role", "env", "name")))); + .setJob(key))); task.addToTaskEvents(new TaskEvent(0, PENDING)); return task; } - private IExpectationSetters<?> expectAttributeAggegateFetchTasks() { - return storageUtil.expectTaskFetch( - Query.jobScoped(JOB_KEY).byStatus(Tasks.SLAVE_ASSIGNED_STATES)); - } - private void expectGetPendingTasks(ScheduledTask... 
returnedTasks) { storageUtil.expectTaskFetch( Query.statusScoped(PENDING), http://git-wip-us.apache.org/repos/asf/aurora/blob/8fd21a1a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictimFilterTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictimFilterTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictimFilterTest.java new file mode 100644 index 0000000..67dfb82 --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictimFilterTest.java @@ -0,0 +1,514 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.async.preemptor; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import com.google.common.base.Function; +import com.google.common.base.Optional; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableSet; + +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Data; +import com.twitter.common.testing.easymock.EasyMockTest; + +import org.apache.aurora.gen.AssignedTask; +import org.apache.aurora.gen.Attribute; +import org.apache.aurora.gen.Constraint; +import org.apache.aurora.gen.HostAttributes; +import org.apache.aurora.gen.JobKey; +import org.apache.aurora.gen.MaintenanceMode; +import org.apache.aurora.gen.ScheduleStatus; +import org.apache.aurora.gen.ScheduledTask; +import org.apache.aurora.gen.TaskConfig; +import org.apache.aurora.gen.TaskEvent; +import org.apache.aurora.scheduler.HostOffer; +import org.apache.aurora.scheduler.configuration.Resources; +import org.apache.aurora.scheduler.filter.SchedulingFilter; +import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto; +import org.apache.aurora.scheduler.filter.SchedulingFilterImpl; +import org.apache.aurora.scheduler.mesos.TaskExecutors; +import org.apache.aurora.scheduler.stats.CachedCounters; +import org.apache.aurora.scheduler.storage.entities.IAssignedTask; +import org.apache.aurora.scheduler.storage.entities.IHostAttributes; +import org.apache.aurora.scheduler.storage.entities.ITaskConfig; +import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; +import org.apache.aurora.scheduler.testing.FakeStatsProvider; +import org.easymock.EasyMock; +import org.easymock.IAnswer; +import org.easymock.IExpectationSetters; +import org.junit.Before; +import org.junit.Test; + +import static org.apache.aurora.gen.MaintenanceMode.NONE; +import static org.apache.aurora.gen.ScheduleStatus.RUNNING; +import static 
org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.MISSING_ATTRIBUTES_NAME; +import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY; +import static org.apache.mesos.Protos.Offer; +import static org.apache.mesos.Protos.Resource; +import static org.easymock.EasyMock.expect; +import static org.junit.Assert.assertEquals; + +public class PreemptionVictimFilterTest extends EasyMockTest { + private static final String USER_A = "user_a"; + private static final String USER_B = "user_b"; + private static final String USER_C = "user_c"; + private static final String JOB_A = "job_a"; + private static final String JOB_B = "job_b"; + private static final String JOB_C = "job_c"; + private static final String TASK_ID_A = "task_a"; + private static final String TASK_ID_B = "task_b"; + private static final String TASK_ID_C = "task_c"; + private static final String TASK_ID_D = "task_d"; + private static final String HOST = "host"; + private static final String RACK = "rack"; + private static final String SLAVE_ID = HOST + "_id"; + private static final String RACK_ATTRIBUTE = "rack"; + private static final String HOST_ATTRIBUTE = "host"; + private static final String OFFER = "offer"; + private static final Optional<HostOffer> NO_OFFER = Optional.absent(); + + private StorageTestUtil storageUtil; + private SchedulingFilter schedulingFilter; + private FakeStatsProvider statsProvider; + private PreemptorMetrics preemptorMetrics; + + @Before + public void setUp() { + storageUtil = new StorageTestUtil(this); + storageUtil.expectOperations(); + statsProvider = new FakeStatsProvider(); + preemptorMetrics = new PreemptorMetrics(new CachedCounters(statsProvider)); + } + + private Optional<ImmutableSet<PreemptionVictim>> runFilter( + ScheduledTask pendingTask, + Optional<HostOffer> offer, + ScheduledTask... 
victims) { + + PreemptionVictimFilter.PreemptionVictimFilterImpl filter = + new PreemptionVictimFilter.PreemptionVictimFilterImpl( + schedulingFilter, + TaskExecutors.NO_OVERHEAD_EXECUTOR, + preemptorMetrics); + + return filter.filterPreemptionVictims( + ITaskConfig.build(pendingTask.getAssignedTask().getTask()), + preemptionVictims(victims), + EMPTY, + offer, + storageUtil.mutableStoreProvider); + } + + @Test + public void testPreempted() throws Exception { + setUpHost(); + + schedulingFilter = createMock(SchedulingFilter.class); + ScheduledTask lowPriority = makeTask(USER_A, JOB_A, TASK_ID_A); + assignToHost(lowPriority); + + ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_B, 100); + + expectFiltering(); + + control.replay(); + assertVictims(runFilter(highPriority, NO_OFFER, lowPriority), lowPriority); + } + + @Test + public void testLowestPriorityPreempted() throws Exception { + setUpHost(); + + schedulingFilter = createMock(SchedulingFilter.class); + ScheduledTask lowPriority = makeTask(USER_A, JOB_A, TASK_ID_A, 10); + assignToHost(lowPriority); + + ScheduledTask lowerPriority = makeTask(USER_A, JOB_A, TASK_ID_B, 1); + assignToHost(lowerPriority); + + ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_C, 100); + + expectFiltering(); + + control.replay(); + assertVictims(runFilter(highPriority, NO_OFFER, lowerPriority), lowerPriority); + } + + @Test + public void testOnePreemptableTask() throws Exception { + setUpHost(); + + schedulingFilter = createMock(SchedulingFilter.class); + ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_A, 100); + assignToHost(highPriority); + + ScheduledTask lowerPriority = makeTask(USER_A, JOB_A, TASK_ID_B, 99); + assignToHost(lowerPriority); + + ScheduledTask lowestPriority = makeTask(USER_A, JOB_A, TASK_ID_C, 1); + assignToHost(lowestPriority); + + ScheduledTask pendingPriority = makeTask(USER_A, JOB_A, TASK_ID_D, 98); + + expectFiltering(); + + control.replay(); + assertVictims( + 
runFilter(pendingPriority, NO_OFFER, highPriority, lowerPriority, lowestPriority), + lowestPriority); + } + + @Test + public void testHigherPriorityRunning() throws Exception { + schedulingFilter = createMock(SchedulingFilter.class); + ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_B, 100); + assignToHost(highPriority); + + ScheduledTask task = makeTask(USER_A, JOB_A, TASK_ID_A); + + control.replay(); + assertNoVictims(runFilter(task, NO_OFFER, highPriority)); + } + + @Test + public void testProductionPreemptingNonproduction() throws Exception { + setUpHost(); + + schedulingFilter = createMock(SchedulingFilter.class); + // Use a very low priority for the production task to show that priority is irrelevant. + ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", -1000); + ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_B + "_a1", 100); + assignToHost(a1); + + expectFiltering(); + + control.replay(); + assertVictims(runFilter(p1, NO_OFFER, a1), a1); + } + + @Test + public void testProductionPreemptingNonproductionAcrossUsers() throws Exception { + setUpHost(); + + schedulingFilter = createMock(SchedulingFilter.class); + // Use a very low priority for the production task to show that priority is irrelevant. + ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", -1000); + ScheduledTask a1 = makeTask(USER_B, JOB_A, TASK_ID_B + "_a1", 100); + assignToHost(a1); + + expectFiltering(); + + control.replay(); + assertVictims(runFilter(p1, NO_OFFER, a1), a1); + } + + @Test + public void testProductionUsersDoNotPreemptEachOther() throws Exception { + schedulingFilter = createMock(SchedulingFilter.class); + ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", 1000); + ScheduledTask a1 = makeProductionTask(USER_B, JOB_A, TASK_ID_B + "_a1", 0); + assignToHost(a1); + + control.replay(); + assertNoVictims(runFilter(p1, NO_OFFER, a1)); + } + + // Ensures a production task can preempt 2 tasks on the same host. 
+ @Test + public void testProductionPreemptingManyNonProduction() throws Exception { + schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR); + ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1"); + a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512); + + ScheduledTask b1 = makeTask(USER_B, JOB_B, TASK_ID_B + "_b1"); + b1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512); + + setUpHost(); + + assignToHost(a1); + assignToHost(b1); + + ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1"); + p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024); + + // p1 (2 CPUs / 1024 MB) needs the combined resources of a1 and b1 (1 CPU / 512 MB each). + control.replay(); + assertVictims(runFilter(p1, NO_OFFER, a1, b1), a1, b1); + } + + // Ensures we select the minimal number of tasks to preempt. + @Test + public void testMinimalSetPreempted() throws Exception { + schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR); + ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1"); + a1.getAssignedTask().getTask().setNumCpus(4).setRamMb(4096); + + ScheduledTask b1 = makeTask(USER_B, JOB_B, TASK_ID_B + "_b1"); + b1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512); + + ScheduledTask b2 = makeTask(USER_B, JOB_B, TASK_ID_B + "_b2"); + b2.getAssignedTask().getTask().setNumCpus(1).setRamMb(512); + + setUpHost(); + + assignToHost(a1); + assignToHost(b1); + assignToHost(b2); + + ScheduledTask p1 = makeProductionTask(USER_C, JOB_C, TASK_ID_C + "_p1"); + p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024); + + // a1 alone (4 CPUs / 4096 MB) covers p1 (2 CPUs / 1024 MB), so only a1 is expected as a + // victim even though b1 and b2 are passed to runFilter first. + control.replay(); + assertVictims(runFilter(p1, NO_OFFER, b1, b2, a1), a1); + } + + // Ensures a production task *never* preempts a production task from another job.
+ @Test + public void testProductionJobNeverPreemptsProductionJob() throws Exception { + schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR); + ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1"); + p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024); + + setUpHost(); + + assignToHost(p1); + + ScheduledTask p2 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p2"); + p2.getAssignedTask().getTask().setNumCpus(1).setRamMb(512); + + // p1 (2 CPUs / 1024 MB) would cover p2 (1 CPU / 512 MB), but it must not be a victim. + control.replay(); + assertNoVictims(runFilter(p2, NO_OFFER, p1)); + } + + // Ensures that we can preempt if a task + offer can satisfy a pending task. + @Test + public void testPreemptWithOfferAndTask() throws Exception { + schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR); + + setUpHost(); + + ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1"); + a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512); + assignToHost(a1); + + ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1"); + p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024); + + // a1 (1 CPU / 512 MB) plus the 1 CPU / 512 MB offer together cover p1 (2 CPUs / 1024 MB). + control.replay(); + assertVictims( + runFilter(p1, makeOffer(OFFER, 1, Amount.of(512L, Data.MB), Amount.of(1L, Data.MB), 1), a1), + a1); + } + + // Ensures we can preempt if two tasks and an offer can satisfy a pending task.
+ @Test + public void testPreemptWithOfferAndMultipleTasks() throws Exception { + schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR); + + setUpHost(); + + ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1"); + a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512); + assignToHost(a1); + + ScheduledTask a2 = makeTask(USER_A, JOB_B, TASK_ID_A + "_a2"); + a2.getAssignedTask().getTask().setNumCpus(1).setRamMb(512); + assignToHost(a2); + + ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1"); + p1.getAssignedTask().getTask().setNumCpus(4).setRamMb(2048); + + control.replay(); + // a1 + a2 (2 CPUs / 1024 MB) plus the 2 CPU / 1024 MB offer cover p1 (4 CPUs / 2048 MB). + Optional<HostOffer> offer = + makeOffer(OFFER, 2, Amount.of(1024L, Data.MB), Amount.of(1L, Data.MB), 1); + assertVictims(runFilter(p1, offer, a1, a2), a1, a2); + } + + // With no candidate victims, the filter returns no victims. + @Test + public void testNoPreemptionVictims() { + schedulingFilter = createMock(SchedulingFilter.class); + ScheduledTask task = makeProductionTask(USER_A, JOB_A, TASK_ID_A); + + control.replay(); + + assertNoVictims(runFilter(task, NO_OFFER)); + } + + // If the victim's host has no stored attributes, no victims are returned and the + // MISSING_ATTRIBUTES_NAME stat reads 1. + // NOTE(review): the pending task is also assigned to HOST; this looks unneeded, confirm. + @Test + public void testMissingAttributes() { + schedulingFilter = createMock(SchedulingFilter.class); + ScheduledTask task = makeProductionTask(USER_A, JOB_A, TASK_ID_A); + assignToHost(task); + + ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1"); + a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512); + assignToHost(a1); + + expect(storageUtil.attributeStore.getHostAttributes(HOST)) + .andReturn(Optional.<IHostAttributes>absent()); + + control.replay(); + + assertNoVictims(runFilter(task, NO_OFFER, a1)); + assertEquals(1L, statsProvider.getLongValue(MISSING_ATTRIBUTES_NAME)); + } + + // If the scheduling filter vetoes the only candidate, no victims are returned. 
+ @Test + public void testAllVictimsVetoed() { + schedulingFilter = createMock(SchedulingFilter.class); + ScheduledTask task = makeProductionTask(USER_A, JOB_A, TASK_ID_A); + assignToHost(task); + + ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1"); + a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512); +
assignToHost(a1); + + setUpHost(); + expectFiltering(Optional.of(Veto.constraintMismatch("ban"))); + + control.replay(); + + assertNoVictims(runFilter(task, NO_OFFER, a1)); + } + + // Converts scheduled tasks into the PreemptionVictim set the filter consumes. + private static ImmutableSet<PreemptionVictim> preemptionVictims(ScheduledTask... tasks) { + return FluentIterable.from(ImmutableSet.copyOf(tasks)) + .transform( + new Function<ScheduledTask, PreemptionVictim>() { + @Override + public PreemptionVictim apply(ScheduledTask task) { + return PreemptionVictim.fromTask(IAssignedTask.build(task.getAssignedTask())); + } + }).toSet(); + } + + // Asserts the filter selected exactly the expected tasks as victims. + private static void assertVictims( + Optional<ImmutableSet<PreemptionVictim>> actual, + ScheduledTask... expected) { + + assertEquals(Optional.of(preemptionVictims(expected)), actual); + } + + // Asserts the filter returned no victim set. + private static void assertNoVictims(Optional<ImmutableSet<PreemptionVictim>> actual) { + assertEquals(Optional.<ImmutableSet<PreemptionVictim>>absent(), actual); + } + + // Builds an offer on HOST / SLAVE_ID with the given resources; host mode is NONE. + private Optional<HostOffer> makeOffer( + String offerId, + double cpu, + Amount<Long, Data> ram, + Amount<Long, Data> disk, + int numPorts) { + + List<Resource> resources = new Resources(cpu, ram, disk, numPorts).toResourceList(); + Offer.Builder builder = Offer.newBuilder(); + builder.getIdBuilder().setValue(offerId); + builder.getFrameworkIdBuilder().setValue("framework-id"); + builder.getSlaveIdBuilder().setValue(SLAVE_ID); + builder.setHostname(HOST); + for (Resource r: resources) { + builder.addResources(r); + } + return Optional.of(new HostOffer( + builder.build(), + IHostAttributes.build(new HostAttributes().setMode(MaintenanceMode.NONE)))); + } + + // Expects a SchedulingFilter.filter call (any arguments; once by EasyMock default) that + // returns no vetoes. 
+ private IExpectationSetters<Set<SchedulingFilter.Veto>> expectFiltering() { + return expectFiltering(Optional.<Veto>absent()); + } + + // Expects a SchedulingFilter.filter call (any arguments; once by EasyMock default) answering + // with the given veto, or with no vetoes when it is absent. + private IExpectationSetters<Set<SchedulingFilter.Veto>> expectFiltering( + final Optional<Veto> veto) { + + return expect(schedulingFilter.filter( + EasyMock.<SchedulingFilter.UnusedResource>anyObject(), + EasyMock.<SchedulingFilter.ResourceRequest>anyObject())) +
.andAnswer( + new IAnswer<Set<SchedulingFilter.Veto>>() { + @Override + public Set<SchedulingFilter.Veto> answer() { + return veto.asSet(); + } + }); + } + + // Builds a task with the given job key, priority and production flag, and no constraints. + static ScheduledTask makeTask( + String role, + String job, + String taskId, + int priority, + String env, + boolean production) { + + AssignedTask assignedTask = new AssignedTask() + .setTaskId(taskId) + .setTask(new TaskConfig() + .setJob(new JobKey(role, env, job)) + .setPriority(priority) + .setProduction(production) + .setJobName(job) + .setEnvironment(env) + .setConstraints(new HashSet<Constraint>())); + return new ScheduledTask().setAssignedTask(assignedTask); + } + + // Non-production ("dev") task with priority 0. + static ScheduledTask makeTask(String role, String job, String taskId) { + return makeTask(role, job, taskId, 0, "dev", false); + } + + // Appends a TaskEvent(0, status) to the task's events. + static void addEvent(ScheduledTask task, ScheduleStatus status) { + task.addToTaskEvents(new TaskEvent(0, status)); + } + + // Production ("prod") task with priority 0. + private ScheduledTask makeProductionTask(String role, String job, String taskId) { + return makeTask(role, job, taskId, 0, "prod", true); + } + + // Production ("prod") task with the given priority. + private ScheduledTask makeProductionTask(String role, String job, String taskId, int priority) { + return makeTask(role, job, taskId, priority, "prod", true); + } + + // Non-production ("dev") task with the given priority. + private ScheduledTask makeTask(String role, String job, String taskId, int priority) { + return makeTask(role, job, taskId, priority, "dev", false); + } + + // Marks the task RUNNING on HOST / SLAVE_ID and records a RUNNING event. 
+ private void assignToHost(ScheduledTask task) { + task.setStatus(RUNNING); + addEvent(task, RUNNING); + task.getAssignedTask().setSlaveHost(HOST); + task.getAssignedTask().setSlaveId(SLAVE_ID); + } + + // Builders for the host and rack attributes used by setUpHost. + private Attribute host(String host) { + return new Attribute(HOST_ATTRIBUTE, ImmutableSet.of(host)); + } + + private Attribute rack(String rack) { + return new Attribute(RACK_ATTRIBUTE, ImmutableSet.of(rack)); + } + + // Sets up a normal host, no dedicated hosts and no maintenance.
+ private void setUpHost() { + // The host attribute must match HOST, the hostname assignToHost gives every victim. + IHostAttributes hostAttrs = IHostAttributes.build( + new HostAttributes().setHost(HOST).setSlaveId(HOST + "_id") + .setMode(NONE).setAttributes(ImmutableSet.of(rack(RACK), host(HOST)))); + + expect(storageUtil.attributeStore.getHostAttributes(HOST)) + .andReturn(Optional.of(hostAttrs)).anyTimes(); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/8fd21a1a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java index 64283fa..32d18a9 100644 --- a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java @@ -20,12 +20,14 @@ import com.google.common.collect.ImmutableSet; import com.twitter.common.testing.easymock.EasyMockTest; import org.apache.aurora.gen.AssignedTask; +import org.apache.aurora.gen.HostAttributes; import org.apache.aurora.gen.JobKey; import org.apache.aurora.gen.ScheduleStatus; import org.apache.aurora.gen.ScheduledTask; import org.apache.aurora.gen.TaskConfig; import org.apache.aurora.gen.TaskEvent; -import org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.PreemptionSlot; +import org.apache.aurora.scheduler.HostOffer; +import org.apache.aurora.scheduler.async.OfferManager; import org.apache.aurora.scheduler.async.preemptor.Preemptor.PreemptorImpl; import org.apache.aurora.scheduler.base.TaskGroupKey; import org.apache.aurora.scheduler.base.Tasks; @@ -33,9 +35,11 @@ import org.apache.aurora.scheduler.state.StateManager; import org.apache.aurora.scheduler.stats.CachedCounters; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.entities.IAssignedTask; +import
org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; import org.apache.aurora.scheduler.testing.FakeStatsProvider; +import org.apache.mesos.Protos; import org.easymock.EasyMock; import org.junit.Before; import org.junit.Test; @@ -52,39 +56,47 @@ import static org.junit.Assert.assertEquals; public class PreemptorImplTest extends EasyMockTest { private static final String SLAVE_ID = "slave_id"; private static final IScheduledTask TASK = IScheduledTask.build(makeTask()); - private static final PreemptionSlot SLOT = createPreemptionSlot(TASK); + private static final PreemptionProposal PROPOSAL = createPreemptionProposal(TASK); private static final TaskGroupKey GROUP_KEY = TaskGroupKey.from(ITaskConfig.build(makeTask().getAssignedTask().getTask())); - private static final Set<PreemptionSlot> NO_SLOTS = ImmutableSet.of(); + private static final Set<PreemptionProposal> NO_SLOTS = ImmutableSet.of(); private static final Optional<String> EMPTY_RESULT = Optional.absent(); + private static final HostOffer OFFER = + new HostOffer(Protos.Offer.getDefaultInstance(), IHostAttributes.build(new HostAttributes())); private StateManager stateManager; private FakeStatsProvider statsProvider; - private PreemptionSlotFinder preemptionSlotFinder; + private PreemptionVictimFilter preemptionVictimFilter; private PreemptorImpl preemptor; - private BiCache<PreemptionSlot, TaskGroupKey> slotCache; + private BiCache<PreemptionProposal, TaskGroupKey> slotCache; private Storage.MutableStoreProvider storeProvider; @Before public void setUp() { storeProvider = createMock(Storage.MutableStoreProvider.class); stateManager = createMock(StateManager.class); - preemptionSlotFinder = createMock(PreemptionSlotFinder.class); - slotCache = createMock(new Clazz<BiCache<PreemptionSlot, TaskGroupKey>>() { }); + preemptionVictimFilter =
createMock(PreemptionVictimFilter.class); + slotCache = createMock(new Clazz<BiCache<PreemptionProposal, TaskGroupKey>>() { }); statsProvider = new FakeStatsProvider(); + OfferManager offerManager = createMock(OfferManager.class); + expect(offerManager.getOffer(anyObject(Protos.SlaveID.class))) + .andReturn(Optional.of(OFFER)) + .anyTimes(); + preemptor = new PreemptorImpl( stateManager, - preemptionSlotFinder, + offerManager, + preemptionVictimFilter, new PreemptorMetrics(new CachedCounters(statsProvider)), slotCache); } @Test public void testPreemptTasksSuccessful() throws Exception { - expect(slotCache.getByValue(GROUP_KEY)).andReturn(ImmutableSet.of(SLOT)); - slotCache.remove(SLOT, GROUP_KEY); - expectSlotValidation(Optional.of(ImmutableSet.of( + expect(slotCache.getByValue(GROUP_KEY)).andReturn(ImmutableSet.of(PROPOSAL)); + slotCache.remove(PROPOSAL, GROUP_KEY); + expectSlotValidation(PROPOSAL, Optional.of(ImmutableSet.of( PreemptionVictim.fromTask(TASK.getAssignedTask())))); expectPreempted(TASK); @@ -98,9 +110,9 @@ public class PreemptorImplTest extends EasyMockTest { @Test public void testPreemptTasksValidationFailed() throws Exception { - expect(slotCache.getByValue(GROUP_KEY)).andReturn(ImmutableSet.of(SLOT)); - slotCache.remove(SLOT, GROUP_KEY); - expectSlotValidation(Optional.<ImmutableSet<PreemptionVictim>>absent()); + expect(slotCache.getByValue(GROUP_KEY)).andReturn(ImmutableSet.of(PROPOSAL)); + slotCache.remove(PROPOSAL, GROUP_KEY); + expectSlotValidation(PROPOSAL, Optional.<ImmutableSet<PreemptionVictim>>absent()); control.replay(); @@ -124,11 +136,15 @@ public class PreemptorImplTest extends EasyMockTest { return preemptor.attemptPreemptionFor(TASK.getAssignedTask(), EMPTY, storeProvider); } - private void expectSlotValidation(Optional<ImmutableSet<PreemptionVictim>> victims) { - expect(preemptionSlotFinder.validatePreemptionSlotFor( - TASK.getAssignedTask(), + private void expectSlotValidation( + PreemptionProposal proposal, +
Optional<ImmutableSet<PreemptionVictim>> victims) { + + expect(preemptionVictimFilter.filterPreemptionVictims( + TASK.getAssignedTask().getTask(), + proposal.getVictims(), EMPTY, - SLOT, + Optional.of(OFFER), storeProvider)).andReturn(victims); } @@ -142,9 +158,9 @@ public class PreemptorImplTest extends EasyMockTest { .andReturn(true); } - private static PreemptionSlot createPreemptionSlot(IScheduledTask task) { + private static PreemptionProposal createPreemptionProposal(IScheduledTask task) { IAssignedTask assigned = task.getAssignedTask(); - return new PreemptionSlot(ImmutableSet.of(PreemptionVictim.fromTask(assigned)), SLAVE_ID); + return new PreemptionProposal(ImmutableSet.of(PreemptionVictim.fromTask(assigned)), SLAVE_ID); } private static ScheduledTask makeTask() {
