Refactor scheduling code to split matching and assigning phases This patch sets the stage for performing the bulk of scheduling work in a separate call path, without holding the write lock.
Also included is a mechanical refactor pushing the `revocable` flag into `ResourceRequest` (which was ~always needed as a sibling parameter). Reviewed at https://reviews.apache.org/r/64954/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/4e6242fe Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/4e6242fe Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/4e6242fe Branch: refs/heads/master Commit: 4e6242fed68650e7be906ec3d17ae750f49f8cb8 Parents: 5b34231 Author: Bill Farner <[email protected]> Authored: Tue Jan 9 14:50:51 2018 -0800 Committer: Bill Farner <[email protected]> Committed: Tue Jan 9 14:50:51 2018 -0800 ---------------------------------------------------------------------- .../common/testing/easymock/EasyMockTest.java | 14 +- .../benchmark/fakes/FakeOfferManager.java | 12 +- .../apache/aurora/scheduler/TierManager.java | 4 +- .../aurora/scheduler/base/TaskTestUtil.java | 14 +- .../executor/ExecutorSettings.java | 21 +- .../scheduler/filter/SchedulingFilter.java | 42 +++- .../scheduler/mesos/MesosTaskFactory.java | 7 +- .../aurora/scheduler/offers/HostOffers.java | 29 +-- .../aurora/scheduler/offers/OfferManager.java | 10 +- .../scheduler/offers/OfferManagerImpl.java | 12 +- .../scheduler/preemptor/PreemptionVictim.java | 5 +- .../preemptor/PreemptionVictimFilter.java | 20 +- .../scheduler/resources/ResourceManager.java | 6 + .../scheduler/scheduling/TaskAssigner.java | 2 +- .../scheduler/scheduling/TaskAssignerImpl.java | 237 +++++++++---------- .../scheduler/scheduling/TaskScheduler.java | 2 +- .../scheduler/scheduling/TaskSchedulerImpl.java | 100 ++++---- .../aurora/scheduler/storage/TaskStore.java | 3 +- .../storage/durability/WriteRecorder.java | 3 +- .../scheduler/updater/UpdateAgentReserver.java | 33 +-- .../aurora/scheduler/TierManagerTest.java | 7 + .../events/NotifyingSchedulingFilterTest.java | 6 +- .../filter/SchedulingFilterImplTest.java | 36 ++- .../mesos/MesosTaskFactoryImplTest.java | 12 +- .../scheduler/offers/OfferManagerImplTest.java | 51 ++-- .../preemptor/PreemptionVictimFilterTest.java | 101 ++------ .../scheduling/FirstFitOfferSelectorTest.java | 9 +- .../scheduling/TaskAssignerImplTest.java | 170 ++++++------- .../scheduling/TaskSchedulerImplTest.java | 56 ++--- .../updater/NullAgentReserverTest.java | 3 +- .../updater/UpdateAgentReserverImplTest.java | 45 +--- 31 files changed, 476 insertions(+), 596 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/commons/src/main/java/org/apache/aurora/common/testing/easymock/EasyMockTest.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/testing/easymock/EasyMockTest.java b/commons/src/main/java/org/apache/aurora/common/testing/easymock/EasyMockTest.java index c5500b3..15fc677 100644 --- a/commons/src/main/java/org/apache/aurora/common/testing/easymock/EasyMockTest.java +++ b/commons/src/main/java/org/apache/aurora/common/testing/easymock/EasyMockTest.java @@ -26,6 +26,9 @@ import org.easymock.Capture; import org.easymock.EasyMock; import org.easymock.IMocksControl; import org.junit.Before; +import org.junit.Rule; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; import static org.easymock.EasyMock.createControl; @@ -38,6 +41,16 @@ import static org.easymock.EasyMock.createControl; public abstract class EasyMockTest extends TearDownTestCase { protected IMocksControl control; + @Rule + public final TestWatcher verifyControl = new TestWatcher() { + @Override + protected void succeeded(Description description) { + // Only attempt to verify the control when the test case otherwise succeeded. This prevents + // spurious mock-related error messages that distract from the real error. + control.verify(); + } + }; + /** * Creates an EasyMock {@link #control} for tests to use that will be automatically * {@link IMocksControl#verify() verified} on tear down. @@ -45,7 +58,6 @@ public abstract class EasyMockTest extends TearDownTestCase { @Before public final void setupEasyMock() { control = createControl(); - addTearDown(() -> control.verify()); } /** http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java ---------------------------------------------------------------------- diff --git a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java index f0dacd4..0a105c7 100644 --- a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java +++ b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java @@ -15,6 +15,8 @@ package org.apache.aurora.benchmark.fakes; import java.util.Optional; +import com.google.common.collect.ImmutableList; + import org.apache.aurora.scheduler.base.TaskGroupKey; import org.apache.aurora.scheduler.events.PubsubEvent; import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; @@ -59,18 +61,14 @@ public class FakeOfferManager implements OfferManager { } @Override - public Optional<HostOffer> getMatching(Protos.AgentID slaveId, - ResourceRequest resourceRequest, - boolean revocable) { - + public Optional<HostOffer> getMatching(Protos.AgentID slaveId, ResourceRequest resourceRequest) { return Optional.empty(); } @Override public Iterable<HostOffer> getAllMatching(TaskGroupKey groupKey, - ResourceRequest resourceRequest, - boolean revocable) { + ResourceRequest resourceRequest) { - return null; + return ImmutableList.of(); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/TierManager.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/TierManager.java b/src/main/java/org/apache/aurora/scheduler/TierManager.java index c6ad2b1..a37fea4 100644 --- a/src/main/java/org/apache/aurora/scheduler/TierManager.java +++ b/src/main/java/org/apache/aurora/scheduler/TierManager.java @@ -14,6 +14,7 @@ package org.apache.aurora.scheduler; import java.util.Map; +import java.util.Optional; import javax.inject.Inject; @@ -102,7 +103,8 @@ public interface TierManager { !taskConfig.isSetTier() || tierConfig.tiers.containsKey(taskConfig.getTier()), "Invalid tier '%s' in TaskConfig.", taskConfig.getTier()); - return tierConfig.tiers.get(taskConfig.getTier()); + return tierConfig.tiers.get( + Optional.ofNullable(taskConfig.getTier()).orElse(tierConfig.defaultTier)); } @Override http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java b/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java index 22d5a64..2b61c27 100644 --- a/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java +++ b/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java @@ -47,6 +47,8 @@ import org.apache.aurora.scheduler.configuration.ConfigurationManager; import org.apache.aurora.scheduler.configuration.ConfigurationManager.ConfigurationManagerSettings; import org.apache.aurora.scheduler.configuration.executor.ExecutorConfig; import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings; +import org.apache.aurora.scheduler.filter.AttributeAggregate; +import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; import org.apache.aurora.scheduler.storage.durability.ThriftBackfill; import org.apache.aurora.scheduler.storage.entities.IJobKey; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; @@ -70,12 +72,14 @@ public final class TaskTestUtil { new TierInfo(true /* preemptible */, false /* revocable */); public static final TierInfo PREFERRED_TIER = new TierInfo(false /* preemptible */, false /* revocable */); + public static final String REVOCABLE_TIER_NAME = "tier-revocable"; public static final String PROD_TIER_NAME = "tier-prod"; public static final String DEV_TIER_NAME = "tier-dev"; public static final TierConfig TIER_CONFIG = new TierConfig(DEV_TIER_NAME, ImmutableMap.of( PROD_TIER_NAME, PREFERRED_TIER, - DEV_TIER_NAME, DEV_TIER + DEV_TIER_NAME, DEV_TIER, + REVOCABLE_TIER_NAME, REVOCABLE_TIER )); public static final TierManager TIER_MANAGER = new TierManager.TierManagerImpl(TIER_CONFIG); public static final ThriftBackfill THRIFT_BACKFILL = new ThriftBackfill(TIER_MANAGER); @@ -237,4 +241,12 @@ public final class TaskTestUtil { new org.apache.aurora.gen.TierConfig("revocable", REVOCABLE_TIER.toMap()) ); } + + public static ResourceRequest toResourceRequest(ITaskConfig task) { + return ResourceRequest.fromTask( + task, + EXECUTOR_SETTINGS, + AttributeAggregate.empty(), + TIER_MANAGER); + } } http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorSettings.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorSettings.java b/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorSettings.java index 5c987fd..dac84e2 100644 --- a/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorSettings.java +++ b/src/main/java/org/apache/aurora/scheduler/configuration/executor/ExecutorSettings.java @@ -19,6 +19,9 @@ import java.util.Optional; import org.apache.aurora.scheduler.resources.ResourceBag; import org.apache.aurora.scheduler.resources.ResourceManager; +import org.apache.aurora.scheduler.storage.entities.ITaskConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static java.util.Objects.requireNonNull; @@ -26,6 +29,9 @@ import static java.util.Objects.requireNonNull; * Configuration for the executor to run, and resource overhead required for it. */ public class ExecutorSettings { + + private static final Logger LOG = LoggerFactory.getLogger(ExecutorSettings.class); + private final Map<String, ExecutorConfig> config; private final boolean populateDiscoveryInfo; @@ -45,12 +51,19 @@ public class ExecutorSettings { return populateDiscoveryInfo; } - public Optional<ResourceBag> getExecutorOverhead(String name) { + public ResourceBag getExecutorOverhead(ITaskConfig task) { + if (!task.isSetExecutorConfig()) { + // Docker-based tasks don't need executors + return ResourceBag.EMPTY; + } + + String name = task.getExecutorConfig().getName(); if (config.containsKey(name)) { - return Optional.of( - ResourceManager.bagFromMesosResources(config.get(name).getExecutor().getResourcesList())); + return ResourceManager.bagFromMesosResources( + config.get(name).getExecutor().getResourcesList()); } else { - return Optional.empty(); + LOG.warn("No executor configuration found for " + name); + return ResourceBag.EMPTY; } } http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java index bd41590..fd97259 100644 --- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java +++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java @@ -21,12 +21,17 @@ import java.util.Set; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; +import org.apache.aurora.scheduler.TierManager; +import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings; import org.apache.aurora.scheduler.offers.HostOffer; import org.apache.aurora.scheduler.resources.ResourceBag; +import org.apache.aurora.scheduler.resources.ResourceManager; import org.apache.aurora.scheduler.storage.entities.IConstraint; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; +import static java.util.Objects.requireNonNull; + import static org.apache.aurora.scheduler.filter.SchedulingFilter.VetoType.CONSTRAINT_MISMATCH; import static org.apache.aurora.scheduler.filter.SchedulingFilter.VetoType.INSUFFICIENT_RESOURCES; import static org.apache.aurora.scheduler.filter.SchedulingFilter.VetoType.LIMIT_NOT_SATISFIED; @@ -306,11 +311,31 @@ public interface SchedulingFilter { private final ITaskConfig task; private final ResourceBag request; private final AttributeAggregate jobState; + private final boolean revocable; + + private ResourceRequest( + ITaskConfig task, + ResourceBag request, + AttributeAggregate jobState, + boolean revocable) { - public ResourceRequest(ITaskConfig task, ResourceBag request, AttributeAggregate jobState) { - this.task = task; - this.request = request; - this.jobState = jobState; + this.task = requireNonNull(task); + this.request = requireNonNull(request); + this.jobState = requireNonNull(jobState); + this.revocable = revocable; + } + + public static ResourceRequest fromTask( + ITaskConfig task, + ExecutorSettings executorSettings, + AttributeAggregate jobState, + TierManager tierManager) { + + return new ResourceRequest( + task, + ResourceManager.bagFromTask(task, executorSettings), + jobState, + tierManager.getTier(task).isRevocable()); } public Iterable<IConstraint> getConstraints() { @@ -329,6 +354,10 @@ public interface SchedulingFilter { return jobState; } + public boolean isRevocable() { + return revocable; + } + @Override public boolean equals(Object o) { if (!(o instanceof ResourceRequest)) { @@ -338,12 +367,13 @@ public interface SchedulingFilter { ResourceRequest other = (ResourceRequest) o; return Objects.equals(task, other.task) && Objects.equals(request, other.request) - && Objects.equals(jobState, other.jobState); + && Objects.equals(jobState, other.jobState) + && revocable == other.revocable; } @Override public int hashCode() { - return Objects.hash(task, request, jobState); + return Objects.hash(task, request, jobState, revocable); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java b/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java index cb288bb..bcb2bbf 100644 --- a/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java +++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosTaskFactory.java @@ -152,12 +152,7 @@ public interface MesosTaskFactory { ITaskConfig config = task.getTask(); - // Docker-based tasks don't need executors - ResourceBag executorOverhead = ResourceBag.EMPTY; - if (config.isSetExecutorConfig()) { - executorOverhead = - executorSettings.getExecutorOverhead(getExecutorName(task)).orElse(ResourceBag.EMPTY); - } + ResourceBag executorOverhead = executorSettings.getExecutorOverhead(config); AcceptedOffer acceptedOffer; // TODO(wfarner): Re-evaluate if/why we need to continue handling unset assignedPorts field. http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/offers/HostOffers.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/offers/HostOffers.java b/src/main/java/org/apache/aurora/scheduler/offers/HostOffers.java index a01c0a8..2ea7a01 100644 --- a/src/main/java/org/apache/aurora/scheduler/offers/HostOffers.java +++ b/src/main/java/org/apache/aurora/scheduler/offers/HostOffers.java @@ -154,22 +154,13 @@ class HostOffers { .toSet(); } - synchronized Optional<HostOffer> getMatching(Protos.AgentID slaveId, - ResourceRequest resourceRequest, - boolean revocable) { + synchronized Optional<HostOffer> getMatching( + Protos.AgentID slaveId, + ResourceRequest resourceRequest) { - Optional<HostOffer> optionalOffer = get(slaveId); - if (optionalOffer.isPresent()) { - HostOffer offer = optionalOffer.get(); - - if (isGloballyBanned(offer) - || isVetoed(offer, resourceRequest, revocable, Optional.empty())) { - - return Optional.empty(); - } - } - - return optionalOffer; + return get(slaveId) + .filter(offer -> !isGloballyBanned(offer)) + .filter(offer -> !isVetoed(offer, resourceRequest, Optional.empty())); } /** @@ -182,14 +173,13 @@ class HostOffers { * @return The offers a given task group can use. */ synchronized Iterable<HostOffer> getAllMatching(TaskGroupKey groupKey, - ResourceRequest resourceRequest, - boolean revocable) { + ResourceRequest resourceRequest) { return Iterables.unmodifiableIterable(FluentIterable.from(offers) .filter(o -> !isGloballyBanned(o)) .filter(o -> !isStaticallyBanned(o, groupKey)) .filter(HostOffer::hasCpuAndMem) - .filter(o -> !isVetoed(o, resourceRequest, revocable, Optional.of(groupKey)))); + .filter(o -> !isVetoed(o, resourceRequest, Optional.of(groupKey)))); } private synchronized boolean isGloballyBanned(HostOffer offer) { @@ -207,11 +197,10 @@ class HostOffers { */ private boolean isVetoed(HostOffer offer, ResourceRequest resourceRequest, - boolean revocable, Optional<TaskGroupKey> groupKey) { vetoEvaluatedOffers.incrementAndGet(); - UnusedResource unusedResource = new UnusedResource(offer, revocable); + UnusedResource unusedResource = new UnusedResource(offer, resourceRequest.isRevocable()); Set<Veto> vetoes = schedulingFilter.filter(unusedResource, resourceRequest); if (!vetoes.isEmpty()) { if (groupKey.isPresent() && Veto.identifyGroup(vetoes) == SchedulingFilter.VetoGroup.STATIC) { http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java index e90de3e..8f9e33d 100644 --- a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java +++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java @@ -81,12 +81,9 @@ public interface OfferManager extends EventSubscriber { * * @param slaveId Slave ID to get the offer for. * @param resourceRequest The request that the offer should satisfy. - * @param revocable Whether or not the request can use revocable resources. * @return An option containing the offer for the slave ID if it fits. */ - Optional<HostOffer> getMatching(AgentID slaveId, - ResourceRequest resourceRequest, - boolean revocable); + Optional<HostOffer> getMatching(AgentID slaveId, ResourceRequest resourceRequest); /** * Gets all offers that the scheduler is holding that satisfy the supplied @@ -94,12 +91,9 @@ public interface OfferManager extends EventSubscriber { * * @param groupKey The {@link TaskGroupKey} of the task in the {@link ResourceRequest}. * @param resourceRequest The request that the offer should satisfy. - * @param revocable Whether or not the request can use revocable resources. * @return An option containing the offer for the slave ID if it fits. */ - Iterable<HostOffer> getAllMatching(TaskGroupKey groupKey, - ResourceRequest resourceRequest, - boolean revocable); + Iterable<HostOffer> getAllMatching(TaskGroupKey groupKey, ResourceRequest resourceRequest); /** * Launches the task matched against the offer. http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerImpl.java index 8e806b7..084b48c 100644 --- a/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerImpl.java @@ -162,19 +162,15 @@ public class OfferManagerImpl implements OfferManager { } @Override - public Optional<HostOffer> getMatching(Protos.AgentID slaveId, - ResourceRequest resourceRequest, - boolean revocable) { - - return hostOffers.getMatching(slaveId, resourceRequest, revocable); + public Optional<HostOffer> getMatching(Protos.AgentID slaveId, ResourceRequest resourceRequest) { + return hostOffers.getMatching(slaveId, resourceRequest); } @Override public Iterable<HostOffer> getAllMatching(TaskGroupKey groupKey, - ResourceRequest resourceRequest, - boolean revocable) { + ResourceRequest resourceRequest) { - return hostOffers.getAllMatching(groupKey, resourceRequest, revocable); + return hostOffers.getAllMatching(groupKey, resourceRequest); } /** http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java index 69b6866..780689e 100644 --- a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java +++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java @@ -17,6 +17,7 @@ import java.util.Objects; import com.google.common.base.MoreObjects; +import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings; import org.apache.aurora.scheduler.resources.ResourceBag; import org.apache.aurora.scheduler.resources.ResourceManager; import org.apache.aurora.scheduler.storage.entities.IAssignedTask; @@ -54,8 +55,8 @@ public final class PreemptionVictim { return task.getTask().getPriority(); } - public ResourceBag getResourceBag() { - return ResourceManager.bagFromResources(task.getTask().getResources()); + public ResourceBag getResourceBag(ExecutorSettings executorSettings) { + return ResourceManager.bagFromTask(task.getTask(), executorSettings); } public String getTaskId() { http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java index cf6d348..569cfe6 100644 --- a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java +++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java @@ -38,7 +38,6 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource; import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto; import org.apache.aurora.scheduler.offers.HostOffer; import org.apache.aurora.scheduler.resources.ResourceBag; -import org.apache.aurora.scheduler.resources.ResourceManager; import org.apache.aurora.scheduler.resources.ResourceType; import org.apache.aurora.scheduler.storage.Storage.StoreProvider; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; @@ -113,13 +112,7 @@ public interface PreemptionVictimFilter { new Function<PreemptionVictim, ResourceBag>() { @Override public ResourceBag apply(PreemptionVictim victim) { - ResourceBag bag = victim.getResourceBag(); - - if (victim.getConfig().isSetExecutorConfig()) { - // Be pessimistic about revocable resource available if config is not available - bag.add(executorSettings.getExecutorOverhead( - victim.getConfig().getExecutorConfig().getName()).orElse(EMPTY)); - } + ResourceBag bag = victim.getResourceBag(executorSettings); if (tierManager.getTier(victim.getConfig()).isRevocable()) { // Revocable task CPU cannot be used for preemption purposes as it's a compressible @@ -223,10 +216,8 @@ public interface PreemptionVictimFilter { return Optional.empty(); } - ResourceBag overhead = pendingTask.isSetExecutorConfig() - ? executorSettings.getExecutorOverhead( - pendingTask.getExecutorConfig().getName()).orElse(EMPTY) - : EMPTY; + ResourceRequest requiredResources = + ResourceRequest.fromTask(pendingTask, executorSettings, jobState, tierManager); ResourceBag totalResource = slackResources; for (PreemptionVictim victim : sortedVictims) { @@ -240,10 +231,7 @@ public interface PreemptionVictimFilter { Set<Veto> vetoes = schedulingFilter.filter( new UnusedResource(totalResource, attributes.get(), unavailability), - new ResourceRequest( - pendingTask, - ResourceManager.bagFromResources(pendingTask.getResources()).add(overhead), - jobState)); + requiredResources); if (vetoes.isEmpty()) { return Optional.of(ImmutableSet.copyOf(toPreemptTasks)); http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java b/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java index d093753..2bf9808 100644 --- a/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java +++ b/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java @@ -26,6 +26,7 @@ import com.google.common.base.Predicates; import com.google.common.collect.Iterables; import org.apache.aurora.gen.ResourceAggregate; +import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings; import org.apache.aurora.scheduler.storage.durability.ThriftBackfill; import org.apache.aurora.scheduler.storage.entities.IAssignedTask; import org.apache.aurora.scheduler.storage.entities.IResource; @@ -245,6 +246,11 @@ public final class ResourceManager { return bagFromResources(resources, RESOURCE_TO_TYPE, QUANTIFY_RESOURCE); } + public static ResourceBag bagFromTask(ITaskConfig task, ExecutorSettings executorSettings) { + return bagFromResources(task.getResources(), RESOURCE_TO_TYPE, QUANTIFY_RESOURCE) + .add(executorSettings.getExecutorOverhead(task)); + } + /** * Creates a {@link ResourceBag} from Mesos resources. * http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssigner.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssigner.java index 87619b5..d2597a1 100644 --- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssigner.java +++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssigner.java @@ -41,6 +41,6 @@ public interface TaskAssigner { MutableStoreProvider storeProvider, ResourceRequest resourceRequest, TaskGroupKey groupKey, - Iterable<IAssignedTask> tasks, + Set<IAssignedTask> tasks, Map<String, TaskGroupKey> preemptionReservations); } http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImpl.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImpl.java index 54bd177..ec416cc 100644 --- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImpl.java @@ -13,6 +13,7 @@ */ package org.apache.aurora.scheduler.scheduling; +import java.util.Collection; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -21,22 +22,22 @@ import java.util.concurrent.atomic.AtomicLong; import javax.inject.Inject; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; import org.apache.aurora.common.stats.StatsProvider; -import org.apache.aurora.scheduler.TierManager; import org.apache.aurora.scheduler.base.InstanceKeys; import org.apache.aurora.scheduler.base.TaskGroupKey; import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; import org.apache.aurora.scheduler.mesos.MesosTaskFactory; import org.apache.aurora.scheduler.offers.HostOffer; import org.apache.aurora.scheduler.offers.OfferManager; +import org.apache.aurora.scheduler.offers.OfferManager.LaunchException; import org.apache.aurora.scheduler.resources.ResourceManager; import org.apache.aurora.scheduler.resources.ResourceType; import org.apache.aurora.scheduler.state.StateManager; -import org.apache.aurora.scheduler.storage.Storage; +import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; import org.apache.aurora.scheduler.storage.entities.IAssignedTask; import org.apache.aurora.scheduler.storage.entities.IInstanceKey; import org.apache.aurora.scheduler.updater.UpdateAgentReserver; @@ -64,7 +65,6 @@ public class TaskAssignerImpl implements TaskAssigner { private final StateManager stateManager; private final MesosTaskFactory taskFactory; private final OfferManager offerManager; - private final TierManager tierManager; private final UpdateAgentReserver updateAgentReserver; private final OfferSelector offerSelector; @@ -73,7 +73,6 @@ public class TaskAssignerImpl implements TaskAssigner { StateManager stateManager, MesosTaskFactory taskFactory, OfferManager offerManager, - TierManager tierManager, UpdateAgentReserver updateAgentReserver, StatsProvider statsProvider, OfferSelector offerSelector) { @@ -81,7 +80,6 @@ public class TaskAssignerImpl implements TaskAssigner { this.stateManager = requireNonNull(stateManager); this.taskFactory = requireNonNull(taskFactory); this.offerManager = requireNonNull(offerManager); - this.tierManager = requireNonNull(tierManager); this.launchFailures = statsProvider.makeCounter(ASSIGNER_LAUNCH_FAILURES); this.updateAgentReserver = requireNonNull(updateAgentReserver); this.offerSelector = requireNonNull(offerSelector); @@ -99,7 +97,7 @@ public class TaskAssignerImpl implements TaskAssigner { } private Protos.TaskInfo assign( - Storage.MutableStoreProvider storeProvider, + MutableStoreProvider storeProvider, Protos.Offer offer, String taskId, boolean revocable) { @@ -118,20 +116,18 @@ public class TaskAssignerImpl implements TaskAssigner { } private void launchUsingOffer( - Storage.MutableStoreProvider storeProvider, - boolean revocable, + MutableStoreProvider stores, ResourceRequest resourceRequest, IAssignedTask task, - HostOffer offer, - ImmutableSet.Builder<String> assignmentResult) throws OfferManager.LaunchException { + HostOffer offer) throws LaunchException { String taskId = task.getTaskId(); - Protos.TaskInfo taskInfo = assign(storeProvider, offer.getOffer(), taskId, revocable); + Protos.TaskInfo taskInfo = + assign(stores, offer.getOffer(), taskId, resourceRequest.isRevocable()); resourceRequest.getJobState().updateAttributeAggregate(offer.getAttributes()); try { offerManager.launchTask(offer.getOffer().getId(), taskInfo); - assignmentResult.add(taskId); - } catch (OfferManager.LaunchException e) { + } catch (LaunchException e) { LOG.warn("Failed to launch task.", e); launchFailures.incrementAndGet(); @@ -139,147 +135,144 @@ public class TaskAssignerImpl implements TaskAssigner { // It is in the LOST state and a new task will move to PENDING to replace it. // Should the state change fail due to storage issues, that's okay. The task will // time out in the ASSIGNED state and be moved to LOST. - stateManager.changeState( - storeProvider, - taskId, - Optional.of(PENDING), - LOST, - LAUNCH_FAILED_MSG); + stateManager.changeState(stores, taskId, Optional.of(PENDING), LOST, LAUNCH_FAILED_MSG); throw e; } } - private Iterable<IAssignedTask> maybeAssignReserved( - Iterable<IAssignedTask> tasks, - Storage.MutableStoreProvider storeProvider, - boolean revocable, - ResourceRequest resourceRequest, - TaskGroupKey groupKey, - ImmutableSet.Builder<String> assignmentResult) { + private static final class ReservationStatus { + final boolean taskReserving; + final Optional<HostOffer> offer; - if (!updateAgentReserver.hasReservations(groupKey)) { - return tasks; + private ReservationStatus(boolean taskReserving, Optional<HostOffer> offer) { + this.taskReserving = taskReserving; + this.offer = requireNonNull(offer); } - // Data structure to record which tasks should be excluded from the regular (non-reserved) - // scheduling loop. This is important because we release reservations once they are used, - // so we need to record them separately to avoid them being double-scheduled. - ImmutableSet.Builder<IInstanceKey> excludeBuilder = ImmutableSet.builder(); - - for (IAssignedTask task : tasks) { - IInstanceKey key = InstanceKeys.from(task.getTask().getJob(), task.getInstanceId()); - Optional<String> maybeAgentId = updateAgentReserver.getAgent(key); - if (maybeAgentId.isPresent()) { - excludeBuilder.add(key); - Optional<HostOffer> offer = offerManager.getMatching( - Protos.AgentID.newBuilder().setValue(maybeAgentId.get()).build(), - resourceRequest, - revocable); - if (offer.isPresent()) { - try { - // The offer can still be veto'd because of changed constraints, or because the - // Scheduler hasn't been updated by Mesos yet... - launchUsingOffer(storeProvider, - revocable, - resourceRequest, - task, - offer.get(), - assignmentResult); - LOG.info("Used update reservation for {} on {}", key, maybeAgentId.get()); - updateAgentReserver.release(maybeAgentId.get(), key); - } catch (OfferManager.LaunchException e) { - updateAgentReserver.release(maybeAgentId.get(), key); - } - } else { - LOG.info( - "Tried to reuse offer on {} for {}, but was not ready yet.", - maybeAgentId.get(), - key); - } - } + static final ReservationStatus NOT_RESERVING = new ReservationStatus(false, Optional.empty()); + static final ReservationStatus NOT_READY = new ReservationStatus(true, Optional.empty()); + + static ReservationStatus ready(HostOffer offer) { + return new ReservationStatus(true, Optional.of(offer)); } - // Return only the tasks that didn't have reservations. Offers on agents that were reserved - // might not have been seen by Aurora yet, so we need to wait until the reservation expires - // before giving up and falling back to the first-fit algorithm. - Set<IInstanceKey> toBeExcluded = excludeBuilder.build(); - return Iterables.filter(tasks, t -> !toBeExcluded.contains( - InstanceKeys.from(t.getTask().getJob(), t.getInstanceId()))); + boolean isTaskReserving() { + return taskReserving; + } + + Optional<HostOffer> getOffer() { + return offer; + } + } + + private ReservationStatus getReservation(IAssignedTask task, ResourceRequest resourceRequest) { + + IInstanceKey key = InstanceKeys.from(task.getTask().getJob(), task.getInstanceId()); + Optional<String> agentId = updateAgentReserver.getAgent(key); + if (!agentId.isPresent()) { + return ReservationStatus.NOT_RESERVING; + } + Optional<HostOffer> offer = offerManager.getMatching( + Protos.AgentID.newBuilder().setValue(agentId.get()).build(), + resourceRequest); + if (offer.isPresent()) { + LOG.info("Used update reservation for {} on {}", key, agentId.get()); + updateAgentReserver.release(agentId.get(), key); + return ReservationStatus.ready(offer.get()); + } else { + LOG.info( + "Tried to reuse offer on {} for {}, but was not ready yet.", + agentId.get(), + key); + return ReservationStatus.NOT_READY; + } } /** * Determine whether or not the offer is reserved for a different task via preemption or * update affinity. */ - @SuppressWarnings("PMD.UselessParentheses") // TODO(jly): PMD bug, remove when upgrade from 5.5.3 - private boolean isAgentReserved(HostOffer offer, - TaskGroupKey groupKey, - Map<String, TaskGroupKey> preemptionReservations) { + private boolean isAgentReserved( + HostOffer offer, + TaskGroupKey groupKey, + Map<String, TaskGroupKey> preemptionReservations) { String agentId = offer.getOffer().getAgentId().getValue(); - Optional<TaskGroupKey> reservedGroup = Optional.ofNullable( - preemptionReservations.get(agentId)); + boolean reservedForPreemption = Optional.ofNullable(preemptionReservations.get(agentId)) + .map(group -> !group.equals(groupKey)) + .orElse(false); - return (reservedGroup.isPresent() && !reservedGroup.get().equals(groupKey)) - || !updateAgentReserver.getReservations(agentId).isEmpty(); + return reservedForPreemption || updateAgentReserver.isReserved(agentId); } - @Timed("assigner_maybe_assign") - @Override - public Set<String> maybeAssign( - Storage.MutableStoreProvider storeProvider, - ResourceRequest resourceRequest, - TaskGroupKey groupKey, - Iterable<IAssignedTask> tasks, - Map<String, TaskGroupKey> preemptionReservations) { + private static class SchedulingMatch { + final IAssignedTask task; + final HostOffer offer; - if (Iterables.isEmpty(tasks)) { - return ImmutableSet.of(); + SchedulingMatch(IAssignedTask task, HostOffer offer) { + this.task = requireNonNull(task); + this.offer = requireNonNull(offer); } + } - boolean revocable = tierManager.getTier(groupKey.getTask()).isRevocable(); - ImmutableSet.Builder<String> assignmentResult = ImmutableSet.builder(); + private Collection<SchedulingMatch> findMatches( + ResourceRequest resourceRequest, + TaskGroupKey groupKey, + Set<IAssignedTask> tasks, + Map<String, TaskGroupKey> preemptionReservations) { - // Assign tasks reserved for a specific agent (e.g. for update affinity) - Iterable<IAssignedTask> nonReservedTasks = maybeAssignReserved( - tasks, - storeProvider, - revocable, - resourceRequest, - groupKey, - assignmentResult); + // Avoid matching multiple tasks against any offer. + Map<String, SchedulingMatch> matchesByOffer = Maps.newHashMap(); - // Assign the rest of the non-reserved tasks - for (IAssignedTask task : nonReservedTasks) { - try { + tasks.forEach(task -> { + ReservationStatus reservation = getReservation(task, resourceRequest); + Optional<HostOffer> chosenOffer; + if (reservation.isTaskReserving()) { + // Use the reserved offer, which may not currently exist. + chosenOffer = reservation.getOffer(); + } else { // Get all offers that will satisfy the given ResourceRequest and that are not reserved // for updates or preemption - FluentIterable<HostOffer> matchingOffers = FluentIterable - .from(offerManager.getAllMatching(groupKey, resourceRequest, revocable)) - .filter(o -> !isAgentReserved(o, groupKey, preemptionReservations)); + Iterable<HostOffer> matchingOffers = Iterables.filter( + offerManager.getAllMatching(groupKey, resourceRequest), + o -> !matchesByOffer.containsKey(o.getOffer().getId().getValue()) + && !isAgentReserved(o, groupKey, preemptionReservations)); // Determine which is the optimal offer to select for the given request - Optional<HostOffer> optionalOffer = offerSelector.select(matchingOffers, resourceRequest); - - // If no offer is chosen, continue to the next task - if (!optionalOffer.isPresent()) { - continue; - } - - // Attempt to launch the task using the chosen offer - HostOffer offer = optionalOffer.get(); - launchUsingOffer(storeProvider, - revocable, - resourceRequest, - task, - offer, - assignmentResult); - } catch (OfferManager.LaunchException e) { + chosenOffer = offerSelector.select(matchingOffers, resourceRequest); + } + + chosenOffer.ifPresent(hostOffer -> { + matchesByOffer.put( + hostOffer.getOffer().getId().getValue(), + new SchedulingMatch(task, hostOffer)); + }); + }); + + return matchesByOffer.values(); + } + + @Timed("assigner_maybe_assign") + @Override + public Set<String> maybeAssign( + MutableStoreProvider storeProvider, + ResourceRequest resourceRequest, + TaskGroupKey groupKey, + Set<IAssignedTask> tasks, + Map<String, TaskGroupKey> reservations) { + + ImmutableSet.Builder<String> assigned = ImmutableSet.builder(); + + for (SchedulingMatch match : findMatches(resourceRequest, groupKey, tasks, reservations)) { + try { + launchUsingOffer(storeProvider, resourceRequest, match.task, match.offer); + assigned.add(match.task.getTaskId()); + } catch (LaunchException e) { // Any launch exception causes the scheduling round to terminate for this TaskGroup. break; } } - return assignmentResult.build(); + return assigned.build(); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java index 3c38f95..d2f3257 100644 --- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java +++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java @@ -31,5 +31,5 @@ public interface TaskScheduler extends EventSubscriber { * @return Successfully scheduled task IDs. The caller should call schedule again if a given * task ID was not present in the result. */ - Set<String> schedule(MutableStoreProvider storeProvider, Iterable<String> taskIds); + Set<String> schedule(MutableStoreProvider storeProvider, Set<String> taskIds); } http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImpl.java index cff4ab1..edab03d 100644 --- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImpl.java @@ -19,6 +19,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; import java.util.stream.Collectors; import javax.inject.Inject; @@ -26,7 +27,6 @@ import javax.inject.Qualifier; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; @@ -34,16 +34,17 @@ import com.google.common.eventbus.Subscribe; import org.apache.aurora.common.inject.TimedInterceptor.Timed; import org.apache.aurora.common.stats.Stats; +import org.apache.aurora.scheduler.TierManager; import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.base.TaskGroupKey; import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings; import org.apache.aurora.scheduler.events.PubsubEvent; import org.apache.aurora.scheduler.filter.AttributeAggregate; -import org.apache.aurora.scheduler.filter.SchedulingFilter; +import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; import org.apache.aurora.scheduler.preemptor.BiCache; import org.apache.aurora.scheduler.preemptor.Preemptor; -import org.apache.aurora.scheduler.resources.ResourceBag; -import org.apache.aurora.scheduler.storage.Storage; +import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; +import org.apache.aurora.scheduler.storage.Storage.StoreProvider; import org.apache.aurora.scheduler.storage.entities.IAssignedTask; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; @@ -55,10 +56,8 @@ import static java.lang.annotation.ElementType.METHOD; import static java.lang.annotation.ElementType.PARAMETER; import static java.lang.annotation.RetentionPolicy.RUNTIME; import static java.util.Objects.requireNonNull; -import static java.util.stream.Collectors.toMap; import static org.apache.aurora.gen.ScheduleStatus.PENDING; -import static org.apache.aurora.scheduler.resources.ResourceManager.bagFromResources; /** * An asynchronous task scheduler. Scheduling of tasks is performed on a delay, where each task @@ -81,6 +80,7 @@ public class TaskSchedulerImpl implements TaskScheduler { private final TaskAssigner assigner; private final Preemptor preemptor; private final ExecutorSettings executorSettings; + private final TierManager tierManager; private final BiCache<String, TaskGroupKey> reservations; private final AtomicLong attemptsFired = Stats.exportLong("schedule_attempts_fired"); @@ -92,16 +92,18 @@ public class TaskSchedulerImpl implements TaskScheduler { TaskAssigner assigner, Preemptor preemptor, ExecutorSettings executorSettings, + TierManager tierManager, BiCache<String, TaskGroupKey> reservations) { this.assigner = requireNonNull(assigner); this.preemptor = requireNonNull(preemptor); this.executorSettings = requireNonNull(executorSettings); + this.tierManager = requireNonNull(tierManager); this.reservations = requireNonNull(reservations); } @Timed("task_schedule_attempt") - public Set<String> schedule(Storage.MutableStoreProvider store, Iterable<String> taskIds) { + public Set<String> schedule(MutableStoreProvider store, Set<String> taskIds) { try { return scheduleTasks(store, taskIds); } catch (RuntimeException e) { @@ -115,77 +117,67 @@ public class TaskSchedulerImpl implements TaskScheduler { } } - private Set<String> scheduleTasks(Storage.MutableStoreProvider store, Iterable<String> tasks) { - ImmutableSet<String> taskIds = ImmutableSet.copyOf(tasks); - String taskIdValues = Joiner.on(",").join(taskIds); - LOG.debug("Attempting to schedule tasks {}", taskIdValues); - ImmutableSet<IAssignedTask> assignedTasks = - ImmutableSet.copyOf(Iterables.transform( - store.getTaskStore().fetchTasks(Query.taskScoped(taskIds).byStatus(PENDING)), - IScheduledTask::getAssignedTask)); - - if (Iterables.isEmpty(assignedTasks)) { - LOG.warn("Failed to look up all tasks in a scheduling round: {}", taskIdValues); - return taskIds; + private Map<String, IAssignedTask> fetchTasks(StoreProvider store, Set<String> ids) { + Map<String, IAssignedTask> tasks = store.getTaskStore() + .fetchTasks(Query.taskScoped(ids).byStatus(PENDING)) + .stream() + .map(IScheduledTask::getAssignedTask) + .collect(Collectors.toMap( + IAssignedTask::getTaskId, + Function.identity() + )); + + if (ids.size() != tasks.size()) { + LOG.warn("Failed to look up tasks " + + Joiner.on(", ").join(Sets.difference(ids, tasks.keySet()))); } + return tasks; + } - Preconditions.checkState( - assignedTasks.stream() - .collect(Collectors.groupingBy(t -> t.getTask())) - .entrySet() - .size() == 1, - "Found multiple task groups for %s", - taskIdValues); - - Map<String, IAssignedTask> assignableTaskMap = - assignedTasks.stream().collect(toMap(t -> t.getTaskId(), t -> t)); + private Set<String> scheduleTasks(MutableStoreProvider store, Set<String> ids) { + LOG.debug("Attempting to schedule tasks {}", ids); + Map<String, IAssignedTask> tasksById = fetchTasks(store, ids); - if (taskIds.size() != assignedTasks.size()) { - LOG.warn("Failed to look up tasks " - + Joiner.on(", ").join(Sets.difference(taskIds, assignableTaskMap.keySet()))); + if (tasksById.isEmpty()) { + // None of the tasks were found in storage. This could be caused by a task group that was + // killed by the user, for example. + return ids; } - // This is safe after all checks above. - ITaskConfig task = assignedTasks.stream().findFirst().get().getTask(); + // Prepare scheduling context for the tasks + ITaskConfig task = Iterables.getOnlyElement(tasksById.values().stream() + .map(IAssignedTask::getTask) + .collect(Collectors.toSet())); AttributeAggregate aggregate = AttributeAggregate.getJobActiveState(store, task.getJob()); - // Valid Docker tasks can have a container but no executor config - ResourceBag overhead = ResourceBag.EMPTY; - if (task.isSetExecutorConfig()) { - overhead = executorSettings.getExecutorOverhead(task.getExecutorConfig().getName()) - .orElseThrow( - () -> new IllegalArgumentException("Cannot find executor configuration")); - } - + // Attempt to schedule using available resources. Set<String> launched = assigner.maybeAssign( store, - new SchedulingFilter.ResourceRequest( - task, - bagFromResources(task.getResources()).add(overhead), aggregate), + ResourceRequest.fromTask(task, executorSettings, aggregate, tierManager), TaskGroupKey.from(task), - assignedTasks, + ImmutableSet.copyOf(tasksById.values()), reservations.asMap()); - attemptsFired.addAndGet(assignableTaskMap.size()); - Set<String> failedToLaunch = Sets.difference(assignableTaskMap.keySet(), launched); + attemptsFired.addAndGet(tasksById.size()); - failedToLaunch.forEach(taskId -> { - // Task could not be scheduled. + // Fall back to preemption for tasks not scheduled above. + Set<String> unassigned = Sets.difference(tasksById.keySet(), launched); + unassigned.forEach(taskId -> { // TODO(maxim): Now that preemption slots are searched asynchronously, consider // retrying a launch attempt within the current scheduling round IFF a reservation is // available. - maybePreemptFor(assignableTaskMap.get(taskId), aggregate, store); + maybePreemptFor(tasksById.get(taskId), aggregate, store); }); - attemptsNoMatch.addAndGet(failedToLaunch.size()); + attemptsNoMatch.addAndGet(unassigned.size()); // Return all successfully launched tasks as well as those weren't tried (not in PENDING). - return Sets.union(launched, Sets.difference(taskIds, assignableTaskMap.keySet())); + return Sets.union(launched, Sets.difference(ids, tasksById.keySet())); } private void maybePreemptFor( IAssignedTask task, AttributeAggregate jobState, - Storage.MutableStoreProvider storeProvider) { + MutableStoreProvider storeProvider) { if (!reservations.getByValue(TaskGroupKey.from(task.getTask())).isEmpty()) { return; http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java b/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java index 1219215..ebb345d 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java @@ -13,6 +13,7 @@ */ package org.apache.aurora.scheduler.storage; +import java.util.Collection; import java.util.Optional; import java.util.Set; @@ -47,7 +48,7 @@ public interface TaskStore { * @param query Builder of the query to identify tasks with. * @return A read-only view of matching tasks. */ - Iterable<IScheduledTask> fetchTasks(Query.Builder query); + Collection<IScheduledTask> fetchTasks(Query.Builder query); /** * Fetches all job keys represented in the task store. http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteRecorder.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteRecorder.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteRecorder.java index dea8e69..8d70cae 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteRecorder.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteRecorder.java @@ -13,6 +13,7 @@ */ package org.apache.aurora.scheduler.storage.durability; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Optional; @@ -327,7 +328,7 @@ public class WriteRecorder implements } @Override - public Iterable<IScheduledTask> fetchTasks(Query.Builder query) { + public Collection<IScheduledTask> fetchTasks(Query.Builder query) { return this.taskStore.fetchTasks(query); } http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/main/java/org/apache/aurora/scheduler/updater/UpdateAgentReserver.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/updater/UpdateAgentReserver.java b/src/main/java/org/apache/aurora/scheduler/updater/UpdateAgentReserver.java index b6909a6..59eca7b 100644 --- a/src/main/java/org/apache/aurora/scheduler/updater/UpdateAgentReserver.java +++ b/src/main/java/org/apache/aurora/scheduler/updater/UpdateAgentReserver.java @@ -14,12 +14,9 @@ package org.apache.aurora.scheduler.updater; import java.util.Optional; -import java.util.Set; -import com.google.common.collect.ImmutableSet; import com.google.inject.Inject; -import org.apache.aurora.scheduler.base.TaskGroupKey; import org.apache.aurora.scheduler.preemptor.BiCache; import org.apache.aurora.scheduler.storage.entities.IInstanceKey; import org.slf4j.Logger; @@ -58,21 +55,13 @@ public interface UpdateAgentReserver { Optional<String> getAgent(IInstanceKey key); /** - * Get all reservations for a given agent id. Useful for skipping over the agent between the + * Checks whether an agent is currently reserved. Useful for skipping over the agent between the * reserve/release window. * * @param agentId The agent id to look up reservations for. * @return A set of keys reserved for that agent. */ - Set<IInstanceKey> getReservations(String agentId); - - /** - * Check if the agent reserver has any reservations for the provided key. - * - * @param groupKey The key to check. - * @return True if there are reservations against any instances in that key. - */ - boolean hasReservations(TaskGroupKey groupKey); + boolean isReserved(String agentId); /** * Implementation of the update reserver backed by a BiCache (the same mechanism we use for @@ -99,16 +88,9 @@ public interface UpdateAgentReserver { cache.remove(key, agentId); } - public Set<IInstanceKey> getReservations(String agentId) { - return cache.getByValue(agentId); - } - @Override - public boolean hasReservations(TaskGroupKey groupKey) { - return cache.asMap().entrySet().stream() - .filter(entry -> entry.getKey().getJobKey().equals(groupKey.getTask().getJob())) - .findFirst() - .isPresent(); + public boolean isReserved(String agentId) { + return !cache.getByValue(agentId).isEmpty(); } @Override @@ -137,12 +119,7 @@ public interface UpdateAgentReserver { } @Override - public Set<IInstanceKey> getReservations(String agentId) { - return ImmutableSet.of(); - } - - @Override - public boolean hasReservations(TaskGroupKey groupKey) { + public boolean isReserved(String agentId) { return false; } } http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/test/java/org/apache/aurora/scheduler/TierManagerTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/TierManagerTest.java b/src/test/java/org/apache/aurora/scheduler/TierManagerTest.java index 82e40d5..f116e3a 100644 --- a/src/test/java/org/apache/aurora/scheduler/TierManagerTest.java +++ b/src/test/java/org/apache/aurora/scheduler/TierManagerTest.java @@ -48,6 +48,13 @@ public class TierManagerTest { } @Test + public void testDefaultTier() { + assertEquals( + DEV_TIER, + TIER_MANAGER.getTier(ITaskConfig.build(new TaskConfig()))); + } + + @Test public void testGetTierRevocableAndProduction() { assertEquals( REVOCABLE_TIER, http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java b/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java index 64d7a44..7136711 100644 --- a/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java +++ b/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java @@ -23,13 +23,12 @@ import org.apache.aurora.gen.MaintenanceMode; import org.apache.aurora.gen.TaskConfig; import org.apache.aurora.scheduler.TaskVars; import org.apache.aurora.scheduler.base.TaskGroupKey; -import org.apache.aurora.scheduler.filter.AttributeAggregate; +import org.apache.aurora.scheduler.base.TaskTestUtil; import org.apache.aurora.scheduler.filter.SchedulingFilter; import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource; import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto; import org.apache.aurora.scheduler.metadata.NearestFit; -import org.apache.aurora.scheduler.resources.ResourceBag; import org.apache.aurora.scheduler.resources.ResourceManager; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; @@ -53,8 +52,7 @@ public class NotifyingSchedulingFilterTest extends EasyMockTest { private static final UnusedResource RESOURCE = new UnusedResource( ResourceManager.bagFromResources(TASK.getResources()), IHostAttributes.build(new HostAttributes().setHost("host").setMode(MaintenanceMode.NONE))); - private static final ResourceRequest REQUEST = - new ResourceRequest(TASK, ResourceBag.EMPTY, AttributeAggregate.empty()); + private static final ResourceRequest REQUEST = TaskTestUtil.toResourceRequest(TASK); private static final Veto VETO_1 = Veto.insufficientResources("ram", 1); private static final Veto VETO_2 = Veto.insufficientResources("ram", 2); http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java index 0de90d7..21d4e47 100644 --- a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java @@ -38,14 +38,13 @@ import org.apache.aurora.gen.TaskConstraint; import org.apache.aurora.gen.ValueConstraint; import org.apache.aurora.gen.apiConstants; import org.apache.aurora.scheduler.base.JobKeys; +import org.apache.aurora.scheduler.base.TaskTestUtil; import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource; import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto; import org.apache.aurora.scheduler.filter.SchedulingFilter.VetoGroup; import org.apache.aurora.scheduler.filter.SchedulingFilter.VetoType; -import org.apache.aurora.scheduler.mesos.TaskExecutors; import org.apache.aurora.scheduler.resources.ResourceBag; -import org.apache.aurora.scheduler.resources.ResourceManager; import org.apache.aurora.scheduler.resources.ResourceType; import org.apache.aurora.scheduler.storage.entities.IAttribute; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; @@ -57,8 +56,10 @@ import org.junit.Test; import static org.apache.aurora.gen.Resource.diskMb; import static org.apache.aurora.gen.Resource.numCpus; import static org.apache.aurora.gen.Resource.ramMb; +import static org.apache.aurora.scheduler.base.TaskTestUtil.TIER_MANAGER; import static org.apache.aurora.scheduler.configuration.ConfigurationManager.DEDICATED_ATTRIBUTE; import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty; +import static org.apache.aurora.scheduler.mesos.TaskExecutors.NO_OVERHEAD_EXECUTOR; import static org.apache.aurora.scheduler.resources.ResourceManager.bagFromMesosResources; import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosRange; import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosScalar; @@ -135,22 +136,22 @@ public class SchedulingFilterImplTest extends EasyMockTest { none, defaultFilter.filter( new UnusedResource(DEFAULT_OFFER, hostA), - new ResourceRequest(noPortTask, bag(noPortTask), empty()))); + TaskTestUtil.toResourceRequest(noPortTask))); assertEquals( none, defaultFilter.filter( new UnusedResource(DEFAULT_OFFER, hostA), - new ResourceRequest(onePortTask, bag(onePortTask), empty()))); + TaskTestUtil.toResourceRequest(onePortTask))); assertEquals( none, defaultFilter.filter( new UnusedResource(DEFAULT_OFFER, hostA), - new ResourceRequest(twoPortTask, bag(twoPortTask), empty()))); + TaskTestUtil.toResourceRequest(twoPortTask))); assertEquals( ImmutableSet.of(veto(PORTS, 1)), defaultFilter.filter( new UnusedResource(DEFAULT_OFFER, hostA), - new ResourceRequest(threePortTask, bag(threePortTask), empty()))); + TaskTestUtil.toResourceRequest(threePortTask))); } @Test @@ -238,13 +239,12 @@ public class SchedulingFilterImplTest extends EasyMockTest { DEFAULT_OFFER, hostAttributes(HOST_A), Optional.of(start)); - ResourceRequest request = new ResourceRequest(task, bag(task), empty()); control.replay(); assertEquals( ImmutableSet.of(Veto.maintenance("draining")), - defaultFilter.filter(unusedResource, request)); + defaultFilter.filter(unusedResource, TaskTestUtil.toResourceRequest(task))); } @Test @@ -262,14 +262,12 @@ public class SchedulingFilterImplTest extends EasyMockTest { DEFAULT_OFFER, hostAttributes(HOST_A), Optional.of(start)); - ResourceRequest request = new ResourceRequest(task, bag(task), empty()); control.replay(); assertEquals( ImmutableSet.of(), - defaultFilter.filter(unusedResource, request)); - + defaultFilter.filter(unusedResource, TaskTestUtil.toResourceRequest(task))); } @Test @@ -350,7 +348,7 @@ public class SchedulingFilterImplTest extends EasyMockTest { } @Test - public void testLimitWithinJob() throws Exception { + public void testLimitWithinJob() { control.replay(); AttributeAggregate stateA = AttributeAggregate.create( @@ -465,7 +463,7 @@ public class SchedulingFilterImplTest extends EasyMockTest { ImmutableSet.of(), defaultFilter.filter( new UnusedResource(DEFAULT_OFFER, hostA), - new ResourceRequest(task, bag(task), empty()))); + TaskTestUtil.toResourceRequest(task))); Constraint jvmNegated = jvmConstraint.deepCopy(); jvmNegated.getConstraint().getValue().setNegated(true); @@ -577,7 +575,7 @@ public class SchedulingFilterImplTest extends EasyMockTest { expected, defaultFilter.filter( new UnusedResource(DEFAULT_OFFER, hostAttributes), - new ResourceRequest(task, bag(task), aggregate)) + TaskTestUtil.toResourceRequest(task)) .isEmpty()); Constraint negated = constraint.deepCopy(); @@ -587,7 +585,7 @@ public class SchedulingFilterImplTest extends EasyMockTest { !expected, defaultFilter.filter( new UnusedResource(DEFAULT_OFFER, hostAttributes), - new ResourceRequest(negatedTask, bag(negatedTask), aggregate)) + ResourceRequest.fromTask(negatedTask, NO_OVERHEAD_EXECUTOR, aggregate, TIER_MANAGER)) .isEmpty()); return task; } @@ -618,7 +616,7 @@ public class SchedulingFilterImplTest extends EasyMockTest { ImmutableSet.copyOf(vetoes), defaultFilter.filter( new UnusedResource(DEFAULT_OFFER, hostAttributes), - new ResourceRequest(task, bag(task), jobState))); + ResourceRequest.fromTask(task, NO_OVERHEAD_EXECUTOR, jobState, TIER_MANAGER))); } private static IHostAttributes hostAttributes( @@ -686,10 +684,4 @@ public class SchedulingFilterImplTest extends EasyMockTest { private ITaskConfig makeTask() { return makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK); } - - private ResourceBag bag(ITaskConfig task) { - return ResourceManager.bagFromResources(task.getResources()) - .add(TaskExecutors.NO_OVERHEAD_EXECUTOR.getExecutorOverhead( - task.getExecutorConfig().getName()).get()); - } } http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java index c27a662..686087e 100644 --- a/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/mesos/MesosTaskFactoryImplTest.java @@ -38,6 +38,7 @@ import org.apache.aurora.scheduler.configuration.executor.ExecutorConfig; import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings; import org.apache.aurora.scheduler.mesos.MesosTaskFactory.MesosTaskFactoryImpl; import org.apache.aurora.scheduler.resources.ResourceBag; +import org.apache.aurora.scheduler.resources.ResourceManager; import org.apache.aurora.scheduler.resources.ResourceType; import org.apache.aurora.scheduler.storage.entities.IAssignedTask; import org.apache.aurora.scheduler.storage.entities.IJobKey; @@ -124,15 +125,13 @@ public class MesosTaskFactoryImplTest extends EasyMockTest { .setAgentId(SLAVE) .setHostname("slave-hostname") .addAllResources(mesosScalarFromBag(bagFromResources( - TASK_CONFIG.getResources()).add(THERMOS_EXECUTOR.getExecutorOverhead( - TASK_CONFIG.getExecutorConfig().getName()).get()))) + TASK_CONFIG.getResources()).add(THERMOS_EXECUTOR.getExecutorOverhead(TASK_CONFIG)))) .addResources(mesosRange(PORTS, 80)) .build(); private static final Offer OFFER_SOME_OVERHEAD_EXECUTOR = OFFER_THERMOS_EXECUTOR.toBuilder() .clearResources() .addAllResources(mesosScalarFromBag(bagFromResources( - TASK_CONFIG.getResources()).add(SOME_OVERHEAD_EXECUTOR.getExecutorOverhead( - TASK_CONFIG.getExecutorConfig().getName()).get()))) + TASK_CONFIG.getResources()).add(SOME_OVERHEAD_EXECUTOR.getExecutorOverhead(TASK_CONFIG)))) .addResources(mesosRange(PORTS, 80)) .build(); @@ -282,10 +281,7 @@ public class MesosTaskFactoryImplTest extends EasyMockTest { ResourceBag executorResources = bagFromMesosResources(taskInfo.getExecutor().getResourcesList()); - assertEquals( - bagFromResources(task.getResources()).add( - config.getExecutorOverhead(task.getExecutorConfig().getName()).get()), - taskResources.add(executorResources)); + assertEquals(ResourceManager.bagFromTask(task, config), taskResources.add(executorResources)); } private void checkDiscoveryInfoUnset(TaskInfo taskInfo) { http://git-wip-us.apache.org/repos/asf/aurora/blob/4e6242fe/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java index 1e9532b..28224a5 100644 --- a/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java @@ -29,6 +29,7 @@ import org.apache.aurora.common.util.testing.FakeTicker; import org.apache.aurora.gen.HostAttributes; import org.apache.aurora.gen.MaintenanceMode; import org.apache.aurora.scheduler.base.TaskGroupKey; +import org.apache.aurora.scheduler.base.TaskTestUtil; import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected; import org.apache.aurora.scheduler.events.PubsubEvent.HostAttributesChanged; @@ -37,7 +38,6 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource; import org.apache.aurora.scheduler.mesos.Driver; import org.apache.aurora.scheduler.offers.Deferment.Noop; -import org.apache.aurora.scheduler.resources.ResourceBag; import org.apache.aurora.scheduler.resources.ResourceType; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; @@ -57,7 +57,6 @@ import static org.apache.aurora.gen.MaintenanceMode.DRAINING; import static org.apache.aurora.gen.MaintenanceMode.NONE; import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB; import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask; -import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty; import static org.apache.aurora.scheduler.offers.OfferManagerImpl.GLOBALLY_BANNED_OFFERS; import static org.apache.aurora.scheduler.offers.OfferManagerImpl.OFFER_ACCEPT_RACES; import static org.apache.aurora.scheduler.offers.OfferManagerImpl.OFFER_CANCEL_FAILURES; @@ -100,10 +99,8 @@ public class OfferManagerImplTest extends EasyMockTest { private static final int PORT = 1000; private static final Protos.Offer MESOS_OFFER = offer(mesosRange(PORTS, PORT)); private static final IScheduledTask TASK = makeTask("id", JOB); - private static final ResourceRequest EMPTY_REQUEST = new ResourceRequest( - TASK.getAssignedTask().getTask(), - ResourceBag.EMPTY, - empty()); + private static final ResourceRequest EMPTY_REQUEST = + TaskTestUtil.toResourceRequest(TASK.getAssignedTask().getTask()); private static final TaskGroupKey GROUP_KEY = TaskGroupKey.from(TASK.getAssignedTask().getTask()); private static final TaskInfo TASK_INFO = TaskInfo.newBuilder() .setName("taskName") @@ -250,14 +247,14 @@ public class OfferManagerImplTest extends EasyMockTest { assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS)); offerManager.add(OFFER_A); assertEquals(OFFER_A, - Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false))); + Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST))); assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getAll())); // Add static ban. offerManager.banForTaskGroup(OFFER_A_ID, GROUP_KEY); assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS)); assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getAll())); - assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false))); + assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST))); } @Test @@ -269,14 +266,14 @@ public class OfferManagerImplTest extends EasyMockTest { offerManager.add(OFFER_A); offerManager.banForTaskGroup(OFFER_A_ID, GROUP_KEY); assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getAll())); - assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false))); + assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST))); assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS)); // Make sure the static ban expires after maximum amount of time an offer is held. FAKE_TICKER.advance(RETURN_DELAY); offerManager.cleanupStaticBans(); assertEquals(OFFER_A, - Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false))); + Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST))); assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS)); } @@ -289,7 +286,7 @@ public class OfferManagerImplTest extends EasyMockTest { offerManager.add(OFFER_A); offerManager.banForTaskGroup(OFFER_A_ID, GROUP_KEY); assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getAll())); - assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false))); + assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST))); assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS)); // Make sure the static ban is cleared when driver is disconnected. @@ -297,7 +294,7 @@ public class OfferManagerImplTest extends EasyMockTest { assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS)); offerManager.add(OFFER_A); assertEquals(OFFER_A, - Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false))); + Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST))); } @Test @@ -361,14 +358,14 @@ public class OfferManagerImplTest extends EasyMockTest { offerManager.add(OFFER_A); assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS)); assertEquals(1, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS)); - assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false))); + assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST))); offerManager.cancel(OFFER_A_ID); offerManager.add(OFFER_A); assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS)); assertEquals(0, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS)); assertEquals(OFFER_A, - Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false))); + Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST))); } private static HostOffer setUnavailability(HostOffer offer, long startMs) { @@ -423,7 +420,7 @@ public class OfferManagerImplTest extends EasyMockTest { cpuManager.add(small); assertEquals(ImmutableList.of(small, medium, large), - ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false))); + ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST))); assertEquals(ImmutableList.of(small, medium, large), ImmutableList.copyOf(cpuManager.getAll())); } @@ -460,7 +457,7 @@ public class OfferManagerImplTest extends EasyMockTest { cpuManager.add(small); assertEquals(ImmutableList.of(small, medium, large), - ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, true))); + ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST))); assertEquals(ImmutableList.of(small, medium, large), ImmutableList.copyOf(cpuManager.getAll())); } @@ -488,7 +485,7 @@ public class OfferManagerImplTest extends EasyMockTest { cpuManager.add(small); assertEquals(ImmutableList.of(small, medium, large), - ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false))); + ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST))); assertEquals(ImmutableList.of(small, medium, large), ImmutableList.copyOf(cpuManager.getAll())); } @@ -516,7 +513,7 @@ public class OfferManagerImplTest extends EasyMockTest { cpuManager.add(small); assertEquals(ImmutableList.of(small, medium, large), - ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false))); + ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST))); assertEquals(ImmutableList.of(small, medium, large), ImmutableList.copyOf(cpuManager.getAll())); } @@ -555,7 +552,7 @@ public class OfferManagerImplTest extends EasyMockTest { cpuManager.add(small); assertEquals(ImmutableList.of(small, medium, large), - ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false))); + ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST))); assertEquals(ImmutableList.of(small, medium, large), ImmutableList.copyOf(cpuManager.getAll())); } @@ -628,7 +625,7 @@ public class OfferManagerImplTest extends EasyMockTest { control.replay(); offerManager.add(OFFER_A); assertEquals(Optional.of(OFFER_A), - offerManager.getMatching(OFFER_A.getOffer().getAgentId(), EMPTY_REQUEST, false)); + offerManager.getMatching(OFFER_A.getOffer().getAgentId(), EMPTY_REQUEST)); } @Test @@ -640,7 +637,7 @@ public class OfferManagerImplTest extends EasyMockTest { assertEquals(0, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS)); offerManager.ban(OFFER_A_ID); assertEquals(Optional.empty(), - offerManager.getMatching(OFFER_A.getOffer().getAgentId(), EMPTY_REQUEST, false)); + offerManager.getMatching(OFFER_A.getOffer().getAgentId(), EMPTY_REQUEST)); assertEquals(1, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS)); } @@ -655,7 +652,7 @@ public class OfferManagerImplTest extends EasyMockTest { offerManager.add(OFFER_A); assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS)); assertEquals(Optional.empty(), - offerManager.getMatching(OFFER_A.getOffer().getAgentId(), EMPTY_REQUEST, false)); + offerManager.getMatching(OFFER_A.getOffer().getAgentId(), EMPTY_REQUEST)); assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS)); } @@ -669,7 +666,7 @@ public class OfferManagerImplTest extends EasyMockTest { offerManager.add(OFFER_C); assertEquals(0, statsProvider.getLongValue(VETO_EVALUATED_OFFERS)); assertEquals(ImmutableSet.of(OFFER_A, OFFER_B, OFFER_C), - ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false))); + ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST))); assertEquals(3, statsProvider.getLongValue(VETO_EVALUATED_OFFERS)); } @@ -685,7 +682,7 @@ public class OfferManagerImplTest extends EasyMockTest { assertEquals(0, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS)); offerManager.ban(OFFER_B.getOffer().getId()); assertEquals(ImmutableSet.of(OFFER_A, OFFER_C), - ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false))); + ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST))); assertEquals(2, statsProvider.getLongValue(VETO_EVALUATED_OFFERS)); assertEquals(1, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS)); } @@ -702,7 +699,7 @@ public class OfferManagerImplTest extends EasyMockTest { assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS)); offerManager.banForTaskGroup(OFFER_B.getOffer().getId(), GROUP_KEY); assertEquals(ImmutableSet.of(OFFER_A, OFFER_C), - ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false))); + ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST))); assertEquals(2, statsProvider.getLongValue(VETO_EVALUATED_OFFERS)); assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS)); assertEquals(ImmutableSet.of(Pair.of(OFFER_B.getOffer().getId(), GROUP_KEY)), @@ -727,7 +724,7 @@ public class OfferManagerImplTest extends EasyMockTest { assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS)); assertEquals(0, statsProvider.getLongValue(VETO_EVALUATED_OFFERS)); assertEquals(ImmutableSet.of(OFFER_A), - ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false))); + ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST))); assertEquals(1, statsProvider.getLongValue(VETO_EVALUATED_OFFERS)); assertEquals(ImmutableSet.of(empty, OFFER_A), ImmutableSet.copyOf(offerManager.getAll())); @@ -750,7 +747,7 @@ public class OfferManagerImplTest extends EasyMockTest { assertEquals(0, statsProvider.getLongValue(VETO_EVALUATED_OFFERS)); assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS)); assertEquals(ImmutableSet.of(OFFER_B), - ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false))); + ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST))); assertEquals(3, statsProvider.getLongValue(VETO_EVALUATED_OFFERS)); assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS)); assertEquals(ImmutableSet.of(Pair.of(OFFER_A.getOffer().getId(), GROUP_KEY)),
