Repository: mesos
Updated Branches:
  refs/heads/master cdaac1653 -> ca1359405


Add source and reason to TaskStatus.

Review: https://reviews.apache.org/r/26382/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ca135940
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ca135940
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ca135940

Branch: refs/heads/master
Commit: ca13594054b179bd36ce14323de3111f92bae3cd
Parents: cdaac16
Author: Dominic Hamon <[email protected]>
Authored: Thu Oct 16 11:47:43 2014 -0700
Committer: Dominic Hamon <[email protected]>
Committed: Tue Nov 4 14:01:19 2014 -0800

----------------------------------------------------------------------
 include/mesos/mesos.proto                       |  45 ++++++--
 src/common/protobuf_utils.cpp                   |   7 ++
 src/common/protobuf_utils.hpp                   |   2 +
 src/examples/balloon_framework.cpp              |   4 +-
 src/examples/java/TestFramework.java            |   4 +
 src/examples/low_level_scheduler_libprocess.cpp |   2 +
 src/examples/low_level_scheduler_pthread.cpp    |   2 +
 src/examples/no_executor_framework.cpp          |   2 +
 src/examples/test_framework.cpp                 |   2 +
 src/master/master.cpp                           |  67 +++++++++---
 src/sched/sched.cpp                             |   2 +
 src/slave/slave.cpp                             | 107 ++++++++++++-------
 src/slave/slave.hpp                             |   8 +-
 src/tests/fault_tolerance_tests.cpp             |  16 ++-
 src/tests/master_authorization_tests.cpp        |   1 +
 src/tests/master_tests.cpp                      |   2 +
 src/tests/resource_offers_tests.cpp             |   5 +
 17 files changed, 211 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ca135940/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index 397d542..6c846f2 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -607,15 +607,16 @@ message TaskInfo {
  * another task).
  */
 enum TaskState {
-  TASK_STAGING = 6;  // Initial state. Framework status updates should not use.
+  TASK_STAGING = 6; // Initial state. Framework status updates should not use.
   TASK_STARTING = 0;
   TASK_RUNNING = 1;
-  TASK_FINISHED = 2; // TERMINAL.
-  TASK_FAILED = 3;   // TERMINAL.
-  TASK_KILLED = 4;   // TERMINAL.
-  TASK_LOST = 5;     // TERMINAL.
-
-  // TODO(benh): TASK_ERROR = 7; // TERMINAL.
+  TASK_FINISHED = 2; // TERMINAL. The task finished successfully.
+  TASK_FAILED = 3; // TERMINAL. The task failed to finish successfully.
+  TASK_KILLED = 4; // TERMINAL. The task was killed by the executor.
+  TASK_LOST = 5; // TERMINAL. The task failed but can be rescheduled.
+  // TASK_ERROR is currently unused but will be introduced in 0.22.0.
+  // TODO(dhamon): Start using TASK_ERROR.
+  TASK_ERROR = 7; // TERMINAL. The task description contains an error.
 }
 
 
