Repository: mesos
Updated Branches:
  refs/heads/master 82c2a7866 -> 3c96155a4


Added TaskStatus::Reason to containerizer Termination message.

Review: https://reviews.apache.org/r/38746


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3c96155a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3c96155a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3c96155a

Branch: refs/heads/master
Commit: 3c96155a4618000a0896bd42f7ca1e2a363b48fd
Parents: 82c2a78
Author: Jie Yu <[email protected]>
Authored: Thu Sep 24 18:42:34 2015 -0700
Committer: Jie Yu <[email protected]>
Committed: Mon Oct 12 17:12:01 2015 -0700

----------------------------------------------------------------------
 include/mesos/containerizer/containerizer.proto |  13 +-
 include/mesos/mesos.proto                       |  14 +-
 include/mesos/slave/isolator.proto              |   6 +
 src/common/protobuf_utils.cpp                   |   4 +-
 src/common/protobuf_utils.hpp                   |   3 +-
 src/slave/containerizer/docker.cpp              |  12 +-
 .../containerizer/external_containerizer.cpp    |   5 -
 .../containerizer/isolators/cgroups/mem.cpp     |   7 +-
 .../containerizer/isolators/posix/disk.cpp      |  10 +-
 src/slave/containerizer/mesos/containerizer.cpp |  80 ++++----
 src/slave/containerizer/mesos/containerizer.hpp |  13 +-
 src/slave/slave.cpp                             | 187 +++++++++++++++----
 src/slave/slave.hpp                             |  16 +-
 src/tests/containerizer.cpp                     |   1 -
 .../docker_containerizer_tests.cpp              |  11 +-
 .../containerizer/mesos_containerizer_tests.cpp |   1 -
 src/tests/oversubscription_tests.cpp            |   4 +-
 src/tests/slave_recovery_tests.cpp              |  23 ++-
 src/tests/slave_tests.cpp                       |  25 +--
 19 files changed, 288 insertions(+), 147 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3c96155a/include/mesos/containerizer/containerizer.proto
