Moving resource-related classes into a new package.

Reviewed at https://reviews.apache.org/r/46051/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/46ce98d8 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/46ce98d8 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/46ce98d8 Branch: refs/heads/master Commit: 46ce98d8c64d866cc0a94445606f0e00cfbeb649 Parents: bafdd71 Author: Maxim Khutornenko <[email protected]> Authored: Mon Apr 11 14:45:39 2016 -0700 Committer: Maxim Khutornenko <[email protected]> Committed: Mon Apr 11 14:45:39 2016 -0700 ---------------------------------------------------------------------- .../org/apache/aurora/benchmark/Offers.java | 2 +- .../apache/aurora/scheduler/AcceptedOffer.java | 234 ------------- .../aurora/scheduler/ResourceAggregates.java | 68 ---- .../apache/aurora/scheduler/ResourceSlot.java | 341 ------------------ .../apache/aurora/scheduler/ResourceType.java | 54 --- .../org/apache/aurora/scheduler/Resources.java | 185 ---------- .../configuration/executor/ExecutorModule.java | 6 +- .../executor/ExecutorSettings.java | 4 +- .../scheduler/filter/SchedulingFilter.java | 2 +- .../scheduler/filter/SchedulingFilterImpl.java | 2 +- .../scheduler/mesos/MesosTaskFactory.java | 6 +- .../scheduler/mesos/TestExecutorSettings.java | 4 +- .../scheduler/preemptor/PreemptionVictim.java | 2 +- .../preemptor/PreemptionVictimFilter.java | 6 +- .../aurora/scheduler/quota/QuotaManager.java | 4 +- .../scheduler/resources/AcceptedOffer.java | 235 +++++++++++++ .../scheduler/resources/ResourceAggregates.java | 68 ++++ .../scheduler/resources/ResourceSlot.java | 342 +++++++++++++++++++ .../scheduler/resources/ResourceType.java | 54 +++ .../aurora/scheduler/resources/Resources.java | 186 ++++++++++ .../apache/aurora/scheduler/sla/SlaGroup.java | 10 +- .../aurora/scheduler/state/TaskAssigner.java | 2 +- .../scheduler/stats/AsyncStatsModule.java | 10 +- .../aurora/scheduler/stats/SlotSizeCounter.java | 2 +- .../aurora/scheduler/AcceptedOfferTest.java | 304 
----------------- .../aurora/scheduler/ResourceSlotTest.java | 170 --------- .../apache/aurora/scheduler/ResourcesTest.java | 176 ---------- .../aurora/scheduler/app/SchedulerIT.java | 2 +- .../local/simulator/ClusterSimulatorModule.java | 8 +- .../events/NotifyingSchedulingFilterTest.java | 2 +- .../filter/SchedulingFilterImplTest.java | 4 +- .../mesos/MesosTaskFactoryImplTest.java | 8 +- .../apache/aurora/scheduler/mesos/Offers.java | 8 +- .../aurora/scheduler/mesos/TaskExecutors.java | 2 +- .../preemptor/PreemptionVictimFilterTest.java | 4 +- .../scheduler/quota/QuotaManagerImplTest.java | 2 +- .../scheduler/resources/AcceptedOfferTest.java | 305 +++++++++++++++++ .../scheduler/resources/ResourceSlotTest.java | 170 +++++++++ .../scheduler/resources/ResourcesTest.java | 176 ++++++++++ .../scheduler/state/TaskAssignerImplTest.java | 2 +- .../scheduler/stats/AsyncStatsModuleTest.java | 2 +- .../scheduler/stats/SlotSizeCounterTest.java | 2 +- .../storage/log/SnapshotStoreImplIT.java | 2 +- .../thrift/ReadOnlySchedulerImplTest.java | 8 +- 44 files changed, 1595 insertions(+), 1591 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/46ce98d8/src/jmh/java/org/apache/aurora/benchmark/Offers.java ---------------------------------------------------------------------- diff --git a/src/jmh/java/org/apache/aurora/benchmark/Offers.java b/src/jmh/java/org/apache/aurora/benchmark/Offers.java index 4c232f6..c22b791 100644 --- a/src/jmh/java/org/apache/aurora/benchmark/Offers.java +++ b/src/jmh/java/org/apache/aurora/benchmark/Offers.java @@ -20,8 +20,8 @@ import com.google.common.collect.ImmutableSet; import org.apache.aurora.common.quantity.Amount; import org.apache.aurora.common.quantity.Data; import org.apache.aurora.scheduler.HostOffer; -import org.apache.aurora.scheduler.ResourceSlot; import org.apache.aurora.scheduler.offers.OfferManager; +import 
org.apache.aurora.scheduler.resources.ResourceSlot; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.mesos.Protos; http://git-wip-us.apache.org/repos/asf/aurora/blob/46ce98d8/src/main/java/org/apache/aurora/scheduler/AcceptedOffer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/AcceptedOffer.java b/src/main/java/org/apache/aurora/scheduler/AcceptedOffer.java deleted file mode 100644 index 9c2dc0b..0000000 --- a/src/main/java/org/apache/aurora/scheduler/AcceptedOffer.java +++ /dev/null @@ -1,234 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler; - -import java.util.List; -import java.util.Set; - -import com.google.common.base.Predicate; -import com.google.common.base.Predicates; -import com.google.common.collect.FluentIterable; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import com.google.common.collect.Sets; - -import org.apache.aurora.common.quantity.Data; -import org.apache.aurora.scheduler.base.Numbers; -import org.apache.mesos.Protos; -import org.apache.mesos.Protos.Offer; -import org.apache.mesos.Protos.Resource; - -import static java.util.Objects.requireNonNull; - -/** - * Allocate resources from an accepted Mesos Offer to TaskInfo and ExecutorInfo. 
- */ -public final class AcceptedOffer { - - public static final String DEFAULT_ROLE_NAME = "*"; - - /** - * Reserved resource filter. - */ - public static final Predicate<Resource> RESERVED = - e -> e.hasRole() && !e.getRole().equals(DEFAULT_ROLE_NAME); - - /** - * Non reserved resource filter. - */ - public static final Predicate<Resource> NOT_RESERVED = Predicates.not(RESERVED); - - /** - * Helper function to check a resource value is small enough to be considered zero. - */ - public static boolean nearZero(double value) { - return Math.abs(value) < EPSILON; - } - - /** - * Get proper value for {@link org.apache.mesos.Protos.TaskInfo}'s resources. - * @return A list of Resource used for TaskInfo. - */ - public List<Resource> getTaskResources() { - return taskResources; - } - - /** - * Get proper value for {@link org.apache.mesos.Protos.ExecutorInfo}'s resources. - * @return A list of Resource used for ExecutorInfo. - */ - public List<Resource> getExecutorResources() { - return executorResources; - } - - /** - * Use this epsilon value to avoid comparison with zero. - */ - private static final double EPSILON = 1e-6; - - private final List<Resource> taskResources; - private final List<Resource> executorResources; - - public static AcceptedOffer create( - Offer offer, - ResourceSlot taskSlot, - ResourceSlot executorSlot, - Set<Integer> selectedPorts, - TierInfo tierInfo) throws Resources.InsufficientResourcesException { - - List<Resource> reservedFirst = ImmutableList.<Resource>builder() - .addAll(Iterables.filter(offer.getResourcesList(), RESERVED)) - .addAll(Iterables.filter(offer.getResourcesList(), NOT_RESERVED)) - .build(); - - boolean revocable = tierInfo.isRevocable(); - List<Resource.Builder> cpuResources = filterToBuilders( - reservedFirst, - ResourceType.CPUS.getName(), - revocable ? 
Resources.REVOCABLE : Resources.NON_REVOCABLE); - List<Resource.Builder> memResources = filterToBuilderNonRevocable( - reservedFirst, ResourceType.RAM_MB.getName()); - List<Resource.Builder> diskResources = filterToBuilderNonRevocable( - reservedFirst, ResourceType.DISK_MB.getName()); - List<Resource.Builder> portsResources = filterToBuilderNonRevocable( - reservedFirst, ResourceType.PORTS.getName()); - - List<Resource> taskResources = ImmutableList.<Resource>builder() - .addAll(allocateScalarType(cpuResources, taskSlot.getNumCpus(), revocable)) - .addAll(allocateScalarType(memResources, taskSlot.getRam().as(Data.MB), false)) - .addAll(allocateScalarType(diskResources, taskSlot.getDisk().as(Data.MB), false)) - .addAll(allocateRangeType(portsResources, selectedPorts)) - .build(); - - List<Resource> executorResources = ImmutableList.<Resource>builder() - .addAll(allocateScalarType(cpuResources, executorSlot.getNumCpus(), revocable)) - .addAll(allocateScalarType(memResources, executorSlot.getRam().as(Data.MB), false)) - .addAll(allocateScalarType(diskResources, executorSlot.getDisk().as(Data.MB), false)) - .build(); - - return new AcceptedOffer(taskResources, executorResources); - } - - private AcceptedOffer( - List<Resource> taskResources, - List<Resource> executorResources) { - - this.taskResources = requireNonNull(taskResources); - this.executorResources = requireNonNull(executorResources); - } - - private static List<Resource> allocateRangeType( - List<Resource.Builder> from, - Set<Integer> valueSet) throws Resources.InsufficientResourcesException { - - Set<Integer> leftOver = Sets.newHashSet(valueSet); - ImmutableList.Builder<Resource> result = ImmutableList.<Resource>builder(); - for (Resource.Builder r : from) { - Set<Integer> fromResource = Sets.newHashSet(Iterables.concat( - Iterables.transform(r.getRanges().getRangeList(), Resources.RANGE_TO_MEMBERS))); - Set<Integer> available = Sets.newHashSet(Sets.intersection(leftOver, fromResource)); - if 
(available.isEmpty()) { - continue; - } - Resource newResource = makeMesosRangeResource(r.build(), available); - result.add(newResource); - leftOver.removeAll(available); - if (leftOver.isEmpty()) { - break; - } - } - if (!leftOver.isEmpty()) { - // NOTE: this will not happen as long as Veto logic from TaskAssigner.maybeAssign is - // consistent. - // Maybe we should consider implementing resource veto with this class to ensure that. - throw new Resources.InsufficientResourcesException( - "Insufficient resource for range type when allocating from offer"); - } - return result.build(); - } - - /** - * Creates a mesos resource of integer ranges from given prototype. - * - * @param prototype Resource prototype. - * @param values Values to translate into ranges. - * @return A new mesos ranges resource. - */ - static Resource makeMesosRangeResource( - Resource prototype, - Set<Integer> values) { - - return Protos.Resource.newBuilder(prototype) - .setRanges(Protos.Value.Ranges.newBuilder() - .addAllRange( - Iterables.transform(Numbers.toRanges(values), ResourceSlot.RANGE_TRANSFORM))) - .build(); - } - - private static List<Resource> allocateScalarType( - List<Resource.Builder> from, - double amount, - boolean revocable) throws Resources.InsufficientResourcesException { - - double remaining = amount; - ImmutableList.Builder<Resource> result = ImmutableList.builder(); - for (Resource.Builder r : from) { - if (nearZero(remaining)) { - break; - } - final double available = r.getScalar().getValue(); - if (nearZero(available)) { - // Skip resource slot that is already used up. 
- continue; - } - final double used = Math.min(remaining, available); - remaining -= used; - Resource.Builder newResource = - Resource.newBuilder(r.build()) - .setScalar(Protos.Value.Scalar.newBuilder().setValue(used).build()); - if (revocable) { - newResource.setRevocable(Resource.RevocableInfo.newBuilder()); - } - result.add(newResource.build()); - r.getScalarBuilder().setValue(available - used); - } - if (!nearZero(remaining)) { - // NOTE: this will not happen as long as Veto logic from TaskAssigner.maybeAssign is - // consistent. - // Maybe we should consider implementing resource veto with this class to ensure that. - throw new Resources.InsufficientResourcesException( - "Insufficient resource when allocating from offer"); - } - return result.build(); - } - - private static List<Resource.Builder> filterToBuilders( - List<Resource> resources, - String name, - Predicate<Resource> additionalFilter) { - - return FluentIterable.from(resources) - .filter(e -> e.getName().equals(name)) - .filter(additionalFilter) - .transform(Resource::toBuilder) - .toList(); - } - - private static List<Resource.Builder> filterToBuilderNonRevocable( - List<Resource> resources, - String name) { - - return filterToBuilders(resources, name, Resources.NON_REVOCABLE); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/46ce98d8/src/main/java/org/apache/aurora/scheduler/ResourceAggregates.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/ResourceAggregates.java b/src/main/java/org/apache/aurora/scheduler/ResourceAggregates.java deleted file mode 100644 index a6335c5..0000000 --- a/src/main/java/org/apache/aurora/scheduler/ResourceAggregates.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler; - -import com.google.common.collect.Ordering; - -import org.apache.aurora.gen.ResourceAggregate; -import org.apache.aurora.scheduler.storage.entities.IResourceAggregate; - -/** - * Convenience class for normalizing resource measures between tasks and offers. - */ -public final class ResourceAggregates { - - public static final IResourceAggregate EMPTY = - IResourceAggregate.build(new ResourceAggregate(0, 0, 0)); - - public static final IResourceAggregate SMALL = - IResourceAggregate.build(new ResourceAggregate(1.0, 1024, 4096)); - - public static final IResourceAggregate MEDIUM = - IResourceAggregate.build(new ResourceAggregate(4.0, 8192, 16384)); - - public static final IResourceAggregate LARGE = - IResourceAggregate.build(new ResourceAggregate(8.0, 16384, 32768)); - - public static final IResourceAggregate XLARGE = - IResourceAggregate.build(new ResourceAggregate(16.0, 32768, 65536)); - - private ResourceAggregates() { - // Utility class. - } - - /** - * a * m. - */ - public static IResourceAggregate scale(IResourceAggregate a, int m) { - return IResourceAggregate.build(new ResourceAggregate() - .setNumCpus(a.getNumCpus() * m) - .setRamMb(a.getRamMb() * m) - .setDiskMb(a.getDiskMb() * m)); - } - - /** - * a / b. - * <p> - * This calculates how many times {@code b} "fits into" {@code a}. Behavior is undefined when - * {@code b} contains resources with a value of zero. 
- */ - public static int divide(IResourceAggregate a, IResourceAggregate b) { - return Ordering.natural().min( - a.getNumCpus() / b.getNumCpus(), - (double) a.getRamMb() / b.getRamMb(), - (double) a.getDiskMb() / b.getDiskMb() - ).intValue(); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/46ce98d8/src/main/java/org/apache/aurora/scheduler/ResourceSlot.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/ResourceSlot.java b/src/main/java/org/apache/aurora/scheduler/ResourceSlot.java deleted file mode 100644 index 86f2667..0000000 --- a/src/main/java/org/apache/aurora/scheduler/ResourceSlot.java +++ /dev/null @@ -1,341 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.aurora.scheduler; - -import java.util.List; -import java.util.Objects; -import java.util.Optional; -import java.util.Set; -import java.util.function.Consumer; -import java.util.function.Predicate; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import com.google.common.collect.Ordering; -import com.google.common.collect.Range; - -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Data; -import org.apache.aurora.scheduler.base.Numbers; -import org.apache.aurora.scheduler.storage.entities.ITaskConfig; -import org.apache.mesos.Protos; -import org.apache.mesos.Protos.ExecutorInfo; -import org.apache.mesos.Protos.Resource; -import org.apache.mesos.Protos.Resource.Builder; -import org.apache.mesos.Protos.TaskInfo; - -import static java.util.Objects.requireNonNull; - -import static org.apache.aurora.common.quantity.Data.BYTES; -import static org.apache.aurora.scheduler.ResourceType.CPUS; -import static org.apache.aurora.scheduler.ResourceType.DISK_MB; -import static org.apache.aurora.scheduler.ResourceType.RAM_MB; - -/** - * Represents a single task/host aggregate resource vector unaware of any Mesos resource traits. - */ -public final class ResourceSlot { - - private final double numCpus; - private final Amount<Long, Data> disk; - private final Amount<Long, Data> ram; - private final int numPorts; - - /** - * Empty ResourceSlot value. - */ - public static final ResourceSlot NONE = - new ResourceSlot(0, Amount.of(0L, Data.BITS), Amount.of(0L, Data.BITS), 0); - - /** - * Convert {@link com.google.common.collect.Range} to {@link org.apache.mesos.Protos.Value.Range}. 
- */ - public static final Function<Range<Integer>, Protos.Value.Range> RANGE_TRANSFORM = - input -> Protos.Value.Range.newBuilder() - .setBegin(input.lowerEndpoint()) - .setEnd(input.upperEndpoint()) - .build(); - - public ResourceSlot( - double numCpus, - Amount<Long, Data> ram, - Amount<Long, Data> disk, - int numPorts) { - - this.numCpus = numCpus; - this.ram = requireNonNull(ram); - this.disk = requireNonNull(disk); - this.numPorts = numPorts; - } - - /** - * Extracts the resources required from a task. - * - * @param task Task to get resources from. - * @return The resources required by the task. - */ - public static ResourceSlot from(ITaskConfig task) { - requireNonNull(task); - return new ResourceSlot( - task.getNumCpus(), - Amount.of(task.getRamMb(), Data.MB), - Amount.of(task.getDiskMb(), Data.MB), - task.getRequestedPorts().size()); - } - - /** - * Ensures that the revocable setting on the executor and task CPU resources match. - * - * @param task Task to check for resource type alignment. - * @return A possibly-modified task, with aligned CPU resource types. - */ - public static TaskInfo matchResourceTypes(TaskInfo task) { - TaskInfo.Builder taskBuilder = task.toBuilder(); - - Optional<Resource> revocableTaskCpu = taskBuilder.getResourcesList().stream() - .filter(r -> r.getName().equals(CPUS.getName())) - .filter(Resource::hasRevocable) - .findFirst(); - ExecutorInfo.Builder executorBuilder = taskBuilder.getExecutorBuilder(); - - Consumer<Builder> matchRevocable = builder -> { - if (revocableTaskCpu.isPresent()) { - builder.setRevocable(revocableTaskCpu.get().getRevocable()); - } else { - builder.clearRevocable(); - } - }; - - executorBuilder.getResourcesBuilderList().stream() - .filter(r -> r.getName().equals(CPUS.getName())) - .forEach(matchRevocable); - - return taskBuilder.build(); - } - - /** - * Convenience method for adapting to Mesos resources without applying a port range. - * - * @param tierInfo Task tier info. - * @return Mesos resources. 
- */ - public List<Protos.Resource> toResourceList(TierInfo tierInfo) { - return ImmutableList.<Protos.Resource>builder() - .add(makeMesosResource(CPUS, numCpus, tierInfo.isRevocable())) - .add(makeMesosResource(DISK_MB, disk.as(Data.MB), false)) - .add(makeMesosResource(RAM_MB, ram.as(Data.MB), false)) - .build(); - } - - /** - * Creates a mesos resource of integer ranges. - * - * @param resourceType Resource type. - * @param values Values to translate into ranges. - * @return A new mesos ranges resource. - */ - @VisibleForTesting - public static Protos.Resource makeMesosRangeResource( - ResourceType resourceType, - Set<Integer> values) { - - return Protos.Resource.newBuilder() - .setName(resourceType.getName()) - .setType(Protos.Value.Type.RANGES) - .setRanges(Protos.Value.Ranges.newBuilder() - .addAllRange(Iterables.transform(Numbers.toRanges(values), RANGE_TRANSFORM))) - .build(); - } - - /** - * Creates a scalar mesos resource. - * - * @param resourceType Resource type. - * @param value Value for the resource. - * @param revocable Flag indicating if this resource is revocable. - * @return A mesos resource. - */ - @VisibleForTesting - static Protos.Resource makeMesosResource( - ResourceType resourceType, - double value, - boolean revocable) { - - Protos.Resource.Builder builder = Protos.Resource.newBuilder() - .setName(resourceType.getName()) - .setType(Protos.Value.Type.SCALAR) - .setScalar(Protos.Value.Scalar.newBuilder().setValue(value)); - - if (revocable) { - builder.setRevocable(Protos.Resource.RevocableInfo.newBuilder()); - } - - return builder.build(); - } - - /** - * Generates a ResourceSlot where each resource component is a max out of the two components. - * - * @param a A resource to compare. - * @param b A resource to compare. - * - * @return Returns a ResourceSlot instance where each component is a max of the two components. 
- */ - @VisibleForTesting - static ResourceSlot maxElements(ResourceSlot a, ResourceSlot b) { - double maxCPU = Math.max(a.getNumCpus(), b.getNumCpus()); - Amount<Long, Data> maxRAM = Amount.of( - Math.max(a.getRam().as(Data.MB), b.getRam().as(Data.MB)), - Data.MB); - Amount<Long, Data> maxDisk = Amount.of( - Math.max(a.getDisk().as(Data.MB), b.getDisk().as(Data.MB)), - Data.MB); - int maxPorts = Math.max(a.getNumPorts(), b.getNumPorts()); - - return new ResourceSlot(maxCPU, maxRAM, maxDisk, maxPorts); - } - - /** - * Number of CPUs. - * - * @return CPUs. - */ - public double getNumCpus() { - return numCpus; - } - - /** - * Disk amount. - * - * @return Disk. - */ - public Amount<Long, Data> getDisk() { - return disk; - } - - /** - * RAM amount. - * - * @return RAM. - */ - public Amount<Long, Data> getRam() { - return ram; - } - - /** - * Number of ports. - * - * @return Port count. - */ - public int getNumPorts() { - return numPorts; - } - - @Override - public boolean equals(Object o) { - if (!(o instanceof ResourceSlot)) { - return false; - } - - ResourceSlot other = (ResourceSlot) o; - return Objects.equals(numCpus, other.numCpus) - && Objects.equals(ram, other.ram) - && Objects.equals(disk, other.disk) - && Objects.equals(numPorts, other.numPorts); - } - - @Override - public int hashCode() { - return Objects.hash(numCpus, ram, disk, numPorts); - } - - /** - * Sums up all resources in {@code slots}. - * - * @param slots Resource slots to sum up. - * @return Sum of all resource slots. - */ - public static ResourceSlot sum(Iterable<ResourceSlot> slots) { - ResourceSlot sum = NONE; - - for (ResourceSlot r : slots) { - sum = sum.add(r); - } - - return sum; - } - - /** - * Adds {@code other}. - * - * @param other Resource slot to add. - * @return Result. 
- */ - public ResourceSlot add(ResourceSlot other) { - return new ResourceSlot( - getNumCpus() + other.getNumCpus(), - Amount.of(getRam().as(BYTES) + other.getRam().as(BYTES), BYTES), - Amount.of(getDisk().as(BYTES) + other.getDisk().as(BYTES), BYTES), - getNumPorts() + other.getNumPorts()); - } - - /** - * Subtracts {@code other}. - * - * @param other Resource slot to subtract. - * @return Result. - */ - public ResourceSlot subtract(ResourceSlot other) { - return new ResourceSlot( - getNumCpus() - other.getNumCpus(), - Amount.of(getRam().as(BYTES) - other.getRam().as(BYTES), BYTES), - Amount.of(getDisk().as(BYTES) - other.getDisk().as(BYTES), BYTES), - getNumPorts() - other.getNumPorts()); - } - - /** - * A Resources object is greater than another iff _all_ of its resource components are greater - * or equal. A Resources object compares as equal if some but not all components are greater than - * or equal to the other. - */ - public static final Ordering<ResourceSlot> ORDER = new Ordering<ResourceSlot>() { - @Override - public int compare(ResourceSlot left, ResourceSlot right) { - int diskC = left.getDisk().compareTo(right.getDisk()); - int ramC = left.getRam().compareTo(right.getRam()); - int portC = Integer.compare(left.getNumPorts(), right.getNumPorts()); - int cpuC = Double.compare(left.getNumCpus(), right.getNumCpus()); - - List<Integer> vector = ImmutableList.of(diskC, ramC, portC, cpuC); - - if (vector.stream().allMatch(IS_ZERO)) { - return 0; - } - - if (vector.stream().filter(IS_ZERO.negate()).allMatch(e -> e > 0)) { - return 1; - } - - if (vector.stream().filter(IS_ZERO.negate()).allMatch(e -> e < 0)) { - return -1; - } - - return 0; - } - }; - - private static final Predicate<Integer> IS_ZERO = e -> e == 0; -} http://git-wip-us.apache.org/repos/asf/aurora/blob/46ce98d8/src/main/java/org/apache/aurora/scheduler/ResourceType.java ---------------------------------------------------------------------- diff --git 
a/src/main/java/org/apache/aurora/scheduler/ResourceType.java b/src/main/java/org/apache/aurora/scheduler/ResourceType.java deleted file mode 100644 index b4efc8d..0000000 --- a/src/main/java/org/apache/aurora/scheduler/ResourceType.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler; - -import com.google.common.annotations.VisibleForTesting; - -import static java.util.Objects.requireNonNull; - -/** - * Defines Mesos resource types. - */ -@VisibleForTesting -public enum ResourceType { - /** - * CPU resource. - */ - CPUS("cpus"), - - /** - * RAM resource. - */ - RAM_MB("mem"), - - /** - * DISK resource. - */ - DISK_MB("disk"), - - /** - * Port resource. 
- */ - PORTS("ports"); - - private final String resourceName; - - ResourceType(String resourceName) { - this.resourceName = requireNonNull(resourceName); - } - - public String getName() { - return resourceName; - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/46ce98d8/src/main/java/org/apache/aurora/scheduler/Resources.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/Resources.java b/src/main/java/org/apache/aurora/scheduler/Resources.java deleted file mode 100644 index 4baf9dd..0000000 --- a/src/main/java/org/apache/aurora/scheduler/Resources.java +++ /dev/null @@ -1,185 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.aurora.scheduler; - -import java.util.Collections; -import java.util.List; -import java.util.Set; - -import com.google.common.base.Function; -import com.google.common.base.Predicate; -import com.google.common.base.Predicates; -import com.google.common.collect.ContiguousSet; -import com.google.common.collect.DiscreteDomain; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Data; -import org.apache.mesos.Protos.Offer; -import org.apache.mesos.Protos.Resource; -import org.apache.mesos.Protos.Value.Range; - -import static java.util.Objects.requireNonNull; - -import static org.apache.aurora.scheduler.ResourceType.CPUS; -import static org.apache.aurora.scheduler.ResourceType.DISK_MB; -import static org.apache.aurora.scheduler.ResourceType.PORTS; -import static org.apache.aurora.scheduler.ResourceType.RAM_MB; - -/** - * A container for multiple Mesos resource vectors. - */ -public final class Resources { - - /** - * CPU resource filter. - */ - private static final Predicate<Resource> CPU = e -> e.getName().equals(CPUS.getName()); - - /** - * Revocable resource filter. - */ - public static final Predicate<Resource> REVOCABLE = - Predicates.or(Predicates.not(CPU), Predicates.and(CPU, Resource::hasRevocable)); - - /** - * Non-revocable resource filter. - */ - public static final Predicate<Resource> NON_REVOCABLE = Predicates.not(Resource::hasRevocable); - - /** - * Convert range to set of integers. 
- */ - public static final Function<Range, Set<Integer>> RANGE_TO_MEMBERS = - range -> ContiguousSet.create( - com.google.common.collect.Range.closed((int) range.getBegin(), (int) range.getEnd()), - DiscreteDomain.integers()); - - private final Iterable<Resource> mesosResources; - - private Resources(Iterable<Resource> mesosResources) { - this.mesosResources = ImmutableList.copyOf(mesosResources); - } - - /** - * Extracts the resources available in a slave offer. - * - * @param offer Offer to get resources from. - * @return The resources available in the offer. - */ - public static Resources from(Offer offer) { - return new Resources(requireNonNull(offer.getResourcesList())); - } - - /** - * Filters resources by the provided {@code predicate}. - * - * @param predicate Predicate filter. - * @return A new {@code Resources} object containing only filtered Mesos resources. - */ - public Resources filter(Predicate<Resource> predicate) { - return new Resources(Iterables.filter(mesosResources, predicate)); - } - - /** - * Filters resources using the provided {@code tierInfo} instance. - * - * @param tierInfo Tier info. - * @return A new {@code Resources} object containing only filtered Mesos resources. - */ - public Resources filter(TierInfo tierInfo) { - return filter(tierInfo.isRevocable() ? REVOCABLE : NON_REVOCABLE); - } - - /** - * Gets generalized aggregated resource view. - * - * @return {@code ResourceSlot} instance. - */ - public ResourceSlot slot() { - return new ResourceSlot(getScalarValue(CPUS.getName()), - Amount.of((long) getScalarValue(RAM_MB.getName()), Data.MB), - Amount.of((long) getScalarValue(DISK_MB.getName()), Data.MB), - getNumAvailablePorts()); - } - - /** - * Attempts to grab {@code numPorts} from this resource instance. - * - * @param numPorts The number of ports to grab. - * @return The set of ports grabbed. - * @throws InsufficientResourcesException if not enough ports were available. 
- */ - public Set<Integer> getPorts(int numPorts) - throws InsufficientResourcesException { - - if (numPorts == 0) { - return ImmutableSet.of(); - } - - List<Integer> availablePorts = Lists.newArrayList(Sets.newHashSet(Iterables.concat( - Iterables.transform(getPortRanges(), RANGE_TO_MEMBERS)))); - - if (availablePorts.size() < numPorts) { - throw new InsufficientResourcesException( - String.format("Could not get %d ports from %s", numPorts, availablePorts)); - } - - Collections.shuffle(availablePorts); - return ImmutableSet.copyOf(availablePorts.subList(0, numPorts)); - } - - private int getNumAvailablePorts() { - int offeredPorts = 0; - for (Range range : getPortRanges()) { - offeredPorts += 1 + range.getEnd() - range.getBegin(); - } - return offeredPorts; - } - - private double getScalarValue(String key) { - Iterable<Resource> resources = getResources(key); - double value = 0; - for (Resource r : resources) { - value += r.getScalar().getValue(); - } - return value; - } - - private Iterable<Resource> getResources(String key) { - return Iterables.filter(mesosResources, e -> e.getName().equals(key)); - } - - private Iterable<Range> getPortRanges() { - ImmutableList.Builder<Range> ranges = ImmutableList.builder(); - for (Resource r : getResources(PORTS.getName())) { - ranges.addAll(r.getRanges().getRangeList().iterator()); - } - - return ranges.build(); - } - - /** - * Thrown when there are insufficient resources to satisfy a request. 
- */ - public static class InsufficientResourcesException extends RuntimeException { - InsufficientResourcesException(String message) { - super(message); - } - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/46ce98d8/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorModule.java b/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorModule.java index 1fe27a5..dd9e12b 100644 --- a/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorModule.java +++ b/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorModule.java @@ -35,7 +35,7 @@ import org.apache.aurora.common.base.MorePreconditions; import org.apache.aurora.common.quantity.Amount; import org.apache.aurora.common.quantity.Data; import org.apache.aurora.gen.Volume; -import org.apache.aurora.scheduler.ResourceType; +import org.apache.aurora.scheduler.resources.ResourceType; import org.apache.mesos.Protos; import org.apache.mesos.Protos.CommandInfo; import org.apache.mesos.Protos.CommandInfo.URI; @@ -44,8 +44,8 @@ import org.apache.mesos.Protos.Resource; import org.apache.mesos.Protos.Value.Scalar; import org.apache.mesos.Protos.Value.Type; -import static org.apache.aurora.scheduler.ResourceType.CPUS; -import static org.apache.aurora.scheduler.ResourceType.RAM_MB; +import static org.apache.aurora.scheduler.resources.ResourceType.CPUS; +import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB; /** * Binding module for {@link ExecutorSettings}. 
http://git-wip-us.apache.org/repos/asf/aurora/blob/46ce98d8/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorSettings.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorSettings.java b/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorSettings.java index e4279b1..78e7be9 100644 --- a/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorSettings.java +++ b/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorSettings.java @@ -17,8 +17,8 @@ import java.util.Objects; import org.apache.aurora.common.quantity.Amount; import org.apache.aurora.common.quantity.Data; -import org.apache.aurora.scheduler.ResourceSlot; -import org.apache.aurora.scheduler.ResourceType; +import org.apache.aurora.scheduler.resources.ResourceSlot; +import org.apache.aurora.scheduler.resources.ResourceType; import static java.util.Objects.requireNonNull; http://git-wip-us.apache.org/repos/asf/aurora/blob/46ce98d8/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java index 1e8eb0c..625e6d5 100644 --- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java +++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java @@ -18,7 +18,7 @@ import java.util.Set; import com.google.common.base.MoreObjects; -import org.apache.aurora.scheduler.ResourceSlot; +import org.apache.aurora.scheduler.resources.ResourceSlot; import org.apache.aurora.scheduler.storage.entities.IConstraint; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; 
http://git-wip-us.apache.org/repos/asf/aurora/blob/46ce98d8/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java index e9ee049..f8c57f9 100644 --- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java @@ -29,9 +29,9 @@ import org.apache.aurora.common.quantity.Amount; import org.apache.aurora.common.quantity.Data; import org.apache.aurora.gen.MaintenanceMode; import org.apache.aurora.gen.TaskConstraint; -import org.apache.aurora.scheduler.ResourceSlot; import org.apache.aurora.scheduler.configuration.ConfigurationManager; import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings; +import org.apache.aurora.scheduler.resources.ResourceSlot; import org.apache.aurora.scheduler.storage.entities.IAttribute; import org.apache.aurora.scheduler.storage.entities.IConstraint; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; http://git-wip-us.apache.org/repos/asf/aurora/blob/46ce98d8/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java b/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java index fb7c7b2..b325106 100644 --- a/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java +++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java @@ -27,14 +27,14 @@ import com.google.protobuf.ByteString; import org.apache.aurora.GuavaUtils; import org.apache.aurora.Protobufs; import org.apache.aurora.codec.ThriftBinaryCodec; -import org.apache.aurora.scheduler.AcceptedOffer; 
-import org.apache.aurora.scheduler.ResourceSlot; -import org.apache.aurora.scheduler.Resources; import org.apache.aurora.scheduler.TierManager; import org.apache.aurora.scheduler.base.JobKeys; import org.apache.aurora.scheduler.base.SchedulerException; import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings; +import org.apache.aurora.scheduler.resources.AcceptedOffer; +import org.apache.aurora.scheduler.resources.ResourceSlot; +import org.apache.aurora.scheduler.resources.Resources; import org.apache.aurora.scheduler.storage.entities.IAssignedTask; import org.apache.aurora.scheduler.storage.entities.IDockerContainer; import org.apache.aurora.scheduler.storage.entities.IJobKey; http://git-wip-us.apache.org/repos/asf/aurora/blob/46ce98d8/src/main/java/org/apache/aurora/scheduler/mesos/TestExecutorSettings.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/TestExecutorSettings.java b/src/main/java/org/apache/aurora/scheduler/mesos/TestExecutorSettings.java index 8cef410..1d252bb 100644 --- a/src/main/java/org/apache/aurora/scheduler/mesos/TestExecutorSettings.java +++ b/src/main/java/org/apache/aurora/scheduler/mesos/TestExecutorSettings.java @@ -15,12 +15,12 @@ package org.apache.aurora.scheduler.mesos; import com.google.common.collect.ImmutableList; -import org.apache.aurora.scheduler.ResourceSlot; -import org.apache.aurora.scheduler.ResourceType; import org.apache.aurora.scheduler.base.TaskTestUtil; import org.apache.aurora.scheduler.configuration.executor.ExecutorConfig; import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings; import org.apache.aurora.scheduler.configuration.executor.Executors; +import org.apache.aurora.scheduler.resources.ResourceSlot; +import org.apache.aurora.scheduler.resources.ResourceType; import org.apache.mesos.Protos.CommandInfo; import 
org.apache.mesos.Protos.CommandInfo.URI; import org.apache.mesos.Protos.ExecutorInfo; http://git-wip-us.apache.org/repos/asf/aurora/blob/46ce98d8/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java index 7d92843..98be997 100644 --- a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java +++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java @@ -17,7 +17,7 @@ import java.util.Objects; import com.google.common.base.MoreObjects; -import org.apache.aurora.scheduler.ResourceSlot; +import org.apache.aurora.scheduler.resources.ResourceSlot; import org.apache.aurora.scheduler.storage.entities.IAssignedTask; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; http://git-wip-us.apache.org/repos/asf/aurora/blob/46ce98d8/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java index d3b7963..9a37ee7 100644 --- a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java +++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java @@ -28,8 +28,6 @@ import com.google.common.collect.Ordering; import com.google.common.collect.Sets; import org.apache.aurora.scheduler.HostOffer; -import org.apache.aurora.scheduler.ResourceSlot; -import org.apache.aurora.scheduler.Resources; import org.apache.aurora.scheduler.TierManager; import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings; import org.apache.aurora.scheduler.filter.AttributeAggregate; @@ -37,13 
+35,15 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter; import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource; import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto; +import org.apache.aurora.scheduler.resources.ResourceSlot; +import org.apache.aurora.scheduler.resources.Resources; import org.apache.aurora.scheduler.storage.Storage.StoreProvider; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; import static java.util.Objects.requireNonNull; -import static org.apache.aurora.scheduler.ResourceSlot.sum; +import static org.apache.aurora.scheduler.resources.ResourceSlot.sum; /** * Filters active tasks (victims) and available offer (slack) resources that can accommodate a http://git-wip-us.apache.org/repos/asf/aurora/blob/46ce98d8/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java b/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java index c18836a..bf476aa 100644 --- a/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java +++ b/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java @@ -31,11 +31,11 @@ import com.google.common.collect.RangeSet; import org.apache.aurora.gen.JobUpdateQuery; import org.apache.aurora.gen.ResourceAggregate; -import org.apache.aurora.scheduler.ResourceAggregates; import org.apache.aurora.scheduler.base.JobKeys; import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.configuration.ConfigurationManager; +import org.apache.aurora.scheduler.resources.ResourceAggregates; import org.apache.aurora.scheduler.storage.JobUpdateStore; import 
org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; import org.apache.aurora.scheduler.storage.Storage.StoreProvider; @@ -62,8 +62,8 @@ import static com.google.common.base.Predicates.in; import static com.google.common.base.Predicates.not; import static com.google.common.base.Predicates.or; -import static org.apache.aurora.scheduler.ResourceAggregates.EMPTY; import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.SUFFICIENT_QUOTA; +import static org.apache.aurora.scheduler.resources.ResourceAggregates.EMPTY; import static org.apache.aurora.scheduler.updater.Updates.getInstanceIds; /** http://git-wip-us.apache.org/repos/asf/aurora/blob/46ce98d8/src/main/java/org/apache/aurora/scheduler/resources/AcceptedOffer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/resources/AcceptedOffer.java b/src/main/java/org/apache/aurora/scheduler/resources/AcceptedOffer.java new file mode 100644 index 0000000..6a5237f --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/resources/AcceptedOffer.java @@ -0,0 +1,235 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.resources; + +import java.util.List; +import java.util.Set; + +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; + +import org.apache.aurora.common.quantity.Data; +import org.apache.aurora.scheduler.TierInfo; +import org.apache.aurora.scheduler.base.Numbers; +import org.apache.mesos.Protos; +import org.apache.mesos.Protos.Offer; +import org.apache.mesos.Protos.Resource; + +import static java.util.Objects.requireNonNull; + +/** + * Allocate resources from an accepted Mesos Offer to TaskInfo and ExecutorInfo. + */ +public final class AcceptedOffer { + + public static final String DEFAULT_ROLE_NAME = "*"; + + /** + * Reserved resource filter. + */ + public static final Predicate<Resource> RESERVED = + e -> e.hasRole() && !e.getRole().equals(DEFAULT_ROLE_NAME); + + /** + * Non reserved resource filter. + */ + public static final Predicate<Resource> NOT_RESERVED = Predicates.not(RESERVED); + + /** + * Helper function to check a resource value is small enough to be considered zero. + */ + public static boolean nearZero(double value) { + return Math.abs(value) < EPSILON; + } + + /** + * Get proper value for {@link org.apache.mesos.Protos.TaskInfo}'s resources. + * @return A list of Resource used for TaskInfo. + */ + public List<Resource> getTaskResources() { + return taskResources; + } + + /** + * Get proper value for {@link org.apache.mesos.Protos.ExecutorInfo}'s resources. + * @return A list of Resource used for ExecutorInfo. + */ + public List<Resource> getExecutorResources() { + return executorResources; + } + + /** + * Use this epsilon value to avoid comparison with zero. 
+ */ + private static final double EPSILON = 1e-6; + + private final List<Resource> taskResources; + private final List<Resource> executorResources; + + public static AcceptedOffer create( + Offer offer, + ResourceSlot taskSlot, + ResourceSlot executorSlot, + Set<Integer> selectedPorts, + TierInfo tierInfo) throws Resources.InsufficientResourcesException { + + List<Resource> reservedFirst = ImmutableList.<Resource>builder() + .addAll(Iterables.filter(offer.getResourcesList(), RESERVED)) + .addAll(Iterables.filter(offer.getResourcesList(), NOT_RESERVED)) + .build(); + + boolean revocable = tierInfo.isRevocable(); + List<Resource.Builder> cpuResources = filterToBuilders( + reservedFirst, + ResourceType.CPUS.getName(), + revocable ? Resources.REVOCABLE : Resources.NON_REVOCABLE); + List<Resource.Builder> memResources = filterToBuilderNonRevocable( + reservedFirst, ResourceType.RAM_MB.getName()); + List<Resource.Builder> diskResources = filterToBuilderNonRevocable( + reservedFirst, ResourceType.DISK_MB.getName()); + List<Resource.Builder> portsResources = filterToBuilderNonRevocable( + reservedFirst, ResourceType.PORTS.getName()); + + List<Resource> taskResources = ImmutableList.<Resource>builder() + .addAll(allocateScalarType(cpuResources, taskSlot.getNumCpus(), revocable)) + .addAll(allocateScalarType(memResources, taskSlot.getRam().as(Data.MB), false)) + .addAll(allocateScalarType(diskResources, taskSlot.getDisk().as(Data.MB), false)) + .addAll(allocateRangeType(portsResources, selectedPorts)) + .build(); + + List<Resource> executorResources = ImmutableList.<Resource>builder() + .addAll(allocateScalarType(cpuResources, executorSlot.getNumCpus(), revocable)) + .addAll(allocateScalarType(memResources, executorSlot.getRam().as(Data.MB), false)) + .addAll(allocateScalarType(diskResources, executorSlot.getDisk().as(Data.MB), false)) + .build(); + + return new AcceptedOffer(taskResources, executorResources); + } + + private AcceptedOffer( + List<Resource> taskResources, + 
List<Resource> executorResources) { + + this.taskResources = requireNonNull(taskResources); + this.executorResources = requireNonNull(executorResources); + } + + private static List<Resource> allocateRangeType( + List<Resource.Builder> from, + Set<Integer> valueSet) throws Resources.InsufficientResourcesException { + + Set<Integer> leftOver = Sets.newHashSet(valueSet); + ImmutableList.Builder<Resource> result = ImmutableList.<Resource>builder(); + for (Resource.Builder r : from) { + Set<Integer> fromResource = Sets.newHashSet(Iterables.concat( + Iterables.transform(r.getRanges().getRangeList(), Resources.RANGE_TO_MEMBERS))); + Set<Integer> available = Sets.newHashSet(Sets.intersection(leftOver, fromResource)); + if (available.isEmpty()) { + continue; + } + Resource newResource = makeMesosRangeResource(r.build(), available); + result.add(newResource); + leftOver.removeAll(available); + if (leftOver.isEmpty()) { + break; + } + } + if (!leftOver.isEmpty()) { + // NOTE: this will not happen as long as Veto logic from TaskAssigner.maybeAssign is + // consistent. + // Maybe we should consider implementing resource veto with this class to ensure that. + throw new Resources.InsufficientResourcesException( + "Insufficient resource for range type when allocating from offer"); + } + return result.build(); + } + + /** + * Creates a mesos resource of integer ranges from given prototype. + * + * @param prototype Resource prototype. + * @param values Values to translate into ranges. + * @return A new mesos ranges resource. 
+ */ + static Resource makeMesosRangeResource( + Resource prototype, + Set<Integer> values) { + + return Protos.Resource.newBuilder(prototype) + .setRanges(Protos.Value.Ranges.newBuilder() + .addAllRange( + Iterables.transform(Numbers.toRanges(values), ResourceSlot.RANGE_TRANSFORM))) + .build(); + } + + private static List<Resource> allocateScalarType( + List<Resource.Builder> from, + double amount, + boolean revocable) throws Resources.InsufficientResourcesException { + + double remaining = amount; + ImmutableList.Builder<Resource> result = ImmutableList.builder(); + for (Resource.Builder r : from) { + if (nearZero(remaining)) { + break; + } + final double available = r.getScalar().getValue(); + if (nearZero(available)) { + // Skip resource slot that is already used up. + continue; + } + final double used = Math.min(remaining, available); + remaining -= used; + Resource.Builder newResource = + Resource.newBuilder(r.build()) + .setScalar(Protos.Value.Scalar.newBuilder().setValue(used).build()); + if (revocable) { + newResource.setRevocable(Resource.RevocableInfo.newBuilder()); + } + result.add(newResource.build()); + r.getScalarBuilder().setValue(available - used); + } + if (!nearZero(remaining)) { + // NOTE: this will not happen as long as Veto logic from TaskAssigner.maybeAssign is + // consistent. + // Maybe we should consider implementing resource veto with this class to ensure that. 
+ throw new Resources.InsufficientResourcesException( + "Insufficient resource when allocating from offer"); + } + return result.build(); + } + + private static List<Resource.Builder> filterToBuilders( + List<Resource> resources, + String name, + Predicate<Resource> additionalFilter) { + + return FluentIterable.from(resources) + .filter(e -> e.getName().equals(name)) + .filter(additionalFilter) + .transform(Resource::toBuilder) + .toList(); + } + + private static List<Resource.Builder> filterToBuilderNonRevocable( + List<Resource> resources, + String name) { + + return filterToBuilders(resources, name, Resources.NON_REVOCABLE); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/46ce98d8/src/main/java/org/apache/aurora/scheduler/resources/ResourceAggregates.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/resources/ResourceAggregates.java b/src/main/java/org/apache/aurora/scheduler/resources/ResourceAggregates.java new file mode 100644 index 0000000..302eb87 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/resources/ResourceAggregates.java @@ -0,0 +1,68 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.resources; + +import com.google.common.collect.Ordering; + +import org.apache.aurora.gen.ResourceAggregate; +import org.apache.aurora.scheduler.storage.entities.IResourceAggregate; + +/** + * Convenience class for normalizing resource measures between tasks and offers. + */ +public final class ResourceAggregates { + + public static final IResourceAggregate EMPTY = + IResourceAggregate.build(new ResourceAggregate(0, 0, 0)); + + public static final IResourceAggregate SMALL = + IResourceAggregate.build(new ResourceAggregate(1.0, 1024, 4096)); + + public static final IResourceAggregate MEDIUM = + IResourceAggregate.build(new ResourceAggregate(4.0, 8192, 16384)); + + public static final IResourceAggregate LARGE = + IResourceAggregate.build(new ResourceAggregate(8.0, 16384, 32768)); + + public static final IResourceAggregate XLARGE = + IResourceAggregate.build(new ResourceAggregate(16.0, 32768, 65536)); + + private ResourceAggregates() { + // Utility class. + } + + /** + * a * m. + */ + public static IResourceAggregate scale(IResourceAggregate a, int m) { + return IResourceAggregate.build(new ResourceAggregate() + .setNumCpus(a.getNumCpus() * m) + .setRamMb(a.getRamMb() * m) + .setDiskMb(a.getDiskMb() * m)); + } + + /** + * a / b. + * <p> + * This calculates how many times {@code b} "fits into" {@code a}. Behavior is undefined when + * {@code b} contains resources with a value of zero. 
+ */ + public static int divide(IResourceAggregate a, IResourceAggregate b) { + return Ordering.natural().min( + a.getNumCpus() / b.getNumCpus(), + (double) a.getRamMb() / b.getRamMb(), + (double) a.getDiskMb() / b.getDiskMb() + ).intValue(); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/46ce98d8/src/main/java/org/apache/aurora/scheduler/resources/ResourceSlot.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/resources/ResourceSlot.java b/src/main/java/org/apache/aurora/scheduler/resources/ResourceSlot.java new file mode 100644 index 0000000..43696d2 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/resources/ResourceSlot.java @@ -0,0 +1,342 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.resources; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Predicate; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Ordering; +import com.google.common.collect.Range; + +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Data; +import org.apache.aurora.scheduler.TierInfo; +import org.apache.aurora.scheduler.base.Numbers; +import org.apache.aurora.scheduler.storage.entities.ITaskConfig; +import org.apache.mesos.Protos; +import org.apache.mesos.Protos.ExecutorInfo; +import org.apache.mesos.Protos.Resource; +import org.apache.mesos.Protos.Resource.Builder; +import org.apache.mesos.Protos.TaskInfo; + +import static java.util.Objects.requireNonNull; + +import static org.apache.aurora.common.quantity.Data.BYTES; +import static org.apache.aurora.scheduler.resources.ResourceType.CPUS; +import static org.apache.aurora.scheduler.resources.ResourceType.DISK_MB; +import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB; + +/** + * Represents a single task/host aggregate resource vector unaware of any Mesos resource traits. + */ +public final class ResourceSlot { + + private final double numCpus; + private final Amount<Long, Data> disk; + private final Amount<Long, Data> ram; + private final int numPorts; + + /** + * Empty ResourceSlot value. + */ + public static final ResourceSlot NONE = + new ResourceSlot(0, Amount.of(0L, Data.BITS), Amount.of(0L, Data.BITS), 0); + + /** + * Convert {@link com.google.common.collect.Range} to {@link org.apache.mesos.Protos.Value.Range}. 
+ */ + public static final Function<Range<Integer>, Protos.Value.Range> RANGE_TRANSFORM = + input -> Protos.Value.Range.newBuilder() + .setBegin(input.lowerEndpoint()) + .setEnd(input.upperEndpoint()) + .build(); + + public ResourceSlot( + double numCpus, + Amount<Long, Data> ram, + Amount<Long, Data> disk, + int numPorts) { + + this.numCpus = numCpus; + this.ram = requireNonNull(ram); + this.disk = requireNonNull(disk); + this.numPorts = numPorts; + } + + /** + * Extracts the resources required from a task. + * + * @param task Task to get resources from. + * @return The resources required by the task. + */ + public static ResourceSlot from(ITaskConfig task) { + requireNonNull(task); + return new ResourceSlot( + task.getNumCpus(), + Amount.of(task.getRamMb(), Data.MB), + Amount.of(task.getDiskMb(), Data.MB), + task.getRequestedPorts().size()); + } + + /** + * Ensures that the revocable setting on the executor and task CPU resources match. + * + * @param task Task to check for resource type alignment. + * @return A possibly-modified task, with aligned CPU resource types. + */ + public static TaskInfo matchResourceTypes(TaskInfo task) { + TaskInfo.Builder taskBuilder = task.toBuilder(); + + Optional<Resource> revocableTaskCpu = taskBuilder.getResourcesList().stream() + .filter(r -> r.getName().equals(CPUS.getName())) + .filter(Resource::hasRevocable) + .findFirst(); + ExecutorInfo.Builder executorBuilder = taskBuilder.getExecutorBuilder(); + + Consumer<Builder> matchRevocable = builder -> { + if (revocableTaskCpu.isPresent()) { + builder.setRevocable(revocableTaskCpu.get().getRevocable()); + } else { + builder.clearRevocable(); + } + }; + + executorBuilder.getResourcesBuilderList().stream() + .filter(r -> r.getName().equals(CPUS.getName())) + .forEach(matchRevocable); + + return taskBuilder.build(); + } + + /** + * Convenience method for adapting to Mesos resources without applying a port range. + * + * @param tierInfo Task tier info. + * @return Mesos resources. 
+ */ + public List<Protos.Resource> toResourceList(TierInfo tierInfo) { + return ImmutableList.<Protos.Resource>builder() + .add(makeMesosResource(CPUS, numCpus, tierInfo.isRevocable())) + .add(makeMesosResource(DISK_MB, disk.as(Data.MB), false)) + .add(makeMesosResource(RAM_MB, ram.as(Data.MB), false)) + .build(); + } + + /** + * Creates a mesos resource of integer ranges. + * + * @param resourceType Resource type. + * @param values Values to translate into ranges. + * @return A new mesos ranges resource. + */ + @VisibleForTesting + public static Protos.Resource makeMesosRangeResource( + ResourceType resourceType, + Set<Integer> values) { + + return Protos.Resource.newBuilder() + .setName(resourceType.getName()) + .setType(Protos.Value.Type.RANGES) + .setRanges(Protos.Value.Ranges.newBuilder() + .addAllRange(Iterables.transform(Numbers.toRanges(values), RANGE_TRANSFORM))) + .build(); + } + + /** + * Creates a scalar mesos resource. + * + * @param resourceType Resource type. + * @param value Value for the resource. + * @param revocable Flag indicating if this resource is revocable. + * @return A mesos resource. + */ + @VisibleForTesting + static Protos.Resource makeMesosResource( + ResourceType resourceType, + double value, + boolean revocable) { + + Protos.Resource.Builder builder = Protos.Resource.newBuilder() + .setName(resourceType.getName()) + .setType(Protos.Value.Type.SCALAR) + .setScalar(Protos.Value.Scalar.newBuilder().setValue(value)); + + if (revocable) { + builder.setRevocable(Protos.Resource.RevocableInfo.newBuilder()); + } + + return builder.build(); + } + + /** + * Generates a ResourceSlot where each resource component is a max out of the two components. + * + * @param a A resource to compare. + * @param b A resource to compare. + * + * @return Returns a ResourceSlot instance where each component is a max of the two components. 
+ */ + @VisibleForTesting + static ResourceSlot maxElements(ResourceSlot a, ResourceSlot b) { + double maxCPU = Math.max(a.getNumCpus(), b.getNumCpus()); + Amount<Long, Data> maxRAM = Amount.of( + Math.max(a.getRam().as(Data.MB), b.getRam().as(Data.MB)), + Data.MB); + Amount<Long, Data> maxDisk = Amount.of( + Math.max(a.getDisk().as(Data.MB), b.getDisk().as(Data.MB)), + Data.MB); + int maxPorts = Math.max(a.getNumPorts(), b.getNumPorts()); + + return new ResourceSlot(maxCPU, maxRAM, maxDisk, maxPorts); + } + + /** + * Number of CPUs. + * + * @return CPUs. + */ + public double getNumCpus() { + return numCpus; + } + + /** + * Disk amount. + * + * @return Disk. + */ + public Amount<Long, Data> getDisk() { + return disk; + } + + /** + * RAM amount. + * + * @return RAM. + */ + public Amount<Long, Data> getRam() { + return ram; + } + + /** + * Number of ports. + * + * @return Port count. + */ + public int getNumPorts() { + return numPorts; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof ResourceSlot)) { + return false; + } + + ResourceSlot other = (ResourceSlot) o; + return Objects.equals(numCpus, other.numCpus) + && Objects.equals(ram, other.ram) + && Objects.equals(disk, other.disk) + && Objects.equals(numPorts, other.numPorts); + } + + @Override + public int hashCode() { + return Objects.hash(numCpus, ram, disk, numPorts); + } + + /** + * Sums up all resources in {@code slots}. + * + * @param slots Resource slots to sum up. + * @return Sum of all resource slots. + */ + public static ResourceSlot sum(Iterable<ResourceSlot> slots) { + ResourceSlot sum = NONE; + + for (ResourceSlot r : slots) { + sum = sum.add(r); + } + + return sum; + } + + /** + * Adds {@code other}. + * + * @param other Resource slot to add. + * @return Result. 
+ */ + public ResourceSlot add(ResourceSlot other) { + return new ResourceSlot( + getNumCpus() + other.getNumCpus(), + Amount.of(getRam().as(BYTES) + other.getRam().as(BYTES), BYTES), + Amount.of(getDisk().as(BYTES) + other.getDisk().as(BYTES), BYTES), + getNumPorts() + other.getNumPorts()); + } + + /** + * Subtracts {@code other}. + * + * @param other Resource slot to subtract. + * @return Result. + */ + public ResourceSlot subtract(ResourceSlot other) { + return new ResourceSlot( + getNumCpus() - other.getNumCpus(), + Amount.of(getRam().as(BYTES) - other.getRam().as(BYTES), BYTES), + Amount.of(getDisk().as(BYTES) - other.getDisk().as(BYTES), BYTES), + getNumPorts() - other.getNumPorts()); + } + + /** + * A Resources object is greater than another iff _all_ of its resource components are greater + * or equal. A Resources object compares as equal if some but not all components are greater than + * or equal to the other. + */ + public static final Ordering<ResourceSlot> ORDER = new Ordering<ResourceSlot>() { + @Override + public int compare(ResourceSlot left, ResourceSlot right) { + int diskC = left.getDisk().compareTo(right.getDisk()); + int ramC = left.getRam().compareTo(right.getRam()); + int portC = Integer.compare(left.getNumPorts(), right.getNumPorts()); + int cpuC = Double.compare(left.getNumCpus(), right.getNumCpus()); + + List<Integer> vector = ImmutableList.of(diskC, ramC, portC, cpuC); + + if (vector.stream().allMatch(IS_ZERO)) { + return 0; + } + + if (vector.stream().filter(IS_ZERO.negate()).allMatch(e -> e > 0)) { + return 1; + } + + if (vector.stream().filter(IS_ZERO.negate()).allMatch(e -> e < 0)) { + return -1; + } + + return 0; + } + }; + + private static final Predicate<Integer> IS_ZERO = e -> e == 0; +} http://git-wip-us.apache.org/repos/asf/aurora/blob/46ce98d8/src/main/java/org/apache/aurora/scheduler/resources/ResourceType.java ---------------------------------------------------------------------- diff --git 
a/src/main/java/org/apache/aurora/scheduler/resources/ResourceType.java b/src/main/java/org/apache/aurora/scheduler/resources/ResourceType.java new file mode 100644 index 0000000..5900ccb --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/resources/ResourceType.java @@ -0,0 +1,54 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.resources; + +import com.google.common.annotations.VisibleForTesting; + +import static java.util.Objects.requireNonNull; + +/** + * Defines Mesos resource types. + */ +@VisibleForTesting +public enum ResourceType { + /** + * CPU resource. + */ + CPUS("cpus"), + + /** + * RAM resource. + */ + RAM_MB("mem"), + + /** + * DISK resource. + */ + DISK_MB("disk"), + + /** + * Port resource. 
+ */ + PORTS("ports"); + + private final String resourceName; + + ResourceType(String resourceName) { + this.resourceName = requireNonNull(resourceName); + } + + public String getName() { + return resourceName; + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/46ce98d8/src/main/java/org/apache/aurora/scheduler/resources/Resources.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/resources/Resources.java b/src/main/java/org/apache/aurora/scheduler/resources/Resources.java new file mode 100644 index 0000000..46b31f0 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/resources/Resources.java @@ -0,0 +1,186 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/
package org.apache.aurora.scheduler.resources;

import java.util.Collections;
import java.util.List;
import java.util.Set;

import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.ContiguousSet;
import com.google.common.collect.DiscreteDomain;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

import org.apache.aurora.common.quantity.Amount;
import org.apache.aurora.common.quantity.Data;
import org.apache.aurora.scheduler.TierInfo;
import org.apache.mesos.Protos.Offer;
import org.apache.mesos.Protos.Resource;
import org.apache.mesos.Protos.Value.Range;

import static java.util.Objects.requireNonNull;

import static org.apache.aurora.scheduler.resources.ResourceType.CPUS;
import static org.apache.aurora.scheduler.resources.ResourceType.DISK_MB;
import static org.apache.aurora.scheduler.resources.ResourceType.PORTS;
import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB;

/**
 * An immutable view over a list of Mesos resources that can be filtered and aggregated.
 */
public final class Resources {

  /**
   * Matches resources whose name equals the {@code CPUS} resource name.
   */
  private static final Predicate<Resource> CPU =
      resource -> resource.getName().equals(CPUS.getName());

  /**
   * Matches every non-CPU resource, plus CPU resources that carry revocable info.
   */
  public static final Predicate<Resource> REVOCABLE =
      Predicates.or(Predicates.not(CPU), Predicates.and(CPU, Resource::hasRevocable));

  /**
   * Matches resources that do not carry revocable info.
   */
  public static final Predicate<Resource> NON_REVOCABLE = Predicates.not(Resource::hasRevocable);

  /**
   * Expands a Mesos range into the set of all integers between its begin and end, inclusive.
   * Range bounds are narrowed to {@code int}.
   */
  public static final Function<Range, Set<Integer>> RANGE_TO_MEMBERS =
      range -> {
        int first = (int) range.getBegin();
        int last = (int) range.getEnd();
        return ContiguousSet.create(
            com.google.common.collect.Range.closed(first, last),
            DiscreteDomain.integers());
      };

  private final Iterable<Resource> mesosResources;

  private Resources(Iterable<Resource> resources) {
    // Snapshot the input so later filtering works on a stable copy.
    this.mesosResources = ImmutableList.copyOf(resources);
  }

  /**
   * Wraps the resources listed in an offer.
   *
   * @param offer Offer to get resources from.
   * @return The resources available in the offer.
   */
  public static Resources from(Offer offer) {
    List<Resource> offered = requireNonNull(offer.getResourcesList());
    return new Resources(offered);
  }

  /**
   * Keeps only the resources accepted by {@code predicate}.
   *
   * @param predicate Predicate filter.
   * @return A new {@code Resources} object containing only the matching Mesos resources.
   */
  public Resources filter(Predicate<Resource> predicate) {
    Iterable<Resource> matching = Iterables.filter(mesosResources, predicate);
    return new Resources(matching);
  }

  /**
   * Keeps only the resources applicable to the given tier: {@link #REVOCABLE} for a revocable
   * tier, {@link #NON_REVOCABLE} otherwise.
   *
   * @param tierInfo Tier info.
   * @return A new {@code Resources} object containing only the matching Mesos resources.
   */
  public Resources filter(TierInfo tierInfo) {
    if (tierInfo.isRevocable()) {
      return filter(REVOCABLE);
    }
    return filter(NON_REVOCABLE);
  }

  /**
   * Aggregates the contained resources into a single {@code ResourceSlot}: summed CPU, RAM and
   * disk scalars (RAM and disk truncated to whole {@code Data.MB}) and the number of ports.
   *
   * @return {@code ResourceSlot} instance.
   */
  public ResourceSlot slot() {
    double cpus = getScalarValue(CPUS.getName());
    Amount<Long, Data> ram = Amount.of((long) getScalarValue(RAM_MB.getName()), Data.MB);
    Amount<Long, Data> disk = Amount.of((long) getScalarValue(DISK_MB.getName()), Data.MB);

    return new ResourceSlot(cpus, ram, disk, getNumAvailablePorts());
  }

  /**
   * Picks {@code numPorts} distinct ports out of the port ranges of this resource instance.
   * The candidates are shuffled before selection.
   *
   * @param numPorts The number of ports to grab.
   * @return The set of ports grabbed; empty when {@code numPorts} is zero.
   * @throws InsufficientResourcesException if not enough ports were available.
   */
  public Set<Integer> getPorts(int numPorts)
      throws InsufficientResourcesException {

    if (numPorts == 0) {
      return ImmutableSet.of();
    }

    Iterable<Set<Integer>> portsPerRange = Iterables.transform(getPortRanges(), RANGE_TO_MEMBERS);
    // Going through a set removes duplicates.
    Set<Integer> distinctPorts = Sets.newHashSet(Iterables.concat(portsPerRange));
    List<Integer> availablePorts = Lists.newArrayList(distinctPorts);

    if (availablePorts.size() < numPorts) {
      throw new InsufficientResourcesException(
          String.format("Could not get %d ports from %s", numPorts, availablePorts));
    }

    Collections.shuffle(availablePorts);
    List<Integer> chosen = availablePorts.subList(0, numPorts);
    return ImmutableSet.copyOf(chosen);
  }

  /**
   * Counts the ports covered by all port ranges (range bounds are inclusive).
   */
  private int getNumAvailablePorts() {
    int portCount = 0;
    for (Range portRange : getPortRanges()) {
      portCount += portRange.getEnd() - portRange.getBegin() + 1;
    }
    return portCount;
  }

  /**
   * Sums the scalar values of all resources named {@code key}.
   */
  private double getScalarValue(String key) {
    double total = 0;
    for (Resource resource : getResources(key)) {
      total += resource.getScalar().getValue();
    }
    return total;
  }

  /**
   * Selects the resources named {@code key}.
   */
  private Iterable<Resource> getResources(String key) {
    return Iterables.filter(mesosResources, resource -> resource.getName().equals(key));
  }

  /**
   * Collects the ranges of every resource named after {@code PORTS}.
   */
  private Iterable<Range> getPortRanges() {
    ImmutableList.Builder<Range> collected = ImmutableList.builder();
    for (Resource portResource : getResources(PORTS.getName())) {
      collected.addAll(portResource.getRanges().getRangeList());
    }

    return collected.build();
  }

  /**
   * Signals that a request asked for more resources than are available.
   */
  public static class InsufficientResourcesException extends RuntimeException {
    InsufficientResourcesException(String message) {
      super(message);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/46ce98d8/src/main/java/org/apache/aurora/scheduler/sla/SlaGroup.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/sla/SlaGroup.java b/src/main/java/org/apache/aurora/scheduler/sla/SlaGroup.java index bf7c084..2c044a6 100644 --- a/src/main/java/org/apache/aurora/scheduler/sla/SlaGroup.java +++ b/src/main/java/org/apache/aurora/scheduler/sla/SlaGroup.java @@ -29,11 +29,11 @@ import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.storage.entities.IJobKey; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; -import static org.apache.aurora.scheduler.ResourceAggregates.EMPTY; -import static org.apache.aurora.scheduler.ResourceAggregates.LARGE; -import static org.apache.aurora.scheduler.ResourceAggregates.MEDIUM; -import static org.apache.aurora.scheduler.ResourceAggregates.SMALL; -import static org.apache.aurora.scheduler.ResourceAggregates.XLARGE; +import static org.apache.aurora.scheduler.resources.ResourceAggregates.EMPTY; +import static org.apache.aurora.scheduler.resources.ResourceAggregates.LARGE; +import static org.apache.aurora.scheduler.resources.ResourceAggregates.MEDIUM; +import static org.apache.aurora.scheduler.resources.ResourceAggregates.SMALL; +import static org.apache.aurora.scheduler.resources.ResourceAggregates.XLARGE; /** * Defines a logical grouping criteria to be applied over a set of tasks. 
http://git-wip-us.apache.org/repos/asf/aurora/blob/46ce98d8/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java index 0c467a6..7d43d4a 100644 --- a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java +++ b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java @@ -28,7 +28,6 @@ import com.google.common.collect.FluentIterable; import org.apache.aurora.common.inject.TimedInterceptor.Timed; import org.apache.aurora.common.stats.Stats; import org.apache.aurora.scheduler.HostOffer; -import org.apache.aurora.scheduler.Resources; import org.apache.aurora.scheduler.TierInfo; import org.apache.aurora.scheduler.TierManager; import org.apache.aurora.scheduler.base.TaskGroupKey; @@ -39,6 +38,7 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto; import org.apache.aurora.scheduler.filter.SchedulingFilter.VetoGroup; import org.apache.aurora.scheduler.mesos.MesosTaskFactory; import org.apache.aurora.scheduler.offers.OfferManager; +import org.apache.aurora.scheduler.resources.Resources; import org.apache.aurora.scheduler.storage.entities.IAssignedTask; import org.apache.mesos.Protos.TaskInfo; import org.slf4j.Logger; http://git-wip-us.apache.org/repos/asf/aurora/blob/46ce98d8/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java b/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java index 08eb6d6..03dfa27 100644 --- a/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java +++ b/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java @@ -29,20 +29,20 @@ import org.apache.aurora.common.quantity.Data; 
import org.apache.aurora.common.quantity.Time; import org.apache.aurora.gen.ResourceAggregate; import org.apache.aurora.scheduler.HostOffer; -import org.apache.aurora.scheduler.ResourceSlot; -import org.apache.aurora.scheduler.Resources; import org.apache.aurora.scheduler.SchedulerServicesModule; import org.apache.aurora.scheduler.base.Conversions; import org.apache.aurora.scheduler.offers.OfferManager; +import org.apache.aurora.scheduler.resources.ResourceSlot; +import org.apache.aurora.scheduler.resources.Resources; import org.apache.aurora.scheduler.stats.SlotSizeCounter.MachineResource; import org.apache.aurora.scheduler.stats.SlotSizeCounter.MachineResourceProvider; import org.apache.aurora.scheduler.storage.entities.IResourceAggregate; import static java.util.Objects.requireNonNull; -import static org.apache.aurora.scheduler.ResourceSlot.NONE; -import static org.apache.aurora.scheduler.Resources.NON_REVOCABLE; -import static org.apache.aurora.scheduler.Resources.REVOCABLE; +import static org.apache.aurora.scheduler.resources.ResourceSlot.NONE; +import static org.apache.aurora.scheduler.resources.Resources.NON_REVOCABLE; +import static org.apache.aurora.scheduler.resources.Resources.REVOCABLE; /** * Module to configure export of cluster-wide resource allocation and consumption statistics. 
http://git-wip-us.apache.org/repos/asf/aurora/blob/46ce98d8/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java b/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java index c9e57ec..1f71b00 100644 --- a/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java +++ b/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java @@ -25,7 +25,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMultimap; -import org.apache.aurora.scheduler.ResourceAggregates; +import org.apache.aurora.scheduler.resources.ResourceAggregates; import org.apache.aurora.scheduler.storage.entities.IResourceAggregate; import static java.util.Objects.requireNonNull;
