Repository: aurora Updated Branches: refs/heads/master 77c465aef -> 485da81ce
Migrating preemptor and scheduling filter to ResourceBag. Reviewed at https://reviews.apache.org/r/47050/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/485da81c Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/485da81c Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/485da81c Branch: refs/heads/master Commit: 485da81ce996ccdde998c3439d5a418d66f64802 Parents: 77c465a Author: Maxim Khutornenko <[email protected]> Authored: Tue May 10 14:26:33 2016 -0700 Committer: Maxim Khutornenko <[email protected]> Committed: Tue May 10 14:26:33 2016 -0700 ---------------------------------------------------------------------- .../executor/ExecutorSettings.java | 22 +----- .../scheduler/filter/SchedulingFilter.java | 8 +- .../scheduler/filter/SchedulingFilterImpl.java | 19 ++--- .../scheduler/mesos/MesosTaskFactory.java | 2 +- .../scheduler/preemptor/PreemptionVictim.java | 7 +- .../preemptor/PreemptionVictimFilter.java | 68 +++++++++++++---- .../scheduler/quota/QuotaCheckResult.java | 5 +- .../aurora/scheduler/quota/QuotaManager.java | 2 +- .../aurora/scheduler/resources/ResourceBag.java | 50 ++++++++++++ .../scheduler/resources/ResourceManager.java | 29 ++++++- .../scheduler/resources/ResourceSlot.java | 35 --------- .../apache/aurora/scheduler/sla/SlaGroup.java | 2 +- .../aurora/scheduler/state/TaskAssigner.java | 5 +- .../scheduler/stats/AsyncStatsModule.java | 5 +- .../aurora/scheduler/stats/SlotSizeCounter.java | 2 +- .../events/NotifyingSchedulingFilterTest.java | 4 +- .../filter/SchedulingFilterImplTest.java | 30 ++++---- .../mesos/MesosTaskFactoryImplTest.java | 38 ++++------ .../apache/aurora/scheduler/mesos/Offers.java | 66 ---------------- .../preemptor/PreemptionVictimFilterTest.java | 80 +++++++++++++++----- .../scheduler/resources/ResourceBagTest.java | 15 ++++ .../resources/ResourceManagerTest.java | 20 +++++ .../scheduler/resources/ResourceSlotTest.java | 21 ----- .../scheduler/resources/ResourceTestUtil.java | 9 +++ .../scheduler/state/TaskAssignerImplTest.java | 12 +-- 25 files changed, 306 insertions(+), 250 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/485da81c/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorSettings.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorSettings.java b/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorSettings.java index 501e643..e919d3f 100644 --- a/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorSettings.java +++ b/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorSettings.java @@ -15,10 +15,8 @@ package org.apache.aurora.scheduler.configuration.executor; import java.util.Objects; -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Data; -import org.apache.aurora.scheduler.resources.ResourceSlot; -import org.apache.aurora.scheduler.resources.ResourceType; +import org.apache.aurora.scheduler.resources.ResourceBag; +import org.apache.aurora.scheduler.resources.ResourceManager; import static java.util.Objects.requireNonNull; @@ -44,20 +42,8 @@ public class ExecutorSettings { return populateDiscoveryInfo; } - private double getExecutorResourceValue(ResourceType resource) { - return config.getExecutor().getResourcesList().stream() - .filter(r -> r.getName().equals(resource.getMesosName())) - .findFirst() - .map(r -> r.getScalar().getValue()) - .orElse(0D); - } - - public ResourceSlot getExecutorOverhead() { - return new ResourceSlot( - getExecutorResourceValue(ResourceType.CPUS), - Amount.of((long) getExecutorResourceValue(ResourceType.RAM_MB), Data.MB), - Amount.of((long) getExecutorResourceValue(ResourceType.DISK_MB), Data.MB), - 0); + public ResourceBag getExecutorOverhead() { + return ResourceManager.bagFromMesosResources(config.getExecutor().getResourcesList()); } @Override http://git-wip-us.apache.org/repos/asf/aurora/blob/485da81c/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java index 1ee2cfa..c324f59 100644 --- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java +++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java @@ -18,7 +18,7 @@ import java.util.Set; import com.google.common.base.MoreObjects; -import org.apache.aurora.scheduler.resources.ResourceSlot; +import org.apache.aurora.scheduler.resources.ResourceBag; import org.apache.aurora.scheduler.storage.entities.IConstraint; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; @@ -246,15 +246,15 @@ public interface SchedulingFilter { * An available resource in the cluster. */ class UnusedResource { - private final ResourceSlot offer; + private final ResourceBag offer; private final IHostAttributes attributes; - public UnusedResource(ResourceSlot offer, IHostAttributes attributes) { + public UnusedResource(ResourceBag offer, IHostAttributes attributes) { this.offer = offer; this.attributes = attributes; } - public ResourceSlot getResourceSlot() { + public ResourceBag getResourceSlot() { return offer; } http://git-wip-us.apache.org/repos/asf/aurora/blob/485da81c/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java index 6b5b12b..1daf296 100644 --- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java @@ -25,12 +25,12 @@ import com.google.common.collect.Ordering; import com.google.inject.Inject; import org.apache.aurora.common.inject.TimedInterceptor.Timed; -import org.apache.aurora.common.quantity.Data; import org.apache.aurora.gen.MaintenanceMode; import org.apache.aurora.gen.TaskConstraint; import org.apache.aurora.scheduler.configuration.ConfigurationManager; import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings; -import org.apache.aurora.scheduler.resources.ResourceSlot; +import org.apache.aurora.scheduler.resources.ResourceBag; +import org.apache.aurora.scheduler.resources.ResourceManager; import org.apache.aurora.scheduler.resources.ResourceType; import org.apache.aurora.scheduler.storage.entities.IAttribute; import org.apache.aurora.scheduler.storage.entities.IConstraint; @@ -41,10 +41,6 @@ import static java.util.Objects.requireNonNull; import static org.apache.aurora.gen.MaintenanceMode.DRAINED; import static org.apache.aurora.gen.MaintenanceMode.DRAINING; import static org.apache.aurora.scheduler.configuration.ConfigurationManager.DEDICATED_ATTRIBUTE; -import static org.apache.aurora.scheduler.resources.ResourceType.CPUS; -import static org.apache.aurora.scheduler.resources.ResourceType.DISK_MB; -import static org.apache.aurora.scheduler.resources.ResourceType.PORTS; -import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB; /** * Implementation of the scheduling filter that ensures resource requirements of tasks are @@ -74,12 +70,10 @@ public class SchedulingFilterImpl implements SchedulingFilter { } } - private static Set<Veto> getResourceVetoes(ResourceSlot available, ResourceSlot required) { + private static Set<Veto> getResourceVetoes(ResourceBag available, ResourceBag required) { ImmutableSet.Builder<Veto> vetoes = ImmutableSet.builder(); - maybeAddVeto(vetoes, CPUS, available.getNumCpus(), required.getNumCpus()); - maybeAddVeto(vetoes, RAM_MB, available.getRam().as(Data.MB), required.getRam().as(Data.MB)); - maybeAddVeto(vetoes, DISK_MB, available.getDisk().as(Data.MB), required.getDisk().as(Data.MB)); - maybeAddVeto(vetoes, PORTS, available.getNumPorts(), required.getNumPorts()); + required.streamResourceVectors().forEach( + e -> maybeAddVeto(vetoes, e.getKey(), available.valueOf(e.getKey()), e.getValue())); return vetoes.build(); } @@ -167,6 +161,7 @@ public class SchedulingFilterImpl implements SchedulingFilter { // 4. Resource check (lowest score). return getResourceVetoes( resource.getResourceSlot(), - ResourceSlot.from(request.getTask()).add(executorSettings.getExecutorOverhead())); + ResourceManager.bagFromResources(request.getTask().getResources()) + .add(executorSettings.getExecutorOverhead())); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/485da81c/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java b/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java index eb516b3..ef1b5bc 100644 --- a/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java +++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java @@ -149,7 +149,7 @@ public interface MesosTaskFactory { acceptedOffer = AcceptedOffer.create( offer, ResourceSlot.from(config), - executorSettings.getExecutorOverhead(), + executorSettings.getExecutorOverhead().toSlot(), ImmutableSet.copyOf(task.getAssignedPorts().values()), tierManager.getTier(task.getTask())); } catch (Resources.InsufficientResourcesException e) { http://git-wip-us.apache.org/repos/asf/aurora/blob/485da81c/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java index 98be997..69b6866 100644 --- a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java +++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java @@ -17,7 +17,8 @@ import java.util.Objects; import com.google.common.base.MoreObjects; -import org.apache.aurora.scheduler.resources.ResourceSlot; +import org.apache.aurora.scheduler.resources.ResourceBag; +import org.apache.aurora.scheduler.resources.ResourceManager; import org.apache.aurora.scheduler.storage.entities.IAssignedTask; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; @@ -53,8 +54,8 @@ public final class PreemptionVictim { return task.getTask().getPriority(); } - public ResourceSlot getResourceSlot() { - return ResourceSlot.from(task.getTask()); + public ResourceBag getResourceBag() { + return ResourceManager.bagFromResources(task.getTask().getResources()); } public String getTaskId() { http://git-wip-us.apache.org/repos/asf/aurora/blob/485da81c/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java index 032ab2d..53b6aee 100644 --- a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java +++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java @@ -18,10 +18,12 @@ import java.util.Set; import javax.inject.Inject; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.base.Predicate; import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Ordering; @@ -35,16 +37,17 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter; import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource; import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto; -import org.apache.aurora.scheduler.resources.ResourceManager; -import org.apache.aurora.scheduler.resources.ResourceSlot; -import org.apache.aurora.scheduler.resources.Resources; +import org.apache.aurora.scheduler.resources.ResourceBag; import org.apache.aurora.scheduler.storage.Storage.StoreProvider; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; import static java.util.Objects.requireNonNull; -import static org.apache.aurora.scheduler.resources.ResourceSlot.sum; +import static org.apache.aurora.scheduler.resources.ResourceBag.EMPTY; +import static org.apache.aurora.scheduler.resources.ResourceBag.IS_MESOS_REVOCABLE; +import static org.apache.aurora.scheduler.resources.ResourceManager.bagFromMesosResources; +import static org.apache.aurora.scheduler.resources.ResourceManager.getNonRevocableOfferResources; /** * Filters active tasks (victims) and available offer (slack) resources that can accommodate a @@ -98,33 +101,63 @@ public interface PreemptionVictimFilter { this.tierManager = requireNonNull(tierManager); } - private static final Function<HostOffer, ResourceSlot> OFFER_TO_RESOURCE_SLOT = - offer -> Resources.from(offer.getOffer()).filter(ResourceManager.NON_REVOCABLE).slot(); - private static final Function<HostOffer, String> OFFER_TO_HOST = offer -> offer.getOffer().getHostname(); private static final Function<PreemptionVictim, String> VICTIM_TO_HOST = PreemptionVictim::getSlaveHost; - private final Function<PreemptionVictim, ResourceSlot> victimToResources = - new Function<PreemptionVictim, ResourceSlot>() { + private final Function<PreemptionVictim, ResourceBag> victimToResources = + new Function<PreemptionVictim, ResourceBag>() { @Override - public ResourceSlot apply(PreemptionVictim victim) { - ResourceSlot slot = victim.getResourceSlot(); + public ResourceBag apply(PreemptionVictim victim) { + ResourceBag bag = victim.getResourceBag(); if (tierManager.getTier(victim.getConfig()).isRevocable()) { // Revocable task CPU cannot be used for preemption purposes as it's a compressible // resource. We can still use RAM, DISK and PORTS as they are not compressible. - slot = new ResourceSlot(0.0, slot.getRam(), slot.getDisk(), slot.getNumPorts()); + bag = bag.filter(IS_MESOS_REVOCABLE.negate()); } - return slot.add(executorSettings.getExecutorOverhead()); + return bag.add(executorSettings.getExecutorOverhead()); } }; + private static final java.util.function.Predicate<Integer> IS_ZERO = e -> e == 0; + + /** + * A Resources object is greater than another iff _all_ of its resource components are greater. + * A Resources object compares as equal if some but not all components are greater + * than or equal to the other. + */ + @VisibleForTesting + static final Ordering<ResourceBag> ORDER = new Ordering<ResourceBag>() { + @Override + public int compare(ResourceBag left, ResourceBag right) { + ImmutableList.Builder<Integer> builder = ImmutableList.builder(); + left.streamResourceVectors().forEach( + entry -> builder.add(entry.getValue().compareTo(right.valueOf(entry.getKey())))); + + List<Integer> results = builder.build(); + + if (results.stream().allMatch(IS_ZERO)) { + return 0; + } + + if (results.stream().filter(IS_ZERO.negate()).allMatch(e -> e > 0)) { + return 1; + } + + if (results.stream().filter(IS_ZERO.negate()).allMatch(e -> e < 0)) { + return -1; + } + + return 0; + } + }; + // TODO(zmanji) Consider using Dominant Resource Fairness for ordering instead of the vector // ordering private final Ordering<PreemptionVictim> resourceOrder = - ResourceSlot.ORDER.onResultOf(victimToResources).reverse(); + ORDER.onResultOf(victimToResources).reverse(); @Override public Optional<ImmutableSet<PreemptionVictim>> filterPreemptionVictims( @@ -140,7 +173,10 @@ public interface PreemptionVictimFilter { .addAll(Iterables.transform(possibleVictims, VICTIM_TO_HOST)) .addAll(Iterables.transform(offer.asSet(), OFFER_TO_HOST)).build(); - ResourceSlot slackResources = sum(Iterables.transform(offer.asSet(), OFFER_TO_RESOURCE_SLOT)); + ResourceBag slackResources = offer.asSet().stream() + .map(o -> bagFromMesosResources(getNonRevocableOfferResources(o.getOffer()))) + .reduce((l, r) -> l.add(r)) + .orElse(EMPTY); FluentIterable<PreemptionVictim> preemptableTasks = FluentIterable.from(possibleVictims) .filter(preemptionFilter(pendingTask)); @@ -160,7 +196,7 @@ public interface PreemptionVictimFilter { return Optional.absent(); } - ResourceSlot totalResource = slackResources; + ResourceBag totalResource = slackResources; for (PreemptionVictim victim : sortedVictims) { toPreemptTasks.add(victim); totalResource = totalResource.add(victimToResources.apply(victim)); http://git-wip-us.apache.org/repos/asf/aurora/blob/485da81c/src/main/java/org/apache/aurora/scheduler/quota/QuotaCheckResult.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/quota/QuotaCheckResult.java b/src/main/java/org/apache/aurora/scheduler/quota/QuotaCheckResult.java index 99f034f..d3ca2df 100644 --- a/src/main/java/org/apache/aurora/scheduler/quota/QuotaCheckResult.java +++ b/src/main/java/org/apache/aurora/scheduler/quota/QuotaCheckResult.java @@ -77,9 +77,8 @@ public class QuotaCheckResult { static QuotaCheckResult greaterOrEqual(ResourceBag a, ResourceBag b) { StringBuilder details = new StringBuilder(); ResourceBag difference = a.subtract(b); - difference.getResourceVectors().entrySet().stream() - .filter(IS_NEGATIVE) - .forEach(entry -> addMessage(entry.getKey(), Math.abs(entry.getValue()), details)); + difference.filter(IS_NEGATIVE).streamResourceVectors().forEach( + entry -> addMessage(entry.getKey(), Math.abs(entry.getValue()), details)); return new QuotaCheckResult( details.length() > 0 ? Result.INSUFFICIENT_QUOTA : Result.SUFFICIENT_QUOTA, http://git-wip-us.apache.org/repos/asf/aurora/blob/485da81c/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java b/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java index 6d0d120..612525c 100644 --- a/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java +++ b/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java @@ -174,7 +174,7 @@ public interface QuotaManager { QuotaInfo info = getQuotaInfo(ownerRole, Optional.absent(), storeProvider); ResourceBag prodConsumption = info.getProdSharedConsumption(); ResourceBag overage = bagFromAggregate(quota).subtract(prodConsumption); - if (overage.getResourceVectors().entrySet().stream().anyMatch(IS_NEGATIVE)) { + if (!overage.filter(IS_NEGATIVE).getResourceVectors().isEmpty()) { throw new QuotaException(String.format( "Quota: %s is less then current prod reservation: %s", quota.toString(), http://git-wip-us.apache.org/repos/asf/aurora/blob/485da81c/src/main/java/org/apache/aurora/scheduler/resources/ResourceBag.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/resources/ResourceBag.java b/src/main/java/org/apache/aurora/scheduler/resources/ResourceBag.java index 7916ec0..390654b 100644 --- a/src/main/java/org/apache/aurora/scheduler/resources/ResourceBag.java +++ b/src/main/java/org/apache/aurora/scheduler/resources/ResourceBag.java @@ -17,14 +17,19 @@ import java.util.Map; import java.util.Objects; import java.util.function.BinaryOperator; import java.util.function.Predicate; +import java.util.stream.Stream; import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableMap; +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Data; + import static java.util.stream.Collectors.toMap; import static org.apache.aurora.scheduler.resources.ResourceType.CPUS; import static org.apache.aurora.scheduler.resources.ResourceType.DISK_MB; +import static org.apache.aurora.scheduler.resources.ResourceType.PORTS; import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB; /** @@ -91,6 +96,25 @@ public class ResourceBag { } /** + * Convenience function to return a stream of resource vectors. + * + * @return A stream of resource vectors. + */ + public Stream<Map.Entry<ResourceType, Double>> streamResourceVectors() { + return resourceVectors.entrySet().stream(); + } + + /** + * Gets the value of resource specified by {@code type} or 0.0. + * + * @param type Resource type to get value for. + * @return Resource value or 0.0 if no mapping for {@code type} is found. + */ + public Double valueOf(ResourceType type) { + return resourceVectors.getOrDefault(type, 0.0); + } + + /** * Adds this and other bag contents. * * @param other Other bag to add. @@ -141,6 +165,32 @@ public class ResourceBag { .collect(toMap(Map.Entry::getKey, v -> v.getValue() * m))); } + /** + * Filters bag resources by {@code predicate}. + * + * @param predicate Predicate to filter by. + * @return A new bag with resources filtered by {@code predicate}. + */ + public ResourceBag filter(Predicate<Map.Entry<ResourceType, Double>> predicate) { + return new ResourceBag(resourceVectors.entrySet().stream() + .filter(predicate) + .collect(toMap(Map.Entry::getKey, Map.Entry::getValue))); + } + + /** + * Temporary bridge between bag and slot to facilitate migration. + * + * TODO(maxim): remove together with ResourceSlot. + * @return ResourceSlot. + */ + public ResourceSlot toSlot() { + return new ResourceSlot( + valueOf(CPUS), + Amount.of(valueOf(RAM_MB).longValue(), Data.MB), + Amount.of(valueOf(DISK_MB).longValue(), Data.MB), + valueOf(PORTS).intValue()); + } + private ResourceBag binaryOp(ResourceBag other, BinaryOperator<Double> operator) { ImmutableMap.Builder<ResourceType, Double> builder = ImmutableMap.builder(); for (Map.Entry<ResourceType, Double> entry : resourceVectors.entrySet()) { http://git-wip-us.apache.org/repos/asf/aurora/blob/485da81c/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java b/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java index 69087e6..3b38469 100644 --- a/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java +++ b/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java @@ -25,6 +25,7 @@ import com.google.common.base.Predicate; import com.google.common.collect.Iterables; import org.apache.aurora.gen.ResourceAggregate; +import org.apache.aurora.scheduler.TierInfo; import org.apache.aurora.scheduler.storage.entities.IAssignedTask; import org.apache.aurora.scheduler.storage.entities.IResource; import org.apache.aurora.scheduler.storage.entities.IResourceAggregate; @@ -100,6 +101,19 @@ public final class ResourceManager { } /** + * Gets offer resources filtered by the provided {@code tierInfo} instance. + * + * @param offer Offer to get resources from. + * @param tierInfo Tier info. + * @return Offer resources filtered by {@code tierInfo}. + */ + public static Iterable<Resource> getOfferResources(Offer offer, TierInfo tierInfo) { + return tierInfo.isRevocable() + ? getRevocableOfferResources(offer) + : getNonRevocableOfferResources(offer); + } + + /** * Same as {@link #getTaskResources(ITaskConfig, ResourceType)}. * * @param task Scheduled task to get resources from. @@ -163,6 +177,19 @@ public final class ResourceManager { } /** + * Gets the quantity of resource specified by {@code type}. + * + * @param resources Resources. + * @param type Type of resource to quantify. + * @return Aggregate resource value. + */ + public static Double quantityOf(Iterable<IResource> resources, ResourceType type) { + return quantityOf(StreamSupport.stream(resources.spliterator(), false) + .filter(r -> fromResource(r).equals(type)) + .collect(Collectors.toList())); + } + + /** * Gets the quantity of resources. Caller to ensure all resources are of the same type. * * @param resources Resources to sum up. @@ -214,7 +241,7 @@ public final class ResourceManager { */ public static IResourceAggregate aggregateFromBag(ResourceBag bag) { return ThriftBackfill.backfillResourceAggregate(new ResourceAggregate() - .setResources(bag.getResourceVectors().entrySet().stream() + .setResources(bag.streamResourceVectors() .map(e -> IResource.newBuilder( e.getKey().getValue(), e.getKey().getAuroraResourceConverter().valueOf(e.getValue()))) http://git-wip-us.apache.org/repos/asf/aurora/blob/485da81c/src/main/java/org/apache/aurora/scheduler/resources/ResourceSlot.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/resources/ResourceSlot.java b/src/main/java/org/apache/aurora/scheduler/resources/ResourceSlot.java index 13922bc..dea7943 100644 --- a/src/main/java/org/apache/aurora/scheduler/resources/ResourceSlot.java +++ b/src/main/java/org/apache/aurora/scheduler/resources/ResourceSlot.java @@ -18,13 +18,11 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.function.Consumer; -import java.util.function.Predicate; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; -import com.google.common.collect.Ordering; import com.google.common.collect.Range; import org.apache.aurora.common.quantity.Amount; @@ -285,37 +283,4 @@ public final class ResourceSlot { Amount.of(getDisk().as(BYTES) - other.getDisk().as(BYTES), BYTES), getNumPorts() - other.getNumPorts()); } - - /** - * A Resources object is greater than another iff _all_ of its resource components are greater - * or equal. A Resources object compares as equal if some but not all components are greater than - * or equal to the other. - */ - public static final Ordering<ResourceSlot> ORDER = new Ordering<ResourceSlot>() { - @Override - public int compare(ResourceSlot left, ResourceSlot right) { - int diskC = left.getDisk().compareTo(right.getDisk()); - int ramC = left.getRam().compareTo(right.getRam()); - int portC = Integer.compare(left.getNumPorts(), right.getNumPorts()); - int cpuC = Double.compare(left.getNumCpus(), right.getNumCpus()); - - List<Integer> vector = ImmutableList.of(diskC, ramC, portC, cpuC); - - if (vector.stream().allMatch(IS_ZERO)) { - return 0; - } - - if (vector.stream().filter(IS_ZERO.negate()).allMatch(e -> e > 0)) { - return 1; - } - - if (vector.stream().filter(IS_ZERO.negate()).allMatch(e -> e < 0)) { - return -1; - } - - return 0; - } - }; - - private static final Predicate<Integer> IS_ZERO = e -> e == 0; } http://git-wip-us.apache.org/repos/asf/aurora/blob/485da81c/src/main/java/org/apache/aurora/scheduler/sla/SlaGroup.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/sla/SlaGroup.java b/src/main/java/org/apache/aurora/scheduler/sla/SlaGroup.java index 21121bc..6fbd4e9 100644 --- a/src/main/java/org/apache/aurora/scheduler/sla/SlaGroup.java +++ b/src/main/java/org/apache/aurora/scheduler/sla/SlaGroup.java @@ -101,7 +101,7 @@ interface SlaGroup { // TODO(maxim): Refactor SLA management to build groups dynamically from // all available ResourceType values. private static Double fromBag(ResourceBag bag, ResourceType type) { - return bag.getResourceVectors().get(type); + return bag.valueOf(type); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/485da81c/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java index b6e43d7..7c8079f 100644 --- a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java +++ b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java @@ -37,7 +37,6 @@ import org.apache.aurora.scheduler.mesos.MesosTaskFactory; import org.apache.aurora.scheduler.offers.OfferManager; import org.apache.aurora.scheduler.resources.ResourceManager; import org.apache.aurora.scheduler.resources.ResourceType; -import org.apache.aurora.scheduler.resources.Resources; import org.apache.aurora.scheduler.storage.entities.IAssignedTask; import org.apache.mesos.Protos.TaskInfo; import org.slf4j.Logger; @@ -47,6 +46,8 @@ import static java.util.Objects.requireNonNull; import static org.apache.aurora.gen.ScheduleStatus.LOST; import static org.apache.aurora.gen.ScheduleStatus.PENDING; +import static org.apache.aurora.scheduler.resources.ResourceManager.bagFromMesosResources; +import static org.apache.aurora.scheduler.resources.ResourceManager.getOfferResources; import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; import static org.apache.mesos.Protos.Offer; @@ -152,7 +153,7 @@ public interface TaskAssigner { TierInfo tierInfo = tierManager.getTier(groupKey.getTask()); Set<Veto> vetoes = filter.filter( new UnusedResource( - Resources.from(offer.getOffer()).filter(tierInfo).slot(), + bagFromMesosResources(getOfferResources(offer.getOffer(), tierInfo)), offer.getAttributes()), resourceRequest); http://git-wip-us.apache.org/repos/asf/aurora/blob/485da81c/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java b/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java index 1d1415a..40451e9 100644 --- a/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java +++ b/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java @@ -158,10 +158,7 @@ public class AsyncStatsModule extends AbstractModule { // It's insufficient to compare revocable against EMPTY here as RAM, DISK and PORTS // are always rolled in to revocable as non-compressible resources. Only if revocable // CPU is non-zero should we expose the revocable resources as aggregates. - if (revocable.getResourceVectors().entrySet().stream() - .filter(IS_POSITIVE.and(IS_MESOS_REVOCABLE)) - .findFirst() - .isPresent()) { + if (!revocable.filter(IS_POSITIVE.and(IS_MESOS_REVOCABLE)).getResourceVectors().isEmpty()) { builder.add(new MachineResource(revocable, isDedicated, true)); } http://git-wip-us.apache.org/repos/asf/aurora/blob/485da81c/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java b/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java index a3ca9a9..d0cc72f 100644 --- a/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java +++ b/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java @@ -133,7 +133,7 @@ class SlotSizeCounter implements Runnable { private int countSlots(Iterable<ResourceBag> slots, final ResourceBag slotSize) { Function<ResourceBag, Integer> counter = machineSlack -> Ordering.natural().min( - machineSlack.divide(slotSize).getResourceVectors().entrySet().stream() + machineSlack.divide(slotSize).streamResourceVectors() .map(entry -> entry.getValue()) .collect(Collectors.toSet())) .intValue(); http://git-wip-us.apache.org/repos/asf/aurora/blob/485da81c/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java b/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java index 1474fa9..29d4d64 100644 --- a/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java +++ b/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java @@ -28,7 +28,7 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter; import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource; import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto; -import org.apache.aurora.scheduler.resources.ResourceSlot; +import org.apache.aurora.scheduler.resources.ResourceManager; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; import org.junit.Before; @@ -45,7 +45,7 @@ public class NotifyingSchedulingFilterTest extends EasyMockTest { .setDiskMb(1024)); private static final TaskGroupKey GROUP_KEY = TaskGroupKey.from(TASK); private static final UnusedResource RESOURCE = new UnusedResource( - ResourceSlot.from(TASK), + ResourceManager.bagFromResources(TASK.getResources()), IHostAttributes.build(new HostAttributes().setHost("host").setMode(MaintenanceMode.NONE))); private static final ResourceRequest REQUEST = new ResourceRequest(TASK, AttributeAggregate.EMPTY); http://git-wip-us.apache.org/repos/asf/aurora/blob/485da81c/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java index 94a885f..80126e2 100644 --- a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java @@ -21,7 +21,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; -import org.apache.aurora.common.collections.Pair; import org.apache.aurora.common.testing.easymock.EasyMockTest; import org.apache.aurora.gen.Attribute; import org.apache.aurora.gen.Constraint; @@ -38,11 +37,9 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource; import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto; import org.apache.aurora.scheduler.filter.SchedulingFilter.VetoGroup; import org.apache.aurora.scheduler.filter.SchedulingFilter.VetoType; -import org.apache.aurora.scheduler.mesos.Offers; import org.apache.aurora.scheduler.mesos.TaskExecutors; -import org.apache.aurora.scheduler.resources.ResourceSlot; +import org.apache.aurora.scheduler.resources.ResourceBag; import org.apache.aurora.scheduler.resources.ResourceType; -import org.apache.aurora.scheduler.resources.Resources; import org.apache.aurora.scheduler.storage.entities.IAttribute; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.aurora.scheduler.storage.entities.IJobKey; @@ -50,8 +47,14 @@ import org.apache.aurora.scheduler.storage.entities.ITaskConfig; import org.junit.Before; import org.junit.Test; +import static org.apache.aurora.gen.Resource.diskMb; +import static org.apache.aurora.gen.Resource.numCpus; +import static org.apache.aurora.gen.Resource.ramMb; import static org.apache.aurora.scheduler.configuration.ConfigurationManager.DEDICATED_ATTRIBUTE; import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY; +import static org.apache.aurora.scheduler.resources.ResourceManager.bagFromMesosResources; +import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosRange; +import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosScalar; import static org.apache.aurora.scheduler.resources.ResourceTestUtil.resetPorts; import static org.apache.aurora.scheduler.resources.ResourceType.CPUS; import static org.apache.aurora.scheduler.resources.ResourceType.DISK_MB; @@ -76,8 +79,11 @@ public class SchedulingFilterImplTest extends EasyMockTest { private static final int DEFAULT_CPUS = 4; private static final long DEFAULT_RAM = 1000; private static final long DEFAULT_DISK = 2000; - private static final ResourceSlot DEFAULT_OFFER = Resources.from( - Offers.createOffer(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK, Pair.of(80, 80))).slot(); + private static final ResourceBag DEFAULT_OFFER = bagFromMesosResources(ImmutableSet.of( + mesosScalar(CPUS, DEFAULT_CPUS), + mesosScalar(RAM_MB, DEFAULT_RAM), + mesosScalar(DISK_MB, DEFAULT_DISK), + mesosRange(PORTS, 80, 81))); private SchedulingFilter defaultFilter; @@ -101,9 +107,6 @@ public class SchedulingFilterImplTest extends EasyMockTest { public void testSufficientPorts() { control.replay(); - ResourceSlot twoPorts = Resources.from( - Offers.createOffer(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK, Pair.of(80, 81))).slot(); - ITaskConfig noPortTask = resetPorts( makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK), ImmutableSet.of()); @@ -122,22 +125,22 @@ public class SchedulingFilterImplTest extends EasyMockTest { assertEquals( none, defaultFilter.filter( - new UnusedResource(twoPorts, hostA), + new UnusedResource(DEFAULT_OFFER, hostA), new ResourceRequest(noPortTask, EMPTY))); assertEquals( none, defaultFilter.filter( - new UnusedResource(twoPorts, hostA), + new UnusedResource(DEFAULT_OFFER, hostA), new ResourceRequest(onePortTask, EMPTY))); assertEquals( none, defaultFilter.filter( - new UnusedResource(twoPorts, hostA), + new UnusedResource(DEFAULT_OFFER, hostA), new ResourceRequest(twoPortTask, EMPTY))); assertEquals( ImmutableSet.of(veto(PORTS, 1)), defaultFilter.filter( - new UnusedResource(twoPorts, hostA), + new UnusedResource(DEFAULT_OFFER, hostA), new ResourceRequest(threePortTask, EMPTY))); } @@ -617,6 +620,7 @@ public class SchedulingFilterImplTest extends EasyMockTest { .setNumCpus(cpus) .setRamMb(ramMb) .setDiskMb(diskMb) + .setResources(ImmutableSet.of(numCpus(cpus), ramMb(ramMb), diskMb(diskMb))) .setExecutorConfig(new ExecutorConfig("aurora", "config"))); } http://git-wip-us.apache.org/repos/asf/aurora/blob/485da81c/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java index fea95f1..cf4d350 100644 --- a/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java @@ -37,9 +37,8 @@ import org.apache.aurora.scheduler.base.TaskTestUtil; import org.apache.aurora.scheduler.configuration.executor.ExecutorConfig; import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings; import org.apache.aurora.scheduler.mesos.MesosTaskFactory.MesosTaskFactoryImpl; -import org.apache.aurora.scheduler.resources.ResourceSlot; +import org.apache.aurora.scheduler.resources.ResourceBag; import org.apache.aurora.scheduler.resources.ResourceType; -import org.apache.aurora.scheduler.resources.Resources; import org.apache.aurora.scheduler.storage.entities.IAssignedTask; import org.apache.aurora.scheduler.storage.entities.IJobKey; import org.apache.aurora.scheduler.storage.entities.IServerInfo; @@ -69,6 +68,8 @@ import static org.apache.aurora.scheduler.mesos.TaskExecutors.NO_OVERHEAD_EXECUT import static org.apache.aurora.scheduler.mesos.TaskExecutors.SOME_OVERHEAD_EXECUTOR; import static org.apache.aurora.scheduler.mesos.TestExecutorSettings.THERMOS_CONFIG; import static org.apache.aurora.scheduler.mesos.TestExecutorSettings.THERMOS_EXECUTOR; +import static org.apache.aurora.scheduler.resources.ResourceManager.bagFromMesosResources; +import static org.apache.aurora.scheduler.resources.ResourceManager.bagFromResources; import static org.apache.aurora.scheduler.resources.ResourceSlot.makeMesosRangeResource; import static org.apache.aurora.scheduler.resources.ResourceTestUtil.resetPorts; import static org.easymock.EasyMock.expect; @@ -116,15 +117,17 @@ public class MesosTaskFactoryImplTest extends EasyMockTest { .setSlaveId(SLAVE) .setHostname("slave-hostname") .addAllResources( - ResourceSlot.from(TASK_CONFIG).add(THERMOS_EXECUTOR.getExecutorOverhead()) + bagFromResources(TASK_CONFIG.getResources()).add(THERMOS_EXECUTOR.getExecutorOverhead()) + .toSlot() .toResourceList(DEV_TIER)) .addResources(makeMesosRangeResource(ResourceType.PORTS, ImmutableSet.of(80))) .build(); private static final Offer OFFER_SOME_OVERHEAD_EXECUTOR = OFFER_THERMOS_EXECUTOR.toBuilder() .clearResources() - .addAllResources( - ResourceSlot.from(TASK_CONFIG).add(SOME_OVERHEAD_EXECUTOR.getExecutorOverhead()) - .toResourceList(DEV_TIER)) + .addAllResources(bagFromResources(TASK_CONFIG.getResources()) + .add(SOME_OVERHEAD_EXECUTOR.getExecutorOverhead()) + .toSlot() + .toResourceList(DEV_TIER)) .addResources(makeMesosRangeResource(ResourceType.PORTS, ImmutableSet.of(80))) .build(); @@ -239,9 +242,13 @@ public class MesosTaskFactoryImplTest extends EasyMockTest { } private void checkTaskResources(ITaskConfig task, TaskInfo taskInfo) { + ResourceBag taskResources = bagFromMesosResources(taskInfo.getResourcesList()); + ResourceBag executorResources = + bagFromMesosResources(taskInfo.getExecutor().getResourcesList()); + assertEquals( - ResourceSlot.from(task).add(config.getExecutorOverhead()), - getTotalTaskResources(taskInfo)); + bagFromResources(task.getResources()).add(config.getExecutorOverhead()), + taskResources.add(executorResources)); } private void checkDiscoveryInfoUnset(TaskInfo taskInfo) { @@ -452,19 +459,4 @@ public class MesosTaskFactoryImplTest extends EasyMockTest { .build(), task.getExecutor().getContainer()); } - - private static ResourceSlot getTotalTaskResources(TaskInfo task) { - Resources taskResources = fromResourceList(task.getResourcesList()); - Resources executorResources = fromResourceList(task.getExecutor().getResourcesList()); - return taskResources.slot().add(executorResources.slot()); - } - - private static Resources fromResourceList(Iterable<Resource> resources) { - return Resources.from(Protos.Offer.newBuilder() - .setId(Protos.OfferID.newBuilder().setValue("ignored")) - .setFrameworkId(Protos.FrameworkID.newBuilder().setValue("ignored")) - .setSlaveId(SlaveID.newBuilder().setValue("ignored")) - .setHostname("ignored") - .addAllResources(resources).build()); - } } http://git-wip-us.apache.org/repos/asf/aurora/blob/485da81c/src/test/java/org/apache/aurora/scheduler/mesos/Offers.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/Offers.java b/src/test/java/org/apache/aurora/scheduler/mesos/Offers.java deleted file mode 100644 index b266554..0000000 --- a/src/test/java/org/apache/aurora/scheduler/mesos/Offers.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.mesos; - -import org.apache.aurora.common.collections.Pair; -import org.apache.mesos.Protos; -import org.apache.mesos.Protos.Offer; -import org.apache.mesos.Protos.OfferID; -import org.apache.mesos.Protos.Resource; -import org.apache.mesos.Protos.SlaveID; -import org.apache.mesos.Protos.Value.Range; -import org.apache.mesos.Protos.Value.Ranges; -import org.apache.mesos.Protos.Value.Scalar; -import org.apache.mesos.Protos.Value.Type; - -import static org.apache.aurora.scheduler.resources.ResourceType.CPUS; -import static org.apache.aurora.scheduler.resources.ResourceType.DISK_MB; -import static org.apache.aurora.scheduler.resources.ResourceType.PORTS; -import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB; - -public final class Offers { - private Offers() { - // Utility class. - } - - public static Offer createOffer( - double cpu, - double ramMb, - double diskMb, - Pair<Integer, Integer> portRange) { - - Ranges portRanges = Ranges.newBuilder() - .addRange(Range - .newBuilder().setBegin(portRange.getFirst()).setEnd(portRange.getSecond()).build()) - .build(); - - return Offer.newBuilder() - .addResources(Resource.newBuilder().setType(Type.SCALAR).setName(CPUS.getMesosName()) - .setScalar(Scalar.newBuilder().setValue(cpu))) - .addResources(Resource.newBuilder().setType(Type.SCALAR).setName(RAM_MB.getMesosName()) - .setScalar(Scalar.newBuilder().setValue(ramMb))) - .addResources(Resource.newBuilder().setType(Type.SCALAR).setName(DISK_MB.getMesosName()) - .setScalar(Scalar.newBuilder().setValue(diskMb))) - .addResources(Resource.newBuilder().setType(Type.RANGES).setName(PORTS.getMesosName()) - .setRanges(portRanges)) - .addAttributes(Protos.Attribute.newBuilder().setType(Type.TEXT) - .setName("host") - .setText(Protos.Value.Text.newBuilder().setValue("slavehost"))) - .setSlaveId(SlaveID.newBuilder().setValue("SlaveId").build()) - .setHostname("slavehost") - .setFrameworkId(Protos.FrameworkID.newBuilder().setValue("framework-id").build()) - .setId(OfferID.newBuilder().setValue("OfferId").build()) - .build(); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/485da81c/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java index 4efa696..99a980a 100644 --- a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java +++ b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java @@ -40,7 +40,10 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter; import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto; import org.apache.aurora.scheduler.filter.SchedulingFilterImpl; import org.apache.aurora.scheduler.mesos.TaskExecutors; +import org.apache.aurora.scheduler.resources.ResourceBag; import org.apache.aurora.scheduler.resources.ResourceSlot; +import org.apache.aurora.scheduler.resources.ResourceTestUtil; +import org.apache.aurora.scheduler.resources.ResourceType; import org.apache.aurora.scheduler.stats.CachedCounters; import org.apache.aurora.scheduler.storage.entities.IAssignedTask; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; @@ -59,8 +62,11 @@ import static org.apache.aurora.scheduler.base.TaskTestUtil.DEV_TIER; import static org.apache.aurora.scheduler.base.TaskTestUtil.PREFERRED_TIER; import static org.apache.aurora.scheduler.base.TaskTestUtil.REVOCABLE_TIER; import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY; +import static org.apache.aurora.scheduler.preemptor.PreemptionVictimFilter.PreemptionVictimFilterImpl.ORDER; import static org.apache.aurora.scheduler.preemptor.PreemptorMetrics.MISSING_ATTRIBUTES_NAME; +import static org.apache.aurora.scheduler.resources.ResourceTestUtil.bag; import static org.apache.aurora.scheduler.resources.ResourceType.CPUS; +import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB; import static org.apache.mesos.Protos.Offer; import static org.apache.mesos.Protos.Resource; import static org.easymock.EasyMock.expect; @@ -255,11 +261,13 @@ public class PreemptionVictimFilterTest extends EasyMockTest { public void testProductionPreemptingManyNonProduction() throws Exception { schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR); ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1"); - a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512); + setResource(a1, CPUS, 1.0); + setResource(a1, RAM_MB, 512.0); expectGetTier(a1, DEV_TIER).atLeastOnce(); ScheduledTask b1 = makeTask(USER_B, JOB_B, TASK_ID_B + "_b1"); - b1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512); + setResource(b1, CPUS, 1.0); + setResource(b1, RAM_MB, 512.0); expectGetTier(b1, DEV_TIER).atLeastOnce(); setUpHost(); @@ -268,7 +276,8 @@ public class PreemptionVictimFilterTest extends EasyMockTest { assignToHost(b1); ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1"); - p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024); + setResource(p1, CPUS, 2.0); + setResource(p1, RAM_MB, 1024.0); expectGetTier(p1, PREFERRED_TIER).times(2); control.replay(); @@ -280,15 +289,19 @@ public class PreemptionVictimFilterTest extends EasyMockTest { public void testMinimalSetPreempted() throws Exception { schedulingFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR); ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1"); - a1.getAssignedTask().getTask().setNumCpus(4).setRamMb(4096); + setResource(a1, CPUS, 4.0); + setResource(a1, RAM_MB, 4096.0); expectGetTier(a1, DEV_TIER).atLeastOnce(); ScheduledTask b1 = makeTask(USER_B, JOB_B, TASK_ID_B + "_b1"); b1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512); + setResource(b1, CPUS, 1.0); + setResource(b1, RAM_MB, 512.0); expectGetTier(b1, DEV_TIER).anyTimes(); ScheduledTask b2 = makeTask(USER_B, JOB_B, TASK_ID_B + "_b2"); - b2.getAssignedTask().getTask().setNumCpus(1).setRamMb(512); + setResource(b2, CPUS, 1.0); + setResource(b2, RAM_MB, 512.0); expectGetTier(b2, DEV_TIER).anyTimes(); setUpHost(); @@ -298,7 +311,8 @@ public class PreemptionVictimFilterTest extends EasyMockTest { assignToHost(b2); ScheduledTask p1 = makeProductionTask(USER_C, JOB_C, TASK_ID_C + "_p1"); - p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024); + setResource(p1, CPUS, 2.0); + setResource(p1, RAM_MB, 1024.0); expectGetTier(p1, PREFERRED_TIER).times(3); control.replay(); @@ -358,12 +372,14 @@ public class PreemptionVictimFilterTest extends EasyMockTest { setUpHost(); ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1"); - a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512); + setResource(a1, CPUS, 1.0); + setResource(a1, RAM_MB, 512.0); assignToHost(a1); expectGetTier(a1, DEV_TIER).times(2); ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1"); - p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024); + setResource(p1, CPUS, 2.0); + setResource(p1, RAM_MB, 1024.0); expectGetTier(p1, PREFERRED_TIER); control.replay(); @@ -381,12 +397,14 @@ public class PreemptionVictimFilterTest extends EasyMockTest { setUpHost(); ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1"); - a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512); + setResource(a1, CPUS, 1.0); + setResource(a1, RAM_MB, 512.0); assignToHost(a1); expectGetTier(a1, REVOCABLE_TIER).times(2); ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1"); - p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024); + setResource(p1, CPUS, 2.0); + setResource(p1, RAM_MB, 1024.0); expectGetTier(p1, PREFERRED_TIER); control.replay(); @@ -404,12 +422,14 @@ public class PreemptionVictimFilterTest extends EasyMockTest { setUpHost(); ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1"); - a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512); + setResource(a1, CPUS, 1.0); + setResource(a1, RAM_MB, 512.0); assignToHost(a1); expectGetTier(a1, REVOCABLE_TIER).times(2); ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1"); - p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024); + setResource(p1, CPUS, 2.0); + setResource(p1, RAM_MB, 1024.0); expectGetTier(p1, PREFERRED_TIER); control.replay(); @@ -429,17 +449,20 @@ public class PreemptionVictimFilterTest extends EasyMockTest { setUpHost(); ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1"); - a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512); + setResource(a1, CPUS, 1.0); + setResource(a1, RAM_MB, 512.0); assignToHost(a1); expectGetTier(a1, DEV_TIER).atLeastOnce(); ScheduledTask a2 = makeTask(USER_A, JOB_B, TASK_ID_A + "_a2"); - a2.getAssignedTask().getTask().setNumCpus(1).setRamMb(512); + setResource(a2, CPUS, 1.0); + setResource(a2, RAM_MB, 512.0); assignToHost(a2); expectGetTier(a2, DEV_TIER).atLeastOnce(); ScheduledTask p1 = makeProductionTask(USER_B, JOB_B, TASK_ID_B + "_p1"); - p1.getAssignedTask().getTask().setNumCpus(4).setRamMb(2048); + setResource(p1, CPUS, 4.0); + setResource(p1, RAM_MB, 2048.0); expectGetTier(p1, PREFERRED_TIER).times(2); control.replay(); @@ -466,7 +489,8 @@ public class PreemptionVictimFilterTest extends EasyMockTest { expectGetTier(task, PREFERRED_TIER); ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1"); - a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512); + setResource(a1, CPUS, 1.0); + setResource(a1, RAM_MB, 512.0); assignToHost(a1); expectGetTier(a1, DEV_TIER); @@ -486,7 +510,8 @@ public class PreemptionVictimFilterTest extends EasyMockTest { expectGetTier(task, PREFERRED_TIER); ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1"); - a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512); + setResource(a1, CPUS, 1.0); + setResource(a1, RAM_MB, 512.0); assignToHost(a1); expectGetTier(a1, DEV_TIER).times(2); @@ -498,6 +523,18 @@ public class PreemptionVictimFilterTest extends EasyMockTest { assertNoVictims(runFilter(task, NO_OFFER, a1)); } + @Test + public void testOrder() { + control.replay(); + + ResourceBag one = bag(1, 1, 1); + ResourceBag two = bag(2, 2, 2); + ResourceBag three = bag(3, 3, 3); + assertEquals( + ImmutableList.of(one, two, three, three), + ORDER.sortedCopy(ImmutableList.of(three, one, two, three))); + } + private static ImmutableSet<PreemptionVictim> preemptionVictims(ScheduledTask... tasks) { return FluentIterable.from(ImmutableSet.copyOf(tasks)) .transform( @@ -569,7 +606,14 @@ public class PreemptionVictimFilterTest extends EasyMockTest { .andReturn(tier); } - static ScheduledTask makeTask( + private static void setResource(ScheduledTask task, ResourceType type, Double value) { + task.getAssignedTask().setTask(ResourceTestUtil.resetResource( + ITaskConfig.build(task.getAssignedTask().getTask()), + type, + value).newBuilder()); + } + + private static ScheduledTask makeTask( String role, String job, String taskId, http://git-wip-us.apache.org/repos/asf/aurora/blob/485da81c/src/test/java/org/apache/aurora/scheduler/resources/ResourceBagTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/resources/ResourceBagTest.java b/src/test/java/org/apache/aurora/scheduler/resources/ResourceBagTest.java index 48724d5..b8e6474 100644 --- a/src/test/java/org/apache/aurora/scheduler/resources/ResourceBagTest.java +++ b/src/test/java/org/apache/aurora/scheduler/resources/ResourceBagTest.java @@ -17,6 +17,7 @@ import com.google.common.collect.ImmutableMap; import org.junit.Test; +import static org.apache.aurora.scheduler.resources.ResourceBag.IS_NEGATIVE; import static org.apache.aurora.scheduler.resources.ResourceBag.LARGE; import static org.apache.aurora.scheduler.resources.ResourceBag.MEDIUM; import static org.apache.aurora.scheduler.resources.ResourceBag.SMALL; @@ -24,6 +25,7 @@ import static org.apache.aurora.scheduler.resources.ResourceBag.XLARGE; import static org.apache.aurora.scheduler.resources.ResourceTestUtil.bag; import static org.apache.aurora.scheduler.resources.ResourceType.CPUS; import static org.apache.aurora.scheduler.resources.ResourceType.DISK_MB; +import static org.apache.aurora.scheduler.resources.ResourceType.PORTS; import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB; import static org.junit.Assert.assertEquals; @@ -63,4 +65,17 @@ public class ResourceBagTest { LARGE.add(new ResourceBag(ImmutableMap.of(CPUS, 1.0))), new ResourceBag(ImmutableMap.of(CPUS, 9.0, RAM_MB, 16384.0, DISK_MB, 32768.0))); } + + @Test + public void testValueOf() { + assertEquals(1.0, SMALL.valueOf(CPUS), 0.0); + assertEquals(0.0, SMALL.valueOf(PORTS), 0.0); + } + + @Test + public void testFilter() { + assertEquals( + new ResourceBag(ImmutableMap.of(CPUS, -1.0)), + bag(-1.0, 128, 1024).filter(IS_NEGATIVE)); + } } http://git-wip-us.apache.org/repos/asf/aurora/blob/485da81c/src/test/java/org/apache/aurora/scheduler/resources/ResourceManagerTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/resources/ResourceManagerTest.java b/src/test/java/org/apache/aurora/scheduler/resources/ResourceManagerTest.java index 333db30..a5dda25 100644 --- a/src/test/java/org/apache/aurora/scheduler/resources/ResourceManagerTest.java +++ b/src/test/java/org/apache/aurora/scheduler/resources/ResourceManagerTest.java @@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import org.apache.aurora.gen.AssignedTask; +import org.apache.aurora.scheduler.TierInfo; import org.apache.aurora.scheduler.storage.entities.IAssignedTask; import org.apache.aurora.scheduler.storage.entities.IResource; import org.apache.mesos.Protos; @@ -86,6 +87,12 @@ public class ResourceManagerTest { assertEquals( ImmutableSet.of(resource2, resource3), ImmutableSet.copyOf(ResourceManager.getRevocableOfferResources(offer))); + assertEquals( + ImmutableSet.of(resource1, resource3), + ImmutableSet.copyOf(ResourceManager.getOfferResources(offer, new TierInfo(false, false)))); + assertEquals( + ImmutableSet.of(resource2, resource3), + ImmutableSet.copyOf(ResourceManager.getOfferResources(offer, new TierInfo(false, true)))); } @Test @@ -138,6 +145,19 @@ public class ResourceManagerTest { } @Test + public void testResourceQuantityByType() { + assertEquals( + 8.0, + ResourceManager.quantityOf( + ImmutableSet.of( + IResource.build(numCpus(3.0)), + IResource.build(numCpus(5.0)), + IResource.build(ramMb(128))), + CPUS), + 0.0); + } + + @Test public void testBagFromResources() { assertEquals( bag(2.0, 32, 64), http://git-wip-us.apache.org/repos/asf/aurora/blob/485da81c/src/test/java/org/apache/aurora/scheduler/resources/ResourceSlotTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/resources/ResourceSlotTest.java b/src/test/java/org/apache/aurora/scheduler/resources/ResourceSlotTest.java index 842572c..1f1cb18 100644 --- a/src/test/java/org/apache/aurora/scheduler/resources/ResourceSlotTest.java +++ b/src/test/java/org/apache/aurora/scheduler/resources/ResourceSlotTest.java @@ -15,7 +15,6 @@ package org.apache.aurora.scheduler.resources; import java.util.Set; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; @@ -31,7 +30,6 @@ import static org.apache.aurora.scheduler.base.TaskTestUtil.DEV_TIER; import static org.apache.aurora.scheduler.base.TaskTestUtil.REVOCABLE_TIER; import static org.apache.aurora.scheduler.resources.ResourceSlot.makeMesosRangeResource; import static org.apache.aurora.scheduler.resources.ResourceSlot.makeMesosResource; -import static org.apache.aurora.scheduler.resources.ResourceSlot.sum; import static org.apache.aurora.scheduler.resources.ResourceType.CPUS; import static org.apache.aurora.scheduler.resources.ResourceType.DISK_MB; import static org.apache.aurora.scheduler.resources.ResourceType.PORTS; @@ -64,18 +62,6 @@ public class ResourceSlotTest { } @Test - public void testAdd() { - assertEquals(TWO, ONE.add(ONE)); - assertEquals(THREE, ONE.add(TWO)); - assertEquals(THREE, TWO.add(ONE)); - } - - @Test - public void testSum() { - assertEquals(THREE, sum(ImmutableList.of(ONE, ONE, ONE))); - } - - @Test public void testToResourceListNoRevoca() { ResourceSlot resources = ResourceSlot.from(TASK); assertEquals( @@ -138,13 +124,6 @@ public class ResourceSlotTest { assertNotEquals(resources, null); } - @Test - public void testOrder() { - assertEquals( - ImmutableList.of(ONE, TWO, THREE, THREE), - ResourceSlot.ORDER.sortedCopy(ImmutableList.of(THREE, ONE, TWO, THREE))); - } - private void expectRanges(Set<Pair<Long, Long>> expected, Set<Integer> values) { Protos.Resource resource = makeMesosRangeResource(PORTS, values); assertEquals(Protos.Value.Type.RANGES, resource.getType()); http://git-wip-us.apache.org/repos/asf/aurora/blob/485da81c/src/test/java/org/apache/aurora/scheduler/resources/ResourceTestUtil.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/resources/ResourceTestUtil.java b/src/test/java/org/apache/aurora/scheduler/resources/ResourceTestUtil.java index e0cca4b..ba597b8 100644 --- a/src/test/java/org/apache/aurora/scheduler/resources/ResourceTestUtil.java +++ b/src/test/java/org/apache/aurora/scheduler/resources/ResourceTestUtil.java @@ -69,6 +69,15 @@ public final class ResourceTestUtil { return ITaskConfig.build(builder); } + public static ITaskConfig resetResource(ITaskConfig config, ResourceType type, Double value) { + TaskConfig builder = config.newBuilder(); + builder.getResources().removeIf(e -> fromResource(IResource.build(e)).equals(type)); + builder.addToResources(IResource.newBuilder( + type.getValue(), + type.getAuroraResourceConverter().valueOf(value))); + return ITaskConfig.build(builder); + } + public static Protos.Resource mesosScalar(ResourceType type, double value) { return mesosScalar(type, Optional.absent(), false, value); } http://git-wip-us.apache.org/repos/asf/aurora/blob/485da81c/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java index 81baa78..50d942e 100644 --- a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java @@ -34,7 +34,6 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource; import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto; import org.apache.aurora.scheduler.mesos.MesosTaskFactory; import org.apache.aurora.scheduler.offers.OfferManager; -import org.apache.aurora.scheduler.resources.Resources; import org.apache.aurora.scheduler.state.TaskAssigner.TaskAssignerImpl; import org.apache.aurora.scheduler.storage.entities.IAssignedTask; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; @@ -58,6 +57,7 @@ import static org.apache.aurora.scheduler.base.TaskTestUtil.DEV_TIER; import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB; import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask; import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY; +import static org.apache.aurora.scheduler.resources.ResourceManager.bagFromMesosResources; import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosRange; import static org.apache.aurora.scheduler.resources.ResourceTestUtil.offer; import static org.apache.aurora.scheduler.resources.ResourceType.PORTS; @@ -87,8 +87,9 @@ public class TaskAssignerImplTest extends EasyMockTest { .setSlaveId(MESOS_OFFER.getSlaveId()) .build(); private static final Map<String, TaskGroupKey> NO_RESERVATION = ImmutableMap.of(); - private static final UnusedResource UNUSED = - new UnusedResource(Resources.from(MESOS_OFFER).slot(), OFFER.getAttributes()); + private static final UnusedResource UNUSED = new UnusedResource( + bagFromMesosResources(MESOS_OFFER.getResourcesList()), + OFFER.getAttributes()); private static final ResourceRequest RESOURCE_REQUEST = new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY); @@ -267,13 +268,14 @@ public class TaskAssignerImplTest extends EasyMockTest { expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER).times(2); expect(filter.filter( new UnusedResource( - Resources.from(mismatched.getOffer()).slot(), + bagFromMesosResources(mismatched.getOffer().getResourcesList()), mismatched.getAttributes()), new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY))) .andReturn(ImmutableSet.of(Veto.constraintMismatch("constraint mismatch"))); offerManager.banOffer(mismatched.getOffer().getId(), GROUP_KEY); expect(filter.filter( - new UnusedResource(Resources.from(OFFER.getOffer()).slot(), OFFER.getAttributes()), + new UnusedResource( + bagFromMesosResources(MESOS_OFFER.getResourcesList()), OFFER.getAttributes()), new ResourceRequest(TASK.getAssignedTask().getTask(), EMPTY))) .andReturn(ImmutableSet.of());