----------------------------------------------------------------------
diff --git a/include/mesos/containerizer/containerizer.proto b/include/mesos/containerizer/containerizer.proto
index f16ccc8..7cf6d2b 100644
--- a/include/mesos/containerizer/containerizer.proto
+++ b/include/mesos/containerizer/containerizer.proto
@@ -82,15 +82,14 @@ message Usage {
  * containerizer to the slave.
  */
 message Termination {
-  // A container may be killed if it exceeds its resources; this will
-  // be indicated by killed=true and described by the message string.
-  // TODO(jaybuff): As part of MESOS-2035 we should remove killed and
-  // replace it with a TaskStatus::Reason.
-  required bool killed = 1;
-  required string message = 2;
-
   // Exit status of the process.
   optional int32 status = 3;
+
+  // The 'state', 'reasons' and 'message' of a status update for
+  // non-terminal tasks when the executor is terminated.
+  optional TaskState state = 4;
+  repeated TaskStatus.Reason reasons = 5;
+  optional string message = 2;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/3c96155a/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index 579e32c..2f774d1 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -1097,8 +1097,19 @@ message TaskStatus {
   // TODO(bmahler): Differentiate between slave removal reasons
   // (e.g. unhealthy vs. unregistered for maintenance).
   enum Reason {
+    // TODO(jieyu): The default value when a caller doesn't check for
+    // presence is 0 and so ideally the 0 reason is not a valid one.
+    // Since this is not used anywhere, consider removing this reason.
     REASON_COMMAND_EXECUTOR_FAILED = 0;
-    REASON_EXECUTOR_PREEMPTED = 17;
+
+    REASON_CONTAINER_LAUNCH_FAILED = 21;
+    REASON_CONTAINER_LIMITATION = 19;
+    REASON_CONTAINER_LIMITATION_DISK = 20;
+    REASON_CONTAINER_LIMITATION_MEMORY = 8;
+    REASON_CONTAINER_PREEMPTED = 17;
+    REASON_CONTAINER_UPDATE_FAILED = 22;
+    REASON_EXECUTOR_REGISTRATION_TIMEOUT = 23;
+    REASON_EXECUTOR_REREGISTRATION_TIMEOUT = 24;
     REASON_EXECUTOR_TERMINATED = 1;
     REASON_EXECUTOR_UNREGISTERED = 2;
     REASON_FRAMEWORK_REMOVED = 3;
@@ -1106,7 +1117,6 @@ message TaskStatus {
     REASON_INVALID_FRAMEWORKID = 5;
     REASON_INVALID_OFFERS = 6;
     REASON_MASTER_DISCONNECTED = 7;
-    REASON_MEMORY_LIMIT = 8;
     REASON_RECONCILIATION = 9;
     REASON_RESOURCES_UNKNOWN = 18;
     REASON_SLAVE_DISCONNECTED = 10;

http://git-wip-us.apache.org/repos/asf/mesos/blob/3c96155a/include/mesos/slave/isolator.proto
----------------------------------------------------------------------
diff --git a/include/mesos/slave/isolator.proto b/include/mesos/slave/isolator.proto
index 9d38a25..0e71a93 100644
--- a/include/mesos/slave/isolator.proto
+++ b/include/mesos/slave/isolator.proto
@@ -36,6 +36,12 @@ message ContainerLimitation
 
   // Description of the limitation.
   optional string message = 2;
+
+  // The container will be terminated when a resource limitation is
+  // reached. This field specifies the 'reason' that will be sent in
+  // the status update for any remaining non-terminal tasks when the
+  // container is terminated.
+  optional TaskStatus.Reason reason = 3;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/3c96155a/src/common/protobuf_utils.cpp
----------------------------------------------------------------------
diff --git a/src/common/protobuf_utils.cpp b/src/common/protobuf_utils.cpp
index c1e8e01..1e795dc 100644
--- a/src/common/protobuf_utils.cpp
+++ b/src/common/protobuf_utils.cpp
@@ -233,13 +233,15 @@ namespace slave {
 
 ContainerLimitation createContainerLimitation(
     const Resources& resources,
-    const std::string& message)
+    const std::string& message,
+    const TaskStatus::Reason& reason)
 {
   ContainerLimitation limitation;
   foreach (Resource resource, resources) {
     limitation.add_resources()->CopyFrom(resource);
   }
   limitation.set_message(message);
+  limitation.set_reason(reason);
   return limitation;
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/3c96155a/src/common/protobuf_utils.hpp
----------------------------------------------------------------------
diff --git a/src/common/protobuf_utils.hpp b/src/common/protobuf_utils.hpp
index 8793851..44a2b1d 100644
--- a/src/common/protobuf_utils.hpp
+++ b/src/common/protobuf_utils.hpp
@@ -92,7 +92,8 @@ namespace slave {
 
 mesos::slave::ContainerLimitation createContainerLimitation(
     const Resources& resources,
-    const std::string& message);
+    const std::string& message,
+    const TaskStatus::Reason& reason);
 
 
 mesos::slave::ContainerState createContainerState(

http://git-wip-us.apache.org/repos/asf/mesos/blob/3c96155a/src/slave/containerizer/docker.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/docker.cpp b/src/slave/containerizer/docker.cpp
index 174448c..7022958 100644
--- a/src/slave/containerizer/docker.cpp
+++ b/src/slave/containerizer/docker.cpp
@@ -1365,11 +1365,10 @@ void DockerContainerizerProcess::destroy(
     // This means we failed to launch the container and we're trying to
     // cleanup.
     CHECK_PENDING(container->status.future());
-    containerizer::Termination termination;
-    termination.set_killed(killed);
-    termination.set_message(
-        "Failed to launch container: " + container->launch.failure());
-    container->termination.set(termination);
+
+    // NOTE: The launch error message will be retrieved by the slave
+    // and properly set in the corresponding status update.
+    container->termination.set(containerizer::Termination());
 
     containers_.erase(containerId);
     delete container;
@@ -1409,7 +1408,6 @@ void DockerContainerizerProcess::destroy(
     fetcher->kill(containerId);
 
     containerizer::Termination termination;
-    termination.set_killed(killed);
     termination.set_message("Container destroyed while fetching");
     container->termination.set(termination);
 
@@ -1429,7 +1427,6 @@ void DockerContainerizerProcess::destroy(
     container->pull.discard();
 
     containerizer::Termination termination;
-    termination.set_killed(killed);
     termination.set_message("Container destroyed while pulling image");
     container->termination.set(termination);
 
@@ -1548,7 +1545,6 @@ void DockerContainerizerProcess::___destroy(
   Container* container = containers_[containerId];
 
   containerizer::Termination termination;
-  termination.set_killed(killed);
 
   if (status.isReady() && status.get().isSome()) {
     termination.set_status(status.get().get());

http://git-wip-us.apache.org/repos/asf/mesos/blob/3c96155a/src/slave/containerizer/external_containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/external_containerizer.cpp b/src/slave/containerizer/external_containerizer.cpp
index 2116492..30f7e8e 100644
--- a/src/slave/containerizer/external_containerizer.cpp
+++ b/src/slave/containerizer/external_containerizer.cpp
@@ -676,11 +676,6 @@ void ExternalContainerizerProcess::__wait(
       if (status.isSome()) {
         VLOG(2) << "Wait got destroyed on '" << containerId << "'";
         containerizer::Termination termination;
-        // 'killed' must only be true when a resource limitation
-        // had to be enforced through terminating a task.
-        // TODO(tillt): Consider renaming 'killed' towards 'limited'.
-        termination.set_killed(false);
-        termination.set_message("");
         termination.set_status(status.get());
         actives[containerId]->termination.set(termination);
         cleanup(containerId);

http://git-wip-us.apache.org/repos/asf/mesos/blob/3c96155a/src/slave/containerizer/isolators/cgroups/mem.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/isolators/cgroups/mem.cpp b/src/slave/containerizer/isolators/cgroups/mem.cpp
index 89c86be..6f49e5a 100644
--- a/src/slave/containerizer/isolators/cgroups/mem.cpp
+++ b/src/slave/containerizer/isolators/cgroups/mem.cpp
@@ -694,8 +694,11 @@ void CgroupsMemIsolatorProcess::oom(const ContainerID& containerId)
       stringify(usage.isSome() ? usage.get().megabytes() : 0),
       "*").get();
 
-  info->limitation.set(protobuf::slave::createContainerLimitation(
-        mem, message.str()));
+  info->limitation.set(
+      protobuf::slave::createContainerLimitation(
+          mem,
+          message.str(),
+          TaskStatus::REASON_CONTAINER_LIMITATION_MEMORY));
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/3c96155a/src/slave/containerizer/isolators/posix/disk.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/isolators/posix/disk.cpp b/src/slave/containerizer/isolators/posix/disk.cpp
index c324c79..73e62a2 100644
--- a/src/slave/containerizer/isolators/posix/disk.cpp
+++ b/src/slave/containerizer/isolators/posix/disk.cpp
@@ -247,10 +247,12 @@ void PosixDiskIsolatorProcess::_collect(
       CHECK_SOME(quota);
 
       if (future.get() > quota.get()) {
-        info->limitation.set(protobuf::slave::createContainerLimitation(
-            Resources(info->paths[path].quota),
-            "Disk usage (" + stringify(future.get()) +
-            ") exceeds quota (" + stringify(quota.get()) + ")"));
+        info->limitation.set(
+            protobuf::slave::createContainerLimitation(
+                Resources(info->paths[path].quota),
+                "Disk usage (" + stringify(future.get()) +
+                ") exceeds quota (" + stringify(quota.get()) + ")",
+                TaskStatus::REASON_CONTAINER_LIMITATION_DISK));
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/mesos/blob/3c96155a/src/slave/containerizer/mesos/containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/containerizer.cpp b/src/slave/containerizer/mesos/containerizer.cpp
index b904b2d..d1fc5a4 100644
--- a/src/slave/containerizer/mesos/containerizer.cpp
+++ b/src/slave/containerizer/mesos/containerizer.cpp
@@ -369,8 +369,7 @@ void MesosContainerizer::destroy(const ContainerID& containerId)
   dispatch(
       process.get(),
       &MesosContainerizerProcess::destroy,
-      containerId,
-      true);
+      containerId);
 }
 
 
@@ -1079,8 +1078,7 @@ Future<ResourceStatistics> MesosContainerizerProcess::usage(
 
 
 void MesosContainerizerProcess::destroy(
-    const ContainerID& containerId,
-    bool killed)
+    const ContainerID& containerId)
 {
   if (!containers_.contains(containerId)) {
     LOG(WARNING) << "Ignoring destroy of unknown container: " << containerId;
@@ -1112,8 +1110,7 @@ void MesosContainerizerProcess::destroy(
           &Self::___destroy,
           containerId,
           status,
-          "Container destroyed while preparing isolators",
-          killed));
+          "Container destroyed while preparing isolators"));
 
     return;
   }
@@ -1131,30 +1128,28 @@ void MesosContainerizerProcess::destroy(
     // Wait for the isolators to finish isolating before we start
     // to destroy the container.
     container->isolation
-      .onAny(defer(self(), &Self::_destroy, containerId, killed));
+      .onAny(defer(self(), &Self::_destroy, containerId));
 
     return;
   }
 
   container->state = DESTROYING;
-  _destroy(containerId, killed);
+  _destroy(containerId);
 }
 
 
 void MesosContainerizerProcess::_destroy(
-    const ContainerID& containerId,
-    bool killed)
+    const ContainerID& containerId)
 {
   // Kill all processes then continue destruction.
   launcher->destroy(containerId)
-    .onAny(defer(self(), &Self::__destroy, containerId, lambda::_1, killed));
+    .onAny(defer(self(), &Self::__destroy, containerId, lambda::_1));
 }
 
 
 void MesosContainerizerProcess::__destroy(
     const ContainerID& containerId,
-    const Future<Nothing>& future,
-    bool killed)
+    const Future<Nothing>& future)
 {
   CHECK(containers_.contains(containerId));
 
@@ -1185,16 +1180,14 @@ void MesosContainerizerProcess::__destroy(
         &Self::___destroy,
         containerId,
         lambda::_1,
-        None(),
-        killed));
+        None()));
 }
 
 
 void MesosContainerizerProcess::___destroy(
     const ContainerID& containerId,
     const Future<Option<int>>& status,
-    const Option<string>& message,
-    bool killed)
+    const Option<string>& message)
 {
   cleanupIsolators(containerId)
     .onAny(defer(self(),
@@ -1202,8 +1195,7 @@ void MesosContainerizerProcess::___destroy(
                  containerId,
                  status,
                  lambda::_1,
-                 message,
-                 killed));
+                 message));
 }
 
 
@@ -1211,8 +1203,7 @@ void MesosContainerizerProcess::____destroy(
     const ContainerID& containerId,
     const Future<Option<int>>& status,
     const Future<list<Future<Nothing>>>& cleanups,
-    Option<string> message,
-    bool killed)
+    Option<string> message)
 {
   // This should not occur because we only use the Future<list> to
   // facilitate chaining.
@@ -1239,36 +1230,39 @@ void MesosContainerizerProcess::____destroy(
     }
   }
 
-  // If any isolator limited the container then we mark it to killed.
-  // Note: We may not see a limitation in time for it to be
+  containerizer::Termination termination;
+
+  if (status.isReady() && status->isSome()) {
+    termination.set_status(status->get());
+  }
+
+  // NOTE: We may not see a limitation in time for it to be
   // registered. This could occur if the limitation (e.g., an OOM)
   // killed the executor and we triggered destroy() off the executor
   // exit.
-  if (!killed && container->limitations.size() > 0) {
-    string message_;
-    foreach (const ContainerLimitation& limitation, container->limitations) {
-      message_ += limitation.message();
-    }
-    message = strings::trim(message_);
-  } else if (!killed && message.isNone()) {
-    message = "Executor terminated";
-  }
+  if (!container->limitations.empty()) {
+    termination.set_state(TaskState::TASK_FAILED);
 
-  containerizer::Termination termination;
+    // We concatenate the messages if there are multiple limitations.
+    vector<string> messages;
+
+    foreach (const ContainerLimitation& limitation, container->limitations) {
+      messages.push_back(limitation.message());
 
-  // Killed means that the container was either asked to be destroyed
-  // by the slave or was destroyed because an isolator limited the
-  // container.
-  termination.set_killed(killed);
+      if (limitation.has_reason()) {
+        termination.add_reasons(limitation.reason());
+      }
+    }
 
-  if (message.isSome()) {
-    termination.set_message(message.get());
+    message = strings::join("; ", messages);
   }
 
-  if (status.isReady() && status.get().isSome()) {
-    termination.set_status(status.get().get());
+  if (message.isNone()) {
+    message = "Executor terminated";
   }
 
+  termination.set_message(message.get());
+
   container->promise.set(termination);
 
   containers_.erase(containerId);
@@ -1284,7 +1278,7 @@ void MesosContainerizerProcess::reaped(const ContainerID& containerId)
   LOG(INFO) << "Executor for container '" << containerId << "' has exited";
 
   // The executor has exited so destroy the container.
-  destroy(containerId, false);
+  destroy(containerId);
 }
 
 
@@ -1312,7 +1306,7 @@ void MesosContainerizerProcess::limited(
   }
 
   // The container has been affected by the limitation so destroy it.
-  destroy(containerId, true);
+  destroy(containerId);
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/3c96155a/src/slave/containerizer/mesos/containerizer.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/containerizer.hpp b/src/slave/containerizer/mesos/containerizer.hpp
index 4c14192..4aad8a3 100644
--- a/src/slave/containerizer/mesos/containerizer.hpp
+++ b/src/slave/containerizer/mesos/containerizer.hpp
@@ -158,7 +158,7 @@ public:
       const ContainerID& containerId,
       int pipeWrite);
 
-  virtual void destroy(const ContainerID& containerId, bool killed);
+  virtual void destroy(const ContainerID& containerId);
 
   virtual process::Future<hashset<ContainerID>> containers();
 
@@ -204,20 +204,18 @@ private:
       pid_t _pid);
 
   // Continues 'destroy()' once isolators has completed.
-  void _destroy(const ContainerID& containerId, bool killed);
+  void _destroy(const ContainerID& containerId);
 
   // Continues '_destroy()' once all processes have been killed by the launcher.
   void __destroy(
       const ContainerID& containerId,
-      const process::Future<Nothing>& future,
-      bool killed);
+      const process::Future<Nothing>& future);
 
   // Continues '__destroy()' once we get the exit status of the executor.
   void ___destroy(
       const ContainerID& containerId,
       const process::Future<Option<int>>& status,
-      const Option<std::string>& message,
-      bool killed);
+      const Option<std::string>& message);
 
   // Continues '___destroy()' once all isolators have completed
   // cleanup.
