Repository: aurora Updated Branches: refs/heads/master 1fbf7732d -> 92f6d9f64
Add the ability to customize scheduling logic. Uses Guice module injection to enable replacing the first-fit scheduling algorithm and associated first-fit preemption logic. See design/proposal document here: https://docs.google.com/document/d/1fVHLt9AF-YbOCVCDMQmi5DATVusn-tqY8DldKbjVEm0/edit?usp=sharing Bugs closed: AURORA-1920 Reviewed at https://reviews.apache.org/r/59039/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/92f6d9f6 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/92f6d9f6 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/92f6d9f6 Branch: refs/heads/master Commit: 92f6d9f6443174a9ae3dc001967c208add2149ed Parents: 1fbf773 Author: David McLaughlin <[email protected]> Authored: Wed May 24 22:51:54 2017 -0700 Committer: David McLaughlin <[email protected]> Committed: Wed May 24 22:51:54 2017 -0700 ---------------------------------------------------------------------- RELEASE-NOTES.md | 3 + docs/development/design-documents.md | 1 + docs/reference/scheduler-configuration.md | 4 + .../preemptor/PendingTaskProcessorModule.java | 30 ++ .../preemptor/PreemptionVictimFilterModule.java | 30 ++ .../scheduler/preemptor/PreemptorModule.java | 62 ++- .../state/FirstFitTaskAssignerModule.java | 31 ++ .../aurora/scheduler/state/StateModule.java | 29 +- .../aurora/scheduler/state/TaskAssigner.java | 6 +- .../preemptor/PreemptorModuleTest.java | 10 +- .../state/FirstFitTaskAssignerTest.java | 516 +++++++++++++++++++ .../scheduler/state/TaskAssignerImplTest.java | 516 ------------------- 12 files changed, 706 insertions(+), 532 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/92f6d9f6/RELEASE-NOTES.md ---------------------------------------------------------------------- diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md index 77376e4..75b3ddb 100644 --- a/RELEASE-NOTES.md +++ b/RELEASE-NOTES.md @@ 
-25,6 +25,9 @@ instances. See [here](docs/reference/client-commands.md#scping-with-task-machines) for details. Currently only fully supported for Mesos containers (you can copy files from the Docker container sandbox but you cannot send files to it). +- Added the ability to inject your own scheduling logic via a lazy Guice module binding. This is an + alpha-level feature and not subject to backwards compatibility considerations. You can specify + your custom modules using the `task_assigner_modules` and `preemption_slot_finder_modules` options. 0.17.0 ====== http://git-wip-us.apache.org/repos/asf/aurora/blob/92f6d9f6/docs/development/design-documents.md ---------------------------------------------------------------------- diff --git a/docs/development/design-documents.md b/docs/development/design-documents.md index c942643..9c714e4 100644 --- a/docs/development/design-documents.md +++ b/docs/development/design-documents.md @@ -18,5 +18,6 @@ Current and past documents: * [Supporting the Mesos Universal Containerizer](https://docs.google.com/document/d/111T09NBF2zjjl7HE95xglsDpRdKoZqhCRM5hHmOfTLA/edit?usp=sharing) * [Tier Management In Apache Aurora](https://docs.google.com/document/d/1erszT-HsWf1zCIfhbqHlsotHxWUvDyI2xUwNQQQxLgs/edit?usp=sharing) * [Ubiquitous Jobs](https://docs.google.com/document/d/12hr6GnUZU3mc7xsWRzMi3nQILGB-3vyUxvbG-6YmvdE/edit) +* [Pluggable Scheduling](https://docs.google.com/document/d/1fVHLt9AF-YbOCVCDMQmi5DATVusn-tqY8DldKbjVEm0/edit) Design documents can be found in the Aurora issue tracker via the query [`project = AURORA AND text ~ "docs.google.com" ORDER BY created`](https://issues.apache.org/jira/browse/AURORA-1528?jql=project%20%3D%20AURORA%20AND%20text%20~%20%22docs.google.com%22%20ORDER%20BY%20created).
http://git-wip-us.apache.org/repos/asf/aurora/blob/92f6d9f6/docs/reference/scheduler-configuration.md ---------------------------------------------------------------------- diff --git a/docs/reference/scheduler-configuration.md b/docs/reference/scheduler-configuration.md index 3e3d799..3d53c5a 100644 --- a/docs/reference/scheduler-configuration.md +++ b/docs/reference/scheduler-configuration.md @@ -190,6 +190,8 @@ Optional flags: If true, Aurora populates DiscoveryInfo field of Mesos TaskInfo. -preemption_delay (default (3, mins)) Time interval after which a pending task becomes eligible to preempt other tasks +-preemption_slot_finder_modules (default [class org.apache.aurora.scheduler.preemptor.PendingTaskProcessorModule, class org.apache.aurora.scheduler.preemptor.PreemptionVictimFilterModule]) + Guice modules for custom preemption slot searching for pending tasks. -preemption_slot_hold_time (default (5, mins)) Time to hold a preemption slot found before it is discarded. -preemption_slot_search_interval (default (1, mins)) @@ -234,6 +236,8 @@ Optional flags: Time for a stat to be retained in memory before expiring. -stat_sampling_interval (default (1, secs)) Statistic value sampling interval. +-task_assigner_modules (default [class org.apache.aurora.scheduler.state.FirstFitTaskAssignerModule]) + Guice modules for customizing task assignment. -thermos_executor_cpu (default 0.25) The number of CPU cores to allocate for each instance of the executor.
-thermos_executor_flags http://git-wip-us.apache.org/repos/asf/aurora/blob/92f6d9f6/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessorModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessorModule.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessorModule.java new file mode 100644 index 0000000..b943f74 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessorModule.java @@ -0,0 +1,30 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.preemptor; + +import javax.inject.Singleton; + +import com.google.inject.AbstractModule; + +/** + * Module to install the default pending task slot-finder implementation. 
+ */ +public class PendingTaskProcessorModule extends AbstractModule { + @Override + protected void configure() { + bind(Runnable.class).annotatedWith(PreemptorModule.PreemptionSlotFinder.class) + .to(PendingTaskProcessor.class); + bind(PendingTaskProcessor.class).in(Singleton.class); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/92f6d9f6/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterModule.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterModule.java new file mode 100644 index 0000000..582f660 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterModule.java @@ -0,0 +1,30 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.preemptor; + +import javax.inject.Singleton; + +import com.google.inject.AbstractModule; + +/** + * Module to install the default preemption victim filter implementation. 
+ */ +public class PreemptionVictimFilterModule extends AbstractModule { + @Override + protected void configure() { + bind(PreemptionVictimFilter.class) + .to(PreemptionVictimFilter.PreemptionVictimFilterImpl.class); + bind(PreemptionVictimFilter.PreemptionVictimFilterImpl.class).in(Singleton.class); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/92f6d9f6/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptorModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptorModule.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptorModule.java index 92087eb..b3ca1a3 100644 --- a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptorModule.java +++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptorModule.java @@ -13,13 +13,19 @@ */ package org.apache.aurora.scheduler.preemptor; +import java.lang.annotation.Retention; +import java.lang.annotation.Target; +import java.util.Set; import javax.inject.Inject; +import javax.inject.Qualifier; import javax.inject.Singleton; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; +import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.AbstractScheduledService; import com.google.inject.AbstractModule; +import com.google.inject.Module; import com.google.inject.PrivateModule; import com.google.inject.TypeLiteral; @@ -29,12 +35,17 @@ import org.apache.aurora.common.args.constraints.Positive; import org.apache.aurora.common.quantity.Amount; import org.apache.aurora.common.quantity.Time; import org.apache.aurora.scheduler.SchedulerServicesModule; +import org.apache.aurora.scheduler.app.MoreModules; import org.apache.aurora.scheduler.base.TaskGroupKey; import org.apache.aurora.scheduler.events.PubsubEventModule; import org.apache.aurora.scheduler.preemptor.BiCache.BiCacheSettings; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.RetentionPolicy.RUNTIME; import static java.util.Objects.requireNonNull; public class PreemptorModule extends AbstractModule { @@ -65,22 +76,54 @@ public class PreemptorModule extends AbstractModule { help = "The maximum number of reservations for a task group to be made in a batch.") private static final Arg<Integer> RESERVATION_MAX_BATCH_SIZE = Arg.create(5); + @CmdLine(name = "preemption_slot_finder_modules", + help = "Guice modules for custom preemption slot searching for pending tasks.") + private static final Arg<Set<Module>> SLOT_FINDER_MODULES = Arg.create( + ImmutableSet.of( + MoreModules.lazilyInstantiated(PendingTaskProcessorModule.class), + MoreModules.lazilyInstantiated(PreemptionVictimFilterModule.class))); + private final boolean enablePreemptor; private final Amount<Long, Time> preemptionDelay; private final Amount<Long, Time> slotSearchInterval; private final Integer reservationBatchSize; + private final Set<Module> slotFinderModules; + + /* + * Binding annotation for the async processor that finds preemption slots. 
+ */ + @Qualifier + @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) + public @interface PreemptionSlotFinder { } @VisibleForTesting public PreemptorModule( boolean enablePreemptor, Amount<Long, Time> preemptionDelay, Amount<Long, Time> slotSearchInterval, - Integer reservationBatchSize) { + Integer reservationBatchSize, + Set<Module> slotFinderModules) { this.enablePreemptor = enablePreemptor; this.preemptionDelay = requireNonNull(preemptionDelay); this.slotSearchInterval = requireNonNull(slotSearchInterval); this.reservationBatchSize = requireNonNull(reservationBatchSize); + this.slotFinderModules = requireNonNull(slotFinderModules); + } + + @VisibleForTesting + public PreemptorModule( + boolean enablePreemptor, + Amount<Long, Time> preemptionDelay, + Amount<Long, Time> slotSearchInterval, + Integer reservationBatchSize) { + + this( + enablePreemptor, + preemptionDelay, + slotSearchInterval, + reservationBatchSize, + SLOT_FINDER_MODULES.get()); } public PreemptorModule() { @@ -88,7 +131,8 @@ public class PreemptorModule extends AbstractModule { ENABLE_PREEMPTOR.get(), PREEMPTION_DELAY.get(), PREEMPTION_SLOT_SEARCH_INTERVAL.get(), - RESERVATION_MAX_BATCH_SIZE.get()); + RESERVATION_MAX_BATCH_SIZE.get(), + SLOT_FINDER_MODULES.get()); } @Override @@ -99,9 +143,6 @@ public class PreemptorModule extends AbstractModule { if (enablePreemptor) { LOG.info("Preemptor Enabled."); bind(PreemptorMetrics.class).in(Singleton.class); - bind(PreemptionVictimFilter.class) - .to(PreemptionVictimFilter.PreemptionVictimFilterImpl.class); - bind(PreemptionVictimFilter.PreemptionVictimFilterImpl.class).in(Singleton.class); bind(Preemptor.class).to(Preemptor.PreemptorImpl.class); bind(Preemptor.PreemptorImpl.class).in(Singleton.class); bind(new TypeLiteral<Amount<Long, Time>>() { }) @@ -114,11 +155,14 @@ public class PreemptorModule extends AbstractModule { bind(new TypeLiteral<Integer>() { }) .annotatedWith(PendingTaskProcessor.ReservationBatchSize.class) 
.toInstance(reservationBatchSize); - bind(PendingTaskProcessor.class).in(Singleton.class); bind(ClusterState.class).to(ClusterStateImpl.class); bind(ClusterStateImpl.class).in(Singleton.class); expose(ClusterStateImpl.class); + for (Module module: slotFinderModules) { + install(module); + } + bind(PreemptorService.class).in(Singleton.class); bind(AbstractScheduledService.Scheduler.class).toInstance( AbstractScheduledService.Scheduler.newFixedRateSchedule( @@ -127,7 +171,7 @@ public class PreemptorModule extends AbstractModule { slotSearchInterval.getUnit().getTimeUnit())); expose(PreemptorService.class); - expose(PendingTaskProcessor.class); + expose(Runnable.class).annotatedWith(PreemptionSlotFinder.class); } else { bind(Preemptor.class).toInstance(NULL_PREEMPTOR); LOG.warn("Preemptor Disabled."); @@ -147,11 +191,11 @@ public class PreemptorModule extends AbstractModule { } static class PreemptorService extends AbstractScheduledService { - private final PendingTaskProcessor slotFinder; + private final Runnable slotFinder; private final Scheduler schedule; @Inject - PreemptorService(PendingTaskProcessor slotFinder, Scheduler schedule) { + PreemptorService(@PreemptionSlotFinder Runnable slotFinder, Scheduler schedule) { this.slotFinder = requireNonNull(slotFinder); this.schedule = requireNonNull(schedule); } http://git-wip-us.apache.org/repos/asf/aurora/blob/92f6d9f6/src/main/java/org/apache/aurora/scheduler/state/FirstFitTaskAssignerModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/state/FirstFitTaskAssignerModule.java b/src/main/java/org/apache/aurora/scheduler/state/FirstFitTaskAssignerModule.java new file mode 100644 index 0000000..dc244ee --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/state/FirstFitTaskAssignerModule.java @@ -0,0 +1,31 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance 
with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.state; + +import javax.inject.Singleton; + +import com.google.inject.AbstractModule; + +import org.apache.aurora.scheduler.state.TaskAssigner.FirstFitTaskAssigner; + +/** + * Exposes the default TaskAssigner implementation, which is a first-fit scheduling algorithm. + */ +public class FirstFitTaskAssignerModule extends AbstractModule { + @Override + protected void configure() { + bind(TaskAssigner.class).to(FirstFitTaskAssigner.class); + bind(FirstFitTaskAssigner.class).in(Singleton.class); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/92f6d9f6/src/main/java/org/apache/aurora/scheduler/state/StateModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateModule.java b/src/main/java/org/apache/aurora/scheduler/state/StateModule.java index 0186484..77a37b8 100644 --- a/src/main/java/org/apache/aurora/scheduler/state/StateModule.java +++ b/src/main/java/org/apache/aurora/scheduler/state/StateModule.java @@ -13,28 +13,51 @@ */ package org.apache.aurora.scheduler.state; +import java.util.Set; import javax.inject.Singleton; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableSet; import com.google.inject.AbstractModule; import com.google.inject.Binder; +import com.google.inject.Module; +import org.apache.aurora.common.args.Arg; +import org.apache.aurora.common.args.CmdLine; +import org.apache.aurora.scheduler.app.MoreModules; import 
org.apache.aurora.scheduler.events.PubsubEventModule; import org.apache.aurora.scheduler.mesos.MesosTaskFactory; import org.apache.aurora.scheduler.mesos.MesosTaskFactory.MesosTaskFactoryImpl; import org.apache.aurora.scheduler.state.MaintenanceController.MaintenanceControllerImpl; -import org.apache.aurora.scheduler.state.TaskAssigner.TaskAssignerImpl; import org.apache.aurora.scheduler.state.UUIDGenerator.UUIDGeneratorImpl; +import static java.util.Objects.requireNonNull; + /** * Binding module for scheduling logic and higher-level state management. */ public class StateModule extends AbstractModule { + @CmdLine(name = "task_assigner_modules", + help = "Guice modules for customizing task assignment.") + private static final Arg<Set<Module>> TASK_ASSIGNER_MODULES = Arg.create( + ImmutableSet.of(MoreModules.lazilyInstantiated(FirstFitTaskAssignerModule.class))); + + private final Set<Module> assignerModules; + + public StateModule() { + this(TASK_ASSIGNER_MODULES.get()); + } + + private StateModule(Set<Module> assignerModules) { + this.assignerModules = requireNonNull(assignerModules); + } + @Override protected void configure() { - bind(TaskAssigner.class).to(TaskAssignerImpl.class); - bind(TaskAssignerImpl.class).in(Singleton.class); + for (Module module: assignerModules) { + install(module); + } bind(MesosTaskFactory.class).to(MesosTaskFactoryImpl.class); bind(StateManager.class).to(StateManagerImpl.class); http://git-wip-us.apache.org/repos/asf/aurora/blob/92f6d9f6/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java index 7c531af..25399e4 100644 --- a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java +++ b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java @@ -78,8 +78,8 @@ public interface TaskAssigner 
{ Iterable<IAssignedTask> tasks, Map<String, TaskGroupKey> preemptionReservations); - class TaskAssignerImpl implements TaskAssigner { - private static final Logger LOG = LoggerFactory.getLogger(TaskAssignerImpl.class); + class FirstFitTaskAssigner implements TaskAssigner { + private static final Logger LOG = LoggerFactory.getLogger(FirstFitTaskAssigner.class); @VisibleForTesting static final Optional<String> LAUNCH_FAILED_MSG = @@ -100,7 +100,7 @@ public interface TaskAssigner { private final UpdateAgentReserver updateAgentReserver; @Inject - public TaskAssignerImpl( + public FirstFitTaskAssigner( StateManager stateManager, SchedulingFilter filter, MesosTaskFactory taskFactory, http://git-wip-us.apache.org/repos/asf/aurora/blob/92f6d9f6/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java index da064e3..3317133 100644 --- a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java +++ b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java @@ -14,6 +14,7 @@ package org.apache.aurora.scheduler.preemptor; import com.google.common.base.Optional; +import com.google.common.collect.ImmutableSet; import com.google.inject.AbstractModule; import com.google.inject.Guice; import com.google.inject.Injector; @@ -70,7 +71,14 @@ public class PreemptorModuleTest extends EasyMockTest { false, Amount.of(0L, Time.SECONDS), Amount.of(0L, Time.SECONDS), - 5)); + 5, + ImmutableSet.of(new AbstractModule() { + @Override + protected void configure() { + bind(Runnable.class).annotatedWith(PreemptorModule.PreemptionSlotFinder.class) + .toInstance(createMock(Runnable.class)); + } + }))); control.replay(); 
http://git-wip-us.apache.org/repos/asf/aurora/blob/92f6d9f6/src/test/java/org/apache/aurora/scheduler/state/FirstFitTaskAssignerTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/state/FirstFitTaskAssignerTest.java b/src/test/java/org/apache/aurora/scheduler/state/FirstFitTaskAssignerTest.java new file mode 100644 index 0000000..25c1137 --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/state/FirstFitTaskAssignerTest.java @@ -0,0 +1,516 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.state; + +import java.util.Map; +import java.util.Set; + +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; + +import org.apache.aurora.common.testing.easymock.EasyMockTest; +import org.apache.aurora.gen.AssignedTask; +import org.apache.aurora.gen.Attribute; +import org.apache.aurora.gen.HostAttributes; +import org.apache.aurora.gen.JobKey; +import org.apache.aurora.gen.TaskConfig; +import org.apache.aurora.scheduler.HostOffer; +import org.apache.aurora.scheduler.TierManager; +import org.apache.aurora.scheduler.base.InstanceKeys; +import org.apache.aurora.scheduler.base.TaskGroupKey; +import org.apache.aurora.scheduler.base.Tasks; +import org.apache.aurora.scheduler.filter.AttributeAggregate; +import org.apache.aurora.scheduler.filter.SchedulingFilter; +import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; +import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource; +import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto; +import org.apache.aurora.scheduler.mesos.MesosTaskFactory; +import org.apache.aurora.scheduler.offers.OfferManager; +import org.apache.aurora.scheduler.resources.ResourceBag; +import org.apache.aurora.scheduler.state.TaskAssigner.FirstFitTaskAssigner; +import org.apache.aurora.scheduler.storage.entities.IAssignedTask; +import org.apache.aurora.scheduler.storage.entities.IHostAttributes; +import org.apache.aurora.scheduler.storage.entities.IInstanceKey; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.apache.aurora.scheduler.storage.entities.ITaskConfig; +import org.apache.aurora.scheduler.testing.FakeStatsProvider; +import org.apache.aurora.scheduler.updater.UpdateAgentReserver; +import org.apache.mesos.v1.Protos.AgentID; +import org.apache.mesos.v1.Protos.FrameworkID; +import org.apache.mesos.v1.Protos.OfferID; +import 
org.apache.mesos.v1.Protos.Resource; +import org.apache.mesos.v1.Protos.TaskID; +import org.apache.mesos.v1.Protos.TaskInfo; +import org.apache.mesos.v1.Protos.Value.Range; +import org.apache.mesos.v1.Protos.Value.Ranges; +import org.apache.mesos.v1.Protos.Value.Type; +import org.junit.Before; +import org.junit.Test; + +import static org.apache.aurora.gen.ScheduleStatus.LOST; +import static org.apache.aurora.gen.ScheduleStatus.PENDING; +import static org.apache.aurora.scheduler.base.TaskTestUtil.DEV_TIER; +import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB; +import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask; +import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty; +import static org.apache.aurora.scheduler.resources.ResourceManager.bagFromMesosResources; +import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosRange; +import static org.apache.aurora.scheduler.resources.ResourceTestUtil.offer; +import static org.apache.aurora.scheduler.resources.ResourceType.PORTS; +import static org.apache.aurora.scheduler.state.TaskAssigner.FirstFitTaskAssigner.ASSIGNER_EVALUATED_OFFERS; +import static org.apache.aurora.scheduler.state.TaskAssigner.FirstFitTaskAssigner.ASSIGNER_LAUNCH_FAILURES; +import static org.apache.aurora.scheduler.state.TaskAssigner.FirstFitTaskAssigner.LAUNCH_FAILED_MSG; +import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; +import static org.apache.mesos.v1.Protos.Offer; +import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.anyString; +import static org.easymock.EasyMock.eq; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +public class FirstFitTaskAssignerTest extends EasyMockTest { + + private static final int PORT = 1000; + private static final Offer MESOS_OFFER = 
offer(mesosRange(PORTS, PORT)); + private static final String SLAVE_ID = MESOS_OFFER.getAgentId().getValue(); + private static final HostOffer OFFER = + new HostOffer(MESOS_OFFER, IHostAttributes.build(new HostAttributes() + .setHost(MESOS_OFFER.getHostname()) + .setAttributes(ImmutableSet.of( + new Attribute("host", ImmutableSet.of(MESOS_OFFER.getHostname())))))); + private static final IScheduledTask TASK = makeTask("id", JOB); + private static final TaskGroupKey GROUP_KEY = TaskGroupKey.from(TASK.getAssignedTask().getTask()); + private static final TaskInfo TASK_INFO = TaskInfo.newBuilder() + .setName("taskName") + .setTaskId(TaskID.newBuilder().setValue(Tasks.id(TASK))) + .setAgentId(MESOS_OFFER.getAgentId()) + .build(); + private static final IInstanceKey INSTANCE_KEY = + InstanceKeys.from(JOB, TASK.getAssignedTask().getInstanceId()); + private static final Map<String, TaskGroupKey> NO_RESERVATION = ImmutableMap.of(); + private static final UnusedResource UNUSED = new UnusedResource( + bagFromMesosResources(MESOS_OFFER.getResourcesList()), + OFFER.getAttributes()); + private static final HostOffer OFFER_2 = new HostOffer( + Offer.newBuilder() + .setId(OfferID.newBuilder().setValue("offerId0")) + .setFrameworkId(FrameworkID.newBuilder().setValue("frameworkId")) + .setAgentId(AgentID.newBuilder().setValue("slaveId0")) + .setHostname("hostName0") + .addResources(Resource.newBuilder() + .setName("ports") + .setType(Type.RANGES) + .setRanges( + Ranges.newBuilder().addRange(Range.newBuilder().setBegin(PORT).setEnd(PORT)))) + .build(), + IHostAttributes.build(new HostAttributes())); + + private static final Set<String> NO_ASSIGNMENT = ImmutableSet.of(); + + private ResourceRequest resourceRequest; + + private MutableStoreProvider storeProvider; + private StateManager stateManager; + private SchedulingFilter filter; + private MesosTaskFactory taskFactory; + private OfferManager offerManager; + private FirstFitTaskAssigner assigner; + private TierManager tierManager; + 
private FakeStatsProvider statsProvider; + private UpdateAgentReserver updateAgentReserver; + + @Before + public void setUp() throws Exception { + storeProvider = createMock(MutableStoreProvider.class); + filter = createMock(SchedulingFilter.class); + taskFactory = createMock(MesosTaskFactory.class); + stateManager = createMock(StateManager.class); + offerManager = createMock(OfferManager.class); + tierManager = createMock(TierManager.class); + updateAgentReserver = createMock(UpdateAgentReserver.class); + statsProvider = new FakeStatsProvider(); + assigner = new FirstFitTaskAssigner( + stateManager, + filter, + taskFactory, + offerManager, + tierManager, + updateAgentReserver, + statsProvider); + resourceRequest = new ResourceRequest( + TASK.getAssignedTask().getTask(), + ResourceBag.EMPTY, + empty()); + } + + @Test + public void testAssignNoTasks() throws Exception { + control.replay(); + + assertEquals( + NO_ASSIGNMENT, + assigner.maybeAssign(storeProvider, null, null, ImmutableSet.of(), null)); + } + + @Test + public void testAssignPartialNoVetoes() throws Exception { + expectNoUpdateReservations(1); + expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER)); + offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO); + expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER); + expect(filter.filter(UNUSED, resourceRequest)).andReturn(ImmutableSet.of()); + expectAssignTask(MESOS_OFFER); + expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER)) + .andReturn(TASK_INFO); + + control.replay(); + + AttributeAggregate aggregate = empty(); + assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); + assertEquals( + ImmutableSet.of(Tasks.id(TASK)), + assigner.maybeAssign( + storeProvider, + new ResourceRequest(TASK.getAssignedTask().getTask(), ResourceBag.EMPTY, aggregate), + TaskGroupKey.from(TASK.getAssignedTask().getTask()), + ImmutableSet.of( + TASK.getAssignedTask(), + makeTask("id2", 
JOB).getAssignedTask(), + makeTask("id3", JOB).getAssignedTask()), + ImmutableMap.of(SLAVE_ID, GROUP_KEY))); + assertNotEquals(empty(), aggregate); + assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); + } + + @Test + public void testAssignVetoesWithStaticBan() throws Exception { + expectNoUpdateReservations(1); + expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER)); + offerManager.banOffer(MESOS_OFFER.getId(), GROUP_KEY); + expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER); + expect(filter.filter(UNUSED, resourceRequest)) + .andReturn(ImmutableSet.of(Veto.constraintMismatch("denied"))); + + control.replay(); + + assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); + assertEquals( + NO_ASSIGNMENT, + assigner.maybeAssign( + storeProvider, + resourceRequest, + TaskGroupKey.from(TASK.getAssignedTask().getTask()), + ImmutableSet.of(TASK.getAssignedTask()), + NO_RESERVATION)); + assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); + } + + @Test + public void testAssignVetoesWithNoStaticBan() throws Exception { + expectNoUpdateReservations(1); + expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER)); + expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER); + expect(filter.filter(UNUSED, resourceRequest)) + .andReturn(ImmutableSet.of(Veto.unsatisfiedLimit("limit"))); + + control.replay(); + + assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); + assertEquals( + NO_ASSIGNMENT, + assigner.maybeAssign( + storeProvider, + resourceRequest, + TaskGroupKey.from(TASK.getAssignedTask().getTask()), + ImmutableSet.of(TASK.getAssignedTask()), + NO_RESERVATION)); + assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); + } + + @Test + public void testAssignmentClearedOnError() throws Exception { + expectNoUpdateReservations(1); + 
expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER, OFFER_2)); + offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO); + expectLastCall().andThrow(new OfferManager.LaunchException("expected")); + expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER); + expect(filter.filter(UNUSED, resourceRequest)).andReturn(ImmutableSet.of()); + expectAssignTask(MESOS_OFFER); + expect(stateManager.changeState( + storeProvider, + Tasks.id(TASK), + Optional.of(PENDING), + LOST, + LAUNCH_FAILED_MSG)) + .andReturn(StateChangeResult.SUCCESS); + expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER)) + .andReturn(TASK_INFO); + + control.replay(); + + assertEquals(0L, statsProvider.getLongValue(ASSIGNER_LAUNCH_FAILURES)); + assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); + // Ensures scheduling loop terminates on the first launch failure. + assertEquals( + NO_ASSIGNMENT, + assigner.maybeAssign( + storeProvider, + resourceRequest, + TaskGroupKey.from(TASK.getAssignedTask().getTask()), + ImmutableSet.of( + TASK.getAssignedTask(), + makeTask("id2", JOB).getAssignedTask(), + makeTask("id3", JOB).getAssignedTask()), + NO_RESERVATION)); + assertEquals(1L, statsProvider.getLongValue(ASSIGNER_LAUNCH_FAILURES)); + assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); + } + + @Test + public void testAssignmentSkippedForReservedSlave() throws Exception { + expectNoUpdateReservations(0); + expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER); + expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER)); + + control.replay(); + + assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); + assertEquals( + NO_ASSIGNMENT, + assigner.maybeAssign( + storeProvider, + resourceRequest, + TaskGroupKey.from(TASK.getAssignedTask().getTask()), + ImmutableSet.of(TASK.getAssignedTask()), + ImmutableMap.of(SLAVE_ID, TaskGroupKey.from( + 
ITaskConfig.build(new TaskConfig().setJob(new JobKey("other", "e", "n"))))))); + assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); + } + + @Test + public void testTaskWithReservedSlaveLandsElsewhere() throws Exception { + // Ensures slave/task reservation relationship is only enforced in slave->task direction + // and permissive in task->slave direction. In other words, a task with a slave reservation + // should still be tried against other unreserved slaves. + expectNoUpdateReservations(1); + expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER_2, OFFER)); + expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER); + expect(filter.filter( + new UnusedResource( + bagFromMesosResources(OFFER_2.getOffer().getResourcesList()), + OFFER_2.getAttributes()), + resourceRequest)).andReturn(ImmutableSet.of()); + expectAssignTask(OFFER_2.getOffer()); + expect(taskFactory.createFrom(TASK.getAssignedTask(), OFFER_2.getOffer())) + .andReturn(TASK_INFO); + offerManager.launchTask(OFFER_2.getOffer().getId(), TASK_INFO); + + control.replay(); + + assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); + assertEquals( + ImmutableSet.of(Tasks.id(TASK)), + assigner.maybeAssign( + storeProvider, + resourceRequest, + TaskGroupKey.from(TASK.getAssignedTask().getTask()), + ImmutableSet.of(TASK.getAssignedTask()), + ImmutableMap.of(SLAVE_ID, GROUP_KEY))); + assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); + } + + @Test + public void testAssignerDoesNotReturnOnFirstMismatch() throws Exception { + // Ensures scheduling loop does not terminate prematurely when the first mismatch is identified. 
+ HostOffer mismatched = new HostOffer( + Offer.newBuilder() + .setId(OfferID.newBuilder().setValue("offerId0")) + .setFrameworkId(FrameworkID.newBuilder().setValue("frameworkId")) + .setAgentId(AgentID.newBuilder().setValue("slaveId0")) + .setHostname("hostName0") + .addResources(Resource.newBuilder() + .setName("ports") + .setType(Type.RANGES) + .setRanges( + Ranges.newBuilder().addRange(Range.newBuilder().setBegin(PORT).setEnd(PORT)))) + .build(), + IHostAttributes.build(new HostAttributes())); + + expectNoUpdateReservations(2); + expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(mismatched, OFFER)); + expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER); + expect(filter.filter( + new UnusedResource( + bagFromMesosResources(mismatched.getOffer().getResourcesList()), + mismatched.getAttributes()), + resourceRequest)) + .andReturn(ImmutableSet.of(Veto.constraintMismatch("constraint mismatch"))); + offerManager.banOffer(mismatched.getOffer().getId(), GROUP_KEY); + expect(filter.filter( + new UnusedResource( + bagFromMesosResources(MESOS_OFFER.getResourcesList()), OFFER.getAttributes()), + resourceRequest)) + .andReturn(ImmutableSet.of()); + + expectAssignTask(MESOS_OFFER); + expect(taskFactory.createFrom(TASK.getAssignedTask(), OFFER.getOffer())) + .andReturn(TASK_INFO); + offerManager.launchTask(OFFER.getOffer().getId(), TASK_INFO); + + control.replay(); + + assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); + assertEquals( + ImmutableSet.of(Tasks.id(TASK)), + assigner.maybeAssign( + storeProvider, + resourceRequest, + TaskGroupKey.from(TASK.getAssignedTask().getTask()), + ImmutableSet.of(TASK.getAssignedTask()), + ImmutableMap.of(SLAVE_ID, GROUP_KEY))); + assertEquals(2L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); + } + + @Test + public void testResourceMapperCallback() { + AssignedTask builder = TASK.newBuilder().getAssignedTask(); + builder.unsetAssignedPorts(); + + 
control.replay(); + + assertEquals( + TASK.getAssignedTask(), + assigner.mapAndAssignResources(MESOS_OFFER, IAssignedTask.build(builder))); + } + + @Test + public void testAssignToReservedAgent() throws Exception { + expect(updateAgentReserver.hasReservations(GROUP_KEY)).andReturn(true); + expect(updateAgentReserver.getAgent(INSTANCE_KEY)).andReturn(Optional.of(SLAVE_ID)); + updateAgentReserver.release(SLAVE_ID, INSTANCE_KEY); + expect(offerManager.getOffer(MESOS_OFFER.getAgentId())).andReturn(Optional.of(OFFER)); + expect(filter.filter(UNUSED, resourceRequest)).andReturn(ImmutableSet.of()); + expectAssignTask(MESOS_OFFER); + offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO); + expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER); + + expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER)) + .andReturn(TASK_INFO); + + control.replay(); + + AttributeAggregate aggregate = empty(); + assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); + assertEquals( + ImmutableSet.of(Tasks.id(TASK)), + assigner.maybeAssign( + storeProvider, + new ResourceRequest(TASK.getAssignedTask().getTask(), ResourceBag.EMPTY, aggregate), + TaskGroupKey.from(TASK.getAssignedTask().getTask()), + ImmutableSet.of( + TASK.getAssignedTask()), + ImmutableMap.of(SLAVE_ID, GROUP_KEY))); + assertNotEquals(empty(), aggregate); + assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); + } + + @Test + public void testAssignReservedAgentWhenOfferNotReady() throws Exception { + expect(updateAgentReserver.hasReservations(GROUP_KEY)).andReturn(true); + expect(updateAgentReserver.getAgent(INSTANCE_KEY)).andReturn(Optional.of(SLAVE_ID)); + expect(offerManager.getOffer(MESOS_OFFER.getAgentId())).andReturn(Optional.of(OFFER)); + expect(filter.filter(UNUSED, resourceRequest)) + .andReturn(ImmutableSet.of(Veto.insufficientResources("cpu", 1))); + expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER); + 
offerManager.banOffer(MESOS_OFFER.getId(), GROUP_KEY); + expectLastCall(); + + control.replay(); + + AttributeAggregate aggregate = empty(); + assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); + assertEquals( + ImmutableSet.of(), + assigner.maybeAssign( + storeProvider, + new ResourceRequest(TASK.getAssignedTask().getTask(), ResourceBag.EMPTY, aggregate), + TaskGroupKey.from(TASK.getAssignedTask().getTask()), + ImmutableSet.of(TASK.getAssignedTask()), + ImmutableMap.of(SLAVE_ID, GROUP_KEY))); + assertEquals(empty(), aggregate); + assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); + } + + @Test + public void testAssignWithMixOfReservedAndNotReserved() throws Exception { + AttributeAggregate aggregate = empty(); + ResourceRequest resources = new ResourceRequest( + TASK.getAssignedTask().getTask(), ResourceBag.EMPTY, aggregate); + expect(updateAgentReserver.hasReservations(GROUP_KEY)).andReturn(true); + expect(updateAgentReserver.getAgent(INSTANCE_KEY)).andReturn(Optional.of(SLAVE_ID)); + updateAgentReserver.release(SLAVE_ID, INSTANCE_KEY); + expect(offerManager.getOffer(MESOS_OFFER.getAgentId())).andReturn(Optional.of(OFFER)); + expect(filter.filter(UNUSED, resourceRequest)).andReturn(ImmutableSet.of()); + expectAssignTask(MESOS_OFFER); + offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO); + expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER); + + expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER)) + .andReturn(TASK_INFO); + + // Normal scheduling loop for the remaining task... 
+ expect(updateAgentReserver.getAgent(InstanceKeys.from(JOB, 9999))).andReturn(Optional.absent()); + expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER)); + expect(updateAgentReserver.getReservations(OFFER.getOffer().getAgentId().getValue())) + .andReturn(ImmutableSet.of()); + expect(filter.filter(UNUSED, resources)) + .andReturn(ImmutableSet.of(Veto.constraintMismatch("lol"))); + offerManager.banOffer(MESOS_OFFER.getId(), GROUP_KEY); + + control.replay(); + + assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); + assertEquals( + ImmutableSet.of(Tasks.id(TASK)), + assigner.maybeAssign( + storeProvider, + resources, + TaskGroupKey.from(TASK.getAssignedTask().getTask()), + ImmutableSet.of( + TASK.getAssignedTask(), + makeTask("another-task", JOB, 9999).getAssignedTask()), + ImmutableMap.of(SLAVE_ID, GROUP_KEY))); + assertNotEquals(empty(), aggregate); + assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); + } + + private void expectAssignTask(Offer offer) { + expect(stateManager.assignTask( + eq(storeProvider), + eq(Tasks.id(TASK)), + eq(offer.getHostname()), + eq(offer.getAgentId()), + anyObject())).andReturn(TASK.getAssignedTask()); + } + + private void expectNoUpdateReservations(int offers) { + expect(updateAgentReserver.hasReservations(anyObject())).andReturn(false); + for (int i = 0; i < offers; i++) { + expect(updateAgentReserver.getReservations(anyString())).andReturn(ImmutableSet.of()); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/92f6d9f6/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java deleted file mode 100644 index 11835dc..0000000 --- a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java +++ /dev/null 
@@ -1,516 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.state; - -import java.util.Map; -import java.util.Set; - -import com.google.common.base.Optional; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; - -import org.apache.aurora.common.testing.easymock.EasyMockTest; -import org.apache.aurora.gen.AssignedTask; -import org.apache.aurora.gen.Attribute; -import org.apache.aurora.gen.HostAttributes; -import org.apache.aurora.gen.JobKey; -import org.apache.aurora.gen.TaskConfig; -import org.apache.aurora.scheduler.HostOffer; -import org.apache.aurora.scheduler.TierManager; -import org.apache.aurora.scheduler.base.InstanceKeys; -import org.apache.aurora.scheduler.base.TaskGroupKey; -import org.apache.aurora.scheduler.base.Tasks; -import org.apache.aurora.scheduler.filter.AttributeAggregate; -import org.apache.aurora.scheduler.filter.SchedulingFilter; -import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; -import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource; -import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto; -import org.apache.aurora.scheduler.mesos.MesosTaskFactory; -import org.apache.aurora.scheduler.offers.OfferManager; -import org.apache.aurora.scheduler.resources.ResourceBag; -import org.apache.aurora.scheduler.state.TaskAssigner.TaskAssignerImpl; -import 
org.apache.aurora.scheduler.storage.entities.IAssignedTask; -import org.apache.aurora.scheduler.storage.entities.IHostAttributes; -import org.apache.aurora.scheduler.storage.entities.IInstanceKey; -import org.apache.aurora.scheduler.storage.entities.IScheduledTask; -import org.apache.aurora.scheduler.storage.entities.ITaskConfig; -import org.apache.aurora.scheduler.testing.FakeStatsProvider; -import org.apache.aurora.scheduler.updater.UpdateAgentReserver; -import org.apache.mesos.v1.Protos.AgentID; -import org.apache.mesos.v1.Protos.FrameworkID; -import org.apache.mesos.v1.Protos.OfferID; -import org.apache.mesos.v1.Protos.Resource; -import org.apache.mesos.v1.Protos.TaskID; -import org.apache.mesos.v1.Protos.TaskInfo; -import org.apache.mesos.v1.Protos.Value.Range; -import org.apache.mesos.v1.Protos.Value.Ranges; -import org.apache.mesos.v1.Protos.Value.Type; -import org.junit.Before; -import org.junit.Test; - -import static org.apache.aurora.gen.ScheduleStatus.LOST; -import static org.apache.aurora.gen.ScheduleStatus.PENDING; -import static org.apache.aurora.scheduler.base.TaskTestUtil.DEV_TIER; -import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB; -import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask; -import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty; -import static org.apache.aurora.scheduler.resources.ResourceManager.bagFromMesosResources; -import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosRange; -import static org.apache.aurora.scheduler.resources.ResourceTestUtil.offer; -import static org.apache.aurora.scheduler.resources.ResourceType.PORTS; -import static org.apache.aurora.scheduler.state.TaskAssigner.TaskAssignerImpl.ASSIGNER_EVALUATED_OFFERS; -import static org.apache.aurora.scheduler.state.TaskAssigner.TaskAssignerImpl.ASSIGNER_LAUNCH_FAILURES; -import static org.apache.aurora.scheduler.state.TaskAssigner.TaskAssignerImpl.LAUNCH_FAILED_MSG; -import static 
org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; -import static org.apache.mesos.v1.Protos.Offer; -import static org.easymock.EasyMock.anyObject; -import static org.easymock.EasyMock.anyString; -import static org.easymock.EasyMock.eq; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.expectLastCall; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; - -public class TaskAssignerImplTest extends EasyMockTest { - - private static final int PORT = 1000; - private static final Offer MESOS_OFFER = offer(mesosRange(PORTS, PORT)); - private static final String SLAVE_ID = MESOS_OFFER.getAgentId().getValue(); - private static final HostOffer OFFER = - new HostOffer(MESOS_OFFER, IHostAttributes.build(new HostAttributes() - .setHost(MESOS_OFFER.getHostname()) - .setAttributes(ImmutableSet.of( - new Attribute("host", ImmutableSet.of(MESOS_OFFER.getHostname())))))); - private static final IScheduledTask TASK = makeTask("id", JOB); - private static final TaskGroupKey GROUP_KEY = TaskGroupKey.from(TASK.getAssignedTask().getTask()); - private static final TaskInfo TASK_INFO = TaskInfo.newBuilder() - .setName("taskName") - .setTaskId(TaskID.newBuilder().setValue(Tasks.id(TASK))) - .setAgentId(MESOS_OFFER.getAgentId()) - .build(); - private static final IInstanceKey INSTANCE_KEY = - InstanceKeys.from(JOB, TASK.getAssignedTask().getInstanceId()); - private static final Map<String, TaskGroupKey> NO_RESERVATION = ImmutableMap.of(); - private static final UnusedResource UNUSED = new UnusedResource( - bagFromMesosResources(MESOS_OFFER.getResourcesList()), - OFFER.getAttributes()); - private static final HostOffer OFFER_2 = new HostOffer( - Offer.newBuilder() - .setId(OfferID.newBuilder().setValue("offerId0")) - .setFrameworkId(FrameworkID.newBuilder().setValue("frameworkId")) - .setAgentId(AgentID.newBuilder().setValue("slaveId0")) - .setHostname("hostName0") - .addResources(Resource.newBuilder() 
- .setName("ports") - .setType(Type.RANGES) - .setRanges( - Ranges.newBuilder().addRange(Range.newBuilder().setBegin(PORT).setEnd(PORT)))) - .build(), - IHostAttributes.build(new HostAttributes())); - - private static final Set<String> NO_ASSIGNMENT = ImmutableSet.of(); - - private ResourceRequest resourceRequest; - - private MutableStoreProvider storeProvider; - private StateManager stateManager; - private SchedulingFilter filter; - private MesosTaskFactory taskFactory; - private OfferManager offerManager; - private TaskAssignerImpl assigner; - private TierManager tierManager; - private FakeStatsProvider statsProvider; - private UpdateAgentReserver updateAgentReserver; - - @Before - public void setUp() throws Exception { - storeProvider = createMock(MutableStoreProvider.class); - filter = createMock(SchedulingFilter.class); - taskFactory = createMock(MesosTaskFactory.class); - stateManager = createMock(StateManager.class); - offerManager = createMock(OfferManager.class); - tierManager = createMock(TierManager.class); - updateAgentReserver = createMock(UpdateAgentReserver.class); - statsProvider = new FakeStatsProvider(); - assigner = new TaskAssignerImpl( - stateManager, - filter, - taskFactory, - offerManager, - tierManager, - updateAgentReserver, - statsProvider); - resourceRequest = new ResourceRequest( - TASK.getAssignedTask().getTask(), - ResourceBag.EMPTY, - empty()); - } - - @Test - public void testAssignNoTasks() throws Exception { - control.replay(); - - assertEquals( - NO_ASSIGNMENT, - assigner.maybeAssign(storeProvider, null, null, ImmutableSet.of(), null)); - } - - @Test - public void testAssignPartialNoVetoes() throws Exception { - expectNoUpdateReservations(1); - expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER)); - offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO); - expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER); - expect(filter.filter(UNUSED, 
resourceRequest)).andReturn(ImmutableSet.of()); - expectAssignTask(MESOS_OFFER); - expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER)) - .andReturn(TASK_INFO); - - control.replay(); - - AttributeAggregate aggregate = empty(); - assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); - assertEquals( - ImmutableSet.of(Tasks.id(TASK)), - assigner.maybeAssign( - storeProvider, - new ResourceRequest(TASK.getAssignedTask().getTask(), ResourceBag.EMPTY, aggregate), - TaskGroupKey.from(TASK.getAssignedTask().getTask()), - ImmutableSet.of( - TASK.getAssignedTask(), - makeTask("id2", JOB).getAssignedTask(), - makeTask("id3", JOB).getAssignedTask()), - ImmutableMap.of(SLAVE_ID, GROUP_KEY))); - assertNotEquals(empty(), aggregate); - assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); - } - - @Test - public void testAssignVetoesWithStaticBan() throws Exception { - expectNoUpdateReservations(1); - expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER)); - offerManager.banOffer(MESOS_OFFER.getId(), GROUP_KEY); - expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER); - expect(filter.filter(UNUSED, resourceRequest)) - .andReturn(ImmutableSet.of(Veto.constraintMismatch("denied"))); - - control.replay(); - - assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); - assertEquals( - NO_ASSIGNMENT, - assigner.maybeAssign( - storeProvider, - resourceRequest, - TaskGroupKey.from(TASK.getAssignedTask().getTask()), - ImmutableSet.of(TASK.getAssignedTask()), - NO_RESERVATION)); - assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); - } - - @Test - public void testAssignVetoesWithNoStaticBan() throws Exception { - expectNoUpdateReservations(1); - expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER)); - expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER); - expect(filter.filter(UNUSED, resourceRequest)) - 
.andReturn(ImmutableSet.of(Veto.unsatisfiedLimit("limit"))); - - control.replay(); - - assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); - assertEquals( - NO_ASSIGNMENT, - assigner.maybeAssign( - storeProvider, - resourceRequest, - TaskGroupKey.from(TASK.getAssignedTask().getTask()), - ImmutableSet.of(TASK.getAssignedTask()), - NO_RESERVATION)); - assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); - } - - @Test - public void testAssignmentClearedOnError() throws Exception { - expectNoUpdateReservations(1); - expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER, OFFER_2)); - offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO); - expectLastCall().andThrow(new OfferManager.LaunchException("expected")); - expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER); - expect(filter.filter(UNUSED, resourceRequest)).andReturn(ImmutableSet.of()); - expectAssignTask(MESOS_OFFER); - expect(stateManager.changeState( - storeProvider, - Tasks.id(TASK), - Optional.of(PENDING), - LOST, - LAUNCH_FAILED_MSG)) - .andReturn(StateChangeResult.SUCCESS); - expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER)) - .andReturn(TASK_INFO); - - control.replay(); - - assertEquals(0L, statsProvider.getLongValue(ASSIGNER_LAUNCH_FAILURES)); - assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); - // Ensures scheduling loop terminates on the first launch failure. 
- assertEquals( - NO_ASSIGNMENT, - assigner.maybeAssign( - storeProvider, - resourceRequest, - TaskGroupKey.from(TASK.getAssignedTask().getTask()), - ImmutableSet.of( - TASK.getAssignedTask(), - makeTask("id2", JOB).getAssignedTask(), - makeTask("id3", JOB).getAssignedTask()), - NO_RESERVATION)); - assertEquals(1L, statsProvider.getLongValue(ASSIGNER_LAUNCH_FAILURES)); - assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); - } - - @Test - public void testAssignmentSkippedForReservedSlave() throws Exception { - expectNoUpdateReservations(0); - expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER); - expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER)); - - control.replay(); - - assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); - assertEquals( - NO_ASSIGNMENT, - assigner.maybeAssign( - storeProvider, - resourceRequest, - TaskGroupKey.from(TASK.getAssignedTask().getTask()), - ImmutableSet.of(TASK.getAssignedTask()), - ImmutableMap.of(SLAVE_ID, TaskGroupKey.from( - ITaskConfig.build(new TaskConfig().setJob(new JobKey("other", "e", "n"))))))); - assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); - } - - @Test - public void testTaskWithReservedSlaveLandsElsewhere() throws Exception { - // Ensures slave/task reservation relationship is only enforced in slave->task direction - // and permissive in task->slave direction. In other words, a task with a slave reservation - // should still be tried against other unreserved slaves. 
- expectNoUpdateReservations(1); - expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER_2, OFFER)); - expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER); - expect(filter.filter( - new UnusedResource( - bagFromMesosResources(OFFER_2.getOffer().getResourcesList()), - OFFER_2.getAttributes()), - resourceRequest)).andReturn(ImmutableSet.of()); - expectAssignTask(OFFER_2.getOffer()); - expect(taskFactory.createFrom(TASK.getAssignedTask(), OFFER_2.getOffer())) - .andReturn(TASK_INFO); - offerManager.launchTask(OFFER_2.getOffer().getId(), TASK_INFO); - - control.replay(); - - assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); - assertEquals( - ImmutableSet.of(Tasks.id(TASK)), - assigner.maybeAssign( - storeProvider, - resourceRequest, - TaskGroupKey.from(TASK.getAssignedTask().getTask()), - ImmutableSet.of(TASK.getAssignedTask()), - ImmutableMap.of(SLAVE_ID, GROUP_KEY))); - assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); - } - - @Test - public void testAssignerDoesNotReturnOnFirstMismatch() throws Exception { - // Ensures scheduling loop does not terminate prematurely when the first mismatch is identified. 
- HostOffer mismatched = new HostOffer( - Offer.newBuilder() - .setId(OfferID.newBuilder().setValue("offerId0")) - .setFrameworkId(FrameworkID.newBuilder().setValue("frameworkId")) - .setAgentId(AgentID.newBuilder().setValue("slaveId0")) - .setHostname("hostName0") - .addResources(Resource.newBuilder() - .setName("ports") - .setType(Type.RANGES) - .setRanges( - Ranges.newBuilder().addRange(Range.newBuilder().setBegin(PORT).setEnd(PORT)))) - .build(), - IHostAttributes.build(new HostAttributes())); - - expectNoUpdateReservations(2); - expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(mismatched, OFFER)); - expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER); - expect(filter.filter( - new UnusedResource( - bagFromMesosResources(mismatched.getOffer().getResourcesList()), - mismatched.getAttributes()), - resourceRequest)) - .andReturn(ImmutableSet.of(Veto.constraintMismatch("constraint mismatch"))); - offerManager.banOffer(mismatched.getOffer().getId(), GROUP_KEY); - expect(filter.filter( - new UnusedResource( - bagFromMesosResources(MESOS_OFFER.getResourcesList()), OFFER.getAttributes()), - resourceRequest)) - .andReturn(ImmutableSet.of()); - - expectAssignTask(MESOS_OFFER); - expect(taskFactory.createFrom(TASK.getAssignedTask(), OFFER.getOffer())) - .andReturn(TASK_INFO); - offerManager.launchTask(OFFER.getOffer().getId(), TASK_INFO); - - control.replay(); - - assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); - assertEquals( - ImmutableSet.of(Tasks.id(TASK)), - assigner.maybeAssign( - storeProvider, - resourceRequest, - TaskGroupKey.from(TASK.getAssignedTask().getTask()), - ImmutableSet.of(TASK.getAssignedTask()), - ImmutableMap.of(SLAVE_ID, GROUP_KEY))); - assertEquals(2L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); - } - - @Test - public void testResourceMapperCallback() { - AssignedTask builder = TASK.newBuilder().getAssignedTask(); - builder.unsetAssignedPorts(); - - 
control.replay(); - - assertEquals( - TASK.getAssignedTask(), - assigner.mapAndAssignResources(MESOS_OFFER, IAssignedTask.build(builder))); - } - - @Test - public void testAssignToReservedAgent() throws Exception { - expect(updateAgentReserver.hasReservations(GROUP_KEY)).andReturn(true); - expect(updateAgentReserver.getAgent(INSTANCE_KEY)).andReturn(Optional.of(SLAVE_ID)); - updateAgentReserver.release(SLAVE_ID, INSTANCE_KEY); - expect(offerManager.getOffer(MESOS_OFFER.getAgentId())).andReturn(Optional.of(OFFER)); - expect(filter.filter(UNUSED, resourceRequest)).andReturn(ImmutableSet.of()); - expectAssignTask(MESOS_OFFER); - offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO); - expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER); - - expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER)) - .andReturn(TASK_INFO); - - control.replay(); - - AttributeAggregate aggregate = empty(); - assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); - assertEquals( - ImmutableSet.of(Tasks.id(TASK)), - assigner.maybeAssign( - storeProvider, - new ResourceRequest(TASK.getAssignedTask().getTask(), ResourceBag.EMPTY, aggregate), - TaskGroupKey.from(TASK.getAssignedTask().getTask()), - ImmutableSet.of( - TASK.getAssignedTask()), - ImmutableMap.of(SLAVE_ID, GROUP_KEY))); - assertNotEquals(empty(), aggregate); - assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); - } - - @Test - public void testAssignReservedAgentWhenOfferNotReady() throws Exception { - expect(updateAgentReserver.hasReservations(GROUP_KEY)).andReturn(true); - expect(updateAgentReserver.getAgent(INSTANCE_KEY)).andReturn(Optional.of(SLAVE_ID)); - expect(offerManager.getOffer(MESOS_OFFER.getAgentId())).andReturn(Optional.of(OFFER)); - expect(filter.filter(UNUSED, resourceRequest)) - .andReturn(ImmutableSet.of(Veto.insufficientResources("cpu", 1))); - expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER); - 
offerManager.banOffer(MESOS_OFFER.getId(), GROUP_KEY); - expectLastCall(); - - control.replay(); - - AttributeAggregate aggregate = empty(); - assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); - assertEquals( - ImmutableSet.of(), - assigner.maybeAssign( - storeProvider, - new ResourceRequest(TASK.getAssignedTask().getTask(), ResourceBag.EMPTY, aggregate), - TaskGroupKey.from(TASK.getAssignedTask().getTask()), - ImmutableSet.of(TASK.getAssignedTask()), - ImmutableMap.of(SLAVE_ID, GROUP_KEY))); - assertEquals(empty(), aggregate); - assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); - } - - @Test - public void testAssignWithMixOfReservedAndNotReserved() throws Exception { - AttributeAggregate aggregate = empty(); - ResourceRequest resources = new ResourceRequest( - TASK.getAssignedTask().getTask(), ResourceBag.EMPTY, aggregate); - expect(updateAgentReserver.hasReservations(GROUP_KEY)).andReturn(true); - expect(updateAgentReserver.getAgent(INSTANCE_KEY)).andReturn(Optional.of(SLAVE_ID)); - updateAgentReserver.release(SLAVE_ID, INSTANCE_KEY); - expect(offerManager.getOffer(MESOS_OFFER.getAgentId())).andReturn(Optional.of(OFFER)); - expect(filter.filter(UNUSED, resourceRequest)).andReturn(ImmutableSet.of()); - expectAssignTask(MESOS_OFFER); - offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO); - expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER); - - expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER)) - .andReturn(TASK_INFO); - - // Normal scheduling loop for the remaining task... 
- expect(updateAgentReserver.getAgent(InstanceKeys.from(JOB, 9999))).andReturn(Optional.absent()); - expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER)); - expect(updateAgentReserver.getReservations(OFFER.getOffer().getAgentId().getValue())) - .andReturn(ImmutableSet.of()); - expect(filter.filter(UNUSED, resources)) - .andReturn(ImmutableSet.of(Veto.constraintMismatch("lol"))); - offerManager.banOffer(MESOS_OFFER.getId(), GROUP_KEY); - - control.replay(); - - assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); - assertEquals( - ImmutableSet.of(Tasks.id(TASK)), - assigner.maybeAssign( - storeProvider, - resources, - TaskGroupKey.from(TASK.getAssignedTask().getTask()), - ImmutableSet.of( - TASK.getAssignedTask(), - makeTask("another-task", JOB, 9999).getAssignedTask()), - ImmutableMap.of(SLAVE_ID, GROUP_KEY))); - assertNotEquals(empty(), aggregate); - assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); - } - - private void expectAssignTask(Offer offer) { - expect(stateManager.assignTask( - eq(storeProvider), - eq(Tasks.id(TASK)), - eq(offer.getHostname()), - eq(offer.getAgentId()), - anyObject())).andReturn(TASK.getAssignedTask()); - } - - private void expectNoUpdateReservations(int offers) { - expect(updateAgentReserver.hasReservations(anyObject())).andReturn(false); - for (int i = 0; i < offers; i++) { - expect(updateAgentReserver.getReservations(anyString())).andReturn(ImmutableSet.of()); - } - } -}
