Repository: aurora
Updated Branches: refs/heads/master 3676ec25b -> ab1c9b2ec
Resources: finalizing Resources.java refactoring. Bugs closed: AURORA-1415 Reviewed at https://reviews.apache.org/r/37366/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/ab1c9b2e Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/ab1c9b2e Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/ab1c9b2e Branch: refs/heads/master Commit: ab1c9b2ec2630919b5b3d1bbc61efa7831e568d0 Parents: 3676ec2 Author: Maxim Khutornenko <[email protected]> Authored: Fri Aug 14 13:11:47 2015 -0700 Committer: Maxim Khutornenko <[email protected]> Committed: Fri Aug 14 13:11:47 2015 -0700 ---------------------------------------------------------------------- .../org/apache/aurora/scheduler/Resources.java | 177 ++++++++----------- .../aurora/scheduler/state/TaskAssigner.java | 2 +- .../aurora/scheduler/ResourceSlotTest.java | 27 +++ .../apache/aurora/scheduler/ResourcesTest.java | 83 +++++++-- 4 files changed, 173 insertions(+), 116 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/ab1c9b2e/src/main/java/org/apache/aurora/scheduler/Resources.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/Resources.java b/src/main/java/org/apache/aurora/scheduler/Resources.java index 7b1b54e..40df262 100644 --- a/src/main/java/org/apache/aurora/scheduler/Resources.java +++ b/src/main/java/org/apache/aurora/scheduler/Resources.java @@ -15,10 +15,10 @@ package org.apache.aurora.scheduler; import java.util.Collections; import java.util.List; -import java.util.Objects; import java.util.Set; import com.google.common.base.Function; +import com.google.common.base.Predicate; import com.google.common.collect.ContiguousSet; import com.google.common.collect.DiscreteDomain; import com.google.common.collect.ImmutableList; @@ -41,89 +41,89 @@ import static 
org.apache.aurora.scheduler.ResourceType.PORTS; import static org.apache.aurora.scheduler.ResourceType.RAM_MB; /** - * A container for multiple resource vectors. - * TODO(wfarner): Collapse this in with ResourceAggregates AURORA-105. + * A container for multiple Mesos resource vectors. */ public final class Resources { - private static final Function<Range, Set<Integer>> RANGE_TO_MEMBERS = - new Function<Range, Set<Integer>>() { - @Override - public Set<Integer> apply(Range range) { - return ContiguousSet.create( - com.google.common.collect.Range.closed((int) range.getBegin(), (int) range.getEnd()), - DiscreteDomain.integers()); - } - }; - private final double numCpus; - private final Amount<Long, Data> disk; - private final Amount<Long, Data> ram; - private final int numPorts; + /** + * CPU resource filter. + */ + public static final Predicate<Resource> CPU = e -> e.getName().equals(CPUS.getName()); - private Resources(double numCpus, Amount<Long, Data> ram, Amount<Long, Data> disk, int numPorts) { - this.numCpus = numCpus; - this.ram = requireNonNull(ram); - this.disk = requireNonNull(disk); - this.numPorts = numPorts; - } + private final Iterable<Resource> mesosResources; - @Override - public boolean equals(Object o) { - if (!(o instanceof Resources)) { - return false; - } + private Resources(Iterable<Resource> mesosResources) { + this.mesosResources = ImmutableList.copyOf(mesosResources); + } - Resources other = (Resources) o; - return Objects.equals(numCpus, other.numCpus) - && Objects.equals(ram, other.ram) - && Objects.equals(disk, other.disk) - && Objects.equals(numPorts, other.numPorts); + /** + * Extracts the resources available in a slave offer. + * + * @param offer Offer to get resources from. + * @return The resources available in the offer. 
+ */ + public static Resources from(Offer offer) { + return new Resources(requireNonNull(offer.getResourcesList())); } - @Override - public int hashCode() { - return Objects.hash(numCpus, ram, disk, numPorts); + /** + * Filters resources by the provided {@code predicate}. + * + * @param predicate Predicate filter. + * @return A new {@code Resources} object containing only filtered Mesos resources. + */ + public Resources filter(Predicate<Resource> predicate) { + return new Resources(Iterables.filter(mesosResources, predicate)); } - @Override - public String toString() { - return com.google.common.base.Objects.toStringHelper(this) - .add("numCpus", numCpus) - .add("ram", ram) - .add("disk", disk) - .add("numPorts", numPorts) - .toString(); + /** + * Gets generalized aggregated resource view. + * + * @return {@code ResourceSlot} instance. + */ + public ResourceSlot slot() { + return new ResourceSlot(getScalarValue(CPUS.getName()), + Amount.of((long) getScalarValue(RAM_MB.getName()), Data.MB), + Amount.of((long) getScalarValue(DISK_MB.getName()), Data.MB), + getNumAvailablePorts()); } /** - * Extracts the resources available in a slave offer. + * Attempts to grab {@code numPorts} from this resource instance. * - * @param offer Offer to get resources from. - * @return The resources available in the offer. + * @param numPorts The number of ports to grab. + * @return The set of ports grabbed. + * @throws InsufficientResourcesException if not enough ports were available. 
*/ - public static Resources from(Offer offer) { - requireNonNull(offer); - return new Resources( - getScalarValue(offer, CPUS.getName()), - Amount.of((long) getScalarValue(offer, RAM_MB.getName()), Data.MB), - Amount.of((long) getScalarValue(offer, DISK_MB.getName()), Data.MB), - getNumAvailablePorts(offer.getResourcesList())); + public Set<Integer> getPorts(int numPorts) + throws InsufficientResourcesException { + + if (numPorts == 0) { + return ImmutableSet.of(); + } + + List<Integer> availablePorts = Lists.newArrayList(Sets.newHashSet(Iterables.concat( + Iterables.transform(getPortRanges(), RANGE_TO_MEMBERS)))); + + if (availablePorts.size() < numPorts) { + throw new InsufficientResourcesException( + String.format("Could not get %d ports from %s", numPorts, availablePorts)); + } + + Collections.shuffle(availablePorts); + return ImmutableSet.copyOf(availablePorts.subList(0, numPorts)); } - private static int getNumAvailablePorts(List<Resource> resource) { + private int getNumAvailablePorts() { int offeredPorts = 0; - for (Range range : getPortRanges(resource)) { + for (Range range : getPortRanges()) { offeredPorts += 1 + range.getEnd() - range.getBegin(); } return offeredPorts; } - private static double getScalarValue(Offer offer, String key) { - return getScalarValue(offer.getResourcesList(), key); - } - - private static double getScalarValue(List<Resource> resources, String key) { - Resource resource = getResource(resources, key); + private double getScalarValue(String key) { + Resource resource = getResource(key); if (resource == null) { return 0; } @@ -131,12 +131,12 @@ public final class Resources { return resource.getScalar().getValue(); } - private static Resource getResource(List<Resource> resource, String key) { - return Iterables.find(resource, e -> e.getName().equals(key), null); + private Resource getResource(String key) { + return Iterables.find(mesosResources, e -> e.getName().equals(key), null); } - private static Iterable<Range> 
getPortRanges(List<Resource> resources) { - Resource resource = getResource(resources, PORTS.getName()); + private Iterable<Range> getPortRanges() { + Resource resource = getResource(PORTS.getName()); if (resource == null) { return ImmutableList.of(); } @@ -145,15 +145,6 @@ public final class Resources { } /** - * Gets generalized aggregated resource view. - * - * @return {@code ResourceSlot} instance. - */ - public ResourceSlot slot() { - return new ResourceSlot(numCpus, ram, disk, numPorts); - } - - /** * Thrown when there are insufficient resources to satisfy a request. */ static class InsufficientResourcesException extends RuntimeException { @@ -162,33 +153,13 @@ public final class Resources { } } - /** - * Attempts to grab {@code numPorts} from the given resource {@code offer}. - * - * @param offer The offer to grab ports from. - * @param numPorts The number of ports to grab. - * @return The set of ports grabbed. - * @throws InsufficientResourcesException if not enough ports were available. 
- */ - public static Set<Integer> getPorts(Offer offer, int numPorts) - throws InsufficientResourcesException { - - requireNonNull(offer); - - if (numPorts == 0) { - return ImmutableSet.of(); - } - - List<Integer> availablePorts = Lists.newArrayList(Sets.newHashSet( - Iterables.concat( - Iterables.transform(getPortRanges(offer.getResourcesList()), RANGE_TO_MEMBERS)))); - - if (availablePorts.size() < numPorts) { - throw new InsufficientResourcesException( - String.format("Could not get %d ports from %s", numPorts, offer)); - } - - Collections.shuffle(availablePorts); - return ImmutableSet.copyOf(availablePorts.subList(0, numPorts)); - } + private static final Function<Range, Set<Integer>> RANGE_TO_MEMBERS = + new Function<Range, Set<Integer>>() { + @Override + public Set<Integer> apply(Range range) { + return ContiguousSet.create( + com.google.common.collect.Range.closed((int) range.getBegin(), (int) range.getEnd()), + DiscreteDomain.integers()); + } + }; } http://git-wip-us.apache.org/repos/asf/aurora/blob/ab1c9b2e/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java index a7a4381..ca4b5b0 100644 --- a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java +++ b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java @@ -107,7 +107,7 @@ public interface TaskAssigner { String taskId) { String host = offer.getHostname(); - Set<Integer> selectedPorts = Resources.getPorts(offer, requestedPorts.size()); + Set<Integer> selectedPorts = Resources.from(offer).getPorts(requestedPorts.size()); Preconditions.checkState(selectedPorts.size() == requestedPorts.size()); final Iterator<String> names = requestedPorts.iterator(); 
http://git-wip-us.apache.org/repos/asf/aurora/blob/ab1c9b2e/src/test/java/org/apache/aurora/scheduler/ResourceSlotTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/ResourceSlotTest.java b/src/test/java/org/apache/aurora/scheduler/ResourceSlotTest.java index d537315..50e7fc9 100644 --- a/src/test/java/org/apache/aurora/scheduler/ResourceSlotTest.java +++ b/src/test/java/org/apache/aurora/scheduler/ResourceSlotTest.java @@ -16,6 +16,7 @@ package org.apache.aurora.scheduler; import java.util.Set; import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; @@ -24,12 +25,16 @@ import com.twitter.common.quantity.Amount; import com.twitter.common.quantity.Data; import org.apache.aurora.gen.TaskConfig; +import org.apache.aurora.scheduler.mesos.ExecutorSettings; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; import org.apache.mesos.Protos; import org.junit.Test; +import static org.apache.aurora.scheduler.ResourceSlot.MIN_THERMOS_RESOURCES; import static org.apache.aurora.scheduler.ResourceSlot.makeMesosRangeResource; import static org.apache.aurora.scheduler.ResourceSlot.makeMesosResource; +import static org.apache.aurora.scheduler.ResourceSlot.maxElements; +import static org.apache.aurora.scheduler.ResourceSlot.sum; import static org.apache.aurora.scheduler.ResourceType.CPUS; import static org.apache.aurora.scheduler.ResourceType.DISK_MB; import static org.apache.aurora.scheduler.ResourceType.PORTS; @@ -81,6 +86,21 @@ public class ResourceSlotTest { } @Test + public void testSum() { + assertEquals(THREE, sum(ImmutableList.of(ONE, ONE, ONE))); + } + + @Test + public void testWithOverhead() { + assertEquals(maxElements(TWO, MIN_THERMOS_RESOURCES), ONE.withOverhead( + ExecutorSettings.newBuilder() + .setExecutorOverhead(ONE) + .setExecutorPath("ignored") + 
.setThermosObserverRoot("ignored") + .build())); + } + + @Test public void testToResourceList() { ResourceSlot resources = ResourceSlot.from(TASK); Set<Integer> ports = ImmutableSet.of(80, 443); @@ -134,6 +154,13 @@ public class ResourceSlotTest { assertNotEquals(resources, null); } + @Test + public void testOrder() { + assertEquals( + ImmutableList.of(ONE, TWO, THREE, THREE), + ResourceSlot.ORDER.sortedCopy(ImmutableList.of(THREE, ONE, TWO, THREE))); + } + private void expectRanges(Set<Pair<Long, Long>> expected, Set<Integer> values) { Protos.Resource resource = makeMesosRangeResource(PORTS, values); assertEquals(Protos.Value.Type.RANGES, resource.getType()); http://git-wip-us.apache.org/repos/asf/aurora/blob/ab1c9b2e/src/test/java/org/apache/aurora/scheduler/ResourcesTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/ResourcesTest.java b/src/test/java/org/apache/aurora/scheduler/ResourcesTest.java index 313cf68..a5878a4 100644 --- a/src/test/java/org/apache/aurora/scheduler/ResourcesTest.java +++ b/src/test/java/org/apache/aurora/scheduler/ResourcesTest.java @@ -15,18 +15,26 @@ package org.apache.aurora.scheduler; import java.util.Set; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.twitter.common.collections.Pair; +import com.twitter.common.quantity.Amount; import org.apache.aurora.scheduler.Resources.InsufficientResourcesException; import org.apache.mesos.Protos; import org.apache.mesos.Protos.Resource; import org.apache.mesos.Protos.Value.Range; import org.apache.mesos.Protos.Value.Ranges; -import org.apache.mesos.Protos.Value.Type; import org.junit.Test; +import static com.twitter.common.quantity.Data.MB; + +import static org.apache.aurora.scheduler.ResourceType.CPUS; +import static org.apache.aurora.scheduler.ResourceType.DISK_MB; import static org.apache.aurora.scheduler.ResourceType.PORTS; +import static 
org.apache.aurora.scheduler.ResourceType.RAM_MB; +import static org.apache.mesos.Protos.Value.Type.RANGES; +import static org.apache.mesos.Protos.Value.Type.SCALAR; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -34,21 +42,21 @@ public class ResourcesTest { @Test public void testPortRangeExact() { Resource portsResource = createPortRange(Pair.of(1, 5)); - Set<Integer> ports = Resources.getPorts(createOffer(portsResource), 5); + Set<Integer> ports = Resources.from(createOffer(portsResource)).getPorts(5); assertEquals(5, ports.size()); } @Test public void testOnePortAvailable() { Resource portsResource = createPortRange(Pair.of(3, 3)); - Set<Integer> ports = Resources.getPorts(createOffer(portsResource), 1); + Set<Integer> ports = Resources.from(createOffer(portsResource)).getPorts(1); assertEquals(1, ports.size()); } @Test public void testPortRangeAbundance() { Resource portsResource = createPortRange(Pair.of(1, 10)); - Set<Integer> ports = Resources.getPorts(createOffer(portsResource), 5); + Set<Integer> ports = Resources.from(createOffer(portsResource)).getPorts(5); assertEquals(5, ports.size()); } @@ -56,14 +64,14 @@ public class ResourcesTest { public void testPortRangeExhaust() { Resource portsResource = createPortRanges(Pair.of(1, 2), Pair.of(10, 15)); - Set<Integer> ports = Resources.getPorts(createOffer(portsResource), 7); + Set<Integer> ports = Resources.from(createOffer(portsResource)).getPorts(7); assertEquals(7, ports.size()); - ports = Resources.getPorts(createOffer(portsResource), 8); + ports = Resources.from(createOffer(portsResource)).getPorts(8); assertEquals(8, ports.size()); try { - Resources.getPorts(createOffer(portsResource), 9); + Resources.from(createOffer(portsResource)).getPorts(9); fail("Ports should not have been sufficient"); } catch (InsufficientResourcesException e) { // Expected. 
@@ -73,13 +81,44 @@ public class ResourcesTest { @Test public void testGetNoPorts() { Resource portsResource = createPortRange(Pair.of(1, 5)); - assertEquals(ImmutableSet.of(), Resources.getPorts(createOffer(portsResource), 0)); + assertEquals(ImmutableSet.of(), Resources.from(createOffer(portsResource)).getPorts(0)); } @Test(expected = Resources.InsufficientResourcesException.class) public void testPortRangeScarcity() { Resource portsResource = createPortRange(Pair.of(1, 2)); - Resources.getPorts(createOffer(portsResource), 5); + Resources.from(createOffer(portsResource)).getPorts(5); + } + + @Test + public void testGetSlot() { + ImmutableList<Resource> resources = ImmutableList.<Resource>builder() + .add(createCpuResource(8.0)) + .add(createMemResource(1024, RAM_MB)) + .add(createMemResource(2048, DISK_MB)) + .add(createPortRange(Pair.of(1, 10))) + .build(); + + ResourceSlot expected = new ResourceSlot(8.0, Amount.of(1024L, MB), Amount.of(2048L, MB), 10); + assertEquals(expected, Resources.from(createOffer(resources)).slot()); + } + + @Test + public void testMissingResourcesHandledGracefully() { + ImmutableList<Resource> resources = ImmutableList.<Resource>builder().build(); + assertEquals(ResourceSlot.NONE, Resources.from(createOffer(resources)).slot()); + } + + @Test + public void testFilter() { + ImmutableList<Resource> resources = ImmutableList.<Resource>builder() + .add(createCpuResource(8.0)) + .add(createMemResource(1024, RAM_MB)) + .build(); + + assertEquals( + Resources.from(createOffer(createCpuResource(8.0))).slot(), + Resources.from(createOffer(resources)).filter(Resources.CPU).slot()); } private Resource createPortRange(Pair<Integer, Integer> range) { @@ -99,17 +138,37 @@ public class ResourcesTest { return Resource.newBuilder() .setName(PORTS.getName()) - .setType(Type.RANGES) + .setType(RANGES) .setRanges(ranges) .build(); } - private Protos.Offer createOffer(Resource resources) { + private static Resource createCpuResource(double cpus) { + return 
Resource.newBuilder() + .setName(CPUS.getName()) + .setType(SCALAR) + .setScalar(Protos.Value.Scalar.newBuilder().setValue(cpus)) + .build(); + } + + private static Resource createMemResource(long mem, ResourceType resourceType) { + return Resource.newBuilder() + .setName(resourceType.getName()) + .setType(SCALAR) + .setScalar(Protos.Value.Scalar.newBuilder().setValue(mem)) + .build(); + } + + private static Protos.Offer createOffer(Resource resource) { + return createOffer(ImmutableList.of(resource)); + } + + private static Protos.Offer createOffer(Iterable<Resource> resources) { return Protos.Offer.newBuilder() .setId(Protos.OfferID.newBuilder().setValue("offer-id")) .setFrameworkId(Protos.FrameworkID.newBuilder().setValue("framework-id")) .setSlaveId(Protos.SlaveID.newBuilder().setValue("slave-id")) .setHostname("hostname") - .addResources(resources).build(); + .addAllResources(resources).build(); } }
