Repository: aurora Updated Branches: refs/heads/master f5025f3c6 -> 74a121772
Modifying resource counters to support revocable resources. Bugs closed: AURORA-1439 Reviewed at https://reviews.apache.org/r/37593/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/74a12177 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/74a12177 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/74a12177 Branch: refs/heads/master Commit: 74a1217721c73213f65a3f604df0639e7b97823a Parents: f5025f3 Author: Maxim Khutornenko <[email protected]> Authored: Thu Aug 20 12:19:02 2015 -0700 Committer: Maxim Khutornenko <[email protected]> Committed: Thu Aug 20 12:19:02 2015 -0700 ---------------------------------------------------------------------- .../org/apache/aurora/scheduler/Resources.java | 6 +- .../scheduler/stats/AsyncStatsModule.java | 48 +++++++---- .../aurora/scheduler/stats/ResourceCounter.java | 2 +- .../aurora/scheduler/stats/SlotSizeCounter.java | 88 ++++++++++++-------- .../scheduler/stats/AsyncStatsModuleTest.java | 73 ++++++++++++++++ .../scheduler/stats/SlotSizeCounterTest.java | 60 +++++++++---- 6 files changed, 208 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/74a12177/src/main/java/org/apache/aurora/scheduler/Resources.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/Resources.java b/src/main/java/org/apache/aurora/scheduler/Resources.java index 3b4fbb6..b34a629 100644 --- a/src/main/java/org/apache/aurora/scheduler/Resources.java +++ b/src/main/java/org/apache/aurora/scheduler/Resources.java @@ -17,7 +17,6 @@ import java.util.Collections; import java.util.List; import java.util.Set; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Predicate; import com.google.common.base.Predicates; @@ -55,14 +54,13 @@ public final class Resources { /** * Revocable resource filter. */ - @VisibleForTesting - static final Predicate<Resource> REVOCABLE = + public static final Predicate<Resource> REVOCABLE = Predicates.or(Predicates.not(CPU), Predicates.and(CPU, Resource::hasRevocable)); /** * Non-revocable resource filter. */ - private static final Predicate<Resource> NON_REVOCABLE = Predicates.not(Resource::hasRevocable); + public static final Predicate<Resource> NON_REVOCABLE = Predicates.not(Resource::hasRevocable); private final Iterable<Resource> mesosResources; http://git-wip-us.apache.org/repos/asf/aurora/blob/74a12177/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java b/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java index 74a6546..09a2f00 100644 --- a/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java +++ b/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java @@ -16,8 +16,7 @@ package org.apache.aurora.scheduler.stats; import javax.inject.Inject; import javax.inject.Singleton; -import com.google.common.base.Function; -import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.AbstractScheduledService; import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; import com.google.inject.AbstractModule; @@ -41,6 +40,10 @@ import org.apache.aurora.scheduler.storage.entities.IResourceAggregate; import static java.util.Objects.requireNonNull; +import static org.apache.aurora.scheduler.ResourceSlot.NONE; +import static org.apache.aurora.scheduler.Resources.NON_REVOCABLE; +import static org.apache.aurora.scheduler.Resources.REVOCABLE; + /** * Module to configure export of cluster-wide resource allocation and consumption statistics. */ @@ -136,19 +139,6 @@ public class AsyncStatsModule extends AbstractModule { } static class OfferAdapter implements MachineResourceProvider { - private static final Function<HostOffer, MachineResource> TO_RESOURCE = - new Function<HostOffer, MachineResource>() { - @Override - public MachineResource apply(HostOffer offer) { - ResourceSlot resources = Resources.from(offer.getOffer()).slot(); - IResourceAggregate quota = IResourceAggregate.build(new ResourceAggregate() - .setNumCpus(resources.getNumCpus()) - .setRamMb(resources.getRam().as(Data.MB)) - .setDiskMb(resources.getDisk().as(Data.MB))); - return new MachineResource(quota, Conversions.isDedicated(offer.getOffer())); - } - }; - private final OfferManager offerManager; @Inject @@ -159,7 +149,33 @@ public class AsyncStatsModule extends AbstractModule { @Override public Iterable<MachineResource> get() { Iterable<HostOffer> offers = offerManager.getOffers(); - return FluentIterable.from(offers).transform(TO_RESOURCE); + + ImmutableList.Builder<MachineResource> builder = ImmutableList.builder(); + for (HostOffer offer : offers) { + ResourceSlot revocable = Resources.from(offer.getOffer()).filter(REVOCABLE).slot(); + ResourceSlot nonRevocable = + Resources.from(offer.getOffer()).filter(NON_REVOCABLE).slot(); + boolean isDedicated = Conversions.isDedicated(offer.getOffer()); + + // It's insufficient to compare revocable against NONE here as RAM, DISK and PORTS + // are always rolled in to revocable as non-compressible resources. Only if revocable + // CPU is non-zero should we expose the revocable resources as aggregates. + if (revocable.getNumCpus() > 0.0) { + builder.add(new MachineResource(fromSlot(revocable), isDedicated, true)); + } + + if (!nonRevocable.equals(NONE)) { + builder.add(new MachineResource(fromSlot(nonRevocable), isDedicated, false)); + } + } + return builder.build(); + } + + private static IResourceAggregate fromSlot(ResourceSlot slot) { + return IResourceAggregate.build(new ResourceAggregate() + .setNumCpus(slot.getNumCpus()) + .setRamMb(slot.getRam().as(Data.MB)) + .setDiskMb(slot.getDisk().as(Data.MB))); } } } http://git-wip-us.apache.org/repos/asf/aurora/blob/74a12177/src/main/java/org/apache/aurora/scheduler/stats/ResourceCounter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/stats/ResourceCounter.java b/src/main/java/org/apache/aurora/scheduler/stats/ResourceCounter.java index 6dbc5d6..36e2c93 100644 --- a/src/main/java/org/apache/aurora/scheduler/stats/ResourceCounter.java +++ b/src/main/java/org/apache/aurora/scheduler/stats/ResourceCounter.java @@ -132,7 +132,7 @@ public class ResourceCounter { } public enum MetricType { - TOTAL_CONSUMED(Predicates.alwaysTrue()), + TOTAL_CONSUMED(Predicates.<ITaskConfig>alwaysTrue()), DEDICATED_CONSUMED(new Predicate<ITaskConfig>() { @Override public boolean apply(ITaskConfig task) { http://git-wip-us.apache.org/repos/asf/aurora/blob/74a12177/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java b/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java index 39c055d..e7be8e2 100644 --- a/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java +++ b/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java @@ -14,14 +14,16 @@ package org.apache.aurora.scheduler.stats; import java.util.Map; +import java.util.Objects; import javax.inject.Inject; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; -import com.google.common.base.Predicate; import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableMultimap; import org.apache.aurora.scheduler.ResourceAggregates; import org.apache.aurora.scheduler.storage.entities.IResourceAggregate; @@ -39,6 +41,14 @@ class SlotSizeCounter implements Runnable { "large", ResourceAggregates.LARGE, "xlarge", ResourceAggregates.XLARGE); + // Ensures all counters are always initialized regardless of the Resource availability. + private static final Iterable<String> SLOT_GROUPS = ImmutableList.of( + getPrefix(false, false), + getPrefix(false, true), + getPrefix(true, false), + getPrefix(true, true) + ); + private final Map<String, IResourceAggregate> slotSizes; private final MachineResourceProvider machineResourceProvider; private final CachedCounters cachedCounters; @@ -57,10 +67,12 @@ class SlotSizeCounter implements Runnable { static class MachineResource { private final IResourceAggregate size; private final boolean dedicated; + private final boolean revocable; - public MachineResource(IResourceAggregate size, boolean dedicated) { + public MachineResource(IResourceAggregate size, boolean dedicated, boolean revocable) { this.size = requireNonNull(size); this.dedicated = dedicated; + this.revocable = revocable; } public IResourceAggregate getSize() { @@ -70,6 +82,27 @@ class SlotSizeCounter implements Runnable { public boolean isDedicated() { return dedicated; } + + public boolean isRevocable() { + return revocable; + } + + @Override + public int hashCode() { + return Objects.hash(size, dedicated, revocable); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof MachineResource)) { + return false; + } + + MachineResource other = (MachineResource) obj; + return Objects.equals(size, other.size) + && Objects.equals(dedicated, other.dedicated) + && Objects.equals(revocable, other.revocable); + } } interface MachineResourceProvider { @@ -81,13 +114,15 @@ class SlotSizeCounter implements Runnable { this(SLOT_SIZES, machineResourceProvider, cachedCounters); } + private static String getPrefix(boolean dedicated, boolean revocable) { + String dedicatedSuffix = dedicated ? "dedicated_" : ""; + String revocableSuffix = revocable ? "revocable_" : ""; + return "empty_slots_" + dedicatedSuffix + revocableSuffix; + } + @VisibleForTesting - static String getStatName(String slotName, boolean dedicated) { - if (dedicated) { - return "empty_slots_dedicated_" + slotName; - } else { - return "empty_slots_" + slotName; - } + static String getStatName(String slotName, boolean dedicated, boolean revocable) { + return getPrefix(dedicated, revocable) + slotName; } private int countSlots(Iterable<IResourceAggregate> slots, final IResourceAggregate slotSize) { @@ -105,40 +140,27 @@ class SlotSizeCounter implements Runnable { return sum; } - private static Predicate<MachineResource> isDedicated(final boolean dedicated) { - return new Predicate<MachineResource>() { - @Override - public boolean apply(MachineResource slot) { - return slot.isDedicated() == dedicated; - } - }; - } - - private static final Function<MachineResource, IResourceAggregate> GET_SIZE = - new Function<MachineResource, IResourceAggregate>() { - @Override - public IResourceAggregate apply(MachineResource slot) { - return slot.getSize(); - } - }; - private void updateStats( String name, - boolean dedicated, Iterable<MachineResource> slots, IResourceAggregate slotSize) { - Iterable<IResourceAggregate> sizes = - FluentIterable.from(slots).filter(isDedicated(dedicated)).transform(GET_SIZE); - cachedCounters.get(getStatName(name, dedicated)).set(countSlots(sizes, slotSize)); + ImmutableMultimap.Builder<String, IResourceAggregate> builder = ImmutableMultimap.builder(); + for (MachineResource slot : slots) { + builder.put(getStatName(name, slot.isDedicated(), slot.isRevocable()), slot.getSize()); + } + + ImmutableMultimap<String, IResourceAggregate> sizes = builder.build(); + + for (String slotGroup : SLOT_GROUPS) { + String statName = slotGroup + name; + cachedCounters.get(statName).set(countSlots(sizes.get(statName), slotSize)); + } } @Override public void run() { Iterable<MachineResource> slots = machineResourceProvider.get(); - for (Map.Entry<String, IResourceAggregate> entry : slotSizes.entrySet()) { - updateStats(entry.getKey(), false, slots, entry.getValue()); - updateStats(entry.getKey(), true, slots, entry.getValue()); - } + slotSizes.entrySet().stream().forEach(e -> updateStats(e.getKey(), slots, e.getValue())); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/74a12177/src/test/java/org/apache/aurora/scheduler/stats/AsyncStatsModuleTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/stats/AsyncStatsModuleTest.java b/src/test/java/org/apache/aurora/scheduler/stats/AsyncStatsModuleTest.java new file mode 100644 index 0000000..7519ce3 --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/stats/AsyncStatsModuleTest.java @@ -0,0 +1,73 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.stats; + +import com.google.common.collect.ImmutableList; +import com.twitter.common.testing.easymock.EasyMockTest; + +import org.apache.aurora.gen.HostAttributes; +import org.apache.aurora.gen.ResourceAggregate; +import org.apache.aurora.scheduler.HostOffer; +import org.apache.aurora.scheduler.ResourceType; +import org.apache.aurora.scheduler.offers.OfferManager; +import org.apache.aurora.scheduler.stats.SlotSizeCounter.MachineResource; +import org.apache.aurora.scheduler.storage.entities.IHostAttributes; +import org.apache.aurora.scheduler.storage.entities.IResourceAggregate; +import org.apache.mesos.Protos; +import org.junit.Test; + +import static org.apache.aurora.scheduler.stats.AsyncStatsModule.OfferAdapter; +import static org.easymock.EasyMock.expect; +import static org.junit.Assert.assertEquals; + +public class AsyncStatsModuleTest extends EasyMockTest { + @Test + public void testOfferAdapter() { + OfferManager offerManager = createMock(OfferManager.class); + expect(offerManager.getOffers()).andReturn(ImmutableList.of( + new HostOffer(Protos.Offer.newBuilder() + .setId(Protos.OfferID.newBuilder().setValue("offerId")) + .setFrameworkId(Protos.FrameworkID.newBuilder().setValue("frameworkId")) + .setSlaveId(Protos.SlaveID.newBuilder().setValue("slaveId")) + .setHostname("hostName") + .addResources(getCpuResource(true, 2.0)) + .addResources(getCpuResource(false, 4.0)) + .build(), + IHostAttributes.build(new HostAttributes())))); + + control.replay(); + + OfferAdapter adapter = new OfferAdapter(offerManager); + + assertEquals(ImmutableList.of(resource(true, 2.0), resource(false, 4.0)), adapter.get()); + } + + private static MachineResource resource(boolean revocable, double cpu) { + return new MachineResource( + IResourceAggregate.build(new ResourceAggregate(cpu, 0, 0)), false, revocable); + } + + private static Protos.Resource getCpuResource(boolean revocable, double value) { + Protos.Resource.Builder builder = Protos.Resource.newBuilder() + .setName(ResourceType.CPUS.getName()) + .setType(Protos.Value.Type.SCALAR) + .setScalar(Protos.Value.Scalar.newBuilder().setValue(value)); + + if (revocable) { + builder.setRevocable(Protos.Resource.RevocableInfo.newBuilder()); + } + + return builder.build(); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/74a12177/src/test/java/org/apache/aurora/scheduler/stats/SlotSizeCounterTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/stats/SlotSizeCounterTest.java b/src/test/java/org/apache/aurora/scheduler/stats/SlotSizeCounterTest.java index b6623d5..576078f 100644 --- a/src/test/java/org/apache/aurora/scheduler/stats/SlotSizeCounterTest.java +++ b/src/test/java/org/apache/aurora/scheduler/stats/SlotSizeCounterTest.java @@ -48,8 +48,12 @@ public class SlotSizeCounterTest extends EasyMockTest { private AtomicLong smallCounter = new AtomicLong(); private AtomicLong smallDedicatedCounter = new AtomicLong(); + private AtomicLong smallRevocableCounter = new AtomicLong(); + private AtomicLong smallDedicatedRevocableCounter = new AtomicLong(); private AtomicLong largeCounter = new AtomicLong(); private AtomicLong largeDedicatedCounter = new AtomicLong(); + private AtomicLong largeRevocableCounter = new AtomicLong(); + private AtomicLong largeDedicatedRevocableCounter = new AtomicLong(); @Before public void setUp() { @@ -59,14 +63,22 @@ public class SlotSizeCounterTest extends EasyMockTest { } private void expectStatExport() { - expect(statsProvider.makeCounter(SlotSizeCounter.getStatName("small", false))) + expect(statsProvider.makeCounter(SlotSizeCounter.getStatName("small", false, false))) .andReturn(smallCounter); - expect(statsProvider.makeCounter(SlotSizeCounter.getStatName("small", true))) + expect(statsProvider.makeCounter(SlotSizeCounter.getStatName("small", true, false))) .andReturn(smallDedicatedCounter); - expect(statsProvider.makeCounter(SlotSizeCounter.getStatName("large", false))) + expect(statsProvider.makeCounter(SlotSizeCounter.getStatName("small", false, true))) + .andReturn(smallRevocableCounter); + expect(statsProvider.makeCounter(SlotSizeCounter.getStatName("small", true, true))) + .andReturn(smallDedicatedRevocableCounter); + expect(statsProvider.makeCounter(SlotSizeCounter.getStatName("large", false, false))) .andReturn(largeCounter); - expect(statsProvider.makeCounter(SlotSizeCounter.getStatName("large", true))) + expect(statsProvider.makeCounter(SlotSizeCounter.getStatName("large", true, false))) .andReturn(largeDedicatedCounter); + expect(statsProvider.makeCounter(SlotSizeCounter.getStatName("large", false, true))) + .andReturn(largeRevocableCounter); + expect(statsProvider.makeCounter(SlotSizeCounter.getStatName("large", true, true))) + .andReturn(largeDedicatedRevocableCounter); } private void expectGetSlots(MachineResource... returned) { @@ -83,23 +95,31 @@ public class SlotSizeCounterTest extends EasyMockTest { slotCounter.run(); assertEquals(0, smallCounter.get()); assertEquals(0, smallDedicatedCounter.get()); + assertEquals(0, smallRevocableCounter.get()); + assertEquals(0, smallDedicatedRevocableCounter.get()); assertEquals(0, largeCounter.get()); assertEquals(0, largeDedicatedCounter.get()); + assertEquals(0, largeRevocableCounter.get()); + assertEquals(0, largeDedicatedRevocableCounter.get()); } @Test public void testTinyOffers() { expectStatExport(); - expectGetSlots( - new MachineResource(IResourceAggregate.build(new ResourceAggregate(0.1, 1, 1)), false)); + expectGetSlots(new MachineResource( + IResourceAggregate.build(new ResourceAggregate(0.1, 1, 1)), false, false)); control.replay(); slotCounter.run(); assertEquals(0, smallCounter.get()); assertEquals(0, smallDedicatedCounter.get()); + assertEquals(0, smallRevocableCounter.get()); + assertEquals(0, smallDedicatedRevocableCounter.get()); assertEquals(0, largeCounter.get()); assertEquals(0, largeDedicatedCounter.get()); + assertEquals(0, largeRevocableCounter.get()); + assertEquals(0, largeDedicatedRevocableCounter.get()); } @Test @@ -107,36 +127,46 @@ public class SlotSizeCounterTest extends EasyMockTest { expectStatExport(); expectGetSlots( new MachineResource( - IResourceAggregate.build(new ResourceAggregate(1000, 16384, 1)), false)); + IResourceAggregate.build(new ResourceAggregate(1000, 16384, 1)), false, false)); control.replay(); slotCounter.run(); assertEquals(0, smallCounter.get()); assertEquals(0, smallDedicatedCounter.get()); + assertEquals(0, smallRevocableCounter.get()); + assertEquals(0, smallDedicatedRevocableCounter.get()); assertEquals(0, largeCounter.get()); assertEquals(0, largeDedicatedCounter.get()); + assertEquals(0, largeRevocableCounter.get()); + assertEquals(0, largeDedicatedRevocableCounter.get()); } @Test public void testCountSlots() { expectStatExport(); expectGetSlots( - new MachineResource(SMALL, false), - new MachineResource(SMALL, false), - new MachineResource(LARGE, false), - new MachineResource(ResourceAggregates.scale(LARGE, 4), false), - new MachineResource(IResourceAggregate.build(new ResourceAggregate(1, 1, 1)), false), - new MachineResource(SMALL, true), - new MachineResource(SMALL, true), - new MachineResource(ResourceAggregates.scale(SMALL, 2), true)); + new MachineResource(SMALL, false, false), + new MachineResource(SMALL, false, false), + new MachineResource(LARGE, false, false), + new MachineResource(LARGE, false, true), + new MachineResource(LARGE, true, true), + new MachineResource(ResourceAggregates.scale(LARGE, 4), false, false), + new MachineResource(IResourceAggregate.build(new ResourceAggregate(1, 1, 1)), false, false), + new MachineResource(SMALL, true, false), + new MachineResource(SMALL, true, false), + new MachineResource(ResourceAggregates.scale(SMALL, 2), true, false)); control.replay(); slotCounter.run(); assertEquals(22, smallCounter.get()); assertEquals(4, smallDedicatedCounter.get()); + assertEquals(4, smallRevocableCounter.get()); + assertEquals(4, smallDedicatedRevocableCounter.get()); assertEquals(5, largeCounter.get()); assertEquals(0, largeDedicatedCounter.get()); + assertEquals(1, largeRevocableCounter.get()); + assertEquals(1, largeDedicatedRevocableCounter.get()); } }