@@ -225,8 +223,7 @@ private:
       const ContainerID& containerId,
       const process::Future<Option<int>>& status,
       const process::Future<std::list<process::Future<Nothing>>>& cleanups,
-      Option<std::string> message,
-      bool killed);
+      Option<std::string> message);
 
   // Call back for when an isolator limits a container and impacts the
   // processes. This will trigger container destruction.

http://git-wip-us.apache.org/repos/asf/mesos/blob/3c96155a/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 01c5e42..6b25b49 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1685,6 +1685,21 @@ void Slave::runTasks(
                << (future.isFailed() ? future.failure() : "discarded");
 
     containerizer->destroy(containerId);
+
+    Executor* executor = getExecutor(frameworkId, executorId);
+    if (executor != NULL) {
+      containerizer::Termination termination;
+      termination.set_state(TASK_LOST);
+      termination.add_reasons(TaskStatus::REASON_CONTAINER_UPDATE_FAILED);
+      termination.set_message(
+          "Failed to update resources for container: " +
+          (future.isFailed() ? future.failure() : "discarded"));
+
+      executor->pendingTermination = termination;
+
+      // TODO(jieyu): Set executor->state to be TERMINATING.
+    }
+
     return;
   }
 
@@ -2662,6 +2677,20 @@ void Slave::_reregisterExecutor(
                << (future.isFailed() ? future.failure() : "discarded");
 
     containerizer->destroy(containerId);
+
+    Executor* executor = getExecutor(frameworkId, executorId);
+    if (executor != NULL) {
+      containerizer::Termination termination;
+      termination.set_state(TASK_LOST);
+      termination.add_reasons(TaskStatus::REASON_CONTAINER_UPDATE_FAILED);
+      termination.set_message(
+          "Failed to update resources for container: " +
+          (future.isFailed() ? future.failure() : "discarded"));
+
+      executor->pendingTermination = termination;
+
+      // TODO(jieyu): Set executor->state to be TERMINATING.
+    }
   }
 }
 