@@ -623,9 +624,39 @@ enum TaskState {
  * Describes the current status of a task.
  */
 message TaskStatus {
+  /** Describes the source of the task status update. */
+  enum Source {
+    SOURCE_MASTER = 0;
+    SOURCE_SLAVE = 1;
+    SOURCE_EXECUTOR = 2;
+  }
+
+  /** Detailed reason for the task status update. */
+  enum Reason {
+    REASON_COMMAND_EXECUTOR_FAILED = 0;
+    REASON_EXECUTOR_TERMINATED = 1;
+    REASON_EXECUTOR_UNREGISTERED = 2;
+    REASON_FRAMEWORK_REMOVED = 3;
+    REASON_GC_ERROR = 4;
+    REASON_INVALID_FRAMEWORKID = 5;
+    REASON_INVALID_OFFERS = 6;
+    REASON_MASTER_DISCONNECTED = 7;
+    REASON_MEMORY_LIMIT = 8;
+    REASON_RECONCILIATION = 9;
+    REASON_SLAVE_DISCONNECTED = 10;
+    REASON_SLAVE_REMOVED = 11;
+    REASON_SLAVE_RESTARTED = 12;
+    REASON_SLAVE_UNKNOWN = 13;
+    REASON_TASK_INVALID = 14;
+    REASON_TASK_UNAUTHORIZED = 15;
+    REASON_TASK_UNKNOWN = 16;
+  }
+
   required TaskID task_id = 1;
   required TaskState state = 2;
   optional string message = 4; // Possible message explaining state.
+  optional Source source = 9;
+  optional Reason reason = 10;
   optional bytes data = 3;
   optional SlaveID slave_id = 5;
   optional ExecutorID executor_id = 7; // TODO(benh): Use in master/slave.

http://git-wip-us.apache.org/repos/asf/mesos/blob/ca135940/src/common/protobuf_utils.cpp
----------------------------------------------------------------------
diff --git a/src/common/protobuf_utils.cpp b/src/common/protobuf_utils.cpp
index a9b65e3..33ce782 100644
--- a/src/common/protobuf_utils.cpp
+++ b/src/common/protobuf_utils.cpp
@@ -47,7 +47,9 @@ StatusUpdate createStatusUpdate(
     const Option<SlaveID>& slaveId,
     const TaskID& taskId,
     const TaskState& state,
+    const TaskStatus::Source& source,
     const std::string& message = "",
+    const Option<TaskStatus::Reason>& reason = None(),
     const Option<ExecutorID>& executorId = None())
 {
   StatusUpdate update;
@@ -72,9 +74,14 @@ StatusUpdate createStatusUpdate(
   }
 
   status->set_state(state);
+  status->set_source(source);
   status->set_message(message);
   status->set_timestamp(update.timestamp());
 
+  if (reason.isSome()) {
+    status->set_reason(reason.get());
+  }
+
   return update;
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/ca135940/src/common/protobuf_utils.hpp
----------------------------------------------------------------------
diff --git a/src/common/protobuf_utils.hpp b/src/common/protobuf_utils.hpp
index 212d512..bc3ef2a 100644
--- a/src/common/protobuf_utils.hpp
+++ b/src/common/protobuf_utils.hpp
@@ -41,7 +41,9 @@ StatusUpdate createStatusUpdate(
     const Option<SlaveID>& slaveId,
     const TaskID& taskId,
     const TaskState& state,
+    const TaskStatus::Source& source,
     const std::string& message = "",
+    const Option<TaskStatus::Reason>& reason = None(),
     const Option<ExecutorID>& executorId = None());
 
 Task createTask(

http://git-wip-us.apache.org/repos/asf/mesos/blob/ca135940/src/examples/balloon_framework.cpp
----------------------------------------------------------------------
diff --git a/src/examples/balloon_framework.cpp b/src/examples/balloon_framework.cpp
index b05d567..c2337ba 100644
--- a/src/examples/balloon_framework.cpp
+++ b/src/examples/balloon_framework.cpp
@@ -129,8 +129,10 @@ public:
   virtual void statusUpdate(SchedulerDriver* driver, const TaskStatus& status)
   {
     std::cout << "Task in state " << status.state() << std::endl;
+    std::cout << "Source: " << status.source() << std::endl;
+    std::cout << "Reason: " << status.reason() << std::endl;
     if (status.has_message()) {
-      std::cout << "Reason: " << status.message() << std::endl;
+      std::cout << "Message: " << status.message() << std::endl;
     }
 
     if (protobuf::isTerminalState(status.state())) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/ca135940/src/examples/java/TestFramework.java
----------------------------------------------------------------------
diff --git a/src/examples/java/TestFramework.java b/src/examples/java/TestFramework.java
index bc593d0..ce87de8 100644
--- a/src/examples/java/TestFramework.java
+++ b/src/examples/java/TestFramework.java
@@ -132,6 +132,10 @@ public class TestFramework {
         System.err.println("Aborting because task " + status.getTaskId().getValue() +
                            " is in unexpected state " +
                            status.getState().getValueDescriptor().getName() +
+                           " with reason '" +
+                           status.getReason().getValueDescriptor().getName() + "'" +
+                           " from source '" +
+                           status.getSource().getValueDescriptor().getName() + "'" +
                            " with message '" + status.getMessage() + "'");
         driver.abort();
       }

http://git-wip-us.apache.org/repos/asf/mesos/blob/ca135940/src/examples/low_level_scheduler_libprocess.cpp
----------------------------------------------------------------------
diff --git a/src/examples/low_level_scheduler_libprocess.cpp b/src/examples/low_level_scheduler_libprocess.cpp
index 7ef5ea7..89b4318 100644
--- a/src/examples/low_level_scheduler_libprocess.cpp
+++ b/src/examples/low_level_scheduler_libprocess.cpp
@@ -290,6 +290,8 @@ private:
         status.state() == TASK_FAILED) {
       EXIT(1) << "Exiting because task " << status.task_id()
               << " is in unexpected state " << status.state()
+              << " with reason " << status.reason()
+              << " from source " << status.source()
               << " with message '" << status.message() << "'";
     }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/ca135940/src/examples/low_level_scheduler_pthread.cpp
----------------------------------------------------------------------
diff --git a/src/examples/low_level_scheduler_pthread.cpp b/src/examples/low_level_scheduler_pthread.cpp
index 6e233a1..e5cd48a 100644
--- a/src/examples/low_level_scheduler_pthread.cpp
+++ b/src/examples/low_level_scheduler_pthread.cpp
@@ -340,6 +340,8 @@ private:
         status.state() == TASK_FAILED) {
       EXIT(1) << "Exiting because task " << status.task_id()
               << " is in unexpected state " << status.state()
+              << " with reason " << status.reason()
+              << " from source " << status.source()
               << " with message '" << status.message() << "'";
     }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/ca135940/src/examples/no_executor_framework.cpp
----------------------------------------------------------------------
diff --git a/src/examples/no_executor_framework.cpp b/src/examples/no_executor_framework.cpp
index f98a073..9c84e03 100644
--- a/src/examples/no_executor_framework.cpp
+++ b/src/examples/no_executor_framework.cpp
@@ -126,6 +126,8 @@ public:
         status.state() == TASK_FAILED) {
       cout << "Aborting because task " << taskId
            << " is in unexpected state " << status.state()
+           << " with reason " << status.reason()
+           << " from source " << status.source()
            << " with message '" << status.message() << "'"
            << endl;
       driver->abort();

http://git-wip-us.apache.org/repos/asf/mesos/blob/ca135940/src/examples/test_framework.cpp
----------------------------------------------------------------------
diff --git a/src/examples/test_framework.cpp b/src/examples/test_framework.cpp
index 187a611..e87198b 100644
--- a/src/examples/test_framework.cpp
+++ b/src/examples/test_framework.cpp
@@ -133,6 +133,8 @@ public:
         status.state() == TASK_FAILED) {
       cout << "Aborting because task " << taskId
            << " is in unexpected state " << status.state()
+           << " with reason " << status.reason()
+           << " from source " << status.source()
            << " with message '" << status.message() << "'"
            << endl;
       driver->abort();

http://git-wip-us.apache.org/repos/asf/mesos/blob/ca135940/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index d914786..a860496 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -2321,7 +2321,9 @@ void Master::launchTasks(
           task.slave_id(),
           task.task_id(),
           TASK_LOST,
-          "Task launched with invalid offers: " + error.get().message);
+          TaskStatus::SOURCE_MASTER,
+          "Task launched with invalid offers: " + error.get().message,
+          TaskStatus::REASON_INVALID_OFFERS);
 
       metrics.tasks_lost++;
       stats.tasks[TASK_LOST]++;
@@ -2542,7 +2544,11 @@ void Master::_launchTasks(
           task.slave_id(),
           task.task_id(),
           TASK_LOST,
-          (slave == NULL ? "Slave removed" : "Slave disconnected"));
+          TaskStatus::SOURCE_MASTER,
+          slave == NULL ? "Slave removed" : "Slave disconnected",
+          slave == NULL ?
+              TaskStatus::REASON_SLAVE_REMOVED :
+              TaskStatus::REASON_SLAVE_DISCONNECTED);
 
       metrics.tasks_lost++;
       stats.tasks[TASK_LOST]++;
