Added support for general checks to default executor.

Review: https://reviews.apache.org/r/56217/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7a4f62a7
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7a4f62a7
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7a4f62a7

Branch: refs/heads/master
Commit: 7a4f62a7b3f35e7923ac6b6a83a2c22f3f866dc7
Parents: fb86531
Author: Alexander Rukletsov <[email protected]>
Authored: Thu Mar 23 17:11:27 2017 +0100
Committer: Alexander Rukletsov <[email protected]>
Committed: Fri Mar 24 00:17:27 2017 +0100

----------------------------------------------------------------------
 src/launcher/default_executor.cpp | 121 ++++++++++++++++++++++++++++++++-
 1 file changed, 120 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/7a4f62a7/src/launcher/default_executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/default_executor.cpp b/src/launcher/default_executor.cpp
index f80e79e..0d0192a 100644
--- a/src/launcher/default_executor.cpp
+++ b/src/launcher/default_executor.cpp
@@ -43,6 +43,7 @@
 #include <stout/os.hpp>
 #include <stout/uuid.hpp>
 
+#include "checks/checker.hpp"
 #include "checks/health_checker.hpp"
 
 #include "common/http.hpp"
@@ -91,6 +92,9 @@ private:
 
     Option<TaskStatus> lastTaskStatus;
 
+    // Checker for the container.
+    Option<Owned<checks::Checker>> checker;
+
     // Health checker for the container.
     Option<Owned<checks::HealthChecker>> healthChecker;
 
@@ -469,10 +473,34 @@ protected:
         None(),
         None(),
         None(),
+        None(),
         false,
         false,
         false});
 
+      if (task.has_check()) {
+        // TODO(alexr): Add support for command checks.
+        CHECK_NE(CheckInfo::COMMAND, task.check().type())
+          << "Command checks are not supported yet";
+
+        Try<Owned<checks::Checker>> checker =
+          checks::Checker::create(
+              task.check(),
+              defer(self(), &Self::taskCheckUpdated, taskId, lambda::_1),
+              taskId,
+              None(),
+              vector<string>());
+
+        if (checker.isError()) {
+          // TODO(anand): Should we send a TASK_FAILED instead?
+          LOG(ERROR) << "Failed to create checker: " << checker.error();
+          _shutdown();
+          return;
+        }
+
+        containers.at(taskId)->checker = checker.get();
+      }
+
       if (task.has_health_check()) {
         // TODO(anand): Add support for command health checks.
         CHECK_NE(HealthCheck::COMMAND, task.health_check().type())
@@ -698,6 +726,14 @@ protected:
       deserialize<agent::Response>(contentType, response->body);
     CHECK_SOME(waitResponse);
 
+    // If there is an associated checker with the task, stop it to
+    // avoid sending check updates after a terminal status update.
+    if (container->checker.isSome()) {
+      CHECK_NOTNULL(container->checker->get());
+      container->checker->get()->stop();
+      container->checker = None();
+    }
+
     // If the task is health checked, stop the associated health checker
     // to avoid sending health updates after a terminal status update.
     if (container->healthChecker.isSome()) {
@@ -883,6 +919,16 @@ protected:
     CHECK(!container->killing);
     container->killing = true;
 
+    // If the task is checked, stop the associated checker.
+    //
+    // TODO(alexr): Once we support `TASK_KILLING` in this executor,
+    // consider continuing checking the task after sending `TASK_KILLING`.
+    if (container->checker.isSome()) {
+      CHECK_NOTNULL(container->checker->get());
+      container->checker->get()->stop();
+      container->checker = None();
+    }
+
     // If the task is health checked, stop the associated health checker.
     //
     // TODO(alexr): Once we support `TASK_KILLING` in this executor,
@@ -939,6 +985,50 @@ protected:
     kill(container);
   }
 
+  void taskCheckUpdated(
+      const TaskID& taskId,
+      const CheckStatusInfo& checkStatus)
+  {
+    // If the checked container has already been waited on,
+    // ignore the check update. This prevents us from sending
+    // `TASK_RUNNING` after a terminal status update.
+    if (!containers.contains(taskId)) {
+      VLOG(1) << "Received check update for terminated task"
+              << " '" << taskId << "'; ignoring";
+      return;
+    }
+
+    // If the checked container has already been asked to terminate,
+    // ignore the check update.
+    //
+    // TODO(alexr): Once we support `TASK_KILLING` in this executor,
+    // consider sending check updates after sending `TASK_KILLING`.
+    if (containers.at(taskId)->checker.isNone()) {
+      VLOG(1) << "Received check update for terminating task"
+              << " '" << taskId << "'; ignoring";
+      return;
+    }
+
+    LOG(INFO) << "Received check update for task '" << taskId << "'";
+
+    // Use the previous task status to preserve all attached information.
+    // We always send a `TASK_RUNNING` right after the task is launched.
+    CHECK_SOME(containers.at(taskId)->lastTaskStatus);
+    const TaskStatus status = protobuf::createTaskStatus(
+        containers.at(taskId)->lastTaskStatus.get(),
+        UUID::random(),
+        Clock::now().secs(),
+        None(),
+        None(),
+        None(),
+        TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
+        None(),
+        None(),
+        checkStatus);
+
+    forward(status);
+  }
+
   void taskHealthUpdated(const TaskHealthStatus& healthStatus)
   {
     // If the health checked container has already been waited on,
@@ -1013,10 +1103,39 @@ private:
       status.set_message(message.get());
     }
 
-    // Fill the container ID associated with this task.
     CHECK(containers.contains(taskId));
     const Owned<Container>& container = containers.at(taskId);
 
+    // TODO(alexr): Augment health information in a way similar to
+    // `CheckStatusInfo`. See MESOS-6417 for more details.
+
+    // If a check for the task has been defined, `check_status` field in each
+    // task status must be set to a valid `CheckStatusInfo` message even if
+    // there is no check status available yet.
+    if (container->taskInfo.has_check()) {
+      CheckStatusInfo checkStatusInfo;
+      checkStatusInfo.set_type(container->taskInfo.check().type());
+      switch (container->taskInfo.check().type()) {
+        case CheckInfo::COMMAND: {
+          checkStatusInfo.mutable_command();
+          break;
+        }
+
+        case CheckInfo::HTTP: {
+          checkStatusInfo.mutable_http();
+          break;
+        }
+
+        case CheckInfo::UNKNOWN: {
+          LOG(FATAL) << "UNKNOWN check type is invalid";
+          break;
+        }
+      }
+
+      status.mutable_check_status()->CopyFrom(checkStatusInfo);
+    }
+
+    // Fill the container ID associated with this task.
     ContainerStatus* containerStatus = status.mutable_container_status();
     containerStatus->mutable_container_id()->CopyFrom(container->containerId);
 

Reply via email to