@@ -2683,7 +2712,7 @@ void Slave::reregisterExecutorTimeout()
         case Executor::TERMINATING:
         case Executor::TERMINATED:
           break;
-        case Executor::REGISTERING:
+        case Executor::REGISTERING: {
           // If we are here, the executor must have been hung and not
           // exited! This is because if the executor properly exited,
           // it should have already been identified by the isolator
@@ -2691,10 +2720,21 @@ void Slave::reregisterExecutorTimeout()
           LOG(INFO) << "Killing un-reregistered executor '" << executor->id
                     << "' of framework " << framework->id();
 
+          containerizer->destroy(executor->containerId);
+
           executor->state = Executor::TERMINATING;
 
-          containerizer->destroy(executor->containerId);
+          containerizer::Termination termination;
+          termination.set_state(TASK_LOST);
+          termination.add_reasons(
+              TaskStatus::REASON_EXECUTOR_REREGISTRATION_TIMEOUT);
+          termination.set_message(
+              "Executor did not re-register within " +
+              stringify(EXECUTOR_REREGISTER_TIMEOUT));
+
+          executor->pendingTermination = termination;
           break;
+        }
         default:
           LOG(FATAL) << "Executor '" << executor->id
                      << "' of framework " << framework->id()
@@ -2914,15 +2954,28 @@ void Slave::_statusUpdate(
     const ContainerID& containerId,
     bool checkpoint)
 {
-  if (future.isSome() && !future.get().isReady()) {
+  if (future.isSome() && !future->isReady()) {
     LOG(ERROR) << "Failed to update resources for container " << containerId
                << " of executor " << executorId
                << " running task " << update.status().task_id()
                << " on status update for terminal task, destroying container: "
-               << (future.get().isFailed() ? future.get().failure()
-                                           : "discarded");
+               << (future->isFailed() ? future->failure() : "discarded");
 
     containerizer->destroy(containerId);
+
+    Executor* executor = getExecutor(update.framework_id(), executorId);
+    if (executor != NULL) {
+      containerizer::Termination termination;
+      termination.set_state(TASK_LOST);
+      termination.add_reasons(TaskStatus::REASON_CONTAINER_UPDATE_FAILED);
+      termination.set_message(
+          "Failed to update resources for container: " +
+          (future->isFailed() ? future->failure() : "discarded"));
+
+      executor->pendingTermination = termination;
+
+      // TODO(jieyu): Set executor->state to be TERMINATING.
+    }
   }
 
   if (checkpoint) {
@@ -3208,6 +3261,19 @@ Framework* Slave::getFramework(const FrameworkID& frameworkId)
 }
 
 
+Executor* Slave::getExecutor(
+    const FrameworkID& frameworkId,
+    const ExecutorID& executorId)
+{
+  Framework* framework = getFramework(frameworkId);
+  if (framework != NULL) {
+    return framework->getExecutor(executorId);
+  }
+
+  return NULL;
+}
+
+
 ExecutorInfo Slave::getExecutorInfo(
     const FrameworkID& frameworkId,
     const TaskInfo& task)
@@ -3355,6 +3421,21 @@ void Slave::executorLaunched(
     ++metrics.container_launch_errors;
 
     containerizer->destroy(containerId);
+
+    Executor* executor = getExecutor(frameworkId, executorId);
+    if (executor != NULL) {
+      containerizer::Termination termination;
+      termination.set_state(TASK_FAILED);
+      termination.add_reasons(TaskStatus::REASON_CONTAINER_LAUNCH_FAILED);
+      termination.set_message(
+          "Failed to launch container: " +
+          (future.isFailed() ? future.failure() : "discarded"));
+
+      executor->pendingTermination = termination;
+
+      // TODO(jieyu): Set executor->state to be TERMINATING.
+    }
+
     return;
   } else if (!future.get()) {
     LOG(ERROR) << "Container '" << containerId
@@ -3364,7 +3445,7 @@ void Slave::executorLaunched(
                << flags.containerizers << ") could create a container for the "
                << "provided TaskInfo/ExecutorInfo message.";
 
-     ++metrics.container_launch_errors;
+    ++metrics.container_launch_errors;
     return;
   }
 
@@ -3885,17 +3966,27 @@ void Slave::registerExecutorTimeout(
     case Executor::TERMINATED:
       // Ignore the registration timeout.
       break;
-    case Executor::REGISTERING:
+    case Executor::REGISTERING: {
       LOG(INFO) << "Terminating executor " << executor->id
                 << " of framework " << framework->id()
                 << " because it did not register within "
                 << flags.executor_registration_timeout;
 
+      // Immediately kill the executor.
+      containerizer->destroy(containerId);
+
       executor->state = Executor::TERMINATING;
 
-      // Immediately kill the executor.
-      containerizer->destroy(executor->containerId);
+      containerizer::Termination termination;
+      termination.set_state(TASK_FAILED);
+      termination.add_reasons(TaskStatus::REASON_EXECUTOR_REGISTRATION_TIMEOUT);
+      termination.set_message(
+          "Executor did not register within " +
+          stringify(flags.executor_registration_timeout));
+
+      executor->pendingTermination = termination;
       break;
+    }
     default:
       LOG(FATAL) << "Executor '" << executor->id
                  << "' of framework " << framework->id()
@@ -4430,6 +4521,8 @@ void Slave::_qosCorrections(const Future<list<QoSCorrection>>& future)
                     << "' of framework " << frameworkId
                     << " as QoS correction";
 
+          containerizer->destroy(containerId);
+
           // TODO(nnielsen): We should ensure that we are addressing
           // the _container_ which the QoS controller intended to
           // kill. Without this check, we may run into a scenario
@@ -4438,8 +4531,13 @@ void Slave::_qosCorrections(const Future<list<QoSCorrection>>& future)
           // container than the one the QoS controller targeted
           // (MESOS-2875).
           executor->state = Executor::TERMINATING;
-          executor->reason = TaskStatus::REASON_EXECUTOR_PREEMPTED;
-          containerizer->destroy(containerId);
+
+          containerizer::Termination termination;
+          termination.set_state(TASK_LOST);
+          termination.add_reasons(TaskStatus::REASON_CONTAINER_PREEMPTED);
+          termination.set_message("Container preempted by QoS correction");
+
+          executor->pendingTermination = termination;
 
           ++metrics.executors_preempted;
           break;
@@ -4633,38 +4731,61 @@ void Slave::sendExecutorTerminatedStatusUpdate(
     const FrameworkID& frameworkId,
     const Executor* executor)
 {
-  mesos::TaskState taskState = TASK_LOST;
-  TaskStatus::Reason reason = TaskStatus::REASON_EXECUTOR_TERMINATED;
-
   CHECK_NOTNULL(executor);
 
-  if (executor->reason.isSome()) {
-    // TODO(nnielsen): We want to dispatch the task status and reason
-    // from the termination reason (MESOS-2035) and the executor
-    // reason based on a specific policy i.e. if the termination
-    // reason is set, this overrides executor->reason. At the moment,
-    // we infer the containerizer reason for killing from 'killed'
-    // field in 'termination' and are explicitly overriding the task
-    // status and reason.
-    reason = executor->reason.get();
-  } else if (termination.isReady() && termination.get().killed()) {
-    taskState = TASK_FAILED;
-    // TODO(dhamon): MESOS-2035: Add 'reason' to containerizer::Termination.
-    reason = TaskStatus::REASON_MEMORY_LIMIT;
-  } else if (executor->isCommandExecutor()) {
-    taskState = TASK_FAILED;
-    reason = TaskStatus::REASON_COMMAND_EXECUTOR_FAILED;
+  mesos::TaskState state;
+  TaskStatus::Reason reason;
+  string message;
+
+  // Determine the task state for the status update.
+  if (termination.isReady() && termination->has_state()) {
+    state = termination->state();
+  } else if (executor->pendingTermination.isSome() &&
+             executor->pendingTermination->has_state()) {
+    state = executor->pendingTermination->state();
+  } else {
+    state = TASK_FAILED;
+  }
+
+  // Determine the task reason for the status update.
+  // TODO(jieyu): Handle multiple reasons (MESOS-2657).
+  if (termination.isReady() && termination->reasons().size() > 0) {
+    reason = termination->reasons(0);
+  } else if (executor->pendingTermination.isSome() &&
+             executor->pendingTermination->reasons().size() > 0) {
+    reason = executor->pendingTermination->reasons(0);
+  } else {
+    reason = TaskStatus::REASON_EXECUTOR_TERMINATED;
+  }
+
+  // Determine the message for the status update.
+  vector<string> messages;
+
+  if (executor->pendingTermination.isSome() &&
+      executor->pendingTermination->has_message()) {
+    messages.push_back(executor->pendingTermination->message());
+  }
+
+  if (!termination.isReady()) {
+    messages.push_back("Abnormal executor termination");
+  } else if (termination->has_message()) {
+    messages.push_back(termination->message());
+  }
+
+  if (messages.empty()) {
+    message = "Executor terminated";
+  } else {
+    message = strings::join("; ", messages);
   }
 
   statusUpdate(protobuf::createStatusUpdate(
       frameworkId,
       info.id(),
       taskId,
-      taskState,
+      state,
       TaskStatus::SOURCE_SLAVE,
       UUID::random(),
-      termination.isReady() ? termination.get().message()
-                            : "Abnormal executor termination",
+      message,
       reason,
       executor->id),
       UPID());

http://git-wip-us.apache.org/repos/asf/mesos/blob/3c96155a/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 18be4f8..d6e417b 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -300,9 +300,13 @@ public:
 
   void authenticate();
 
-  // Helper routine to lookup a framework.
+  // Helper routines to lookup a framework/executor.
   Framework* getFramework(const FrameworkID& frameworkId);
 
+  Executor* getExecutor(
+      const FrameworkID& frameworkId,
+      const ExecutorID& executorId);
+
   // Returns an ExecutorInfo for a TaskInfo (possibly
   // constructing one if the task has a CommandInfo).
   ExecutorInfo getExecutorInfo(
@@ -623,10 +627,12 @@ struct Executor
   // attempts to do some memset's which are unsafe).
   boost::circular_buffer<std::shared_ptr<Task>> completedTasks;
 
-  // The 'reason' is for the slave to encode the reason behind a
-  // terminal status update for those pending/unterminated tasks when
-  // the executor is terminated.
-  Option<TaskStatus::Reason> reason;
+  // When the slave initiates a destroy of the container, we expect a
+  // termination to occur. The 'pendingTermination' indicates why the
+  // slave initiated the destruction and will influence the
+  // information sent in the status updates for any remaining
+  // non-terminal tasks.
+  Option<containerizer::Termination> pendingTermination;
 
 private:
   Executor(const Executor&);              // No copying.

http://git-wip-us.apache.org/repos/asf/mesos/blob/3c96155a/src/tests/containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer.cpp b/src/tests/containerizer.cpp
index 1f74315..451c7be 100644
--- a/src/tests/containerizer.cpp
+++ b/src/tests/containerizer.cpp
@@ -238,7 +238,6 @@ void TestContainerizer::destroy(const ContainerID& containerId)
 
   if (promises.contains(containerId)) {
     containerizer::Termination termination;
-    termination.set_killed(false);
     termination.set_message("Killed executor");
     termination.set_status(0);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/3c96155a/src/tests/containerizer/docker_containerizer_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer/docker_containerizer_tests.cpp b/src/tests/containerizer/docker_containerizer_tests.cpp
index 8771ef6..4bb65af 100644
--- a/src/tests/containerizer/docker_containerizer_tests.cpp
+++ b/src/tests/containerizer/docker_containerizer_tests.cpp
@@ -2402,9 +2402,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_ExecutorCleanupWhenLaunchFailed)
   task.mutable_command()->CopyFrom(command);
   task.mutable_container()->CopyFrom(containerInfo);
 
-  Future<TaskStatus> statusFailed;
+  Future<TaskStatus> statusLost;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .WillOnce(FutureArg<1>(&statusFailed));
+    .WillOnce(FutureArg<1>(&statusLost));
 
   Future<ContainerID> containerId;
   EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
@@ -2420,9 +2420,10 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_ExecutorCleanupWhenLaunchFailed)
 
   AWAIT_READY_FOR(containerId, Seconds(60));
 
-  AWAIT_READY(statusFailed);
-
-  EXPECT_EQ(TASK_FAILED, statusFailed.get().state());
+  AWAIT_READY(statusLost);
+  EXPECT_EQ(TASK_LOST, statusLost.get().state());
+  EXPECT_EQ(TaskStatus::REASON_CONTAINER_UPDATE_FAILED,
+            statusLost.get().reason());
 
   driver.stop();
   driver.join();

http://git-wip-us.apache.org/repos/asf/mesos/blob/3c96155a/src/tests/containerizer/mesos_containerizer_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer/mesos_containerizer_tests.cpp b/src/tests/containerizer/mesos_containerizer_tests.cpp
index 873acd3..b48133c 100644
--- a/src/tests/containerizer/mesos_containerizer_tests.cpp
+++ b/src/tests/containerizer/mesos_containerizer_tests.cpp
@@ -669,7 +669,6 @@ TEST_F(MesosContainerizerDestroyTest, DestroyWhilePreparing)
       "Container destroyed while preparing isolators",
       termination.message());
 
-  EXPECT_TRUE(termination.killed());
   EXPECT_FALSE(termination.has_status());
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/3c96155a/src/tests/oversubscription_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/oversubscription_tests.cpp b/src/tests/oversubscription_tests.cpp
index 2cf047e..0d0bf7e 100644
--- a/src/tests/oversubscription_tests.cpp
+++ b/src/tests/oversubscription_tests.cpp
@@ -872,8 +872,8 @@ TEST_F(OversubscriptionTest, QoSCorrectionKill)
 
   // Verify task status is TASK_LOST.
   AWAIT_READY(status2);
-  ASSERT_EQ(TASK_LOST, status2.get().state());
-  ASSERT_EQ(TaskStatus::REASON_EXECUTOR_PREEMPTED, status2.get().reason());
+  ASSERT_EQ(TASK_LOST, status2->state());
+  ASSERT_EQ(TaskStatus::REASON_CONTAINER_PREEMPTED, status2->reason());
 
   // Verify that slave incremented counter for preempted executors.
   snapshot = Metrics();

http://git-wip-us.apache.org/repos/asf/mesos/blob/3c96155a/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index fd285fb..a50960c 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -506,7 +506,7 @@ TYPED_TEST(SlaveRecoveryTest, ReconnectExecutor)
 
 // The slave is stopped before the (command) executor is registered.
 // When it comes back up with recovery=reconnect, make sure the
-// executor is killed and the task is transitioned to FAILED.
+// executor is killed and the task is transitioned to LOST.
 TYPED_TEST(SlaveRecoveryTest, RecoverUnregisteredExecutor)
 {
   Try<PID<Master> > master = this->StartMaster();
@@ -591,9 +591,12 @@ TYPED_TEST(SlaveRecoveryTest, RecoverUnregisteredExecutor)
     Clock::settle();
   }
 
-  // Scheduler should receive the TASK_FAILED update.
+  // Scheduler should receive the TASK_LOST update.
   AWAIT_READY(status);
-  ASSERT_EQ(TASK_FAILED, status.get().state());
+  ASSERT_EQ(TASK_LOST, status->state());
+  EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status->source());
+  EXPECT_EQ(TaskStatus::REASON_EXECUTOR_REREGISTRATION_TIMEOUT,
+            status->reason());
 
   // Master should subsequently reoffer the same resources.
   while (offers2.isPending()) {
@@ -616,7 +619,7 @@ TYPED_TEST(SlaveRecoveryTest, RecoverUnregisteredExecutor)
 // The slave is stopped after a non-terminal update is received.
 // The command executor terminates when the slave is down.
 // When it comes back up with recovery=reconnect, make
-// sure the task is properly transitioned to FAILED.
+// sure the task is properly transitioned to LOST.
 TYPED_TEST(SlaveRecoveryTest, RecoverTerminatedExecutor)
 {
   Try<PID<Master> > master = this->StartMaster();
@@ -710,9 +713,12 @@ TYPED_TEST(SlaveRecoveryTest, RecoverTerminatedExecutor)
     Clock::settle();
   }
 
-  // Scheduler should receive the TASK_FAILED update.
+  // Scheduler should receive the TASK_LOST update.
   AWAIT_READY(status);
-  ASSERT_EQ(TASK_FAILED, status.get().state());
+  ASSERT_EQ(TASK_LOST, status->state());
+  EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status->source());
+  EXPECT_EQ(TaskStatus::REASON_EXECUTOR_REREGISTRATION_TIMEOUT,
+            status->reason());
 
   while (offers2.isPending()) {
     Clock::advance(Seconds(1));
@@ -3169,7 +3175,10 @@ TYPED_TEST(SlaveRecoveryTest, RestartBeforeContainerizerLaunch)
 
   // Scheduler should receive the TASK_FAILED update.
   AWAIT_READY(status);
-  ASSERT_EQ(TASK_FAILED, status.get().state());
+  ASSERT_EQ(TASK_FAILED, status->state());
+  EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status->source());
+  EXPECT_EQ(TaskStatus::REASON_EXECUTOR_TERMINATED,
+            status->reason());
 
   driver.stop();
   driver.join();

http://git-wip-us.apache.org/repos/asf/mesos/blob/3c96155a/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index dccdbb0..10a4fa7 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -229,8 +229,10 @@ TEST_F(SlaveTest, ShutdownUnregisteredExecutor)
   }
 
   AWAIT_READY(status);
-  ASSERT_EQ(TASK_FAILED, status.get().state());
-  EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status.get().source());
+  ASSERT_EQ(TASK_FAILED, status->state());
+  EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status->source());
+  EXPECT_EQ(TaskStatus::REASON_EXECUTOR_REGISTRATION_TIMEOUT,
+            status->reason());
 
   Clock::resume();
 
@@ -298,9 +300,9 @@ TEST_F(SlaveTest, RemoveUnregisteredTerminatedExecutor)
   containerizer.destroy(offers.get()[0].framework_id(), DEFAULT_EXECUTOR_ID);
 
   AWAIT_READY(status);
-  EXPECT_EQ(TASK_LOST, status.get().state());
-  EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status.get().source());
-  EXPECT_EQ(TaskStatus::REASON_EXECUTOR_TERMINATED, status.get().reason());
+  EXPECT_EQ(TASK_FAILED, status->state());
+  EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status->source());
+  EXPECT_EQ(TaskStatus::REASON_EXECUTOR_TERMINATED, status->reason());
 
   // We use 'gc.schedule' as a signal for the executor being cleaned
   // up by the slave.