@@ -2578,8 +2584,10 @@ void Master::_launchTasks(
           framework->id,
           task.slave_id(),
           task.task_id(),
-          TASK_LOST,
-          validation.get().message);
+          TASK_LOST,  // TODO(dhamon): TASK_ERROR in 0.22
+          TaskStatus::SOURCE_MASTER,
+          validation.get().message,
+          TaskStatus::REASON_TASK_INVALID);
 
       metrics.tasks_lost++;
       stats.tasks[TASK_LOST]++;
@@ -2603,10 +2611,12 @@ void Master::_launchTasks(
           framework->id,
           task.slave_id(),
           task.task_id(),
-          TASK_LOST,
+          TASK_LOST,  // TODO(dhamon): TASK_ERROR in 0.22
+          TaskStatus::SOURCE_MASTER,
           authorization.isFailed() ?
               "Authorization failure: " + authorization.failure() :
-              "Not authorized to launch as user '" + user + "'");
+              "Not authorized to launch as user '" + user + "'",
+          TaskStatus::REASON_TASK_UNAUTHORIZED);
 
       metrics.tasks_lost++;
       stats.tasks[TASK_LOST]++;
@@ -2635,7 +2645,9 @@ void Master::_launchTasks(
           task.slave_id(),
           task.task_id(),
           TASK_LOST,
-          error);
+          TaskStatus::SOURCE_MASTER,
+          error,
+          TaskStatus::REASON_TASK_INVALID);
 
       metrics.tasks_lost++;
       stats.tasks[TASK_LOST]++;
