Added support for COMMAND health checks to default executor. Review: https://reviews.apache.org/r/55901/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4bbfaebb Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4bbfaebb Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4bbfaebb Branch: refs/heads/master Commit: 4bbfaebb793b3f08e9acf1f358e881da02b4a068 Parents: b97d682 Author: Gastón Kleiman <[email protected]> Authored: Fri Mar 24 00:48:50 2017 +0100 Committer: Alexander Rukletsov <[email protected]> Committed: Fri Mar 24 00:56:44 2017 +0100 ---------------------------------------------------------------------- src/checks/health_checker.cpp | 256 ++++++++++++++++++++++++++++++++- src/checks/health_checker.hpp | 74 +++++++++- src/launcher/default_executor.cpp | 12 +- src/tests/health_check_tests.cpp | 113 +++++++++++++++ 4 files changed, 445 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/4bbfaebb/src/checks/health_checker.cpp ---------------------------------------------------------------------- diff --git a/src/checks/health_checker.cpp b/src/checks/health_checker.cpp index 236d07b..3a7de78 100644 --- a/src/checks/health_checker.cpp +++ b/src/checks/health_checker.cpp @@ -29,6 +29,8 @@ #include <mesos/mesos.hpp> +#include <mesos/agent/agent.hpp> + #include <process/collect.hpp> #include <process/delay.hpp> #include <process/dispatch.hpp> @@ -50,9 +52,12 @@ #include <stout/os/constants.hpp> #include <stout/os/killtree.hpp> +#include "common/http.hpp" #include "common/status_utils.hpp" #include "common/validation.hpp" +#include "internal/evolve.hpp" + #ifdef __linux__ #include "linux/ns.hpp" #endif @@ -66,6 +71,9 @@ using process::Owned; using process::Subprocess; using process::Time; +using process::http::Connection; +using process::http::Response; + using std::map; using std::string; using std::tuple; @@ -140,7 +148,39 @@ Try<Owned<HealthChecker>> HealthChecker::create( callback, 
taskId, taskPid, - namespaces)); + namespaces, + None(), + None(), + false)); + + return Owned<HealthChecker>(new HealthChecker(process)); +} + + +Try<Owned<HealthChecker>> HealthChecker::create( + const HealthCheck& check, + const string& launcherDir, + const lambda::function<void(const TaskHealthStatus&)>& callback, + const TaskID& taskId, + const ContainerID& taskContainerId, + const process::http::URL& agentURL) +{ + // Validate the 'HealthCheck' protobuf. + Option<Error> error = validation::healthCheck(check); + if (error.isSome()) { + return error.get(); + } + + Owned<HealthCheckerProcess> process(new HealthCheckerProcess( + check, + launcherDir, + callback, + taskId, + None(), + {}, + taskContainerId, + agentURL, + true)); return Owned<HealthChecker>(new HealthChecker(process)); } @@ -175,7 +215,10 @@ HealthCheckerProcess::HealthCheckerProcess( const lambda::function<void(const TaskHealthStatus&)>& _callback, const TaskID& _taskId, const Option<pid_t>& _taskPid, - const vector<string>& _namespaces) + const vector<string>& _namespaces, + const Option<ContainerID>& _taskContainerId, + const Option<process::http::URL>& _agentURL, + bool _commandCheckViaAgent) : ProcessBase(process::ID::generate("health-checker")), check(_check), launcherDir(_launcherDir), @@ -183,6 +226,9 @@ HealthCheckerProcess::HealthCheckerProcess( taskId(_taskId), taskPid(_taskPid), namespaces(_namespaces), + taskContainerId(_taskContainerId), + agentURL(_agentURL), + commandCheckViaAgent(_commandCheckViaAgent), consecutiveFailures(0), initializing(true) { @@ -285,7 +331,8 @@ void HealthCheckerProcess::performSingleCheck() switch (check.type()) { case HealthCheck::COMMAND: { - checkResult = commandHealthCheck(); + checkResult = commandCheckViaAgent ? 
nestedCommandHealthCheck() + : commandHealthCheck(); break; } @@ -417,6 +464,209 @@ Future<Nothing> HealthCheckerProcess::commandHealthCheck() } +Future<Nothing> HealthCheckerProcess::nestedCommandHealthCheck() +{ + CHECK_EQ(HealthCheck::COMMAND, check.type()); + CHECK_SOME(taskContainerId); + CHECK(check.has_command()); + CHECK_SOME(agentURL); + + VLOG(1) << "Launching command health check of task " << stringify(taskId); + + return process::http::connect(agentURL.get()) + .repair([](const Future<Connection>& future) { + return Failure( + "Unable to establish connection with the agent: " + future.failure()); + }) + .then(defer(self(), &Self::_nestedCommandHealthCheck, lambda::_1)); +} + + +Future<Nothing> HealthCheckerProcess::_nestedCommandHealthCheck( + Connection connection) +{ + // TODO(gkleiman): Don't reuse the `ContainerID`, it is not safe. + ContainerID checkContainerId; + checkContainerId.set_value(taskContainerId.get().value() + "-health-check"); + checkContainerId.mutable_parent()->CopyFrom(taskContainerId.get()); + + CommandInfo command(check.command()); + + agent::Call call; + call.set_type(agent::Call::LAUNCH_NESTED_CONTAINER_SESSION); + + agent::Call::LaunchNestedContainerSession* launch = + call.mutable_launch_nested_container_session(); + + launch->mutable_container_id()->CopyFrom(checkContainerId); + launch->mutable_command()->CopyFrom(command); + + process::http::Request request; + request.method = "POST"; + request.url = agentURL.get(); + request.body = serialize(ContentType::PROTOBUF, evolve(call)); + request.headers = {{"Accept", stringify(ContentType::RECORDIO)}, + {"Message-Accept", stringify(ContentType::PROTOBUF)}, + {"Content-Type", stringify(ContentType::PROTOBUF)}}; + + // `LAUNCH_NESTED_CONTAINER_SESSION` returns a streamed response with + // the output of the container. The agent will close the stream once + // the container has exited, or kill the container if the client + // closes the connection. 
+ // + // We're calling `Connection::send` with `streamed = false`, so that + // it returns an HTTP response of type 'BODY' once the entire response + // is received. + // + // This means that this future will not be completed until after the + // health check command has finished or the connection has been + // closed. + return connection.send(request, false) + .after(checkTimeout, + defer(self(), + &Self::nestedCommandHealthCheckTimedOut, + checkContainerId, + connection, + lambda::_1)) + .then(defer(self(), + &Self::__nestedCommandHealthCheck, + checkContainerId, + lambda::_1)); +} + + +Future<Nothing> HealthCheckerProcess::__nestedCommandHealthCheck( + const ContainerID& checkContainerId, + const Response& launchResponse) +{ + if (launchResponse.code != process::http::Status::OK) { + return Failure( + "Received '" + launchResponse.status + "' (" + launchResponse.body + + ") while launching a command health check of task '" + + stringify(taskId) + "'"); + } + + // We need to make a copy so that the lambdas can capture it. + const TaskID taskId_ = taskId; + + return waitNestedContainer(checkContainerId) + .repair([taskId_](const Future<Option<int>>& future) { + return Failure( + "Unable to get the exit code of command health check of task '" + + stringify(taskId_) + "': " + future.failure()); + }) + .then([taskId_](const Option<int> status) -> Future<Nothing> { + if (status.isNone()) { + return Failure( + "Unable to get the exit code of command health check of task '" + + stringify(taskId_) + "'"); + } else if (status.get() != 0) { + return Failure( + "Command health check of task '" + stringify(taskId_) + + "' returned " + WSTRINGIFY(status.get())); + } else { + return Nothing(); + } + }); +} + + +Future<Response> +HealthCheckerProcess::nestedCommandHealthCheckTimedOut( + const ContainerID& checkContainerId, + Connection connection, + Future<Response> future) +{ + future.discard(); + + // Closing the connection will make the agent kill the container. 
+ connection.disconnect(); + + const Failure failure = Failure( + "Command health check of task '" + stringify(taskId) + + "' has timed out after " + stringify(checkTimeout)); + + // We need to make a copy so that the lambda can capture it. + const TaskID taskId_ = taskId; + + // If the health check delay interval is zero, we'll try to perform + // another health check right after we finish processing the current + // timeout. + // + // All the containers created for the health checks reuse the same + // `ContainerID`. In order to prevent conflicts, the future returned + // by this method should only be completed once we're sure that the + // container has been cleaned up. + return waitNestedContainer(checkContainerId) + .repair([failure, taskId_](const Future<Option<int>>& waitFuture) { + // We assume that once `WaitNestedContainer` returns, irrespective of + // whether the response contains a failure, the container will be in a + // terminal state, so starting a new health check will not lead to a + // transient failure. + // + // This means that we don't need to retry the `WaitNestedContainer` + // call. 
+ LOG(WARNING) << "Unable to get the exit code of command health check of " + << "task '" << stringify(taskId_) + << "': " << waitFuture.failure(); + + return Future<Option<int>>(failure); + }) + .then([failure](const Option<int>&) { + return Future<Response>(failure); + }); +} + + +Future<Option<int>> HealthCheckerProcess::waitNestedContainer( + const ContainerID& containerId) +{ + agent::Call call; + call.set_type(agent::Call::WAIT_NESTED_CONTAINER); + + agent::Call::WaitNestedContainer* containerWait = + call.mutable_wait_nested_container(); + + containerWait->mutable_container_id()->CopyFrom(containerId); + + process::http::Request request; + request.method = "POST"; + request.url = agentURL.get(); + request.body = serialize(ContentType::PROTOBUF, evolve(call)); + request.headers = {{"Accept", stringify(ContentType::PROTOBUF)}, + {"Content-Type", stringify(ContentType::PROTOBUF)}}; + + return process::http::request(request, false) + .then(defer(self(), + &Self::_waitNestedContainer, + containerId, + lambda::_1)); +} + + +Future<Option<int>> HealthCheckerProcess::_waitNestedContainer( + const ContainerID& containerId, + const Response& httpResponse) +{ + if (httpResponse.code != process::http::Status::OK) { + return Failure( + "Received '" + httpResponse.status + "' (" + httpResponse.body + + ") while waiting on health check of task " + stringify(taskId)); + } + + Try<agent::Response> response = + deserialize<agent::Response>(ContentType::PROTOBUF, httpResponse.body); + CHECK_SOME(response); + + CHECK(response->has_wait_nested_container()); + + return ( + response->wait_nested_container().has_exit_status() + ? 
Option<int>(response->wait_nested_container().exit_status()) + : Option<int>::none()); +} + + Future<Nothing> HealthCheckerProcess::httpHealthCheck() { CHECK_EQ(HealthCheck::HTTP, check.type()); http://git-wip-us.apache.org/repos/asf/mesos/blob/4bbfaebb/src/checks/health_checker.hpp ---------------------------------------------------------------------- diff --git a/src/checks/health_checker.hpp b/src/checks/health_checker.hpp index 44df544..a7307ac 100644 --- a/src/checks/health_checker.hpp +++ b/src/checks/health_checker.hpp @@ -51,6 +51,9 @@ public: * Attempts to create a `HealthChecker` object. In case of success, health * checking starts immediately after initialization. * + * If the check is a command health check, the checker will fork a process, + * enter the task's namespaces, and execute the command. + * * @param check The protobuf message definition of health check. * @param launcherDir A directory where Mesos helper binaries are located. * @param callback A callback HealthChecker uses to send health status @@ -76,6 +79,35 @@ public: const Option<pid_t>& taskPid, const std::vector<std::string>& namespaces); + /** + * Attempts to create a `HealthChecker` object. In case of success, health + * checking starts immediately after initialization. + * + * If the check is a command health check, the checker will delegate the + * execution of the check to the Mesos agent via the + * `LaunchNestedContainerSession` API call. + * + * @param check The protobuf message definition of health check. + * @param launcherDir A directory where Mesos helper binaries are located. + * @param callback A callback HealthChecker uses to send health status + * updates to its owner (usually an executor). + * @param taskId The TaskID of the target task. + * @param taskContainerId The ContainerID of the target task. + * @param agentURL The URL of the agent. + * @return A `HealthChecker` object or an error if `create` fails.
+ * + * @todo A better approach would be to return a stream of updates, e.g., + * `process::Stream<TaskHealthStatus>` rather than invoking a callback. + */ + static Try<process::Owned<HealthChecker>> create( + const HealthCheck& check, + const std::string& launcherDir, + const lambda::function<void(const TaskHealthStatus&)>& callback, + const TaskID& taskId, + const ContainerID& taskContainerId, + const process::http::URL& agentURL); + ~HealthChecker(); /** @@ -99,7 +131,10 @@ public: const lambda::function<void(const TaskHealthStatus&)>& _callback, const TaskID& _taskId, const Option<pid_t>& _taskPid, - const std::vector<std::string>& _namespaces); + const std::vector<std::string>& _namespaces, + const Option<ContainerID>& _taskContainerId, + const Option<process::http::URL>& _agentURL, + bool _commandCheckViaAgent); virtual ~HealthCheckerProcess() {} @@ -117,6 +152,39 @@ private: process::Future<Nothing> commandHealthCheck(); + process::Future<Nothing> nestedCommandHealthCheck(); + + process::Future<Nothing> _nestedCommandHealthCheck( + process::http::Connection connection); + + process::Future<Nothing> __nestedCommandHealthCheck( + const ContainerID& checkContainerId, + const process::http::Response& launchResponse); + + process::Future<process::http::Response> + nestedCommandHealthCheckTimedOut( + const ContainerID& checkContainerId, + process::http::Connection connection, + process::Future<process::http::Response> future); + + /** + * Waits for a container to be terminated. + * + * Waits for a container to be terminated via the Agent's + * `WaitNestedContainer` API call. + * + * @param containerId The `ContainerID` of the container that we want + * to wait for. + * + * @return The exit status as returned by the Agent.
+ */ + process::Future<Option<int>> waitNestedContainer( + const ContainerID& containerId); + + process::Future<Option<int>> _waitNestedContainer( + const ContainerID& containerId, + const process::http::Response& httpResponse); + process::Future<Nothing> httpHealthCheck(); process::Future<Nothing> _httpHealthCheck( @@ -148,6 +216,10 @@ private: const TaskID taskId; const Option<pid_t> taskPid; const std::vector<std::string> namespaces; + const Option<ContainerID> taskContainerId; + const Option<process::http::URL> agentURL; + const bool commandCheckViaAgent; + Option<lambda::function<pid_t(const lambda::function<int()>&)>> clone; uint32_t consecutiveFailures; http://git-wip-us.apache.org/repos/asf/mesos/blob/4bbfaebb/src/launcher/default_executor.cpp ---------------------------------------------------------------------- diff --git a/src/launcher/default_executor.cpp b/src/launcher/default_executor.cpp index f19bba9..58efb4c 100644 --- a/src/launcher/default_executor.cpp +++ b/src/launcher/default_executor.cpp @@ -143,6 +143,8 @@ public: connectionId = UUID::random(); doReliableRegistration(); + + // TODO(gkleiman): Resume (health) checks. } void disconnected() @@ -160,6 +162,8 @@ public: container->waiting = None(); } } + + // TODO(gkleiman): Stop (health) checks. } void received(const Event& event) @@ -502,18 +506,14 @@ protected: } if (task.has_health_check()) { - // TODO(anand): Add support for command health checks. - CHECK_NE(HealthCheck::COMMAND, task.health_check().type()) - << "Command health checks are not supported yet"; - Try<Owned<checks::HealthChecker>> healthChecker = checks::HealthChecker::create( task.health_check(), launcherDirectory, defer(self(), &Self::taskHealthUpdated, lambda::_1), taskId, - None(), - vector<string>()); + containerId, + agent); if (healthChecker.isError()) { // TODO(anand): Should we send a TASK_FAILED instead? 
http://git-wip-us.apache.org/repos/asf/mesos/blob/4bbfaebb/src/tests/health_check_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/health_check_tests.cpp b/src/tests/health_check_tests.cpp index a5b35cf..211f8b8 100644 --- a/src/tests/health_check_tests.cpp +++ b/src/tests/health_check_tests.cpp @@ -35,6 +35,7 @@ #include "tests/health_check_test_helper.hpp" #include "tests/mesos.hpp" #include "tests/mock_docker.hpp" +#include "tests/resources_utils.hpp" #include "tests/utils.hpp" #ifdef __linux__ @@ -2131,6 +2132,118 @@ TEST_F(HealthCheckTest, ROOT_DOCKER_DockerHealthyTaskViaTCP) } } + +TEST_F_TEMP_DISABLED_ON_WINDOWS( + HealthCheckTest, DefaultExecutorCommandHealthCheck) +{ + Try<Owned<cluster::Master>> master = StartMaster(); + ASSERT_SOME(master); + + // Disable AuthN on the agent. + slave::Flags flags = CreateSlaveFlags(); + flags.authenticate_http_readwrite = false; + + Fetcher fetcher; + + // We have to explicitly create a `Containerizer` in non-local mode, + // because `LaunchNestedContainerSession` (used by command health + // checks) tries to start an IO switchboard, which doesn't work in + // local mode yet.
+ Try<MesosContainerizer*> _containerizer = + MesosContainerizer::create(flags, false, &fetcher); + + ASSERT_SOME(_containerizer); + + Owned<slave::Containerizer> containerizer(_containerizer.get()); + Owned<MasterDetector> detector = master.get()->createDetector(); + + Try<Owned<cluster::Slave>> agent = + StartSlave(detector.get(), containerizer.get(), flags); + ASSERT_SOME(agent); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL); + + Future<FrameworkID> frameworkId; + EXPECT_CALL(sched, registered(&driver, _, _)) + .WillOnce(FutureArg<1>(&frameworkId)); + + Future<vector<Offer>> offers; + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + driver.start(); + + AWAIT_READY(frameworkId); + + AWAIT_READY(offers); + EXPECT_NE(0u, offers.get().size()); + + Future<TaskStatus> statusRunning; + Future<TaskStatus> statusHealthy; + + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&statusRunning)) + .WillOnce(FutureArg<1>(&statusHealthy)); + + TaskInfo task = createTask(offers->front(), "sleep 120"); + + HealthCheck healthCheck; + + healthCheck.set_type(HealthCheck::COMMAND); + healthCheck.mutable_command()->set_value("exit $STATUS"); + healthCheck.set_delay_seconds(0); + healthCheck.set_interval_seconds(0); + healthCheck.set_grace_period_seconds(0); + + Environment::Variable* variable = healthCheck.mutable_command()-> + mutable_environment()->mutable_variables()->Add(); + variable->set_name("STATUS"); + variable->set_value("0"); + + task.mutable_health_check()->CopyFrom(healthCheck); + + Resources executorResources = + allocatedResources(Resources::parse("cpus:0.1;mem:32;disk:32").get(), "*"); + + task.mutable_resources()->CopyFrom(task.resources() - executorResources); + + TaskGroupInfo taskGroup; + taskGroup.add_tasks()->CopyFrom(task); + + ExecutorInfo executor; + 
executor.mutable_executor_id()->set_value("default"); + executor.set_type(ExecutorInfo::DEFAULT); + executor.mutable_framework_id()->CopyFrom(frameworkId.get()); + executor.mutable_resources()->CopyFrom(executorResources); + executor.mutable_shutdown_grace_period()->set_nanoseconds(Seconds(10).ns()); + + driver.acceptOffers( + {offers->front().id()}, {LAUNCH_GROUP(executor, taskGroup)}); + + AWAIT_READY(statusRunning); + EXPECT_EQ(TASK_RUNNING, statusRunning.get().state()); + + AWAIT_READY(statusHealthy); + EXPECT_EQ(TASK_RUNNING, statusHealthy.get().state()); + EXPECT_TRUE(statusHealthy.get().has_healthy()); + EXPECT_TRUE(statusHealthy.get().healthy()); + + Future<hashset<ContainerID>> containerIds = containerizer->containers(); + + AWAIT_READY(containerIds); + + driver.stop(); + driver.join(); + + // Cleanup all mesos launched containers. + foreach (const ContainerID& containerId, containerIds.get()) { + AWAIT_READY(containerizer->wait(containerId)); + } +} + } // namespace tests { } // namespace internal { } // namespace mesos {
