Added support for general checks to default executor. Review: https://reviews.apache.org/r/56217/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7a4f62a7 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7a4f62a7 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7a4f62a7 Branch: refs/heads/master Commit: 7a4f62a7b3f35e7923ac6b6a83a2c22f3f866dc7 Parents: fb86531 Author: Alexander Rukletsov <[email protected]> Authored: Thu Mar 23 17:11:27 2017 +0100 Committer: Alexander Rukletsov <[email protected]> Committed: Fri Mar 24 00:17:27 2017 +0100 ---------------------------------------------------------------------- src/launcher/default_executor.cpp | 121 ++++++++++++++++++++++++++++++++- 1 file changed, 120 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/7a4f62a7/src/launcher/default_executor.cpp ---------------------------------------------------------------------- diff --git a/src/launcher/default_executor.cpp b/src/launcher/default_executor.cpp index f80e79e..0d0192a 100644 --- a/src/launcher/default_executor.cpp +++ b/src/launcher/default_executor.cpp @@ -43,6 +43,7 @@ #include <stout/os.hpp> #include <stout/uuid.hpp> +#include "checks/checker.hpp" #include "checks/health_checker.hpp" #include "common/http.hpp" @@ -91,6 +92,9 @@ private: Option<TaskStatus> lastTaskStatus; + // Checker for the container. + Option<Owned<checks::Checker>> checker; + // Health checker for the container. Option<Owned<checks::HealthChecker>> healthChecker; @@ -469,10 +473,34 @@ protected: None(), None(), None(), + None(), false, false, false}); + if (task.has_check()) { + // TODO(alexr): Add support for command checks. + CHECK_NE(CheckInfo::COMMAND, task.check().type()) + << "Command checks are not supported yet"; + + Try<Owned<checks::Checker>> checker = + checks::Checker::create( + task.check(), + defer(self(), &Self::taskCheckUpdated, taskId, lambda::_1), + taskId, + None(), + vector<string>()); + + if (checker.isError()) { + // TODO(anand): Should we send a TASK_FAILED instead? + LOG(ERROR) << "Failed to create checker: " << checker.error(); + _shutdown(); + return; + } + + containers.at(taskId)->checker = checker.get(); + } + if (task.has_health_check()) { // TODO(anand): Add support for command health checks. CHECK_NE(HealthCheck::COMMAND, task.health_check().type()) @@ -698,6 +726,14 @@ protected: deserialize<agent::Response>(contentType, response->body); CHECK_SOME(waitResponse); + // If there is an associated checker with the task, stop it to + // avoid sending check updates after a terminal status update. + if (container->checker.isSome()) { + CHECK_NOTNULL(container->checker->get()); + container->checker->get()->stop(); + container->checker = None(); + } + // If the task is health checked, stop the associated health checker // to avoid sending health updates after a terminal status update. if (container->healthChecker.isSome()) { @@ -883,6 +919,16 @@ protected: CHECK(!container->killing); container->killing = true; + // If the task is checked, stop the associated checker. + // + // TODO(alexr): Once we support `TASK_KILLING` in this executor, + // consider continuing checking the task after sending `TASK_KILLING`. + if (container->checker.isSome()) { + CHECK_NOTNULL(container->checker->get()); + container->checker->get()->stop(); + container->checker = None(); + } + // If the task is health checked, stop the associated health checker. // // TODO(alexr): Once we support `TASK_KILLING` in this executor, @@ -939,6 +985,50 @@ protected: kill(container); } + void taskCheckUpdated( + const TaskID& taskId, + const CheckStatusInfo& checkStatus) + { + // If the checked container has already been waited on, + // ignore the check update. This prevents us from sending + // `TASK_RUNNING` after a terminal status update. + if (!containers.contains(taskId)) { + VLOG(1) << "Received check update for terminated task" + << " '" << taskId << "'; ignoring"; + return; + } + + // If the checked container has already been asked to terminate, + // ignore the check update. + // + // TODO(alexr): Once we support `TASK_KILLING` in this executor, + // consider sending check updates after sending `TASK_KILLING`. + if (containers.at(taskId)->checker.isNone()) { + VLOG(1) << "Received check update for terminating task" + << " '" << taskId << "'; ignoring"; + return; + } + + LOG(INFO) << "Received check update for task '" << taskId << "'"; + + // Use the previous task status to preserve all attached information. + // We always send a `TASK_RUNNING` right after the task is launched. + CHECK_SOME(containers.at(taskId)->lastTaskStatus); + const TaskStatus status = protobuf::createTaskStatus( + containers.at(taskId)->lastTaskStatus.get(), + UUID::random(), + Clock::now().secs(), + None(), + None(), + None(), + TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED, + None(), + None(), + checkStatus); + + forward(status); + } + void taskHealthUpdated(const TaskHealthStatus& healthStatus) { // If the health checked container has already been waited on, @@ -1013,10 +1103,39 @@ private: status.set_message(message.get()); } - // Fill the container ID associated with this task. CHECK(containers.contains(taskId)); const Owned<Container>& container = containers.at(taskId); + // TODO(alexr): Augment health information in a way similar to + // `CheckStatusInfo`. See MESOS-6417 for more details. + + // If a check for the task has been defined, `check_status` field in each + // task status must be set to a valid `CheckStatusInfo` message even if + // there is no check status available yet. + if (container->taskInfo.has_check()) { + CheckStatusInfo checkStatusInfo; + checkStatusInfo.set_type(container->taskInfo.check().type()); + switch (container->taskInfo.check().type()) { + case CheckInfo::COMMAND: { + checkStatusInfo.mutable_command(); + break; + } + + case CheckInfo::HTTP: { + checkStatusInfo.mutable_http(); + break; + } + + case CheckInfo::UNKNOWN: { + LOG(FATAL) << "UNKNOWN check type is invalid"; + break; + } + } + + status.mutable_check_status()->CopyFrom(checkStatusInfo); + } + + // Fill the container ID associated with this task. ContainerStatus* containerStatus = status.mutable_container_status(); containerStatus->mutable_container_id()->CopyFrom(container->containerId);