@@ -2724,6 +2736,7 @@ void Master::killTask(
         None(),
         taskId,
         TASK_KILLED,
+        TaskStatus::SOURCE_MASTER,
         "Killed pending task");
 
     forward(update, UPID(), framework);
@@ -2765,7 +2778,9 @@ void Master::killTask(
           None(),
           taskId,
           TASK_LOST,
-          "Attempted to kill an unknown task");
+          TaskStatus::SOURCE_MASTER,
+          "Attempted to kill an unknown task",
+          TaskStatus::REASON_TASK_UNKNOWN);
 
       forward(update, UPID(), framework);
     } else {
@@ -3542,7 +3557,9 @@ void Master::reconcileTasks(
           task.slave_id(),
           task.task_id(),
           TASK_STAGING,
-          "Reconciliation: Latest task state");
+          TaskStatus::SOURCE_MASTER,
+          "Reconciliation: Latest task state",
+          TaskStatus::REASON_RECONCILIATION);
 
       VLOG(1) << "Sending implicit reconciliation state "
               << update.status().state()
@@ -3566,7 +3583,9 @@ void Master::reconcileTasks(
           task->slave_id(),
           task->task_id(),
           state,
-          "Reconciliation: Latest task state");
+          TaskStatus::SOURCE_MASTER,
+          "Reconciliation: Latest task state",
+          TaskStatus::REASON_RECONCILIATION);
 
       VLOG(1) << "Sending implicit reconciliation state "
               << update.status().state()
@@ -3617,7 +3636,9 @@ void Master::reconcileTasks(
           task_.slave_id(),
           task_.task_id(),
           TASK_STAGING,
-          "Reconciliation: Latest task state");
+          TaskStatus::SOURCE_MASTER,
+          "Reconciliation: Latest task state",
+          TaskStatus::REASON_RECONCILIATION);
     } else if (task != NULL) {
       // (2) Task is known: send the latest status update state.
       const TaskState& state = task->has_status_update_state()
@@ -3629,7 +3650,9 @@ void Master::reconcileTasks(
           task->slave_id(),
           task->task_id(),
           state,
-          "Reconciliation: Latest task state");
+          TaskStatus::SOURCE_MASTER,
+          "Reconciliation: Latest task state",
+          TaskStatus::REASON_RECONCILIATION);
     } else if (slaveId.isSome() && slaves.registered.contains(slaveId.get())) {
       // (3) Task is unknown, slave is registered: TASK_LOST.
       update = protobuf::createStatusUpdate(
@@ -3637,7 +3660,9 @@ void Master::reconcileTasks(
           slaveId.get(),
           status.task_id(),
           TASK_LOST,
-          "Reconciliation: Task is unknown to the slave");
+          TaskStatus::SOURCE_MASTER,
+          "Reconciliation: Task is unknown to the slave",
+          TaskStatus::REASON_RECONCILIATION);
     } else if (slaves.transitioning(slaveId)) {
       // (4) Task is unknown, slave is transitionary: no-op.
       LOG(INFO) << "Ignoring reconciliation request of task "
@@ -3650,7 +3675,9 @@ void Master::reconcileTasks(
           slaveId,
           status.task_id(),
           TASK_LOST,
-          "Reconciliation: Task is unknown");
+          TaskStatus::SOURCE_MASTER,
+          "Reconciliation: Task is unknown",
+          TaskStatus::REASON_RECONCILIATION);
     }
 
     if (update.isSome()) {
@@ -4010,7 +4037,9 @@ void Master::reconcile(
           status->mutable_task_id()->CopyFrom(task->task_id());
           status->mutable_slave_id()->CopyFrom(slave->id);
           status->set_state(state);
+          status->set_source(TaskStatus::SOURCE_MASTER);
           status->set_message("Reconciliation request");
+          status->set_reason(TaskStatus::REASON_RECONCILIATION);
           status->set_timestamp(Clock::now().secs());
         } else {
           // TODO(bmahler): Remove this case in 0.22.0.
@@ -4019,7 +4048,9 @@ void Master::reconcile(
               slave->id,
               task->task_id(),
               TASK_LOST,
-              "Task is unknown to the slave");
+              TaskStatus::SOURCE_MASTER,
+              "Task is unknown to the slave",
+              TaskStatus::REASON_TASK_UNKNOWN);
 
           updateTask(task, update);
           removeTask(task);
@@ -4277,7 +4308,9 @@ void Master::removeFramework(Framework* framework)
         task->slave_id(),
         task->task_id(),
         TASK_KILLED,
+        TaskStatus::SOURCE_MASTER,
         "Framework " + framework->id.value() + " removed",
+        TaskStatus::REASON_FRAMEWORK_REMOVED,
         (task->has_executor_id()
          ? Option<ExecutorID>(task->executor_id())
          : None()));
