Moved health checker closer to container in default executor. With the recent introduction of the `Container` struct in the default executor, tasks' health checkers should be moved to this struct. Also, health checking is now stopped on a per-task basis rather than for all tasks at once on shutdown.
Review: https://reviews.apache.org/r/56449/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0d9d3289 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0d9d3289 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0d9d3289 Branch: refs/heads/master Commit: 0d9d32898d0cc2422236cf9c70a17ca1d583b910 Parents: 3f73a92 Author: Alexander Rukletsov <[email protected]> Authored: Thu Mar 23 17:11:07 2017 +0100 Committer: Alexander Rukletsov <[email protected]> Committed: Fri Mar 24 00:17:27 2017 +0100 ---------------------------------------------------------------------- src/launcher/default_executor.cpp | 70 ++++++++++++++++++++-------------- 1 file changed, 42 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/0d9d3289/src/launcher/default_executor.cpp ---------------------------------------------------------------------- diff --git a/src/launcher/default_executor.cpp b/src/launcher/default_executor.cpp index 6a885af..9f98786 100644 --- a/src/launcher/default_executor.cpp +++ b/src/launcher/default_executor.cpp @@ -89,6 +89,9 @@ private: TaskID taskId; TaskGroupInfo taskGroup; // Task group of the child container. + // Health checker for the container. + Option<Owned<checks::HealthChecker>> healthChecker; + // Connection used for waiting on the child container. It is possible // that a container is active but a connection for sending the // `WAIT_NESTED_CONTAINER` call has not been established yet. 
@@ -436,15 +439,15 @@ protected: const TaskID& taskId = task.task_id(); unacknowledgedTasks[taskId] = task; - containers[taskId] = Owned<Container>( - new Container {containerId, taskId, taskGroup, None(), false, false}); + containers[taskId] = Owned<Container>(new Container + {containerId, taskId, taskGroup, None(), None(), false, false}); if (task.has_health_check()) { // TODO(anand): Add support for command health checks. CHECK_NE(HealthCheck::COMMAND, task.health_check().type()) << "Command health checks are not supported yet"; - Try<Owned<checks::HealthChecker>> _checker = + Try<Owned<checks::HealthChecker>> healthChecker = checks::HealthChecker::create( task.health_check(), launcherDirectory, @@ -453,15 +456,15 @@ protected: None(), vector<string>()); - if (_checker.isError()) { + if (healthChecker.isError()) { // TODO(anand): Should we send a TASK_FAILED instead? LOG(ERROR) << "Failed to create health checker: " - << _checker.error(); + << healthChecker.error(); _shutdown(); return; } - checkers[taskId] = _checker.get(); + containers.at(taskId)->healthChecker = healthChecker.get(); } // Currently, the Mesos agent does not expose the mapping from @@ -663,15 +666,12 @@ protected: deserialize<agent::Response>(contentType, response->body); CHECK_SOME(waitResponse); - // If the task has been health checked, stop the associated checker. - // - // TODO(alexr): Once we support `TASK_KILLING` in this executor, health - // checking should be stopped right before sending the `TASK_KILLING` - // update to avoid subsequent `TASK_RUNNING` updates. - if (checkers.contains(taskId)) { - CHECK_NOTNULL(checkers.at(taskId).get()); - checkers.at(taskId)->stop(); - checkers.erase(taskId); + // If the task is health checked, stop the associated health checker + // to avoid sending health updates after a terminal status update. 
+ if (container->healthChecker.isSome()) { + CHECK_NOTNULL(container->healthChecker->get()); + container->healthChecker->get()->stop(); + container->healthChecker = None(); } TaskState taskState; @@ -775,12 +775,6 @@ protected: shuttingDown = true; - // Stop health checking all tasks because we are shutting down. - foreach (const Owned<checks::HealthChecker>& checker, checkers.values()) { - checker->stop(); - } - checkers.clear(); - if (!launched) { _shutdown(); return; @@ -849,6 +843,16 @@ protected: CHECK(!container->killing); container->killing = true; + // If the task is health checked, stop the associated health checker. + // + // TODO(alexr): Once we support `TASK_KILLING` in this executor, + // consider health checking the task after sending `TASK_KILLING`. + if (container->healthChecker.isSome()) { + CHECK_NOTNULL(container->healthChecker->get()); + container->healthChecker->get()->stop(); + container->healthChecker = None(); + } + LOG(INFO) << "Killing child container " << container->containerId; agent::Call call; @@ -897,10 +901,23 @@ protected: void taskHealthUpdated(const TaskHealthStatus& healthStatus) { - // This prevents us from sending `TASK_RUNNING` after a terminal status - // update, because we may receive an update from a health check scheduled - // before the task has been waited on. - if (!checkers.contains(healthStatus.task_id())) { + // If the health checked container has already been waited on, + // ignore the health update. This prevents us from sending + // `TASK_RUNNING` after a terminal status update. + if (!containers.contains(healthStatus.task_id())) { + VLOG(1) << "Received task health update for terminated task" + << " '" << healthStatus.task_id() << "'; ignoring"; + return; + } + + // If the health checked container has already been asked to + // terminate, ignore the health update. + // + // TODO(alexr): Once we support `TASK_KILLING` in this executor, + // consider sending health updates after sending `TASK_KILLING`. 
+ if (containers.at(healthStatus.task_id())->healthChecker.isNone()) { + VLOG(1) << "Received task health update for terminating task" + << " '" << healthStatus.task_id() << "'; ignoring"; return; } @@ -1086,9 +1103,6 @@ private: // the stale instance. We initialize this to a new value upon receiving // a `connected()` callback. Option<UUID> connectionId; - - // TODO(anand): Move the health checker information to the `Container` struct. - hashmap<TaskID, Owned<checks::HealthChecker>> checkers; // Health checkers. }; } // namespace internal {
