Repository: aurora Updated Branches: refs/heads/master 46b1112ee -> c746452e5
Give jobs the ability to determine how to handle partitions by integrating with new Mesos Partition-Aware APIs. * Adds a new flag -partition_aware that will subscribe to the new Mesos partition-aware states. * Adds a new configuration object, PartitionPolicy, to give job owners control over how to handle tasks that become partitioned. Reviewed at https://reviews.apache.org/r/63536/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/c746452e Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/c746452e Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/c746452e Branch: refs/heads/master Commit: c746452e5fa1bc49da701b059fe69898b9b8a15c Parents: 46b1112 Author: David McLaughlin <[email protected]> Authored: Tue Nov 21 14:17:39 2017 -0800 Committer: David McLaughlin <[email protected]> Committed: Tue Nov 21 14:17:39 2017 -0800 ---------------------------------------------------------------------- RELEASE-NOTES.md | 8 + .../thrift/org/apache/aurora/gen/api.thrift | 21 ++- docs/reference/configuration.md | 7 + docs/reference/scheduler-configuration.md | 2 + docs/reference/task-lifecycle.md | 25 +++ examples/vagrant/upstart/aurora-scheduler.conf | 1 + .../aurora/scheduler/base/Conversions.java | 6 +- .../org/apache/aurora/scheduler/base/Jobs.java | 1 + .../aurora/scheduler/base/TaskTestUtil.java | 3 + .../mesos/CommandLineDriverSettingsModule.java | 12 ++ .../scheduler/state/PartitionManager.java | 117 ++++++++++++ .../aurora/scheduler/state/SideEffect.java | 5 + .../scheduler/state/StateManagerImpl.java | 42 +++++ .../aurora/scheduler/state/StateModule.java | 2 + .../scheduler/state/TaskStateMachine.java | 65 ++++++- .../python/apache/aurora/client/cli/jobs.py | 5 + .../python/apache/aurora/config/schema/base.py | 7 + src/main/python/apache/aurora/config/thrift.py | 6 + .../apache/aurora/scheduler/base/JobsTest.java | 2 +- .../scheduler/config/CommandLineTest.java | 4 +- 
.../CommandLineDriverSettingsModuleTest.java | 23 +++ .../scheduler/state/PartitionManagerTest.java | 177 +++++++++++++++++++ .../scheduler/state/StateManagerImplTest.java | 81 +++++++++ .../scheduler/state/TaskStateMachineTest.java | 45 +++++ .../apache/aurora/client/cli/test_task.py | 6 +- .../python/apache/aurora/config/test_thrift.py | 28 ++- .../apache/aurora/e2e/partition_aware.aurora | 22 +++ .../sh/org/apache/aurora/e2e/test_end_to_end.sh | 51 ++++++ 28 files changed, 755 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/RELEASE-NOTES.md ---------------------------------------------------------------------- diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md index d653b79..e622a8d 100644 --- a/RELEASE-NOTES.md +++ b/RELEASE-NOTES.md @@ -4,6 +4,14 @@ ### New/updated: - Updated to Mesos 1.4.0. +- Added experimental support for the Mesos partition-aware APIs. The key idea is a new ScheduleStatus + PARTITIONED that represents a task in an unknown state. Users of Aurora can have per-job policies + of whether or not to reschedule and how long to wait for the partition to heal. Backwards + compatibility with existing behavior (i.e. transition to LOST immediately on a partition) is + maintained. The support is experimental due to bugs found in Mesos that would cause issues in + a production cluster. For that reason, the functionality is behind a new flag `-partition_aware` + that is disabled by default. When Mesos support is improved and the new behavior is vetted in + production clusters, we'll enable this by default. 
### Deprecations and removals: http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/api/src/main/thrift/org/apache/aurora/gen/api.thrift ---------------------------------------------------------------------- diff --git a/api/src/main/thrift/org/apache/aurora/gen/api.thrift b/api/src/main/thrift/org/apache/aurora/gen/api.thrift index 1d36926..2978f6d 100644 --- a/api/src/main/thrift/org/apache/aurora/gen/api.thrift +++ b/api/src/main/thrift/org/apache/aurora/gen/api.thrift @@ -219,6 +219,11 @@ union Resource { 5: i64 numGpus } +struct PartitionPolicy { + 1: bool reschedule + 2: optional i64 delaySecs +} + /** Description of the tasks contained within a job. */ struct TaskConfig { /** Job task belongs to. */ @@ -259,6 +264,8 @@ struct TaskConfig { 25: optional ExecutorConfig executorConfig /** Used to display additional details in the UI. */ 27: optional set<Metadata> metadata + /** Policy for how to deal with task partitions */ + 34: optional PartitionPolicy partitionPolicy // This field is deliberately placed at the end to work around a bug in the immutable wrapper // code generator. See AURORA-1185 for details. @@ -403,7 +410,11 @@ enum ScheduleStatus { /** A fault in the task environment has caused the system to believe the task no longer exists. * This can happen, for example, when a slave process disappears. */ - LOST = 7 + LOST = 7, + /** + * The task is currently partitioned and in an unknown state. + **/ + PARTITIONED = 18 } // States that a task may be in while still considered active. @@ -415,6 +426,7 @@ const set<ScheduleStatus> ACTIVE_STATES = [ScheduleStatus.ASSIGNED, ScheduleStatus.RESTARTING ScheduleStatus.RUNNING, ScheduleStatus.STARTING, + ScheduleStatus.PARTITIONED, ScheduleStatus.THROTTLED] // States that a task may be in while associated with a slave machine and non-terminal. 
@@ -424,6 +436,7 @@ const set<ScheduleStatus> SLAVE_ASSIGNED_STATES = [ScheduleStatus.ASSIGNED, ScheduleStatus.PREEMPTING, ScheduleStatus.RESTARTING, ScheduleStatus.RUNNING, + ScheduleStatus.PARTITIONED, ScheduleStatus.STARTING] // States that a task may be in while in an active sandbox. @@ -431,6 +444,7 @@ const set<ScheduleStatus> LIVE_STATES = [ScheduleStatus.KILLING, ScheduleStatus.PREEMPTING, ScheduleStatus.RESTARTING, ScheduleStatus.DRAINING, + ScheduleStatus.PARTITIONED, ScheduleStatus.RUNNING] // States a completed task may be in. @@ -499,6 +513,11 @@ struct ScheduledTask { * this task. */ 3: i32 failureCount + /** + * The number of partitions this task has accumulated over its lifetime. + */ + 6: i32 timesPartitioned + /** State change history for this task. */ 4: list<TaskEvent> taskEvents /** http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/docs/reference/configuration.md ---------------------------------------------------------------------- diff --git a/docs/reference/configuration.md b/docs/reference/configuration.md index f647bcf..67d9914 100644 --- a/docs/reference/configuration.md +++ b/docs/reference/configuration.md @@ -356,6 +356,7 @@ Job Schema ```tier``` | String | Task tier type. The default scheduler tier configuration allows for 3 tiers: `revocable`, `preemptible`, and `preferred`. If a tier is not elected, Aurora assigns the task to a tier based on its choice of `production` (that is `preferred` for production and `preemptible` for non-production jobs). See the section on [Configuration Tiers](../features/multitenancy.md#configuration-tiers) for more information. ```announce``` | ```Announcer``` object | Optionally enable Zookeeper ServerSet announcements. See [Announcer Objects] for more information. ```enable_hooks``` | Boolean | Whether to enable [Client Hooks](client-hooks.md) for this job. 
(Default: False) + ```partition_policy``` | ```PartitionPolicy``` object | An optional partition policy that allows job owners to define how to handle partitions for running tasks (in partition-aware Aurora clusters) ### UpdateConfig Objects @@ -403,6 +404,12 @@ Parameters for controlling a task's health checks via HTTP or a shell command. | ------- | :-------: | -------- | ```shell_command``` | String | An alternative to HTTP health checking. Specifies a shell command that will be executed. Any non-zero exit status will be interpreted as a health check failure. +### PartitionPolicy Objects +| param | type | description +| ------- | :-------: | -------- +| ```reschedule``` | Boolean | Whether or not to reschedule when running tasks become partitioned (Default: True) +| ```delay_secs``` | Integer | How long to delay transitioning to LOST when running tasks are partitioned. (Default: 0) + ### Announcer Objects http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/docs/reference/scheduler-configuration.md ---------------------------------------------------------------------- diff --git a/docs/reference/scheduler-configuration.md b/docs/reference/scheduler-configuration.md index d17541f..6c385b5 100644 --- a/docs/reference/scheduler-configuration.md +++ b/docs/reference/scheduler-configuration.md @@ -176,6 +176,8 @@ Optional flags: Maximum amount of random jitter to add to the offer hold time window. -offer_reservation_duration (default (3, mins)) Time to reserve a agent's offers while trying to satisfy a task preempting another. +-partition_aware (default false) + Whether or not to integrate with the partition-aware Mesos capabilities. -populate_discovery_info (default false) If true, Aurora populates DiscoveryInfo field of Mesos TaskInfo. 
-preemption_delay (default (3, mins)) http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/docs/reference/task-lifecycle.md ---------------------------------------------------------------------- diff --git a/docs/reference/task-lifecycle.md b/docs/reference/task-lifecycle.md index 8ec0077..b4642b3 100644 --- a/docs/reference/task-lifecycle.md +++ b/docs/reference/task-lifecycle.md @@ -105,6 +105,31 @@ agent go into `LOST` state and new `Task`s are created in their place. From `PENDING` state, there is no guarantee a `Task` will be reassigned to the same machine unless job constraints explicitly force it there. + +### RUNNING to PARTITIONED states +If Aurora is configured to enable partition awareness, a task which is in a +running state can transition to `PARTITIONED`. This happens when the state +of the task in Mesos becomes unknown. By default, Aurora errs on the side of +availability, so all tasks that transition to `PARTITIONED` are immediately +transitioned to `LOST`. + +This policy is not ideal for every type of workload you may wish to run in +your Aurora cluster, e.g. jobs where task failures are very expensive. +Job owners may therefore set their own `PartitionPolicy` to control how +long a task remains in `PARTITIONED` before it transitions to `LOST`, or +to disable any automatic attempts to `reschedule` while in `PARTITIONED`, +effectively waiting out the partition for as long as possible. + + +### PARTITIONED and transient states +The `PartitionPolicy` provided by users only applies to tasks which are +currently running. When tasks are moving in and out of transient states, +e.g. tasks being updated, restarted, preempted, etc., `PARTITIONED` tasks +are moved immediately to `LOST`. This prevents system- or user-initiated +actions from being blocked indefinitely while waiting for partitions that +may never resolve.
+ + ### Giving Priority to Production Tasks: PREEMPTING Sometimes a Task needs to be interrupted, such as when a non-production http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/examples/vagrant/upstart/aurora-scheduler.conf ---------------------------------------------------------------------- diff --git a/examples/vagrant/upstart/aurora-scheduler.conf b/examples/vagrant/upstart/aurora-scheduler.conf index 5ca3cae..dbbe1d1 100644 --- a/examples/vagrant/upstart/aurora-scheduler.conf +++ b/examples/vagrant/upstart/aurora-scheduler.conf @@ -50,6 +50,7 @@ exec bin/aurora-scheduler \ -populate_discovery_info=true \ -receive_revocable_resources=true \ -enable_revocable_ram=true \ + -partition_aware=true \ -allow_gpu_resource=true \ -allow_container_volumes=true \ -offer_filter_duration=0secs \ http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/src/main/java/org/apache/aurora/scheduler/base/Conversions.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/base/Conversions.java b/src/main/java/org/apache/aurora/scheduler/base/Conversions.java index 33cc012..801dba6 100644 --- a/src/main/java/org/apache/aurora/scheduler/base/Conversions.java +++ b/src/main/java/org/apache/aurora/scheduler/base/Conversions.java @@ -68,13 +68,11 @@ public final class Conversions { .put(TaskState.TASK_KILLED, ScheduleStatus.KILLED) .put(TaskState.TASK_LOST, ScheduleStatus.LOST) .put(TaskState.TASK_ERROR, ScheduleStatus.LOST) - // Task states send to partition-aware Mesos frameworks. Aurora does not advertise the - // PARTITION_AWARE capability yet (AURORA-1814). We still map the task states to be safe. 
- .put(TaskState.TASK_UNREACHABLE, ScheduleStatus.LOST) .put(TaskState.TASK_DROPPED, ScheduleStatus.LOST) .put(TaskState.TASK_GONE, ScheduleStatus.LOST) .put(TaskState.TASK_GONE_BY_OPERATOR, ScheduleStatus.LOST) - .put(TaskState.TASK_UNKNOWN, ScheduleStatus.LOST) + .put(TaskState.TASK_UNREACHABLE, ScheduleStatus.PARTITIONED) + .put(TaskState.TASK_UNKNOWN, ScheduleStatus.PARTITIONED) .build(); /** http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/src/main/java/org/apache/aurora/scheduler/base/Jobs.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/base/Jobs.java b/src/main/java/org/apache/aurora/scheduler/base/Jobs.java index 8d5f4e5..27bee65 100644 --- a/src/main/java/org/apache/aurora/scheduler/base/Jobs.java +++ b/src/main/java/org/apache/aurora/scheduler/base/Jobs.java @@ -65,6 +65,7 @@ public final class Jobs { case RUNNING: case KILLING: case DRAINING: + case PARTITIONED: case PREEMPTING: stats.setActiveTaskCount(stats.getActiveTaskCount() + 1); break; http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java b/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java index 7c223ea..dd64a5b 100644 --- a/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java +++ b/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java @@ -31,6 +31,7 @@ import org.apache.aurora.gen.Identity; import org.apache.aurora.gen.LimitConstraint; import org.apache.aurora.gen.MesosFetcherURI; import org.apache.aurora.gen.Metadata; +import org.apache.aurora.gen.PartitionPolicy; import org.apache.aurora.gen.Resource; import org.apache.aurora.gen.ScheduleStatus; import org.apache.aurora.gen.ScheduledTask; @@ -122,6 +123,7 @@ public final class TaskTestUtil { 
.setMaxTaskFailures(-1) .setProduction(true) .setTier(PROD_TIER_NAME) + .setPartitionPolicy(new PartitionPolicy().setDelaySecs(5).setReschedule(true)) .setConstraints(ImmutableSet.of( new Constraint( "valueConstraint", @@ -198,6 +200,7 @@ public final class TaskTestUtil { .setScheduler("scheduler2"))) .setAncestorId("ancestor") .setFailureCount(3) + .setTimesPartitioned(2) .setAssignedTask(assignedTask)); } http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/src/main/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModule.java b/src/main/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModule.java index 5e83b2a..bcfb888 100644 --- a/src/main/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModule.java +++ b/src/main/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModule.java @@ -97,6 +97,11 @@ public class CommandLineDriverSettingsModule extends AbstractModule { arity = 1) public boolean receiveRevocableResources = false; + @Parameter(names = "-partition_aware", + description = "Enable paritition-aware status updates.", + arity = 1) + public boolean isPartitionAware = false; + @Parameter(names = "-mesos_role", description = "The Mesos role this framework will register as. 
The default is to left this empty, " @@ -132,6 +137,7 @@ public class CommandLineDriverSettingsModule extends AbstractModule { options.frameworkFailoverTimeout, options.receiveRevocableResources, allowGpuResource, + options.isPartitionAware, role); bind(FrameworkInfo.class) .annotatedWith(FrameworkInfoFactory.FrameworkInfoFactoryImpl.BaseFrameworkInfo.class) @@ -176,6 +182,7 @@ public class CommandLineDriverSettingsModule extends AbstractModule { Amount<Long, Time> failoverTimeout, boolean revocable, boolean allowGpu, + boolean enablePartitionAwareness, Optional<String> role) { FrameworkInfo.Builder infoBuilder = FrameworkInfo.newBuilder() @@ -196,6 +203,11 @@ public class CommandLineDriverSettingsModule extends AbstractModule { infoBuilder.addCapabilities(Capability.newBuilder().setType(GPU_RESOURCES)); } + if (enablePartitionAwareness) { + infoBuilder.addCapabilities( + Capability.newBuilder().setType(Capability.Type.PARTITION_AWARE).build()); + } + if (role.isPresent()) { infoBuilder.setRole(role.get()); } http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/src/main/java/org/apache/aurora/scheduler/state/PartitionManager.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/state/PartitionManager.java b/src/main/java/org/apache/aurora/scheduler/state/PartitionManager.java new file mode 100644 index 0000000..6d49c49 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/state/PartitionManager.java @@ -0,0 +1,117 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.state; + +import java.time.Duration; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.eventbus.Subscribe; +import com.google.inject.Inject; + +import org.apache.aurora.common.util.Clock; +import org.apache.aurora.gen.ScheduleStatus; +import org.apache.aurora.scheduler.base.AsyncUtil; +import org.apache.aurora.scheduler.base.Tasks; +import org.apache.aurora.scheduler.events.PubsubEvent; +import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; +import org.apache.aurora.scheduler.storage.Storage; +import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult.Quiet; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.apache.aurora.scheduler.storage.entities.ITaskConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.util.Objects.requireNonNull; + +/** + * Subscribes to transitions into PARTITIONED task state and, if applicable, sets a timer to + * move the task to LOST. 
+ */ +public class PartitionManager implements PubsubEvent.EventSubscriber { + private static final Logger LOG = LoggerFactory.getLogger(PartitionManager.class); + + private final ScheduledExecutorService executor; + private final StateManager stateManager; + private final Storage storage; + private final Clock clock; + + @VisibleForTesting + static final String TRANSITION_MESSAGE = "Task partitioned for too long."; + + @VisibleForTesting + PartitionManager( + Storage storage, + StateManager stateManager, + Clock clock, + ScheduledExecutorService executor) { + + this.executor = requireNonNull(executor); + this.stateManager = requireNonNull(stateManager); + this.storage = requireNonNull(storage); + this.clock = requireNonNull(clock); + } + + @Inject + PartitionManager(Storage storage, StateManager stateManager, Clock clock) { + this( + storage, + stateManager, + clock, + AsyncUtil.singleThreadLoggingScheduledExecutor("PartitionManager", LOG)); + } + + private long getLastTransitionSecsAgo(IScheduledTask task) { + return Duration.ofMillis( + clock.nowMillis() - Tasks.getLatestEvent(task).getTimestamp()).getSeconds(); + } + + /** + * Schedules a delayed task to move tasks from PARTITIONED -> LOST based on task partition policy. + * @param stateChange The state change event. + */ + @Subscribe + public void handle(TaskStateChange stateChange) { + ITaskConfig config = stateChange.getTask().getAssignedTask().getTask(); + String taskId = Tasks.id(stateChange.getTask()); + // Partition Policy can be null, in which case it's equivalent to reschedule with 0s delay. + if (stateChange.getNewState().equals(ScheduleStatus.PARTITIONED) + && (!config.isSetPartitionPolicy() || config.getPartitionPolicy().isReschedule())) { + long delay = config.isSetPartitionPolicy() ? config.getPartitionPolicy().getDelaySecs() : 0; + // We're recovering from a failover, so modify the delay based on last event time.
+ if (!stateChange.isTransition()) { + delay = Math.max(0, delay - getLastTransitionSecsAgo(stateChange.getTask())); + } + LOG.info("Partitioned task {} will be rescheduled in {} secs", taskId, delay); + // Use the timestamp to verify the task state hasn't changed when we execute after the delay. + long lastTimestamp = Tasks.getLatestEvent(stateChange.getTask()).getTimestamp(); + executor.schedule(() -> storage.write((Quiet) storeProvider -> { + Optional<IScheduledTask> maybeTask = storeProvider.getTaskStore().fetchTask(taskId); + if (maybeTask.isPresent() + && Tasks.getLatestEvent(maybeTask.get()).getTimestamp() == lastTimestamp) { + stateManager.changeState( + storeProvider, + stateChange.getTask().getAssignedTask().getTaskId(), + Optional.of(ScheduleStatus.PARTITIONED), + ScheduleStatus.LOST, + Optional.of(TRANSITION_MESSAGE)); + } + }), delay, TimeUnit.SECONDS); + } else if (stateChange.getNewState().equals(ScheduleStatus.PARTITIONED)) { + LOG.info("Partitioned task {} will not be rescheduled.", taskId); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/src/main/java/org/apache/aurora/scheduler/state/SideEffect.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/state/SideEffect.java b/src/main/java/org/apache/aurora/scheduler/state/SideEffect.java index b91a085..2aa500a 100644 --- a/src/main/java/org/apache/aurora/scheduler/state/SideEffect.java +++ b/src/main/java/org/apache/aurora/scheduler/state/SideEffect.java @@ -68,6 +68,11 @@ class SideEffect { KILL, /** + * Transition a task to LOST. + */ + TRANSITION_TO_LOST, + + /** * Create a new state machine with a copy of this task. 
*/ RESCHEDULE, http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java index 9989ed4..3ba676b 100644 --- a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java @@ -232,6 +232,7 @@ public class StateManagerImpl implements StateManager { Action.INCREMENT_FAILURES, Action.SAVE_STATE, Action.RESCHEDULE, + Action.TRANSITION_TO_LOST, Action.KILL, Action.DELETE); @@ -289,6 +290,12 @@ public class StateManagerImpl implements StateManager { Optional<IScheduledTask> mutated = taskStore.mutateTask(taskId, task1 -> { ScheduledTask mutableTask = task1.newBuilder(); mutableTask.setStatus(targetState.get()); + if (targetState.get() == ScheduleStatus.PARTITIONED) { + mutableTask.setTimesPartitioned(mutableTask.getTimesPartitioned() + 1); + // If we're moving to partitioned state, remove any existing partition transitions + // in order to prevent the event history growing unbounded. 
+ mutableTask.setTaskEvents(compactPartitionEvents(mutableTask.getTaskEvents())); + } mutableTask.addToTaskEvents(new TaskEvent() .setTimestamp(clock.nowMillis()) .setStatus(targetState.get()) @@ -299,6 +306,15 @@ public class StateManagerImpl implements StateManager { events.add(TaskStateChange.transition(mutated.get(), stateMachine.getPreviousState())); break; + case TRANSITION_TO_LOST: + updateTaskAndExternalState( + taskStore, + Optional.absent(), + taskId, + ScheduleStatus.LOST, + Optional.of("Action performed on partitioned task, marking as LOST.")); + break; + case RESCHEDULE: Preconditions.checkState( upToDateTask.isPresent(), @@ -366,6 +382,32 @@ public class StateManagerImpl implements StateManager { return result.getResult(); } + /* + * Compact cyclical transitions into PARTITIONED into a single event in order to place an upper + * bound on the number of task events for a task. We do not want to lose unique transitions. + * So consider the following history: + * + * ... -> RUNNING -> PARTITIONED -> RUNNING -> PARTITIONED -> RUNNING -> PARTITIONED + * + * We'd want to compact this into a single RUNNING -> PARTITIONED where RUNNING is the first + * occurrence of RUNNING. But consider another example: + * + * ... -> RUNNING -> PARTITIONED -> RUNNING -> DRAINING -> PARTITIONED + * + * In this case, there is no compaction to be done because there is no cycle. + */ + private List<TaskEvent> compactPartitionEvents(List<TaskEvent> taskEvents) { + int size = taskEvents.size(); + // We only compact as we're transitioning into PARTITIONED. So cycles happen when the second-to- + // last event is PARTITIONED and the last and third-to-last statuses are the same. + if (size >= 3 && taskEvents.get(size - 2).getStatus().equals(ScheduleStatus.PARTITIONED) + && taskEvents.get(size - 3).getStatus().equals(taskEvents.get(size - 1).getStatus())) { + // Drop the last two elements off taskEvents.
+ return taskEvents.subList(0, size - 2); + } + return taskEvents; + } + @Override public void deleteTasks(MutableStoreProvider storeProvider, final Set<String> taskIds) { TaskStore.Mutable taskStore = storeProvider.getUnsafeTaskStore(); http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/src/main/java/org/apache/aurora/scheduler/state/StateModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateModule.java b/src/main/java/org/apache/aurora/scheduler/state/StateModule.java index c03fff1..b7a3c0b 100644 --- a/src/main/java/org/apache/aurora/scheduler/state/StateModule.java +++ b/src/main/java/org/apache/aurora/scheduler/state/StateModule.java @@ -64,6 +64,8 @@ public class StateModule extends AbstractModule { bind(UUIDGenerator.class).to(UUIDGeneratorImpl.class); bind(UUIDGeneratorImpl.class).in(Singleton.class); + PubsubEventModule.bindSubscriber(binder(), PartitionManager.class); + bindMaintenanceController(binder()); } http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java b/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java index eb4fe7d..914d401 100644 --- a/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java +++ b/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java @@ -49,6 +49,7 @@ import static org.apache.aurora.scheduler.state.SideEffect.Action.INCREMENT_FAIL import static org.apache.aurora.scheduler.state.SideEffect.Action.KILL; import static org.apache.aurora.scheduler.state.SideEffect.Action.RESCHEDULE; import static org.apache.aurora.scheduler.state.SideEffect.Action.SAVE_STATE; +import static org.apache.aurora.scheduler.state.SideEffect.Action.TRANSITION_TO_LOST; import 
static org.apache.aurora.scheduler.state.StateChangeResult.ILLEGAL; import static org.apache.aurora.scheduler.state.StateChangeResult.ILLEGAL_WITH_SIDE_EFFECTS; import static org.apache.aurora.scheduler.state.StateChangeResult.NOOP; @@ -62,6 +63,7 @@ import static org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.INIT; import static org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.KILLED; import static org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.KILLING; import static org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.LOST; +import static org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.PARTITIONED; import static org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.PENDING; import static org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.PREEMPTING; import static org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.RESTARTING; @@ -117,6 +119,7 @@ class TaskStateMachine { KILLED(Optional.of(ScheduleStatus.KILLED)), KILLING(Optional.of(ScheduleStatus.KILLING)), LOST(Optional.of(ScheduleStatus.LOST)), + PARTITIONED(Optional.of(ScheduleStatus.PARTITIONED)), /** * The task does not have an associated state as it has been deleted from the store. */ @@ -170,6 +173,19 @@ class TaskStateMachine { Consumer<Transition<TaskState>> manageTerminatedTasks = Consumers.combine( ImmutableList.<Consumer<Transition<TaskState>>>builder() + .add(transition -> { + // The transition from KILLING -> PARTITIONED needs to be handled separately, in + // order to mark the task as terminal and unblock any operations that depend on + // kills (e.g. job updates). Just sending a KILL signal alone is insufficient, + // as any partitioned task will be on an agent that is unreachable. 
+ if (transition.getTo() == PARTITIONED) { + if (transition.getFrom() == KILLING) { + addFollowup(TRANSITION_TO_LOST); + } else { + addFollowup(KILL); + } + } + }) // Kill a task that we believe to be terminated when an attempt is made to revive. .add( Consumers.filter(Transition.to(ASSIGNED, STARTING, RUNNING), @@ -193,6 +209,14 @@ class TaskStateMachine { addFollowup(KILL); break; + case PARTITIONED: + // When a task becomes partitioned during a user or operator-initiated action, we + // bypass their partition policy and immediately transition to lost. This is to + // prevent a situation where a task becoming partitioned could indefinitely block + // machine maintenance, preemption or a job restart. + addFollowup(TRANSITION_TO_LOST); + break; + case LOST: addFollowup(KILL); addFollowup(RESCHEDULE); @@ -261,9 +285,34 @@ class TaskStateMachine { .to(PENDING, KILLING) .withCallback(deleteIfKilling)) .addState( + Rule.from(PARTITIONED) + .to(LOST, FAILED, FINISHED, RUNNING, ASSIGNED, STARTING, RESTARTING, DRAINING, + PREEMPTING, KILLING) + .withCallback( + transition -> { + switch (transition.getTo()) { + case LOST: + addFollowup(KILL); + addFollowup(RESCHEDULE); + break; + case KILLING: + case RESTARTING: + case DRAINING: + case PREEMPTING: + addFollowup(TRANSITION_TO_LOST); + break; + case FAILED: + incrementFailuresMaybeReschedule.execute(); + break; + default: + // No-op + } + } + )) + .addState( Rule.from(ASSIGNED) .to(STARTING, RUNNING, FINISHED, FAILED, RESTARTING, DRAINING, - KILLED, KILLING, LOST, PREEMPTING) + KILLED, KILLING, LOST, PREEMPTING, PARTITIONED) .withCallback( transition -> { switch (transition.getTo()) { @@ -308,7 +357,7 @@ class TaskStateMachine { .addState( Rule.from(STARTING) .to(RUNNING, FINISHED, FAILED, RESTARTING, DRAINING, KILLING, - KILLED, LOST, PREEMPTING) + KILLED, LOST, PREEMPTING, PARTITIONED) .withCallback( transition -> { switch (transition.getTo()) { @@ -351,7 +400,8 @@ class TaskStateMachine { )) .addState( Rule.from(RUNNING) 
- .to(FINISHED, RESTARTING, DRAINING, FAILED, KILLING, KILLED, LOST, PREEMPTING) + .to(FINISHED, RESTARTING, DRAINING, FAILED, KILLING, KILLED, LOST, + PREEMPTING, PARTITIONED) .withCallback( transition -> { switch (transition.getTo()) { @@ -398,15 +448,15 @@ class TaskStateMachine { .withCallback(manageTerminatedTasks)) .addState( Rule.from(PREEMPTING) - .to(FINISHED, FAILED, KILLING, KILLED, LOST) + .to(FINISHED, FAILED, KILLING, KILLED, LOST, PARTITIONED) .withCallback(manageRestartingTask)) .addState( Rule.from(RESTARTING) - .to(FINISHED, FAILED, KILLING, KILLED, LOST) + .to(FINISHED, FAILED, KILLING, KILLED, LOST, PARTITIONED) .withCallback(manageRestartingTask)) .addState( Rule.from(DRAINING) - .to(FINISHED, FAILED, KILLING, KILLED, LOST) + .to(FINISHED, FAILED, KILLING, KILLED, LOST, PARTITIONED) .withCallback(manageRestartingTask)) .addState( Rule.from(FAILED) @@ -419,7 +469,7 @@ class TaskStateMachine { // TODO(maxim): Re-evaluate if *DELETED states are valid transitions here. .addState( Rule.from(KILLING) - .to(FINISHED, FAILED, KILLED, LOST, DELETED) + .to(FINISHED, FAILED, KILLED, LOST, DELETED, PARTITIONED) .withCallback(manageTerminatedTasks)) .addState( Rule.from(LOST) @@ -505,7 +555,6 @@ class TaskStateMachine { if (stateMachine.getState() == taskState) { return new TransitionResult(NOOP, ImmutableSet.of()); } - boolean success = stateMachine.transition(taskState); ImmutableSet<SideEffect> transitionEffects = ImmutableSet.copyOf(sideEffects); sideEffects.clear(); http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/src/main/python/apache/aurora/client/cli/jobs.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/client/cli/jobs.py b/src/main/python/apache/aurora/client/cli/jobs.py index b79ae56..cbae387 100644 --- a/src/main/python/apache/aurora/client/cli/jobs.py +++ b/src/main/python/apache/aurora/client/cli/jobs.py @@ -254,6 +254,11 @@ class InspectCommand(Verb): 
context.print_out("constraints:", indent=2) for constraint, value in job.constraints().get().items(): context.print_out("'%s': '%s'" % (constraint, value), indent=4) + if job.has_partition_policy(): + context.print_out("partition_policy:", indent=2) + context.print_out("reschedule: %s" % job.partition_policy().reschedule(), indent=4) + context.print_out("delay_secs: '%s'" % job.partition_policy().delay_secs(), indent=4) + context.print_out("service: %s" % job_thrift.taskConfig.isService, indent=2) context.print_out("production: %s" % bool(job.production().get()), indent=2) context.print_out("") http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/src/main/python/apache/aurora/config/schema/base.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/config/schema/base.py b/src/main/python/apache/aurora/config/schema/base.py index 18ce826..a466e78 100644 --- a/src/main/python/apache/aurora/config/schema/base.py +++ b/src/main/python/apache/aurora/config/schema/base.py @@ -154,6 +154,11 @@ class Container(Struct): docker = Docker +class PartitionPolicy(Struct): + reschedule = Default(Boolean, True) + delay_secs = Default(Integer, 0) + + class MesosJob(Struct): name = Default(String, '{{task.name}}') role = Required(String) @@ -182,6 +187,8 @@ class MesosJob(Struct): enable_hooks = Default(Boolean, False) # enable client API hooks; from env python-list 'hooks' + partition_policy = PartitionPolicy + # Specifying a `Container` with a `docker` property for Docker jobs is deprecated, instead just # specify the value of the container property to be a `Docker` container directly. 
container = Choice([Container, Docker, Mesos]) http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/src/main/python/apache/aurora/config/thrift.py ---------------------------------------------------------------------- diff --git a/src/main/python/apache/aurora/config/thrift.py b/src/main/python/apache/aurora/config/thrift.py index bedf8cd..eb00144 100644 --- a/src/main/python/apache/aurora/config/thrift.py +++ b/src/main/python/apache/aurora/config/thrift.py @@ -48,6 +48,7 @@ from gen.apache.aurora.api.ttypes import ( MesosContainer, Metadata, Mode, + PartitionPolicy, Resource, TaskConfig, TaskConstraint, @@ -268,6 +269,11 @@ def convert(job, metadata=frozenset(), ports=frozenset()): task.contactEmail = not_empty_or(job.contact(), None) task.tier = not_empty_or(job.tier(), None) + if job.has_partition_policy(): + task.partitionPolicy = PartitionPolicy( + fully_interpolated(job.partition_policy().reschedule()), + fully_interpolated(job.partition_policy().delay_secs())) + # Add metadata to a task, to display in the scheduler UI. 
task.metadata = frozenset(Metadata(key=str(key), value=str(value)) for key, value in metadata) http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/src/test/java/org/apache/aurora/scheduler/base/JobsTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/base/JobsTest.java b/src/test/java/org/apache/aurora/scheduler/base/JobsTest.java index 13f656f..92bf4f4 100644 --- a/src/test/java/org/apache/aurora/scheduler/base/JobsTest.java +++ b/src/test/java/org/apache/aurora/scheduler/base/JobsTest.java @@ -38,7 +38,7 @@ public class JobsTest { addStateTransition(makeTask("id", TaskTestUtil.JOB), status, 100L)).toList(); IJobStats expectedStats = IJobStats.build(new JobStats() - .setActiveTaskCount(7) + .setActiveTaskCount(8) .setFailedTaskCount(2) .setFinishedTaskCount(2) .setPendingTaskCount(3)); http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java b/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java index c2d875b..0ec4de6 100644 --- a/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java +++ b/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java @@ -191,6 +191,7 @@ public class CommandLineTest { expected.driver.executorUser = "testing"; expected.driver.receiveRevocableResources = true; expected.driver.mesosRole = "testing"; + expected.driver.isPartitionAware = true; expected.jetty.hostnameOverride = "testing"; expected.jetty.httpPort = 42; expected.jetty.listenIp = "testing"; @@ -383,7 +384,8 @@ public class CommandLineTest { "-cron_start_max_backoff=42days", "-cron_scheduling_max_batch_size=42", "-enable_revocable_cpus=false", - "-enable_revocable_ram=true" + "-enable_revocable_ram=true", + 
"-partition_aware=true" ); assertEqualOptions(expected, parsed); } http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/src/test/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModuleTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModuleTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModuleTest.java index 7b04291..a86184c 100644 --- a/src/test/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModuleTest.java +++ b/src/test/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModuleTest.java @@ -29,6 +29,7 @@ import org.junit.Test; import static org.apache.aurora.scheduler.mesos.CommandLineDriverSettingsModule.Options.PRINCIPAL_KEY; import static org.apache.aurora.scheduler.mesos.CommandLineDriverSettingsModule.Options.SECRET_KEY; import static org.apache.mesos.v1.Protos.FrameworkInfo.Capability.Type.GPU_RESOURCES; +import static org.apache.mesos.v1.Protos.FrameworkInfo.Capability.Type.PARTITION_AWARE; import static org.apache.mesos.v1.Protos.FrameworkInfo.Capability.Type.REVOCABLE_RESOURCES; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -82,6 +83,7 @@ public class CommandLineDriverSettingsModuleTest { Amount.of(1L, Time.MINUTES), false, // revocable false, // allow gpu + false, // partition aware Optional.absent()); assertEquals("", info.getPrincipal()); assertEquals(0, info.getCapabilitiesCount()); @@ -97,6 +99,7 @@ public class CommandLineDriverSettingsModuleTest { Amount.of(1L, Time.MINUTES), true, // revocable false, // allow gpu + false, // partition aware Optional.absent()); assertEquals("", info.getPrincipal()); assertEquals(1, info.getCapabilitiesCount()); @@ -113,6 +116,7 @@ public class CommandLineDriverSettingsModuleTest { Amount.of(1L, Time.MINUTES), false, // revocable true, // allow gpu + false, // 
partition aware Optional.absent()); assertEquals("", info.getPrincipal()); assertEquals(1, info.getCapabilitiesCount()); @@ -121,6 +125,23 @@ public class CommandLineDriverSettingsModuleTest { } @Test + public void testFrameworkInfoPartitionAware() { + Protos.FrameworkInfo info = CommandLineDriverSettingsModule.buildFrameworkInfo( + "aurora", + "user", + Optional.absent(), + Amount.of(1L, Time.MINUTES), + false, // revocable + false, // allow gpu + true, // partition aware + Optional.absent()); + assertEquals("", info.getPrincipal()); + assertEquals(1, info.getCapabilitiesCount()); + assertEquals(PARTITION_AWARE, info.getCapabilities(0).getType()); + assertFalse(info.hasRole()); + } + + @Test public void testFrameworkInfoNoRevocableWithAnnouncedPrincipal() { Protos.FrameworkInfo info = CommandLineDriverSettingsModule.buildFrameworkInfo( "aurora", @@ -129,6 +150,7 @@ public class CommandLineDriverSettingsModuleTest { Amount.of(1L, Time.MINUTES), false, // revocable false, // allow gpu + false, // partition aware Optional.absent()); assertEquals("auroraprincipal", info.getPrincipal()); assertEquals(0, info.getCapabilitiesCount()); @@ -144,6 +166,7 @@ public class CommandLineDriverSettingsModuleTest { Amount.of(1L, Time.MINUTES), true, // revocable true, // allow gpu + false, // partition aware Optional.of(TEST_ROLE)); assertEquals("auroraprincipal", info.getPrincipal()); assertEquals(2, info.getCapabilitiesCount()); http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/src/test/java/org/apache/aurora/scheduler/state/PartitionManagerTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/state/PartitionManagerTest.java b/src/test/java/org/apache/aurora/scheduler/state/PartitionManagerTest.java new file mode 100644 index 0000000..8f654db --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/state/PartitionManagerTest.java @@ -0,0 +1,177 @@ +/** + * Licensed under the Apache 
License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.state; + +import java.util.concurrent.Callable; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import com.google.common.base.Optional; + +import org.apache.aurora.common.testing.easymock.EasyMockTest; +import org.apache.aurora.common.util.testing.FakeClock; +import org.apache.aurora.gen.PartitionPolicy; +import org.apache.aurora.gen.ScheduleStatus; +import org.apache.aurora.scheduler.base.TaskTestUtil; +import org.apache.aurora.scheduler.base.Tasks; +import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; +import org.easymock.Capture; +import org.junit.Before; +import org.junit.Test; + +import static org.apache.aurora.gen.ScheduleStatus.PARTITIONED; +import static org.apache.aurora.gen.ScheduleStatus.RUNNING; +import static org.easymock.EasyMock.capture; +import static org.easymock.EasyMock.eq; +import static org.easymock.EasyMock.expect; + +public class PartitionManagerTest extends EasyMockTest { + private final FakeClock clock = new FakeClock(); + private PartitionManager partitionManager; + private StateManager stateManager; + private ScheduledExecutorService executor; + + private static final IScheduledTask BASE_TASK = TaskTestUtil.makeTask("test", 
TaskTestUtil.JOB); + private StorageTestUtil storageUtil; + + @Before + public void setUp() { + storageUtil = new StorageTestUtil(this); + stateManager = createMock(StateManager.class); + executor = createMock(ScheduledExecutorService.class); + partitionManager = new PartitionManager(storageUtil.storage, stateManager, clock, executor); + } + + private static TaskStateChange makeStateChange( + IScheduledTask task, + Optional<ScheduleStatus> oldState, + ScheduleStatus newState) { + + IScheduledTask newTask = IScheduledTask.build(task.newBuilder().setStatus(newState)); + if (oldState.isPresent()) { + return TaskStateChange.transition(newTask, oldState.get()); + } + return TaskStateChange.initialized(newTask); + } + + private static TaskStateChange makeStateChange( + IScheduledTask task, + ScheduleStatus oldState, + ScheduleStatus newState) { + + return makeStateChange(task, Optional.of(oldState), newState); + } + + private static TaskStateChange makeStateChange(IScheduledTask task, ScheduleStatus newState) { + return makeStateChange(task, Optional.absent(), newState); + } + + private static IScheduledTask taskWithoutPartitionPolicy() { + return taskWithPartitionPolicy(null); + } + + private static IScheduledTask taskWithPartitionPolicy(PartitionPolicy partitionPolicy) { + return IScheduledTask.build(BASE_TASK.newBuilder() + .setAssignedTask(BASE_TASK.getAssignedTask().newBuilder().setTask( + BASE_TASK.getAssignedTask().getTask().newBuilder().setPartitionPolicy(partitionPolicy)) + )); + } + + private Capture<Callable<Void>> expectExecuteWithDelay(long seconds) { + Capture<Callable<Void>> callableCapture = createCapture(); + expect( + executor.schedule(capture(callableCapture), eq(seconds), eq(TimeUnit.SECONDS)) + ).andReturn(createMock(new Clazz<ScheduledFuture<Void>>() { })); + return callableCapture; + } + + @Test + public void testNonPartitionedTransition() { + control.replay(); + partitionManager.handle(makeStateChange(taskWithoutPartitionPolicy(), RUNNING)); + } + 
+ @Test + public void testPartitionPolicyNoReschedule() { + control.replay(); + partitionManager.handle(makeStateChange( + taskWithPartitionPolicy(new PartitionPolicy().setReschedule(false)), + RUNNING, + PARTITIONED)); + } + + @Test + public void testNoPartitionPolicy() { + expectExecuteWithDelay(0); + control.replay(); + partitionManager.handle(makeStateChange(taskWithoutPartitionPolicy(), RUNNING, PARTITIONED)); + } + + @Test + public void testPartitionPolicyWithDelay() { + expectExecuteWithDelay(10); + control.replay(); + partitionManager.handle(makeStateChange( + taskWithPartitionPolicy(new PartitionPolicy().setReschedule(true).setDelaySecs(10)), + RUNNING, + PARTITIONED)); + } + + @Test + public void testDelayAfterFailover() { + IScheduledTask failoverTask = TaskTestUtil.addStateTransition( + taskWithPartitionPolicy(new PartitionPolicy().setReschedule(true).setDelaySecs(10)), + PARTITIONED, + 0); + clock.setNowMillis(5000); + expectExecuteWithDelay(5); + control.replay(); + partitionManager.handle(makeStateChange( + failoverTask, + PARTITIONED)); + } + + @Test + public void testStateChangeToLostNoModifications() throws Exception { + IScheduledTask task = taskWithoutPartitionPolicy(); + Capture<Callable<Void>> stateChange = expectExecuteWithDelay(0); + storageUtil.expectOperations(); + storageUtil.expectTaskFetch(Tasks.id(task), task); + expect(stateManager.changeState( + storageUtil.mutableStoreProvider, + Tasks.id(task), + Optional.of(PARTITIONED), + ScheduleStatus.LOST, + Optional.of(PartitionManager.TRANSITION_MESSAGE))).andReturn(StateChangeResult.SUCCESS); + control.replay(); + partitionManager.handle(makeStateChange(task, RUNNING, PARTITIONED)); + stateChange.getValue().call(); + } + + @Test + public void testNoStateChangeAfterTaskTransition() throws Exception { + IScheduledTask task = taskWithoutPartitionPolicy(); + Capture<Callable<Void>> stateChange = expectExecuteWithDelay(0); + storageUtil.expectOperations(); + storageUtil.expectTaskFetch( + 
Tasks.id(task), + TaskTestUtil.addStateTransition(task, RUNNING, 10000)); + control.replay(); + partitionManager.handle(makeStateChange(task, RUNNING, PARTITIONED)); + stateChange.getValue().call(); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java index 0366cd6..4cff10b 100644 --- a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java @@ -70,8 +70,11 @@ import static org.apache.aurora.gen.ScheduleStatus.INIT; import static org.apache.aurora.gen.ScheduleStatus.KILLED; import static org.apache.aurora.gen.ScheduleStatus.KILLING; import static org.apache.aurora.gen.ScheduleStatus.LOST; +import static org.apache.aurora.gen.ScheduleStatus.PARTITIONED; import static org.apache.aurora.gen.ScheduleStatus.PENDING; +import static org.apache.aurora.gen.ScheduleStatus.RESTARTING; import static org.apache.aurora.gen.ScheduleStatus.RUNNING; +import static org.apache.aurora.gen.ScheduleStatus.STARTING; import static org.apache.aurora.gen.ScheduleStatus.THROTTLED; import static org.apache.aurora.scheduler.resources.ResourceTestUtil.resetPorts; import static org.apache.aurora.scheduler.resources.ResourceType.PORTS; @@ -354,6 +357,84 @@ public class StateManagerImplTest extends EasyMockTest { assertEquals(1, rescheduledTask.getFailureCount()); } + @Test + public void testIncrementPartitionCount() { + String taskId = "a"; + expect(taskIdGenerator.generate(SERVICE_CONFIG, 0)).andReturn(taskId); + expectStateTransitions(taskId, INIT, PENDING, ASSIGNED, RUNNING, PARTITIONED); + control.replay(); + + insertTask(SERVICE_CONFIG, 0); + + assignTask(taskId, HOST_A); + 
changeState(taskId, RUNNING); + changeState(taskId, PARTITIONED); + IScheduledTask updatedTask = Storage.Util.fetchTask(storage, taskId).get(); + assertEquals(1, updatedTask.getTimesPartitioned()); + } + + @Test + public void testTransitionToLost() { + String taskId = "a"; + expect(taskIdGenerator.generate(SERVICE_CONFIG, 0)).andReturn(taskId); + expectStateTransitions(taskId, INIT, PENDING, ASSIGNED, RESTARTING, PARTITIONED, LOST); + driver.killTask(EasyMock.anyObject()); + driver.killTask(EasyMock.anyObject()); + String taskId2 = "a2"; + expect(taskIdGenerator.generate(SERVICE_CONFIG, 0)).andReturn(taskId2); + noFlappingPenalty(); + expectStateTransitions(taskId2, INIT, PENDING); + + control.replay(); + + insertTask(SERVICE_CONFIG, 0); + + assignTask(taskId, HOST_A); + changeState(taskId, ASSIGNED); + changeState(taskId, RESTARTING); + changeState(taskId, PARTITIONED); + IScheduledTask updatedTask = Storage.Util.fetchTask(storage, taskId).get(); + assertEquals(LOST, updatedTask.getStatus()); + } + + @Test + public void testCompactPartitionCycles() { + String taskId = "a"; + expect(taskIdGenerator.generate(SERVICE_CONFIG, 0)).andReturn(taskId); + expectStateTransitions( + taskId, + INIT, + PENDING, + ASSIGNED, + STARTING, + PARTITIONED, + RUNNING, + PARTITIONED, + RUNNING, + PARTITIONED, + RUNNING, + PARTITIONED); + + control.replay(); + + insertTask(SERVICE_CONFIG, 0); + assignTask(taskId, HOST_A); + changeState(taskId, STARTING); + changeState(taskId, PARTITIONED); + changeState(taskId, RUNNING); + changeState(taskId, PARTITIONED); + changeState(taskId, RUNNING); + changeState(taskId, PARTITIONED); + changeState(taskId, RUNNING); + changeState(taskId, PARTITIONED); + IScheduledTask updatedTask = Storage.Util.fetchTask(storage, taskId).get(); + assertEquals( + ImmutableList.of(PENDING, ASSIGNED, STARTING, PARTITIONED, RUNNING, PARTITIONED), + updatedTask.getTaskEvents().stream() + .map(e -> e.getStatus()) + .collect(Collectors.toList())); + } + private static 
ITaskConfig setMaxFailures(ITaskConfig config, int maxFailures) { return ITaskConfig.build(config.newBuilder().setMaxTaskFailures(maxFailures)); } http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/src/test/java/org/apache/aurora/scheduler/state/TaskStateMachineTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/state/TaskStateMachineTest.java b/src/test/java/org/apache/aurora/scheduler/state/TaskStateMachineTest.java index 8d6c3ff..bb4e752 100644 --- a/src/test/java/org/apache/aurora/scheduler/state/TaskStateMachineTest.java +++ b/src/test/java/org/apache/aurora/scheduler/state/TaskStateMachineTest.java @@ -49,6 +49,7 @@ import static org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.INIT; import static org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.KILLED; import static org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.KILLING; import static org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.LOST; +import static org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.PARTITIONED; import static org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.PENDING; import static org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.PREEMPTING; import static org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.RESTARTING; @@ -344,6 +345,10 @@ public class TaskStateMachineTest { private static final TransitionResult ILLEGAL_KILL = new TransitionResult( ILLEGAL_WITH_SIDE_EFFECTS, ImmutableSet.of(new SideEffect(Action.KILL, Optional.absent()))); + private static final TransitionResult TRANSITION_TO_LOST = new TransitionResult( + SUCCESS, + ImmutableSet.of(new SideEffect(Action.TRANSITION_TO_LOST, Optional.absent()), + new SideEffect(Action.SAVE_STATE, Optional.absent()))); private static final TransitionResult RECORD_FAILURE = new TransitionResult( SUCCESS, ImmutableSet.of( @@ -398,15 +403,18 @@ public class 
TaskStateMachineTest { .put(new TestCase(false, INIT, ASSIGNED), ILLEGAL_KILL) .put(new TestCase(false, INIT, STARTING), ILLEGAL_KILL) .put(new TestCase(false, INIT, RUNNING), ILLEGAL_KILL) + .put(new TestCase(false, INIT, PARTITIONED), ILLEGAL_KILL) .put(new TestCase(true, THROTTLED, PENDING), SAVE) .put(new TestCase(true, THROTTLED, KILLING), DELETE_TASK) .put(new TestCase(false, THROTTLED, ASSIGNED), ILLEGAL_KILL) .put(new TestCase(false, THROTTLED, STARTING), ILLEGAL_KILL) .put(new TestCase(false, THROTTLED, RUNNING), ILLEGAL_KILL) + .put(new TestCase(false, THROTTLED, PARTITIONED), ILLEGAL_KILL) .put(new TestCase(true, PENDING, ASSIGNED), SAVE) .put(new TestCase(false, PENDING, ASSIGNED), ILLEGAL_KILL) .put(new TestCase(false, PENDING, STARTING), ILLEGAL_KILL) .put(new TestCase(false, PENDING, RUNNING), ILLEGAL_KILL) + .put(new TestCase(false, PENDING, PARTITIONED), ILLEGAL_KILL) .put(new TestCase(true, PENDING, KILLING), DELETE_TASK) .put(new TestCase(false, ASSIGNED, ASSIGNED), ILLEGAL_KILL) .put(new TestCase(true, ASSIGNED, STARTING), SAVE) @@ -421,6 +429,8 @@ public class TaskStateMachineTest { .put(new TestCase(true, ASSIGNED, KILLED), SAVE_AND_RESCHEDULE) .put(new TestCase(true, ASSIGNED, KILLING), SAVE_AND_KILL) .put(new TestCase(true, ASSIGNED, LOST), SAVE_KILL_AND_RESCHEDULE) + .put(new TestCase(false, ASSIGNED, PARTITIONED), ILLEGAL_KILL) + .put(new TestCase(true, ASSIGNED, PARTITIONED), SAVE) .put(new TestCase(false, STARTING, ASSIGNED), ILLEGAL_KILL) .put(new TestCase(false, STARTING, STARTING), ILLEGAL_KILL) .put(new TestCase(true, STARTING, RUNNING), SAVE) @@ -433,6 +443,8 @@ public class TaskStateMachineTest { .put(new TestCase(true, STARTING, KILLED), SAVE_AND_RESCHEDULE) .put(new TestCase(true, STARTING, KILLING), SAVE_AND_KILL) .put(new TestCase(true, STARTING, LOST), SAVE_AND_RESCHEDULE) + .put(new TestCase(false, STARTING, PARTITIONED), ILLEGAL_KILL) + .put(new TestCase(true, STARTING, PARTITIONED), SAVE) .put(new TestCase(false, RUNNING, 
ASSIGNED), ILLEGAL_KILL) .put(new TestCase(false, RUNNING, STARTING), ILLEGAL_KILL) .put(new TestCase(false, RUNNING, RUNNING), ILLEGAL_KILL) @@ -444,6 +456,8 @@ public class TaskStateMachineTest { .put(new TestCase(true, RUNNING, KILLED), SAVE_AND_RESCHEDULE) .put(new TestCase(true, RUNNING, KILLING), SAVE_AND_KILL) .put(new TestCase(true, RUNNING, LOST), SAVE_AND_RESCHEDULE) + .put(new TestCase(false, RUNNING, PARTITIONED), ILLEGAL_KILL) + .put(new TestCase(true, RUNNING, PARTITIONED), SAVE) .put(new TestCase(true, FINISHED, ASSIGNED), ILLEGAL_KILL) .put(new TestCase(false, FINISHED, ASSIGNED), ILLEGAL_KILL) .put(new TestCase(true, FINISHED, STARTING), ILLEGAL_KILL) @@ -451,6 +465,8 @@ public class TaskStateMachineTest { .put(new TestCase(true, FINISHED, RUNNING), ILLEGAL_KILL) .put(new TestCase(false, FINISHED, RUNNING), ILLEGAL_KILL) .put(new TestCase(true, FINISHED, DELETED), DELETE_TASK) + .put(new TestCase(false, FINISHED, PARTITIONED), ILLEGAL_KILL) + .put(new TestCase(true, FINISHED, PARTITIONED), ILLEGAL_KILL) .put(new TestCase(true, PREEMPTING, ASSIGNED), ILLEGAL_KILL) .put(new TestCase(false, PREEMPTING, ASSIGNED), ILLEGAL_KILL) .put(new TestCase(true, PREEMPTING, STARTING), ILLEGAL_KILL) @@ -462,6 +478,8 @@ public class TaskStateMachineTest { .put(new TestCase(true, PREEMPTING, KILLED), SAVE_AND_RESCHEDULE) .put(new TestCase(true, PREEMPTING, KILLING), SAVE) .put(new TestCase(true, PREEMPTING, LOST), SAVE_KILL_AND_RESCHEDULE) + .put(new TestCase(false, PREEMPTING, PARTITIONED), ILLEGAL_KILL) + .put(new TestCase(true, PREEMPTING, PARTITIONED), TRANSITION_TO_LOST) .put(new TestCase(true, RESTARTING, ASSIGNED), ILLEGAL_KILL) .put(new TestCase(false, RESTARTING, ASSIGNED), ILLEGAL_KILL) .put(new TestCase(true, RESTARTING, STARTING), ILLEGAL_KILL) @@ -473,6 +491,8 @@ public class TaskStateMachineTest { .put(new TestCase(true, RESTARTING, KILLED), SAVE_AND_RESCHEDULE) .put(new TestCase(true, RESTARTING, KILLING), SAVE) .put(new TestCase(true, RESTARTING, 
LOST), SAVE_KILL_AND_RESCHEDULE) + .put(new TestCase(false, RESTARTING, PARTITIONED), ILLEGAL_KILL) + .put(new TestCase(true, RESTARTING, PARTITIONED), TRANSITION_TO_LOST) .put(new TestCase(true, DRAINING, ASSIGNED), ILLEGAL_KILL) .put(new TestCase(false, DRAINING, ASSIGNED), ILLEGAL_KILL) .put(new TestCase(true, DRAINING, STARTING), ILLEGAL_KILL) @@ -484,6 +504,8 @@ public class TaskStateMachineTest { .put(new TestCase(true, DRAINING, KILLED), SAVE_AND_RESCHEDULE) .put(new TestCase(true, DRAINING, KILLING), SAVE) .put(new TestCase(true, DRAINING, LOST), SAVE_KILL_AND_RESCHEDULE) + .put(new TestCase(false, DRAINING, PARTITIONED), ILLEGAL_KILL) + .put(new TestCase(true, DRAINING, PARTITIONED), TRANSITION_TO_LOST) .put(new TestCase(true, FAILED, ASSIGNED), ILLEGAL_KILL) .put(new TestCase(false, FAILED, ASSIGNED), ILLEGAL_KILL) .put(new TestCase(true, FAILED, STARTING), ILLEGAL_KILL) @@ -491,6 +513,8 @@ public class TaskStateMachineTest { .put(new TestCase(true, FAILED, RUNNING), ILLEGAL_KILL) .put(new TestCase(false, FAILED, RUNNING), ILLEGAL_KILL) .put(new TestCase(true, FAILED, DELETED), DELETE_TASK) + .put(new TestCase(false, FAILED, PARTITIONED), ILLEGAL_KILL) + .put(new TestCase(true, FAILED, PARTITIONED), ILLEGAL_KILL) .put(new TestCase(true, KILLED, ASSIGNED), ILLEGAL_KILL) .put(new TestCase(false, KILLED, ASSIGNED), ILLEGAL_KILL) .put(new TestCase(true, KILLED, STARTING), ILLEGAL_KILL) @@ -498,6 +522,8 @@ public class TaskStateMachineTest { .put(new TestCase(true, KILLED, RUNNING), ILLEGAL_KILL) .put(new TestCase(false, KILLED, RUNNING), ILLEGAL_KILL) .put(new TestCase(true, KILLED, DELETED), DELETE_TASK) + .put(new TestCase(false, KILLED, PARTITIONED), ILLEGAL_KILL) + .put(new TestCase(true, KILLED, PARTITIONED), ILLEGAL_KILL) .put(new TestCase(true, KILLING, ASSIGNED), ILLEGAL_KILL) .put(new TestCase(false, KILLING, ASSIGNED), ILLEGAL_KILL) .put(new TestCase(true, KILLING, STARTING), ILLEGAL_KILL) @@ -509,6 +535,22 @@ public class TaskStateMachineTest { 
.put(new TestCase(true, KILLING, KILLED), SAVE) .put(new TestCase(true, KILLING, LOST), SAVE) .put(new TestCase(true, KILLING, DELETED), DELETE_TASK) + .put(new TestCase(false, KILLING, PARTITIONED), ILLEGAL_KILL) + .put(new TestCase(true, KILLING, PARTITIONED), TRANSITION_TO_LOST) + .put(new TestCase(false, PARTITIONED, ASSIGNED), ILLEGAL_KILL) + .put(new TestCase(true, PARTITIONED, ASSIGNED), SAVE) + .put(new TestCase(false, PARTITIONED, STARTING), ILLEGAL_KILL) + .put(new TestCase(true, PARTITIONED, STARTING), SAVE) + .put(new TestCase(false, PARTITIONED, RUNNING), ILLEGAL_KILL) + .put(new TestCase(true, PARTITIONED, RUNNING), SAVE) + .put(new TestCase(true, PARTITIONED, PREEMPTING), TRANSITION_TO_LOST) + .put(new TestCase(true, PARTITIONED, RESTARTING), TRANSITION_TO_LOST) + .put(new TestCase(true, PARTITIONED, DRAINING), TRANSITION_TO_LOST) + .put(new TestCase(true, PARTITIONED, KILLING), TRANSITION_TO_LOST) + .put(new TestCase(true, PARTITIONED, FAILED), RECORD_FAILURE) + .put(new TestCase(true, PARTITIONED, FINISHED), SAVE) + .put(new TestCase(true, PARTITIONED, LOST), SAVE_KILL_AND_RESCHEDULE) + .put(new TestCase(false, PARTITIONED, PARTITIONED), ILLEGAL_KILL) .put(new TestCase(true, LOST, ASSIGNED), ILLEGAL_KILL) .put(new TestCase(false, LOST, ASSIGNED), ILLEGAL_KILL) .put(new TestCase(true, LOST, STARTING), ILLEGAL_KILL) @@ -516,9 +558,12 @@ public class TaskStateMachineTest { .put(new TestCase(true, LOST, RUNNING), ILLEGAL_KILL) .put(new TestCase(false, LOST, RUNNING), ILLEGAL_KILL) .put(new TestCase(true, LOST, DELETED), DELETE_TASK) + .put(new TestCase(false, LOST, PARTITIONED), ILLEGAL_KILL) + .put(new TestCase(true, LOST, PARTITIONED), ILLEGAL_KILL) .put(new TestCase(false, DELETED, ASSIGNED), ILLEGAL_KILL) .put(new TestCase(false, DELETED, STARTING), ILLEGAL_KILL) .put(new TestCase(false, DELETED, RUNNING), ILLEGAL_KILL) + .put(new TestCase(false, DELETED, PARTITIONED), ILLEGAL_KILL) .build(); @Test 
http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/src/test/python/apache/aurora/client/cli/test_task.py ---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/client/cli/test_task.py b/src/test/python/apache/aurora/client/cli/test_task.py index 186cb27..a543d4a 100644 --- a/src/test/python/apache/aurora/client/cli/test_task.py +++ b/src/test/python/apache/aurora/client/cli/test_task.py @@ -95,7 +95,7 @@ class TestRunCommand(AuroraClientCommandTest): mock_scheduler_proxy.getTasksStatus.assert_called_with(TaskQuery( jobKeys=[JobKey(role='bozo', environment='test', name='hello')], statuses=set([ScheduleStatus.RUNNING, ScheduleStatus.KILLING, ScheduleStatus.RESTARTING, - ScheduleStatus.PREEMPTING, ScheduleStatus.DRAINING]), + ScheduleStatus.PREEMPTING, ScheduleStatus.PARTITIONED, ScheduleStatus.DRAINING]), instanceIds=instances), retry=True) @@ -150,7 +150,7 @@ class TestSshCommand(AuroraClientCommandTest): jobKeys=[JobKey(role='bozo', environment='test', name='hello')], instanceIds=set([1]), statuses=set([ScheduleStatus.RUNNING, ScheduleStatus.KILLING, ScheduleStatus.RESTARTING, - ScheduleStatus.PREEMPTING, ScheduleStatus.DRAINING])), + ScheduleStatus.PREEMPTING, ScheduleStatus.PARTITIONED, ScheduleStatus.DRAINING])), retry=True) mock_subprocess.assert_called_with(['ssh', '-t', '-v', 'bozo@slavehost', 'cd /slaveroot/slaves/*/frameworks/*/executors/thermos-1287391823/runs/' @@ -178,7 +178,7 @@ class TestSshCommand(AuroraClientCommandTest): jobKeys=[JobKey(role='bozo', environment='test', name='hello')], instanceIds=None, statuses=set([ScheduleStatus.RUNNING, ScheduleStatus.KILLING, ScheduleStatus.RESTARTING, - ScheduleStatus.PREEMPTING, ScheduleStatus.DRAINING])), + ScheduleStatus.PREEMPTING, ScheduleStatus.PARTITIONED, ScheduleStatus.DRAINING])), retry=True) mock_subprocess.assert_called_with(['ssh', '-t', '-v', 'bozo@slavehost', 'cd
/slaveroot/slaves/*/frameworks/*/executors/thermos-1287391823/runs/' http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/src/test/python/apache/aurora/config/test_thrift.py ---------------------------------------------------------------------- diff --git a/src/test/python/apache/aurora/config/test_thrift.py b/src/test/python/apache/aurora/config/test_thrift.py index 7a1567a..76d0ad6 100644 --- a/src/test/python/apache/aurora/config/test_thrift.py +++ b/src/test/python/apache/aurora/config/test_thrift.py @@ -18,6 +18,7 @@ import re import pytest from apache.aurora.config import AuroraConfig +from apache.aurora.config.schema.base import PartitionPolicy as PystachioPartitionPolicy from apache.aurora.config.schema.base import ( AppcImage, Container, @@ -37,7 +38,13 @@ from apache.thermos.config.schema import Process, Resources, Task from gen.apache.aurora.api.constants import GOOD_IDENTIFIER_PATTERN_PYTHON from gen.apache.aurora.api.ttypes import Mode as ThriftMode -from gen.apache.aurora.api.ttypes import CronCollisionPolicy, Identity, JobKey, Resource +from gen.apache.aurora.api.ttypes import ( + CronCollisionPolicy, + Identity, + JobKey, + PartitionPolicy, + Resource +) from gen.apache.aurora.test.constants import INVALID_IDENTIFIERS, VALID_IDENTIFIERS HELLO_WORLD = Job( @@ -143,6 +150,7 @@ def test_config_with_options(): priority=200, service=True, cron_collision_policy='RUN_OVERLAP', + partition_policy=PystachioPartitionPolicy(delay_secs=10), constraints={ 'dedicated': 'root', 'cpu': 'x86_64' @@ -159,6 +167,24 @@ def test_config_with_options(): assert job.cronCollisionPolicy == CronCollisionPolicy.RUN_OVERLAP assert len(tti.constraints) == 2 assert job.key.environment == 'prod' + assert tti.partitionPolicy == PartitionPolicy(True, 10) + + +def test_disable_partition_policy(): + hwc = HELLO_WORLD( + production=True, + priority=200, + service=True, + cron_collision_policy='RUN_OVERLAP', + 
partition_policy=PystachioPartitionPolicy(reschedule=False),
constraints={ + 'dedicated': 'root', + 'cpu': 'x86_64' + }, + environment='prod' + ) + job = convert_pystachio_to_thrift(hwc) + assert job.taskConfig.partitionPolicy == PartitionPolicy(False, 0) def test_config_with_ports(): http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/src/test/sh/org/apache/aurora/e2e/partition_aware.aurora ---------------------------------------------------------------------- diff --git a/src/test/sh/org/apache/aurora/e2e/partition_aware.aurora b/src/test/sh/org/apache/aurora/e2e/partition_aware.aurora new file mode 100644 index 0000000..7ea9fad --- /dev/null +++ b/src/test/sh/org/apache/aurora/e2e/partition_aware.aurora @@ -0,0 +1,22 @@ +# run the script +hello_world = Process( + name = 'hello_world', + cmdline = 'while :; do sleep 10; done') + +# describe the task +hello_world_task = SequentialTask( + processes = [hello_world], + resources = Resources(cpu = 0.2, ram = 1 * MB, disk = 8 * MB)) + +base = Service( + cluster = 'devcluster', + environment = 'test', + role = 'vagrant', + task = hello_world_task +) + +jobs = [ + base(name='partition_aware_default'), + base(name='partition_aware_disabled', partition_policy=PartitionPolicy(reschedule=False)), + base(name='partition_aware_delay', partition_policy=PartitionPolicy(delay_secs=60 * 60 * 24)) +] http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh ---------------------------------------------------------------------- diff --git a/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh b/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh index f0819fb..1500bda 100755 --- a/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh +++ b/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh @@ -304,6 +304,43 @@ test_update_fail() { fi } +test_partition_awareness() { + local _config=$1 _cluster=$2 _default_jobkey=$3 _disabled_jobkey=$4 _delay_jobkey=$5 + + # create three jobs 
with different partition policies + aurora update start
--wait $_default_jobkey $_config + aurora update start --wait $_disabled_jobkey $_config + aurora update start --wait $_delay_jobkey $_config + + # partition the agent + sudo stop mesos-slave + + # the default job should become LOST and then transition to PENDING + wait_until_task_status $_default_jobkey "0" "PENDING" + + # the other two should be PARTITIONED + assert_task_status $_disabled_jobkey "0" "PARTITIONED" + assert_task_status $_delay_jobkey "0" "PARTITIONED" + + # start the agent back up + sudo start mesos-slave + + # This can be removed when https://issues.apache.org/jira/browse/MESOS-6406 is resolved. + # We have to pause and let the agent reregister with Mesos, then ask Aurora to explicitly + # reconcile to get the RUNNING status update. + sleep 30 + aurora_admin reconcile_tasks $_cluster + + # reconciliation is asynchronous: wait (rather than assert) for the PARTITIONED tasks to report RUNNING + wait_until_task_status $_disabled_jobkey "0" "RUNNING" + wait_until_task_status $_delay_jobkey "0" "RUNNING" + + # Clean up + aurora job killall $_default_jobkey + aurora job killall $_disabled_jobkey + aurora job killall $_delay_jobkey +} + test_announce() { local _role=$1 _env=$2 _job=$3 @@ -668,6 +705,10 @@ TEST_EPHEMERAL_DAEMON_WITH_FINAL_JOB=ephemeral_daemon_with_final TEST_EPHEMERAL_DAEMON_WITH_FINAL_CONFIG_FILE=$TEST_ROOT/ephemeral_daemon_with_final.aurora TEST_DAEMONIZING_PROCESS_JOB=daemonize TEST_DAEMONIZING_PROCESS_CONFIG_FILE=$TEST_ROOT/test_daemonizing_process.aurora +TEST_PARTITION_AWARENESS_CONFIG_FILE=$TEST_ROOT/partition_aware.aurora +TEST_JOB_PA_DEFAULT=$TEST_CLUSTER/$TEST_ROLE/$TEST_ENV/partition_aware_default +TEST_JOB_PA_DISABLED=$TEST_CLUSTER/$TEST_ROLE/$TEST_ENV/partition_aware_disabled +TEST_JOB_PA_DELAY=$TEST_CLUSTER/$TEST_ROLE/$TEST_ENV/partition_aware_delay BASE_ARGS=( $TEST_CLUSTER @@ -708,6 +749,14 @@ TEST_DAEMONIZING_PROCESS_ARGS=( $TEST_DAEMONIZING_PROCESS_CONFIG_FILE ) +TEST_PARTITION_AWARENESS_ARGS=( + 
$TEST_PARTITION_AWARENESS_CONFIG_FILE + $TEST_CLUSTER + $TEST_JOB_PA_DEFAULT +
$TEST_JOB_PA_DISABLED + $TEST_JOB_PA_DELAY +) + TEST_JOB_KILL_MESSAGE_ARGS=("${TEST_JOB_ARGS[@]}" "--message='Test message'") trap collect_result EXIT @@ -716,6 +765,8 @@ aurorabuild all setup_ssh setup_docker_registry +test_partition_awareness "${TEST_PARTITION_AWARENESS_ARGS[@]}" + test_version test_http_example "${TEST_JOB_ARGS[@]}" test_http_example "${TEST_JOB_WATCH_SECS_ARGS[@]}"