@@ -4369,7 +4402,9 @@ void Master::removeFramework(Slave* slave, Framework* framework)
         task->slave_id(),
         task->task_id(),
         TASK_LOST,
+        TaskStatus::SOURCE_MASTER,
         "Slave " + slave->info.hostname() + " disconnected",
+        TaskStatus::REASON_SLAVE_DISCONNECTED,
         (task->has_executor_id()
             ? Option<ExecutorID>(task->executor_id()) : None()));
 
@@ -4486,7 +4521,9 @@ void Master::removeSlave(Slave* slave)
           task->slave_id(),
           task->task_id(),
           TASK_LOST,
+          TaskStatus::SOURCE_MASTER,
           "Slave " + slave->info.hostname() + " removed",
+          TaskStatus::REASON_SLAVE_REMOVED,
           (task->has_executor_id() ?
               Option<ExecutorID>(task->executor_id()) : None()));
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/ca135940/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index d84465c..e5f828d 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -836,6 +836,8 @@ protected:
         TaskStatus* status = update.mutable_status();
         status->mutable_task_id()->MergeFrom(task.task_id());
         status->set_state(TASK_LOST);
+        status->set_source(TaskStatus::SOURCE_MASTER);
+        status->set_reason(TaskStatus::REASON_MASTER_DISCONNECTED);
         status->set_message("Master Disconnected");
         update.set_timestamp(Clock::now().secs());
         update.set_uuid(UUID::random().toBytes());

http://git-wip-us.apache.org/repos/asf/mesos/blob/ca135940/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index b893517..b542491 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -888,7 +888,9 @@ void Slave::reregistered(
             info.id(),
             taskId,
             TASK_LOST,
-            "Reconciliation: task unknown to the slave");
+            TaskStatus::SOURCE_SLAVE,
+            "Reconciliation: task unknown to the slave",
+            TaskStatus::REASON_RECONCILIATION);
 
         // NOTE: We can't use statusUpdate() here because it drops
         // updates for unknown frameworks.
@@ -1237,8 +1239,10 @@ void Slave::_runTask(
         info.id(),
         task.task_id(),
         TASK_LOST,
+        TaskStatus::SOURCE_SLAVE,
         "Could not launch the task because we failed to unschedule directories"
-        " scheduled for gc");
+        " scheduled for gc",
+        TaskStatus::REASON_GC_ERROR);
 
     // TODO(vinod): Ensure that the status update manager reliably
     // delivers this update. Currently, we don't guarantee this
@@ -1300,7 +1304,9 @@ void Slave::_runTask(
           info.id(),
           task.task_id(),
           TASK_LOST,
-          "Executor terminating/terminated");
+          TaskStatus::SOURCE_SLAVE,
+          "Executor terminating/terminated",
+          TaskStatus::REASON_EXECUTOR_TERMINATED);
 
       statusUpdate(update, UPID());
       break;