@@ -427,7 +429,6 @@ TEST_F(SlaveTest, CommandExecutorWithOverride)
   AWAIT_READY(wait);
 
   containerizer::Termination termination;
-  termination.set_killed(false);
   termination.set_message("Killed executor");
   termination.set_status(0);
   promise.set(termination);
@@ -1371,9 +1372,9 @@ TEST_F(SlaveTest, TerminalTaskContainerizerUpdateFails)
   EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, status3.get().source());
 
   AWAIT_READY(status4);
-  EXPECT_EQ(TASK_LOST, status4.get().state());
-  EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status4.get().source());
-  EXPECT_EQ(TaskStatus::REASON_EXECUTOR_TERMINATED, status4.get().reason());
+  EXPECT_EQ(TASK_LOST, status4->state());
+  EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status4->source());
+  EXPECT_EQ(TaskStatus::REASON_CONTAINER_UPDATE_FAILED, status4->reason());
 
   driver.stop();
   driver.join();
@@ -1482,9 +1483,9 @@ TEST_F(SlaveTest, TaskLaunchContainerizerUpdateFails)
   driver.start();
 
   AWAIT_READY(status);
-  EXPECT_EQ(TASK_LOST, status.get().state());
-  EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status.get().source());
-  EXPECT_EQ(TaskStatus::REASON_EXECUTOR_TERMINATED, status.get().reason());
+  EXPECT_EQ(TASK_LOST, status->state());
+  EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status->source());
+  EXPECT_EQ(TaskStatus::REASON_CONTAINER_UPDATE_FAILED, status->reason());
 
   driver.stop();
   driver.join();

Reply via email to