Replacing IResourceAggregate in resource calculations. Reviewed at https://reviews.apache.org/r/46997/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/3687c6a1 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/3687c6a1 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/3687c6a1 Branch: refs/heads/master Commit: 3687c6a1a9961433eb254f8e25127d028d9003f8 Parents: d702587 Author: Maxim Khutornenko <[email protected]> Authored: Fri May 6 12:13:53 2016 -0700 Committer: Maxim Khutornenko <[email protected]> Committed: Fri May 6 12:13:53 2016 -0700 ---------------------------------------------------------------------- .../preemptor/PreemptionVictimFilter.java | 3 +- .../scheduler/quota/QuotaCheckResult.java | 57 ++---- .../aurora/scheduler/quota/QuotaInfo.java | 32 ++-- .../aurora/scheduler/quota/QuotaManager.java | 123 ++++++------- .../scheduler/resources/AcceptedOffer.java | 4 +- .../resources/AuroraResourceConverter.java | 46 +++++ .../scheduler/resources/ResourceAggregates.java | 88 --------- .../aurora/scheduler/resources/ResourceBag.java | 178 +++++++++++++++++++ .../scheduler/resources/ResourceManager.java | 151 +++++++++++++++- .../scheduler/resources/ResourceSlot.java | 22 --- .../scheduler/resources/ResourceType.java | 69 ++++++- .../aurora/scheduler/resources/Resources.java | 30 +--- .../apache/aurora/scheduler/sla/SlaGroup.java | 59 +++--- .../scheduler/stats/AsyncStatsModule.java | 40 ++--- .../aurora/scheduler/stats/SlotSizeCounter.java | 45 +++-- .../scheduler/storage/log/ThriftBackfill.java | 3 +- .../scheduler/thrift/ReadOnlySchedulerImpl.java | 17 +- .../scheduler/quota/QuotaCheckResultTest.java | 55 ++---- .../scheduler/quota/QuotaManagerImplTest.java | 85 +++++---- .../scheduler/resources/ResourceBagTest.java | 66 +++++++ .../resources/ResourceManagerTest.java | 76 +++++++- .../scheduler/resources/ResourceSlotTest.java | 12 -- .../scheduler/resources/ResourceTestUtil.java | 16 +- .../scheduler/resources/ResourcesTest.java | 2 +- .../scheduler/stats/AsyncStatsModuleTest.java | 29 ++- .../scheduler/stats/SlotSizeCounterTest.java | 32 ++-- .../storage/log/SnapshotStoreImplIT.java | 5 +- .../aurora/scheduler/thrift/Fixtures.java | 4 +- .../thrift/ReadOnlySchedulerImplTest.java | 19 +- 29 files changed, 869 insertions(+), 499 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/3687c6a1/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java index 9a37ee7..032ab2d 100644 --- a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java +++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java @@ -35,6 +35,7 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter; import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource; import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto; +import org.apache.aurora.scheduler.resources.ResourceManager; import org.apache.aurora.scheduler.resources.ResourceSlot; import org.apache.aurora.scheduler.resources.Resources; import org.apache.aurora.scheduler.storage.Storage.StoreProvider; @@ -98,7 +99,7 @@ public interface PreemptionVictimFilter { } private static final Function<HostOffer, ResourceSlot> OFFER_TO_RESOURCE_SLOT = - offer -> Resources.from(offer.getOffer()).filter(Resources.NON_REVOCABLE).slot(); + offer -> Resources.from(offer.getOffer()).filter(ResourceManager.NON_REVOCABLE).slot(); private static final Function<HostOffer, String> OFFER_TO_HOST = offer -> offer.getOffer().getHostname(); http://git-wip-us.apache.org/repos/asf/aurora/blob/3687c6a1/src/main/java/org/apache/aurora/scheduler/quota/QuotaCheckResult.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/quota/QuotaCheckResult.java b/src/main/java/org/apache/aurora/scheduler/quota/QuotaCheckResult.java index 3437c65..99f034f 100644 --- a/src/main/java/org/apache/aurora/scheduler/quota/QuotaCheckResult.java +++ b/src/main/java/org/apache/aurora/scheduler/quota/QuotaCheckResult.java @@ -16,10 +16,13 @@ package org.apache.aurora.scheduler.quota; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; -import org.apache.aurora.scheduler.storage.entities.IResourceAggregate; +import org.apache.aurora.scheduler.resources.ResourceBag; +import org.apache.aurora.scheduler.resources.ResourceType; import static java.util.Objects.requireNonNull; +import static org.apache.aurora.scheduler.resources.ResourceBag.IS_NEGATIVE; + /** * Calculates and formats detailed quota comparison result. */ @@ -40,21 +43,6 @@ public class QuotaCheckResult { INSUFFICIENT_QUOTA } - enum Resource { - CPU("core(s)"), - RAM("MB"), - DISK("MB"); - - private final String unit; - Resource(String unit) { - this.unit = unit; - } - - String getUnit() { - return unit; - } - } - private final Optional<String> details; private final Result result; @@ -86,34 +74,25 @@ public class QuotaCheckResult { return details; } - static QuotaCheckResult greaterOrEqual(IResourceAggregate a, IResourceAggregate b) { + static QuotaCheckResult greaterOrEqual(ResourceBag a, ResourceBag b) { StringBuilder details = new StringBuilder(); - boolean result = compare(a.getNumCpus(), b.getNumCpus(), Resource.CPU, details) - & compare(a.getRamMb(), b.getRamMb(), Resource.RAM, details) - & compare(a.getDiskMb(), b.getDiskMb(), Resource.DISK, details); + ResourceBag difference = a.subtract(b); + difference.getResourceVectors().entrySet().stream() + .filter(IS_NEGATIVE) + .forEach(entry -> addMessage(entry.getKey(), Math.abs(entry.getValue()), details)); return new QuotaCheckResult( - result ? Result.SUFFICIENT_QUOTA : Result.INSUFFICIENT_QUOTA, + details.length() > 0 ? Result.INSUFFICIENT_QUOTA : Result.SUFFICIENT_QUOTA, Optional.of(details.toString())); } - private static boolean compare( - double a, - double b, - Resource resource, - StringBuilder details) { - - boolean result = a >= b; - if (!result) { - details - .append(details.length() > 0 ? "; " : "") - .append(resource) - .append(" quota exceeded by ") - .append(String.format("%.2f", b - a)) - .append(" ") - .append(resource.getUnit()); - } - - return result; + private static void addMessage(ResourceType resourceType, Double overage, StringBuilder details) { + details + .append(details.length() > 0 ? "; " : "") + .append(resourceType.getAuroraName()) + .append(" quota exceeded by ") + .append(String.format("%.2f", overage)) + .append(" ") + .append(resourceType.getAuroraUnit()); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/3687c6a1/src/main/java/org/apache/aurora/scheduler/quota/QuotaInfo.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/quota/QuotaInfo.java b/src/main/java/org/apache/aurora/scheduler/quota/QuotaInfo.java index 1df21b8..6990351 100644 --- a/src/main/java/org/apache/aurora/scheduler/quota/QuotaInfo.java +++ b/src/main/java/org/apache/aurora/scheduler/quota/QuotaInfo.java @@ -17,7 +17,7 @@ import java.util.Objects; import com.google.common.base.MoreObjects; -import org.apache.aurora.scheduler.storage.entities.IResourceAggregate; +import org.apache.aurora.scheduler.resources.ResourceBag; import static java.util.Objects.requireNonNull; @@ -25,18 +25,18 @@ import static java.util.Objects.requireNonNull; * Wraps allocated quota and consumption details. */ public class QuotaInfo { - private final IResourceAggregate quota; - private final IResourceAggregate prodSharedConsumption; - private final IResourceAggregate prodDedicatedConsumption; - private final IResourceAggregate nonProdSharedConsumption; - private final IResourceAggregate nonProdDedicatedConsumption; + private final ResourceBag quota; + private final ResourceBag prodSharedConsumption; + private final ResourceBag prodDedicatedConsumption; + private final ResourceBag nonProdSharedConsumption; + private final ResourceBag nonProdDedicatedConsumption; QuotaInfo( - IResourceAggregate quota, - IResourceAggregate prodSharedConsumption, - IResourceAggregate prodDedicatedConsumption, - IResourceAggregate nonProdSharedConsumption, - IResourceAggregate nonProdDedicatedConsumption) { + ResourceBag quota, + ResourceBag prodSharedConsumption, + ResourceBag prodDedicatedConsumption, + ResourceBag nonProdSharedConsumption, + ResourceBag nonProdDedicatedConsumption) { this.quota = requireNonNull(quota); this.prodSharedConsumption = requireNonNull(prodSharedConsumption); @@ -50,7 +50,7 @@ public class QuotaInfo { * * @return Available quota. */ - public IResourceAggregate getQuota() { + public ResourceBag getQuota() { return quota; } @@ -59,7 +59,7 @@ public class QuotaInfo { * * @return Production job consumption. */ - public IResourceAggregate getProdSharedConsumption() { + public ResourceBag getProdSharedConsumption() { return prodSharedConsumption; } @@ -68,7 +68,7 @@ public class QuotaInfo { * * @return Production dedicated job consumption. */ - public IResourceAggregate getProdDedicatedConsumption() { + public ResourceBag getProdDedicatedConsumption() { return prodDedicatedConsumption; } @@ -77,7 +77,7 @@ public class QuotaInfo { * * @return Non production job consumption. */ - public IResourceAggregate getNonProdSharedConsumption() { + public ResourceBag getNonProdSharedConsumption() { return nonProdSharedConsumption; } @@ -86,7 +86,7 @@ public class QuotaInfo { * * @return Non production dedicated job consumption. */ - public IResourceAggregate getNonProdDedicatedConsumption() { + public ResourceBag getNonProdDedicatedConsumption() { return nonProdDedicatedConsumption; } http://git-wip-us.apache.org/repos/asf/aurora/blob/3687c6a1/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java b/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java index bf476aa..6d0d120 100644 --- a/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java +++ b/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java @@ -13,9 +13,10 @@ */ package org.apache.aurora.scheduler.quota; -import java.util.Arrays; +import java.util.EnumSet; import java.util.Map; import java.util.Set; +import java.util.stream.StreamSupport; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; @@ -30,12 +31,13 @@ import com.google.common.collect.Multimap; import com.google.common.collect.RangeSet; import org.apache.aurora.gen.JobUpdateQuery; -import org.apache.aurora.gen.ResourceAggregate; import org.apache.aurora.scheduler.base.JobKeys; import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.configuration.ConfigurationManager; -import org.apache.aurora.scheduler.resources.ResourceAggregates; +import org.apache.aurora.scheduler.resources.ResourceBag; +import org.apache.aurora.scheduler.resources.ResourceManager; +import org.apache.aurora.scheduler.resources.ResourceType; import org.apache.aurora.scheduler.storage.JobUpdateStore; import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; import org.apache.aurora.scheduler.storage.Storage.StoreProvider; @@ -63,7 +65,14 @@ import static com.google.common.base.Predicates.not; import static com.google.common.base.Predicates.or; import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.SUFFICIENT_QUOTA; -import static org.apache.aurora.scheduler.resources.ResourceAggregates.EMPTY; +import static org.apache.aurora.scheduler.resources.ResourceBag.EMPTY; +import static org.apache.aurora.scheduler.resources.ResourceBag.IS_NEGATIVE; +import static org.apache.aurora.scheduler.resources.ResourceManager.bagFromAggregate; +import static org.apache.aurora.scheduler.resources.ResourceManager.bagFromResources; +import static org.apache.aurora.scheduler.resources.ResourceManager.getTaskResources; +import static org.apache.aurora.scheduler.resources.ResourceType.CPUS; +import static org.apache.aurora.scheduler.resources.ResourceType.DISK_MB; +import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB; import static org.apache.aurora.scheduler.updater.Updates.getInstanceIds; /** @@ -78,6 +87,8 @@ public interface QuotaManager { Predicate<ITaskConfig> NON_PROD_SHARED = and(not(PROD), not(DEDICATED)); Predicate<ITaskConfig> NON_PROD_DEDICATED = and(not(PROD), DEDICATED); + EnumSet<ResourceType> QUOTA_RESOURCE_TYPES = EnumSet.of(CPUS, RAM_MB, DISK_MB); + /** * Saves a new quota for the provided role or overrides the existing one. * @@ -161,10 +172,9 @@ public interface QuotaManager { } QuotaInfo info = getQuotaInfo(ownerRole, Optional.absent(), storeProvider); - IResourceAggregate prodConsumption = info.getProdSharedConsumption(); - if (quota.getNumCpus() < prodConsumption.getNumCpus() - || quota.getRamMb() < prodConsumption.getRamMb() - || quota.getDiskMb() < prodConsumption.getDiskMb()) { + ResourceBag prodConsumption = info.getProdSharedConsumption(); + ResourceBag overage = bagFromAggregate(quota).subtract(prodConsumption); + if (overage.getResourceVectors().entrySet().stream().anyMatch(IS_NEGATIVE)) { throw new QuotaException(String.format( "Quota: %s is less then current prod reservation: %s", quota.toString(), @@ -191,8 +201,8 @@ public interface QuotaManager { } QuotaInfo quotaInfo = getQuotaInfo(template.getJob().getRole(), storeProvider); - IResourceAggregate requestedTotal = - add(quotaInfo.getProdSharedConsumption(), scale(template, instances)); + ResourceBag requestedTotal = + quotaInfo.getProdSharedConsumption().add(scale(template, instances)); return QuotaCheckResult.greaterOrEqual(quotaInfo.getQuota(), requestedTotal); } @@ -231,13 +241,12 @@ public interface QuotaManager { Optional<IJobConfiguration> oldCron = storeProvider.getCronJobStore().fetchJob(cronConfig.getKey()); - IResourceAggregate oldResource = oldCron.isPresent() ? scale(oldCron.get()) : EMPTY; + ResourceBag oldResource = oldCron.isPresent() ? scale(oldCron.get()) : EMPTY; // Calculate requested total as a sum of current prod consumption and a delta between // new and old cron templates. - IResourceAggregate requestedTotal = add( - quotaInfo.getProdSharedConsumption(), - subtract(scale(cronConfig), oldResource)); + ResourceBag requestedTotal = + quotaInfo.getProdSharedConsumption().add(scale(cronConfig).subtract(oldResource)); return QuotaCheckResult.greaterOrEqual(quotaInfo.getQuota(), requestedTotal); } @@ -280,14 +289,16 @@ public interface QuotaManager { .uniqueIndex(IJobConfiguration::getKey); return new QuotaInfo( - storeProvider.getQuotaStore().fetchQuota(role).or(EMPTY), + storeProvider.getQuotaStore().fetchQuota(role) + .transform(ResourceManager::bagFromAggregate) + .or(EMPTY), getConsumption(tasks, updates, cronTemplates, PROD_SHARED), getConsumption(tasks, updates, cronTemplates, PROD_DEDICATED), getConsumption(tasks, updates, cronTemplates, NON_PROD_SHARED), getConsumption(tasks, updates, cronTemplates, NON_PROD_DEDICATED)); } - private IResourceAggregate getConsumption( + private ResourceBag getConsumption( FluentIterable<IAssignedTask> tasks, Map<IJobKey, IJobUpdateInstructions> updatesByKey, Map<IJobKey, IJobConfiguration> cronTemplatesByKey, @@ -300,21 +311,21 @@ public interface QuotaManager { not(in(cronTemplatesByKey.keySet())), Tasks::getJob); - IResourceAggregate nonCronConsumption = getNonCronConsumption( + ResourceBag nonCronConsumption = getNonCronConsumption( updatesByKey, filteredTasks.filter(excludeCron), filter); - IResourceAggregate cronConsumption = getCronConsumption( + ResourceBag cronConsumption = getCronConsumption( Iterables.filter( cronTemplatesByKey.values(), compose(filter, IJobConfiguration::getTaskConfig)), filteredTasks.transform(IAssignedTask::getTask)); - return add(nonCronConsumption, cronConsumption); + return nonCronConsumption.add(cronConsumption); } - private static IResourceAggregate getNonCronConsumption( + private static ResourceBag getNonCronConsumption( Map<IJobKey, IJobUpdateInstructions> updatesByKey, FluentIterable<IAssignedTask> tasks, final Predicate<ITaskConfig> configFilter) { @@ -330,20 +341,20 @@ public interface QuotaManager { // // 3. Add up the two to yield total consumption. - IResourceAggregate nonUpdateConsumption = fromTasks(tasks + ResourceBag nonUpdateConsumption = fromTasks(tasks .filter(buildNonUpdatingTasksFilter(updatesByKey)) .transform(IAssignedTask::getTask)); final Predicate<IInstanceTaskConfig> instanceFilter = compose(configFilter, IInstanceTaskConfig::getTask); - IResourceAggregate updateConsumption = + ResourceBag updateConsumption = addAll(Iterables.transform(updatesByKey.values(), updateResources(instanceFilter))); - return add(nonUpdateConsumption, updateConsumption); + return nonUpdateConsumption.add(updateConsumption); } - private static IResourceAggregate getCronConsumption( + private static ResourceBag getCronConsumption( Iterable<IJobConfiguration> cronTemplates, FluentIterable<ITaskConfig> tasks) { @@ -358,9 +369,9 @@ public interface QuotaManager { final Multimap<IJobKey, ITaskConfig> taskConfigsByKey = tasks.index(ITaskConfig::getJob); return addAll(Iterables.transform( cronTemplates, - config -> max( - scale(config.getTaskConfig(), config.getInstanceCount()), - fromTasks(taskConfigsByKey.get(config.getKey()))))); + config -> + scale(config.getTaskConfig(), config.getInstanceCount()) + .max(fromTasks(taskConfigsByKey.get(config.getKey()))))); } private static Predicate<IAssignedTask> buildNonUpdatingTasksFilter( @@ -405,16 +416,13 @@ public interface QuotaManager { .setUpdateStatuses(Updates.ACTIVE_JOB_UPDATE_STATES)); } - private static final Function<ITaskConfig, IResourceAggregate> CONFIG_RESOURCES = - config -> IResourceAggregate.build(new ResourceAggregate() - .setNumCpus(config.getNumCpus()) - .setRamMb(config.getRamMb()) - .setDiskMb(config.getDiskMb())); + private static final Function<ITaskConfig, ResourceBag> QUOTA_RESOURCES = + config -> bagFromResources(getTaskResources(config, QUOTA_RESOURCE_TYPES)); - private static final Function<IInstanceTaskConfig, IResourceAggregate> INSTANCE_RESOURCES = + private static final Function<IInstanceTaskConfig, ResourceBag> INSTANCE_RESOURCES = config -> scale(config.getTask(), getUpdateInstanceCount(config.getInstances())); - private static IResourceAggregate instructionsToResources( + private static ResourceBag instructionsToResources( Iterable<IInstanceTaskConfig> instructions) { return addAll(FluentIterable.from(instructions).transform(INSTANCE_RESOURCES)); @@ -433,7 +441,7 @@ public interface QuotaManager { * prod -> non-prod AND {@code prodSharedConsumption=True}: only the initial state * is accounted. */ - private static Function<IJobUpdateInstructions, IResourceAggregate> updateResources( + private static Function<IJobUpdateInstructions, ResourceBag> updateResources( final Predicate<IInstanceTaskConfig> instanceFilter) { return instructions -> { @@ -444,51 +452,26 @@ public interface QuotaManager { instanceFilter); // Calculate result as max(existing, desired) per resource type. - return max( - instructionsToResources(initialState), - instructionsToResources(desiredState)); + return instructionsToResources(initialState).max(instructionsToResources(desiredState)); }; } - private static IResourceAggregate add(IResourceAggregate a, IResourceAggregate b) { - return addAll(Arrays.asList(a, b)); - } - - private static IResourceAggregate addAll(Iterable<IResourceAggregate> aggregates) { - IResourceAggregate total = EMPTY; - for (IResourceAggregate aggregate : aggregates) { - total = IResourceAggregate.build(new ResourceAggregate() - .setNumCpus(total.getNumCpus() + aggregate.getNumCpus()) - .setRamMb(total.getRamMb() + aggregate.getRamMb()) - .setDiskMb(total.getDiskMb() + aggregate.getDiskMb())); - } - return total; - } - - private static IResourceAggregate subtract(IResourceAggregate a, IResourceAggregate b) { - return IResourceAggregate.build(new ResourceAggregate() - .setNumCpus(a.getNumCpus() - b.getNumCpus()) - .setRamMb(a.getRamMb() - b.getRamMb()) - .setDiskMb(a.getDiskMb() - b.getDiskMb())); - } - - private static IResourceAggregate max(IResourceAggregate a, IResourceAggregate b) { - return IResourceAggregate.build(new ResourceAggregate() - .setNumCpus(Math.max(a.getNumCpus(), b.getNumCpus())) - .setRamMb(Math.max(a.getRamMb(), b.getRamMb())) - .setDiskMb(Math.max(a.getDiskMb(), b.getDiskMb()))); + private static ResourceBag addAll(Iterable<ResourceBag> aggregates) { + return StreamSupport.stream(aggregates.spliterator(), false) + .reduce((l, r) -> l.add(r)) + .orElse(EMPTY); } - private static IResourceAggregate scale(ITaskConfig taskConfig, int instanceCount) { - return ResourceAggregates.scale(CONFIG_RESOURCES.apply(taskConfig), instanceCount); + private static ResourceBag scale(ITaskConfig taskConfig, int instanceCount) { + return QUOTA_RESOURCES.apply(taskConfig).scale(instanceCount); } - private static IResourceAggregate scale(IJobConfiguration jobConfiguration) { + private static ResourceBag scale(IJobConfiguration jobConfiguration) { return scale(jobConfiguration.getTaskConfig(), jobConfiguration.getInstanceCount()); } - private static IResourceAggregate fromTasks(Iterable<ITaskConfig> tasks) { - return addAll(Iterables.transform(tasks, CONFIG_RESOURCES)); + private static ResourceBag fromTasks(Iterable<ITaskConfig> tasks) { + return addAll(Iterables.transform(tasks, QUOTA_RESOURCES)); } private static final Function<IJobUpdate, IJobKey> UPDATE_TO_JOB_KEY = http://git-wip-us.apache.org/repos/asf/aurora/blob/3687c6a1/src/main/java/org/apache/aurora/scheduler/resources/AcceptedOffer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/resources/AcceptedOffer.java b/src/main/java/org/apache/aurora/scheduler/resources/AcceptedOffer.java index a735b0b..fce6621 100644 --- a/src/main/java/org/apache/aurora/scheduler/resources/AcceptedOffer.java +++ b/src/main/java/org/apache/aurora/scheduler/resources/AcceptedOffer.java @@ -97,7 +97,7 @@ public final class AcceptedOffer { List<Resource.Builder> cpuResources = filterToBuilders( reservedFirst, ResourceType.CPUS.getMesosName(), - revocable ? Resources.REVOCABLE : Resources.NON_REVOCABLE); + revocable ? ResourceManager.REVOCABLE : ResourceManager.NON_REVOCABLE); List<Resource.Builder> memResources = filterToBuilderNonRevocable( reservedFirst, ResourceType.RAM_MB.getMesosName()); List<Resource.Builder> diskResources = filterToBuilderNonRevocable( @@ -230,6 +230,6 @@ public final class AcceptedOffer { List<Resource> resources, String name) { - return filterToBuilders(resources, name, Resources.NON_REVOCABLE); + return filterToBuilders(resources, name, ResourceManager.NON_REVOCABLE); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/3687c6a1/src/main/java/org/apache/aurora/scheduler/resources/AuroraResourceConverter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/resources/AuroraResourceConverter.java b/src/main/java/org/apache/aurora/scheduler/resources/AuroraResourceConverter.java index f9c89a9..59f5fde 100644 --- a/src/main/java/org/apache/aurora/scheduler/resources/AuroraResourceConverter.java +++ b/src/main/java/org/apache/aurora/scheduler/resources/AuroraResourceConverter.java @@ -38,6 +38,22 @@ public interface AuroraResourceConverter<T> { return value.toString(); } + /** + * Gets resource quantity. + * + * @param value Value to quantify. + * @return Resource quantity. + */ + Double quantify(Object value); + + /** + * Converts resource quantity to matching resource value type (if such conversion exists). + * + * @param value Resource quantity. + * @return Value of type T. + */ + T valueOf(Double value); + LongConverter LONG = new LongConverter(); DoubleConverter DOUBLE = new DoubleConverter(); StringConverter STRING = new StringConverter(); @@ -47,6 +63,16 @@ public interface AuroraResourceConverter<T> { public Long parseFrom(String value) { return Longs.tryParse(value); } + + @Override + public Double quantify(Object value) { + return (double) (long) value; + } + + @Override + public Long valueOf(Double value) { + return value.longValue(); + } } class DoubleConverter implements AuroraResourceConverter<Double> { @@ -54,6 +80,16 @@ public interface AuroraResourceConverter<T> { public Double parseFrom(String value) { return Double.parseDouble(value); } + + @Override + public Double quantify(Object value) { + return (Double) value; + } + + @Override + public Double valueOf(Double value) { + return value; + } } class StringConverter implements AuroraResourceConverter<String> { @@ -61,5 +97,15 @@ public interface AuroraResourceConverter<T> { public String parseFrom(String value) { return value; } + + @Override + public Double quantify(Object value) { + return 1.0; + } + + @Override + public String valueOf(Double value) { + throw new UnsupportedOperationException("Unsupported for string resource types"); + } } } http://git-wip-us.apache.org/repos/asf/aurora/blob/3687c6a1/src/main/java/org/apache/aurora/scheduler/resources/ResourceAggregates.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/resources/ResourceAggregates.java b/src/main/java/org/apache/aurora/scheduler/resources/ResourceAggregates.java deleted file mode 100644 index 1d19b32..0000000 --- a/src/main/java/org/apache/aurora/scheduler/resources/ResourceAggregates.java +++ /dev/null @@ -1,88 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.resources; - -import com.google.common.collect.Ordering; - -import org.apache.aurora.gen.ResourceAggregate; -import org.apache.aurora.scheduler.storage.entities.IResourceAggregate; - -/** - * Convenience class for normalizing resource measures between tasks and offers. - */ -public final class ResourceAggregates { - - public static final IResourceAggregate EMPTY = - IResourceAggregate.build(new ResourceAggregate() - .setNumCpus(0) - .setRamMb(0) - .setDiskMb(0) - ); - - public static final IResourceAggregate SMALL = - IResourceAggregate.build(new ResourceAggregate() - .setNumCpus(1.0) - .setRamMb(1024) - .setDiskMb(4096) - ); - - public static final IResourceAggregate MEDIUM = - IResourceAggregate.build(new ResourceAggregate() - .setNumCpus(4.0) - .setRamMb(8192) - .setDiskMb(16384) - ); - - public static final IResourceAggregate LARGE = - IResourceAggregate.build(new ResourceAggregate() - .setNumCpus(8.0) - .setRamMb(16384) - .setDiskMb(32768) - ); - - public static final IResourceAggregate XLARGE = - IResourceAggregate.build(new ResourceAggregate() - .setNumCpus(16.0) - .setRamMb(32768) - .setDiskMb(65536) - ); - - private ResourceAggregates() { - // Utility class. - } - - /** - * a * m. - */ - public static IResourceAggregate scale(IResourceAggregate a, int m) { - return IResourceAggregate.build(new ResourceAggregate() - .setNumCpus(a.getNumCpus() * m) - .setRamMb(a.getRamMb() * m) - .setDiskMb(a.getDiskMb() * m)); - } - - /** - * a / b. - * <p> - * This calculates how many times {@code b} "fits into" {@code a}. Behavior is undefined when - * {@code b} contains resources with a value of zero. - */ - public static int divide(IResourceAggregate a, IResourceAggregate b) { - return Ordering.natural().min( - a.getNumCpus() / b.getNumCpus(), - (double) a.getRamMb() / b.getRamMb(), - (double) a.getDiskMb() / b.getDiskMb() - ).intValue(); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/3687c6a1/src/main/java/org/apache/aurora/scheduler/resources/ResourceBag.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/resources/ResourceBag.java b/src/main/java/org/apache/aurora/scheduler/resources/ResourceBag.java new file mode 100644 index 0000000..7916ec0 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/resources/ResourceBag.java @@ -0,0 +1,178 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.resources; + +import java.util.Map; +import java.util.Objects; +import java.util.function.BinaryOperator; +import java.util.function.Predicate; + +import com.google.common.base.MoreObjects; +import com.google.common.collect.ImmutableMap; + +import static java.util.stream.Collectors.toMap; + +import static org.apache.aurora.scheduler.resources.ResourceType.CPUS; +import static org.apache.aurora.scheduler.resources.ResourceType.DISK_MB; +import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB; + +/** + * A bag of unique resource values aggregated by {@link ResourceType}. + */ +public class ResourceBag { + public static final ResourceBag EMPTY = new ResourceBag(ImmutableMap.of( + CPUS, 0.0, + RAM_MB, 0.0, + DISK_MB, 0.0 + )); + + public static final ResourceBag SMALL = new ResourceBag(ImmutableMap.of( + CPUS, 1.0, + RAM_MB, 1024.0, + DISK_MB, 4096.0 + )); + + public static final ResourceBag MEDIUM = new ResourceBag(ImmutableMap.of( + CPUS, 4.0, + RAM_MB, 8192.0, + DISK_MB, 16384.0 + )); + + public static final ResourceBag LARGE = new ResourceBag(ImmutableMap.of( + CPUS, 8.0, + RAM_MB, 16384.0, + DISK_MB, 32768.0 + )); + + public static final ResourceBag XLARGE = new ResourceBag(ImmutableMap.of( + CPUS, 16.0, + RAM_MB, 32768.0, + DISK_MB, 65536.0 + )); + + public static final Predicate<Map.Entry<ResourceType, Double>> IS_NEGATIVE = + entry -> entry.getValue() < 0; + + public static final Predicate<Map.Entry<ResourceType, Double>> IS_POSITIVE = + entry -> entry.getValue() > 0; + + public static final Predicate<Map.Entry<ResourceType, Double>> IS_MESOS_REVOCABLE = + entry -> entry.getKey().isMesosRevocable(); + + private final Map<ResourceType, Double> resourceVectors; + + /** + * Creates an instance of ResourceBag with given resource vectors (type -> value). + * + * @param resourceVectors Map of resource vectors. + */ + ResourceBag(Map<ResourceType, Double> resourceVectors) { + this.resourceVectors = ImmutableMap.copyOf(resourceVectors); + } + + /** + * Gets resource vectors in the bag. + * + * @return Map of resource vectors. + */ + public Map<ResourceType, Double> getResourceVectors() { + return resourceVectors; + } + + /** + * Adds this and other bag contents. + * + * @param other Other bag to add. + * @return Result of addition. + */ + public ResourceBag add(ResourceBag other) { + return binaryOp(other, (l, r) -> l + r); + } + + /** + * Subtracts other bag contents from this. + * + * @param other Other bag to subtract. + * @return Result of subtraction. + */ + public ResourceBag subtract(ResourceBag other) { + return binaryOp(other, (l, r) -> l - r); + } + + /** + * Divides this by other bag contents. + * + * @param other Other bag to divide by. + * @return Result of division. + */ + public ResourceBag divide(ResourceBag other) { + return binaryOp(other, (l, r) -> l / r); + } + + /** + * Applies {@code Math.max()} for each matching resource vector. + * + * @param other Other bag to compare with. + * @return A new bag with max resource vectors. + */ + public ResourceBag max(ResourceBag other) { + return binaryOp(other, (l, r) -> Math.max(l, r)); + } + + /** + * Scales each resource vector by {@code m}. + * + * @param m Scale factor. + * @return Result of scale operation. + */ + public ResourceBag scale(int m) { + return new ResourceBag(resourceVectors.entrySet().stream() + .collect(toMap(Map.Entry::getKey, v -> v.getValue() * m))); + } + + private ResourceBag binaryOp(ResourceBag other, BinaryOperator<Double> operator) { + ImmutableMap.Builder<ResourceType, Double> builder = ImmutableMap.builder(); + for (Map.Entry<ResourceType, Double> entry : resourceVectors.entrySet()) { + // Apply binary operator only on matching keys from the other. If there is no match, keep the + // current value unchanged. + builder.put( + entry.getKey(), + other.getResourceVectors().containsKey(entry.getKey()) + ? operator.apply(entry.getValue(), other.getResourceVectors().get(entry.getKey())) + : entry.getValue()); + } + + return new ResourceBag(builder.build()); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof ResourceBag)) { + return false; + } + + ResourceBag other = (ResourceBag) o; + return Objects.equals(resourceVectors, other.resourceVectors); + } + + @Override + public int hashCode() { + return Objects.hash(resourceVectors); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this).add("resourceVectors", resourceVectors).toString(); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/3687c6a1/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java b/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java index 943e8a4..69087e6 100644 --- a/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java +++ b/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java @@ -14,16 +14,23 @@ package org.apache.aurora.scheduler.resources; import java.util.EnumSet; +import java.util.Map; import java.util.Set; +import java.util.function.BinaryOperator; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.StreamSupport; +import com.google.common.base.Predicate; import com.google.common.collect.Iterables; +import org.apache.aurora.gen.ResourceAggregate; import org.apache.aurora.scheduler.storage.entities.IAssignedTask; import org.apache.aurora.scheduler.storage.entities.IResource; +import org.apache.aurora.scheduler.storage.entities.IResourceAggregate; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; +import org.apache.aurora.scheduler.storage.log.ThriftBackfill; import org.apache.mesos.Protos.Resource; import static org.apache.aurora.scheduler.resources.ResourceType.fromResource; @@ -38,6 +45,30 @@ public final class ResourceManager { } /** + * TODO(maxim): reduce visibility by redirecting callers to #getRevocableOfferResources(). + */ + public static final Predicate<Resource> REVOCABLE = + r -> !fromResource(r).isMesosRevocable() || r.hasRevocable(); + + /** + * TODO(maxim): reduce visibility by redirecting callers to #getNonRevocableOfferResources(). + */ + public static final Predicate<Resource> NON_REVOCABLE = r -> !r.hasRevocable(); + + private static final Function<IResource, ResourceType> RESOURCE_TO_TYPE = r -> fromResource(r); + + private static final Function<Resource, ResourceType> MESOS_RESOURCE_TO_TYPE = + r -> fromResource(r); + + private static final Function<IResource, Double> QUANTIFY_RESOURCE = + r -> fromResource(r).getAuroraResourceConverter().quantify(r.getRawValue()); + + private static final Function<Resource, Double> QUANTIFY_MESOS_RESOURCE = + r -> fromResource(r).getMesosResourceConverter().quantify(r); + + private static final BinaryOperator<Double> REDUCE_VALUES = (l, r) -> l + r; + + /** * Gets offer resources matching specified {@link ResourceType}. * * @param offer Offer to get resources from. @@ -49,6 +80,26 @@ public final class ResourceManager { } /** + * Gets Mesos-revocable offer resources. + * + * @param offer Offer to get resources from. + * @return Mesos-revocable offer resources. + */ + public static Iterable<Resource> getRevocableOfferResources(Offer offer) { + return Iterables.filter(offer.getResourcesList(), REVOCABLE); + } + + /** + * Gets non-Mesos-revocable offer resources. + * + * @param offer Offer to get resources from. + * @return Non-Mesos-revocable offer resources. + */ + public static Iterable<Resource> getNonRevocableOfferResources(Offer offer) { + return Iterables.filter(offer.getResourcesList(), NON_REVOCABLE); + } + + /** * Same as {@link #getTaskResources(ITaskConfig, ResourceType)}. * * @param task Scheduled task to get resources from. @@ -71,6 +122,20 @@ public final class ResourceManager { } /** + * Gets task resources matching any of the specified resource types. + * + * @param task Task config to get resources from. + * @param typesToMatch EnumSet of resource types. + * @return Task resources matching any of the resource types. + */ + public static Iterable<IResource> getTaskResources( + ITaskConfig task, + EnumSet<ResourceType> typesToMatch) { + + return Iterables.filter(task.getResources(), r -> typesToMatch.contains(fromResource(r))); + } + + /** * Gets unique task resource types. * * @param task Task to get resource types from. @@ -78,7 +143,7 @@ public final class ResourceManager { */ public static Set<ResourceType> getTaskResourceTypes(IAssignedTask task) { return EnumSet.copyOf(task.getTask().getResources().stream() - .map(r -> fromResource(r)) + .map(RESOURCE_TO_TYPE) .collect(Collectors.toSet())); } @@ -87,12 +152,88 @@ public final class ResourceManager { * * @param resources Mesos resources. * @param type Type of resource to quantify. - * @return Mesos resource value. + * @return Aggregate Mesos resource value. */ - public static Double quantityOf(Iterable<Resource> resources, ResourceType type) { + public static Double quantityOfMesosResource(Iterable<Resource> resources, ResourceType type) { return StreamSupport.stream(resources.spliterator(), false) .filter(r -> fromResource(r).equals(type)) - .map(r -> fromResource(r).getMesosResourceConverter().quantify(r)) - .reduce((l, r) -> l + r).orElse(0.0); + .map(QUANTIFY_MESOS_RESOURCE) + .reduce(REDUCE_VALUES) + .orElse(0.0); + } + + /** + * Gets the quantity of resources. Caller to ensure all resources are of the same type. + * + * @param resources Resources to sum up. + * @return Aggregate resource value. + */ + public static Double quantityOf(Iterable<IResource> resources) { + return StreamSupport.stream(resources.spliterator(), false) + .map(QUANTIFY_RESOURCE) + .reduce(REDUCE_VALUES) + .orElse(0.0); + } + + /** + * Creates a {@link ResourceBag} from resources. + * + * @param resources Resources to convert. + * @return A {@link ResourceBag} instance. + */ + public static ResourceBag bagFromResources(Iterable<IResource> resources) { + return bagFromResources(resources, RESOURCE_TO_TYPE, QUANTIFY_RESOURCE); + } + + /** + * Creates a {@link ResourceBag} from Mesos resources. + * + * @param resources Mesos resources to convert. + * @return A {@link ResourceBag} instance. + */ + public static ResourceBag bagFromMesosResources(Iterable<Resource> resources) { + return bagFromResources(resources, MESOS_RESOURCE_TO_TYPE, QUANTIFY_MESOS_RESOURCE); + } + + /** + * Creates a {@link ResourceBag} from {@link IResourceAggregate}. + * + * @param aggregate {@link IResourceAggregate} to convert. + * @return A {@link ResourceBag} instance. + */ + public static ResourceBag bagFromAggregate(IResourceAggregate aggregate) { + return new ResourceBag(aggregate.getResources().stream() + .collect(Collectors.toMap(RESOURCE_TO_TYPE, QUANTIFY_RESOURCE))); + } + + /** + * Creates a {@link IResourceAggregate} from {@link ResourceBag}. + * + * @param bag {@link ResourceBag} to convert. + * @return A {@link IResourceAggregate} instance. + */ + public static IResourceAggregate aggregateFromBag(ResourceBag bag) { + return ThriftBackfill.backfillResourceAggregate(new ResourceAggregate() + .setResources(bag.getResourceVectors().entrySet().stream() + .map(e -> IResource.newBuilder( + e.getKey().getValue(), + e.getKey().getAuroraResourceConverter().valueOf(e.getValue()))) + .collect(Collectors.toSet()))); + } + + private static <T> ResourceBag bagFromResources( + Iterable<T> resources, + Function<T, ResourceType> typeMapper, + Function<T, Double> valueMapper) { + + return new ResourceBag(StreamSupport.stream(resources.spliterator(), false) + .collect(Collectors.groupingBy(typeMapper)) + .entrySet().stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + group -> group.getValue().stream() + .map(valueMapper) + .reduce(REDUCE_VALUES) + .orElse(0.0)))); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/3687c6a1/src/main/java/org/apache/aurora/scheduler/resources/ResourceSlot.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/resources/ResourceSlot.java b/src/main/java/org/apache/aurora/scheduler/resources/ResourceSlot.java index a8dee95..13922bc 100644 --- a/src/main/java/org/apache/aurora/scheduler/resources/ResourceSlot.java +++ b/src/main/java/org/apache/aurora/scheduler/resources/ResourceSlot.java @@ -189,28 +189,6 @@ public final class ResourceSlot { } /** - * Generates a ResourceSlot where each resource component is a max out of the two components. - * - * @param a A resource to compare. - * @param b A resource to compare. - * - * @return Returns a ResourceSlot instance where each component is a max of the two components. - */ - @VisibleForTesting - static ResourceSlot maxElements(ResourceSlot a, ResourceSlot b) { - double maxCPU = Math.max(a.getNumCpus(), b.getNumCpus()); - Amount<Long, Data> maxRAM = Amount.of( - Math.max(a.getRam().as(Data.MB), b.getRam().as(Data.MB)), - Data.MB); - Amount<Long, Data> maxDisk = Amount.of( - Math.max(a.getDisk().as(Data.MB), b.getDisk().as(Data.MB)), - Data.MB); - int maxPorts = Math.max(a.getNumPorts(), b.getNumPorts()); - - return new ResourceSlot(maxCPU, maxRAM, maxDisk, maxPorts); - } - - /** * Number of CPUs. * * @return CPUs. http://git-wip-us.apache.org/repos/asf/aurora/blob/3687c6a1/src/main/java/org/apache/aurora/scheduler/resources/ResourceType.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/resources/ResourceType.java b/src/main/java/org/apache/aurora/scheduler/resources/ResourceType.java index baed3de..276320a 100644 --- a/src/main/java/org/apache/aurora/scheduler/resources/ResourceType.java +++ b/src/main/java/org/apache/aurora/scheduler/resources/ResourceType.java @@ -45,7 +45,17 @@ public enum ResourceType implements TEnum { /** * CPU resource. */ - CPUS(_Fields.NUM_CPUS, SCALAR, "cpus", DOUBLE, Optional.empty(), "CPU", 16, false), + CPUS( + _Fields.NUM_CPUS, + SCALAR, + "cpus", + DOUBLE, + Optional.empty(), + "CPU", + "core(s)", + 16, + false, + true), /** * RAM resource. @@ -57,7 +67,9 @@ public enum ResourceType implements TEnum { LONG, Optional.empty(), "RAM", + "MB", Amount.of(24, GB).as(MB), + false, false), /** @@ -70,13 +82,25 @@ public enum ResourceType implements TEnum { LONG, Optional.empty(), "disk", + "MB", Amount.of(450, GB).as(MB), + false, false), /** * Port resource. */ - PORTS(_Fields.NAMED_PORT, RANGES, "ports", STRING, Optional.of(PORT_MAPPER), "ports", 1000, true); + PORTS( + _Fields.NAMED_PORT, + RANGES, + "ports", + STRING, + Optional.of(PORT_MAPPER), + "ports", + "count", + 1000, + true, + false); /** * Correspondent thrift {@link org.apache.aurora.gen.Resource} enum value. @@ -109,6 +133,11 @@ public enum ResourceType implements TEnum { private final String auroraName; /** + * Aurora resource unit. + */ + private final String auroraUnit; + + /** * Scaling range for comparing scheduling vetoes. */ private final int scalingRange; @@ -118,6 +147,11 @@ public enum ResourceType implements TEnum { */ private final boolean isMultipleAllowed; + /** + * Indicates if a resource can be Mesos-revocable. + */ + private final boolean isMesosRevocable; + private static ImmutableMap<Integer, ResourceType> byField = Maps.uniqueIndex(EnumSet.allOf(ResourceType.class), ResourceType::getValue); @@ -133,8 +167,10 @@ public enum ResourceType implements TEnum { * @param auroraResourceConverter See {@link #getAuroraResourceConverter()} for more details. * @param mapper See {@link #getMapper()} for more details. * @param auroraName See {@link #getAuroraName()} for more details. + * @param auroraUnit See {@link #getAuroraUnit()} for more details. * @param scalingRange See {@link #getScalingRange()} for more details. * @param isMultipleAllowed See {@link #isMultipleAllowed()} for more details. + * @param isMesosRevocable See {@link #isMesosRevocable()} for more details. */ ResourceType( _Fields value, @@ -143,17 +179,21 @@ public enum ResourceType implements TEnum { AuroraResourceConverter<?> auroraResourceConverter, Optional<ResourceMapper> mapper, String auroraName, + String auroraUnit, int scalingRange, - boolean isMultipleAllowed) { + boolean isMultipleAllowed, + boolean isMesosRevocable) { this.value = value; this.mesosResourceConverter = requireNonNull(mesosResourceConverter); this.mesosName = requireNonNull(mesosName); this.auroraResourceConverter = requireNonNull(auroraResourceConverter); - this.auroraName = requireNonNull(auroraName); this.mapper = requireNonNull(mapper); + this.auroraName = requireNonNull(auroraName); + this.auroraUnit = requireNonNull(auroraUnit); this.scalingRange = scalingRange; this.isMultipleAllowed = isMultipleAllowed; + this.isMesosRevocable = isMesosRevocable; } /** @@ -215,6 +255,15 @@ public enum ResourceType implements TEnum { } /** + * Gets resource unit for internal Aurora representation. + * + * @return Aurora resource unit. + */ + public String getAuroraUnit() { + return auroraUnit; + } + + /** * Scaling range to use for comparison of scheduling vetoes. * <p> * This has no real bearing besides trying to determine if a veto along one resource vector @@ -237,6 +286,18 @@ public enum ResourceType implements TEnum { } /** + * Returns a flag indicating if a resource can be Mesos-revocable. + * <p> + * @see <a href="https://github.com/apache/mesos/blob/master/include/mesos/mesos.proto/">Mesos + * protobuf for more details</a> + * + * @return True if a resource can be Mesos-revocable, false otherwise. + */ + public boolean isMesosRevocable() { + return isMesosRevocable; + } + + /** * Returns a {@link ResourceType} for the given ID. * * @param value ID value to search by. See {@link #getValue()}. http://git-wip-us.apache.org/repos/asf/aurora/blob/3687c6a1/src/main/java/org/apache/aurora/scheduler/resources/Resources.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/resources/Resources.java b/src/main/java/org/apache/aurora/scheduler/resources/Resources.java index 94cd163..2ced8dd 100644 --- a/src/main/java/org/apache/aurora/scheduler/resources/Resources.java +++ b/src/main/java/org/apache/aurora/scheduler/resources/Resources.java @@ -17,7 +17,6 @@ import java.util.Set; import com.google.common.base.Function; import com.google.common.base.Predicate; -import com.google.common.base.Predicates; import com.google.common.collect.ContiguousSet; import com.google.common.collect.DiscreteDomain; import com.google.common.collect.ImmutableList; @@ -32,7 +31,9 @@ import org.apache.mesos.Protos.Value.Range; import static java.util.Objects.requireNonNull; -import static org.apache.aurora.scheduler.resources.ResourceManager.quantityOf; +import static org.apache.aurora.scheduler.resources.ResourceManager.NON_REVOCABLE; +import static org.apache.aurora.scheduler.resources.ResourceManager.REVOCABLE; +import static org.apache.aurora.scheduler.resources.ResourceManager.quantityOfMesosResource; import static org.apache.aurora.scheduler.resources.ResourceType.CPUS; import static org.apache.aurora.scheduler.resources.ResourceType.DISK_MB; import static org.apache.aurora.scheduler.resources.ResourceType.PORTS; @@ -42,23 +43,6 @@ import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB; * A container for multiple Mesos resource vectors. */ public final class Resources { - - /** - * CPU resource filter. - */ - private static final Predicate<Resource> CPU = e -> e.getName().equals(CPUS.getMesosName()); - - /** - * Revocable resource filter. - */ - public static final Predicate<Resource> REVOCABLE = - Predicates.or(Predicates.not(CPU), Predicates.and(CPU, Resource::hasRevocable)); - - /** - * Non-revocable resource filter. - */ - public static final Predicate<Resource> NON_REVOCABLE = Predicates.not(Resource::hasRevocable); - /** * Convert range to set of integers. */ @@ -109,10 +93,10 @@ public final class Resources { * @return {@code ResourceSlot} instance. */ public ResourceSlot slot() { - return new ResourceSlot(quantityOf(mesosResources, CPUS), - Amount.of(quantityOf(mesosResources, RAM_MB).longValue(), Data.MB), - Amount.of(quantityOf(mesosResources, DISK_MB).longValue(), Data.MB), - quantityOf(mesosResources, PORTS).intValue()); + return new ResourceSlot(quantityOfMesosResource(mesosResources, CPUS), + Amount.of(quantityOfMesosResource(mesosResources, RAM_MB).longValue(), Data.MB), + Amount.of(quantityOfMesosResource(mesosResources, DISK_MB).longValue(), Data.MB), + quantityOfMesosResource(mesosResources, PORTS).intValue()); } /** http://git-wip-us.apache.org/repos/asf/aurora/blob/3687c6a1/src/main/java/org/apache/aurora/scheduler/sla/SlaGroup.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/sla/SlaGroup.java b/src/main/java/org/apache/aurora/scheduler/sla/SlaGroup.java index 2c044a6..21121bc 100644 --- a/src/main/java/org/apache/aurora/scheduler/sla/SlaGroup.java +++ b/src/main/java/org/apache/aurora/scheduler/sla/SlaGroup.java @@ -26,14 +26,21 @@ import com.google.common.collect.Range; import org.apache.aurora.scheduler.base.JobKeys; import org.apache.aurora.scheduler.base.Tasks; +import org.apache.aurora.scheduler.resources.ResourceBag; +import org.apache.aurora.scheduler.resources.ResourceType; import org.apache.aurora.scheduler.storage.entities.IJobKey; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; -import static org.apache.aurora.scheduler.resources.ResourceAggregates.EMPTY; -import static org.apache.aurora.scheduler.resources.ResourceAggregates.LARGE; -import static org.apache.aurora.scheduler.resources.ResourceAggregates.MEDIUM; -import static org.apache.aurora.scheduler.resources.ResourceAggregates.SMALL; -import static org.apache.aurora.scheduler.resources.ResourceAggregates.XLARGE; +import static org.apache.aurora.scheduler.resources.ResourceBag.EMPTY; +import static org.apache.aurora.scheduler.resources.ResourceBag.LARGE; +import static org.apache.aurora.scheduler.resources.ResourceBag.MEDIUM; +import static org.apache.aurora.scheduler.resources.ResourceBag.SMALL; +import static org.apache.aurora.scheduler.resources.ResourceBag.XLARGE; +import static org.apache.aurora.scheduler.resources.ResourceManager.getTaskResources; +import static org.apache.aurora.scheduler.resources.ResourceManager.quantityOf; +import static org.apache.aurora.scheduler.resources.ResourceType.CPUS; +import static org.apache.aurora.scheduler.resources.ResourceType.DISK_MB; +import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB; /** * Defines a logical grouping criteria to be applied over a set of tasks. @@ -56,30 +63,30 @@ interface SlaGroup { CLUSTER(new Cluster()), RESOURCE_CPU(new Resource<>( ImmutableMap.of( - "sla_cpu_small_", Range.closed(EMPTY.getNumCpus(), SMALL.getNumCpus()), - "sla_cpu_medium_", Range.openClosed(SMALL.getNumCpus(), MEDIUM.getNumCpus()), - "sla_cpu_large_", Range.openClosed(MEDIUM.getNumCpus(), LARGE.getNumCpus()), - "sla_cpu_xlarge_", Range.openClosed(LARGE.getNumCpus(), XLARGE.getNumCpus()), - "sla_cpu_xxlarge_", Range.greaterThan(XLARGE.getNumCpus())), - task -> task.getAssignedTask().getTask().getNumCpus() + "sla_cpu_small_", Range.closed(fromBag(EMPTY, CPUS), fromBag(SMALL, CPUS)), + "sla_cpu_medium_", Range.openClosed(fromBag(SMALL, CPUS), fromBag(MEDIUM, CPUS)), + "sla_cpu_large_", Range.openClosed(fromBag(MEDIUM, CPUS), fromBag(LARGE, CPUS)), + "sla_cpu_xlarge_", Range.openClosed(fromBag(LARGE, CPUS), fromBag(XLARGE, CPUS)), + "sla_cpu_xxlarge_", Range.greaterThan(fromBag(XLARGE, CPUS))), + task -> quantityOf(getTaskResources(task.getAssignedTask().getTask(), CPUS)) )), RESOURCE_RAM(new Resource<>( ImmutableMap.of( - "sla_ram_small_", Range.closed(EMPTY.getRamMb(), SMALL.getRamMb()), - "sla_ram_medium_", Range.openClosed(SMALL.getRamMb(), MEDIUM.getRamMb()), - "sla_ram_large_", Range.openClosed(MEDIUM.getRamMb(), LARGE.getRamMb()), - "sla_ram_xlarge_", Range.openClosed(LARGE.getRamMb(), XLARGE.getRamMb()), - "sla_ram_xxlarge_", Range.greaterThan(XLARGE.getRamMb())), - task -> task.getAssignedTask().getTask().getRamMb() + "sla_ram_small_", Range.closed(fromBag(EMPTY, RAM_MB), fromBag(SMALL, RAM_MB)), + "sla_ram_medium_", Range.openClosed(fromBag(SMALL, RAM_MB), fromBag(MEDIUM, RAM_MB)), + "sla_ram_large_", Range.openClosed(fromBag(MEDIUM, RAM_MB), fromBag(LARGE, RAM_MB)), + "sla_ram_xlarge_", Range.openClosed(fromBag(LARGE, RAM_MB), fromBag(XLARGE, RAM_MB)), + "sla_ram_xxlarge_", Range.greaterThan(fromBag(XLARGE, RAM_MB))), + task -> quantityOf(getTaskResources(task.getAssignedTask().getTask(), RAM_MB)) )), RESOURCE_DISK(new Resource<>( ImmutableMap.of( - "sla_disk_small_", Range.closed(EMPTY.getDiskMb(), SMALL.getDiskMb()), - "sla_disk_medium_", Range.openClosed(SMALL.getDiskMb(), MEDIUM.getDiskMb()), - "sla_disk_large_", Range.openClosed(MEDIUM.getDiskMb(), LARGE.getDiskMb()), - "sla_disk_xlarge_", Range.openClosed(LARGE.getDiskMb(), XLARGE.getDiskMb()), - "sla_disk_xxlarge_", Range.greaterThan(XLARGE.getDiskMb())), - task -> task.getAssignedTask().getTask().getDiskMb() + "sla_disk_small_", Range.closed(fromBag(EMPTY, DISK_MB), fromBag(SMALL, DISK_MB)), + "sla_disk_medium_", Range.openClosed(fromBag(SMALL, DISK_MB), fromBag(MEDIUM, DISK_MB)), + "sla_disk_large_", Range.openClosed(fromBag(MEDIUM, DISK_MB), fromBag(LARGE, DISK_MB)), + "sla_disk_xlarge_", Range.openClosed(fromBag(LARGE, DISK_MB), fromBag(XLARGE, DISK_MB)), + "sla_disk_xxlarge_", Range.greaterThan(fromBag(XLARGE, DISK_MB))), + task -> quantityOf(getTaskResources(task.getAssignedTask().getTask(), DISK_MB)) )); private SlaGroup group; @@ -90,6 +97,12 @@ interface SlaGroup { SlaGroup getSlaGroup() { return group; } + + // TODO(maxim): Refactor SLA management to build groups dynamically from + // all available ResourceType values. + private static Double fromBag(ResourceBag bag, ResourceType type) { + return bag.getResourceVectors().get(type); + } } /** http://git-wip-us.apache.org/repos/asf/aurora/blob/3687c6a1/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java b/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java index 03dfa27..1d1415a 100644 --- a/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java +++ b/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java @@ -25,24 +25,22 @@ import com.google.inject.PrivateModule; import org.apache.aurora.common.args.Arg; import org.apache.aurora.common.args.CmdLine; import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Data; import org.apache.aurora.common.quantity.Time; -import org.apache.aurora.gen.ResourceAggregate; import org.apache.aurora.scheduler.HostOffer; import org.apache.aurora.scheduler.SchedulerServicesModule; import org.apache.aurora.scheduler.base.Conversions; import org.apache.aurora.scheduler.offers.OfferManager; -import org.apache.aurora.scheduler.resources.ResourceSlot; -import org.apache.aurora.scheduler.resources.Resources; +import org.apache.aurora.scheduler.resources.ResourceBag; import org.apache.aurora.scheduler.stats.SlotSizeCounter.MachineResource; import org.apache.aurora.scheduler.stats.SlotSizeCounter.MachineResourceProvider; -import org.apache.aurora.scheduler.storage.entities.IResourceAggregate; import static java.util.Objects.requireNonNull; -import static org.apache.aurora.scheduler.resources.ResourceSlot.NONE; -import static org.apache.aurora.scheduler.resources.Resources.NON_REVOCABLE; -import static org.apache.aurora.scheduler.resources.Resources.REVOCABLE; +import static org.apache.aurora.scheduler.resources.ResourceBag.IS_MESOS_REVOCABLE; +import static org.apache.aurora.scheduler.resources.ResourceBag.IS_POSITIVE; +import static org.apache.aurora.scheduler.resources.ResourceManager.bagFromMesosResources; +import static org.apache.aurora.scheduler.resources.ResourceManager.getNonRevocableOfferResources; +import static org.apache.aurora.scheduler.resources.ResourceManager.getRevocableOfferResources; /** * Module to configure export of cluster-wide resource allocation and consumption statistics. @@ -152,30 +150,26 @@ public class AsyncStatsModule extends AbstractModule { ImmutableList.Builder<MachineResource> builder = ImmutableList.builder(); for (HostOffer offer : offers) { - ResourceSlot revocable = Resources.from(offer.getOffer()).filter(REVOCABLE).slot(); - ResourceSlot nonRevocable = - Resources.from(offer.getOffer()).filter(NON_REVOCABLE).slot(); + ResourceBag revocable = bagFromMesosResources(getRevocableOfferResources(offer.getOffer())); + ResourceBag nonRevocable = + bagFromMesosResources(getNonRevocableOfferResources(offer.getOffer())); boolean isDedicated = Conversions.isDedicated(offer.getOffer()); - // It's insufficient to compare revocable against NONE here as RAM, DISK and PORTS + // It's insufficient to compare revocable against EMPTY here as RAM, DISK and PORTS // are always rolled in to revocable as non-compressible resources. Only if revocable // CPU is non-zero should we expose the revocable resources as aggregates. - if (revocable.getNumCpus() > 0.0) { - builder.add(new MachineResource(fromSlot(revocable), isDedicated, true)); + if (revocable.getResourceVectors().entrySet().stream() + .filter(IS_POSITIVE.and(IS_MESOS_REVOCABLE)) + .findFirst() + .isPresent()) { + builder.add(new MachineResource(revocable, isDedicated, true)); } - if (!nonRevocable.equals(NONE)) { - builder.add(new MachineResource(fromSlot(nonRevocable), isDedicated, false)); + if (!nonRevocable.equals(ResourceBag.EMPTY)) { + builder.add(new MachineResource(nonRevocable, isDedicated, false)); } } return builder.build(); } - - private static IResourceAggregate fromSlot(ResourceSlot slot) { - return IResourceAggregate.build(new ResourceAggregate() - .setNumCpus(slot.getNumCpus()) - .setRamMb(slot.getRam().as(Data.MB)) - .setDiskMb(slot.getDisk().as(Data.MB))); - } } } http://git-wip-us.apache.org/repos/asf/aurora/blob/3687c6a1/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java b/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java index 1f71b00..a3ca9a9 100644 --- a/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java +++ b/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java @@ -15,6 +15,7 @@ package org.apache.aurora.scheduler.stats; import java.util.Map; import java.util.Objects; +import java.util.stream.Collectors; import javax.inject.Inject; @@ -24,22 +25,27 @@ import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMultimap; +import com.google.common.collect.Ordering; -import org.apache.aurora.scheduler.resources.ResourceAggregates; -import org.apache.aurora.scheduler.storage.entities.IResourceAggregate; +import org.apache.aurora.scheduler.resources.ResourceBag; import static java.util.Objects.requireNonNull; +import static org.apache.aurora.scheduler.resources.ResourceBag.LARGE; +import static org.apache.aurora.scheduler.resources.ResourceBag.MEDIUM; +import static org.apache.aurora.scheduler.resources.ResourceBag.SMALL; +import static org.apache.aurora.scheduler.resources.ResourceBag.XLARGE; + /** * A stat computer that aggregates the number of 'slots' available at different pre-determined * slot sizes, broken down by dedicated and non-dedicated hosts. */ class SlotSizeCounter implements Runnable { - private static final Map<String, IResourceAggregate> SLOT_SIZES = ImmutableMap.of( - "small", ResourceAggregates.SMALL, - "medium", ResourceAggregates.MEDIUM, - "large", ResourceAggregates.LARGE, - "xlarge", ResourceAggregates.XLARGE); + private static final Map<String, ResourceBag> SLOT_SIZES = ImmutableMap.of( + "small", SMALL, + "medium", MEDIUM, + "large", LARGE, + "xlarge", XLARGE); // Ensures all counters are always initialized regardless of the Resource availability. private static final Iterable<String> SLOT_GROUPS = ImmutableList.of( @@ -49,13 +55,13 @@ class SlotSizeCounter implements Runnable { getPrefix(true, true) ); - private final Map<String, IResourceAggregate> slotSizes; + private final Map<String, ResourceBag> slotSizes; private final MachineResourceProvider machineResourceProvider; private final CachedCounters cachedCounters; @VisibleForTesting SlotSizeCounter( - final Map<String, IResourceAggregate> slotSizes, + final Map<String, ResourceBag> slotSizes, MachineResourceProvider machineResourceProvider, CachedCounters cachedCounters) { @@ -65,17 +71,17 @@ class SlotSizeCounter implements Runnable { } static class MachineResource { - private final IResourceAggregate size; + private final ResourceBag size; private final boolean dedicated; private final boolean revocable; - MachineResource(IResourceAggregate size, boolean dedicated, boolean revocable) { + MachineResource(ResourceBag size, boolean dedicated, boolean revocable) { this.size = requireNonNull(size); this.dedicated = dedicated; this.revocable = revocable; } - public IResourceAggregate getSize() { + public ResourceBag getSize() { return size; } @@ -125,9 +131,12 @@ class SlotSizeCounter implements Runnable { return getPrefix(dedicated, revocable) + slotName; } - private int countSlots(Iterable<IResourceAggregate> slots, final IResourceAggregate slotSize) { - Function<IResourceAggregate, Integer> counter = - machineSlack -> ResourceAggregates.divide(machineSlack, slotSize); + private int countSlots(Iterable<ResourceBag> slots, final ResourceBag slotSize) { + Function<ResourceBag, Integer> counter = machineSlack -> Ordering.natural().min( + machineSlack.divide(slotSize).getResourceVectors().entrySet().stream() + .map(entry -> entry.getValue()) + .collect(Collectors.toSet())) + .intValue(); int sum = 0; for (int slotCount : FluentIterable.from(slots).transform(counter)) { @@ -139,14 +148,14 @@ class SlotSizeCounter implements Runnable { private void updateStats( String name, Iterable<MachineResource> slots, - IResourceAggregate slotSize) { + ResourceBag slotSize) { - ImmutableMultimap.Builder<String, IResourceAggregate> builder = ImmutableMultimap.builder(); + ImmutableMultimap.Builder<String, ResourceBag> builder = ImmutableMultimap.builder(); for (MachineResource slot : slots) { builder.put(getStatName(name, slot.isDedicated(), slot.isRevocable()), slot.getSize()); } - ImmutableMultimap<String, IResourceAggregate> sizes = builder.build(); + ImmutableMultimap<String, ResourceBag> sizes = builder.build(); for (String slotGroup : SLOT_GROUPS) { String statName = slotGroup + name; http://git-wip-us.apache.org/repos/asf/aurora/blob/3687c6a1/src/main/java/org/apache/aurora/scheduler/storage/log/ThriftBackfill.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/ThriftBackfill.java b/src/main/java/org/apache/aurora/scheduler/storage/log/ThriftBackfill.java index d1c62a8..0a307fe 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/log/ThriftBackfill.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/log/ThriftBackfill.java @@ -25,6 +25,7 @@ import org.apache.aurora.gen.Resource; import org.apache.aurora.gen.ResourceAggregate; import org.apache.aurora.gen.ScheduledTask; import org.apache.aurora.gen.TaskConfig; +import org.apache.aurora.scheduler.quota.QuotaManager; import org.apache.aurora.scheduler.resources.ResourceType; import org.apache.aurora.scheduler.storage.entities.IJobConfiguration; import org.apache.aurora.scheduler.storage.entities.IJobUpdate; @@ -119,7 +120,7 @@ public final class ThriftBackfill { aggregate.addToResources(Resource.ramMb(aggregate.getRamMb())); aggregate.addToResources(Resource.diskMb(aggregate.getDiskMb())); } else { - EnumSet<ResourceType> quotaResources = EnumSet.of(CPUS, RAM_MB, DISK_MB); + EnumSet<ResourceType> quotaResources = QuotaManager.QUOTA_RESOURCE_TYPES; if (aggregate.getResources().size() > quotaResources.size()) { throw new IllegalArgumentException("Too many resource values in quota."); } http://git-wip-us.apache.org/repos/asf/aurora/blob/3687c6a1/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java index bab34d8..0d4f044 100644 --- a/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/thrift/ReadOnlySchedulerImpl.java @@ -103,6 +103,7 @@ import static java.util.Objects.requireNonNull; import static org.apache.aurora.gen.ResponseCode.INVALID_REQUEST; import static org.apache.aurora.scheduler.base.Numbers.convertRanges; import static org.apache.aurora.scheduler.base.Numbers.toRanges; +import static org.apache.aurora.scheduler.resources.ResourceManager.aggregateFromBag; import static org.apache.aurora.scheduler.thrift.Responses.error; import static org.apache.aurora.scheduler.thrift.Responses.invalidRequest; import static org.apache.aurora.scheduler.thrift.Responses.ok; @@ -281,12 +282,16 @@ class ReadOnlySchedulerImpl implements ReadOnlyScheduler.Iface { MorePreconditions.checkNotBlank(ownerRole); return storage.read(storeProvider -> { QuotaInfo quotaInfo = quotaManager.getQuotaInfo(ownerRole, storeProvider); - GetQuotaResult result = new GetQuotaResult(quotaInfo.getQuota().newBuilder()) - .setProdSharedConsumption(quotaInfo.getProdSharedConsumption().newBuilder()) - .setProdDedicatedConsumption(quotaInfo.getProdDedicatedConsumption().newBuilder()) - .setNonProdSharedConsumption(quotaInfo.getNonProdSharedConsumption().newBuilder()) - .setNonProdDedicatedConsumption( - quotaInfo.getNonProdDedicatedConsumption().newBuilder()); + GetQuotaResult result = new GetQuotaResult() + .setQuota(aggregateFromBag(quotaInfo.getQuota()).newBuilder()) + .setProdSharedConsumption(aggregateFromBag( + quotaInfo.getProdSharedConsumption()).newBuilder()) + .setProdDedicatedConsumption(aggregateFromBag( + quotaInfo.getProdDedicatedConsumption()).newBuilder()) + .setNonProdSharedConsumption(aggregateFromBag( + quotaInfo.getNonProdSharedConsumption()).newBuilder()) + .setNonProdDedicatedConsumption(aggregateFromBag( + quotaInfo.getNonProdDedicatedConsumption()).newBuilder()); return ok(Result.getQuotaResult(result)); }); http://git-wip-us.apache.org/repos/asf/aurora/blob/3687c6a1/src/test/java/org/apache/aurora/scheduler/quota/QuotaCheckResultTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/quota/QuotaCheckResultTest.java b/src/test/java/org/apache/aurora/scheduler/quota/QuotaCheckResultTest.java index d989900..b6aee57 100644 --- a/src/test/java/org/apache/aurora/scheduler/quota/QuotaCheckResultTest.java +++ b/src/test/java/org/apache/aurora/scheduler/quota/QuotaCheckResultTest.java @@ -13,12 +13,15 @@ */ package org.apache.aurora.scheduler.quota; -import org.apache.aurora.gen.ResourceAggregate; -import org.apache.aurora.scheduler.storage.entities.IResourceAggregate; import org.junit.Test; import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.INSUFFICIENT_QUOTA; import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.SUFFICIENT_QUOTA; +import static org.apache.aurora.scheduler.quota.QuotaCheckResult.greaterOrEqual; +import static org.apache.aurora.scheduler.resources.ResourceTestUtil.bag; +import static org.apache.aurora.scheduler.resources.ResourceType.CPUS; +import static org.apache.aurora.scheduler.resources.ResourceType.DISK_MB; +import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -26,61 +29,31 @@ public class QuotaCheckResultTest { @Test public void testGreaterOrEqualPass() { - IResourceAggregate quota = IResourceAggregate.build(new ResourceAggregate() - .setNumCpus(1.0) - .setRamMb(256L) - .setDiskMb(512L)); - IResourceAggregate request = IResourceAggregate.build(new ResourceAggregate() - .setNumCpus(1.0) - .setRamMb(256L) - .setDiskMb(512L)); - assertEquals(SUFFICIENT_QUOTA, QuotaCheckResult.greaterOrEqual(quota, request).getResult()); + assertEquals( + SUFFICIENT_QUOTA, + greaterOrEqual(bag(1.0, 256, 512), bag(1.0, 256, 512)).getResult()); } @Test public void testGreaterOrEqualFailsCpu() { - IResourceAggregate quota = IResourceAggregate.build(new ResourceAggregate() - .setNumCpus(1.0) - .setRamMb(256L) - .setDiskMb(512L)); - IResourceAggregate request = IResourceAggregate.build(new ResourceAggregate() - .setNumCpus(2.0) - .setRamMb(256L) - .setDiskMb(512L)); - QuotaCheckResult result = QuotaCheckResult.greaterOrEqual(quota, request); + QuotaCheckResult result = greaterOrEqual(bag(1.0, 256, 512), bag(2.0, 256, 512)); assertEquals(INSUFFICIENT_QUOTA, result.getResult()); - assertTrue(result.getDetails().get().contains("CPU")); + assertTrue(result.getDetails().get().contains(CPUS.getAuroraName())); } @Test public void testGreaterOrEqualFailsRam() { - IResourceAggregate quota = IResourceAggregate.build(new ResourceAggregate() - .setNumCpus(1.0) - .setRamMb(256L) - .setDiskMb(512L)); - IResourceAggregate request = IResourceAggregate.build(new ResourceAggregate() - .setNumCpus(1.0) - .setRamMb(512L) - .setDiskMb(512L)); - QuotaCheckResult result = QuotaCheckResult.greaterOrEqual(quota, request); + QuotaCheckResult result = greaterOrEqual(bag(1.0, 256, 512), bag(1.0, 512, 512)); assertEquals(INSUFFICIENT_QUOTA, result.getResult()); assertTrue(result.getDetails().get().length() > 0); - assertTrue(result.getDetails().get().contains("RAM")); + assertTrue(result.getDetails().get().contains(RAM_MB.getAuroraName())); } @Test public void testGreaterOrEqualFailsDisk() { - IResourceAggregate quota = IResourceAggregate.build(new ResourceAggregate() - .setNumCpus(1.0) - .setRamMb(256L) - .setDiskMb(512L)); - IResourceAggregate request = IResourceAggregate.build(new ResourceAggregate() - .setNumCpus(1.0) - .setRamMb(256L) - .setDiskMb(1024L)); - QuotaCheckResult result = QuotaCheckResult.greaterOrEqual(quota, request); + QuotaCheckResult result = greaterOrEqual(bag(1.0, 256, 512), bag(1.0, 256, 1024)); assertEquals(INSUFFICIENT_QUOTA, result.getResult()); assertTrue(result.getDetails().get().length() > 0); - assertTrue(result.getDetails().get().contains("DISK")); + assertTrue(result.getDetails().get().contains(DISK_MB.getAuroraName())); } }