@@ -1416,7 +1422,11 @@ void Slave::killTask(
                    << " before it was launched";
 
       const StatusUpdate& update = protobuf::createStatusUpdate(
-          frameworkId, info.id(), taskId, TASK_KILLED,
+          frameworkId,
+          info.id(),
+          taskId,
+          TASK_KILLED,
+          TaskStatus::SOURCE_SLAVE,
           "Task killed before it was launched");
       statusUpdate(update, UPID());
 
@@ -1439,7 +1449,13 @@ void Slave::killTask(
     // We send a TASK_LOST update because this task has never
     // been launched on this slave.
     const StatusUpdate& update = protobuf::createStatusUpdate(
-        frameworkId, info.id(), taskId, TASK_LOST, "Cannot find executor");
+        frameworkId,
+        info.id(),
+        taskId,
+        TASK_LOST,
+        TaskStatus::SOURCE_SLAVE,
+        "Cannot find executor",
+        TaskStatus::REASON_EXECUTOR_TERMINATED);
 
     statusUpdate(update, UPID());
     return;
@@ -1456,7 +1472,9 @@ void Slave::killTask(
           info.id(),
           taskId,
           TASK_KILLED,
+          TaskStatus::SOURCE_SLAVE,
           "Unregistered executor",
+          TaskStatus::REASON_EXECUTOR_UNREGISTERED,
           executor->id);
 
       statusUpdate(update, UPID());
@@ -2126,7 +2144,9 @@ void Slave::reregisterExecutor(
               info.id(),
               task->task_id(),
               TASK_LOST,
+              TaskStatus::SOURCE_SLAVE,
               "Task launched during slave restart",
+              TaskStatus::REASON_SLAVE_RESTARTED,
               executorId);
 
           statusUpdate(update, UPID());
@@ -2205,7 +2225,9 @@ void Slave::statusUpdate(const StatusUpdate& update, const UPID& pid)
         state == RUNNING || state == TERMINATING)
     << state;
 
-  const TaskStatus& status = update.status();
+  TaskStatus status = update.status();
+  status.set_source(pid == UPID() ? TaskStatus::SOURCE_SLAVE
+                                  : TaskStatus::SOURCE_EXECUTOR);
 
   Framework* framework = getFramework(update.framework_id());
   if (framework == NULL) {
@@ -2269,8 +2291,9 @@ void Slave::statusUpdate(const StatusUpdate& update, const UPID& pid)
                  << " (" << executor->pid << ")";
   }
 
-  stats.tasks[update.status().state()]++;
+  stats.tasks[status.state()]++;
   stats.validStatusUpdates++;
+
   metrics.valid_status_updates++;
 
   // We set the latest state of the task here so that the slave can
@@ -2933,22 +2956,8 @@ void Slave::executorTerminated(
         // supports it.
         foreach (Task* task, executor->launchedTasks.values()) {
           if (!protobuf::isTerminalState(task->state())) {
-            mesos::TaskState taskState;
-            if ((termination.isReady() && termination.get().killed()) ||
-                executor->isCommandExecutor()) {
-              taskState = TASK_FAILED;
-            } else {
-              taskState = TASK_LOST;
-            }
-            statusUpdate(protobuf::createStatusUpdate(
-                frameworkId,
-                info.id(),
-                task->task_id(),
-                taskState,
-                termination.isReady() ? termination.get().message() :
-                                        "Abnormal executor termination",
-                executorId),
-                UPID());
+            sendExecutorTerminatedStatusUpdate(
+                task->task_id(), termination, frameworkId, executor);
           }
         }
 
@@ -2956,22 +2965,8 @@ void Slave::executorTerminated(
         // TODO(vinod): Use foreachvalue instead once LinkedHashmap
         // supports it.
         foreach (const TaskInfo& task, executor->queuedTasks.values()) {
-          mesos::TaskState taskState;
-          if ((termination.isReady() && termination.get().killed()) ||
-              executor->isCommandExecutor()) {
-            taskState = TASK_FAILED;
-          } else {
-            taskState = TASK_LOST;
-          }
-          statusUpdate(protobuf::createStatusUpdate(
-              frameworkId,
-              info.id(),
-              task.task_id(),
-              taskState,
-              termination.isReady() ? termination.get().message() :
-                                      "Abnormal executor termination",
-              executorId),
-              UPID());
+          sendExecutorTerminatedStatusUpdate(
+              task.task_id(), termination, frameworkId, executor);
         }
       }
 
@@ -3709,6 +3704,38 @@ double Slave::_executors_terminating()
 }
 
 
