Enable custom offer scoring modules for task assignment. Major portions of the refactor:
* Refactor `OfferManager` to do filtering of offers (added `getMatching` and `getAllMatching` methods) as opposed to `TaskAssigner` * Refactor `TaskAssigner`, allow for injection of custom "scoring" class through `OfferSelector` interface. And some minor things as well: * Moved `TaskAssignerImpl`, `TaskSchedulerImpl`, and `HostOffers` into their own top-level classes * Moved `TaskAssigner` to the `scheduling` package and out of the `state` package * Renamed some methods in `OfferManager` to avoid code stutter * Renaming of some classes (e.g. `FirstFitTaskAssigner` -> `TaskAssignerImpl`) * And a slew of others Reviewed at https://reviews.apache.org/r/63973/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/80139da4 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/80139da4 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/80139da4 Branch: refs/heads/master Commit: 80139da4624916e406c7e80c4ea2d286d4d859c3 Parents: 21af250 Author: Jordan Ly <[email protected]> Authored: Tue Nov 28 11:02:14 2017 -0800 Committer: Bill Farner <[email protected]> Committed: Tue Nov 28 11:02:14 2017 -0800 ---------------------------------------------------------------------- RELEASE-NOTES.md | 3 + docs/reference/scheduler-configuration.md | 2 +- .../org/apache/aurora/benchmark/Offers.java | 2 +- .../aurora/benchmark/SchedulingBenchmarks.java | 16 +- .../benchmark/fakes/FakeOfferManager.java | 31 +- .../apache/aurora/scheduler/app/AppModule.java | 4 +- .../aurora/scheduler/config/CliOptions.java | 6 +- .../scheduler/filter/AttributeAggregate.java | 38 +- .../scheduler/filter/SchedulingFilter.java | 5 + .../scheduler/filter/SchedulingFilterImpl.java | 2 +- .../apache/aurora/scheduler/http/Offers.java | 2 +- .../scheduler/mesos/MesosCallbackHandler.java | 12 +- .../aurora/scheduler/offers/HostOffers.java | 253 +++++++++ .../aurora/scheduler/offers/OfferManager.java | 404 ++------------ 
.../scheduler/offers/OfferManagerImpl.java | 246 +++++++++ .../scheduler/offers/OfferManagerModule.java | 211 ++++++++ .../aurora/scheduler/offers/OfferSettings.java | 7 +- .../aurora/scheduler/offers/OffersModule.java | 211 -------- .../preemptor/PendingTaskProcessor.java | 2 +- .../aurora/scheduler/preemptor/Preemptor.java | 2 +- .../scheduling/FirstFitOfferSelector.java | 29 + .../scheduling/FirstFitOfferSelectorModule.java | 26 + .../scheduler/scheduling/OfferSelector.java | 36 ++ .../scheduler/scheduling/SchedulingModule.java | 4 +- .../scheduler/scheduling/TaskAssigner.java | 46 ++ .../scheduler/scheduling/TaskAssignerImpl.java | 284 ++++++++++ .../scheduling/TaskAssignerImplModule.java | 59 ++ .../scheduler/scheduling/TaskScheduler.java | 191 ------- .../scheduler/scheduling/TaskSchedulerImpl.java | 207 +++++++ .../state/FirstFitTaskAssignerModule.java | 31 -- .../aurora/scheduler/state/StateModule.java | 3 +- .../aurora/scheduler/state/TaskAssigner.java | 338 ------------ .../scheduler/stats/AsyncStatsModule.java | 2 +- .../scheduler/config/CommandLineTest.java | 4 +- .../aurora/scheduler/http/OffersTest.java | 4 +- .../mesos/MesosCallbackHandlerTest.java | 20 +- .../scheduler/offers/OfferManagerImplTest.java | 381 +++++++++---- .../preemptor/PendingTaskProcessorTest.java | 2 +- .../scheduler/preemptor/PreemptorImplTest.java | 2 +- .../preemptor/PreemptorModuleTest.java | 2 +- .../scheduling/FirstFitOfferSelectorTest.java | 66 +++ .../scheduling/TaskAssignerImplTest.java | 374 +++++++++++++ .../scheduling/TaskSchedulerImplTest.java | 2 - .../state/FirstFitTaskAssignerTest.java | 539 ------------------- .../scheduler/stats/AsyncStatsModuleTest.java | 2 +- 45 files changed, 2244 insertions(+), 1869 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/RELEASE-NOTES.md ---------------------------------------------------------------------- diff --git a/RELEASE-NOTES.md 
b/RELEASE-NOTES.md index 2d3c423..54dcc75 100644 --- a/RELEASE-NOTES.md +++ b/RELEASE-NOTES.md @@ -12,6 +12,9 @@ a production cluster. For that reason, the functionality is behind a new flag `-partition_aware` that is disabled by default. When Mesos support is improved and the new behavior is vetted in production clusters, we'll enable this by default. +- Added the ability to "score" offers for a given scheduling assignment via the `OfferSelector` + interface. The default implementation is first fit, but cluster operators can inject a custom + scoring algorithm through the `-offer_selector_modules` flag. ### Deprecations and removals: http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/docs/reference/scheduler-configuration.md ---------------------------------------------------------------------- diff --git a/docs/reference/scheduler-configuration.md b/docs/reference/scheduler-configuration.md index 6c385b5..f697b6f 100644 --- a/docs/reference/scheduler-configuration.md +++ b/docs/reference/scheduler-configuration.md @@ -222,7 +222,7 @@ Optional flags: Time for a stat to be retained in memory before expiring. -stat_sampling_interval (default (1, secs)) Statistic value sampling interval. --task_assigner_modules (default [class org.apache.aurora.scheduler.state.FirstFitTaskAssignerModule]) +-task_assigner_modules (default [class org.apache.aurora.scheduler.scheduling.TaskAssignerImplModule]) Guice modules for replacing task assignment logic. -thermos_executor_cpu (default 0.25) The number of CPU cores to allocate for each instance of the executor. 
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/jmh/java/org/apache/aurora/benchmark/Offers.java ---------------------------------------------------------------------- diff --git a/src/jmh/java/org/apache/aurora/benchmark/Offers.java b/src/jmh/java/org/apache/aurora/benchmark/Offers.java index 2b46326..2fcc804 100644 --- a/src/jmh/java/org/apache/aurora/benchmark/Offers.java +++ b/src/jmh/java/org/apache/aurora/benchmark/Offers.java @@ -50,7 +50,7 @@ final class Offers { */ static void addOffers(OfferManager offerManager, Iterable<HostOffer> offers) { for (HostOffer offer : offers) { - offerManager.addOffer(offer); + offerManager.add(offer); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java ---------------------------------------------------------------------- diff --git a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java index 1708a50..58e3224 100644 --- a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java +++ b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java @@ -62,15 +62,17 @@ import org.apache.aurora.scheduler.mesos.Driver; import org.apache.aurora.scheduler.mesos.TestExecutorSettings; import org.apache.aurora.scheduler.offers.Deferment; import org.apache.aurora.scheduler.offers.OfferManager; +import org.apache.aurora.scheduler.offers.OfferManagerImpl; +import org.apache.aurora.scheduler.offers.OfferManagerModule; import org.apache.aurora.scheduler.offers.OfferOrder; import org.apache.aurora.scheduler.offers.OfferSettings; -import org.apache.aurora.scheduler.offers.OffersModule; import org.apache.aurora.scheduler.preemptor.BiCache; import org.apache.aurora.scheduler.preemptor.ClusterStateImpl; import org.apache.aurora.scheduler.preemptor.PreemptorModule; import org.apache.aurora.scheduler.scheduling.RescheduleCalculator; import 
org.apache.aurora.scheduler.scheduling.TaskScheduler; -import org.apache.aurora.scheduler.scheduling.TaskScheduler.TaskSchedulerImpl.ReservationDuration; +import org.apache.aurora.scheduler.scheduling.TaskSchedulerImpl; +import org.apache.aurora.scheduler.scheduling.TaskSchedulerImpl.ReservationDuration; import org.apache.aurora.scheduler.state.StateModule; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; @@ -147,8 +149,8 @@ public class SchedulingBenchmarks { bind(ScheduledExecutorService.class).annotatedWith(AsyncModule.AsyncExecutor.class) .toInstance(new NoopExecutor()); bind(Deferment.class).to(Deferment.Noop.class); - bind(OfferManager.class).to(OfferManager.OfferManagerImpl.class); - bind(OfferManager.OfferManagerImpl.class).in(Singleton.class); + bind(OfferManager.class).to(OfferManagerImpl.class); + bind(OfferManagerImpl.class).in(Singleton.class); bind(OfferSettings.class).toInstance( new OfferSettings(NO_DELAY, ImmutableList.of(OfferOrder.RANDOM), @@ -157,8 +159,8 @@ public class SchedulingBenchmarks { new FakeTicker())); bind(BiCache.BiCacheSettings.class).toInstance( new BiCache.BiCacheSettings(DELAY_FOREVER, "")); - bind(TaskScheduler.class).to(TaskScheduler.TaskSchedulerImpl.class); - bind(TaskScheduler.TaskSchedulerImpl.class).in(Singleton.class); + bind(TaskScheduler.class).to(TaskSchedulerImpl.class); + bind(TaskSchedulerImpl.class).in(Singleton.class); expose(TaskScheduler.class); expose(OfferManager.class); } @@ -171,7 +173,7 @@ public class SchedulingBenchmarks { .toInstance(DELAY_FOREVER); bind(TaskIdGenerator.class).to(TaskIdGenerator.TaskIdGeneratorImpl.class); bind(new TypeLiteral<Amount<Long, Time>>() { }) - .annotatedWith(OffersModule.UnavailabilityThreshold.class) + .annotatedWith(OfferManagerModule.UnavailabilityThreshold.class) .toInstance(Amount.of(1L, Time.MINUTES)); bind(UpdateAgentReserver.class).to(UpdateAgentReserver.NullAgentReserver.class); 
bind(UpdateAgentReserver.NullAgentReserver.class).in(Singleton.class); http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java ---------------------------------------------------------------------- diff --git a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java index 201aa81..05c58ab 100644 --- a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java +++ b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java @@ -18,22 +18,23 @@ import com.google.common.base.Optional; import org.apache.aurora.scheduler.HostOffer; import org.apache.aurora.scheduler.base.TaskGroupKey; import org.apache.aurora.scheduler.events.PubsubEvent; +import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; import org.apache.aurora.scheduler.offers.OfferManager; import org.apache.mesos.v1.Protos; public class FakeOfferManager implements OfferManager { @Override - public void addOffer(HostOffer offer) { + public void add(HostOffer offer) { // no-op } @Override - public boolean cancelOffer(Protos.OfferID offerId) { + public boolean cancel(Protos.OfferID offerId) { return false; } @Override - public void banOffer(Protos.OfferID offerId) { + public void ban(Protos.OfferID offerId) { // no-op } @@ -43,27 +44,33 @@ public class FakeOfferManager implements OfferManager { } @Override - public void banOfferForTaskGroup(Protos.OfferID offerId, TaskGroupKey groupKey) { + public void hostAttributesChanged(PubsubEvent.HostAttributesChanged change) { // no-op } @Override - public Iterable<HostOffer> getOffers(TaskGroupKey groupKey) { - return null; + public Optional<HostOffer> get(Protos.AgentID agentId) { + return Optional.absent(); } @Override - public void hostAttributesChanged(PubsubEvent.HostAttributesChanged change) { - // no-op + public Iterable<HostOffer> getAll() { + return null; } @Override - 
public Iterable<HostOffer> getOffers() { - return null; + public Optional<HostOffer> getMatching(Protos.AgentID slaveId, + ResourceRequest resourceRequest, + boolean revocable) { + + return Optional.absent(); } @Override - public Optional<HostOffer> getOffer(Protos.AgentID agentId) { - return Optional.absent(); + public Iterable<HostOffer> getAllMatching(TaskGroupKey groupKey, + ResourceRequest resourceRequest, + boolean revocable) { + + return null; } } http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/app/AppModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/app/AppModule.java b/src/main/java/org/apache/aurora/scheduler/app/AppModule.java index 3204cca..817a019 100644 --- a/src/main/java/org/apache/aurora/scheduler/app/AppModule.java +++ b/src/main/java/org/apache/aurora/scheduler/app/AppModule.java @@ -47,7 +47,7 @@ import org.apache.aurora.scheduler.filter.SchedulingFilterImpl; import org.apache.aurora.scheduler.http.JettyServerModule; import org.apache.aurora.scheduler.mesos.SchedulerDriverModule; import org.apache.aurora.scheduler.metadata.MetadataModule; -import org.apache.aurora.scheduler.offers.OffersModule; +import org.apache.aurora.scheduler.offers.OfferManagerModule; import org.apache.aurora.scheduler.preemptor.PreemptorModule; import org.apache.aurora.scheduler.pruning.PruningModule; import org.apache.aurora.scheduler.quota.QuotaModule; @@ -172,7 +172,7 @@ public class AppModule extends AbstractModule { install(new PubsubEventModule()); install(new AsyncModule(options.async)); - install(new OffersModule(options)); + install(new OfferManagerModule(options)); install(new PruningModule(options.pruning)); install(new ReconciliationModule(options.reconciliation)); install(new SchedulingModule(options.scheduling)); 
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/config/CliOptions.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/config/CliOptions.java b/src/main/java/org/apache/aurora/scheduler/config/CliOptions.java index d4537e3..b7f43e0 100644 --- a/src/main/java/org/apache/aurora/scheduler/config/CliOptions.java +++ b/src/main/java/org/apache/aurora/scheduler/config/CliOptions.java @@ -36,12 +36,13 @@ import org.apache.aurora.scheduler.http.api.security.IniShiroRealmModule; import org.apache.aurora.scheduler.http.api.security.Kerberos5ShiroRealmModule; import org.apache.aurora.scheduler.log.mesos.MesosLogStreamModule; import org.apache.aurora.scheduler.mesos.CommandLineDriverSettingsModule; -import org.apache.aurora.scheduler.offers.OffersModule; +import org.apache.aurora.scheduler.offers.OfferManagerModule; import org.apache.aurora.scheduler.preemptor.PreemptorModule; import org.apache.aurora.scheduler.pruning.PruningModule; import org.apache.aurora.scheduler.reconciliation.ReconciliationModule; import org.apache.aurora.scheduler.resources.ResourceSettings; import org.apache.aurora.scheduler.scheduling.SchedulingModule; +import org.apache.aurora.scheduler.scheduling.TaskAssignerImplModule; import org.apache.aurora.scheduler.sla.SlaModule; import org.apache.aurora.scheduler.state.StateModule; import org.apache.aurora.scheduler.stats.AsyncStatsModule; @@ -54,7 +55,7 @@ import org.apache.aurora.scheduler.updater.UpdaterModule; public class CliOptions { public final ReconciliationModule.Options reconciliation = new ReconciliationModule.Options(); - public final OffersModule.Options offer = new OffersModule.Options(); + public final OfferManagerModule.Options offer = new OfferManagerModule.Options(); public final ExecutorModule.Options executor = new ExecutorModule.Options(); public final AppModule.Options app = new AppModule.Options(); public 
final SchedulerMain.Options main = new SchedulerMain.Options(); @@ -84,6 +85,7 @@ public class CliOptions { public final StatsModule.Options stats = new StatsModule.Options(); public final CronModule.Options cron = new CronModule.Options(); public final ResourceSettings resourceSettings = new ResourceSettings(); + public final TaskAssignerImplModule.Options taskAssigner = new TaskAssignerImplModule.Options(); final List<Object> custom; public CliOptions() { http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java b/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java index 60f141d..a5acafa 100644 --- a/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java +++ b/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java @@ -49,19 +49,8 @@ public final class AttributeAggregate { */ private Supplier<Multiset<Pair<String, String>>> aggregate; - private boolean isInitialized = false; - private AttributeAggregate(Supplier<Multiset<Pair<String, String>>> aggregate) { - this.aggregate = Suppliers.memoize( - () -> { - initialize(); - return aggregate.get(); - } - ); - } - - private void initialize() { - isInitialized = true; // inlining this assignment yields a PMD false positive + this.aggregate = Suppliers.memoize(aggregate); } /** @@ -123,21 +112,16 @@ public final class AttributeAggregate { } public void updateAttributeAggregate(IHostAttributes attributes) { - // If the aggregate supplier has not been populated there is no need to update it here. - // All tasks attributes will be picked up by the wrapped task query if executed at a - // later point in time. 
- if (isInitialized) { - final Supplier<Multiset<Pair<String, String>>> previous = aggregate; - aggregate = Suppliers.memoize( - () -> { - ImmutableMultiset.Builder<Pair<String, String>> builder - = new ImmutableMultiset.Builder<>(); - builder.addAll(previous.get()); - addAttributes(builder, attributes.getAttributes()); - return builder.build(); - } - ); - } + final Supplier<Multiset<Pair<String, String>>> previous = aggregate; + aggregate = Suppliers.memoize( + () -> { + ImmutableMultiset.Builder<Pair<String, String>> builder + = new ImmutableMultiset.Builder<>(); + builder.addAll(previous.get()); + addAttributes(builder, attributes.getAttributes()); + return builder.build(); + } + ); } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java index 36608a9..a00c095 100644 --- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java +++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java @@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import com.google.common.base.Optional; +import org.apache.aurora.scheduler.HostOffer; import org.apache.aurora.scheduler.resources.ResourceBag; import org.apache.aurora.scheduler.storage.entities.IConstraint; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; @@ -258,6 +259,10 @@ public interface SchedulingFilter { this(offer, attributes, Optional.absent()); } + public UnusedResource(HostOffer offer, boolean revocable) { + this(offer.getResourceBag(revocable), offer.getAttributes(), offer.getUnavailabilityStart()); + } + public UnusedResource(ResourceBag offer, IHostAttributes attributes, 
Optional<Instant> start) { this.offer = offer; this.attributes = attributes; http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java index df51d4c..41a0764 100644 --- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java @@ -32,7 +32,7 @@ import org.apache.aurora.common.util.Clock; import org.apache.aurora.gen.MaintenanceMode; import org.apache.aurora.gen.TaskConstraint; import org.apache.aurora.scheduler.configuration.ConfigurationManager; -import org.apache.aurora.scheduler.offers.OffersModule.UnavailabilityThreshold; +import org.apache.aurora.scheduler.offers.OfferManagerModule.UnavailabilityThreshold; import org.apache.aurora.scheduler.resources.ResourceBag; import org.apache.aurora.scheduler.resources.ResourceType; import org.apache.aurora.scheduler.storage.entities.IAttribute; http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/http/Offers.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/http/Offers.java b/src/main/java/org/apache/aurora/scheduler/http/Offers.java index f22ca6e..bb92cd0 100644 --- a/src/main/java/org/apache/aurora/scheduler/http/Offers.java +++ b/src/main/java/org/apache/aurora/scheduler/http/Offers.java @@ -59,7 +59,7 @@ public class Offers { public Response getOffers() throws JsonProcessingException { return Response.ok( mapper.writeValueAsString( - StreamSupport.stream(offerManager.getOffers().spliterator(), false) + StreamSupport.stream(offerManager.getAll().spliterator(), false) .map(o -> 
o.getOffer()) .collect(Collectors.toList()))) .build(); http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java b/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java index fd5874d..87e702f 100644 --- a/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java +++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java @@ -40,7 +40,7 @@ import org.apache.aurora.scheduler.events.EventSink; import org.apache.aurora.scheduler.events.PubsubEvent; import org.apache.aurora.scheduler.events.PubsubEventModule; import org.apache.aurora.scheduler.offers.OfferManager; -import org.apache.aurora.scheduler.offers.OffersModule; +import org.apache.aurora.scheduler.offers.OfferManagerModule; import org.apache.aurora.scheduler.state.MaintenanceController; import org.apache.aurora.scheduler.storage.AttributeStore; import org.apache.aurora.scheduler.storage.Storage; @@ -126,7 +126,7 @@ public interface MesosCallbackHandler { Driver driver, Clock clock, MaintenanceController controller, - @OffersModule.UnavailabilityThreshold Amount<Long, Time> unavailabilityThreshold, + @OfferManagerModule.UnavailabilityThreshold Amount<Long, Time> unavailabilityThreshold, @PubsubEventModule.RegisteredEvents EventSink registeredEventSink) { this( @@ -226,7 +226,7 @@ public interface MesosCallbackHandler { storeProvider.getAttributeStore().saveHostAttributes(attributes); log.info("Received offer: {}", offer.getId().getValue()); offersReceived.incrementAndGet(); - offerManager.addOffer(new HostOffer(offer, attributes)); + offerManager.add(new HostOffer(offer, attributes)); } }); }); @@ -244,15 +244,15 @@ public interface MesosCallbackHandler { // In this scenario, we want to ensure that we do not use it/accept it 
when the executor // finally processes the offer. We will temporarily ban it and add a command for the // executor to unban it so future offers can be processed normally. - boolean offerCancelled = offerManager.cancelOffer(offerId); + boolean offerCancelled = offerManager.cancel(offerId); if (!offerCancelled) { log.info( "Received rescind before adding offer: {}, temporarily banning.", offerId.getValue()); - offerManager.banOffer(offerId); + offerManager.ban(offerId); executor.execute(() -> { log.info("Cancelling and unbanning offer: {}.", offerId.getValue()); - offerManager.cancelOffer(offerId); + offerManager.cancel(offerId); }); } offersRescinded.incrementAndGet(); http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/offers/HostOffers.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/offers/HostOffers.java b/src/main/java/org/apache/aurora/scheduler/offers/HostOffers.java new file mode 100644 index 0000000..8adbcb1 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/offers/HostOffers.java @@ -0,0 +1,253 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.offers; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.atomic.AtomicLong; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.cache.Cache; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +import org.apache.aurora.common.collections.Pair; +import org.apache.aurora.common.stats.StatsProvider; +import org.apache.aurora.scheduler.HostOffer; +import org.apache.aurora.scheduler.base.TaskGroupKey; +import org.apache.aurora.scheduler.filter.SchedulingFilter; +import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; +import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource; +import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto; +import org.apache.aurora.scheduler.storage.entities.IHostAttributes; +import org.apache.mesos.v1.Protos; + +import static java.util.Objects.requireNonNull; + +/** + * A container for the data structures used by this {@link OfferManagerImpl}, to make it easier to + * reason about the different indices used and their consistency. + */ +class HostOffers { + private final Set<HostOffer> offers; + + private final Map<Protos.OfferID, HostOffer> offersById = Maps.newHashMap(); + private final Map<Protos.AgentID, HostOffer> offersBySlave = Maps.newHashMap(); + private final Map<String, HostOffer> offersByHost = Maps.newHashMap(); + + // Keep track of offer->groupKey mappings that will never be matched to avoid redundant + // scheduling attempts. See VetoGroup for more details on static ban. 
+ private final Cache<Pair<Protos.OfferID, TaskGroupKey>, Boolean> staticallyBannedOffers; + private final SchedulingFilter schedulingFilter; + + // Keep track of globally banned offers that will never be matched to anything. + private final Set<Protos.OfferID> globallyBannedOffers = Sets.newHashSet(); + + // Keep track of the number of offers evaluated for vetoes when getting matching offers + private final AtomicLong vetoEvaluatedOffers; + + HostOffers(StatsProvider statsProvider, + OfferSettings offerSettings, + SchedulingFilter schedulingFilter) { + this.offers = new ConcurrentSkipListSet<>(offerSettings.getOrdering()); + this.staticallyBannedOffers = offerSettings + .getStaticBanCacheBuilder() + .build(); + this.schedulingFilter = requireNonNull(schedulingFilter); + + // Potential gotcha - since this is a ConcurrentSkipListSet, size() is more expensive. + // Could track this separately if it turns out to pose problems. + statsProvider.exportSize(OfferManagerImpl.OUTSTANDING_OFFERS, offers); + statsProvider.makeGauge(OfferManagerImpl.STATICALLY_BANNED_OFFERS, + staticallyBannedOffers::size); + statsProvider.makeGauge(OfferManagerImpl.STATICALLY_BANNED_OFFERS_HIT_RATE, + () -> staticallyBannedOffers.stats().hitRate()); + statsProvider.makeGauge(OfferManagerImpl.GLOBALLY_BANNED_OFFERS, globallyBannedOffers::size); + + vetoEvaluatedOffers = statsProvider.makeCounter(OfferManagerImpl.VETO_EVALUATED_OFFERS); + } + + /** + * Adds an offer while maintaining a guarantee that no two offers may exist with the same + * agent ID. If an offer exists with the same agent ID, the existing offer is removed + * and returned, and {@code offer} is not added. + * + * @param offer Offer to add. + * @return The pre-existing offer with the same agent ID as {@code offer}, if one exists, + * which will also be removed prior to returning. 
+ */ + synchronized Optional<HostOffer> addAndPreventAgentCollision(HostOffer offer) { + HostOffer sameAgent = offersBySlave.get(offer.getOffer().getAgentId()); + if (sameAgent != null) { + remove(sameAgent.getOffer().getId()); + return Optional.of(sameAgent); + } + + addInternal(offer); + return Optional.absent(); + } + + private void addInternal(HostOffer offer) { + offers.add(offer); + offersById.put(offer.getOffer().getId(), offer); + offersBySlave.put(offer.getOffer().getAgentId(), offer); + offersByHost.put(offer.getOffer().getHostname(), offer); + } + + synchronized boolean remove(Protos.OfferID id) { + HostOffer removed = offersById.remove(id); + if (removed != null) { + offers.remove(removed); + offersBySlave.remove(removed.getOffer().getAgentId()); + offersByHost.remove(removed.getOffer().getHostname()); + } + globallyBannedOffers.remove(id); + return removed != null; + } + + synchronized void addGlobalBan(Protos.OfferID offerId) { + globallyBannedOffers.add(offerId); + } + + synchronized void updateHostAttributes(IHostAttributes attributes) { + HostOffer offer = offersByHost.remove(attributes.getHost()); + if (offer != null) { + // Remove and re-add a host's offer to re-sort based on its new hostStatus + remove(offer.getOffer().getId()); + addInternal(new HostOffer(offer.getOffer(), attributes)); + } + } + + synchronized Optional<HostOffer> get(Protos.AgentID slaveId) { + HostOffer offer = offersBySlave.get(slaveId); + if (offer == null || globallyBannedOffers.contains(offer.getOffer().getId())) { + return Optional.absent(); + } + + return Optional.of(offer); + } + + /** + * Returns an iterable giving the state of the offers at the time the method is called. Unlike + * {@code getWeaklyConsistentOffers}, the underlying collection is a copy of the original and + * will not be modified outside of the returned iterable. + * + * @return The offers currently known by the scheduler. 
+ */ + synchronized Iterable<HostOffer> getOffers() { + return FluentIterable.from(offers) + .filter(o -> !globallyBannedOffers.contains(o.getOffer().getId())) + .toSet(); + } + + synchronized Optional<HostOffer> getMatching(Protos.AgentID slaveId, + ResourceRequest resourceRequest, + boolean revocable) { + + Optional<HostOffer> optionalOffer = get(slaveId); + if (optionalOffer.isPresent()) { + HostOffer offer = optionalOffer.get(); + + if (isGloballyBanned(offer) + || isVetoed(offer, resourceRequest, revocable, Optional.absent())) { + + return Optional.absent(); + } + } + + return optionalOffer; + } + + /** + * Returns a weakly-consistent iterable giving the available offers to a given + * {@code groupKey}. This iterable can handle concurrent operations on its underlying + * collection, and may reflect changes that happen after the construction of the iterable. + * This property is mainly used in {@code launchTask}. + * + * @param groupKey The task group to get offers for. + * @return The offers a given task group can use. + */ + synchronized Iterable<HostOffer> getAllMatching(TaskGroupKey groupKey, + ResourceRequest resourceRequest, + boolean revocable) { + + return Iterables.unmodifiableIterable(FluentIterable.from(offers) + .filter(o -> !isGloballyBanned(o)) + .filter(o -> !isStaticallyBanned(o, groupKey)) + .filter(HostOffer::hasCpuAndMem) + .filter(o -> !isVetoed(o, resourceRequest, revocable, Optional.of(groupKey)))); + } + + private synchronized boolean isGloballyBanned(HostOffer offer) { + return globallyBannedOffers.contains(offer.getOffer().getId()); + } + + private synchronized boolean isStaticallyBanned(HostOffer offer, TaskGroupKey groupKey) { + return staticallyBannedOffers.getIfPresent(Pair.of(offer.getOffer().getId(), groupKey)) != null; + } + + /** + * Determine whether or not the {@link HostOffer} is vetoed for the given {@link ResourceRequest}. 
+ * If {@code groupKey} is present, this method will also temporarily ban the offer from ever + * matching the {@link TaskGroupKey}. + */ + private boolean isVetoed(HostOffer offer, + ResourceRequest resourceRequest, + boolean revocable, + Optional<TaskGroupKey> groupKey) { + + vetoEvaluatedOffers.incrementAndGet(); + UnusedResource unusedResource = new UnusedResource(offer, revocable); + Set<Veto> vetoes = schedulingFilter.filter(unusedResource, resourceRequest); + if (!vetoes.isEmpty()) { + if (groupKey.isPresent() && Veto.identifyGroup(vetoes) == SchedulingFilter.VetoGroup.STATIC) { + addStaticGroupBan(offer.getOffer().getId(), groupKey.get()); + } + + return true; + } + + return false; + } + + @VisibleForTesting + synchronized void addStaticGroupBan(Protos.OfferID offerId, TaskGroupKey groupKey) { + if (offersById.containsKey(offerId)) { + staticallyBannedOffers.put(Pair.of(offerId, groupKey), true); + } + } + + @VisibleForTesting + synchronized Set<Pair<Protos.OfferID, TaskGroupKey>> getStaticBans() { + return staticallyBannedOffers.asMap().keySet(); + } + + synchronized void clear() { + offers.clear(); + offersById.clear(); + offersBySlave.clear(); + offersByHost.clear(); + staticallyBannedOffers.invalidateAll(); + globallyBannedOffers.clear(); + } + + @VisibleForTesting + synchronized void cleanUpStaticallyBannedOffers() { + staticallyBannedOffers.cleanUp(); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java index 96b0f46..0349215 100644 --- a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java +++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java @@ -13,41 +13,16 @@ */ package org.apache.aurora.scheduler.offers; -import 
java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentSkipListSet; -import java.util.concurrent.atomic.AtomicLong; - -import javax.inject.Inject; - import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; -import com.google.common.cache.Cache; -import com.google.common.collect.FluentIterable; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import com.google.common.eventbus.Subscribe; -import org.apache.aurora.common.collections.Pair; -import org.apache.aurora.common.inject.TimedInterceptor.Timed; -import org.apache.aurora.common.quantity.Time; -import org.apache.aurora.common.stats.StatsProvider; import org.apache.aurora.scheduler.HostOffer; import org.apache.aurora.scheduler.base.TaskGroupKey; -import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected; import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber; -import org.apache.aurora.scheduler.mesos.Driver; -import org.apache.aurora.scheduler.storage.entities.IHostAttributes; +import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; import org.apache.mesos.v1.Protos; import org.apache.mesos.v1.Protos.AgentID; -import org.apache.mesos.v1.Protos.Offer.Operation; import org.apache.mesos.v1.Protos.OfferID; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static java.util.Objects.requireNonNull; import static org.apache.aurora.scheduler.events.PubsubEvent.HostAttributesChanged; @@ -61,7 +36,7 @@ public interface OfferManager extends EventSubscriber { * * @param offer Newly-available resource offer. */ - void addOffer(HostOffer offer); + void add(HostOffer offer); /** * Invalidates an offer. This indicates that the scheduler should not attempt to match any @@ -70,62 +45,70 @@ public interface OfferManager extends EventSubscriber { * @param offerId Cancelled offer. 
* @return A boolean on whether or not the offer was successfully cancelled. */ - boolean cancelOffer(OfferID offerId); + boolean cancel(OfferID offerId); /** * Exclude an offer from being matched against all tasks. * * @param offerId Offer ID to ban. */ - void banOffer(OfferID offerId); + void ban(OfferID offerId); /** - * Exclude an offer that results in a static mismatch from further attempts to match against all - * tasks from the same group. + * Notifies the offer queue that a host's attributes have changed. * - * @param offerId Offer ID to exclude for the given {@code groupKey}. - * @param groupKey Task group key to exclude. + * @param change State change notification. */ - void banOfferForTaskGroup(OfferID offerId, TaskGroupKey groupKey); + void hostAttributesChanged(HostAttributesChanged change); /** - * Launches the task matched against the offer. + * Gets the offer for the given slave ID. * - * @param offerId Matched offer ID. - * @param task Matched task info. - * @throws LaunchException If there was an error launching the task. + * @param slaveId Slave ID to get the offer for. + * @return The offer for the slave ID. */ - void launchTask(OfferID offerId, Protos.TaskInfo task) throws LaunchException; + Optional<HostOffer> get(AgentID slaveId); /** - * Notifies the offer queue that a host's attributes have changed. + * Gets all offers that the scheduler is holding, excluding banned offers. * - * @param change State change notification. + * @return A snapshot of the offers that the scheduler is currently holding. */ - void hostAttributesChanged(HostAttributesChanged change); + Iterable<HostOffer> getAll(); /** - * Gets the offers that the scheduler is holding, excluding banned offers. + * Gets the offer for the given slave ID if satisfies the supplied {@link ResourceRequest}. * - * @return A snapshot of the offers that the scheduler is currently holding. + * @param slaveId Slave ID to get the offer for. 
+ * @param resourceRequest The request that the offer should satisfy. + * @param revocable Whether or not the request can use revocable resources. + * @return An option containing the offer for the slave ID if it fits. */ - Iterable<HostOffer> getOffers(); + Optional<HostOffer> getMatching(AgentID slaveId, + ResourceRequest resourceRequest, + boolean revocable); /** - * Gets all offers that are not banned for the given {@code groupKey}. + * Gets all offers that the scheduler is holding that satisfy the supplied + * {@link ResourceRequest}. * - * @param groupKey Task group key to check offers for. - * @return A snapshot of all offers eligible for the given {@code groupKey}. + * @param groupKey The {@link TaskGroupKey} of the task in the {@link ResourceRequest}. + * @param resourceRequest The request that the offer should satisfy. + * @param revocable Whether or not the request can use revocable resources. + * @return An option containing the offer for the slave ID if it fits. */ - Iterable<HostOffer> getOffers(TaskGroupKey groupKey); + Iterable<HostOffer> getAllMatching(TaskGroupKey groupKey, + ResourceRequest resourceRequest, + boolean revocable); /** - * Gets an offer for the given slave ID. + * Launches the task matched against the offer. * - * @param slaveId Slave ID to get offer for. - * @return An offer for the slave ID. + * @param offerId Matched offer ID. + * @param task Matched task info. + * @throws LaunchException If there was an error launching the task. */ - Optional<HostOffer> getOffer(AgentID slaveId); + void launchTask(OfferID offerId, Protos.TaskInfo task) throws LaunchException; /** * Thrown when there was an unexpected failure trying to launch a task. 
@@ -140,319 +123,4 @@ public interface OfferManager extends EventSubscriber { super(msg, cause); } } - - class OfferManagerImpl implements OfferManager { - @VisibleForTesting - static final Logger LOG = LoggerFactory.getLogger(OfferManagerImpl.class); - @VisibleForTesting - static final String OFFER_ACCEPT_RACES = "offer_accept_races"; - @VisibleForTesting - static final String OUTSTANDING_OFFERS = "outstanding_offers"; - @VisibleForTesting - static final String STATICALLY_BANNED_OFFERS = "statically_banned_offers_size"; - @VisibleForTesting - static final String STATICALLY_BANNED_OFFERS_HIT_RATE = "statically_banned_offers_hit_rate"; - @VisibleForTesting - static final String OFFER_CANCEL_FAILURES = "offer_cancel_failures"; - @VisibleForTesting - static final String GLOBALLY_BANNED_OFFERS = "globally_banned_offers_size"; - - private final HostOffers hostOffers; - private final AtomicLong offerRaces; - private final AtomicLong offerCancelFailures; - - private final Driver driver; - private final OfferSettings offerSettings; - private final Deferment offerDecline; - - @Inject - @VisibleForTesting - public OfferManagerImpl( - Driver driver, - OfferSettings offerSettings, - StatsProvider statsProvider, - Deferment offerDecline) { - - this.driver = requireNonNull(driver); - this.offerSettings = requireNonNull(offerSettings); - this.hostOffers = new HostOffers(statsProvider, offerSettings); - this.offerRaces = statsProvider.makeCounter(OFFER_ACCEPT_RACES); - this.offerCancelFailures = statsProvider.makeCounter(OFFER_CANCEL_FAILURES); - this.offerDecline = requireNonNull(offerDecline); - } - - @Override - public void addOffer(HostOffer offer) { - Optional<HostOffer> sameAgent = hostOffers.addAndPreventAgentCollision(offer); - if (sameAgent.isPresent()) { - // We have an existing offer for the same agent. We choose to return both offers so that - // they may be combined into a single offer. 
- LOG.info("Returning offers for " + offer.getOffer().getAgentId().getValue() - + " for compaction."); - decline(offer.getOffer().getId()); - decline(sameAgent.get().getOffer().getId()); - } else { - offerDecline.defer(() -> removeAndDecline(offer.getOffer().getId())); - } - } - - private void removeAndDecline(OfferID id) { - if (removeFromHostOffers(id)) { - decline(id); - } - } - - private void decline(OfferID id) { - LOG.debug("Declining offer {}", id); - driver.declineOffer(id, getOfferFilter()); - } - - private Protos.Filters getOfferFilter() { - return Protos.Filters.newBuilder() - .setRefuseSeconds(offerSettings.getFilterDuration().as(Time.SECONDS)) - .build(); - } - - @Override - public boolean cancelOffer(final OfferID offerId) { - boolean success = removeFromHostOffers(offerId); - if (!success) { - // This will happen rarely when we race to process this rescind against accepting the offer - // to launch a task. - // If it happens frequently, we are likely processing rescinds before the offer itself. - LOG.warn("Failed to cancel offer: {}.", offerId.getValue()); - this.offerCancelFailures.incrementAndGet(); - } - return success; - } - - @Override - public void banOffer(OfferID offerId) { - hostOffers.addGlobalBan(offerId); - } - - private boolean removeFromHostOffers(final OfferID offerId) { - requireNonNull(offerId); - - // The small risk of inconsistency is acceptable here - if we have an accept/remove race - // on an offer, the master will mark the task as LOST and it will be retried. - return hostOffers.remove(offerId); - } - - @Override - public Iterable<HostOffer> getOffers() { - return hostOffers.getOffers(); - } - - @Override - public Iterable<HostOffer> getOffers(TaskGroupKey groupKey) { - return hostOffers.getWeaklyConsistentOffers(groupKey); - } - - @Override - public Optional<HostOffer> getOffer(AgentID slaveId) { - return hostOffers.get(slaveId); - } - - /** - * Updates the preference of a host's offers. 
- * - * @param change Host change notification. - */ - @Subscribe - public void hostAttributesChanged(HostAttributesChanged change) { - hostOffers.updateHostAttributes(change.getAttributes()); - } - - /** - * Notifies the queue that the driver is disconnected, and all the stored offers are now - * invalid. - * <p> - * The queue takes this as a signal to flush its queue. - * - * @param event Disconnected event. - */ - @Subscribe - public void driverDisconnected(DriverDisconnected event) { - LOG.info("Clearing stale offers since the driver is disconnected."); - hostOffers.clear(); - } - - /** - * Used for testing to ensure that the underlying cache's `size` method returns an accurate - * value by not including evicted entries. - */ - @VisibleForTesting - public void cleanupStaticBans() { - hostOffers.staticallyBannedOffers.cleanUp(); - } - - /** - * A container for the data structures used by this class, to make it easier to reason about - * the different indices used and their consistency. - */ - private static class HostOffers { - - private final Set<HostOffer> offers; - private final Map<OfferID, HostOffer> offersById = Maps.newHashMap(); - private final Map<AgentID, HostOffer> offersBySlave = Maps.newHashMap(); - private final Map<String, HostOffer> offersByHost = Maps.newHashMap(); - - // Keep track of offer->groupKey mappings that will never be matched to avoid redundant - // scheduling attempts. See VetoGroup for more details on static ban. - private final Cache<Pair<OfferID, TaskGroupKey>, Boolean> staticallyBannedOffers; - - // Keep track of globally banned offers that will never be matched to anything. 
- private final Set<OfferID> globallyBannedOffers = Sets.newConcurrentHashSet(); - - HostOffers(StatsProvider statsProvider, OfferSettings offerSettings) { - offers = new ConcurrentSkipListSet<>(offerSettings.getOrdering()); - staticallyBannedOffers = offerSettings - .getStaticBanCacheBuilder() - .build(); - // Potential gotcha - since this is a ConcurrentSkipListSet, size() is more expensive. - // Could track this separately if it turns out to pose problems. - statsProvider.exportSize(OUTSTANDING_OFFERS, offers); - statsProvider.makeGauge(STATICALLY_BANNED_OFFERS, staticallyBannedOffers::size); - statsProvider.makeGauge(STATICALLY_BANNED_OFFERS_HIT_RATE, - () -> staticallyBannedOffers.stats().hitRate()); - statsProvider.makeGauge(GLOBALLY_BANNED_OFFERS, globallyBannedOffers::size); - } - - synchronized Optional<HostOffer> get(AgentID slaveId) { - HostOffer offer = offersBySlave.get(slaveId); - if (offer == null || globallyBannedOffers.contains(offer.getOffer().getId())) { - return Optional.absent(); - } - - return Optional.of(offer); - } - - /** - * Adds an offer while maintaining a guarantee that no two offers may exist with the same - * agent ID. If an offer exists with the same agent ID, the existing offer is removed - * and returned, and {@code offer} is not added. - * - * @param offer Offer to add. - * @return The pre-existing offer with the same agent ID as {@code offer}, if one exists, - * which will also be removed prior to returning. 
- */ - synchronized Optional<HostOffer> addAndPreventAgentCollision(HostOffer offer) { - HostOffer sameAgent = offersBySlave.get(offer.getOffer().getAgentId()); - if (sameAgent != null) { - remove(sameAgent.getOffer().getId()); - return Optional.of(sameAgent); - } - - addInternal(offer); - return Optional.absent(); - } - - private void addInternal(HostOffer offer) { - offers.add(offer); - offersById.put(offer.getOffer().getId(), offer); - offersBySlave.put(offer.getOffer().getAgentId(), offer); - offersByHost.put(offer.getOffer().getHostname(), offer); - } - - synchronized boolean remove(OfferID id) { - HostOffer removed = offersById.remove(id); - if (removed != null) { - offers.remove(removed); - offersBySlave.remove(removed.getOffer().getAgentId()); - offersByHost.remove(removed.getOffer().getHostname()); - } - globallyBannedOffers.remove(id); - return removed != null; - } - - synchronized void updateHostAttributes(IHostAttributes attributes) { - HostOffer offer = offersByHost.remove(attributes.getHost()); - if (offer != null) { - // Remove and re-add a host's offer to re-sort based on its new hostStatus - remove(offer.getOffer().getId()); - addInternal(new HostOffer(offer.getOffer(), attributes)); - } - } - - /** - * Returns an iterable giving the state of the offers at the time the method is called. Unlike - * {@code getWeaklyConsistentOffers}, the underlying collection is a copy of the original and - * will not be modified outside of the returned iterable. - * - * @return The offers currently known by the scheduler. - */ - synchronized Iterable<HostOffer> getOffers() { - return FluentIterable.from(offers).filter( - e -> !globallyBannedOffers.contains(e.getOffer().getId()) - ).toSet(); - } - - /** - * Returns a weakly-consistent iterable giving the available offers to a given - * {@code groupKey}. This iterable can handle concurrent operations on its underlying - * collection, and may reflect changes that happen after the construction of the iterable. 
- * This property is mainly used in {@code launchTask}. - * - * @param groupKey The task group to get offers for. - * @return The offers a given task group can use. - */ - synchronized Iterable<HostOffer> getWeaklyConsistentOffers(TaskGroupKey groupKey) { - return Iterables.unmodifiableIterable(FluentIterable.from(offers).filter(e -> - staticallyBannedOffers.getIfPresent(Pair.of(e.getOffer().getId(), groupKey)) == null - && !globallyBannedOffers.contains(e.getOffer().getId()))); - } - - synchronized void addGlobalBan(OfferID offerId) { - globallyBannedOffers.add(offerId); - } - - synchronized void addStaticGroupBan(OfferID offerId, TaskGroupKey groupKey) { - if (offersById.containsKey(offerId)) { - staticallyBannedOffers.put(Pair.of(offerId, groupKey), true); - } - } - - synchronized void clear() { - offers.clear(); - offersById.clear(); - offersBySlave.clear(); - offersByHost.clear(); - staticallyBannedOffers.invalidateAll(); - globallyBannedOffers.clear(); - } - } - - @Override - public void banOfferForTaskGroup(OfferID offerId, TaskGroupKey groupKey) { - hostOffers.addStaticGroupBan(offerId, groupKey); - } - - @Timed("offer_manager_launch_task") - @Override - public void launchTask(OfferID offerId, Protos.TaskInfo task) throws LaunchException { - // Guard against an offer being removed after we grabbed it from the iterator. - // If that happens, the offer will not exist in hostOffers, and we can immediately - // send it back to LOST for quick reschedule. - // Removing while iterating counts on the use of a weakly-consistent iterator being used, - // which is a feature of ConcurrentSkipListSet. 
- if (hostOffers.remove(offerId)) { - try { - Operation launch = Operation.newBuilder() - .setType(Operation.Type.LAUNCH) - .setLaunch(Operation.Launch.newBuilder().addTaskInfos(task)) - .build(); - driver.acceptOffers(offerId, ImmutableList.of(launch), getOfferFilter()); - } catch (IllegalStateException e) { - // TODO(William Farner): Catch only the checked exception produced by Driver - // once it changes from throwing IllegalStateException when the driver is not yet - // registered. - throw new LaunchException("Failed to launch task.", e); - } - } else { - offerRaces.incrementAndGet(); - throw new LaunchException("Offer no longer exists in offer queue, likely data race."); - } - } - } } http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerImpl.java new file mode 100644 index 0000000..427b1b4 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerImpl.java @@ -0,0 +1,246 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.offers; + +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import javax.inject.Inject; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; +import com.google.common.eventbus.Subscribe; + +import org.apache.aurora.common.collections.Pair; +import org.apache.aurora.common.quantity.Time; +import org.apache.aurora.common.stats.StatsProvider; +import org.apache.aurora.scheduler.HostOffer; +import org.apache.aurora.scheduler.base.TaskGroupKey; +import org.apache.aurora.scheduler.events.PubsubEvent; +import org.apache.aurora.scheduler.filter.SchedulingFilter; +import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; +import org.apache.aurora.scheduler.mesos.Driver; +import org.apache.mesos.v1.Protos; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.util.Objects.requireNonNull; + +import static org.apache.aurora.common.inject.TimedInterceptor.Timed; + +public class OfferManagerImpl implements OfferManager { + private static final Logger LOG = + LoggerFactory.getLogger(org.apache.aurora.scheduler.offers.OfferManagerImpl.class); + + @VisibleForTesting + static final String OFFER_ACCEPT_RACES = "offer_accept_races"; + @VisibleForTesting + static final String OUTSTANDING_OFFERS = "outstanding_offers"; + @VisibleForTesting + static final String STATICALLY_BANNED_OFFERS = "statically_banned_offers_size"; + @VisibleForTesting + static final String STATICALLY_BANNED_OFFERS_HIT_RATE = "statically_banned_offers_hit_rate"; + @VisibleForTesting + static final String OFFER_CANCEL_FAILURES = "offer_cancel_failures"; + @VisibleForTesting + static final String GLOBALLY_BANNED_OFFERS = "globally_banned_offers_size"; + @VisibleForTesting + static final String VETO_EVALUATED_OFFERS = "veto_evaluated_offers"; + + private final HostOffers hostOffers; + private final AtomicLong 
offerRaces; + private final AtomicLong offerCancelFailures; + + private final Driver driver; + private final OfferSettings offerSettings; + private final Deferment offerDecline; + + @Inject + @VisibleForTesting + public OfferManagerImpl( + Driver driver, + OfferSettings offerSettings, + StatsProvider statsProvider, + Deferment offerDecline, + SchedulingFilter schedulingFilter) { + + this.driver = requireNonNull(driver); + this.offerSettings = requireNonNull(offerSettings); + this.hostOffers = new HostOffers(statsProvider, offerSettings, schedulingFilter); + this.offerRaces = statsProvider.makeCounter(OFFER_ACCEPT_RACES); + this.offerCancelFailures = statsProvider.makeCounter(OFFER_CANCEL_FAILURES); + this.offerDecline = requireNonNull(offerDecline); + } + + @Override + public void add(HostOffer offer) { + Optional<HostOffer> sameAgent = hostOffers.addAndPreventAgentCollision(offer); + if (sameAgent.isPresent()) { + // We have an existing offer for the same agent. We choose to return both offers so that + // they may be combined into a single offer. 
+ LOG.info("Returning offers for " + offer.getOffer().getAgentId().getValue() + + " for compaction."); + decline(offer.getOffer().getId()); + decline(sameAgent.get().getOffer().getId()); + } else { + offerDecline.defer(() -> removeAndDecline(offer.getOffer().getId())); + } + } + + private void removeAndDecline(Protos.OfferID id) { + if (removeFromHostOffers(id)) { + decline(id); + } + } + + private void decline(Protos.OfferID id) { + LOG.debug("Declining offer {}", id); + driver.declineOffer(id, getOfferFilter()); + } + + private Protos.Filters getOfferFilter() { + return Protos.Filters.newBuilder() + .setRefuseSeconds(offerSettings.getFilterDuration().as(Time.SECONDS)) + .build(); + } + + @Override + public boolean cancel(final Protos.OfferID offerId) { + boolean success = removeFromHostOffers(offerId); + if (!success) { + // This will happen rarely when we race to process this rescind against accepting the offer + // to launch a task. + // If it happens frequently, we are likely processing rescinds before the offer itself. + LOG.warn("Failed to cancel offer: {}.", offerId.getValue()); + this.offerCancelFailures.incrementAndGet(); + } + return success; + } + + private boolean removeFromHostOffers(final Protos.OfferID offerId) { + requireNonNull(offerId); + + // The small risk of inconsistency is acceptable here - if we have an accept/remove race + // on an offer, the master will mark the task as LOST and it will be retried. + return hostOffers.remove(offerId); + } + + @Override + public void ban(Protos.OfferID offerId) { + hostOffers.addGlobalBan(offerId); + } + + /** + * Updates the preference of a host's offers. + * + * @param change Host change notification. 
+ */ + @Subscribe + public void hostAttributesChanged(PubsubEvent.HostAttributesChanged change) { + hostOffers.updateHostAttributes(change.getAttributes()); + } + + @Override + public Optional<HostOffer> get(Protos.AgentID slaveId) { + return hostOffers.get(slaveId); + } + + @Override + public Iterable<HostOffer> getAll() { + return hostOffers.getOffers(); + } + + @Override + public Optional<HostOffer> getMatching(Protos.AgentID slaveId, + ResourceRequest resourceRequest, + boolean revocable) { + + return hostOffers.getMatching(slaveId, resourceRequest, revocable); + } + + @Override + public Iterable<HostOffer> getAllMatching(TaskGroupKey groupKey, + ResourceRequest resourceRequest, + boolean revocable) { + + return hostOffers.getAllMatching(groupKey, resourceRequest, revocable); + } + + /** + * Notifies the queue that the driver is disconnected, and all the stored offers are now + * invalid. + * <p> + * The queue takes this as a signal to flush its queue. + * + * @param event Disconnected event. + */ + @Subscribe + public void driverDisconnected(PubsubEvent.DriverDisconnected event) { + LOG.info("Clearing stale offers since the driver is disconnected."); + hostOffers.clear(); + } + + @Timed("offer_manager_launch_task") + @Override + public void launchTask(Protos.OfferID offerId, Protos.TaskInfo task) throws LaunchException { + // Guard against an offer being removed after we grabbed it from the iterator. + // If that happens, the offer will not exist in hostOffers, and we can immediately + // send it back to LOST for quick reschedule. + // Removing while iterating counts on the use of a weakly-consistent iterator being used, + // which is a feature of ConcurrentSkipListSet. 
+ if (hostOffers.remove(offerId)) { + try { + Protos.Offer.Operation launch = Protos.Offer.Operation.newBuilder() + .setType(Protos.Offer.Operation.Type.LAUNCH) + .setLaunch(Protos.Offer.Operation.Launch.newBuilder().addTaskInfos(task)) + .build(); + driver.acceptOffers(offerId, ImmutableList.of(launch), getOfferFilter()); + } catch (IllegalStateException e) { + // TODO(William Farner): Catch only the checked exception produced by Driver + // once it changes from throwing IllegalStateException when the driver is not yet + // registered. + throw new LaunchException("Failed to launch task.", e); + } + } else { + offerRaces.incrementAndGet(); + throw new LaunchException("Offer no longer exists in offer queue, likely data race."); + } + } + + /** + * Get all static bans. + */ + @VisibleForTesting + Set<Pair<Protos.OfferID, TaskGroupKey>> getStaticBans() { + return hostOffers.getStaticBans(); + } + + /** + * Exclude an offer that results in a static mismatch from further attempts to match against all + * tasks from the same group. + */ + @VisibleForTesting + void banForTaskGroup(Protos.OfferID offerId, TaskGroupKey groupKey) { + hostOffers.addStaticGroupBan(offerId, groupKey); + } + + /** + * Used for testing to ensure that the underlying cache's `size` method returns an accurate + * value by not including evicted entries. 
+ */ + @VisibleForTesting + void cleanupStaticBans() { + hostOffers.cleanUpStaticallyBannedOffers(); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerModule.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerModule.java new file mode 100644 index 0000000..e2e3628 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerModule.java @@ -0,0 +1,211 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.offers; + +import java.lang.annotation.Retention; +import java.lang.annotation.Target; +import java.util.List; + +import javax.inject.Qualifier; +import javax.inject.Singleton; + +import com.beust.jcommander.Parameter; +import com.beust.jcommander.Parameters; +import com.google.common.base.Supplier; +import com.google.common.base.Ticker; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Ordering; +import com.google.inject.AbstractModule; +import com.google.inject.Module; +import com.google.inject.PrivateModule; +import com.google.inject.Provides; +import com.google.inject.TypeLiteral; + +import org.apache.aurora.common.quantity.Amount; +import org.apache.aurora.common.quantity.Time; +import org.apache.aurora.common.util.Random; +import org.apache.aurora.scheduler.HostOffer; +import org.apache.aurora.scheduler.app.MoreModules; +import org.apache.aurora.scheduler.config.CliOptions; +import org.apache.aurora.scheduler.config.types.TimeAmount; +import org.apache.aurora.scheduler.config.validators.NotNegativeAmount; +import org.apache.aurora.scheduler.config.validators.NotNegativeNumber; +import org.apache.aurora.scheduler.events.PubsubEventModule; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +/** + * Binding module for resource offer management. 
+ */ +public class OfferManagerModule extends AbstractModule { + private static final Logger LOG = LoggerFactory.getLogger(OfferManagerModule.class); + + @Parameters(separators = "=") + public static class Options { + @Parameter(names = "-hold_offers_forever", + description = + "Hold resource offers indefinitely, disabling automatic offer decline settings.", + arity = 1) + public boolean holdOffersForever = false; + + @Parameter(names = "-min_offer_hold_time", + validateValueWith = NotNegativeAmount.class, + description = "Minimum amount of time to hold a resource offer before declining.") + public TimeAmount minOfferHoldTime = new TimeAmount(5, Time.MINUTES); + + @Parameter(names = "-offer_hold_jitter_window", + validateValueWith = NotNegativeAmount.class, + description = "Maximum amount of random jitter to add to the offer hold time window.") + public TimeAmount offerHoldJitterWindow = new TimeAmount(1, Time.MINUTES); + + @Parameter(names = "-offer_filter_duration", + description = + "Duration after which we expect Mesos to re-offer unused resources. A short duration " + + "improves scheduling performance in smaller clusters, but might lead to resource " + + "starvation for other frameworks if you run many frameworks in your cluster.") + public TimeAmount offerFilterDuration = new TimeAmount(5, Time.SECONDS); + + @Parameter(names = "-unavailability_threshold", + description = + "Threshold time, when running tasks should be drained from a host, before a host " + + "becomes unavailable. Should be greater than min_offer_hold_time + " + + "offer_hold_jitter_window.") + public TimeAmount unavailabilityThreshold = new TimeAmount(6, Time.MINUTES); + + @Parameter(names = "-offer_order", + description = + "Iteration order for offers, to influence task scheduling. Multiple orderings will be " + + "compounded together. E.g. 
CPU,MEMORY,RANDOM would sort first by cpus offered," + + " then memory and finally would randomize any equal offers.") + public List<OfferOrder> offerOrder = ImmutableList.of(OfferOrder.RANDOM); + + @Parameter(names = "-offer_order_modules", + description = "Custom Guice module to provide an offer ordering.") + @SuppressWarnings("rawtypes") + public List<Class> offerOrderModules = ImmutableList.of(OfferOrderModule.class); + + @Parameter(names = "-offer_static_ban_cache_max_size", + validateValueWith = NotNegativeNumber.class, + description = + "The number of offers to hold in the static ban cache. If no value is specified, " + + "the cache will grow indefinitely. However, entries will expire within " + + "'min_offer_hold_time' + 'offer_hold_jitter_window' of being written.") + public long offerStaticBanCacheMaxSize = Long.MAX_VALUE; + } + + /** + * Binding annotation for the threshold to veto tasks with unavailability. + */ + @Qualifier + @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) + public @interface UnavailabilityThreshold { } + + public static class OfferOrderModule extends AbstractModule { + private final CliOptions options; + + public OfferOrderModule(CliOptions options) { + this.options = options; + } + + @Override + protected void configure() { + bind(new TypeLiteral<Ordering<HostOffer>>() { }) + .toInstance(OfferOrderBuilder.create(options.offer.offerOrder)); + } + } + + private final CliOptions cliOptions; + + public OfferManagerModule(CliOptions cliOptions) { + this.cliOptions = cliOptions; + } + + @Override + protected void configure() { + Options options = cliOptions.offer; + if (!options.holdOffersForever) { + long offerHoldTime = options.offerHoldJitterWindow.as(Time.SECONDS) + + options.minOfferHoldTime.as(Time.SECONDS); + if (options.unavailabilityThreshold.as(Time.SECONDS) < offerHoldTime) { + LOG.warn("unavailability_threshold ({}) is less than the sum of min_offer_hold_time ({})" + + " and offer_hold_jitter_window ({}). 
This creates risks of races between " + + "launching and draining", + options.unavailabilityThreshold, + options.minOfferHoldTime, + options.offerHoldJitterWindow); + } + } + + for (Module module: MoreModules.instantiateAll(options.offerOrderModules, cliOptions)) { + install(module); + } + + bind(new TypeLiteral<Amount<Long, Time>>() { }) + .annotatedWith(UnavailabilityThreshold.class) + .toInstance(options.unavailabilityThreshold); + + install(new PrivateModule() { + @Override + protected void configure() { + if (options.holdOffersForever) { + bind(Deferment.class).to(Deferment.Noop.class); + } else { + bind(new TypeLiteral<Supplier<Amount<Long, Time>>>() { }).toInstance( + new RandomJitterReturnDelay( + options.minOfferHoldTime.as(Time.MILLISECONDS), + options.offerHoldJitterWindow.as(Time.MILLISECONDS), + Random.Util.newDefaultRandom())); + bind(Deferment.class).to(Deferment.DelayedDeferment.class); + } + + bind(OfferManager.class).to(OfferManagerImpl.class); + bind(OfferManagerImpl.class).in(Singleton.class); + expose(OfferManager.class); + } + }); + PubsubEventModule.bindSubscriber(binder(), OfferManager.class); + } + + @Provides + @Singleton + OfferSettings provideOfferSettings(Ordering<HostOffer> offerOrdering) { + // We have a dual eviction strategy for the static ban cache in OfferManager that is based on + // both maximum size of the cache and the length an offer is valid. We do this in order to + // satisfy requirements in both single- and multi-framework environments. If offers are held for + // a finite duration, then we can expire cache entries after offerMaxHoldTime since that is the + // longest it will be valid for. Additionally, cluster operators will most likely not have to + // worry about cache size in this case as this behavior mimics current behavior. If offers are + // held indefinitely, then we never expire cache entries but the cluster operator can specify a + // maximum size to avoid a memory leak. 
+ long maxOfferHoldTime; + if (cliOptions.offer.holdOffersForever) { + maxOfferHoldTime = Long.MAX_VALUE; + } else { + maxOfferHoldTime = cliOptions.offer.minOfferHoldTime.as(Time.SECONDS) + + cliOptions.offer.offerHoldJitterWindow.as(Time.SECONDS); + } + + return new OfferSettings( + cliOptions.offer.offerFilterDuration, + offerOrdering, + Amount.of(maxOfferHoldTime, Time.SECONDS), + cliOptions.offer.offerStaticBanCacheMaxSize, + Ticker.systemTicker()); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/offers/OfferSettings.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferSettings.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferSettings.java index 57fc1a1..838a319 100644 --- a/src/main/java/org/apache/aurora/scheduler/offers/OfferSettings.java +++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferSettings.java @@ -29,6 +29,7 @@ import static java.util.Objects.requireNonNull; /** * Settings required to create an OfferManager. */ +@VisibleForTesting public class OfferSettings { private final Amount<Long, Time> filterDuration; @@ -67,14 +68,14 @@ public class OfferSettings { /** * Duration after which we want Mesos to re-offer unused or declined resources. */ - public Amount<Long, Time> getFilterDuration() { + Amount<Long, Time> getFilterDuration() { return filterDuration; } /** * The ordering to use when fetching offers from OfferManager. */ - public Ordering<HostOffer> getOrdering() { + Ordering<HostOffer> getOrdering() { return ordering; } @@ -82,7 +83,7 @@ public class OfferSettings { * The builder for the static ban cache. Cache settings (e.g. max size, entry expiration) should * already be added to the builder by this point. 
*/ - public CacheBuilder<Object, Object> getStaticBanCacheBuilder() { + CacheBuilder<Object, Object> getStaticBanCacheBuilder() { return staticBanCacheBuilder; } } http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java b/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java deleted file mode 100644 index 4a6ea8d..0000000 --- a/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java +++ /dev/null @@ -1,211 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.aurora.scheduler.offers; - -import java.lang.annotation.Retention; -import java.lang.annotation.Target; -import java.util.List; - -import javax.inject.Qualifier; -import javax.inject.Singleton; - -import com.beust.jcommander.Parameter; -import com.beust.jcommander.Parameters; -import com.google.common.base.Supplier; -import com.google.common.base.Ticker; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Ordering; -import com.google.inject.AbstractModule; -import com.google.inject.Module; -import com.google.inject.PrivateModule; -import com.google.inject.Provides; -import com.google.inject.TypeLiteral; - -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Time; -import org.apache.aurora.common.util.Random; -import org.apache.aurora.scheduler.HostOffer; -import org.apache.aurora.scheduler.app.MoreModules; -import org.apache.aurora.scheduler.config.CliOptions; -import org.apache.aurora.scheduler.config.types.TimeAmount; -import org.apache.aurora.scheduler.config.validators.NotNegativeAmount; -import org.apache.aurora.scheduler.config.validators.NotNegativeNumber; -import org.apache.aurora.scheduler.events.PubsubEventModule; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static java.lang.annotation.ElementType.FIELD; -import static java.lang.annotation.ElementType.METHOD; -import static java.lang.annotation.ElementType.PARAMETER; -import static java.lang.annotation.RetentionPolicy.RUNTIME; - -/** - * Binding module for resource offer management. 
- */ -public class OffersModule extends AbstractModule { - private static final Logger LOG = LoggerFactory.getLogger(OffersModule.class); - - @Parameters(separators = "=") - public static class Options { - @Parameter(names = "-hold_offers_forever", - description = - "Hold resource offers indefinitely, disabling automatic offer decline settings.", - arity = 1) - public boolean holdOffersForever = false; - - @Parameter(names = "-min_offer_hold_time", - validateValueWith = NotNegativeAmount.class, - description = "Minimum amount of time to hold a resource offer before declining.") - public TimeAmount minOfferHoldTime = new TimeAmount(5, Time.MINUTES); - - @Parameter(names = "-offer_hold_jitter_window", - validateValueWith = NotNegativeAmount.class, - description = "Maximum amount of random jitter to add to the offer hold time window.") - public TimeAmount offerHoldJitterWindow = new TimeAmount(1, Time.MINUTES); - - @Parameter(names = "-offer_filter_duration", - description = - "Duration after which we expect Mesos to re-offer unused resources. A short duration " - + "improves scheduling performance in smaller clusters, but might lead to resource " - + "starvation for other frameworks if you run many frameworks in your cluster.") - public TimeAmount offerFilterDuration = new TimeAmount(5, Time.SECONDS); - - @Parameter(names = "-unavailability_threshold", - description = - "Threshold time, when running tasks should be drained from a host, before a host " - + "becomes unavailable. Should be greater than min_offer_hold_time + " - + "offer_hold_jitter_window.") - public TimeAmount unavailabilityThreshold = new TimeAmount(6, Time.MINUTES); - - @Parameter(names = "-offer_order", - description = - "Iteration order for offers, to influence task scheduling. Multiple orderings will be " - + "compounded together. E.g. 
CPU,MEMORY,RANDOM would sort first by cpus offered," - + " then memory and finally would randomize any equal offers.") - public List<OfferOrder> offerOrder = ImmutableList.of(OfferOrder.RANDOM); - - @Parameter(names = "-offer_order_modules", - description = "Custom Guice module to provide an offer ordering.") - @SuppressWarnings("rawtypes") - public List<Class> offerOrderModules = ImmutableList.of(OfferOrderModule.class); - - @Parameter(names = "-offer_static_ban_cache_max_size", - validateValueWith = NotNegativeNumber.class, - description = - "The number of offers to hold in the static ban cache. If no value is specified, " - + "the cache will grow indefinitely. However, entries will expire within " - + "'min_offer_hold_time' + 'offer_hold_jitter_window' of being written.") - public long offerStaticBanCacheMaxSize = Long.MAX_VALUE; - } - - public static class OfferOrderModule extends AbstractModule { - private final CliOptions options; - - public OfferOrderModule(CliOptions options) { - this.options = options; - } - - @Override - protected void configure() { - bind(new TypeLiteral<Ordering<HostOffer>>() { }) - .toInstance(OfferOrderBuilder.create(options.offer.offerOrder)); - } - } - - /** - * Binding annotation for the threshold to veto tasks with unavailability. - */ - @Qualifier - @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) - public @interface UnavailabilityThreshold { } - - private final CliOptions cliOptions; - - public OffersModule(CliOptions cliOptions) { - this.cliOptions = cliOptions; - } - - @Override - protected void configure() { - Options options = cliOptions.offer; - if (!options.holdOffersForever) { - long offerHoldTime = options.offerHoldJitterWindow.as(Time.SECONDS) - + options.minOfferHoldTime.as(Time.SECONDS); - if (options.unavailabilityThreshold.as(Time.SECONDS) < offerHoldTime) { - LOG.warn("unavailability_threshold ({}) is less than the sum of min_offer_hold_time ({})" - + " and offer_hold_jitter_window ({}). 
This creates risks of races between " - + "launching and draining", - options.unavailabilityThreshold, - options.minOfferHoldTime, - options.offerHoldJitterWindow); - } - } - - for (Module module: MoreModules.instantiateAll(options.offerOrderModules, cliOptions)) { - install(module); - } - - bind(new TypeLiteral<Amount<Long, Time>>() { }) - .annotatedWith(UnavailabilityThreshold.class) - .toInstance(options.unavailabilityThreshold); - - install(new PrivateModule() { - @Override - protected void configure() { - if (options.holdOffersForever) { - bind(Deferment.class).to(Deferment.Noop.class); - } else { - bind(new TypeLiteral<Supplier<Amount<Long, Time>>>() { }).toInstance( - new RandomJitterReturnDelay( - options.minOfferHoldTime.as(Time.MILLISECONDS), - options.offerHoldJitterWindow.as(Time.MILLISECONDS), - Random.Util.newDefaultRandom())); - bind(Deferment.class).to(Deferment.DelayedDeferment.class); - } - - bind(OfferManager.class).to(OfferManager.OfferManagerImpl.class); - bind(OfferManager.OfferManagerImpl.class).in(Singleton.class); - expose(OfferManager.class); - } - }); - PubsubEventModule.bindSubscriber(binder(), OfferManager.class); - } - - @Provides - @Singleton - OfferSettings provideOfferSettings(Ordering<HostOffer> offerOrdering) { - // We have a dual eviction strategy for the static ban cache in OfferManager that is based on - // both maximum size of the cache and the length an offer is valid. We do this in order to - // satisfy requirements in both single- and multi-framework environments. If offers are held for - // a finite duration, then we can expire cache entries after offerMaxHoldTime since that is the - // longest it will be valid for. Additionally, cluster operators will most likely not have to - // worry about cache size in this case as this behavior mimics current behavior. If offers are - // held indefinitely, then we never expire cache entries but the cluster operator can specify a - // maximum size to avoid a memory leak. 
- long maxOfferHoldTime; - if (cliOptions.offer.holdOffersForever) { - maxOfferHoldTime = Long.MAX_VALUE; - } else { - maxOfferHoldTime = cliOptions.offer.minOfferHoldTime.as(Time.SECONDS) - + cliOptions.offer.offerHoldJitterWindow.as(Time.SECONDS); - } - - return new OfferSettings( - cliOptions.offer.offerFilterDuration, - offerOrdering, - Amount.of(maxOfferHoldTime, Time.SECONDS), - cliOptions.offer.offerStaticBanCacheMaxSize, - Ticker.systemTicker()); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java index 766d3b2..497a766 100644 --- a/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java +++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java @@ -143,7 +143,7 @@ public class PendingTaskProcessor implements Runnable { // Group the offers by slave id so they can be paired with active tasks from the same slave. 
Map<String, HostOffer> slavesToOffers = - Maps.uniqueIndex(offerManager.getOffers(), OFFER_TO_SLAVE_ID); + Maps.uniqueIndex(offerManager.getAll(), OFFER_TO_SLAVE_ID); Set<String> allSlaves = Sets.newHashSet(Iterables.concat( slavesToOffers.keySet(), http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java b/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java index 82a0ff6..ffb8b90 100644 --- a/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java +++ b/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java @@ -94,7 +94,7 @@ public interface Preemptor { pendingTask.getTask(), slot.getVictims(), jobState, - offerManager.getOffer(slaveId), + offerManager.get(slaveId), store); metrics.recordSlotValidationResult(validatedVictims);
