Made COMMAND health checks resilient to agent failovers. Review: https://reviews.apache.org/r/57646/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0c0fbc57 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0c0fbc57 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0c0fbc57 Branch: refs/heads/master Commit: 0c0fbc57bed2ab26dff516491c6264f37d14cd4f Parents: 85edc8f Author: Gastón Kleiman <[email protected]> Authored: Fri Mar 24 00:50:04 2017 +0100 Committer: Alexander Rukletsov <[email protected]> Committed: Fri Mar 24 05:29:12 2017 +0100 ---------------------------------------------------------------------- src/checks/health_checker.cpp | 228 +++++++++++++++++++++++-------------- src/checks/health_checker.hpp | 16 ++- 2 files changed, 155 insertions(+), 89 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/0c0fbc57/src/checks/health_checker.cpp ---------------------------------------------------------------------- diff --git a/src/checks/health_checker.cpp b/src/checks/health_checker.cpp index 3290eb6..1c098d1 100644 --- a/src/checks/health_checker.cpp +++ b/src/checks/health_checker.cpp @@ -34,6 +34,7 @@ #include <process/collect.hpp> #include <process/delay.hpp> #include <process/dispatch.hpp> +#include <process/future.hpp> #include <process/http.hpp> #include <process/io.hpp> #include <process/subprocess.hpp> @@ -68,6 +69,7 @@ using process::Clock; using process::Failure; using process::Future; using process::Owned; +using process::Promise; using process::Subprocess; using process::Time; @@ -75,6 +77,7 @@ using process::http::Connection; using process::http::Response; using std::map; +using std::shared_ptr; using std::string; using std::tuple; using std::vector; @@ -377,6 +380,13 @@ void HealthCheckerProcess::processCheckResult( return; } + if (future.isDiscarded()) { + LOG(INFO) << HealthCheck::Type_Name(check.type()) + + " health check of task " + stringify(taskId) + " discarded"; + 
scheduleNext(checkInterval); + return; + } + VLOG(1) << "Performed " << HealthCheck::Type_Name(check.type()) << " health check in " << stopwatch.elapsed(); @@ -386,8 +396,7 @@ void HealthCheckerProcess::processCheckResult( } string message = HealthCheck::Type_Name(check.type()) + - " health check failed: " + - (future.isFailed() ? future.failure() : "discarded"); + " health check failed: " + future.failure(); failure(message); } @@ -489,16 +498,37 @@ Future<Nothing> HealthCheckerProcess::nestedCommandHealthCheck() VLOG(1) << "Launching command health check of task " << stringify(taskId); - return process::http::connect(agentURL.get()) - .repair([](const Future<Connection>& future) { - return Failure( - "Unable to establish connection with the agent: " + future.failure()); - }) - .then(defer(self(), &Self::_nestedCommandHealthCheck, lambda::_1)); + // We don't want recoverable errors, e.g., the agent responding with + // HTTP status code 503, to trigger a health check failure. + // + // The future returned by this method represents the result of a + // health check. It will be set to `Nothing` if the check succeeded, + // to a `Failure` if it failed, and discarded if there was a transient + // error. + auto promise = std::make_shared<Promise<Nothing>>(); + + // TODO(alexr): Use a lambda named capture for + // this cached value once it is available. + const TaskID _taskId = taskId; + + process::http::connect(agentURL.get()) + .onFailed(defer(self(), [_taskId, promise](const string& failure) { + LOG(WARNING) << "Unable to establish connection with the agent to launch" + << " COMMAND health check for task '" << _taskId + << "': " << failure; + + // We treat this as a transient failure.
+ promise->discard(); + })) + .onReady(defer(self(), + &Self::_nestedCommandHealthCheck, promise, lambda::_1)); + + return promise->future(); } -Future<Nothing> HealthCheckerProcess::_nestedCommandHealthCheck( +void HealthCheckerProcess::_nestedCommandHealthCheck( + shared_ptr<process::Promise<Nothing>> promise, Connection connection) { // TODO(gkleiman): Don't reuse the `ContainerID`, it is not safe. @@ -525,6 +555,12 @@ Future<Nothing> HealthCheckerProcess::_nestedCommandHealthCheck( {"Message-Accept", stringify(ContentType::PROTOBUF)}, {"Content-Type", stringify(ContentType::PROTOBUF)}}; + // TODO(alexr): Use lambda named captures for + // these cached values once they are available. + const Duration timeout = checkTimeout; + + auto checkTimedOut = std::make_shared<bool>(false); + // `LAUNCH_NESTED_CONTAINER_SESSION` returns a streamed response with // the output of the container. The agent will close the stream once // the container has exited, or kill the container if the client @@ -537,100 +573,123 @@ Future<Nothing> HealthCheckerProcess::_nestedCommandHealthCheck( // This means that this future will not be completed until after the // health check command has finished or the connection has been // closed. 
- return connection.send(request, false) + connection.send(request, false) .after(checkTimeout, - defer(self(), - &Self::nestedCommandHealthCheckTimedOut, - checkContainerId, - connection, - lambda::_1)) - .then(defer(self(), - &Self::__nestedCommandHealthCheck, - checkContainerId, - lambda::_1)); + defer(self(), [timeout, checkTimedOut](Future<Response> future) { + future.discard(); + + *checkTimedOut = true; + + return Failure( + "Command timed out after " + stringify(timeout) + "; aborting"); + })) + .onFailed(defer(self(), + &Self::nestedCommandHealthCheckFailure, + promise, + connection, + checkContainerId, + checkTimedOut, + lambda::_1)) + .onReady(defer(self(), + &Self::__nestedCommandHealthCheck, + promise, + checkContainerId, + lambda::_1)); } -Future<Nothing> HealthCheckerProcess::__nestedCommandHealthCheck( +void HealthCheckerProcess::__nestedCommandHealthCheck( + shared_ptr<process::Promise<Nothing>> promise, const ContainerID& checkContainerId, const Response& launchResponse) { if (launchResponse.code != process::http::Status::OK) { - return Failure( - "Received '" + launchResponse.status + "' (" + launchResponse.body + - ") while launching a command health check of task '" + - stringify(taskId) + "'"); + // The agent was unable to launch the health check container, we + // treat this as a transient failure. + LOG(WARNING) << "Received '" << launchResponse.status << "' (" + << launchResponse.body << ") while launching command health " + << "check of task " << stringify(taskId); + + promise->discard(); + return; } // We need to make a copy so that the lambdas can capture it. 
- const TaskID taskId_ = taskId; + const TaskID _taskId = taskId; - return waitNestedContainer(checkContainerId) - .repair([taskId_](const Future<Option<int>>& future) { - return Failure( - "Unable to get the exit code of command health check of task '" + - stringify(taskId_) + "': " + future.failure()); + waitNestedContainer(checkContainerId) + .onFailed([_taskId, promise](const string& failure) { + promise->fail( + "Unable to get the exit code of command health check of task " + + stringify(_taskId) + ": " + failure); }) - .then([taskId_](const Option<int> status) -> Future<Nothing> { + .onReady([_taskId, promise](const Option<int>& status) -> void { if (status.isNone()) { - return Failure( - "Unable to get the exit code of command health check of task '" + - stringify(taskId_) + "'"); + promise->fail( + "Unable to get the exit code of command health check of task " + + stringify(_taskId)); + // TODO(gkleiman): Make sure that the following block works on Windows. + } else if (WIFSIGNALED(status.get()) && + WTERMSIG(status.get()) == SIGKILL) { + // The check container was signaled, probably because the task + // finished while the check was still in-flight, so we discard + // the result. 
+ promise->discard(); } else if (status.get() != 0) { - return Failure( - "Command health check of task '" + stringify(taskId_) + - "' returned " + WSTRINGIFY(status.get())); + promise->fail( + "Command health check of task " + stringify(_taskId) + + " returned " + WSTRINGIFY(status.get())); } else { - return Nothing(); + promise->set(Nothing()); } }); } -Future<Response> -HealthCheckerProcess::nestedCommandHealthCheckTimedOut( - const ContainerID& checkContainerId, +void HealthCheckerProcess::nestedCommandHealthCheckFailure( + shared_ptr<Promise<Nothing>> promise, Connection connection, - Future<Response> future) + ContainerID checkContainerId, + shared_ptr<bool> checkTimedOut, + const string& failure) { - future.discard(); - - // Closing the connection will make the agent kill the container. - connection.disconnect(); - - const Failure failure = Failure( - "Command health check of task '" + stringify(taskId) + - "' has timed out after " + stringify(checkTimeout)); - - // We need to make a copy so that the lambda can capture it. - const TaskID taskId_ = taskId; - - // If the health check delay interval is zero, we'll try to perform - // another health check right after we finish processing the current - // timeout. - // - // All the containers created for the health checks reuse the same - // `ContainerID`. In order to prevent conflicts, the future returned - // by this method should only be completed once we're sure that the - // container has been cleaned up. - return waitNestedContainer(checkContainerId) - .repair([failure, taskId_](const Future<Option<int>>& waitFuture) { - // We assume that once `WaitNestedContainer` returns, irrespective of - // whether the response contains a failure, the container will be in a - // terminal state, so starting a new health check will not lead to a - // transient failure. - // - // This means that we don't need to retry the `WaitNestedContainer` - // call. 
- LOG(WARNING) << "Unable to get the exit code of command health check of " - << "task '" << stringify(taskId_) - << "': " << waitFuture.failure(); - - return Future<Option<int>>(failure); - }) - .then([failure](const Option<int>&) { - return Future<Response>(failure); - }); + if (*checkTimedOut) { + // The health check timed out, closing the connection will make the + // agent kill the container. + connection.disconnect(); + + // If the health check delay interval is zero, we'll try to perform + // another health check right after we finish processing the current + // timeout. + // + // All the containers created for the health checks reuse the same + // `ContainerID`. In order to prevent conflicts, the promise should + // be completed once we're sure that the container has been cleaned + // up. + waitNestedContainer(checkContainerId) + .onAny([failure, promise](const Future<Option<int>>&) { + // We assume that once `WaitNestedContainer` returns, irrespective of + // whether the response contains a failure, the container will be in a + // terminal state, so starting a new health check will not lead to a + // transient failure. + // + // This means that we don't need to retry the `WaitNestedContainer` + // call. + promise->fail(failure); + }); + } else { + // The agent was not able to complete the request, discarding the + // promise signals the health checker that it should retry the + // health check. + // + // This will allow us to recover from a blip. The executor will + // pause the health checker when it detects that the agent is not + // available. 
+ LOG(WARNING) << "Connection to the agent to launch COMMAND health check" + << " for task '" << taskId << "' failed: " << failure; + + promise->discard(); + } } @@ -653,10 +712,13 @@ Future<Option<int>> HealthCheckerProcess::waitNestedContainer( {"Content-Type", stringify(ContentType::PROTOBUF)}}; return process::http::request(request, false) + .repair([this](const Future<Response>& future) { + return Failure( + "Connection to wait for a health check of task " + + stringify(taskId) + " failed: " + future.failure()); + }) .then(defer(self(), - &Self::_waitNestedContainer, - containerId, - lambda::_1)); + &Self::_waitNestedContainer, containerId, lambda::_1)); } http://git-wip-us.apache.org/repos/asf/mesos/blob/0c0fbc57/src/checks/health_checker.hpp ---------------------------------------------------------------------- diff --git a/src/checks/health_checker.hpp b/src/checks/health_checker.hpp index 29df49b..3d17ea8 100644 --- a/src/checks/health_checker.hpp +++ b/src/checks/health_checker.hpp @@ -17,6 +17,7 @@ #ifndef __HEALTH_CHECKER_HPP__ #define __HEALTH_CHECKER_HPP__ +#include <memory> #include <string> #include <tuple> #include <vector> @@ -156,18 +157,21 @@ private: process::Future<Nothing> nestedCommandHealthCheck(); - process::Future<Nothing> _nestedCommandHealthCheck( + void _nestedCommandHealthCheck( + std::shared_ptr<process::Promise<Nothing>> promise, process::http::Connection connection); - process::Future<Nothing> __nestedCommandHealthCheck( + void __nestedCommandHealthCheck( + std::shared_ptr<process::Promise<Nothing>> promise, const ContainerID& checkContainerId, const process::http::Response& launchResponse); - process::Future<process::http::Response> - nestedCommandHealthCheckTimedOut( - const ContainerID& checkContainerId, + void nestedCommandHealthCheckFailure( + std::shared_ptr<process::Promise<Nothing>> promise, process::http::Connection connection, - process::Future<process::http::Response> future); + ContainerID checkContainerId, + 
std::shared_ptr<bool> checkTimedOut, + const std::string& failure); /** * Waits for a container to be terminated.