+void Slave::sendExecutorTerminatedStatusUpdate(
+    const TaskID& taskId,
+    const Future<containerizer::Termination>& termination,
+    const FrameworkID& frameworkId,
+    const Executor* executor)
+{
+  mesos::TaskState taskState = TASK_LOST;
+  TaskStatus::Reason reason = TaskStatus::REASON_EXECUTOR_TERMINATED;
+
+  if (termination.isReady() && termination.get().killed()) {
+    taskState = TASK_FAILED;
+    // TODO(dhamon): MESOS-2035: Add 'reason' to containerizer::Termination.
+    reason = TaskStatus::REASON_MEMORY_LIMIT;
+  } else if (executor->isCommandExecutor()) {
+    taskState = TASK_FAILED;
+    reason = TaskStatus::REASON_COMMAND_EXECUTOR_FAILED;
+  }
+
+  statusUpdate(protobuf::createStatusUpdate(
+      frameworkId,
+      info.id(),
+      taskId,
+      taskState,
+      TaskStatus::SOURCE_SLAVE,
+      termination.isReady() ? termination.get().message() :
+                              "Abnormal executor termination",
+      reason,
+      executor->id),
+      UPID());
+}
+
+
 Slave::Metrics::Metrics(const Slave& slave)
   : uptime_secs(
         "slave/uptime_secs",
@@ -4348,7 +4375,7 @@ bool Executor::incompleteTasks()
 }
 
 
-bool Executor::isCommandExecutor()
+bool Executor::isCommandExecutor() const
 {
   return commandExecutor;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/ca135940/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 6c183f8..5b082fc 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -396,6 +396,12 @@ private:
   double _executors_running();
   double _executors_terminating();
 
+  void sendExecutorTerminatedStatusUpdate(
+      const TaskID& taskId,
+      const Future<containerizer::Termination>& termination,
+      const FrameworkID& frameworkId,
+      const Executor* executor);
+
   const Flags flags;
 
   SlaveInfo info;
@@ -519,7 +525,7 @@ struct Executor
   bool incompleteTasks();
 
   // Returns true if this is a command executor.
-  bool isCommandExecutor();
+  bool isCommandExecutor() const;
 
   enum State {
     REGISTERING,  // Executor is launched but not (re-)registered yet.

http://git-wip-us.apache.org/repos/asf/mesos/blob/ca135940/src/tests/fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp
index a18a41a..372c4fd 100644
--- a/src/tests/fault_tolerance_tests.cpp
+++ b/src/tests/fault_tolerance_tests.cpp
@@ -217,7 +217,11 @@ TEST_F(FaultToleranceTest, PartitionedSlaveStatusUpdates)
   TaskID taskId;
   taskId.set_value("task_id");
   const StatusUpdate& update = createStatusUpdate(
-      frameworkId.get(), slaveId, taskId, TASK_RUNNING);
+      frameworkId.get(),
+      slaveId,
+      taskId,
+      TASK_RUNNING,
+      TaskStatus::SOURCE_SLAVE);
 
   StatusUpdateMessage message;
   message.mutable_update()->CopyFrom(update);
@@ -1279,7 +1283,12 @@ TEST_F(FaultToleranceTest, ForwardStatusUpdateUnknownExecutor)
   taskId.set_value("task2");
 
   StatusUpdate statusUpdate2 = createStatusUpdate(
-      frameworkId, offer.slave_id(), taskId, TASK_RUNNING, "Dummy update");
+      frameworkId,
+      offer.slave_id(),
+      taskId,
+      TASK_RUNNING,
+      TaskStatus::SOURCE_SLAVE,
+      "Dummy update");
 
   process::dispatch(slave.get(), &Slave::statusUpdate, statusUpdate2, UPID());
 
@@ -1835,7 +1844,8 @@ TEST_F(FaultToleranceTest, SplitBrainMasters)
       frameworkId.get(),
       runningStatus.get().slave_id(),
       runningStatus.get().task_id(),
-      TASK_LOST));
+      TASK_LOST,
+      TaskStatus::SOURCE_SLAVE));
 
   // Spoof a message from a random master; this should be dropped by
   // the scheduler driver. Since this is delivered locally, it is

http://git-wip-us.apache.org/repos/asf/mesos/blob/ca135940/src/tests/master_authorization_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_authorization_tests.cpp b/src/tests/master_authorization_tests.cpp
index 652e80d..5ae855e 100644
--- a/src/tests/master_authorization_tests.cpp
+++ b/src/tests/master_authorization_tests.cpp
@@ -206,6 +206,7 @@ TEST_F(MasterAuthorizationTest, UnauthorizedTask)
 
   AWAIT_READY(status);
   EXPECT_EQ(TASK_LOST, status.get().state());
