Repository: aurora
Updated Branches:
  refs/heads/master 46b1112ee -> c746452e5


Give jobs the ability to determine how to handle partitions by integrating with 
new Mesos Partition-Aware APIs.

  * Adds a new flag -partition_aware that will subscribe to the new Mesos 
partition-aware states.
  * Adds a new configuration object, PartitionPolicy, to give job owners 
control over how to handle tasks that become partitioned.

Reviewed at https://reviews.apache.org/r/63536/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/c746452e
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/c746452e
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/c746452e

Branch: refs/heads/master
Commit: c746452e5fa1bc49da701b059fe69898b9b8a15c
Parents: 46b1112
Author: David McLaughlin <[email protected]>
Authored: Tue Nov 21 14:17:39 2017 -0800
Committer: David McLaughlin <[email protected]>
Committed: Tue Nov 21 14:17:39 2017 -0800

----------------------------------------------------------------------
 RELEASE-NOTES.md                                |   8 +
 .../thrift/org/apache/aurora/gen/api.thrift     |  21 ++-
 docs/reference/configuration.md                 |   7 +
 docs/reference/scheduler-configuration.md       |   2 +
 docs/reference/task-lifecycle.md                |  25 +++
 examples/vagrant/upstart/aurora-scheduler.conf  |   1 +
 .../aurora/scheduler/base/Conversions.java      |   6 +-
 .../org/apache/aurora/scheduler/base/Jobs.java  |   1 +
 .../aurora/scheduler/base/TaskTestUtil.java     |   3 +
 .../mesos/CommandLineDriverSettingsModule.java  |  12 ++
 .../scheduler/state/PartitionManager.java       | 117 ++++++++++++
 .../aurora/scheduler/state/SideEffect.java      |   5 +
 .../scheduler/state/StateManagerImpl.java       |  42 +++++
 .../aurora/scheduler/state/StateModule.java     |   2 +
 .../scheduler/state/TaskStateMachine.java       |  65 ++++++-
 .../python/apache/aurora/client/cli/jobs.py     |   5 +
 .../python/apache/aurora/config/schema/base.py  |   7 +
 src/main/python/apache/aurora/config/thrift.py  |   6 +
 .../apache/aurora/scheduler/base/JobsTest.java  |   2 +-
 .../scheduler/config/CommandLineTest.java       |   4 +-
 .../CommandLineDriverSettingsModuleTest.java    |  23 +++
 .../scheduler/state/PartitionManagerTest.java   | 177 +++++++++++++++++++
 .../scheduler/state/StateManagerImplTest.java   |  81 +++++++++
 .../scheduler/state/TaskStateMachineTest.java   |  45 +++++
 .../apache/aurora/client/cli/test_task.py       |   6 +-
 .../python/apache/aurora/config/test_thrift.py  |  28 ++-
 .../apache/aurora/e2e/partition_aware.aurora    |  22 +++
 .../sh/org/apache/aurora/e2e/test_end_to_end.sh |  51 ++++++
 28 files changed, 755 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/RELEASE-NOTES.md
----------------------------------------------------------------------
diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index d653b79..e622a8d 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -4,6 +4,14 @@
 ### New/updated:
 
 - Updated to Mesos 1.4.0.
+- Added experimental support for the Mesos partition-aware APIs. The key idea 
is a new ScheduleStatus
+  PARTITIONED that represents a task in an unknown state. Users of Aurora can 
have per-job policies
+  of whether or not to reschedule and how long to wait for the partition to 
heal. Backwards
+  compatibility with existing behavior (i.e. transition to LOST immediately on 
a partition) is
+  maintained. The support is experimental due to bugs found in Mesos that 
would cause issues in
+  a production cluster. For that reason, the functionality is behind a new 
flag `-partition_aware`
+  that is disabled by default. When Mesos support is improved and the new 
behavior is vetted in
+  production clusters, we'll enable this by default.
 
 ### Deprecations and removals:
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/api/src/main/thrift/org/apache/aurora/gen/api.thrift
----------------------------------------------------------------------
diff --git a/api/src/main/thrift/org/apache/aurora/gen/api.thrift 
b/api/src/main/thrift/org/apache/aurora/gen/api.thrift
index 1d36926..2978f6d 100644
--- a/api/src/main/thrift/org/apache/aurora/gen/api.thrift
+++ b/api/src/main/thrift/org/apache/aurora/gen/api.thrift
@@ -219,6 +219,11 @@ union Resource {
   5: i64 numGpus
 }
 
