Break apart async package and AsyncModule into purpose-specific equivalents.
Testing Done: Confirmed end-to-end tests pass, and ./gradlew run works. Reviewed at https://reviews.apache.org/r/36666/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/0070a5fd Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/0070a5fd Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/0070a5fd Branch: refs/heads/master Commit: 0070a5fd18c6f219a7fe66f327209b8dc21ab67e Parents: 6e2bf57 Author: Bill Farner <[email protected]> Authored: Wed Jul 22 12:39:37 2015 -0700 Committer: Bill Farner <[email protected]> Committed: Wed Jul 22 12:39:37 2015 -0700 ---------------------------------------------------------------------- .../org/apache/aurora/benchmark/Offers.java | 6 +- .../aurora/benchmark/SchedulingBenchmarks.java | 16 +- .../aurora/benchmark/StatusUpdateBenchmark.java | 6 +- .../benchmark/fakes/FakeOfferManager.java | 2 +- .../fakes/FakeRescheduleCalculator.java | 2 +- .../apache/aurora/scheduler/ResourceSlot.java | 2 +- .../apache/aurora/scheduler/app/AppModule.java | 10 +- .../aurora/scheduler/async/AsyncModule.java | 298 +------- .../scheduler/async/JobUpdateHistoryPruner.java | 105 --- .../aurora/scheduler/async/KillRetry.java | 102 --- .../aurora/scheduler/async/OfferManager.java | 406 ----------- .../async/RandomJitterReturnDelay.java | 49 -- .../scheduler/async/RescheduleCalculator.java | 174 ----- .../aurora/scheduler/async/TaskGroup.java | 77 --- .../aurora/scheduler/async/TaskGroups.java | 239 ------- .../scheduler/async/TaskHistoryPruner.java | 174 ----- .../aurora/scheduler/async/TaskReconciler.java | 155 ----- .../aurora/scheduler/async/TaskScheduler.java | 247 ------- .../aurora/scheduler/async/TaskThrottler.java | 96 --- .../aurora/scheduler/async/TaskTimeout.java | 157 ----- .../scheduler/async/preemptor/BiCache.java | 139 ---- .../scheduler/async/preemptor/ClusterState.java | 34 - .../async/preemptor/ClusterStateImpl.java | 50 -- .../async/preemptor/PendingTaskProcessor.java | 258 ------- .../async/preemptor/PreemptionProposal.java | 66 -- .../async/preemptor/PreemptionVictim.java | 115 ---- .../async/preemptor/PreemptionVictimFilter.java | 214 ------ .../scheduler/async/preemptor/Preemptor.java | 121 ---- .../async/preemptor/PreemptorMetrics.java | 131 ---- .../async/preemptor/PreemptorModule.java | 167 ----- .../apache/aurora/scheduler/http/Offers.java | 2 +- .../aurora/scheduler/http/PendingTasks.java | 2 +- .../scheduler/mesos/MesosSchedulerImpl.java | 2 +- .../aurora/scheduler/offers/OfferManager.java | 408 +++++++++++ .../aurora/scheduler/offers/OffersModule.java | 63 ++ .../offers/RandomJitterReturnDelay.java | 49 ++ .../aurora/scheduler/preemptor/BiCache.java | 139 ++++ .../scheduler/preemptor/ClusterState.java | 34 + .../scheduler/preemptor/ClusterStateImpl.java | 50 ++ .../preemptor/PendingTaskProcessor.java | 258 +++++++ .../scheduler/preemptor/PreemptionProposal.java | 66 ++ .../scheduler/preemptor/PreemptionVictim.java | 115 ++++ .../preemptor/PreemptionVictimFilter.java | 214 ++++++ .../aurora/scheduler/preemptor/Preemptor.java | 121 ++++ .../scheduler/preemptor/PreemptorMetrics.java | 131 ++++ .../scheduler/preemptor/PreemptorModule.java | 167 +++++ .../pruning/JobUpdateHistoryPruner.java | 105 +++ .../aurora/scheduler/pruning/PruningModule.java | 106 +++ .../scheduler/pruning/TaskHistoryPruner.java | 175 +++++ .../scheduler/reconciliation/KillRetry.java | 103 +++ .../reconciliation/ReconciliationModule.java | 118 ++++ .../reconciliation/TaskReconciler.java | 156 +++++ .../scheduler/reconciliation/TaskTimeout.java | 158 +++++ .../scheduling/RescheduleCalculator.java | 174 +++++ .../scheduler/scheduling/SchedulingModule.java | 134 ++++ .../aurora/scheduler/scheduling/TaskGroup.java | 77 +++ .../aurora/scheduler/scheduling/TaskGroups.java | 239 +++++++ .../scheduler/scheduling/TaskScheduler.java | 248 +++++++ .../scheduler/scheduling/TaskThrottler.java | 97 +++ .../scheduler/state/StateManagerImpl.java | 2 +- .../scheduler/stats/AsyncStatsModule.java | 2 +- .../aurora/scheduler/async/AsyncModuleTest.java | 14 - .../async/JobUpdateHistoryPrunerTest.java | 69 -- .../aurora/scheduler/async/KillRetryTest.java | 157 ----- .../scheduler/async/OfferManagerImplTest.java | 234 ------- .../apache/aurora/scheduler/async/Offers.java | 43 -- .../async/RandomJitterReturnDelayTest.java | 77 --- .../async/RescheduleCalculatorImplTest.java | 188 ------ .../aurora/scheduler/async/TaskGroupsTest.java | 140 ---- .../scheduler/async/TaskHistoryPrunerTest.java | 398 ----------- .../scheduler/async/TaskReconcilerTest.java | 140 ---- .../scheduler/async/TaskSchedulerImplTest.java | 340 ---------- .../scheduler/async/TaskSchedulerTest.java | 669 ------------------ .../scheduler/async/TaskThrottlerTest.java | 146 ---- .../aurora/scheduler/async/TaskTimeoutTest.java | 244 ------- .../scheduler/async/preemptor/BiCacheTest.java | 107 --- .../async/preemptor/ClusterStateImplTest.java | 133 ---- .../preemptor/PendingTaskProcessorTest.java | 285 -------- .../preemptor/PreemptionVictimFilterTest.java | 512 -------------- .../async/preemptor/PreemptionVictimTest.java | 49 -- .../async/preemptor/PreemptorImplTest.java | 177 ----- .../async/preemptor/PreemptorModuleTest.java | 91 --- .../scheduler/http/JettyServerModuleTest.java | 8 +- .../scheduler/mesos/MesosSchedulerImplTest.java | 2 +- .../scheduler/offers/OfferManagerImplTest.java | 234 +++++++ .../apache/aurora/scheduler/offers/Offers.java | 43 ++ .../offers/RandomJitterReturnDelayTest.java | 77 +++ .../aurora/scheduler/preemptor/BiCacheTest.java | 107 +++ .../preemptor/ClusterStateImplTest.java | 133 ++++ .../preemptor/PendingTaskProcessorTest.java | 285 ++++++++ .../preemptor/PreemptionVictimFilterTest.java | 512 ++++++++++++++ .../preemptor/PreemptionVictimTest.java | 49 ++ .../scheduler/preemptor/PreemptorImplTest.java | 177 +++++ .../preemptor/PreemptorModuleTest.java | 91 +++ .../pruning/JobUpdateHistoryPrunerTest.java | 69 ++ .../pruning/TaskHistoryPrunerTest.java | 398 +++++++++++ .../scheduler/reconciliation/KillRetryTest.java | 159 +++++ .../reconciliation/TaskReconcilerTest.java | 140 ++++ .../reconciliation/TaskTimeoutTest.java | 244 +++++++ .../RescheduleCalculatorImplTest.java | 188 ++++++ .../scheduler/scheduling/TaskGroupsTest.java | 140 ++++ .../scheduling/TaskSchedulerImplTest.java | 342 ++++++++++ .../scheduler/scheduling/TaskSchedulerTest.java | 671 +++++++++++++++++++ .../scheduler/scheduling/TaskThrottlerTest.java | 146 ++++ .../scheduler/state/StateManagerImplTest.java | 2 +- .../aurora/scheduler/updater/JobUpdaterIT.java | 4 +- 106 files changed, 7953 insertions(+), 7814 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/jmh/java/org/apache/aurora/benchmark/Offers.java ---------------------------------------------------------------------- diff --git a/src/jmh/java/org/apache/aurora/benchmark/Offers.java b/src/jmh/java/org/apache/aurora/benchmark/Offers.java index b8e6cb5..e40db74 100644 --- a/src/jmh/java/org/apache/aurora/benchmark/Offers.java +++ b/src/jmh/java/org/apache/aurora/benchmark/Offers.java @@ -20,8 +20,8 @@ import com.twitter.common.quantity.Amount; import com.twitter.common.quantity.Data; import org.apache.aurora.scheduler.HostOffer; -import org.apache.aurora.scheduler.async.OfferManager; import org.apache.aurora.scheduler.configuration.Resources; +import org.apache.aurora.scheduler.offers.OfferManager; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.mesos.Protos; @@ -34,9 +34,9 @@ final class Offers { } /** - * Saves offers into the {@link org.apache.aurora.scheduler.async.OfferManager}. + * Saves offers into the {@link OfferManager}. * - * @param offerManager {@link org.apache.aurora.scheduler.async.OfferManager} to save into. + * @param offerManager {@link OfferManager} to save into. * @param offers Offers to save. */ static void addOffers(OfferManager offerManager, Iterable<HostOffer> offers) { http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java ---------------------------------------------------------------------- diff --git a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java index d9e5199..5716f23 100644 --- a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java +++ b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java @@ -38,20 +38,20 @@ import org.apache.aurora.benchmark.fakes.FakeRescheduleCalculator; import org.apache.aurora.benchmark.fakes.FakeStatsProvider; import org.apache.aurora.scheduler.HostOffer; import org.apache.aurora.scheduler.TaskIdGenerator; -import org.apache.aurora.scheduler.async.OfferManager; -import org.apache.aurora.scheduler.async.RescheduleCalculator; -import org.apache.aurora.scheduler.async.TaskScheduler; -import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl.ReservationDuration; -import org.apache.aurora.scheduler.async.preemptor.BiCache; -import org.apache.aurora.scheduler.async.preemptor.ClusterStateImpl; -import org.apache.aurora.scheduler.async.preemptor.PendingTaskProcessor; -import org.apache.aurora.scheduler.async.preemptor.PreemptorModule; import org.apache.aurora.scheduler.events.EventSink; import org.apache.aurora.scheduler.events.PubsubEvent; import org.apache.aurora.scheduler.filter.SchedulingFilter; import org.apache.aurora.scheduler.filter.SchedulingFilterImpl; import org.apache.aurora.scheduler.mesos.Driver; import org.apache.aurora.scheduler.mesos.ExecutorSettings; +import org.apache.aurora.scheduler.offers.OfferManager; +import org.apache.aurora.scheduler.preemptor.BiCache; +import org.apache.aurora.scheduler.preemptor.ClusterStateImpl; +import org.apache.aurora.scheduler.preemptor.PendingTaskProcessor; +import org.apache.aurora.scheduler.preemptor.PreemptorModule; +import org.apache.aurora.scheduler.scheduling.RescheduleCalculator; +import org.apache.aurora.scheduler.scheduling.TaskScheduler; +import org.apache.aurora.scheduler.scheduling.TaskScheduler.TaskSchedulerImpl.ReservationDuration; import org.apache.aurora.scheduler.state.StateModule; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.db.DbUtil; http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java ---------------------------------------------------------------------- diff --git a/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java b/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java index e08d16e..3931d02 100644 --- a/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java +++ b/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java @@ -49,9 +49,6 @@ import org.apache.aurora.gen.ScheduleStatus; import org.apache.aurora.scheduler.TaskIdGenerator; import org.apache.aurora.scheduler.TaskStatusHandler; import org.apache.aurora.scheduler.TaskStatusHandlerImpl; -import org.apache.aurora.scheduler.async.OfferManager; -import org.apache.aurora.scheduler.async.RescheduleCalculator; -import org.apache.aurora.scheduler.async.preemptor.ClusterStateImpl; import org.apache.aurora.scheduler.base.AsyncUtil; import org.apache.aurora.scheduler.events.EventSink; import org.apache.aurora.scheduler.events.PubsubEvent; @@ -62,6 +59,9 @@ import org.apache.aurora.scheduler.mesos.DriverFactory; import org.apache.aurora.scheduler.mesos.DriverSettings; import org.apache.aurora.scheduler.mesos.ExecutorSettings; import org.apache.aurora.scheduler.mesos.MesosSchedulerImpl; +import org.apache.aurora.scheduler.offers.OfferManager; +import org.apache.aurora.scheduler.preemptor.ClusterStateImpl; +import org.apache.aurora.scheduler.scheduling.RescheduleCalculator; import org.apache.aurora.scheduler.state.StateModule; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.db.DbUtil; http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java ---------------------------------------------------------------------- diff --git a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java index 45849b5..f413301 100644 --- a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java +++ b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java @@ -17,9 +17,9 @@ import com.google.common.base.Function; import com.google.common.base.Optional; import org.apache.aurora.scheduler.HostOffer; -import org.apache.aurora.scheduler.async.OfferManager; import org.apache.aurora.scheduler.base.TaskGroupKey; import org.apache.aurora.scheduler.events.PubsubEvent; +import org.apache.aurora.scheduler.offers.OfferManager; import org.apache.aurora.scheduler.state.TaskAssigner; import org.apache.mesos.Protos; http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeRescheduleCalculator.java ---------------------------------------------------------------------- diff --git a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeRescheduleCalculator.java b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeRescheduleCalculator.java index 6d71012..4af2339 100644 --- a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeRescheduleCalculator.java +++ b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeRescheduleCalculator.java @@ -13,7 +13,7 @@ */ package org.apache.aurora.benchmark.fakes; -import org.apache.aurora.scheduler.async.RescheduleCalculator; +import org.apache.aurora.scheduler.scheduling.RescheduleCalculator; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; public class FakeRescheduleCalculator implements RescheduleCalculator { http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/ResourceSlot.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/ResourceSlot.java b/src/main/java/org/apache/aurora/scheduler/ResourceSlot.java index 1a158b4..ecadb3e 100644 --- a/src/main/java/org/apache/aurora/scheduler/ResourceSlot.java +++ b/src/main/java/org/apache/aurora/scheduler/ResourceSlot.java @@ -24,9 +24,9 @@ import com.google.common.collect.Ordering; import com.twitter.common.quantity.Amount; import com.twitter.common.quantity.Data; -import org.apache.aurora.scheduler.async.preemptor.PreemptionVictim; import org.apache.aurora.scheduler.configuration.Resources; import org.apache.aurora.scheduler.mesos.ExecutorSettings; +import org.apache.aurora.scheduler.preemptor.PreemptionVictim; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; import org.apache.mesos.Protos; http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/app/AppModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/app/AppModule.java b/src/main/java/org/apache/aurora/scheduler/app/AppModule.java index d2c1720..4cc1127 100644 --- a/src/main/java/org/apache/aurora/scheduler/app/AppModule.java +++ b/src/main/java/org/apache/aurora/scheduler/app/AppModule.java @@ -47,13 +47,17 @@ import org.apache.aurora.gen.ServerInfo; import org.apache.aurora.scheduler.SchedulerModule; import org.apache.aurora.scheduler.SchedulerServicesModule; import org.apache.aurora.scheduler.async.AsyncModule; -import org.apache.aurora.scheduler.async.preemptor.PreemptorModule; import org.apache.aurora.scheduler.events.PubsubEventModule; import org.apache.aurora.scheduler.filter.SchedulingFilterImpl; import org.apache.aurora.scheduler.http.JettyServerModule; import org.apache.aurora.scheduler.mesos.SchedulerDriverModule; import org.apache.aurora.scheduler.metadata.MetadataModule; +import org.apache.aurora.scheduler.offers.OffersModule; +import org.apache.aurora.scheduler.preemptor.PreemptorModule; +import org.apache.aurora.scheduler.pruning.PruningModule; import org.apache.aurora.scheduler.quota.QuotaModule; +import org.apache.aurora.scheduler.reconciliation.ReconciliationModule; +import org.apache.aurora.scheduler.scheduling.SchedulingModule; import org.apache.aurora.scheduler.sla.SlaModule; import org.apache.aurora.scheduler.state.StateModule; import org.apache.aurora.scheduler.stats.AsyncStatsModule; @@ -115,6 +119,10 @@ public class AppModule extends AbstractModule { LifecycleModule.bindStartupAction(binder(), RegisterShutdownStackPrinter.class); install(new AsyncModule()); + install(new OffersModule()); + install(new PruningModule()); + install(new ReconciliationModule()); + install(new SchedulingModule()); install(new AsyncStatsModule()); install(new MetadataModule()); install(new QuotaModule()); http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java index 8c2d751..c345c92 100644 --- a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java +++ b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java @@ -15,45 +15,24 @@ package org.apache.aurora.scheduler.async; import java.lang.annotation.Retention; import java.lang.annotation.Target; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.logging.Logger; import javax.inject.Inject; import javax.inject.Qualifier; -import javax.inject.Singleton; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Supplier; import com.google.common.util.concurrent.AbstractIdleService; -import com.google.common.util.concurrent.RateLimiter; import com.google.inject.AbstractModule; -import com.google.inject.PrivateModule; -import com.google.inject.TypeLiteral; import com.twitter.common.args.Arg; import com.twitter.common.args.CmdLine; -import com.twitter.common.args.constraints.NotNegative; -import com.twitter.common.args.constraints.Positive; -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; import com.twitter.common.stats.StatsProvider; -import com.twitter.common.util.BackoffStrategy; -import com.twitter.common.util.Random; -import com.twitter.common.util.TruncatedBinaryBackoff; import org.apache.aurora.scheduler.SchedulerServicesModule; -import org.apache.aurora.scheduler.async.OfferManager.OfferManagerImpl; -import org.apache.aurora.scheduler.async.OfferManager.OfferReturnDelay; -import org.apache.aurora.scheduler.async.RescheduleCalculator.RescheduleCalculatorImpl; -import org.apache.aurora.scheduler.async.TaskGroups.TaskGroupsSettings; -import org.apache.aurora.scheduler.async.TaskHistoryPruner.HistoryPrunnerSettings; -import org.apache.aurora.scheduler.async.TaskReconciler.TaskReconcilerSettings; -import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl; -import org.apache.aurora.scheduler.async.preemptor.BiCache; -import org.apache.aurora.scheduler.async.preemptor.BiCache.BiCacheSettings; import org.apache.aurora.scheduler.base.AsyncUtil; -import org.apache.aurora.scheduler.base.TaskGroupKey; -import org.apache.aurora.scheduler.events.PubsubEventModule; import static java.lang.annotation.ElementType.FIELD; import static java.lang.annotation.ElementType.METHOD; @@ -65,141 +44,15 @@ import static java.util.Objects.requireNonNull; * Binding module for async task management. */ public class AsyncModule extends AbstractModule { - private static final Logger LOG = Logger.getLogger(AsyncModule.class.getName()); @CmdLine(name = "async_worker_threads", help = "The number of worker threads to process async task operations with.") private static final Arg<Integer> ASYNC_WORKER_THREADS = Arg.create(1); - @CmdLine(name = "transient_task_state_timeout", - help = "The amount of time after which to treat a task stuck in a transient state as LOST.") - private static final Arg<Amount<Long, Time>> TRANSIENT_TASK_STATE_TIMEOUT = - Arg.create(Amount.of(5L, Time.MINUTES)); - - @Positive - @CmdLine(name = "first_schedule_delay", - help = "Initial amount of time to wait before first attempting to schedule a PENDING task.") - private static final Arg<Amount<Long, Time>> FIRST_SCHEDULE_DELAY = - Arg.create(Amount.of(1L, Time.MILLISECONDS)); - - @Positive - @CmdLine(name = "initial_schedule_penalty", - help = "Initial amount of time to wait before attempting to schedule a task that has failed" - + " to schedule.") - private static final Arg<Amount<Long, Time>> INITIAL_SCHEDULE_PENALTY = - Arg.create(Amount.of(1L, Time.SECONDS)); - - @CmdLine(name = "max_schedule_penalty", - help = "Maximum delay between attempts to schedule a PENDING tasks.") - private static final Arg<Amount<Long, Time>> MAX_SCHEDULE_PENALTY = - Arg.create(Amount.of(1L, Time.MINUTES)); - - @CmdLine(name = "min_offer_hold_time", - help = "Minimum amount of time to hold a resource offer before declining.") - @NotNegative - private static final Arg<Amount<Integer, Time>> MIN_OFFER_HOLD_TIME = - Arg.create(Amount.of(5, Time.MINUTES)); - - @CmdLine(name = "offer_hold_jitter_window", - help = "Maximum amount of random jitter to add to the offer hold time window.") - @NotNegative - private static final Arg<Amount<Integer, Time>> OFFER_HOLD_JITTER_WINDOW = - Arg.create(Amount.of(1, Time.MINUTES)); - - @CmdLine(name = "history_prune_threshold", - help = "Time after which the scheduler will prune terminated task history.") - private static final Arg<Amount<Long, Time>> HISTORY_PRUNE_THRESHOLD = - Arg.create(Amount.of(2L, Time.DAYS)); - - @CmdLine(name = "history_max_per_job_threshold", - help = "Maximum number of terminated tasks to retain in a job history.") - private static final Arg<Integer> HISTORY_MAX_PER_JOB_THRESHOLD = Arg.create(100); - - @CmdLine(name = "history_min_retention_threshold", - help = "Minimum guaranteed time for task history retention before any pruning is attempted.") - private static final Arg<Amount<Long, Time>> HISTORY_MIN_RETENTION_THRESHOLD = - Arg.create(Amount.of(1L, Time.HOURS)); - - @CmdLine(name = "max_schedule_attempts_per_sec", - help = "Maximum number of scheduling attempts to make per second.") - private static final Arg<Double> MAX_SCHEDULE_ATTEMPTS_PER_SEC = Arg.create(40D); - - @CmdLine(name = "flapping_task_threshold", - help = "A task that repeatedly runs for less than this time is considered to be flapping.") - private static final Arg<Amount<Long, Time>> FLAPPING_THRESHOLD = - Arg.create(Amount.of(5L, Time.MINUTES)); - - @CmdLine(name = "initial_flapping_task_delay", - help = "Initial amount of time to wait before attempting to schedule a flapping task.") - private static final Arg<Amount<Long, Time>> INITIAL_FLAPPING_DELAY = - Arg.create(Amount.of(30L, Time.SECONDS)); - - @CmdLine(name = "max_flapping_task_delay", - help = "Maximum delay between attempts to schedule a flapping task.") - private static final Arg<Amount<Long, Time>> MAX_FLAPPING_DELAY = - Arg.create(Amount.of(5L, Time.MINUTES)); - - @CmdLine(name = "max_reschedule_task_delay_on_startup", - help = "Upper bound of random delay for pending task rescheduling on scheduler startup.") - private static final Arg<Amount<Integer, Time>> MAX_RESCHEDULING_DELAY = - Arg.create(Amount.of(30, Time.SECONDS)); - - @CmdLine(name = "job_update_history_per_job_threshold", - help = "Maximum number of completed job updates to retain in a job update history.") - private static final Arg<Integer> JOB_UPDATE_HISTORY_PER_JOB_THRESHOLD = Arg.create(10); - - @CmdLine(name = "job_update_history_pruning_interval", - help = "Job update history pruning interval.") - private static final Arg<Amount<Long, Time>> JOB_UPDATE_HISTORY_PRUNING_INTERVAL = - Arg.create(Amount.of(15L, Time.MINUTES)); - - @CmdLine(name = "job_update_history_pruning_threshold", - help = "Time after which the scheduler will prune completed job update history.") - private static final Arg<Amount<Long, Time>> JOB_UPDATE_HISTORY_PRUNING_THRESHOLD = - Arg.create(Amount.of(30L, Time.DAYS)); - - @CmdLine(name = "initial_task_kill_retry_interval", - help = "When killing a task, retry after this delay if mesos has not responded," - + " backing off up to transient_task_state_timeout") - private static final Arg<Amount<Long, Time>> INITIAL_TASK_KILL_RETRY_INTERVAL = - Arg.create(Amount.of(5L, Time.SECONDS)); - - @CmdLine(name = "offer_reservation_duration", help = "Time to reserve a slave's offers while " - + "trying to satisfy a task preempting another.") - private static final Arg<Amount<Long, Time>> RESERVATION_DURATION = - Arg.create(Amount.of(3L, Time.MINUTES)); - - // Reconciliation may create a big surge of status updates in a large cluster. Setting the default - // initial delay to 1 minute to ease up storage contention during scheduler start up. - @CmdLine(name = "reconciliation_initial_delay", - help = "Initial amount of time to delay task reconciliation after scheduler start up.") - private static final Arg<Amount<Long, Time>> RECONCILIATION_INITIAL_DELAY = - Arg.create(Amount.of(1L, Time.MINUTES)); - - @Positive - @CmdLine(name = "reconciliation_explicit_interval", - help = "Interval on which scheduler will ask Mesos for status updates of all non-terminal " - + "tasks known to scheduler.") - private static final Arg<Amount<Long, Time>> RECONCILIATION_EXPLICIT_INTERVAL = - Arg.create(Amount.of(60L, Time.MINUTES)); - - @Positive - @CmdLine(name = "reconciliation_implicit_interval", - help = "Interval on which scheduler will ask Mesos for status updates of all non-terminal " - + "tasks known to Mesos.") - private static final Arg<Amount<Long, Time>> RECONCILIATION_IMPLICIT_INTERVAL = - Arg.create(Amount.of(60L, Time.MINUTES)); - - @CmdLine(name = "reconciliation_schedule_spread", - help = "Difference between explicit and implicit reconciliation intervals intended to " - + "create a non-overlapping task reconciliation schedule.") - private static final Arg<Amount<Long, Time>> RECONCILIATION_SCHEDULE_SPREAD = - Arg.create(Amount.of(30L, Time.MINUTES)); - @Qualifier @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) - private @interface AsyncExecutor { } + public @interface AsyncExecutor { } @VisibleForTesting static final String TIMEOUT_QUEUE_GAUGE = "timeout_queue_size"; @@ -213,152 +66,9 @@ public class AsyncModule extends AbstractModule { final ScheduledThreadPoolExecutor executor = AsyncUtil.loggingScheduledExecutor(ASYNC_WORKER_THREADS.get(), "AsyncProcessor-%d", LOG); bind(ScheduledThreadPoolExecutor.class).annotatedWith(AsyncExecutor.class).toInstance(executor); + bind(ScheduledExecutorService.class).annotatedWith(AsyncExecutor.class).toInstance(executor); + bind(ExecutorService.class).annotatedWith(AsyncExecutor.class).toInstance(executor); SchedulerServicesModule.addAppStartupServiceBinding(binder()).to(RegisterGauges.class); - - // AsyncModule itself is not a subclass of PrivateModule because TaskEventModule internally uses - // a MultiBinder, which cannot span multiple injectors. - install(new PrivateModule() { - @Override - protected void configure() { - bind(new TypeLiteral<Amount<Long, Time>>() { }) - .toInstance(TRANSIENT_TASK_STATE_TIMEOUT.get()); - bind(ScheduledExecutorService.class).toInstance(executor); - - bind(TaskTimeout.class).in(Singleton.class); - expose(TaskTimeout.class); - } - }); - PubsubEventModule.bindSubscriber(binder(), TaskTimeout.class); - SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()).to(TaskTimeout.class); - - install(new PrivateModule() { - @Override - protected void configure() { - bind(TaskGroupsSettings.class).toInstance(new TaskGroupsSettings( - FIRST_SCHEDULE_DELAY.get(), - new TruncatedBinaryBackoff( - INITIAL_SCHEDULE_PENALTY.get(), - MAX_SCHEDULE_PENALTY.get()), - RateLimiter.create(MAX_SCHEDULE_ATTEMPTS_PER_SEC.get()))); - - bind(RescheduleCalculatorImpl.RescheduleCalculatorSettings.class) - .toInstance(new RescheduleCalculatorImpl.RescheduleCalculatorSettings( - new TruncatedBinaryBackoff(INITIAL_FLAPPING_DELAY.get(), MAX_FLAPPING_DELAY.get()), - FLAPPING_THRESHOLD.get(), - MAX_RESCHEDULING_DELAY.get())); - - bind(RescheduleCalculator.class).to(RescheduleCalculatorImpl.class).in(Singleton.class); - expose(RescheduleCalculator.class); - bind(TaskGroups.class).in(Singleton.class); - expose(TaskGroups.class); - } - }); - PubsubEventModule.bindSubscriber(binder(), TaskGroups.class); - - install(new PrivateModule() { - @Override - protected void configure() { - bind(new TypeLiteral<BiCache<String, TaskGroupKey>>() { }).in(Singleton.class); - bind(BiCacheSettings.class).toInstance( - new BiCacheSettings(RESERVATION_DURATION.get(), "reservation_cache_size")); - bind(TaskScheduler.class).to(TaskSchedulerImpl.class); - bind(TaskSchedulerImpl.class).in(Singleton.class); - expose(TaskScheduler.class); - } - }); - PubsubEventModule.bindSubscriber(binder(), TaskScheduler.class); - - install(new PrivateModule() { - @Override - protected void configure() { - bind(OfferReturnDelay.class).toInstance( - new RandomJitterReturnDelay( - MIN_OFFER_HOLD_TIME.get().as(Time.MILLISECONDS), - OFFER_HOLD_JITTER_WINDOW.get().as(Time.MILLISECONDS), - new Random.SystemRandom(new java.util.Random()))); - bind(ScheduledExecutorService.class).toInstance(executor); - bind(OfferManager.class).to(OfferManagerImpl.class); - bind(OfferManagerImpl.class).in(Singleton.class); - expose(OfferManager.class); - } - }); - PubsubEventModule.bindSubscriber(binder(), OfferManager.class); - - install(new PrivateModule() { - @Override - protected void configure() { - // TODO(ksweeney): Create a configuration validator module so this can be injected. - // TODO(William Farner): Revert this once large task counts is cheap ala hierarchichal store - bind(HistoryPrunnerSettings.class).toInstance(new HistoryPrunnerSettings( - HISTORY_PRUNE_THRESHOLD.get(), - HISTORY_MIN_RETENTION_THRESHOLD.get(), - HISTORY_MAX_PER_JOB_THRESHOLD.get() - )); - bind(ScheduledExecutorService.class).toInstance(executor); - - bind(TaskHistoryPruner.class).in(Singleton.class); - expose(TaskHistoryPruner.class); - } - }); - PubsubEventModule.bindSubscriber(binder(), TaskHistoryPruner.class); - - install(new PrivateModule() { - @Override - protected void configure() { - bind(ScheduledExecutorService.class).toInstance(executor); - bind(TaskThrottler.class).in(Singleton.class); - expose(TaskThrottler.class); - } - }); - PubsubEventModule.bindSubscriber(binder(), TaskThrottler.class); - - install(new PrivateModule() { - @Override - protected void configure() { - bind(TaskReconcilerSettings.class).toInstance(new TaskReconcilerSettings( - RECONCILIATION_INITIAL_DELAY.get(), - RECONCILIATION_EXPLICIT_INTERVAL.get(), - RECONCILIATION_IMPLICIT_INTERVAL.get(), - RECONCILIATION_SCHEDULE_SPREAD.get())); - bind(ScheduledExecutorService.class).toInstance(executor); - bind(TaskReconciler.class).in(Singleton.class); - expose(TaskReconciler.class); - } - }); - SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()).to(TaskReconciler.class); - - install(new PrivateModule() { - @Override - protected void configure() { - bind(JobUpdateHistoryPruner.HistoryPrunerSettings.class).toInstance( - new JobUpdateHistoryPruner.HistoryPrunerSettings( - JOB_UPDATE_HISTORY_PRUNING_INTERVAL.get(), - JOB_UPDATE_HISTORY_PRUNING_THRESHOLD.get(), - JOB_UPDATE_HISTORY_PER_JOB_THRESHOLD.get())); - - bind(ScheduledExecutorService.class).toInstance( - AsyncUtil.singleThreadLoggingScheduledExecutor("JobUpdatePruner-%d", LOG)); - - bind(JobUpdateHistoryPruner.class).in(Singleton.class); - expose(JobUpdateHistoryPruner.class); - } - }); - SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()) - .to(JobUpdateHistoryPruner.class); - - install(new PrivateModule() { - @Override - protected void configure() { - bind(ScheduledExecutorService.class).toInstance(executor); - bind(BackoffStrategy.class).toInstance( - new TruncatedBinaryBackoff( - INITIAL_TASK_KILL_RETRY_INTERVAL.get(), - TRANSIENT_TASK_STATE_TIMEOUT.get())); - bind(KillRetry.class).in(Singleton.class); - expose(KillRetry.class); - } - }); - PubsubEventModule.bindSubscriber(binder(), KillRetry.class); } static class RegisterGauges extends AbstractIdleService { http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPruner.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPruner.java deleted file mode 100644 index b416343..0000000 --- a/src/main/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPruner.java +++ /dev/null @@ -1,105 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.async; - -import java.util.Set; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.logging.Logger; - -import javax.inject.Inject; - -import com.google.common.base.Joiner; -import com.google.common.util.concurrent.AbstractIdleService; -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; -import com.twitter.common.util.Clock; - -import org.apache.aurora.scheduler.storage.Storage; -import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; -import org.apache.aurora.scheduler.storage.Storage.MutateWork; -import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey; - -import static java.util.Objects.requireNonNull; - -/** - * Prunes per-job update history on a periodic basis. - */ -class JobUpdateHistoryPruner extends AbstractIdleService { - private static final Logger LOG = Logger.getLogger(JobUpdateHistoryPruner.class.getName()); - - private final Clock clock; - private final ScheduledExecutorService executor; - private final Storage storage; - private final HistoryPrunerSettings settings; - - static class HistoryPrunerSettings { - private final Amount<Long, Time> pruneInterval; - private final Amount<Long, Time> maxHistorySize; - private final int maxUpdatesPerJob; - - HistoryPrunerSettings( - Amount<Long, Time> pruneInterval, - Amount<Long, Time> maxHistorySize, - int maxUpdatesPerJob) { - - this.pruneInterval = requireNonNull(pruneInterval); - this.maxHistorySize = requireNonNull(maxHistorySize); - this.maxUpdatesPerJob = maxUpdatesPerJob; - } - } - - @Inject - JobUpdateHistoryPruner( - Clock clock, - ScheduledExecutorService executor, - Storage storage, - HistoryPrunerSettings settings) { - - this.clock = requireNonNull(clock); - this.executor = requireNonNull(executor); - this.storage = requireNonNull(storage); - this.settings = requireNonNull(settings); - } - - @Override - protected void startUp() { - executor.scheduleAtFixedRate( - new Runnable() { - @Override - public void run() { - storage.write(new MutateWork.NoResult.Quiet() { - @Override - public void execute(MutableStoreProvider storeProvider) { - Set<IJobUpdateKey> prunedUpdates = storeProvider.getJobUpdateStore().pruneHistory( - settings.maxUpdatesPerJob, - clock.nowMillis() - settings.maxHistorySize.as(Time.MILLISECONDS)); - - LOG.info(prunedUpdates.isEmpty() - ? "No job update history to prune." - : "Pruned job update history: " + Joiner.on(",").join(prunedUpdates)); - } - }); - } - }, - settings.pruneInterval.as(Time.MILLISECONDS), - settings.pruneInterval.as(Time.MILLISECONDS), - TimeUnit.MILLISECONDS); - } - - @Override - protected void shutDown() { - // Nothing to do - await VM shutdown. - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/async/KillRetry.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/KillRetry.java b/src/main/java/org/apache/aurora/scheduler/async/KillRetry.java deleted file mode 100644 index b125c1c..0000000 --- a/src/main/java/org/apache/aurora/scheduler/async/KillRetry.java +++ /dev/null @@ -1,102 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.async; - -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.logging.Logger; - -import javax.inject.Inject; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Iterables; -import com.google.common.eventbus.Subscribe; -import com.twitter.common.stats.StatsProvider; -import com.twitter.common.util.BackoffStrategy; - -import org.apache.aurora.gen.ScheduleStatus; -import org.apache.aurora.scheduler.base.Query; -import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber; -import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; -import org.apache.aurora.scheduler.mesos.Driver; -import org.apache.aurora.scheduler.storage.Storage; - -import static java.util.Objects.requireNonNull; - -/** - * Watches for task transitions into {@link ScheduleStatus#KILLING KILLING} and periodically - * retries {@link Driver#killTask(String)} until the task transitions. - */ -public class KillRetry implements EventSubscriber { - private static final Logger LOG = Logger.getLogger(KillRetry.class.getName()); - - @VisibleForTesting - static final String RETRIES_COUNTER = "task_kill_retries"; - - private final Driver driver; - private final Storage storage; - private final ScheduledExecutorService executor; - private final BackoffStrategy backoffStrategy; - private final AtomicLong killRetries; - - @Inject - KillRetry( - Driver driver, - Storage storage, - ScheduledExecutorService executor, - BackoffStrategy backoffStrategy, - StatsProvider statsProvider) { - - this.driver = requireNonNull(driver); - this.storage = requireNonNull(storage); - this.executor = requireNonNull(executor); - this.backoffStrategy = requireNonNull(backoffStrategy); - killRetries = statsProvider.makeCounter(RETRIES_COUNTER); - } - - @Subscribe - public void taskChangedState(TaskStateChange stateChange) { - if (stateChange.getNewState() == ScheduleStatus.KILLING) { - new KillAttempt(stateChange.getTaskId()).tryLater(); - } - } - - private class KillAttempt implements Runnable { - private final String taskId; - private final AtomicLong retryInMs = new AtomicLong(); - - KillAttempt(String taskId) { - this.taskId = taskId; - } - - void tryLater() { - retryInMs.set(backoffStrategy.calculateBackoffMs(retryInMs.get())); - executor.schedule(this, retryInMs.get(), TimeUnit.MILLISECONDS); - } - - @Override - public void run() { - Query.Builder query = Query.taskScoped(taskId).byStatus(ScheduleStatus.KILLING); - if (!Iterables.isEmpty(Storage.Util.fetchTasks(storage, query))) { - LOG.info("Task " + taskId + " not yet killed, retrying."); - - // Kill did not yet take effect, try again. - driver.killTask(taskId); - killRetries.incrementAndGet(); - tryLater(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/async/OfferManager.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/OfferManager.java b/src/main/java/org/apache/aurora/scheduler/async/OfferManager.java deleted file mode 100644 index e60d01e..0000000 --- a/src/main/java/org/apache/aurora/scheduler/async/OfferManager.java +++ /dev/null @@ -1,406 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.async; - -import java.util.Comparator; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentSkipListSet; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.logging.Level; -import java.util.logging.Logger; - -import javax.inject.Inject; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.base.Optional; -import com.google.common.base.Supplier; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Iterables; -import com.google.common.collect.Maps; -import com.google.common.collect.Multimap; -import com.google.common.collect.Ordering; -import com.google.common.eventbus.Subscribe; -import com.twitter.common.inject.TimedInterceptor.Timed; -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; -import com.twitter.common.stats.Stats; - -import org.apache.aurora.gen.MaintenanceMode; -import org.apache.aurora.scheduler.HostOffer; -import org.apache.aurora.scheduler.base.TaskGroupKey; -import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected; -import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber; -import org.apache.aurora.scheduler.mesos.Driver; -import org.apache.aurora.scheduler.state.TaskAssigner.Assignment; -import org.apache.aurora.scheduler.storage.entities.IHostAttributes; -import org.apache.mesos.Protos.OfferID; -import org.apache.mesos.Protos.SlaveID; - -import static java.util.Objects.requireNonNull; - -import static org.apache.aurora.gen.MaintenanceMode.DRAINED; -import static org.apache.aurora.gen.MaintenanceMode.DRAINING; -import static org.apache.aurora.gen.MaintenanceMode.NONE; -import static org.apache.aurora.gen.MaintenanceMode.SCHEDULED; -import static org.apache.aurora.scheduler.events.PubsubEvent.HostAttributesChanged; - -/** - * Tracks the Offers currently known by the scheduler. - */ -public interface OfferManager extends EventSubscriber { - - /** - * Notifies the scheduler of a new resource offer. - * - * @param offer Newly-available resource offer. - */ - void addOffer(HostOffer offer); - - /** - * Invalidates an offer. This indicates that the scheduler should not attempt to match any - * tasks against the offer. - * - * @param offer Canceled offer. - */ - void cancelOffer(OfferID offer); - - /** - * Launches the first task that satisfies the {@code acceptor} by returning a {@link Assignment}. - * - * @param acceptor Function that determines if an offer is accepted. - * @param groupKey Task group key. - * @return {@code true} if the task was launched, {@code false} if no offers satisfied the - * {@code acceptor}. - * @throws LaunchException If the acceptor accepted an offer, but there was an error launching the - * task. - */ - boolean launchFirst(Function<HostOffer, Assignment> acceptor, TaskGroupKey groupKey) - throws LaunchException; - - /** - * Notifies the offer queue that a host's attributes have changed. - * - * @param change State change notification. - */ - void hostAttributesChanged(HostAttributesChanged change); - - /** - * Gets the offers that the scheduler is holding. - * - * @return A snapshot of the offers that the scheduler is currently holding. - */ - Iterable<HostOffer> getOffers(); - - /** - * Gets an offer for the given slave ID. - * - * @param slaveId Slave ID to get offer for. - * @return An offer for the slave ID. - */ - Optional<HostOffer> getOffer(SlaveID slaveId); - - /** - * Calculates the amount of time before an offer should be 'returned' by declining it. - * The delay is calculated for each offer that is received, so the return delay may be - * fixed or variable. - */ - interface OfferReturnDelay extends Supplier<Amount<Long, Time>> { - } - - /** - * Thrown when there was an unexpected failure trying to launch a task. - */ - class LaunchException extends Exception { - LaunchException(String msg) { - super(msg); - } - - LaunchException(String msg, Throwable cause) { - super(msg, cause); - } - } - - class OfferManagerImpl implements OfferManager { - @VisibleForTesting - static final Logger LOG = Logger.getLogger(OfferManagerImpl.class.getName()); - - private final HostOffers hostOffers = new HostOffers(); - private final AtomicLong offerRaces = Stats.exportLong("offer_accept_races"); - - private final Driver driver; - private final OfferReturnDelay returnDelay; - private final ScheduledExecutorService executor; - - @Inject - OfferManagerImpl( - Driver driver, - OfferReturnDelay returnDelay, - ScheduledExecutorService executor) { - - this.driver = requireNonNull(driver); - this.returnDelay = requireNonNull(returnDelay); - this.executor = requireNonNull(executor); - } - - @Override - public void addOffer(final HostOffer offer) { - // We run a slight risk of a race here, which is acceptable. The worst case is that we - // temporarily hold two offers for the same host, which should be corrected when we return - // them after the return delay. - // There's also a chance that we return an offer for compaction ~simultaneously with the - // same-host offer being canceled/returned. This is also fine. - Optional<HostOffer> sameSlave = hostOffers.get(offer.getOffer().getSlaveId()); - if (sameSlave.isPresent()) { - // If there are existing offers for the slave, decline all of them so the master can - // compact all of those offers into a single offer and send them back. - LOG.info("Returning offers for " + offer.getOffer().getSlaveId().getValue() - + " for compaction."); - decline(offer.getOffer().getId()); - removeAndDecline(sameSlave.get().getOffer().getId()); - } else { - hostOffers.add(offer); - executor.schedule( - new Runnable() { - @Override - public void run() { - removeAndDecline(offer.getOffer().getId()); - } - }, - returnDelay.get().as(Time.MILLISECONDS), - TimeUnit.MILLISECONDS); - } - } - - void removeAndDecline(OfferID id) { - if (removeFromHostOffers(id)) { - decline(id); - } - } - - void decline(OfferID id) { - LOG.fine("Declining offer " + id); - driver.declineOffer(id); - } - - @Override - public void cancelOffer(final OfferID offerId) { - removeFromHostOffers(offerId); - } - - private boolean removeFromHostOffers(final OfferID offerId) { - requireNonNull(offerId); - - // The small risk of inconsistency is acceptable here - if we have an accept/remove race - // on an offer, the master will mark the task as LOST and it will be retried. - return hostOffers.remove(offerId); - } - - @Override - public Iterable<HostOffer> getOffers() { - return hostOffers.getWeaklyConsistentOffers(); - } - - @Override - public Optional<HostOffer> getOffer(SlaveID slaveId) { - return hostOffers.get(slaveId); - } - - /** - * Updates the preference of a host's offers. - * - * @param change Host change notification. - */ - @Subscribe - public void hostAttributesChanged(HostAttributesChanged change) { - hostOffers.updateHostAttributes(change.getAttributes()); - } - - /** - * Notifies the queue that the driver is disconnected, and all the stored offers are now - * invalid. - * <p> - * The queue takes this as a signal to flush its queue. - * - * @param event Disconnected event. - */ - @Subscribe - public void driverDisconnected(DriverDisconnected event) { - LOG.info("Clearing stale offers since the driver is disconnected."); - hostOffers.clear(); - } - - /** - * A container for the data structures used by this class, to make it easier to reason about - * the different indices used and their consistency. - */ - private static class HostOffers { - private static final Comparator<HostOffer> PREFERENCE_COMPARATOR = - // Currently, the only preference is based on host maintenance status. - Ordering.explicit(NONE, SCHEDULED, DRAINING, DRAINED) - .onResultOf(new Function<HostOffer, MaintenanceMode>() { - @Override - public MaintenanceMode apply(HostOffer offer) { - return offer.getAttributes().getMode(); - } - }) - .compound(Ordering.arbitrary()); - - private final Set<HostOffer> offers = new ConcurrentSkipListSet<>(PREFERENCE_COMPARATOR); - private final Map<OfferID, HostOffer> offersById = Maps.newHashMap(); - private final Map<SlaveID, HostOffer> offersBySlave = Maps.newHashMap(); - private final Map<String, HostOffer> offersByHost = Maps.newHashMap(); - // TODO(maxim): Expose via a debug endpoint. AURORA-1136. - // Keep track of offer->groupKey mappings that will never be matched to avoid redundant - // scheduling attempts. See Assignment.Result for more details on static ban. - private final Multimap<OfferID, TaskGroupKey> staticallyBannedOffers = HashMultimap.create(); - - HostOffers() { - // Potential gotcha - since this is a ConcurrentSkipListSet, size() is more expensive. - // Could track this separately if it turns out to pose problems. - Stats.exportSize("outstanding_offers", offers); - } - - synchronized Optional<HostOffer> get(SlaveID slaveId) { - return Optional.fromNullable(offersBySlave.get(slaveId)); - } - - synchronized void add(HostOffer offer) { - offers.add(offer); - offersById.put(offer.getOffer().getId(), offer); - offersBySlave.put(offer.getOffer().getSlaveId(), offer); - offersByHost.put(offer.getOffer().getHostname(), offer); - } - - synchronized boolean remove(OfferID id) { - HostOffer removed = offersById.remove(id); - if (removed != null) { - offers.remove(removed); - offersBySlave.remove(removed.getOffer().getSlaveId()); - offersByHost.remove(removed.getOffer().getHostname()); - staticallyBannedOffers.removeAll(id); - } - return removed != null; - } - - synchronized void updateHostAttributes(IHostAttributes attributes) { - HostOffer offer = offersByHost.remove(attributes.getHost()); - if (offer != null) { - // Remove and re-add a host's offer to re-sort based on its new hostStatus - remove(offer.getOffer().getId()); - add(new HostOffer(offer.getOffer(), attributes)); - } - } - - synchronized Iterable<HostOffer> getWeaklyConsistentOffers() { - return Iterables.unmodifiableIterable(offers); - } - - synchronized boolean isStaticallyBanned(HostOffer offer, TaskGroupKey groupKey) { - boolean result = staticallyBannedOffers.containsEntry(offer.getOffer().getId(), groupKey); - if (LOG.isLoggable(Level.FINE)) { - LOG.fine(String.format( - "Host offer %s is statically banned for %s: %s", - offer, - groupKey, - result)); - } - return result; - } - - synchronized void addStaticGroupBan(HostOffer offer, TaskGroupKey groupKey) { - OfferID offerId = offer.getOffer().getId(); - if (offersById.containsKey(offerId)) { - staticallyBannedOffers.put(offerId, groupKey); - - if (LOG.isLoggable(Level.FINE)) { - LOG.fine( - String.format("Adding static ban for offer: %s, groupKey: %s", offer, groupKey)); - } - } - } - - synchronized void clear() { - offers.clear(); - offersById.clear(); - offersBySlave.clear(); - offersByHost.clear(); - staticallyBannedOffers.clear(); - } - } - - @Timed("offer_queue_launch_first") - @Override - public boolean launchFirst(Function<HostOffer, Assignment> acceptor, TaskGroupKey groupKey) - throws LaunchException { - - // It's important that this method is not called concurrently - doing so would open up the - // possibility of a race between the same offers being accepted by different threads. - - for (HostOffer offer : hostOffers.getWeaklyConsistentOffers()) { - if (!hostOffers.isStaticallyBanned(offer, groupKey) - && acceptOffer(offer, acceptor, groupKey)) { - return true; - } - } - - return false; - } - - @Timed("offer_queue_accept_offer") - protected boolean acceptOffer( - HostOffer offer, - Function<HostOffer, Assignment> acceptor, - TaskGroupKey groupKey) throws LaunchException { - - Assignment assignment = acceptor.apply(offer); - switch (assignment.getResult()) { - - case SUCCESS: - // Guard against an offer being removed after we grabbed it from the iterator. - // If that happens, the offer will not exist in hostOffers, and we can immediately - // send it back to LOST for quick reschedule. - // Removing while iterating counts on the use of a weakly-consistent iterator being used, - // which is a feature of ConcurrentSkipListSet. - if (hostOffers.remove(offer.getOffer().getId())) { - try { - driver.launchTask(offer.getOffer().getId(), assignment.getTaskInfo().get()); - return true; - } catch (IllegalStateException e) { - // TODO(William Farner): Catch only the checked exception produced by Driver - // once it changes from throwing IllegalStateException when the driver is not yet - // registered. - throw new LaunchException("Failed to launch task.", e); - } - } else { - offerRaces.incrementAndGet(); - throw new LaunchException( - "Accepted offer no longer exists in offer queue, likely data race."); - } - - case FAILURE_STATIC_MISMATCH: - // Exclude an offer that results in a static mismatch from further attempts to match - // against all tasks from the same group. - hostOffers.addStaticGroupBan(offer, groupKey); - return false; - - default: - return false; - } - } - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/async/RandomJitterReturnDelay.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/RandomJitterReturnDelay.java b/src/main/java/org/apache/aurora/scheduler/async/RandomJitterReturnDelay.java deleted file mode 100644 index 6a8c967..0000000 --- a/src/main/java/org/apache/aurora/scheduler/async/RandomJitterReturnDelay.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.async; - -import java.util.Objects; - -import com.google.common.annotations.VisibleForTesting; -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; -import com.twitter.common.util.Random; - -import org.apache.aurora.scheduler.async.OfferManager.OfferReturnDelay; - -import static com.google.common.base.Preconditions.checkArgument; - -/** - * Returns offers after a random duration within a fixed window. - */ -@VisibleForTesting -class RandomJitterReturnDelay implements OfferReturnDelay { - private final int minHoldTimeMs; - private final int maxJitterWindowMs; - private final Random random; - - RandomJitterReturnDelay(int minHoldTimeMs, int maxJitterWindowMs, Random random) { - checkArgument(minHoldTimeMs >= 0); - checkArgument(maxJitterWindowMs >= 0); - - this.minHoldTimeMs = minHoldTimeMs; - this.maxJitterWindowMs = maxJitterWindowMs; - this.random = Objects.requireNonNull(random); - } - - @Override - public Amount<Long, Time> get() { - return Amount.of((long) minHoldTimeMs + random.nextInt(maxJitterWindowMs), Time.MILLISECONDS); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java b/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java deleted file mode 100644 index 6a0c0a9..0000000 --- a/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java +++ /dev/null @@ -1,174 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.async; - -import java.util.EnumSet; -import java.util.List; -import java.util.Set; -import java.util.logging.Logger; - -import javax.inject.Inject; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import com.google.common.base.Predicates; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; -import com.twitter.common.util.BackoffStrategy; -import com.twitter.common.util.Random; - -import org.apache.aurora.gen.ScheduleStatus; -import org.apache.aurora.scheduler.base.Query; -import org.apache.aurora.scheduler.base.Tasks; -import org.apache.aurora.scheduler.storage.Storage; -import org.apache.aurora.scheduler.storage.entities.IScheduledTask; -import org.apache.aurora.scheduler.storage.entities.ITaskEvent; - -import static java.util.Objects.requireNonNull; - -import static org.apache.aurora.gen.ScheduleStatus.DRAINING; -import static org.apache.aurora.gen.ScheduleStatus.KILLING; -import static org.apache.aurora.gen.ScheduleStatus.RESTARTING; - -/** - * Calculates scheduling delays for tasks. - */ -public interface RescheduleCalculator { - /** - * Calculates the delay, in milliseconds, before the task should be considered eligible for - * (re)scheduling at scheduler startup. - * - * @param task Task to calculate delay for. - * @return Delay in msec. - */ - long getStartupScheduleDelayMs(IScheduledTask task); - - /** - * Calculates the penalty, in milliseconds, that a task should be penalized before being - * eligible for rescheduling. - * - * @param task Task to calculate delay for. - * @return Delay in msec. - */ - long getFlappingPenaltyMs(IScheduledTask task); - - class RescheduleCalculatorImpl implements RescheduleCalculator { - - private static final Logger LOG = Logger.getLogger(TaskGroups.class.getName()); - - private final Storage storage; - private final RescheduleCalculatorSettings settings; - // TODO(wfarner): Inject 'random' in the constructor for better test coverage. - private final Random random = new Random.SystemRandom(new java.util.Random()); - - private static final Predicate<ScheduleStatus> IS_ACTIVE_STATUS = - Predicates.in(Tasks.ACTIVE_STATES); - - private static final Set<ScheduleStatus> INTERRUPTED_TASK_STATES = - EnumSet.of(RESTARTING, KILLING, DRAINING); - - private final Predicate<IScheduledTask> flapped = new Predicate<IScheduledTask>() { - @Override - public boolean apply(IScheduledTask task) { - if (!task.isSetTaskEvents()) { - return false; - } - - List<ITaskEvent> events = Lists.reverse(task.getTaskEvents()); - - // Avoid penalizing tasks that were interrupted by outside action, such as a user - // restarting them. - if (Iterables.any(Iterables.transform(events, Tasks.TASK_EVENT_TO_STATUS), - Predicates.in(INTERRUPTED_TASK_STATES))) { - return false; - } - - ITaskEvent terminalEvent = Iterables.get(events, 0); - ScheduleStatus terminalState = terminalEvent.getStatus(); - Preconditions.checkState(Tasks.isTerminated(terminalState)); - - ITaskEvent activeEvent = Iterables.find( - events, - Predicates.compose(IS_ACTIVE_STATUS, Tasks.TASK_EVENT_TO_STATUS)); - - long thresholdMs = settings.flappingTaskThreashold.as(Time.MILLISECONDS); - - return (terminalEvent.getTimestamp() - activeEvent.getTimestamp()) < thresholdMs; - } - }; - - @VisibleForTesting - public static class RescheduleCalculatorSettings { - private final BackoffStrategy flappingTaskBackoff; - private final Amount<Long, Time> flappingTaskThreashold; - private final Amount<Integer, Time> maxStartupRescheduleDelay; - - public RescheduleCalculatorSettings( - BackoffStrategy flappingTaskBackoff, - Amount<Long, Time> flappingTaskThreashold, - Amount<Integer, Time> maxStartupRescheduleDelay) { - - this.flappingTaskBackoff = requireNonNull(flappingTaskBackoff); - this.flappingTaskThreashold = requireNonNull(flappingTaskThreashold); - this.maxStartupRescheduleDelay = requireNonNull(maxStartupRescheduleDelay); - } - } - - @Inject - RescheduleCalculatorImpl(Storage storage, RescheduleCalculatorSettings settings) { - this.storage = requireNonNull(storage); - this.settings = requireNonNull(settings); - } - - @Override - public long getStartupScheduleDelayMs(IScheduledTask task) { - return random.nextInt(settings.maxStartupRescheduleDelay.as(Time.MILLISECONDS).intValue()) - + getFlappingPenaltyMs(task); - } - - private Optional<IScheduledTask> getTaskAncestor(IScheduledTask task) { - if (!task.isSetAncestorId()) { - return Optional.absent(); - } - - Iterable<IScheduledTask> res = - Storage.Util.fetchTasks(storage, Query.taskScoped(task.getAncestorId())); - - return Optional.fromNullable(Iterables.getOnlyElement(res, null)); - } - - @Override - public long getFlappingPenaltyMs(IScheduledTask task) { - Optional<IScheduledTask> curTask = getTaskAncestor(task); - long penaltyMs = 0; - while (curTask.isPresent() && flapped.apply(curTask.get())) { - LOG.info( - String.format("Ancestor of %s flapped: %s", Tasks.id(task), Tasks.id(curTask.get()))); - long newPenalty = settings.flappingTaskBackoff.calculateBackoffMs(penaltyMs); - // If the backoff strategy is truncated then there is no need for us to continue. - if (newPenalty == penaltyMs) { - break; - } - penaltyMs = newPenalty; - curTask = getTaskAncestor(curTask.get()); - } - - return penaltyMs; - } - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/async/TaskGroup.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskGroup.java b/src/main/java/org/apache/aurora/scheduler/async/TaskGroup.java deleted file mode 100644 index 635419b..0000000 --- a/src/main/java/org/apache/aurora/scheduler/async/TaskGroup.java +++ /dev/null @@ -1,77 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.async; - -import java.util.Queue; -import java.util.Set; - -import com.google.common.base.Optional; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; - -import org.apache.aurora.scheduler.base.TaskGroupKey; - -/** - * A group of task IDs that are eligible for scheduling, but may be waiting for a backoff to expire. - */ -class TaskGroup { - private final TaskGroupKey key; - private long penaltyMs; - private final Queue<String> tasks; - - TaskGroup(TaskGroupKey key, String initialTaskId) { - this.key = key; - this.penaltyMs = 0; - this.tasks = Lists.newLinkedList(); - this.tasks.add(initialTaskId); - } - - synchronized TaskGroupKey getKey() { - return key; - } - - synchronized Optional<String> peek() { - return Optional.fromNullable(tasks.peek()); - } - - synchronized boolean hasMore() { - return !tasks.isEmpty(); - } - - synchronized void remove(String taskId) { - tasks.remove(taskId); - } - - synchronized void offer(String taskId) { - tasks.offer(taskId); - } - - synchronized void setPenaltyMs(long penaltyMs) { - this.penaltyMs = penaltyMs; - } - - // Begin methods used for debug interfaces. - - public synchronized String getName() { - return key.toString(); - } - - public synchronized Set<String> getTaskIds() { - return ImmutableSet.copyOf(tasks); - } - - public synchronized long getPenaltyMs() { - return penaltyMs; - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java b/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java deleted file mode 100644 index 1580404..0000000 --- a/src/main/java/org/apache/aurora/scheduler/async/TaskGroups.java +++ /dev/null @@ -1,239 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.async; - -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.logging.Logger; - -import javax.inject.Inject; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; -import com.google.common.collect.Maps; -import com.google.common.eventbus.Subscribe; -import com.google.common.util.concurrent.RateLimiter; -import com.twitter.common.application.ShutdownRegistry; -import com.twitter.common.base.Command; -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; -import com.twitter.common.stats.SlidingStats; -import com.twitter.common.stats.Stats; -import com.twitter.common.util.BackoffStrategy; -import com.twitter.common.util.concurrent.ExecutorServiceShutdown; - -import org.apache.aurora.scheduler.base.AsyncUtil; -import org.apache.aurora.scheduler.base.TaskGroupKey; -import org.apache.aurora.scheduler.base.Tasks; -import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber; -import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; -import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted; -import org.apache.aurora.scheduler.storage.entities.IAssignedTask; -import org.apache.aurora.scheduler.storage.entities.IScheduledTask; - -import static java.util.Objects.requireNonNull; -import static java.util.concurrent.TimeUnit.MILLISECONDS; - -import static org.apache.aurora.gen.ScheduleStatus.PENDING; - -/** - * A collection of task groups, where a task group is a collection of tasks that are known to be - * equal in the way they schedule. This is expected to be tasks associated with the same job key, - * who also have {@code equal()} {@link org.apache.aurora.scheduler.storage.entities.ITaskConfig} - * values. - * <p> - * This is used to prevent redundant work in trying to schedule tasks as well as to provide - * nearly-equal responsiveness when scheduling across jobs. In other words, a 1000 instance job - * cannot starve a 1 instance job. - */ -public class TaskGroups implements EventSubscriber { - - private static final Logger LOG = Logger.getLogger(TaskGroups.class.getName()); - - private final ConcurrentMap<TaskGroupKey, TaskGroup> groups = Maps.newConcurrentMap(); - private final ScheduledExecutorService executor; - private final TaskScheduler taskScheduler; - private final long firstScheduleDelay; - private final BackoffStrategy backoff; - private final RescheduleCalculator rescheduleCalculator; - - // Track the penalties of tasks at the time they were scheduled. This is to provide data that - // may influence the selection of a different backoff strategy. - private final SlidingStats scheduledTaskPenalties = - new SlidingStats("scheduled_task_penalty", "ms"); - - public static class TaskGroupsSettings { - private final Amount<Long, Time> firstScheduleDelay; - private final BackoffStrategy taskGroupBackoff; - private final RateLimiter rateLimiter; - - public TaskGroupsSettings( - Amount<Long, Time> firstScheduleDelay, - BackoffStrategy taskGroupBackoff, - RateLimiter rateLimiter) { - - this.firstScheduleDelay = requireNonNull(firstScheduleDelay); - this.taskGroupBackoff = requireNonNull(taskGroupBackoff); - this.rateLimiter = requireNonNull(rateLimiter); - } - } - - @Inject - TaskGroups( - ShutdownRegistry shutdownRegistry, - TaskGroupsSettings settings, - TaskScheduler taskScheduler, - RescheduleCalculator rescheduleCalculator) { - - this( - createThreadPool(shutdownRegistry), - settings.firstScheduleDelay, - settings.taskGroupBackoff, - settings.rateLimiter, - taskScheduler, - rescheduleCalculator); - } - - @VisibleForTesting - TaskGroups( - final ScheduledExecutorService executor, - final Amount<Long, Time> firstScheduleDelay, - final BackoffStrategy backoff, - final RateLimiter rateLimiter, - final TaskScheduler taskScheduler, - final RescheduleCalculator rescheduleCalculator) { - - requireNonNull(firstScheduleDelay); - Preconditions.checkArgument(firstScheduleDelay.getValue() > 0); - - this.executor = requireNonNull(executor); - requireNonNull(rateLimiter); - requireNonNull(taskScheduler); - this.firstScheduleDelay = firstScheduleDelay.as(Time.MILLISECONDS); - this.backoff = requireNonNull(backoff); - this.rescheduleCalculator = requireNonNull(rescheduleCalculator); - - this.taskScheduler = new TaskScheduler() { - @Override - public boolean schedule(String taskId) { - rateLimiter.acquire(); - return taskScheduler.schedule(taskId); - } - }; - } - - private synchronized void evaluateGroupLater(Runnable evaluate, TaskGroup group) { - // Avoid check-then-act by holding the intrinsic lock. If not done atomically, we could - // remove a group while a task is being added to it. - if (group.hasMore()) { - executor.schedule(evaluate, group.getPenaltyMs(), MILLISECONDS); - } else { - groups.remove(group.getKey()); - } - } - - private void startGroup(final TaskGroup group) { - Runnable monitor = new Runnable() { - @Override - public void run() { - Optional<String> taskId = group.peek(); - long penaltyMs = 0; - if (taskId.isPresent()) { - if (taskScheduler.schedule(taskId.get())) { - scheduledTaskPenalties.accumulate(group.getPenaltyMs()); - group.remove(taskId.get()); - if (group.hasMore()) { - penaltyMs = firstScheduleDelay; - } - } else { - penaltyMs = backoff.calculateBackoffMs(group.getPenaltyMs()); - } - } - - group.setPenaltyMs(penaltyMs); - evaluateGroupLater(this, group); - } - }; - evaluateGroupLater(monitor, group); - } - - private static ScheduledExecutorService createThreadPool(ShutdownRegistry shutdownRegistry) { - final ScheduledThreadPoolExecutor executor = - AsyncUtil.singleThreadLoggingScheduledExecutor("TaskScheduler-%d", LOG); - - Stats.exportSize("schedule_queue_size", executor.getQueue()); - shutdownRegistry.addAction(new Command() { - @Override - public void execute() { - new ExecutorServiceShutdown(executor, Amount.of(1L, Time.SECONDS)).execute(); - } - }); - return executor; - } - - /** - * Informs the task groups of a task state change. - * <p> - * This is used to observe {@link org.apache.aurora.gen.ScheduleStatus#PENDING} tasks and begin - * attempting to schedule them. - * - * @param stateChange State change notification. - */ - @Subscribe - public synchronized void taskChangedState(TaskStateChange stateChange) { - if (stateChange.getNewState() == PENDING) { - IScheduledTask task = stateChange.getTask(); - TaskGroupKey key = TaskGroupKey.from(task.getAssignedTask().getTask()); - TaskGroup newGroup = new TaskGroup(key, Tasks.id(task)); - TaskGroup existing = groups.putIfAbsent(key, newGroup); - if (existing == null) { - long penaltyMs; - if (stateChange.isTransition()) { - penaltyMs = firstScheduleDelay; - } else { - penaltyMs = rescheduleCalculator.getStartupScheduleDelayMs(task); - } - newGroup.setPenaltyMs(penaltyMs); - startGroup(newGroup); - } else { - existing.offer(Tasks.id(task)); - } - } - } - - /** - * Signals the scheduler that tasks have been deleted. - * - * @param deleted Tasks deleted event. - */ - @Subscribe - public synchronized void tasksDeleted(TasksDeleted deleted) { - for (IAssignedTask task - : Iterables.transform(deleted.getTasks(), Tasks.SCHEDULED_TO_ASSIGNED)) { - TaskGroup group = groups.get(TaskGroupKey.from(task.getTask())); - if (group != null) { - group.remove(task.getTaskId()); - } - } - } - - public Iterable<TaskGroup> getGroups() { - return ImmutableSet.copyOf(groups.values()); - } - -} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/async/TaskHistoryPruner.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskHistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/async/TaskHistoryPruner.java deleted file mode 100644 index 7b6c063..0000000 --- a/src/main/java/org/apache/aurora/scheduler/async/TaskHistoryPruner.java +++ /dev/null @@ -1,174 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.async; - -import java.util.Set; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.logging.Logger; - -import javax.inject.Inject; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Predicate; -import com.google.common.collect.FluentIterable; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; -import com.google.common.eventbus.Subscribe; -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; -import com.twitter.common.util.Clock; - -import org.apache.aurora.gen.apiConstants; -import org.apache.aurora.scheduler.base.Query; -import org.apache.aurora.scheduler.base.Tasks; -import org.apache.aurora.scheduler.state.StateManager; -import org.apache.aurora.scheduler.storage.Storage; -import org.apache.aurora.scheduler.storage.entities.IJobKey; -import org.apache.aurora.scheduler.storage.entities.IScheduledTask; - -import static java.util.Objects.requireNonNull; - -import static org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber; -import static org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; - -/** - * Prunes tasks in a job based on per-job history and an inactive time threshold by observing tasks - * transitioning into one of the inactive states. - */ -public class TaskHistoryPruner implements EventSubscriber { - private static final Logger LOG = Logger.getLogger(TaskHistoryPruner.class.getName()); - - private final ScheduledExecutorService executor; - private final StateManager stateManager; - private final Clock clock; - private final HistoryPrunnerSettings settings; - private final Storage storage; - - private final Predicate<IScheduledTask> safeToDelete = new Predicate<IScheduledTask>() { - @Override - public boolean apply(IScheduledTask task) { - return Tasks.getLatestEvent(task).getTimestamp() - <= clock.nowMillis() - settings.minRetentionThresholdMillis; - } - }; - - static class HistoryPrunnerSettings { - private final long pruneThresholdMillis; - private final long minRetentionThresholdMillis; - private final int perJobHistoryGoal; - - HistoryPrunnerSettings( - Amount<Long, Time> inactivePruneThreshold, - Amount<Long, Time> minRetentionThreshold, - int perJobHistoryGoal) { - - this.pruneThresholdMillis = inactivePruneThreshold.as(Time.MILLISECONDS); - this.minRetentionThresholdMillis = minRetentionThreshold.as(Time.MILLISECONDS); - this.perJobHistoryGoal = perJobHistoryGoal; - } - } - - @Inject - TaskHistoryPruner( - final ScheduledExecutorService executor, - final StateManager stateManager, - final Clock clock, - final HistoryPrunnerSettings settings, - final Storage storage) { - - this.executor = requireNonNull(executor); - this.stateManager = requireNonNull(stateManager); - this.clock = requireNonNull(clock); - this.settings = requireNonNull(settings); - this.storage = requireNonNull(storage); - } - - @VisibleForTesting - long calculateTimeout(long taskEventTimestampMillis) { - return Math.max( - settings.minRetentionThresholdMillis, - settings.pruneThresholdMillis - Math.max(0, clock.nowMillis() - taskEventTimestampMillis)); - } - - /** - * When triggered, records an inactive task state change. - * - * @param change Event when a task changes state. - */ - @Subscribe - public void recordStateChange(TaskStateChange change) { - if (Tasks.isTerminated(change.getNewState())) { - long timeoutBasis = change.isTransition() - ? clock.nowMillis() - : Iterables.getLast(change.getTask().getTaskEvents()).getTimestamp(); - registerInactiveTask( - Tasks.SCHEDULED_TO_JOB_KEY.apply(change.getTask()), - change.getTaskId(), - calculateTimeout(timeoutBasis)); - } - } - - private void deleteTasks(final Set<String> taskIds) { - LOG.info("Pruning inactive tasks " + taskIds); - storage.write(new Storage.MutateWork.NoResult.Quiet() { - @Override - protected void execute(Storage.MutableStoreProvider storeProvider) { - stateManager.deleteTasks(storeProvider, taskIds); - } - }); - } - - @VisibleForTesting - static Query.Builder jobHistoryQuery(IJobKey jobKey) { - return Query.jobScoped(jobKey).byStatus(apiConstants.TERMINAL_STATES); - } - - private void registerInactiveTask( - final IJobKey jobKey, - final String taskId, - long timeRemaining) { - - LOG.fine("Prune task " + taskId + " in " + timeRemaining + " ms."); - executor.schedule( - new Runnable() { - @Override - public void run() { - LOG.info("Pruning expired inactive task " + taskId); - deleteTasks(ImmutableSet.of(taskId)); - } - }, - timeRemaining, - TimeUnit.MILLISECONDS); - - executor.submit(new Runnable() { - @Override - public void run() { - Iterable<IScheduledTask> inactiveTasks = - Storage.Util.fetchTasks(storage, jobHistoryQuery(jobKey)); - int numInactiveTasks = Iterables.size(inactiveTasks); - int tasksToPrune = numInactiveTasks - settings.perJobHistoryGoal; - if (tasksToPrune > 0 && numInactiveTasks > settings.perJobHistoryGoal) { - Set<String> toPrune = FluentIterable - .from(Tasks.LATEST_ACTIVITY.sortedCopy(inactiveTasks)) - .filter(safeToDelete) - .limit(tasksToPrune) - .transform(Tasks.SCHEDULED_TO_ID) - .toSet(); - deleteTasks(toPrune); - } - } - }); - } -}
