Repository: mesos Updated Branches: refs/heads/master 080e1b7eb -> 1a1fa95d0
Cleaned up namespaces in "checker.cpp". Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/1a1fa95d Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/1a1fa95d Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/1a1fa95d Branch: refs/heads/master Commit: 1a1fa95d0de179d7efab002a99a0e6261ce307f9 Parents: 3f81c6f Author: Alexander Rukletsov <[email protected]> Authored: Thu Mar 30 17:53:14 2017 +0200 Committer: Alexander Rukletsov <[email protected]> Committed: Thu Mar 30 19:34:24 2017 +0200 ---------------------------------------------------------------------- src/checks/checker.cpp | 108 ++++++++++++++++++++------------------------ 1 file changed, 50 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/1a1fa95d/src/checks/checker.cpp ---------------------------------------------------------------------- diff --git a/src/checks/checker.cpp b/src/checks/checker.cpp index e48e037..7510bf2 100644 --- a/src/checks/checker.cpp +++ b/src/checks/checker.cpp @@ -68,15 +68,14 @@ #include "linux/ns.hpp" #endif +namespace http = process::http; + using process::Failure; using process::Future; using process::Owned; using process::Promise; using process::Subprocess; -using process::http::Connection; -using process::http::Response; - using std::map; using std::shared_ptr; using std::string; @@ -139,7 +138,7 @@ public: const Option<pid_t>& _taskPid, const vector<string>& _namespaces, const Option<ContainerID>& _taskContainerId, - const Option<process::http::URL>& _agentURL, + const Option<http::URL>& _agentURL, bool _commandCheckViaAgent); void pause(); @@ -158,44 +157,40 @@ private: const Stopwatch& stopwatch, const Option<CheckStatusInfo>& result); - process::Future<int> commandCheck(); + Future<int> commandCheck(); - process::Future<int> nestedCommandCheck(); - void _nestedCommandCheck(std::shared_ptr<process::Promise<int>> promise); + Future<int> nestedCommandCheck(); + void _nestedCommandCheck(shared_ptr<Promise<int>> promise); void __nestedCommandCheck( - std::shared_ptr<process::Promise<int>> promise, - process::http::Connection connection); + shared_ptr<Promise<int>> promise, + http::Connection connection); void ___nestedCommandCheck( - std::shared_ptr<process::Promise<int>> promise, + shared_ptr<Promise<int>> promise, const ContainerID& checkContainerId, - const process::http::Response& launchResponse); + const http::Response& launchResponse); void nestedCommandCheckFailure( - std::shared_ptr<process::Promise<int>> promise, - process::http::Connection connection, + shared_ptr<Promise<int>> promise, + http::Connection connection, ContainerID checkContainerId, - std::shared_ptr<bool> checkTimedOut, - const std::string& failure); + shared_ptr<bool> checkTimedOut, + const string& failure); - process::Future<Option<int>> waitNestedContainer( - const ContainerID& containerId); - process::Future<Option<int>> _waitNestedContainer( + Future<Option<int>> waitNestedContainer(const ContainerID& containerId); + Future<Option<int>> _waitNestedContainer( const ContainerID& containerId, - const process::http::Response& httpResponse); + const http::Response& httpResponse); void processCommandCheckResult( const Stopwatch& stopwatch, - const process::Future<int>& result); - - process::Future<int> httpCheck(); - process::Future<int> _httpCheck( - const std::tuple< - process::Future<Option<int>>, - process::Future<std::string>, - process::Future<std::string>>& t); + const Future<int>& result); + + Future<int> httpCheck(); + Future<int> _httpCheck( + const tuple<Future<Option<int>>, Future<string>, Future<string>>& t); void processHttpCheckResult( const Stopwatch& stopwatch, - const process::Future<int>& result); + const Future<int>& result); const CheckInfo check; Duration checkDelay; @@ -205,9 +200,9 @@ private: const lambda::function<void(const CheckStatusInfo&)> updateCallback; const TaskID taskId; const Option<pid_t> taskPid; - const std::vector<std::string> namespaces; + const vector<string> namespaces; const Option<ContainerID> taskContainerId; - const Option<process::http::URL> agentURL; + const Option<http::URL> agentURL; const bool commandCheckViaAgent; Option<lambda::function<pid_t(const lambda::function<int()>&)>> clone; @@ -248,12 +243,12 @@ Try<Owned<Checker>> Checker::create( } -Try<process::Owned<Checker>> Checker::create( +Try<Owned<Checker>> Checker::create( const CheckInfo& check, const lambda::function<void(const CheckStatusInfo&)>& callback, const TaskID& taskId, const ContainerID& taskContainerId, - const process::http::URL& agentURL) + const http::URL& agentURL) { // Validate the `CheckInfo` protobuf. Option<Error> error = validation::checkInfo(check); @@ -308,7 +303,7 @@ CheckerProcess::CheckerProcess( const Option<pid_t>& _taskPid, const vector<string>& _namespaces, const Option<ContainerID>& _taskContainerId, - const Option<process::http::URL>& _agentURL, + const Option<http::URL>& _agentURL, bool _commandCheckViaAgent) : ProcessBase(process::ID::generate("checker")), check(_check), @@ -587,14 +582,14 @@ Future<int> CheckerProcess::nestedCommandCheck() removeContainer->mutable_container_id()->CopyFrom( previousCheckContainerId.get()); - process::http::Request request; + http::Request request; request.method = "POST"; request.url = agentURL.get(); request.body = serialize(ContentType::PROTOBUF, evolve(call)); request.headers = {{"Accept", stringify(ContentType::PROTOBUF)}, {"Content-Type", stringify(ContentType::PROTOBUF)}}; - process::http::request(request, false) + http::request(request, false) .onFailed(defer(self(), [this, promise](const string& failure) { LOG(WARNING) << "Connection to remove the nested container '" @@ -606,8 +601,8 @@ Future<int> CheckerProcess::nestedCommandCheck() // as a transient failure and discard the promise. promise->discard(); })) - .onReady(defer(self(), [this, promise](const Response& response) { - if (response.code != process::http::Status::OK) { + .onReady(defer(self(), [this, promise](const http::Response& response) { + if (response.code != http::Status::OK) { // The agent was unable to remove the check container, we // treat this as a transient failure and discard the promise. LOG(WARNING) << "Received '" << response.status << "' (" @@ -630,14 +625,13 @@ Future<int> CheckerProcess::nestedCommandCheck() } -void CheckerProcess::_nestedCommandCheck( - shared_ptr<process::Promise<int>> promise) +void CheckerProcess::_nestedCommandCheck(shared_ptr<Promise<int>> promise) { // TODO(alexr): Use a lambda named capture for // this cached value once it is available. const TaskID _taskId = taskId; - process::http::connect(agentURL.get()) + http::connect(agentURL.get()) .onFailed(defer(self(), [_taskId, promise](const string& failure) { LOG(WARNING) << "Unable to establish connection with the agent to launch" << " COMMAND check for task '" << _taskId << "'" @@ -651,8 +645,8 @@ void CheckerProcess::_nestedCommandCheck( void CheckerProcess::__nestedCommandCheck( - shared_ptr<process::Promise<int>> promise, - Connection connection) + shared_ptr<Promise<int>> promise, + http::Connection connection) { ContainerID checkContainerId; checkContainerId.set_value("check-" + UUID::random().toString()); @@ -671,7 +665,7 @@ void CheckerProcess::__nestedCommandCheck( launch->mutable_container_id()->CopyFrom(checkContainerId); launch->mutable_command()->CopyFrom(command); - process::http::Request request; + http::Request request; request.method = "POST"; request.url = agentURL.get(); request.body = serialize(ContentType::PROTOBUF, evolve(call)); @@ -698,7 +692,8 @@ void CheckerProcess::__nestedCommandCheck( // check command has finished or the connection has been closed. connection.send(request, false) .after(checkTimeout, - defer(self(), [timeout, checkTimedOut](Future<Response> future) { + defer(self(), + [timeout, checkTimedOut](Future<http::Response> future) { future.discard(); *checkTimedOut = true; @@ -721,11 +716,11 @@ void CheckerProcess::__nestedCommandCheck( void CheckerProcess::___nestedCommandCheck( - shared_ptr<process::Promise<int>> promise, + shared_ptr<Promise<int>> promise, const ContainerID& checkContainerId, - const Response& launchResponse) + const http::Response& launchResponse) { - if (launchResponse.code != process::http::Status::OK) { + if (launchResponse.code != http::Status::OK) { // The agent was unable to launch the check container, // we treat this as a transient failure. LOG(WARNING) << "Received '" << launchResponse.status << "' (" @@ -760,7 +755,7 @@ void CheckerProcess::___nestedCommandCheck( void CheckerProcess::nestedCommandCheckFailure( shared_ptr<Promise<int>> promise, - Connection connection, + http::Connection connection, ContainerID checkContainerId, shared_ptr<bool> checkTimedOut, const string& failure) @@ -814,15 +809,15 @@ Future<Option<int>> CheckerProcess::waitNestedContainer( containerWait->mutable_container_id()->CopyFrom(containerId); - process::http::Request request; + http::Request request; request.method = "POST"; request.url = agentURL.get(); request.body = serialize(ContentType::PROTOBUF, evolve(call)); request.headers = {{"Accept", stringify(ContentType::PROTOBUF)}, {"Content-Type", stringify(ContentType::PROTOBUF)}}; - return process::http::request(request, false) - .repair([containerId](const Future<Response>& future) { + return http::request(request, false) + .repair([containerId](const Future<http::Response>& future) { return Failure( "Connection to wait for check container '" + stringify(containerId) + "' failed: " + future.failure()); @@ -834,9 +829,9 @@ Future<Option<int>> CheckerProcess::waitNestedContainer( Future<Option<int>> CheckerProcess::_waitNestedContainer( const ContainerID& containerId, - const Response& httpResponse) + const http::Response& httpResponse) { - if (httpResponse.code != process::http::Status::OK) { + if (httpResponse.code != http::Status::OK) { return Failure( "Received '" + httpResponse.status + "' (" + httpResponse.body + ") while waiting on check container '" + stringify(containerId) + "'"); @@ -979,10 +974,7 @@ Future<int> CheckerProcess::httpCheck() Future<int> CheckerProcess::_httpCheck( - const tuple< - Future<Option<int>>, - Future<string>, - Future<string>>& t) + const tuple<Future<Option<int>>, Future<string>, Future<string>>& t) { const Future<Option<int>>& status = std::get<0>(t); if (!status.isReady()) { @@ -1032,7 +1024,7 @@ Future<int> CheckerProcess::_httpCheck( void CheckerProcess::processHttpCheckResult( const Stopwatch& stopwatch, - const process::Future<int>& result) + const Future<int>& result) { CheckStatusInfo checkStatusInfo; checkStatusInfo.set_type(check.type());