+struct PartitionPolicy {
+  1: bool reschedule
+  2: optional i64 delaySecs
+}
+
 /** Description of the tasks contained within a job. */
 struct TaskConfig {
  /** Job task belongs to. */
@@ -259,6 +264,8 @@ struct TaskConfig {
  25: optional ExecutorConfig executorConfig
  /** Used to display additional details in the UI. */
  27: optional set<Metadata> metadata
+ /** Policy for how to deal with task partitions */
+ 34: optional PartitionPolicy partitionPolicy
 
  // This field is deliberately placed at the end to work around a bug in the 
immutable wrapper
  // code generator.  See AURORA-1185 for details.
@@ -403,7 +410,11 @@ enum ScheduleStatus {
   /** A fault in the task environment has caused the system to believe the 
task no longer exists.
    * This can happen, for example, when a slave process disappears.
    */
-  LOST             = 7
+  LOST             = 7,
+  /**
+   * The task is currently partitioned and in an unknown state.
+   **/
+  PARTITIONED      = 18
 }
 
 // States that a task may be in while still considered active.
@@ -415,6 +426,7 @@ const set<ScheduleStatus> ACTIVE_STATES = 
[ScheduleStatus.ASSIGNED,
                                            ScheduleStatus.RESTARTING
                                            ScheduleStatus.RUNNING,
                                            ScheduleStatus.STARTING,
+                                           ScheduleStatus.PARTITIONED,
                                            ScheduleStatus.THROTTLED]
 
 // States that a task may be in while associated with a slave machine and 
non-terminal.
@@ -424,6 +436,7 @@ const set<ScheduleStatus> SLAVE_ASSIGNED_STATES = 
[ScheduleStatus.ASSIGNED,
                                                    ScheduleStatus.PREEMPTING,
                                                    ScheduleStatus.RESTARTING,
                                                    ScheduleStatus.RUNNING,
+                                                   ScheduleStatus.PARTITIONED,
                                                    ScheduleStatus.STARTING]
 
 // States that a task may be in while in an active sandbox.
@@ -431,6 +444,7 @@ const set<ScheduleStatus> LIVE_STATES = 
[ScheduleStatus.KILLING,
                                          ScheduleStatus.PREEMPTING,
                                          ScheduleStatus.RESTARTING,
                                          ScheduleStatus.DRAINING,
+                                         ScheduleStatus.PARTITIONED,
                                          ScheduleStatus.RUNNING]
 
 // States a completed task may be in.
@@ -499,6 +513,11 @@ struct ScheduledTask {
    * this task.
    */
   3: i32 failureCount
+  /**
+   * The number of partitions this task has accumulated over its lifetime.
+   */
+  6: i32 timesPartitioned
+
   /** State change history for this task. */
   4: list<TaskEvent> taskEvents
   /**

http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/docs/reference/configuration.md
----------------------------------------------------------------------
diff --git a/docs/reference/configuration.md b/docs/reference/configuration.md
index f647bcf..67d9914 100644
--- a/docs/reference/configuration.md
+++ b/docs/reference/configuration.md
@@ -356,6 +356,7 @@ Job Schema
   ```tier``` | String | Task tier type. The default scheduler tier 
configuration allows for 3 tiers: `revocable`, `preemptible`, and `preferred`. 
If a tier is not elected, Aurora assigns the task to a tier based on its choice 
of `production` (that is `preferred` for production and `preemptible` for 
non-production jobs). See the section on [Configuration 
Tiers](../features/multitenancy.md#configuration-tiers) for more information.
   ```announce``` | ```Announcer``` object | Optionally enable Zookeeper 
ServerSet announcements. See [Announcer Objects] for more information.
   ```enable_hooks``` | Boolean | Whether to enable [Client 
Hooks](client-hooks.md) for this job. (Default: False)
+  ```partition_policy``` | ```PartitionPolicy``` object | An optional 
partition policy that allows job owners to define how to handle partitions for 
running tasks (in partition-aware Aurora clusters)
 
 
 ### UpdateConfig Objects
@@ -403,6 +404,12 @@ Parameters for controlling a task's health checks via HTTP 
or a shell command.
 | -------                        | :-------: | --------
 | ```shell_command```            | String    | An alternative to HTTP health 
checking. Specifies a shell command that will be executed. Any non-zero exit 
status will be interpreted as a health check failure.
 
+### PartitionPolicy Objects
+| param                          | type      | description
+| -------                        | :-------: | --------
+| ```reschedule```               | Boolean   | Whether or not to reschedule 
when running tasks become partitioned (Default: True)
+| ```delay_secs```               | Integer   | How long to delay transitioning 
to LOST when running tasks are partitioned. (Default: 0)
+
 
 ### Announcer Objects
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/docs/reference/scheduler-configuration.md
----------------------------------------------------------------------
diff --git a/docs/reference/scheduler-configuration.md 
b/docs/reference/scheduler-configuration.md
index d17541f..6c385b5 100644
--- a/docs/reference/scheduler-configuration.md
+++ b/docs/reference/scheduler-configuration.md
@@ -176,6 +176,8 @@ Optional flags:
        Maximum amount of random jitter to add to the offer hold time window.
 -offer_reservation_duration (default (3, mins))
        Time to reserve a agent's offers while trying to satisfy a task 
preempting another.
+-partition_aware (default false)
+  Whether or not to integrate with the partition-aware Mesos capabilities.
 -populate_discovery_info (default false)
        If true, Aurora populates DiscoveryInfo field of Mesos TaskInfo.
 -preemption_delay (default (3, mins))

http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/docs/reference/task-lifecycle.md
----------------------------------------------------------------------
diff --git a/docs/reference/task-lifecycle.md b/docs/reference/task-lifecycle.md
index 8ec0077..b4642b3 100644
--- a/docs/reference/task-lifecycle.md
+++ b/docs/reference/task-lifecycle.md
@@ -105,6 +105,31 @@ agent go into `LOST` state and new `Task`s are created in 
their place.
 From `PENDING` state, there is no guarantee a `Task` will be reassigned
 to the same machine unless job constraints explicitly force it there.
 
+
+## RUNNING to PARTITIONED states
+If Aurora is configured to enable partition awareness, a task which is in a
+running state can transition to `PARTITIONED`. This happens when the state
+of the task in Mesos becomes unknown. By default Aurora errs on the side of
+availability, so all tasks that transition to `PARTITIONED` are immediately
+transitioned to `LOST`.
+
+This policy is not ideal for all types of workloads you may wish to run in
+your Aurora cluster, e.g. for jobs where task failures are very expensive.
+So job owners may set their own `PartitionPolicy` where they can control
+how long to remain in `PARTITIONED` before transitioning to `LOST`. Or they
+can disable any automatic attempts to `reschedule` when in `PARTITIONED`,
+effectively waiting out the partition for as long as possible.
+
+
+## PARTITIONED and transient states
+The `PartitionPolicy` provided by users only applies to tasks which are
+currently running. When tasks are moving in and out of transient states,
+e.g. tasks being updated, restarted, preempted, etc., `PARTITIONED` tasks
+are moved immediately to `LOST`. This prevents situations where system
+or user-initiated actions are blocked indefinitely waiting for partitions
+that may never resolve.
+
+
 ### Giving Priority to Production Tasks: PREEMPTING
 
 Sometimes a Task needs to be interrupted, such as when a non-production

http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/examples/vagrant/upstart/aurora-scheduler.conf
----------------------------------------------------------------------
diff --git a/examples/vagrant/upstart/aurora-scheduler.conf 
b/examples/vagrant/upstart/aurora-scheduler.conf
index 5ca3cae..dbbe1d1 100644
--- a/examples/vagrant/upstart/aurora-scheduler.conf
+++ b/examples/vagrant/upstart/aurora-scheduler.conf
@@ -50,6 +50,7 @@ exec bin/aurora-scheduler \
   -populate_discovery_info=true \
   -receive_revocable_resources=true \
   -enable_revocable_ram=true \
+  -partition_aware=true \
   -allow_gpu_resource=true \
   -allow_container_volumes=true \
   -offer_filter_duration=0secs \

http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/src/main/java/org/apache/aurora/scheduler/base/Conversions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/Conversions.java 
b/src/main/java/org/apache/aurora/scheduler/base/Conversions.java
index 33cc012..801dba6 100644
--- a/src/main/java/org/apache/aurora/scheduler/base/Conversions.java
+++ b/src/main/java/org/apache/aurora/scheduler/base/Conversions.java
@@ -68,13 +68,11 @@ public final class Conversions {
           .put(TaskState.TASK_KILLED, ScheduleStatus.KILLED)
           .put(TaskState.TASK_LOST, ScheduleStatus.LOST)
           .put(TaskState.TASK_ERROR, ScheduleStatus.LOST)
-          // Task states send to partition-aware Mesos frameworks. Aurora does 
not advertise the
-          // PARTITION_AWARE capability yet (AURORA-1814). We still map the 
task states to be safe.
-          .put(TaskState.TASK_UNREACHABLE, ScheduleStatus.LOST)
           .put(TaskState.TASK_DROPPED, ScheduleStatus.LOST)
           .put(TaskState.TASK_GONE, ScheduleStatus.LOST)
           .put(TaskState.TASK_GONE_BY_OPERATOR, ScheduleStatus.LOST)
-          .put(TaskState.TASK_UNKNOWN, ScheduleStatus.LOST)
+          .put(TaskState.TASK_UNREACHABLE, ScheduleStatus.PARTITIONED)
+          .put(TaskState.TASK_UNKNOWN, ScheduleStatus.PARTITIONED)
           .build();
 
   /**

http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/src/main/java/org/apache/aurora/scheduler/base/Jobs.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/Jobs.java 
b/src/main/java/org/apache/aurora/scheduler/base/Jobs.java
index 8d5f4e5..27bee65 100644
--- a/src/main/java/org/apache/aurora/scheduler/base/Jobs.java
+++ b/src/main/java/org/apache/aurora/scheduler/base/Jobs.java
@@ -65,6 +65,7 @@ public final class Jobs {
       case RUNNING:
       case KILLING:
       case DRAINING:
+      case PARTITIONED:
       case PREEMPTING:
         stats.setActiveTaskCount(stats.getActiveTaskCount() + 1);
         break;

http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java 
b/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
index 7c223ea..dd64a5b 100644
--- a/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
+++ b/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
@@ -31,6 +31,7 @@ import org.apache.aurora.gen.Identity;
 import org.apache.aurora.gen.LimitConstraint;
 import org.apache.aurora.gen.MesosFetcherURI;
 import org.apache.aurora.gen.Metadata;
+import org.apache.aurora.gen.PartitionPolicy;
 import org.apache.aurora.gen.Resource;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
@@ -122,6 +123,7 @@ public final class TaskTestUtil {
         .setMaxTaskFailures(-1)
         .setProduction(true)
         .setTier(PROD_TIER_NAME)
+        .setPartitionPolicy(new 
PartitionPolicy().setDelaySecs(5).setReschedule(true))
         .setConstraints(ImmutableSet.of(
             new Constraint(
                 "valueConstraint",
@@ -198,6 +200,7 @@ public final class TaskTestUtil {
                 .setScheduler("scheduler2")))
         .setAncestorId("ancestor")
         .setFailureCount(3)
+        .setTimesPartitioned(2)
         .setAssignedTask(assignedTask));
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/src/main/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModule.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModule.java
 
b/src/main/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModule.java
index 5e83b2a..bcfb888 100644
--- 
a/src/main/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModule.java
+++ 
b/src/main/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModule.java
@@ -97,6 +97,11 @@ public class CommandLineDriverSettingsModule extends 
AbstractModule {
         arity = 1)
     public boolean receiveRevocableResources = false;
 
+    @Parameter(names = "-partition_aware",
+        description = "Enable paritition-aware status updates.",
+        arity = 1)
+    public boolean isPartitionAware = false;
+
     @Parameter(names = "-mesos_role",
         description =
             "The Mesos role this framework will register as. The default is to 
left this empty, "
@@ -132,6 +137,7 @@ public class CommandLineDriverSettingsModule extends 
AbstractModule {
             options.frameworkFailoverTimeout,
             options.receiveRevocableResources,
             allowGpuResource,
+            options.isPartitionAware,
             role);
     bind(FrameworkInfo.class)
         
.annotatedWith(FrameworkInfoFactory.FrameworkInfoFactoryImpl.BaseFrameworkInfo.class)
@@ -176,6 +182,7 @@ public class CommandLineDriverSettingsModule extends 
AbstractModule {
       Amount<Long, Time> failoverTimeout,
       boolean revocable,
       boolean allowGpu,
+      boolean enablePartitionAwareness,
       Optional<String> role) {
 
     FrameworkInfo.Builder infoBuilder = FrameworkInfo.newBuilder()
@@ -196,6 +203,11 @@ public class CommandLineDriverSettingsModule extends 
AbstractModule {
       
infoBuilder.addCapabilities(Capability.newBuilder().setType(GPU_RESOURCES));
     }
 
+    if (enablePartitionAwareness) {
+      infoBuilder.addCapabilities(
+          
Capability.newBuilder().setType(Capability.Type.PARTITION_AWARE).build());
+    }
+
     if (role.isPresent()) {
       infoBuilder.setRole(role.get());
     }

http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/src/main/java/org/apache/aurora/scheduler/state/PartitionManager.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/state/PartitionManager.java 
b/src/main/java/org/apache/aurora/scheduler/state/PartitionManager.java
new file mode 100644
index 0000000..6d49c49
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/state/PartitionManager.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.state;
+
+import java.time.Duration;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.eventbus.Subscribe;
+import com.google.inject.Inject;
+
+import org.apache.aurora.common.util.Clock;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.scheduler.base.AsyncUtil;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult.Quiet;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Subscribes to transitions into PARTITIONED task state and, if applicable, 
sets a timer to
+ * move the task to LOST.
+ */
+public class PartitionManager implements PubsubEvent.EventSubscriber {
+  private static final Logger LOG = 
LoggerFactory.getLogger(PartitionManager.class);
+
+  private final ScheduledExecutorService executor;
+  private final StateManager stateManager;
+  private final Storage storage;
+  private final Clock clock;
+
+  @VisibleForTesting
+  static final String TRANSITION_MESSAGE = "Task partitioned for too long.";
+
+  @VisibleForTesting
+  PartitionManager(
+      Storage storage,
+      StateManager stateManager,
+      Clock clock,
+      ScheduledExecutorService executor) {
+
+    this.executor = requireNonNull(executor);
+    this.stateManager = requireNonNull(stateManager);
+    this.storage = requireNonNull(storage);
+    this.clock = requireNonNull(clock);
+  }
+
+  @Inject
+  PartitionManager(Storage storage, StateManager stateManager, Clock clock) {
+    this(
+      storage,
+      stateManager,
+      clock,
+      AsyncUtil.singleThreadLoggingScheduledExecutor("PartitionManager", LOG));
+  }
+
+  private long getLastTransitionSecsAgo(IScheduledTask task) {
+    return Duration.ofMillis(
+        clock.nowMillis() - 
Tasks.getLatestEvent(task).getTimestamp()).getSeconds();
+  }
+
+  /**
+   * Schedules a delayed task to move tasks from PARTITIONED -> LOST based on 
task partition policy.
+   * @param stateChange The state change event.
+   */
+  @Subscribe
+  public void handle(TaskStateChange stateChange) {
+    ITaskConfig config = stateChange.getTask().getAssignedTask().getTask();
+    String taskId = Tasks.id(stateChange.getTask());
+    // Partition Policy can be null, in which case it's equivalent to 
reschedule with 0s delay.
+    if (stateChange.getNewState().equals(ScheduleStatus.PARTITIONED)
+        && (!config.isSetPartitionPolicy() || 
config.getPartitionPolicy().isReschedule())) {
+      long delay = config.isSetPartitionPolicy() ? 
config.getPartitionPolicy().getDelaySecs() : 0;
+      // We're recovering from a failover, so modify the delay based on last 
event time.
+      if (!stateChange.isTransition()) {
+        delay = Math.max(0, delay - 
getLastTransitionSecsAgo(stateChange.getTask()));
+      }
+      LOG.info("Partitioned task {} will be rescheduled in {} secs", taskId, 
delay);
+      // Use the timestamp to verify the task state hasn't changed when we 
execute after the delay.
+      long lastTimestamp = 
Tasks.getLatestEvent(stateChange.getTask()).getTimestamp();
+      executor.schedule(() -> storage.write((Quiet) storeProvider -> {
+        Optional<IScheduledTask> maybeTask = 
storeProvider.getTaskStore().fetchTask(taskId);
+        if (maybeTask.isPresent()
+            && Tasks.getLatestEvent(maybeTask.get()).getTimestamp() == 
lastTimestamp) {
+          stateManager.changeState(
+              storeProvider,
+              stateChange.getTask().getAssignedTask().getTaskId(),
+              Optional.of(ScheduleStatus.PARTITIONED),
+              ScheduleStatus.LOST,
+              Optional.of(TRANSITION_MESSAGE));
+        }
+      }), delay, TimeUnit.SECONDS);
+    } else if (stateChange.getNewState().equals(ScheduleStatus.PARTITIONED)) {
+      LOG.info("Partitioned task {} will not be rescheduled.", taskId);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/src/main/java/org/apache/aurora/scheduler/state/SideEffect.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/SideEffect.java 
b/src/main/java/org/apache/aurora/scheduler/state/SideEffect.java
index b91a085..2aa500a 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/SideEffect.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/SideEffect.java
@@ -68,6 +68,11 @@ class SideEffect {
     KILL,
 
     /**
+     * Transition a task to LOST.
+     */
+    TRANSITION_TO_LOST,
+
+    /**
      * Create a new state machine with a copy of this task.
      */
     RESCHEDULE,

http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java 
b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
index 9989ed4..3ba676b 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateManagerImpl.java
@@ -232,6 +232,7 @@ public class StateManagerImpl implements StateManager {
       Action.INCREMENT_FAILURES,
       Action.SAVE_STATE,
       Action.RESCHEDULE,
+      Action.TRANSITION_TO_LOST,
       Action.KILL,
       Action.DELETE);
 
@@ -289,6 +290,12 @@ public class StateManagerImpl implements StateManager {
           Optional<IScheduledTask> mutated = taskStore.mutateTask(taskId, 
task1 -> {
             ScheduledTask mutableTask = task1.newBuilder();
             mutableTask.setStatus(targetState.get());
+            if (targetState.get() == ScheduleStatus.PARTITIONED) {
+              
mutableTask.setTimesPartitioned(mutableTask.getTimesPartitioned() + 1);
+              // If we're moving to partitioned state, remove any existing 
partition transitions
+              // in order to prevent the event history growing unbounded.
+              
mutableTask.setTaskEvents(compactPartitionEvents(mutableTask.getTaskEvents()));
+            }
             mutableTask.addToTaskEvents(new TaskEvent()
                 .setTimestamp(clock.nowMillis())
                 .setStatus(targetState.get())
@@ -299,6 +306,15 @@ public class StateManagerImpl implements StateManager {
           events.add(TaskStateChange.transition(mutated.get(), 
stateMachine.getPreviousState()));
           break;
 
+        case TRANSITION_TO_LOST:
+          updateTaskAndExternalState(
+              taskStore,
+              Optional.absent(),
+              taskId,
+              ScheduleStatus.LOST,
+              Optional.of("Action performed on partitioned task, marking as 
LOST."));
+          break;
+
         case RESCHEDULE:
           Preconditions.checkState(
               upToDateTask.isPresent(),
@@ -366,6 +382,32 @@ public class StateManagerImpl implements StateManager {
     return result.getResult();
   }
 
+  /*
+   * Compact cyclical transitions into PARTITIONED into a single event in 
order to place an upper
+   * bound on the number of task events for a task. We do not want to lose 
unique transitions.
+   * So consider the following history:
+   *
+   *  ... ->  RUNNING -> PARTITIONED -> RUNNING -> PARTITIONED -> RUNNING -> 
PARTITIONED
+   *
+   * We'd want to compact this into a single RUNNING -> PARTITIONED where 
RUNNING is the first
+   * occurrence of RUNNING. But consider another example:
+   *
+   * ... -> RUNNING -> PARTITIONED -> RUNNING -> DRAINING -> PARTITIONED
+   *
+   * In this case, there is no compaction to be done because there is no cycle.
+   */
+  private List<TaskEvent> compactPartitionEvents(List<TaskEvent> taskEvents) {
+    int size = taskEvents.size();
+    // We only compact as we're transitioning into PARTITIONED. So cycles 
happen when the second
+    // last event is PARTITIONED and the last and third last statuses are the 
same.
+    if (size >= 3 && taskEvents.get(size - 
2).getStatus().equals(ScheduleStatus.PARTITIONED)
+        && taskEvents.get(size - 3).getStatus().equals(taskEvents.get(size - 
1).getStatus())) {
+      // Drop the last two elements off taskEvents.
+      return taskEvents.subList(0, size - 2);
+    }
+    return taskEvents;
+  }
+
   @Override
   public void deleteTasks(MutableStoreProvider storeProvider, final 
Set<String> taskIds) {
     TaskStore.Mutable taskStore = storeProvider.getUnsafeTaskStore();

http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateModule.java 
b/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
index c03fff1..b7a3c0b 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
@@ -64,6 +64,8 @@ public class StateModule extends AbstractModule {
     bind(UUIDGenerator.class).to(UUIDGeneratorImpl.class);
     bind(UUIDGeneratorImpl.class).in(Singleton.class);
 
+    PubsubEventModule.bindSubscriber(binder(), PartitionManager.class);
+
     bindMaintenanceController(binder());
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java 
b/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
index eb4fe7d..914d401 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java
@@ -49,6 +49,7 @@ import static 
org.apache.aurora.scheduler.state.SideEffect.Action.INCREMENT_FAIL
 import static org.apache.aurora.scheduler.state.SideEffect.Action.KILL;
 import static org.apache.aurora.scheduler.state.SideEffect.Action.RESCHEDULE;
 import static org.apache.aurora.scheduler.state.SideEffect.Action.SAVE_STATE;
+import static 
org.apache.aurora.scheduler.state.SideEffect.Action.TRANSITION_TO_LOST;
 import static org.apache.aurora.scheduler.state.StateChangeResult.ILLEGAL;
 import static 
org.apache.aurora.scheduler.state.StateChangeResult.ILLEGAL_WITH_SIDE_EFFECTS;
 import static org.apache.aurora.scheduler.state.StateChangeResult.NOOP;
@@ -62,6 +63,7 @@ import static 
org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.INIT;
 import static 
org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.KILLED;
 import static 
org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.KILLING;
 import static 
org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.LOST;
+import static 
org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.PARTITIONED;
 import static 
org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.PENDING;
 import static 
org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.PREEMPTING;
 import static 
org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.RESTARTING;
@@ -117,6 +119,7 @@ class TaskStateMachine {
     KILLED(Optional.of(ScheduleStatus.KILLED)),
     KILLING(Optional.of(ScheduleStatus.KILLING)),
     LOST(Optional.of(ScheduleStatus.LOST)),
+    PARTITIONED(Optional.of(ScheduleStatus.PARTITIONED)),
     /**
      * The task does not have an associated state as it has been deleted from the store.
      */
@@ -170,6 +173,19 @@ class TaskStateMachine {
 
     Consumer<Transition<TaskState>> manageTerminatedTasks = Consumers.combine(
         ImmutableList.<Consumer<Transition<TaskState>>>builder()
+            .add(transition -> {
+              // The transition from KILLING -> PARTITIONED needs to be handled separately, in
+              // order to mark the task as terminal and unblock any operations that depend on
+              // kills (e.g. job updates). Just sending a KILL signal alone is insufficient,
+              // as any partitioned task will be on an agent that is unreachable.
+              if (transition.getTo() == PARTITIONED) {
+                if (transition.getFrom() == KILLING) {
+                  addFollowup(TRANSITION_TO_LOST);
+                } else {
+                  addFollowup(KILL);
+                }
+              }
+            })
             // Kill a task that we believe to be terminated when an attempt is made to revive.
             .add(
                 Consumers.filter(Transition.to(ASSIGNED, STARTING, RUNNING),
@@ -193,6 +209,14 @@ class TaskStateMachine {
               addFollowup(KILL);
               break;
 
+            case PARTITIONED:
+              // When a task becomes partitioned during a user or operator-initiated action, we
+              // bypass their partition policy and immediately transition to lost. This is to
+              // prevent a situation where a task becoming partitioned could indefinitely block
+              // machine maintenance, preemption or a job restart.
+              addFollowup(TRANSITION_TO_LOST);
+              break;
+
             case LOST:
               addFollowup(KILL);
               addFollowup(RESCHEDULE);
@@ -261,9 +285,34 @@ class TaskStateMachine {
             .to(PENDING, KILLING)
             .withCallback(deleteIfKilling))
         .addState(
+            Rule.from(PARTITIONED)
+                .to(LOST, FAILED, FINISHED, RUNNING, ASSIGNED, STARTING, 
RESTARTING, DRAINING,
+                    PREEMPTING, KILLING)
+                .withCallback(
+                    transition -> {
+                      switch (transition.getTo()) {
+                        case LOST:
+                          addFollowup(KILL);
+                          addFollowup(RESCHEDULE);
+                          break;
+                        case KILLING:
+                        case RESTARTING:
+                        case DRAINING:
+                        case PREEMPTING:
+                          addFollowup(TRANSITION_TO_LOST);
+                          break;
+                        case FAILED:
+                          incrementFailuresMaybeReschedule.execute();
+                          break;
+                        default:
+                          // No-op
+                      }
+                    }
+                ))
+        .addState(
             Rule.from(ASSIGNED)
                 .to(STARTING, RUNNING, FINISHED, FAILED, RESTARTING, DRAINING,
-                    KILLED, KILLING, LOST, PREEMPTING)
+                    KILLED, KILLING, LOST, PREEMPTING, PARTITIONED)
                 .withCallback(
                     transition -> {
                       switch (transition.getTo()) {
@@ -308,7 +357,7 @@ class TaskStateMachine {
         .addState(
             Rule.from(STARTING)
                 .to(RUNNING, FINISHED, FAILED, RESTARTING, DRAINING, KILLING,
-                    KILLED, LOST, PREEMPTING)
+                    KILLED, LOST, PREEMPTING, PARTITIONED)
                 .withCallback(
                     transition -> {
                       switch (transition.getTo()) {
@@ -351,7 +400,8 @@ class TaskStateMachine {
                 ))
         .addState(
             Rule.from(RUNNING)
-                .to(FINISHED, RESTARTING, DRAINING, FAILED, KILLING, KILLED, 
LOST, PREEMPTING)
+                .to(FINISHED, RESTARTING, DRAINING, FAILED, KILLING, KILLED, 
LOST,
+                    PREEMPTING, PARTITIONED)
                 .withCallback(
                     transition -> {
                       switch (transition.getTo()) {
@@ -398,15 +448,15 @@ class TaskStateMachine {
                 .withCallback(manageTerminatedTasks))
         .addState(
             Rule.from(PREEMPTING)
-                .to(FINISHED, FAILED, KILLING, KILLED, LOST)
+                .to(FINISHED, FAILED, KILLING, KILLED, LOST, PARTITIONED)
                 .withCallback(manageRestartingTask))
         .addState(
             Rule.from(RESTARTING)
-                .to(FINISHED, FAILED, KILLING, KILLED, LOST)
+                .to(FINISHED, FAILED, KILLING, KILLED, LOST, PARTITIONED)
                 .withCallback(manageRestartingTask))
         .addState(
             Rule.from(DRAINING)
-                .to(FINISHED, FAILED, KILLING, KILLED, LOST)
+                .to(FINISHED, FAILED, KILLING, KILLED, LOST, PARTITIONED)
                 .withCallback(manageRestartingTask))
         .addState(
             Rule.from(FAILED)
@@ -419,7 +469,7 @@ class TaskStateMachine {
         // TODO(maxim): Re-evaluate if *DELETED states are valid transitions here.
         .addState(
             Rule.from(KILLING)
-                .to(FINISHED, FAILED, KILLED, LOST, DELETED)
+                .to(FINISHED, FAILED, KILLED, LOST, DELETED, PARTITIONED)
                 .withCallback(manageTerminatedTasks))
         .addState(
             Rule.from(LOST)
@@ -505,7 +555,6 @@ class TaskStateMachine {
     if (stateMachine.getState() == taskState) {
       return new TransitionResult(NOOP, ImmutableSet.of());
     }
-
     boolean success = stateMachine.transition(taskState);
     ImmutableSet<SideEffect> transitionEffects = 
ImmutableSet.copyOf(sideEffects);
     sideEffects.clear();

http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/src/main/python/apache/aurora/client/cli/jobs.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/cli/jobs.py 
b/src/main/python/apache/aurora/client/cli/jobs.py
index b79ae56..cbae387 100644
--- a/src/main/python/apache/aurora/client/cli/jobs.py
+++ b/src/main/python/apache/aurora/client/cli/jobs.py
@@ -254,6 +254,11 @@ class InspectCommand(Verb):
       context.print_out("constraints:", indent=2)
       for constraint, value in job.constraints().get().items():
         context.print_out("'%s': '%s'" % (constraint, value), indent=4)
+    if job.has_partition_policy():
+      context.print_out("partition_policy:", indent=2)
+      context.print_out("reschedule: %s" % 
job.partition_policy().reschedule(), indent=4)
+      context.print_out("delay_secs: '%s'" % 
job.partition_policy().delay_secs(), indent=4)
+
     context.print_out("service:    %s" % job_thrift.taskConfig.isService, 
indent=2)
     context.print_out("production: %s" % bool(job.production().get()), 
indent=2)
     context.print_out("")

http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/src/main/python/apache/aurora/config/schema/base.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/config/schema/base.py 
b/src/main/python/apache/aurora/config/schema/base.py
index 18ce826..a466e78 100644
--- a/src/main/python/apache/aurora/config/schema/base.py
+++ b/src/main/python/apache/aurora/config/schema/base.py
@@ -154,6 +154,11 @@ class Container(Struct):
   docker = Docker
 
 
+class PartitionPolicy(Struct):
+  reschedule = Default(Boolean, True)
+  delay_secs = Default(Integer, 0)
+
+
 class MesosJob(Struct):
   name          = Default(String, '{{task.name}}')
   role          = Required(String)
@@ -182,6 +187,8 @@ class MesosJob(Struct):
 
   enable_hooks = Default(Boolean, False)  # enable client API hooks; from env 
python-list 'hooks'
 
+  partition_policy = PartitionPolicy
+
   # Specifying a `Container` with a `docker` property for Docker jobs is deprecated, instead just
   # specify the value of the container property to be a `Docker` container directly.
   container = Choice([Container, Docker, Mesos])

http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/src/main/python/apache/aurora/config/thrift.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/config/thrift.py 
b/src/main/python/apache/aurora/config/thrift.py
index bedf8cd..eb00144 100644
--- a/src/main/python/apache/aurora/config/thrift.py
+++ b/src/main/python/apache/aurora/config/thrift.py
@@ -48,6 +48,7 @@ from gen.apache.aurora.api.ttypes import (
     MesosContainer,
     Metadata,
     Mode,
+    PartitionPolicy,
     Resource,
     TaskConfig,
     TaskConstraint,
@@ -268,6 +269,11 @@ def convert(job, metadata=frozenset(), ports=frozenset()):
   task.contactEmail = not_empty_or(job.contact(), None)
   task.tier = not_empty_or(job.tier(), None)
 
+  if job.has_partition_policy():
+    task.partitionPolicy = PartitionPolicy(
+      fully_interpolated(job.partition_policy().reschedule()),
+      fully_interpolated(job.partition_policy().delay_secs()))
+
   # Add metadata to a task, to display in the scheduler UI.
   task.metadata = frozenset(Metadata(key=str(key), value=str(value)) for key, 
value in metadata)
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/src/test/java/org/apache/aurora/scheduler/base/JobsTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/base/JobsTest.java 
b/src/test/java/org/apache/aurora/scheduler/base/JobsTest.java
index 13f656f..92bf4f4 100644
--- a/src/test/java/org/apache/aurora/scheduler/base/JobsTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/base/JobsTest.java
@@ -38,7 +38,7 @@ public class JobsTest {
                 addStateTransition(makeTask("id", TaskTestUtil.JOB), status, 
100L)).toList();
 
     IJobStats expectedStats = IJobStats.build(new JobStats()
-        .setActiveTaskCount(7)
+        .setActiveTaskCount(8)
         .setFailedTaskCount(2)
         .setFinishedTaskCount(2)
         .setPendingTaskCount(3));

http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java 
b/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
index c2d875b..0ec4de6 100644
--- a/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
@@ -191,6 +191,7 @@ public class CommandLineTest {
     expected.driver.executorUser = "testing";
     expected.driver.receiveRevocableResources = true;
     expected.driver.mesosRole = "testing";
+    expected.driver.isPartitionAware = true;
     expected.jetty.hostnameOverride = "testing";
     expected.jetty.httpPort = 42;
     expected.jetty.listenIp = "testing";
@@ -383,7 +384,8 @@ public class CommandLineTest {
         "-cron_start_max_backoff=42days",
         "-cron_scheduling_max_batch_size=42",
         "-enable_revocable_cpus=false",
-        "-enable_revocable_ram=true"
+        "-enable_revocable_ram=true",
+        "-partition_aware=true"
     );
     assertEqualOptions(expected, parsed);
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/src/test/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModuleTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModuleTest.java
 
b/src/test/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModuleTest.java
index 7b04291..a86184c 100644
--- 
a/src/test/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModuleTest.java
+++ 
b/src/test/java/org/apache/aurora/scheduler/mesos/CommandLineDriverSettingsModuleTest.java
@@ -29,6 +29,7 @@ import org.junit.Test;
 import static 
org.apache.aurora.scheduler.mesos.CommandLineDriverSettingsModule.Options.PRINCIPAL_KEY;
 import static 
org.apache.aurora.scheduler.mesos.CommandLineDriverSettingsModule.Options.SECRET_KEY;
 import static 
org.apache.mesos.v1.Protos.FrameworkInfo.Capability.Type.GPU_RESOURCES;
+import static 
org.apache.mesos.v1.Protos.FrameworkInfo.Capability.Type.PARTITION_AWARE;
 import static 
org.apache.mesos.v1.Protos.FrameworkInfo.Capability.Type.REVOCABLE_RESOURCES;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -82,6 +83,7 @@ public class CommandLineDriverSettingsModuleTest {
         Amount.of(1L, Time.MINUTES),
         false, // revocable
         false, // allow gpu
+        false, // partition aware
         Optional.absent());
     assertEquals("", info.getPrincipal());
     assertEquals(0, info.getCapabilitiesCount());
@@ -97,6 +99,7 @@ public class CommandLineDriverSettingsModuleTest {
         Amount.of(1L, Time.MINUTES),
         true, // revocable
         false, // allow gpu
+        false, // partition aware
         Optional.absent());
     assertEquals("", info.getPrincipal());
     assertEquals(1, info.getCapabilitiesCount());
@@ -113,6 +116,7 @@ public class CommandLineDriverSettingsModuleTest {
         Amount.of(1L, Time.MINUTES),
         false, // revocable
         true, // allow gpu
+        false, // partition aware
         Optional.absent());
     assertEquals("", info.getPrincipal());
     assertEquals(1, info.getCapabilitiesCount());
@@ -121,6 +125,23 @@ public class CommandLineDriverSettingsModuleTest {
   }
 
   @Test
+  public void testFrameworkInfoPartitionAware() {
+    Protos.FrameworkInfo info = 
CommandLineDriverSettingsModule.buildFrameworkInfo(
+        "aurora",
+        "user",
+        Optional.absent(),
+        Amount.of(1L, Time.MINUTES),
+        false, // revocable
+        false, // allow gpu
+        true, // partition aware
+        Optional.absent());
+    assertEquals("", info.getPrincipal());
+    assertEquals(1, info.getCapabilitiesCount());
+    assertEquals(PARTITION_AWARE, info.getCapabilities(0).getType());
+    assertFalse(info.hasRole());
+  }
+
+  @Test
   public void testFrameworkInfoNoRevocableWithAnnouncedPrincipal() {
     Protos.FrameworkInfo info = 
CommandLineDriverSettingsModule.buildFrameworkInfo(
         "aurora",
@@ -129,6 +150,7 @@ public class CommandLineDriverSettingsModuleTest {
         Amount.of(1L, Time.MINUTES),
         false, // revocable
         false, // allow gpu
+        false, // partition aware
         Optional.absent());
     assertEquals("auroraprincipal", info.getPrincipal());
     assertEquals(0, info.getCapabilitiesCount());
@@ -144,6 +166,7 @@ public class CommandLineDriverSettingsModuleTest {
         Amount.of(1L, Time.MINUTES),
         true, // revocable
         true, // allow gpu
+        false, // partition aware
         Optional.of(TEST_ROLE));
     assertEquals("auroraprincipal", info.getPrincipal());
     assertEquals(2, info.getCapabilitiesCount());

http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/src/test/java/org/apache/aurora/scheduler/state/PartitionManagerTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/state/PartitionManagerTest.java 
b/src/test/java/org/apache/aurora/scheduler/state/PartitionManagerTest.java
new file mode 100644
index 0000000..8f654db
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/state/PartitionManagerTest.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.state;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Optional;
+
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.common.util.testing.FakeClock;
+import org.apache.aurora.gen.PartitionPolicy;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.easymock.Capture;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.ScheduleStatus.PARTITIONED;
+import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expect;
+
+public class PartitionManagerTest extends EasyMockTest {
+  private final FakeClock clock = new FakeClock();
+  private PartitionManager partitionManager;
+  private StateManager stateManager;
+  private ScheduledExecutorService executor;
+
+  private static final IScheduledTask BASE_TASK = 
TaskTestUtil.makeTask("test", TaskTestUtil.JOB);
+  private StorageTestUtil storageUtil;
+
+  @Before
+  public void setUp() {
+    storageUtil = new StorageTestUtil(this);
+    stateManager = createMock(StateManager.class);
+    executor = createMock(ScheduledExecutorService.class);
+    partitionManager = new PartitionManager(storageUtil.storage, stateManager, 
clock, executor);
+  }
+
+  private static TaskStateChange makeStateChange(
+      IScheduledTask task,
+      Optional<ScheduleStatus> oldState,
+      ScheduleStatus newState) {
+
+    IScheduledTask newTask = 
IScheduledTask.build(task.newBuilder().setStatus(newState));
+    if (oldState.isPresent()) {
+      return TaskStateChange.transition(newTask, oldState.get());
+    }
+    return TaskStateChange.initialized(newTask);
+  }
+
+  private static TaskStateChange makeStateChange(
+      IScheduledTask task,
+      ScheduleStatus oldState,
+      ScheduleStatus newState) {
+
+    return makeStateChange(task, Optional.of(oldState), newState);
+  }
+
+  private static TaskStateChange makeStateChange(IScheduledTask task, 
ScheduleStatus newState) {
+    return makeStateChange(task, Optional.absent(), newState);
+  }
+
+  private static IScheduledTask taskWithoutPartitionPolicy() {
+    return taskWithPartitionPolicy(null);
+  }
+
+  private static IScheduledTask taskWithPartitionPolicy(PartitionPolicy 
partitionPolicy) {
+    return IScheduledTask.build(BASE_TASK.newBuilder()
+        .setAssignedTask(BASE_TASK.getAssignedTask().newBuilder().setTask(
+            
BASE_TASK.getAssignedTask().getTask().newBuilder().setPartitionPolicy(partitionPolicy))
+        ));
+  }
+
+  private Capture<Callable<Void>> expectExecuteWithDelay(long seconds) {
+    Capture<Callable<Void>> callableCapture = createCapture();
+    expect(
+      executor.schedule(capture(callableCapture), eq(seconds), 
eq(TimeUnit.SECONDS))
+    ).andReturn(createMock(new Clazz<ScheduledFuture<Void>>() { }));
+    return callableCapture;
+  }
+
+  @Test
+  public void testNonPartitionedTransition() {
+    control.replay();
+    partitionManager.handle(makeStateChange(taskWithoutPartitionPolicy(), 
RUNNING));
+  }
+
+  @Test
+  public void testPartitionPolicyNoReschedule() {
+    control.replay();
+    partitionManager.handle(makeStateChange(
+        taskWithPartitionPolicy(new PartitionPolicy().setReschedule(false)),
+        RUNNING,
+        PARTITIONED));
+  }
+
+  @Test
+  public void testNoPartitionPolicy() {
+    expectExecuteWithDelay(0);
+    control.replay();
+    partitionManager.handle(makeStateChange(taskWithoutPartitionPolicy(), 
RUNNING, PARTITIONED));
+  }
+
+  @Test
+  public void testPartitionPolicyWithDelay() {
+    expectExecuteWithDelay(10);
+    control.replay();
+    partitionManager.handle(makeStateChange(
+        taskWithPartitionPolicy(new 
PartitionPolicy().setReschedule(true).setDelaySecs(10)),
+        RUNNING,
+        PARTITIONED));
+  }
+
+  @Test
+  public void testDelayAfterFailover() {
+    IScheduledTask failoverTask = TaskTestUtil.addStateTransition(
+        taskWithPartitionPolicy(new 
PartitionPolicy().setReschedule(true).setDelaySecs(10)),
+        PARTITIONED,
+        0);
+    clock.setNowMillis(5000);
+    expectExecuteWithDelay(5);
+    control.replay();
+    partitionManager.handle(makeStateChange(
+        failoverTask,
+        PARTITIONED));
+  }
+
+  @Test
+  public void testStateChangeToLostNoModifications() throws Exception {
+    IScheduledTask task = taskWithoutPartitionPolicy();
+    Capture<Callable<Void>> stateChange = expectExecuteWithDelay(0);
+    storageUtil.expectOperations();
+    storageUtil.expectTaskFetch(Tasks.id(task), task);
+    expect(stateManager.changeState(
+        storageUtil.mutableStoreProvider,
+        Tasks.id(task),
+        Optional.of(PARTITIONED),
+        ScheduleStatus.LOST,
+        
Optional.of(PartitionManager.TRANSITION_MESSAGE))).andReturn(StateChangeResult.SUCCESS);
+    control.replay();
+    partitionManager.handle(makeStateChange(task, RUNNING, PARTITIONED));
+    stateChange.getValue().call();
+  }
+
+  @Test
+  public void testNoStateChangeAfterTaskTransition() throws Exception {
+    IScheduledTask task = taskWithoutPartitionPolicy();
+    Capture<Callable<Void>> stateChange = expectExecuteWithDelay(0);
+    storageUtil.expectOperations();
+    storageUtil.expectTaskFetch(
+        Tasks.id(task),
+        TaskTestUtil.addStateTransition(task, RUNNING, 10000));
+    control.replay();
+    partitionManager.handle(makeStateChange(task, RUNNING, PARTITIONED));
+    stateChange.getValue().call();
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java 
b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
index 0366cd6..4cff10b 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
@@ -70,8 +70,11 @@ import static org.apache.aurora.gen.ScheduleStatus.INIT;
 import static org.apache.aurora.gen.ScheduleStatus.KILLED;
 import static org.apache.aurora.gen.ScheduleStatus.KILLING;
 import static org.apache.aurora.gen.ScheduleStatus.LOST;
+import static org.apache.aurora.gen.ScheduleStatus.PARTITIONED;
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.gen.ScheduleStatus.RESTARTING;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.gen.ScheduleStatus.STARTING;
 import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
 import static 
org.apache.aurora.scheduler.resources.ResourceTestUtil.resetPorts;
 import static org.apache.aurora.scheduler.resources.ResourceType.PORTS;
@@ -354,6 +357,84 @@ public class StateManagerImplTest extends EasyMockTest {
     assertEquals(1, rescheduledTask.getFailureCount());
   }
 
+  @Test
+  public void testIncrementPartitionCount() {
+    String taskId = "a";
+    expect(taskIdGenerator.generate(SERVICE_CONFIG, 0)).andReturn(taskId);
+    expectStateTransitions(taskId, INIT, PENDING, ASSIGNED, RUNNING, 
PARTITIONED);
+    control.replay();
+
+    insertTask(SERVICE_CONFIG, 0);
+
+    assignTask(taskId, HOST_A);
+    changeState(taskId, RUNNING);
+    changeState(taskId, PARTITIONED);
+    IScheduledTask updatedTask = Storage.Util.fetchTask(storage, taskId).get();
+    assertEquals(1, updatedTask.getTimesPartitioned());
+  }
+
+  @Test
+  public void testTransitionToLost() {
+    String taskId = "a";
+    expect(taskIdGenerator.generate(SERVICE_CONFIG, 0)).andReturn(taskId);
+    expectStateTransitions(taskId, INIT, PENDING, ASSIGNED, RESTARTING, 
PARTITIONED, LOST);
+    driver.killTask(EasyMock.anyObject());
+    driver.killTask(EasyMock.anyObject());
+    String taskId2 = "a2";
+    expect(taskIdGenerator.generate(SERVICE_CONFIG, 0)).andReturn(taskId2);
+    noFlappingPenalty();
+    expectStateTransitions(taskId2, INIT, PENDING);
+
+    control.replay();
+
+    insertTask(SERVICE_CONFIG, 0);
+
+    assignTask(taskId, HOST_A);
+    changeState(taskId, ASSIGNED);
+    changeState(taskId, RESTARTING);
+    changeState(taskId, PARTITIONED);
+    IScheduledTask updatedTask = Storage.Util.fetchTask(storage, taskId).get();
+    assertEquals(LOST, updatedTask.getStatus());
+  }
+
+  @Test
+  public void testCompactPartitionCycles() {
+    String taskId = "a";
+    expect(taskIdGenerator.generate(SERVICE_CONFIG, 0)).andReturn(taskId);
+    expectStateTransitions(
+        taskId,
+        INIT,
+        PENDING,
+        ASSIGNED,
+        STARTING,
+        PARTITIONED,
+        RUNNING,
+        PARTITIONED,
+        RUNNING,
+        PARTITIONED,
+        RUNNING,
+        PARTITIONED);
+
+    control.replay();
+
+    insertTask(SERVICE_CONFIG, 0);
+    assignTask(taskId, HOST_A);
+    changeState(taskId, STARTING);
+    changeState(taskId, PARTITIONED);
+    changeState(taskId, RUNNING);
+    changeState(taskId, PARTITIONED);
+    changeState(taskId, RUNNING);
+    changeState(taskId, PARTITIONED);
+    changeState(taskId, RUNNING);
+    changeState(taskId, PARTITIONED);
+    IScheduledTask updatedTask = Storage.Util.fetchTask(storage, taskId).get();
+    assertEquals(
+        ImmutableList.of(PENDING, ASSIGNED, STARTING, PARTITIONED, RUNNING, 
PARTITIONED),
+        updatedTask.getTaskEvents().stream()
+            .map(e -> e.getStatus())
+            .collect(Collectors.toList()));
+  }
+
   private static ITaskConfig setMaxFailures(ITaskConfig config, int 
maxFailures) {
     return 
ITaskConfig.build(config.newBuilder().setMaxTaskFailures(maxFailures));
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/src/test/java/org/apache/aurora/scheduler/state/TaskStateMachineTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/state/TaskStateMachineTest.java 
b/src/test/java/org/apache/aurora/scheduler/state/TaskStateMachineTest.java
index 8d6c3ff..bb4e752 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/TaskStateMachineTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/TaskStateMachineTest.java
@@ -49,6 +49,7 @@ import static 
org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.INIT;
 import static 
org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.KILLED;
 import static 
org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.KILLING;
 import static 
org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.LOST;
+import static 
org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.PARTITIONED;
 import static 
org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.PENDING;
 import static 
org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.PREEMPTING;
 import static 
org.apache.aurora.scheduler.state.TaskStateMachine.TaskState.RESTARTING;
@@ -344,6 +345,10 @@ public class TaskStateMachineTest {
   private static final TransitionResult ILLEGAL_KILL = new TransitionResult(
       ILLEGAL_WITH_SIDE_EFFECTS,
       ImmutableSet.of(new SideEffect(Action.KILL, Optional.absent())));
+  private static final TransitionResult TRANSITION_TO_LOST = new 
TransitionResult(
+      SUCCESS,
+      ImmutableSet.of(new SideEffect(Action.TRANSITION_TO_LOST, 
Optional.absent()),
+          new SideEffect(Action.SAVE_STATE, Optional.absent())));
   private static final TransitionResult RECORD_FAILURE = new TransitionResult(
       SUCCESS,
       ImmutableSet.of(
@@ -398,15 +403,18 @@ public class TaskStateMachineTest {
           .put(new TestCase(false, INIT, ASSIGNED), ILLEGAL_KILL)
           .put(new TestCase(false, INIT, STARTING), ILLEGAL_KILL)
           .put(new TestCase(false, INIT, RUNNING), ILLEGAL_KILL)
+          .put(new TestCase(false, INIT, PARTITIONED), ILLEGAL_KILL)
           .put(new TestCase(true, THROTTLED, PENDING), SAVE)
           .put(new TestCase(true, THROTTLED, KILLING), DELETE_TASK)
           .put(new TestCase(false, THROTTLED, ASSIGNED), ILLEGAL_KILL)
           .put(new TestCase(false, THROTTLED, STARTING), ILLEGAL_KILL)
           .put(new TestCase(false, THROTTLED, RUNNING), ILLEGAL_KILL)
+          .put(new TestCase(false, THROTTLED, PARTITIONED), ILLEGAL_KILL)
           .put(new TestCase(true, PENDING, ASSIGNED), SAVE)
           .put(new TestCase(false, PENDING, ASSIGNED), ILLEGAL_KILL)
           .put(new TestCase(false, PENDING, STARTING), ILLEGAL_KILL)
           .put(new TestCase(false, PENDING, RUNNING), ILLEGAL_KILL)
+          .put(new TestCase(false, PENDING, PARTITIONED), ILLEGAL_KILL)
           .put(new TestCase(true, PENDING, KILLING), DELETE_TASK)
           .put(new TestCase(false, ASSIGNED, ASSIGNED), ILLEGAL_KILL)
           .put(new TestCase(true, ASSIGNED, STARTING), SAVE)
@@ -421,6 +429,8 @@ public class TaskStateMachineTest {
           .put(new TestCase(true, ASSIGNED, KILLED), SAVE_AND_RESCHEDULE)
           .put(new TestCase(true, ASSIGNED, KILLING), SAVE_AND_KILL)
           .put(new TestCase(true, ASSIGNED, LOST), SAVE_KILL_AND_RESCHEDULE)
+          .put(new TestCase(false, ASSIGNED, PARTITIONED), ILLEGAL_KILL)
+          .put(new TestCase(true, ASSIGNED, PARTITIONED), SAVE)
           .put(new TestCase(false, STARTING, ASSIGNED), ILLEGAL_KILL)
           .put(new TestCase(false, STARTING, STARTING), ILLEGAL_KILL)
           .put(new TestCase(true, STARTING, RUNNING), SAVE)
@@ -433,6 +443,8 @@ public class TaskStateMachineTest {
           .put(new TestCase(true, STARTING, KILLED), SAVE_AND_RESCHEDULE)
           .put(new TestCase(true, STARTING, KILLING), SAVE_AND_KILL)
           .put(new TestCase(true, STARTING, LOST), SAVE_AND_RESCHEDULE)
+          .put(new TestCase(false, STARTING, PARTITIONED), ILLEGAL_KILL)
+          .put(new TestCase(true, STARTING, PARTITIONED), SAVE)
           .put(new TestCase(false, RUNNING, ASSIGNED), ILLEGAL_KILL)
           .put(new TestCase(false, RUNNING, STARTING), ILLEGAL_KILL)
           .put(new TestCase(false, RUNNING, RUNNING), ILLEGAL_KILL)
@@ -444,6 +456,8 @@ public class TaskStateMachineTest {
           .put(new TestCase(true, RUNNING, KILLED), SAVE_AND_RESCHEDULE)
           .put(new TestCase(true, RUNNING, KILLING), SAVE_AND_KILL)
           .put(new TestCase(true, RUNNING, LOST), SAVE_AND_RESCHEDULE)
+          .put(new TestCase(false, RUNNING, PARTITIONED), ILLEGAL_KILL)
+          .put(new TestCase(true, RUNNING, PARTITIONED), SAVE)
           .put(new TestCase(true, FINISHED, ASSIGNED), ILLEGAL_KILL)
           .put(new TestCase(false, FINISHED, ASSIGNED), ILLEGAL_KILL)
           .put(new TestCase(true, FINISHED, STARTING), ILLEGAL_KILL)
@@ -451,6 +465,8 @@ public class TaskStateMachineTest {
           .put(new TestCase(true, FINISHED, RUNNING), ILLEGAL_KILL)
           .put(new TestCase(false, FINISHED, RUNNING), ILLEGAL_KILL)
           .put(new TestCase(true, FINISHED, DELETED), DELETE_TASK)
+          .put(new TestCase(false, FINISHED, PARTITIONED), ILLEGAL_KILL)
+          .put(new TestCase(true, FINISHED, PARTITIONED), ILLEGAL_KILL)
           .put(new TestCase(true, PREEMPTING, ASSIGNED), ILLEGAL_KILL)
           .put(new TestCase(false, PREEMPTING, ASSIGNED), ILLEGAL_KILL)
           .put(new TestCase(true, PREEMPTING, STARTING), ILLEGAL_KILL)
@@ -462,6 +478,8 @@ public class TaskStateMachineTest {
           .put(new TestCase(true, PREEMPTING, KILLED), SAVE_AND_RESCHEDULE)
           .put(new TestCase(true, PREEMPTING, KILLING), SAVE)
           .put(new TestCase(true, PREEMPTING, LOST), SAVE_KILL_AND_RESCHEDULE)
+          .put(new TestCase(false, PREEMPTING, PARTITIONED), ILLEGAL_KILL)
+          .put(new TestCase(true, PREEMPTING, PARTITIONED), TRANSITION_TO_LOST)
           .put(new TestCase(true, RESTARTING, ASSIGNED), ILLEGAL_KILL)
           .put(new TestCase(false, RESTARTING, ASSIGNED), ILLEGAL_KILL)
           .put(new TestCase(true, RESTARTING, STARTING), ILLEGAL_KILL)
@@ -473,6 +491,8 @@ public class TaskStateMachineTest {
           .put(new TestCase(true, RESTARTING, KILLED), SAVE_AND_RESCHEDULE)
           .put(new TestCase(true, RESTARTING, KILLING), SAVE)
           .put(new TestCase(true, RESTARTING, LOST), SAVE_KILL_AND_RESCHEDULE)
+          .put(new TestCase(false, RESTARTING, PARTITIONED), ILLEGAL_KILL)
+          .put(new TestCase(true, RESTARTING, PARTITIONED), TRANSITION_TO_LOST)
           .put(new TestCase(true, DRAINING, ASSIGNED), ILLEGAL_KILL)
           .put(new TestCase(false, DRAINING, ASSIGNED), ILLEGAL_KILL)
           .put(new TestCase(true, DRAINING, STARTING), ILLEGAL_KILL)
@@ -484,6 +504,8 @@ public class TaskStateMachineTest {
           .put(new TestCase(true, DRAINING, KILLED), SAVE_AND_RESCHEDULE)
           .put(new TestCase(true, DRAINING, KILLING), SAVE)
           .put(new TestCase(true, DRAINING, LOST), SAVE_KILL_AND_RESCHEDULE)
+          .put(new TestCase(false, DRAINING, PARTITIONED), ILLEGAL_KILL)
+          .put(new TestCase(true, DRAINING, PARTITIONED), TRANSITION_TO_LOST)
           .put(new TestCase(true, FAILED, ASSIGNED), ILLEGAL_KILL)
           .put(new TestCase(false, FAILED, ASSIGNED), ILLEGAL_KILL)
           .put(new TestCase(true, FAILED, STARTING), ILLEGAL_KILL)
@@ -491,6 +513,8 @@ public class TaskStateMachineTest {
           .put(new TestCase(true, FAILED, RUNNING), ILLEGAL_KILL)
           .put(new TestCase(false, FAILED, RUNNING), ILLEGAL_KILL)
           .put(new TestCase(true, FAILED, DELETED), DELETE_TASK)
+          .put(new TestCase(false, FAILED, PARTITIONED), ILLEGAL_KILL)
+          .put(new TestCase(true, FAILED, PARTITIONED), ILLEGAL_KILL)
           .put(new TestCase(true, KILLED, ASSIGNED), ILLEGAL_KILL)
           .put(new TestCase(false, KILLED, ASSIGNED), ILLEGAL_KILL)
           .put(new TestCase(true, KILLED, STARTING), ILLEGAL_KILL)
@@ -498,6 +522,8 @@ public class TaskStateMachineTest {
           .put(new TestCase(true, KILLED, RUNNING), ILLEGAL_KILL)
           .put(new TestCase(false, KILLED, RUNNING), ILLEGAL_KILL)
           .put(new TestCase(true, KILLED, DELETED), DELETE_TASK)
+          .put(new TestCase(false, KILLED, PARTITIONED), ILLEGAL_KILL)
+          .put(new TestCase(true, KILLED, PARTITIONED), ILLEGAL_KILL)
           .put(new TestCase(true, KILLING, ASSIGNED), ILLEGAL_KILL)
           .put(new TestCase(false, KILLING, ASSIGNED), ILLEGAL_KILL)
           .put(new TestCase(true, KILLING, STARTING), ILLEGAL_KILL)
@@ -509,6 +535,22 @@ public class TaskStateMachineTest {
           .put(new TestCase(true, KILLING, KILLED), SAVE)
           .put(new TestCase(true, KILLING, LOST), SAVE)
           .put(new TestCase(true, KILLING, DELETED), DELETE_TASK)
+          .put(new TestCase(false, KILLING, PARTITIONED), ILLEGAL_KILL)
+          .put(new TestCase(true, KILLING, PARTITIONED), TRANSITION_TO_LOST)
+          .put(new TestCase(false, PARTITIONED, ASSIGNED), ILLEGAL_KILL)
+          .put(new TestCase(true, PARTITIONED, ASSIGNED), SAVE)
+          .put(new TestCase(false, PARTITIONED, STARTING), ILLEGAL_KILL)
+          .put(new TestCase(true, PARTITIONED, STARTING), SAVE)
+          .put(new TestCase(false, PARTITIONED, RUNNING), ILLEGAL_KILL)
+          .put(new TestCase(true, PARTITIONED, RUNNING), SAVE)
+          .put(new TestCase(true, PARTITIONED, PREEMPTING), TRANSITION_TO_LOST)
+          .put(new TestCase(true, PARTITIONED, RESTARTING), TRANSITION_TO_LOST)
+          .put(new TestCase(true, PARTITIONED, DRAINING), TRANSITION_TO_LOST)
+          .put(new TestCase(true, PARTITIONED, KILLING), TRANSITION_TO_LOST)
+          .put(new TestCase(true, PARTITIONED, FAILED), RECORD_FAILURE)
+          .put(new TestCase(true, PARTITIONED, FINISHED), SAVE)
+          .put(new TestCase(true, PARTITIONED, LOST), SAVE_KILL_AND_RESCHEDULE)
+          .put(new TestCase(false, PARTITIONED, PARTITIONED), ILLEGAL_KILL)
           .put(new TestCase(true, LOST, ASSIGNED), ILLEGAL_KILL)
           .put(new TestCase(false, LOST, ASSIGNED), ILLEGAL_KILL)
           .put(new TestCase(true, LOST, STARTING), ILLEGAL_KILL)
@@ -516,9 +558,12 @@ public class TaskStateMachineTest {
           .put(new TestCase(true, LOST, RUNNING), ILLEGAL_KILL)
           .put(new TestCase(false, LOST, RUNNING), ILLEGAL_KILL)
           .put(new TestCase(true, LOST, DELETED), DELETE_TASK)
+          .put(new TestCase(false, LOST, PARTITIONED), ILLEGAL_KILL)
+          .put(new TestCase(true, LOST, PARTITIONED), ILLEGAL_KILL)
           .put(new TestCase(false, DELETED, ASSIGNED), ILLEGAL_KILL)
           .put(new TestCase(false, DELETED, STARTING), ILLEGAL_KILL)
           .put(new TestCase(false, DELETED, RUNNING), ILLEGAL_KILL)
+          .put(new TestCase(false, DELETED, PARTITIONED), ILLEGAL_KILL)
           .build();
 
   @Test

http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/src/test/python/apache/aurora/client/cli/test_task.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/cli/test_task.py b/src/test/python/apache/aurora/client/cli/test_task.py
index 186cb27..a543d4a 100644
--- a/src/test/python/apache/aurora/client/cli/test_task.py
+++ b/src/test/python/apache/aurora/client/cli/test_task.py
@@ -95,7 +95,7 @@ class TestRunCommand(AuroraClientCommandTest):
       mock_scheduler_proxy.getTasksStatus.assert_called_with(TaskQuery(
           jobKeys=[JobKey(role='bozo', environment='test', name='hello')],
           statuses=set([ScheduleStatus.RUNNING, ScheduleStatus.KILLING, ScheduleStatus.RESTARTING,
-              ScheduleStatus.PREEMPTING, ScheduleStatus.DRAINING]),
+              ScheduleStatus.PREEMPTING, ScheduleStatus.PARTITIONED, ScheduleStatus.DRAINING]),
           instanceIds=instances),
           retry=True)
 
@@ -150,7 +150,7 @@ class TestSshCommand(AuroraClientCommandTest):
           jobKeys=[JobKey(role='bozo', environment='test', name='hello')],
           instanceIds=set([1]),
           statuses=set([ScheduleStatus.RUNNING, ScheduleStatus.KILLING, ScheduleStatus.RESTARTING,
-              ScheduleStatus.PREEMPTING, ScheduleStatus.DRAINING])),
+              ScheduleStatus.PREEMPTING, ScheduleStatus.PARTITIONED, ScheduleStatus.DRAINING])),
           retry=True)
       mock_subprocess.assert_called_with(['ssh', '-t', '-v', 'bozo@slavehost',
           'cd /slaveroot/slaves/*/frameworks/*/executors/thermos-1287391823/runs/'
@@ -178,7 +178,7 @@ class TestSshCommand(AuroraClientCommandTest):
           jobKeys=[JobKey(role='bozo', environment='test', name='hello')],
           instanceIds=None,
           statuses=set([ScheduleStatus.RUNNING, ScheduleStatus.KILLING, ScheduleStatus.RESTARTING,
-              ScheduleStatus.PREEMPTING, ScheduleStatus.DRAINING])),
+              ScheduleStatus.PREEMPTING, ScheduleStatus.PARTITIONED, ScheduleStatus.DRAINING])),
           retry=True)
       mock_subprocess.assert_called_with(['ssh', '-t', '-v', 'bozo@slavehost',
           'cd /slaveroot/slaves/*/frameworks/*/executors/thermos-1287391823/runs/'

http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/src/test/python/apache/aurora/config/test_thrift.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/config/test_thrift.py b/src/test/python/apache/aurora/config/test_thrift.py
index 7a1567a..76d0ad6 100644
--- a/src/test/python/apache/aurora/config/test_thrift.py
+++ b/src/test/python/apache/aurora/config/test_thrift.py
@@ -18,6 +18,7 @@ import re
 import pytest
 
 from apache.aurora.config import AuroraConfig
+from apache.aurora.config.schema.base import PartitionPolicy as PystachioPartitionPolicy
 from apache.aurora.config.schema.base import (
     AppcImage,
     Container,
@@ -37,7 +38,13 @@ from apache.thermos.config.schema import Process, Resources, Task
 
 from gen.apache.aurora.api.constants import GOOD_IDENTIFIER_PATTERN_PYTHON
 from gen.apache.aurora.api.ttypes import Mode as ThriftMode
-from gen.apache.aurora.api.ttypes import CronCollisionPolicy, Identity, JobKey, Resource
+from gen.apache.aurora.api.ttypes import (
+    CronCollisionPolicy,
+    Identity,
+    JobKey,
+    PartitionPolicy,
+    Resource
+)
 from gen.apache.aurora.test.constants import INVALID_IDENTIFIERS, VALID_IDENTIFIERS
 
 HELLO_WORLD = Job(
@@ -143,6 +150,7 @@ def test_config_with_options():
     priority=200,
     service=True,
     cron_collision_policy='RUN_OVERLAP',
+    partition_policy=PystachioPartitionPolicy(delay_secs=10),
     constraints={
       'dedicated': 'root',
       'cpu': 'x86_64'
@@ -159,6 +167,24 @@ def test_config_with_options():
   assert job.cronCollisionPolicy == CronCollisionPolicy.RUN_OVERLAP
   assert len(tti.constraints) == 2
   assert job.key.environment == 'prod'
+  assert tti.partitionPolicy == PartitionPolicy(True, 10)
+
+
+def test_disable_partition_policy():
+  hwc = HELLO_WORLD(
+    production=True,
+    priority=200,
+    service=True,
+    cron_collision_policy='RUN_OVERLAP',
+    partition_policy=PystachioPartitionPolicy(reschedule=False),
+    constraints={
+      'dedicated': 'root',
+      'cpu': 'x86_64'
+    },
+    environment='prod'
+  )
+  job = convert_pystachio_to_thrift(hwc)
+  assert job.taskConfig.partitionPolicy == PartitionPolicy(False, 0)
 
 
 def test_config_with_ports():

http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/src/test/sh/org/apache/aurora/e2e/partition_aware.aurora
----------------------------------------------------------------------
diff --git a/src/test/sh/org/apache/aurora/e2e/partition_aware.aurora b/src/test/sh/org/apache/aurora/e2e/partition_aware.aurora
new file mode 100644
index 0000000..7ea9fad
--- /dev/null
+++ b/src/test/sh/org/apache/aurora/e2e/partition_aware.aurora
@@ -0,0 +1,22 @@
+# run the script
+hello_world = Process(
+  name = 'hello_world',
+  cmdline = 'while :; do   sleep 10; done')
+
+# describe the task
+hello_world_task = SequentialTask(
+  processes = [hello_world],
+  resources = Resources(cpu = 0.2, ram = 1 * MB, disk = 8 * MB))
+
+base = Service(
+  cluster = 'devcluster',
+  environment = 'test',
+  role = 'vagrant',
+  task = hello_world_task
+)
+
+jobs = [
+  base(name='partition_aware_default'),
+  base(name='partition_aware_disabled', partition_policy=PartitionPolicy(reschedule=False)),
+  base(name='partition_aware_delay', partition_policy=PartitionPolicy(delay_secs=60 * 60 * 24))
+]

http://git-wip-us.apache.org/repos/asf/aurora/blob/c746452e/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh
----------------------------------------------------------------------
diff --git a/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh b/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh
index f0819fb..1500bda 100755
--- a/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh
+++ b/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh
@@ -304,6 +304,43 @@ test_update_fail() {
   fi
 }
 
+test_partition_awareness() {
+  local _config=$1 _cluster=$2 _default_jobkey=$3 _disabled_jobkey=$4 _delay_jobkey=$5
+
+  # create three jobs with different partition policies
+  aurora update start --wait $_default_jobkey $_config
+  aurora update start --wait $_disabled_jobkey $_config
+  aurora update start --wait $_delay_jobkey $_config
+
+  # partition the agent
+  sudo stop mesos-slave
+
+  # the default job should become LOST and then transition to PENDING
+  wait_until_task_status $_default_jobkey "0" "PENDING"
+
+  # the other two should be PARTITIONED
+  assert_task_status $_disabled_jobkey "0" "PARTITIONED"
+  assert_task_status $_delay_jobkey "0" "PARTITIONED"
+
+  # start the agent back up
+  sudo start mesos-slave
+
+  # This can be removed when https://issues.apache.org/jira/browse/MESOS-6406 is resolved.
+  # We have to pause and let the agent reregister with Mesos, then ask Aurora to explicitly
+  # reconcile to get the RUNNING status update.
+  sleep 30
+  aurora_admin reconcile_tasks $_cluster
+
+  # the PARTITIONED tasks should now be running
+  assert_task_status $_disabled_jobkey "0" "RUNNING"
+  assert_task_status $_delay_jobkey "0" "RUNNING"
+
+  # Clean up
+  aurora job killall $_default_jobkey
+  aurora job killall $_disabled_jobkey
+  aurora job killall $_delay_jobkey
+}
+
 test_announce() {
   local _role=$1 _env=$2 _job=$3
 
@@ -668,6 +705,10 @@ TEST_EPHEMERAL_DAEMON_WITH_FINAL_JOB=ephemeral_daemon_with_final
 TEST_EPHEMERAL_DAEMON_WITH_FINAL_CONFIG_FILE=$TEST_ROOT/ephemeral_daemon_with_final.aurora
 TEST_DAEMONIZING_PROCESS_JOB=daemonize
 TEST_DAEMONIZING_PROCESS_CONFIG_FILE=$TEST_ROOT/test_daemonizing_process.aurora
+TEST_PARTITION_AWARENESS_CONFIG_FILE=$TEST_ROOT/partition_aware.aurora
+TEST_JOB_PA_DEFAULT=$TEST_CLUSTER/$TEST_ROLE/$TEST_ENV/partition_aware_default
+TEST_JOB_PA_DISABLED=$TEST_CLUSTER/$TEST_ROLE/$TEST_ENV/partition_aware_disabled
+TEST_JOB_PA_DELAY=$TEST_CLUSTER/$TEST_ROLE/$TEST_ENV/partition_aware_delay
 
 BASE_ARGS=(
   $TEST_CLUSTER
@@ -708,6 +749,14 @@ TEST_DAEMONIZING_PROCESS_ARGS=(
   $TEST_DAEMONIZING_PROCESS_CONFIG_FILE
 )
 
+TEST_PARTITION_AWARENESS_ARGS=(
+  $TEST_PARTITION_AWARENESS_CONFIG_FILE
+  $TEST_CLUSTER
+  $TEST_JOB_PA_DEFAULT
+  $TEST_JOB_PA_DISABLED
+  $TEST_JOB_PA_DELAY
+)
+
 TEST_JOB_KILL_MESSAGE_ARGS=("${TEST_JOB_ARGS[@]}" "--message='Test message'")
 
 trap collect_result EXIT
@@ -716,6 +765,8 @@ aurorabuild all
 setup_ssh
 setup_docker_registry
 
+test_partition_awareness "${TEST_PARTITION_AWARENESS_ARGS[@]}"
+
 test_version
 test_http_example "${TEST_JOB_ARGS[@]}"
 test_http_example "${TEST_JOB_WATCH_SECS_ARGS[@]}"

Reply via email to