Refrained from reusing check's `ContainerID` in COMMAND health checks.

Review: https://reviews.apache.org/r/57647/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9ff4b517
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9ff4b517
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9ff4b517

Branch: refs/heads/master
Commit: 9ff4b5172d5aaffd68133d438ee845ad542e6cb6
Parents: 0c0fbc5
Author: Gastón Kleiman <[email protected]>
Authored: Fri Mar 24 00:50:32 2017 +0100
Committer: Alexander Rukletsov <[email protected]>
Committed: Fri Mar 24 05:29:12 2017 +0100

----------------------------------------------------------------------
 src/checks/health_checker.cpp | 95 ++++++++++++++++++++++++++++++--------
 src/checks/health_checker.hpp | 9 +++-
 2 files changed, 84 insertions(+), 20 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/mesos/blob/9ff4b517/src/checks/health_checker.cpp
----------------------------------------------------------------------
diff --git a/src/checks/health_checker.cpp b/src/checks/health_checker.cpp
index 1c098d1..2211228 100644
--- a/src/checks/health_checker.cpp
+++ b/src/checks/health_checker.cpp
@@ -49,6 +49,7 @@
 #include <stout/strings.hpp>
 #include <stout/try.hpp>
 #include <stout/unreachable.hpp>
+#include <stout/uuid.hpp>
 
 #include <stout/os/constants.hpp>
 #include <stout/os/killtree.hpp>
@@ -507,7 +508,64 @@ Future<Nothing> HealthCheckerProcess::nestedCommandHealthCheck()
 // error.
auto promise = std::make_shared<Promise<Nothing>>(); - // TODO(alexr): Use lambda a named capture for + if (previousCheckContainerId.isSome()) { + agent::Call call; + call.set_type(agent::Call::REMOVE_NESTED_CONTAINER); + + agent::Call::RemoveNestedContainer* removeContainer = + call.mutable_remove_nested_container(); + + removeContainer->mutable_container_id()->CopyFrom( + previousCheckContainerId.get()); + + process::http::Request request; + request.method = "POST"; + request.url = agentURL.get(); + request.body = serialize(ContentType::PROTOBUF, evolve(call)); + request.headers = {{"Accept", stringify(ContentType::PROTOBUF)}, + {"Content-Type", stringify(ContentType::PROTOBUF)}}; + + process::http::request(request, false) + .onFailed(defer(self(), [this, promise](const string& failure) { + LOG(WARNING) << "Connection to remove the nested container " + << stringify(previousCheckContainerId.get()) + << " used for the command health check of task " + << stringify(taskId) << " failed: " << failure; + + // Something went wrong while sending the request, we treat this + // as a transient failure and discard the promise. + promise->discard(); + })) + .onReady(defer(self(), [this, promise](const Response& response) { + if (response.code != process::http::Status::OK) { + // The agent was unable to remove the health check container, + // we treat this as a transient failure and discard the + // promise. 
+ LOG(WARNING) << "Received '" << response.status << "' (" + << response.body + << ") while removing the nested container " + << stringify(previousCheckContainerId.get()) + << " used for the COMMAND health check for task" + << stringify(taskId); + + promise->discard(); + } + + previousCheckContainerId = None(); + _nestedCommandHealthCheck(promise); + })); + } else { + _nestedCommandHealthCheck(promise); + } + + return promise->future(); +} + + +void HealthCheckerProcess::_nestedCommandHealthCheck( + shared_ptr<process::Promise<Nothing>> promise) +{ + // TODO(alexr): Use a lambda named capture for // this cached value once it is available. const TaskID _taskId = taskId; @@ -521,21 +579,20 @@ Future<Nothing> HealthCheckerProcess::nestedCommandHealthCheck() promise->discard(); })) .onReady(defer(self(), - &Self::_nestedCommandHealthCheck, promise, lambda::_1)); - - return promise->future(); + &Self::__nestedCommandHealthCheck, promise, lambda::_1)); } -void HealthCheckerProcess::_nestedCommandHealthCheck( +void HealthCheckerProcess::__nestedCommandHealthCheck( shared_ptr<process::Promise<Nothing>> promise, Connection connection) { - // TODO(gkleiman): Don't reuse the `ContainerID`, it is not safe. 
ContainerID checkContainerId; - checkContainerId.set_value(taskContainerId.get().value() + "-health-check"); + checkContainerId.set_value("health-check-" + UUID::random().toString()); checkContainerId.mutable_parent()->CopyFrom(taskContainerId.get()); + previousCheckContainerId = checkContainerId; + CommandInfo command(check.command()); agent::Call call; @@ -591,14 +648,14 @@ void HealthCheckerProcess::_nestedCommandHealthCheck( checkTimedOut, lambda::_1)) .onReady(defer(self(), - &Self::__nestedCommandHealthCheck, + &Self::___nestedCommandHealthCheck, promise, checkContainerId, lambda::_1)); } -void HealthCheckerProcess::__nestedCommandHealthCheck( +void HealthCheckerProcess::___nestedCommandHealthCheck( shared_ptr<process::Promise<Nothing>> promise, const ContainerID& checkContainerId, const Response& launchResponse) @@ -662,19 +719,19 @@ void HealthCheckerProcess::nestedCommandHealthCheckFailure( // another health check right after we finish processing the current // timeout. // - // All the containers created for the health checks reuse the same - // `ContainerID`. In order to prevent conflicts, the promise should - // be completed once we're sure that the container has been cleaned - // up. + // We'll try to remove the container created for the health check at + // the beginning of the next check. In order to prevent a failure, + // the promise should only be completed once we're sure that the + // container has terminated. waitNestedContainer(checkContainerId) .onAny([failure, promise](const Future<Option<int>>&) { - // We assume that once `WaitNestedContainer` returns, irrespective of - // whether the response contains a failure, the container will be in a - // terminal state, so starting a new health check will not lead to a - // transient failure. 
+ // We assume that once `WaitNestedContainer` returns, + // irrespective of whether the response contains a failure, the + // container will be in a terminal state, and that it will be + // possible to remove it. // - // This means that we don't need to retry the `WaitNestedContainer` - // call. + // This means that we don't need to retry the + // `WaitNestedContainer` call. promise->fail(failure); }); } else { http://git-wip-us.apache.org/repos/asf/mesos/blob/9ff4b517/src/checks/health_checker.hpp ---------------------------------------------------------------------- diff --git a/src/checks/health_checker.hpp b/src/checks/health_checker.hpp index 3d17ea8..e17f12f 100644 --- a/src/checks/health_checker.hpp +++ b/src/checks/health_checker.hpp @@ -158,10 +158,13 @@ private: process::Future<Nothing> nestedCommandHealthCheck(); void _nestedCommandHealthCheck( + std::shared_ptr<process::Promise<Nothing>> promise); + + void __nestedCommandHealthCheck( std::shared_ptr<process::Promise<Nothing>> promise, process::http::Connection connection); - void __nestedCommandHealthCheck( + void ___nestedCommandHealthCheck( std::shared_ptr<process::Promise<Nothing>> promise, const ContainerID& checkContainerId, const process::http::Response& launchResponse); @@ -232,6 +235,10 @@ private: process::Time startTime; bool initializing; bool paused; + + // Contains the ID of the most recently terminated nested container + // that was used to perform a COMMAND health check. + Option<ContainerID> previousCheckContainerId; };
