Moved health checker closer to container in default executor.

With the recent introduction of the `Container` struct in the default
executor, tasks' health checkers should be moved to this struct. Also,
health checking is stopped on a per-task basis and not on shutdown.

Review: https://reviews.apache.org/r/56449/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0d9d3289
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0d9d3289
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0d9d3289

Branch: refs/heads/master
Commit: 0d9d32898d0cc2422236cf9c70a17ca1d583b910
Parents: 3f73a92
Author: Alexander Rukletsov <[email protected]>
Authored: Thu Mar 23 17:11:07 2017 +0100
Committer: Alexander Rukletsov <[email protected]>
Committed: Fri Mar 24 00:17:27 2017 +0100

----------------------------------------------------------------------
 src/launcher/default_executor.cpp | 70 ++++++++++++++++++++--------------
 1 file changed, 42 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/0d9d3289/src/launcher/default_executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/default_executor.cpp b/src/launcher/default_executor.cpp
index 6a885af..9f98786 100644
--- a/src/launcher/default_executor.cpp
+++ b/src/launcher/default_executor.cpp
@@ -89,6 +89,9 @@ private:
     TaskID taskId;
     TaskGroupInfo taskGroup; // Task group of the child container.
 
+    // Health checker for the container.
+    Option<Owned<checks::HealthChecker>> healthChecker;
+
     // Connection used for waiting on the child container. It is possible
     // that a container is active but a connection for sending the
     // `WAIT_NESTED_CONTAINER` call has not been established yet.
@@ -436,15 +439,15 @@ protected:
       const TaskID& taskId = task.task_id();
 
       unacknowledgedTasks[taskId] = task;
-      containers[taskId] = Owned<Container>(
-          new Container {containerId, taskId, taskGroup, None(), false, false});
+      containers[taskId] = Owned<Container>(new Container
+        {containerId, taskId, taskGroup, None(), None(), false, false});
 
       if (task.has_health_check()) {
         // TODO(anand): Add support for command health checks.
         CHECK_NE(HealthCheck::COMMAND, task.health_check().type())
           << "Command health checks are not supported yet";
 
-        Try<Owned<checks::HealthChecker>> _checker =
+        Try<Owned<checks::HealthChecker>> healthChecker =
           checks::HealthChecker::create(
               task.health_check(),
               launcherDirectory,
@@ -453,15 +456,15 @@ protected:
               None(),
               vector<string>());
 
-        if (_checker.isError()) {
+        if (healthChecker.isError()) {
           // TODO(anand): Should we send a TASK_FAILED instead?
           LOG(ERROR) << "Failed to create health checker: "
-                     << _checker.error();
+                     << healthChecker.error();
           _shutdown();
           return;
         }
 
-        checkers[taskId] = _checker.get();
+        containers.at(taskId)->healthChecker = healthChecker.get();
       }
 
       // Currently, the Mesos agent does not expose the mapping from
@@ -663,15 +666,12 @@ protected:
       deserialize<agent::Response>(contentType, response->body);
     CHECK_SOME(waitResponse);
 
-    // If the task has been health checked, stop the associated checker.
-    //
-    // TODO(alexr): Once we support `TASK_KILLING` in this executor, health
-    // checking should be stopped right before sending the `TASK_KILLING`
-    // update to avoid subsequent `TASK_RUNNING` updates.
-    if (checkers.contains(taskId)) {
-      CHECK_NOTNULL(checkers.at(taskId).get());
-      checkers.at(taskId)->stop();
-      checkers.erase(taskId);
+    // If the task is health checked, stop the associated health checker
+    // to avoid sending health updates after a terminal status update.
+    if (container->healthChecker.isSome()) {
+      CHECK_NOTNULL(container->healthChecker->get());
+      container->healthChecker->get()->stop();
+      container->healthChecker = None();
     }
 
     TaskState taskState;
@@ -775,12 +775,6 @@ protected:
 
     shuttingDown = true;
 
-    // Stop health checking all tasks because we are shutting down.
-    foreach (const Owned<checks::HealthChecker>& checker, checkers.values()) {
-      checker->stop();
-    }
-    checkers.clear();
-
     if (!launched) {
       _shutdown();
       return;
@@ -849,6 +843,16 @@ protected:
     CHECK(!container->killing);
     container->killing = true;
 
+    // If the task is health checked, stop the associated health checker.
+    //
+    // TODO(alexr): Once we support `TASK_KILLING` in this executor,
+    // consider health checking the task after sending `TASK_KILLING`.
+    if (container->healthChecker.isSome()) {
+      CHECK_NOTNULL(container->healthChecker->get());
+      container->healthChecker->get()->stop();
+      container->healthChecker = None();
+    }
+
     LOG(INFO) << "Killing child container " << container->containerId;
 
     agent::Call call;
@@ -897,10 +901,23 @@ protected:
 
   void taskHealthUpdated(const TaskHealthStatus& healthStatus)
   {
-    // This prevents us from sending `TASK_RUNNING` after a terminal status
-    // update, because we may receive an update from a health check scheduled
-    // before the task has been waited on.
-    if (!checkers.contains(healthStatus.task_id())) {
+    // If the health checked container has already been waited on,
+    // ignore the health update. This prevents us from sending
+    // `TASK_RUNNING` after a terminal status update.
+    if (!containers.contains(healthStatus.task_id())) {
+      VLOG(1) << "Received task health update for terminated task"
+              << " '" << healthStatus.task_id() << "'; ignoring";
+      return;
+    }
+
+    // If the health checked container has already been asked to
+    // terminate, ignore the health update.
+    //
+    // TODO(alexr): Once we support `TASK_KILLING` in this executor,
+    // consider sending health updates after sending `TASK_KILLING`.
+    if (containers.at(healthStatus.task_id())->healthChecker.isNone()) {
+      VLOG(1) << "Received task health update for terminating task"
+              << " '" << healthStatus.task_id() << "'; ignoring";
       return;
     }
 
@@ -1086,9 +1103,6 @@ private:
   // the stale instance. We initialize this to a new value upon receiving
   // a `connected()` callback.
   Option<UUID> connectionId;
-
-  // TODO(anand): Move the health checker information to the `Container` struct.
-  hashmap<TaskID, Owned<checks::HealthChecker>> checkers; // Health checkers.
 };
 
 } // namespace internal {

Reply via email to