Repository: aurora Updated Branches: refs/heads/master 9aab87f18 -> 3b29a4b79
Simplify AttributeAggregate. Reviewed at https://reviews.apache.org/r/33106/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/3b29a4b7 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/3b29a4b7 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/3b29a4b7 Branch: refs/heads/master Commit: 3b29a4b797e137f16b4fda76cd42073a0e5b3ad5 Parents: 9aab87f Author: Bill Farner <[email protected]> Authored: Tue Apr 14 09:53:22 2015 -0700 Committer: Bill Farner <[email protected]> Committed: Tue Apr 14 09:53:22 2015 -0700 ---------------------------------------------------------------------- .../aurora/benchmark/SchedulingBenchmarks.java | 18 +-- .../scheduler/filter/AttributeAggregate.java | 117 ++++++++++--------- .../scheduler/async/TaskSchedulerImplTest.java | 16 +-- .../scheduler/async/TaskSchedulerTest.java | 31 +++-- .../preemptor/PendingTaskProcessorTest.java | 8 +- .../async/preemptor/PreemptorImplTest.java | 20 +--- .../async/preemptor/PreemptorModuleTest.java | 10 +- .../preemptor/PreemptorSlotFinderTest.java | 13 +-- .../events/NotifyingSchedulingFilterTest.java | 18 +-- .../filter/AttributeAggregateTest.java | 92 +++++++-------- .../filter/SchedulingFilterImplTest.java | 108 +++++++---------- .../scheduler/state/TaskAssignerImplTest.java | 16 +-- 12 files changed, 187 insertions(+), 280 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/3b29a4b7/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java ---------------------------------------------------------------------- diff --git a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java index ce87344..0113505 100644 --- a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java +++ b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java @@ -21,8 +21,6 @@ import java.util.concurrent.TimeUnit; import javax.inject.Singleton; import com.google.common.base.Optional; -import com.google.common.base.Supplier; -import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.eventbus.EventBus; @@ -50,7 +48,6 @@ import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl.Reserva import org.apache.aurora.scheduler.async.preemptor.ClusterStateImpl; import org.apache.aurora.scheduler.async.preemptor.Preemptor; import org.apache.aurora.scheduler.async.preemptor.PreemptorModule; -import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.events.EventSink; import org.apache.aurora.scheduler.events.PubsubEvent; import org.apache.aurora.scheduler.filter.AttributeAggregate; @@ -309,7 +306,6 @@ public class SchedulingBenchmarks { * Tests preemptor searching for a preemption slot in a completely filled up cluster. */ public static class PreemptorSlotSearchBenchmark extends AbstractBase { - @Override protected BenchmarkSettings getSettings() { return new BenchmarkSettings.Builder() @@ -327,20 +323,8 @@ public class SchedulingBenchmarks { @Override public Boolean apply(final Storage.MutableStoreProvider storeProvider) { IAssignedTask assignedTask = getSettings().getTask().getAssignedTask(); - final Query.Builder query = Query.jobScoped(assignedTask.getTask().getJob()) - .byStatus(org.apache.aurora.scheduler.base.Tasks.SLAVE_ASSIGNED_STATES); - - Supplier<ImmutableSet<IScheduledTask>> taskSupplier = Suppliers.memoize( - new Supplier<ImmutableSet<IScheduledTask>>() { - @Override - public ImmutableSet<IScheduledTask> get() { - return storeProvider.getTaskStore().fetchTasks(query); - } - }); - AttributeAggregate aggregate = - new AttributeAggregate(taskSupplier, storeProvider.getAttributeStore()); - + AttributeAggregate.getJobActiveState(storeProvider, assignedTask.getTask().getJob()); Optional<String> result = preemptor.attemptPreemptionFor(assignedTask, aggregate, storeProvider); http://git-wip-us.apache.org/repos/asf/aurora/blob/3b29a4b7/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java b/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java index ed82ae9..bd74f89 100644 --- a/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java +++ b/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java @@ -13,17 +13,13 @@ */ package org.apache.aurora.scheduler.filter; -import java.util.Map; - import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; -import com.google.common.base.Optional; import com.google.common.base.Supplier; import com.google.common.base.Suppliers; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; -import com.google.common.util.concurrent.AtomicLongMap; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableMultiset; +import com.google.common.collect.Multiset; import com.twitter.common.collections.Pair; import org.apache.aurora.scheduler.base.Query; @@ -41,18 +37,20 @@ import static java.util.Objects.requireNonNull; * once the job state may change (e.g. after exiting a write transaction). This is intended to * capture job state once and avoid redundant queries. * <p> - * Note that while the state injected into this class is used lazily (to allow for queries to happen - * only on-demand), calling {@link #equals(Object)} and {@link #hashCode()} rely on the aggregation - * result, thus invoking the {@link Supplier} and {@link AttributeStore}. + * TODO(wfarner): Consider preserving this as only a helper class to compute the Multiset + * representing the aggregate, since this class is now a thin wrapper over a Multiset. */ -public class AttributeAggregate { +public final class AttributeAggregate { /** - * A lazily-computed mapping from attribute name and value to the count of tasks with that - * name/value combination. See doc for {@link #getNumTasksWithAttribute(String, String)} for - * further details. + * A mapping from attribute name and value to the count of tasks with that name/value combination. + * See doc for {@link #getNumTasksWithAttribute(String, String)} for further details. */ - private final Supplier<Map<Pair<String, String>, Long>> aggregate; + private final Supplier<Multiset<Pair<String, String>>> aggregate; + + private AttributeAggregate(Supplier<Multiset<Pair<String, String>>> aggregate) { + this.aggregate = Suppliers.memoize(aggregate); + } /** * Initializes an {@link AttributeAggregate} instance from data store. @@ -65,58 +63,72 @@ public class AttributeAggregate { final StoreProvider storeProvider, final IJobKey jobKey) { - Supplier<ImmutableSet<IScheduledTask>> taskSupplier = Suppliers.memoize( - new Supplier<ImmutableSet<IScheduledTask>>() { + return create( + new Supplier<Iterable<IScheduledTask>>() { @Override - public ImmutableSet<IScheduledTask> get() { - return storeProvider.getTaskStore().fetchTasks( - Query.jobScoped(jobKey).byStatus(Tasks.SLAVE_ASSIGNED_STATES)); + public Iterable<IScheduledTask> get() { + return storeProvider.getTaskStore() + .fetchTasks(Query.jobScoped(jobKey).byStatus(Tasks.SLAVE_ASSIGNED_STATES)); } - }); - return new AttributeAggregate(taskSupplier, storeProvider.getAttributeStore()); + }, + storeProvider.getAttributeStore()); } - /** - * Creates a new attribute aggregate, which will be computed from the provided external state. - * - * @param activeTaskSupplier Supplier of active tasks within the aggregated scope. - * @param attributeStore Source of host attributes to associate with tasks. - */ - public AttributeAggregate( - final Supplier<ImmutableSet<IScheduledTask>> activeTaskSupplier, + @VisibleForTesting + static AttributeAggregate create( + final Supplier<Iterable<IScheduledTask>> taskSupplier, final AttributeStore attributeStore) { - requireNonNull(activeTaskSupplier); - requireNonNull(attributeStore); - - final Function<IScheduledTask, Iterable<IAttribute>> getHostAttributes = - new Function<IScheduledTask, Iterable<IAttribute>>() { + final Function<String, Iterable<IAttribute>> getHostAttributes = + new Function<String, Iterable<IAttribute>>() { @Override - public Iterable<IAttribute> apply(IScheduledTask task) { + public Iterable<IAttribute> apply(String host) { // Note: this assumes we have access to attributes for hosts where all active tasks // reside. - String host = requireNonNull(task.getAssignedTask().getSlaveHost()); + requireNonNull(host); return attributeStore.getHostAttributes(host).get().getAttributes(); } }; - aggregate = Suppliers.memoize(new Supplier<Map<Pair<String, String>, Long>>() { - @Override - public Map<Pair<String, String>, Long> get() { - AtomicLongMap<Pair<String, String>> counts = AtomicLongMap.create(); - Iterable<IAttribute> allAttributes = - Iterables.concat(Iterables.transform(activeTaskSupplier.get(), getHostAttributes)); - for (IAttribute attribute : allAttributes) { - for (String value : attribute.getValues()) { - counts.incrementAndGet(Pair.of(attribute.getName(), value)); + return create(Suppliers.compose( + new Function<Iterable<IScheduledTask>, Iterable<IAttribute>>() { + @Override + public Iterable<IAttribute> apply(Iterable<IScheduledTask> tasks) { + return FluentIterable.from(tasks) + .transform(Tasks.SCHEDULED_TO_SLAVE_HOST) + .transformAndConcat(getHostAttributes); + } + }, + taskSupplier)); + } + + @VisibleForTesting + static AttributeAggregate create(Supplier<Iterable<IAttribute>> attributes) { + Supplier<Multiset<Pair<String, String>>> aggregator = Suppliers.compose( + new Function<Iterable<IAttribute>, Multiset<Pair<String, String>>>() { + @Override + public Multiset<Pair<String, String>> apply(Iterable<IAttribute> attributes) { + ImmutableMultiset.Builder<Pair<String, String>> builder = ImmutableMultiset.builder(); + for (IAttribute attribute : attributes) { + for (String value : attribute.getValues()) { + builder.add(Pair.of(attribute.getName(), value)); + } + } + + return builder.build(); } - } + }, + attributes + ); - return ImmutableMap.copyOf(counts.asMap()); - } - }); + return new AttributeAggregate(aggregator); } + @VisibleForTesting + public static final AttributeAggregate EMPTY = + new AttributeAggregate(Suppliers.<Multiset<Pair<String, String>>>ofInstance( + ImmutableMultiset.<Pair<String, String>>of())); + /** * Gets the total number of tasks with a given attribute name and value combination. * <p> @@ -135,12 +147,11 @@ public class AttributeAggregate { * @return Number of tasks in the job whose hosts have the provided attribute name and value. */ public long getNumTasksWithAttribute(String name, String value) { - return Optional.fromNullable(aggregate.get().get(Pair.of(name, value))) - .or(0L); + return aggregate.get().count(Pair.of(name, value)); } @VisibleForTesting - Map<Pair<String, String>, Long> getAggregates() { + Multiset<Pair<String, String>> getAggregates() { return aggregate.get(); } http://git-wip-us.apache.org/repos/asf/aurora/blob/3b29a4b7/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java index c5643d9..b61abf9 100644 --- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java @@ -15,7 +15,6 @@ package org.apache.aurora.scheduler.async; import com.google.common.base.Function; import com.google.common.base.Optional; -import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableSet; import com.google.inject.AbstractModule; import com.google.inject.Guice; @@ -43,14 +42,12 @@ import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.events.EventSink; import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; import org.apache.aurora.scheduler.events.PubsubEventModule; -import org.apache.aurora.scheduler.filter.AttributeAggregate; import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; import org.apache.aurora.scheduler.state.PubsubTestUtil; import org.apache.aurora.scheduler.state.StateManager; import org.apache.aurora.scheduler.state.TaskAssigner; import org.apache.aurora.scheduler.state.TaskAssigner.Assignment; import org.apache.aurora.scheduler.state.TaskAssigner.Assignment.Result; -import org.apache.aurora.scheduler.storage.AttributeStore; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; import org.apache.aurora.scheduler.storage.Storage.MutateWork; @@ -66,6 +63,7 @@ import org.junit.Test; import static org.apache.aurora.gen.ScheduleStatus.PENDING; import static org.apache.aurora.gen.ScheduleStatus.THROTTLED; +import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY; import static org.easymock.EasyMock.capture; import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expect; @@ -96,7 +94,6 @@ public class TaskSchedulerImplTest extends EasyMockTest { private Amount<Long, Time> reservationDuration; private Amount<Long, Time> halfReservationDuration; private EventSink eventSink; - private AttributeAggregate emptyJob; @Before public void setUp() throws Exception { @@ -113,9 +110,6 @@ public class TaskSchedulerImplTest extends EasyMockTest { Injector injector = getInjector(storageUtil.storage); scheduler = injector.getInstance(TaskScheduler.class); eventSink = PubsubTestUtil.startPubsub(injector); - emptyJob = new AttributeAggregate( - Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()), - createMock(AttributeStore.class)); } private Injector getInjector(final Storage storageImpl) { @@ -146,7 +140,7 @@ public class TaskSchedulerImplTest extends EasyMockTest { expect(assigner.maybeAssign( storageUtil.mutableStoreProvider, OFFER, - new ResourceRequest(task.getAssignedTask().getTask(), Tasks.id(task), emptyJob))) + new ResourceRequest(task.getAssignedTask().getTask(), Tasks.id(task), EMPTY))) .andReturn(Assignment.success(TaskInfo.getDefaultInstance())); } @@ -207,7 +201,7 @@ public class TaskSchedulerImplTest extends EasyMockTest { expect(assigner.maybeAssign( storageUtil.mutableStoreProvider, OFFER, - new ResourceRequest(TASK_B.getAssignedTask().getTask(), Tasks.id(TASK_B), emptyJob))) + new ResourceRequest(TASK_B.getAssignedTask().getTask(), Tasks.id(TASK_B), EMPTY))) .andReturn(Assignment.success(TaskInfo.getDefaultInstance())); control.replay(); @@ -317,7 +311,7 @@ public class TaskSchedulerImplTest extends EasyMockTest { expect(assigner.maybeAssign( EasyMock.<MutableStoreProvider>anyObject(), eq(OFFER), - eq(new ResourceRequest(taskA.getAssignedTask().getTask(), Tasks.id(taskA), emptyJob)))) + eq(new ResourceRequest(taskA.getAssignedTask().getTask(), Tasks.id(taskA), EMPTY)))) .andReturn(Assignment.success(TaskInfo.getDefaultInstance())); control.replay(); @@ -346,7 +340,7 @@ public class TaskSchedulerImplTest extends EasyMockTest { private void expectPreemptorCall(IScheduledTask task, Optional<String> result) { expect(preemptor.attemptPreemptionFor( task.getAssignedTask(), - emptyJob, + EMPTY, storageUtil.mutableStoreProvider)).andReturn(result); } http://git-wip-us.apache.org/repos/asf/aurora/blob/3b29a4b7/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java index 88c0163..9c47a76 100644 --- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java +++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java @@ -20,7 +20,6 @@ import java.util.concurrent.TimeUnit; import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.base.Supplier; -import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.RateLimiter; @@ -59,7 +58,6 @@ import org.apache.aurora.scheduler.state.MaintenanceController; import org.apache.aurora.scheduler.state.StateManager; import org.apache.aurora.scheduler.state.TaskAssigner; import org.apache.aurora.scheduler.state.TaskAssigner.Assignment; -import org.apache.aurora.scheduler.storage.AttributeStore; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; import org.apache.aurora.scheduler.storage.Storage.MutateWork; @@ -89,6 +87,7 @@ import static org.apache.aurora.gen.ScheduleStatus.KILLED; import static org.apache.aurora.gen.ScheduleStatus.LOST; import static org.apache.aurora.gen.ScheduleStatus.PENDING; import static org.apache.aurora.gen.ScheduleStatus.RUNNING; +import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY; import static org.apache.mesos.Protos.Offer; import static org.easymock.EasyMock.capture; import static org.easymock.EasyMock.eq; @@ -125,7 +124,6 @@ public class TaskSchedulerTest extends EasyMockTest { private StatsProvider statsProvider; private RescheduleCalculator rescheduleCalculator; private Preemptor preemptor; - private AttributeAggregate emptyJob; private Amount<Long, Time> reservationDuration = Amount.of(1L, Time.MINUTES); @Before @@ -144,9 +142,6 @@ public class TaskSchedulerTest extends EasyMockTest { statsProvider = createMock(StatsProvider.class); rescheduleCalculator = createMock(RescheduleCalculator.class); preemptor = createMock(Preemptor.class); - emptyJob = new AttributeAggregate( - Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()), - createMock(AttributeStore.class)); } private void replayAndCreateScheduler() { @@ -315,11 +310,11 @@ public class TaskSchedulerTest extends EasyMockTest { TaskInfo mesosTask = makeTaskInfo(task); Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); - expectMaybeAssign(OFFER_A, task, emptyJob).andReturn(Assignment.failure()); + expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.failure()); expectPreemptorCall(task); Capture<Runnable> timeoutCapture2 = expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10); - expectMaybeAssign(OFFER_A, task, emptyJob).andReturn(Assignment.success(mesosTask)); + expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.success(mesosTask)); driver.launchTask(OFFER_A.getOffer().getId(), mesosTask); Capture<Runnable> timeoutCapture3 = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); @@ -350,7 +345,7 @@ public class TaskSchedulerTest extends EasyMockTest { Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); expectAnyMaintenanceCalls(); expectOfferDeclineIn(10); - expectMaybeAssign(OFFER_A, task, emptyJob).andReturn(Assignment.success(mesosTask)); + expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.success(mesosTask)); driver.launchTask(OFFER_A.getOffer().getId(), mesosTask); expectLastCall().andThrow(new IllegalStateException("Driver not ready.")); expect(stateManager.changeState( @@ -380,10 +375,10 @@ public class TaskSchedulerTest extends EasyMockTest { Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); expectAnyMaintenanceCalls(); expectOfferDeclineIn(10); - expectMaybeAssign(OFFER_A, task, emptyJob).andThrow(new StorageException("Injected failure.")); + expectMaybeAssign(OFFER_A, task, EMPTY).andThrow(new StorageException("Injected failure.")); Capture<Runnable> timeoutCapture2 = expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10); - expectMaybeAssign(OFFER_A, task, emptyJob).andReturn(Assignment.success(mesosTask)); + expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.success(mesosTask)); driver.launchTask(OFFER_A.getOffer().getId(), mesosTask); expectLastCall(); @@ -402,7 +397,7 @@ public class TaskSchedulerTest extends EasyMockTest { Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); Capture<Runnable> offerExpirationCapture = expectOfferDeclineIn(10); expectAnyMaintenanceCalls(); - expectMaybeAssign(OFFER_A, task, emptyJob).andReturn(Assignment.failure()); + expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.failure()); Capture<Runnable> timeoutCapture2 = expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10); expectPreemptorCall(task); driver.declineOffer(OFFER_A.getOffer().getId()); @@ -464,13 +459,13 @@ public class TaskSchedulerTest extends EasyMockTest { IScheduledTask taskA = makeTask("A", PENDING); TaskInfo mesosTaskA = makeTaskInfo(taskA); - expectMaybeAssign(OFFER_A, taskA, emptyJob).andReturn(Assignment.success(mesosTaskA)); + expectMaybeAssign(OFFER_A, taskA, EMPTY).andReturn(Assignment.success(mesosTaskA)); driver.launchTask(OFFER_A.getOffer().getId(), mesosTaskA); Capture<Runnable> captureA = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); IScheduledTask taskB = makeTask("B", PENDING); TaskInfo mesosTaskB = makeTaskInfo(taskB); - expectMaybeAssign(OFFER_B, taskB, emptyJob).andReturn(Assignment.success(mesosTaskB)); + expectMaybeAssign(OFFER_B, taskB, EMPTY).andReturn(Assignment.success(mesosTaskB)); driver.launchTask(OFFER_B.getOffer().getId(), mesosTaskB); Capture<Runnable> captureB = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); @@ -496,7 +491,7 @@ public class TaskSchedulerTest extends EasyMockTest { IScheduledTask taskA = makeTask("A", PENDING); TaskInfo mesosTaskA = makeTaskInfo(taskA); - expectMaybeAssign(OFFER_B, taskA, emptyJob).andReturn(Assignment.success(mesosTaskA)); + expectMaybeAssign(OFFER_B, taskA, EMPTY).andReturn(Assignment.success(mesosTaskA)); driver.launchTask(OFFER_B.getOffer().getId(), mesosTaskA); Capture<Runnable> captureA = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); @@ -505,7 +500,7 @@ public class TaskSchedulerTest extends EasyMockTest { HostOffer updatedOfferC = new HostOffer( OFFER_C.getOffer(), IHostAttributes.build(OFFER_C.getAttributes().newBuilder().setMode(NONE))); - expectMaybeAssign(updatedOfferC, taskB, emptyJob).andReturn(Assignment.success(mesosTaskB)); + expectMaybeAssign(updatedOfferC, taskB, EMPTY).andReturn(Assignment.success(mesosTaskB)); driver.launchTask(OFFER_C.getOffer().getId(), mesosTaskB); Capture<Runnable> captureB = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); @@ -602,7 +597,7 @@ public class TaskSchedulerTest extends EasyMockTest { final IScheduledTask task = makeTask("a", PENDING); Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS); - expectMaybeAssign(OFFER_A, task, emptyJob).andReturn(Assignment.failure()); + expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.failure()); expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 20); expectPreemptorCall(task); @@ -655,7 +650,7 @@ public class TaskSchedulerTest extends EasyMockTest { private void expectPreemptorCall(IScheduledTask task) { expect(preemptor.attemptPreemptionFor( eq(task.getAssignedTask()), - eq(emptyJob), + eq(EMPTY), EasyMock.<MutableStoreProvider>anyObject())).andReturn(Optional.<String>absent()); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/3b29a4b7/src/test/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessorTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessorTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessorTest.java index bcd1b4e..75fc16d 100644 --- a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessorTest.java +++ b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessorTest.java @@ -16,7 +16,6 @@ package org.apache.aurora.scheduler.async.preemptor; import java.util.Arrays; import com.google.common.base.Optional; -import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableSet; import com.twitter.common.quantity.Amount; import com.twitter.common.quantity.Time; @@ -33,7 +32,6 @@ import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.filter.AttributeAggregate; import org.apache.aurora.scheduler.stats.CachedCounters; -import org.apache.aurora.scheduler.storage.AttributeStore; import org.apache.aurora.scheduler.storage.entities.IAssignedTask; import org.apache.aurora.scheduler.storage.entities.IJobKey; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; @@ -68,7 +66,6 @@ public class PendingTaskProcessorTest extends EasyMockTest { private FakeStatsProvider statsProvider; private PreemptionSlotFinder preemptionSlotFinder; private PendingTaskProcessor slotFinder; - private AttributeAggregate attrAggregate; private PreemptionSlotCache slotCache; private FakeClock clock; @@ -80,9 +77,6 @@ public class PendingTaskProcessorTest extends EasyMockTest { slotCache = createMock(PreemptionSlotCache.class); statsProvider = new FakeStatsProvider(); clock = new FakeClock(); - attrAggregate = new AttributeAggregate( - Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()), - createMock(AttributeStore.class)); slotFinder = new PendingTaskProcessor( storageUtil.storage, @@ -135,7 +129,7 @@ public class PendingTaskProcessorTest extends EasyMockTest { private void expectSlotSearch(ScheduledTask task, Optional<PreemptionSlot> slot) { expect(preemptionSlotFinder.findPreemptionSlotFor( IAssignedTask.build(task.getAssignedTask()), - attrAggregate, + AttributeAggregate.EMPTY, storageUtil.storeProvider)).andReturn(slot); } http://git-wip-us.apache.org/repos/asf/aurora/blob/3b29a4b7/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java index 281f4e0..97d6087 100644 --- a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java @@ -14,9 +14,7 @@ package org.apache.aurora.scheduler.async.preemptor; import com.google.common.base.Optional; -import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableSet; - import com.twitter.common.testing.easymock.EasyMockTest; import org.apache.aurora.gen.AssignedTask; @@ -28,10 +26,8 @@ import org.apache.aurora.gen.TaskEvent; import org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.PreemptionSlot; import org.apache.aurora.scheduler.async.preemptor.Preemptor.PreemptorImpl; import org.apache.aurora.scheduler.base.Tasks; -import org.apache.aurora.scheduler.filter.AttributeAggregate; import org.apache.aurora.scheduler.state.StateManager; import org.apache.aurora.scheduler.stats.CachedCounters; -import org.apache.aurora.scheduler.storage.AttributeStore; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.entities.IAssignedTask; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; @@ -43,6 +39,7 @@ import org.junit.Test; import static org.apache.aurora.gen.ScheduleStatus.PENDING; import static org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.slotValidationStatName; import static org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.successStatName; +import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY; import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expect; @@ -61,7 +58,6 @@ public class PreemptorImplTest extends EasyMockTest { private FakeStatsProvider statsProvider; private PreemptionSlotFinder preemptionSlotFinder; private PreemptorImpl preemptor; - private AttributeAggregate attrAggregate; private PreemptionSlotCache slotCache; private Storage.MutableStoreProvider storeProvider; @@ -72,10 +68,6 @@ public class PreemptorImplTest extends EasyMockTest { preemptionSlotFinder = createMock(PreemptionSlotFinder.class); slotCache = createMock(PreemptionSlotCache.class); statsProvider = new FakeStatsProvider(); - attrAggregate = new AttributeAggregate( - Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()), - createMock(AttributeStore.class)); - preemptor = new PreemptorImpl( stateManager, preemptionSlotFinder, @@ -124,15 +116,15 @@ public class PreemptorImplTest extends EasyMockTest { } private Optional<String> callPreemptor() { - return preemptor.attemptPreemptionFor(TASK.getAssignedTask(), attrAggregate, storeProvider); + return preemptor.attemptPreemptionFor(TASK.getAssignedTask(), EMPTY, storeProvider); } private void expectSlotValidation(Optional<ImmutableSet<PreemptionVictim>> victims) { expect(preemptionSlotFinder.validatePreemptionSlotFor( - eq(TASK.getAssignedTask()), - eq(attrAggregate), - eq(SLOT), - anyObject(Storage.MutableStoreProvider.class))).andReturn(victims); + TASK.getAssignedTask(), + EMPTY, + SLOT, + storeProvider)).andReturn(victims); } private void expectPreempted(IScheduledTask preempted) throws Exception { http://git-wip-us.apache.org/repos/asf/aurora/blob/3b29a4b7/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModuleTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModuleTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModuleTest.java index 7e2d1c5..9d3820a 100644 --- a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModuleTest.java +++ b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModuleTest.java @@ -14,8 +14,6 @@ package org.apache.aurora.scheduler.async.preemptor; import com.google.common.base.Optional; -import com.google.common.base.Supplier; -import com.google.common.collect.ImmutableSet; import com.google.inject.AbstractModule; import com.google.inject.Guice; import com.google.inject.Injector; @@ -33,10 +31,8 @@ import org.apache.aurora.scheduler.filter.AttributeAggregate; import org.apache.aurora.scheduler.filter.SchedulingFilter; import org.apache.aurora.scheduler.state.StateManager; import org.apache.aurora.scheduler.state.TaskAssigner; -import org.apache.aurora.scheduler.storage.AttributeStore; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.entities.IAssignedTask; -import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; import org.junit.Before; import org.junit.Test; @@ -80,10 +76,6 @@ public class PreemptorModuleTest extends EasyMockTest { Amount.of(0L, Time.SECONDS), Amount.of(0L, Time.SECONDS))); - Supplier<ImmutableSet<IScheduledTask>> taskSupplier = - createMock(new EasyMockTest.Clazz<Supplier<ImmutableSet<IScheduledTask>>>() { }); - AttributeStore attributeStore = createMock(AttributeStore.class); - control.replay(); injector.getInstance(Key.get(ExceptionalCommand.class, StartupStage.class)).execute(); @@ -93,7 +85,7 @@ public class PreemptorModuleTest extends EasyMockTest { Optional.<String>absent(), injector.getInstance(Preemptor.class).attemptPreemptionFor( IAssignedTask.build(new AssignedTask()), - new AttributeAggregate(taskSupplier, attributeStore), + AttributeAggregate.EMPTY, storageUtil.mutableStoreProvider)); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/3b29a4b7/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorSlotFinderTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorSlotFinderTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorSlotFinderTest.java index b80e558..eed2de9 100644 --- a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorSlotFinderTest.java +++ b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorSlotFinderTest.java @@ -20,7 +20,6 @@ import java.util.Set; import com.google.common.base.Function; import com.google.common.base.Optional; -import com.google.common.base.Suppliers; import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMultimap; @@ -48,16 +47,13 @@ import org.apache.aurora.scheduler.async.OfferManager; import org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.PreemptionSlot; import org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.PreemptionSlotFinderImpl; import org.apache.aurora.scheduler.configuration.Resources; -import org.apache.aurora.scheduler.filter.AttributeAggregate; import org.apache.aurora.scheduler.filter.SchedulingFilter; import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto; import org.apache.aurora.scheduler.filter.SchedulingFilterImpl; import org.apache.aurora.scheduler.mesos.TaskExecutors; import org.apache.aurora.scheduler.stats.CachedCounters; -import org.apache.aurora.scheduler.storage.AttributeStore; import org.apache.aurora.scheduler.storage.entities.IAssignedTask; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; -import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; import org.apache.aurora.scheduler.testing.FakeStatsProvider; import org.apache.mesos.Protos; @@ -70,6 +66,7 @@ import org.junit.Test; import static org.apache.aurora.gen.MaintenanceMode.NONE; import static org.apache.aurora.gen.ScheduleStatus.RUNNING; import static org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.MISSING_ATTRIBUTES_NAME; +import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY; import static org.apache.mesos.Protos.Offer; import static org.apache.mesos.Protos.Resource; import static org.easymock.EasyMock.expect; @@ -98,7 +95,6 @@ public class PreemptorSlotFinderTest extends EasyMockTest { private FakeStatsProvider statsProvider; private ClusterState clusterState; private OfferManager offerManager; - private AttributeAggregate emptyJob; private PreemptorMetrics preemptorMetrics; @Before @@ -109,9 +105,6 @@ public class PreemptorSlotFinderTest extends EasyMockTest { storageUtil.expectOperations(); statsProvider = new FakeStatsProvider(); preemptorMetrics = new PreemptorMetrics(new CachedCounters(statsProvider)); - emptyJob = new AttributeAggregate( - Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()), - createMock(AttributeStore.class)); } private Optional<PreemptionSlot> runSlotFinder(ScheduledTask pendingTask) { @@ -124,7 +117,7 @@ public class PreemptorSlotFinderTest extends EasyMockTest { return slotFinder.findPreemptionSlotFor( IAssignedTask.build(pendingTask.getAssignedTask()), - emptyJob, + EMPTY, storageUtil.mutableStoreProvider); } @@ -417,7 +410,7 @@ public class PreemptorSlotFinderTest extends EasyMockTest { Optional<ImmutableSet<PreemptionVictim>> victims = slotFinder.validatePreemptionSlotFor( IAssignedTask.build(p1.getAssignedTask()), - emptyJob, + EMPTY, slot.get(), storageUtil.mutableStoreProvider); http://git-wip-us.apache.org/repos/asf/aurora/blob/3b29a4b7/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java b/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java index 61cea32..2b71043 100644 --- a/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java +++ b/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java @@ -15,7 +15,6 @@ package org.apache.aurora.scheduler.events; import java.util.Set; -import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableSet; import com.twitter.common.testing.easymock.EasyMockTest; @@ -31,9 +30,7 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource; import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto; import org.apache.aurora.scheduler.mesos.TaskExecutors; -import org.apache.aurora.scheduler.storage.AttributeStore; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; -import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; import org.junit.Before; import org.junit.Test; @@ -51,7 +48,8 @@ public class NotifyingSchedulingFilterTest extends EasyMockTest { private static final UnusedResource RESOURCE = new UnusedResource( ResourceSlot.from(TASK, TaskExecutors.NO_OVERHEAD_EXECUTOR), IHostAttributes.build(new HostAttributes().setHost("host").setMode(MaintenanceMode.NONE))); - private ResourceRequest request; + private static final ResourceRequest REQUEST = + new ResourceRequest(TASK, "taskId", AttributeAggregate.EMPTY); private static final Veto VETO_1 = Veto.insufficientResources("ram", 1); private static final Veto VETO_2 = Veto.insufficientResources("ram", 2); @@ -65,30 +63,26 @@ public class NotifyingSchedulingFilterTest extends EasyMockTest { delegate = createMock(SchedulingFilter.class); eventSink = createMock(EventSink.class); filter = new NotifyingSchedulingFilter(delegate, eventSink); - AttributeAggregate emptyJob = new AttributeAggregate( - Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()), - createMock(AttributeStore.class)); - request = new ResourceRequest(TASK, "taskId", emptyJob); } @Test public void testEvents() { Set<Veto> vetoes = ImmutableSet.of(VETO_1, VETO_2); - expect(delegate.filter(RESOURCE, request)).andReturn(vetoes); + expect(delegate.filter(RESOURCE, REQUEST)).andReturn(vetoes); eventSink.post(new Vetoed(GROUP_KEY, vetoes)); control.replay(); - assertEquals(vetoes, filter.filter(RESOURCE, request)); + assertEquals(vetoes, filter.filter(RESOURCE, REQUEST)); } @Test public void testNoVetoes() { Set<Veto> vetoes = ImmutableSet.of(); - expect(delegate.filter(RESOURCE, request)).andReturn(vetoes); + expect(delegate.filter(RESOURCE, REQUEST)).andReturn(vetoes); control.replay(); - assertEquals(vetoes, filter.filter(RESOURCE, request)); + assertEquals(vetoes, filter.filter(RESOURCE, REQUEST)); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/3b29a4b7/src/test/java/org/apache/aurora/scheduler/filter/AttributeAggregateTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/filter/AttributeAggregateTest.java b/src/test/java/org/apache/aurora/scheduler/filter/AttributeAggregateTest.java index 4b56576..6b36062 100644 --- a/src/test/java/org/apache/aurora/scheduler/filter/AttributeAggregateTest.java +++ b/src/test/java/org/apache/aurora/scheduler/filter/AttributeAggregateTest.java @@ -13,12 +13,11 @@ */ package org.apache.aurora.scheduler.filter; -import java.util.Map; - import com.google.common.base.Optional; -import com.google.common.base.Supplier; -import com.google.common.collect.ImmutableMap; +import com.google.common.base.Suppliers; +import com.google.common.collect.ImmutableMultiset; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Multiset; import com.twitter.common.collections.Pair; import com.twitter.common.testing.easymock.EasyMockTest; @@ -37,65 +36,51 @@ import static org.easymock.EasyMock.expect; import static org.junit.Assert.assertEquals; public class AttributeAggregateTest extends EasyMockTest { - - private Supplier<ImmutableSet<IScheduledTask>> activeTaskSupplier; private AttributeStore attributeStore; - private AttributeAggregate aggregate; @Before public void setUp() throws Exception { - activeTaskSupplier = createMock(new Clazz<Supplier<ImmutableSet<IScheduledTask>>>() { }); attributeStore = createMock(AttributeStore.class); - aggregate = new AttributeAggregate(activeTaskSupplier, attributeStore); } @Test public void testNoTasks() { - expectGetTasks(); - control.replay(); - assertAggregates(ImmutableMap.<Pair<String, String>, Long>of()); - assertAggregate("none", "alsoNone", 0); + AttributeAggregate aggregate = aggregate(); + assertEquals(ImmutableMultiset.<Pair<String, String>>of(), aggregate.getAggregates()); + assertAggregate(aggregate, "none", "alsoNone", 0); } @Test(expected = IllegalStateException.class) public void testAttributesMissing() { - expectGetTasks(task("1", "a")); expect(attributeStore.getHostAttributes("a")).andReturn(Optional.<IHostAttributes>absent()); control.replay(); - aggregate.getAggregates(); + aggregate(task("1", "a")).getAggregates(); } @Test(expected = NullPointerException.class) public void testTaskWithNoHost() { - expectGetTasks(task("1", null)); - control.replay(); - aggregate.getAggregates(); + aggregate(task("1", null)).getAggregates(); } @Test public void testNoAttributes() { - expectGetTasks(task("1", "hostA")); expectGetAttributes("hostA"); control.replay(); - assertAggregates(ImmutableMap.<Pair<String, String>, Long>of()); + assertEquals( + ImmutableMultiset.<Pair<String, String>>of(), + aggregate(task("1", "hostA")).getAggregates()); } @Test public void testAggregate() { - expectGetTasks( - task("1", "a1"), - task("2", "b1"), - task("3", "b1"), - task("4", "b2"), - task("5", "c1")); expectGetAttributes( "a1", attribute("host", "a1"), @@ -121,29 +106,37 @@ public class AttributeAggregateTest extends EasyMockTest { control.replay(); - Map<Pair<String, String>, Long> expected = ImmutableMap.<Pair<String, String>, Long>builder() - .put(Pair.of("rack", "a"), 1L) - .put(Pair.of("rack", "b"), 3L) - .put(Pair.of("rack", "c"), 1L) - .put(Pair.of("host", "a1"), 1L) - .put(Pair.of("host", "b1"), 2L) - .put(Pair.of("host", "b2"), 1L) - .put(Pair.of("host", "c1"), 1L) - .put(Pair.of("pdu", "p1"), 4L) - .put(Pair.of("pdu", "p2"), 4L) - .put(Pair.of("ssd", "true"), 1L) + Multiset<Pair<String, String>> expected = ImmutableMultiset.<Pair<String, String>>builder() + .add(Pair.of("rack", "a")) + .addCopies(Pair.of("rack", "b"), 3) + .add(Pair.of("rack", "c")) + .add(Pair.of("host", "a1")) + .addCopies(Pair.of("host", "b1"), 2) + .add(Pair.of("host", "b2")) + .add(Pair.of("host", "c1")) + .addCopies(Pair.of("pdu", "p1"), 4) + .addCopies(Pair.of("pdu", "p2"), 4) + .add(Pair.of("ssd", "true")) .build(); - assertAggregates(expected); - for (Map.Entry<Pair<String, String>, Long> entry : expected.entrySet()) { - assertAggregate(entry.getKey().getFirst(), entry.getKey().getSecond(), entry.getValue()); + AttributeAggregate aggregate = aggregate( + task("1", "a1"), + task("2", "b1"), + task("3", "b1"), + task("4", "b2"), + task("5", "c1")); + assertEquals(expected, aggregate.getAggregates()); + for (Multiset.Entry<Pair<String, String>> entry : expected.entrySet()) { + Pair<String, String> element = entry.getElement(); + assertAggregate(aggregate, element.getFirst(), element.getSecond(), entry.getCount()); } - assertAggregate("host", "c2", 0L); - assertAggregate("hostc", "2", 0L); + assertAggregate(aggregate, "host", "c2", 0L); + assertAggregate(aggregate, "hostc", "2", 0L); } - private void expectGetTasks(IScheduledTask... activeTasks) { - expect(activeTaskSupplier.get()) - .andReturn(ImmutableSet.<IScheduledTask>builder().add(activeTasks).build()); + private AttributeAggregate aggregate(IScheduledTask... activeTasks) { + return AttributeAggregate.create( + Suppliers.<Iterable<IScheduledTask>>ofInstance(ImmutableSet.copyOf(activeTasks)), + attributeStore); } private IExpectationSetters<?> expectGetAttributes(String host, Attribute... attributes) { @@ -153,11 +146,12 @@ public class AttributeAggregateTest extends EasyMockTest { .setAttributes(ImmutableSet.<Attribute>builder().add(attributes).build())))); } - private void assertAggregates(Map<Pair<String, String>, Long> expected) { - assertEquals(expected, aggregate.getAggregates()); - } + private void assertAggregate( + AttributeAggregate aggregate, + String name, + String value, + long expected) { - private void assertAggregate(String name, String value, long expected) { assertEquals(expected, aggregate.getNumTasksWithAttribute(name, value)); } http://git-wip-us.apache.org/repos/asf/aurora/blob/3b29a4b7/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java index d06b89c..26bad99 100644 --- a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java @@ -15,16 +15,14 @@ package org.apache.aurora.scheduler.filter; import java.util.Arrays; import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; -import com.google.common.base.Optional; import com.google.common.base.Suppliers; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; import com.twitter.common.collections.Pair; import com.twitter.common.testing.easymock.EasyMockTest; -import org.apache.aurora.gen.AssignedTask; import org.apache.aurora.gen.Attribute; import org.apache.aurora.gen.Constraint; import org.apache.aurora.gen.ExecutorConfig; @@ -32,7 +30,6 @@ import org.apache.aurora.gen.HostAttributes; import org.apache.aurora.gen.Identity; import org.apache.aurora.gen.LimitConstraint; import org.apache.aurora.gen.MaintenanceMode; -import org.apache.aurora.gen.ScheduledTask; import org.apache.aurora.gen.TaskConfig; import org.apache.aurora.gen.TaskConstraint; import org.apache.aurora.gen.ValueConstraint; @@ -45,20 +42,18 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter.VetoGroup; import org.apache.aurora.scheduler.filter.SchedulingFilter.VetoType; import org.apache.aurora.scheduler.mesos.Offers; import org.apache.aurora.scheduler.mesos.TaskExecutors; -import org.apache.aurora.scheduler.storage.AttributeStore; +import org.apache.aurora.scheduler.storage.entities.IAttribute; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; -import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; -import org.easymock.IExpectationSetters; import org.junit.Before; import org.junit.Test; import static org.apache.aurora.scheduler.configuration.ConfigurationManager.DEDICATED_ATTRIBUTE; +import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY; import static org.apache.aurora.scheduler.filter.SchedulingFilterImpl.ResourceVector.CPU; import static org.apache.aurora.scheduler.filter.SchedulingFilterImpl.ResourceVector.DISK; import static org.apache.aurora.scheduler.filter.SchedulingFilterImpl.ResourceVector.PORTS; import static org.apache.aurora.scheduler.filter.SchedulingFilterImpl.ResourceVector.RAM; -import static org.easymock.EasyMock.expect; import static org.junit.Assert.assertEquals; public class SchedulingFilterImplTest extends EasyMockTest { @@ -92,20 +87,11 @@ public class SchedulingFilterImplTest extends EasyMockTest { private static final ResourceSlot DEFAULT_OFFER = ResourceSlot.from( Offers.createOffer(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK, Pair.of(80, 80))); - private AttributeAggregate emptyJob; - - private final AtomicLong taskIdCounter = new AtomicLong(); - private SchedulingFilter defaultFilter; - private AttributeStore.Mutable attributeStore; @Before public void setUp() { defaultFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR); - attributeStore = createMock(AttributeStore.Mutable.class); - emptyJob = new AttributeAggregate( - Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()), - attributeStore); } @Test @@ -145,22 +131,22 @@ public class SchedulingFilterImplTest extends EasyMockTest { none, defaultFilter.filter( new UnusedResource(twoPorts, hostA), - new ResourceRequest(noPortTask, TASK_ID, emptyJob))); + new ResourceRequest(noPortTask, TASK_ID, EMPTY))); assertEquals( none, defaultFilter.filter( new UnusedResource(twoPorts, hostA), - new ResourceRequest(onePortTask, TASK_ID, emptyJob))); + new ResourceRequest(onePortTask, TASK_ID, EMPTY))); assertEquals( none, defaultFilter.filter( new UnusedResource(twoPorts, hostA), - new ResourceRequest(twoPortTask, TASK_ID, emptyJob))); + new ResourceRequest(twoPortTask, TASK_ID, EMPTY))); assertEquals( ImmutableSet.of(PORTS.veto(1)), defaultFilter.filter( new UnusedResource(twoPorts, hostA), - new ResourceRequest(threePortTask, TASK_ID, emptyJob))); + new ResourceRequest(threePortTask, TASK_ID, EMPTY))); } @Test @@ -296,38 +282,41 @@ public class SchedulingFilterImplTest extends EasyMockTest { assertNoVetoes(hostLimitTask(2), hostAttributes(HOST_A, host(HOST_A))); } - private Attribute host(String host) { + private IAttribute host(String host) { return valueAttribute(HOST_ATTRIBUTE, host); } - private Attribute rack(String rack) { + private IAttribute rack(String rack) { return valueAttribute(RACK_ATTRIBUTE, rack); } - private Attribute dedicated(String value, String... values) { + private IAttribute dedicated(String value, String... values) { return valueAttribute(DEDICATED_ATTRIBUTE, value, values); } @Test public void testLimitWithinJob() throws Exception { - expectGetHostAttributes(HOST_A, host(HOST_A), rack(RACK_A)).atLeastOnce(); - expectGetHostAttributes(HOST_B, host(HOST_B), rack(RACK_A)).atLeastOnce(); - expectGetHostAttributes(HOST_C, host(HOST_C), rack(RACK_B)).atLeastOnce(); - - AttributeAggregate stateA = new AttributeAggregate(Suppliers.ofInstance(ImmutableSet.of( - makeScheduledTask(OWNER_A, JOB_A, HOST_A), - makeScheduledTask(OWNER_A, JOB_A, HOST_B), - makeScheduledTask(OWNER_A, JOB_A, HOST_B), - makeScheduledTask(OWNER_A, JOB_A, HOST_C))), - attributeStore); - AttributeAggregate stateB = new AttributeAggregate(Suppliers.ofInstance(ImmutableSet.of( - makeScheduledTask(OWNER_B, JOB_A, HOST_A), - makeScheduledTask(OWNER_B, JOB_A, HOST_A), - makeScheduledTask(OWNER_B, JOB_A, HOST_B))), - attributeStore); - control.replay(); + AttributeAggregate stateA = AttributeAggregate.create( + Suppliers.<Iterable<IAttribute>>ofInstance(ImmutableList.of( + host(HOST_A), + rack(RACK_A), + host(HOST_B), + rack(RACK_A), + host(HOST_B), + rack(RACK_A), + host(HOST_C), + rack(RACK_B)))); + AttributeAggregate stateB = AttributeAggregate.create( + Suppliers.<Iterable<IAttribute>>ofInstance(ImmutableList.of( + host(HOST_A), + rack(RACK_A), + host(HOST_A), + rack(RACK_A), + host(HOST_B), + rack(RACK_A)))); + IHostAttributes hostA = hostAttributes(HOST_A, host(HOST_A), rack(RACK_A)); IHostAttributes hostB = hostAttributes(HOST_B, host(HOST_B), rack(RACK_A)); IHostAttributes hostC = hostAttributes(HOST_C, host(HOST_C), rack(RACK_B)); @@ -421,7 +410,7 @@ public class SchedulingFilterImplTest extends EasyMockTest { ImmutableSet.<Veto>of(), defaultFilter.filter( new UnusedResource(DEFAULT_OFFER, hostA), - new ResourceRequest(task, TASK_ID, emptyJob))); + new ResourceRequest(task, TASK_ID, EMPTY))); Constraint jvmNegated = jvmConstraint.deepCopy(); jvmNegated.getConstraint().getValue().setNegated(true); @@ -512,7 +501,7 @@ public class SchedulingFilterImplTest extends EasyMockTest { return checkConstraint( owner, jobName, - emptyJob, + EMPTY, hostAttributes, constraintName, expected, @@ -551,7 +540,7 @@ public class SchedulingFilterImplTest extends EasyMockTest { } private void assertNoVetoes(ITaskConfig task, IHostAttributes hostAttributes) { - assertVetoes(task, hostAttributes, emptyJob); + assertVetoes(task, hostAttributes, EMPTY); } private void assertNoVetoes( @@ -563,7 +552,7 @@ public class SchedulingFilterImplTest extends EasyMockTest { } private void assertVetoes(ITaskConfig task, IHostAttributes hostAttributes, Veto... vetoes) { - assertVetoes(task, hostAttributes, emptyJob, vetoes); + assertVetoes(task, hostAttributes, EMPTY, vetoes); } private void assertVetoes( @@ -582,25 +571,25 @@ public class SchedulingFilterImplTest extends EasyMockTest { private static IHostAttributes hostAttributes( String host, MaintenanceMode mode, - Attribute... attributes) { + IAttribute... attributes) { return IHostAttributes.build( new HostAttributes() .setHost(host) .setMode(mode) - .setAttributes(ImmutableSet.<Attribute>builder().add(attributes).build())); + .setAttributes(IAttribute.toBuildersSet(ImmutableSet.copyOf(attributes)))); } private static IHostAttributes hostAttributes( String host, - Attribute... attributes) { + IAttribute... attributes) { return hostAttributes(host, MaintenanceMode.NONE, attributes); } - private Attribute valueAttribute(String name, String string, String... strings) { - return new Attribute(name, - ImmutableSet.<String>builder().add(string).addAll(Arrays.asList(strings)).build()); + private IAttribute valueAttribute(String name, String string, String... strings) { + return IAttribute.build(new Attribute(name, + ImmutableSet.<String>builder().add(string).addAll(Arrays.asList(strings)).build())); } private static Constraint makeConstraint(String name, String... values) { @@ -608,25 +597,6 @@ public class SchedulingFilterImplTest extends EasyMockTest { TaskConstraint.value(new ValueConstraint(false, ImmutableSet.copyOf(values)))); } - private IExpectationSetters<Optional<IHostAttributes>> expectGetHostAttributes( - String host, - Attribute... attributes) { - - IHostAttributes hostAttributes = IHostAttributes.build(new HostAttributes() - .setHost(host) - .setAttributes(ImmutableSet.<Attribute>builder().add(attributes).build())); - return expect(attributeStore.getHostAttributes(host)).andReturn(Optional.of(hostAttributes)); - } - - private IScheduledTask makeScheduledTask(Identity owner, String jobName, String host) { - return IScheduledTask.build(new ScheduledTask().setAssignedTask( - new AssignedTask() - .setSlaveHost(host) - .setTaskId("Task-" + taskIdCounter.incrementAndGet()) - .setTask(hostLimitTask(owner, jobName, 1 /* Max per host not used here. */) - .newBuilder()))); - } - private Constraint limitConstraint(String name, int value) { return new Constraint(name, TaskConstraint.limit(new LimitConstraint(value))); } http://git-wip-us.apache.org/repos/asf/aurora/blob/3b29a4b7/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java index aca0234..ff95c36 100644 --- a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java @@ -13,7 +13,6 @@ */ package org.apache.aurora.scheduler.state; -import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.twitter.common.testing.easymock.EasyMockTest; @@ -26,7 +25,6 @@ import org.apache.aurora.gen.TaskConfig; import org.apache.aurora.scheduler.HostOffer; import org.apache.aurora.scheduler.ResourceSlot; import org.apache.aurora.scheduler.base.Tasks; -import org.apache.aurora.scheduler.filter.AttributeAggregate; import org.apache.aurora.scheduler.filter.SchedulingFilter; import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource; @@ -34,7 +32,6 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto; import org.apache.aurora.scheduler.mesos.MesosTaskFactory; import org.apache.aurora.scheduler.state.TaskAssigner.Assignment; import org.apache.aurora.scheduler.state.TaskAssigner.TaskAssignerImpl; -import org.apache.aurora.scheduler.storage.AttributeStore; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.mesos.Protos.FrameworkID; @@ -49,6 +46,7 @@ import org.apache.mesos.Protos.Value.Type; import org.junit.Before; import org.junit.Test; +import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY; import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; import static org.apache.mesos.Protos.Offer; import static org.easymock.EasyMock.expect; @@ -89,7 +87,6 @@ public class TaskAssignerImplTest extends EasyMockTest { private SchedulingFilter filter; private MesosTaskFactory taskFactory; private TaskAssigner assigner; - private AttributeAggregate emptyJob; @Before public void setUp() throws Exception { @@ -98,16 +95,13 @@ public class TaskAssignerImplTest extends EasyMockTest { filter = createMock(SchedulingFilter.class); taskFactory = createMock(MesosTaskFactory.class); assigner = new TaskAssignerImpl(stateManager, filter, taskFactory); - emptyJob = new AttributeAggregate( - Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()), - createMock(AttributeStore.class)); } @Test public void testAssignNoVetoes() { expect(filter.filter( new UnusedResource(ResourceSlot.from(MESOS_OFFER), OFFER.getAttributes()), - new ResourceRequest(TASK.getAssignedTask().getTask(), Tasks.id(TASK), emptyJob))) + new ResourceRequest(TASK.getAssignedTask().getTask(), Tasks.id(TASK), EMPTY))) .andReturn(ImmutableSet.<Veto>of()); expect(stateManager.assignTask( storeProvider, @@ -126,14 +120,14 @@ public class TaskAssignerImplTest extends EasyMockTest { assigner.maybeAssign( storeProvider, OFFER, - new ResourceRequest(TASK.getAssignedTask().getTask(), Tasks.id(TASK), emptyJob))); + new ResourceRequest(TASK.getAssignedTask().getTask(), Tasks.id(TASK), EMPTY))); } @Test public void testAssignVetoes() { expect(filter.filter( new UnusedResource(ResourceSlot.from(MESOS_OFFER), OFFER.getAttributes()), - new ResourceRequest(TASK.getAssignedTask().getTask(), Tasks.id(TASK), emptyJob))) + new ResourceRequest(TASK.getAssignedTask().getTask(), Tasks.id(TASK), EMPTY))) .andReturn(ImmutableSet.of(Veto.constraintMismatch("denied"))); control.replay(); @@ -143,6 +137,6 @@ public class TaskAssignerImplTest extends EasyMockTest { assigner.maybeAssign( storeProvider, OFFER, - new ResourceRequest(TASK.getAssignedTask().getTask(), Tasks.id(TASK), emptyJob))); + new ResourceRequest(TASK.getAssignedTask().getTask(), Tasks.id(TASK), EMPTY))); } }