+  EXPECT_EQ(TaskStatus::REASON_TASK_UNAUTHORIZED, status.get().reason());
 
   driver.stop();
   driver.join();

http://git-wip-us.apache.org/repos/asf/mesos/blob/ca135940/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 2e52574..a6d1a4a 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -1356,6 +1356,7 @@ TEST_F(MasterTest, LaunchAcrossSlavesTest)
 
   AWAIT_READY(status);
   EXPECT_EQ(TASK_LOST, status.get().state());
+  EXPECT_EQ(TaskStatus::REASON_INVALID_OFFERS, status.get().reason());
 
   // The resources of the invalid offers should be recovered.
   AWAIT_READY(resourcesRecovered);
@@ -1435,6 +1436,7 @@ TEST_F(MasterTest, LaunchDuplicateOfferTest)
 
   AWAIT_READY(status);
   EXPECT_EQ(TASK_LOST, status.get().state());
+  EXPECT_EQ(TaskStatus::REASON_INVALID_OFFERS, status.get().reason());
 
   // The resources of the invalid offers should be recovered.
   AWAIT_READY(resourcesRecovered);

http://git-wip-us.apache.org/repos/asf/mesos/blob/ca135940/src/tests/resource_offers_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_offers_tests.cpp b/src/tests/resource_offers_tests.cpp
index fe66432..ee332c3 100644
--- a/src/tests/resource_offers_tests.cpp
+++ b/src/tests/resource_offers_tests.cpp
@@ -233,6 +233,7 @@ TEST_F(ResourceOffersTest, TaskUsesNoResources)
   AWAIT_READY(status);
   EXPECT_EQ(task.task_id(), status.get().task_id());
   EXPECT_EQ(TASK_LOST, status.get().state());
+  EXPECT_EQ(TaskStatus::REASON_TASK_INVALID, status.get().reason());
   EXPECT_TRUE(status.get().has_message());
   EXPECT_EQ("Task uses no resources", status.get().message());
 
@@ -291,6 +292,7 @@ TEST_F(ResourceOffersTest, TaskUsesInvalidResources)
   AWAIT_READY(status);
   EXPECT_EQ(task.task_id(), status.get().task_id());
   EXPECT_EQ(TASK_LOST, status.get().state());
+  EXPECT_EQ(TaskStatus::REASON_TASK_INVALID, status.get().reason());
   EXPECT_TRUE(status.get().has_message());
   EXPECT_EQ("Task uses invalid resources: cpus(*):0", status.get().message());
 
@@ -350,6 +352,7 @@ TEST_F(ResourceOffersTest, TaskUsesMoreResourcesThanOffered)
 
   EXPECT_EQ(task.task_id(), status.get().task_id());
   EXPECT_EQ(TASK_LOST, status.get().state());
+  EXPECT_EQ(TaskStatus::REASON_TASK_INVALID, status.get().reason());
   EXPECT_TRUE(status.get().has_message());
   EXPECT_TRUE(strings::contains(
       status.get().message(), "greater than offered"));
@@ -514,6 +517,7 @@ TEST_F(ResourceOffersTest, ResourcesGetReofferedAfterTaskInfoError)
   AWAIT_READY(status);
   EXPECT_EQ(task.task_id(), status.get().task_id());
   EXPECT_EQ(TASK_LOST, status.get().state());
+  EXPECT_EQ(TaskStatus::REASON_TASK_INVALID, status.get().reason());
   EXPECT_TRUE(status.get().has_message());
   EXPECT_EQ("Task uses invalid resources: cpus(*):0", status.get().message());
 
@@ -680,6 +684,7 @@ TEST_F(MultipleExecutorsTest, ExecutorInfoDiffersOnSameSlave)
   AWAIT_READY(status);
   EXPECT_EQ(task2.task_id(), status.get().task_id());
   EXPECT_EQ(TASK_LOST, status.get().state());
+  EXPECT_EQ(TaskStatus::REASON_TASK_INVALID, status.get().reason());
   EXPECT_TRUE(status.get().has_message());
   EXPECT_TRUE(strings::contains(
       status.get().message(), "Task has invalid ExecutorInfo"));

Reply via email to