Repository: mesos Updated Branches: refs/heads/master 4c71ba1bd -> 55678b41f
Fixed some style issues. Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/55678b41 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/55678b41 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/55678b41 Branch: refs/heads/master Commit: 55678b41fb462effd912d5a310b421928f80f3b7 Parents: 2c3facb Author: Jie Yu <[email protected]> Authored: Fri Aug 18 11:43:27 2017 -0700 Committer: Jie Yu <[email protected]> Committed: Fri Aug 18 11:43:56 2017 -0700 ---------------------------------------------------------------------- src/resource_provider/driver.cpp | 2 +- src/resource_provider/http_connection.hpp | 65 ++++++++++++++------------ src/resource_provider/manager.cpp | 1 - 3 files changed, 37 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/55678b41/src/resource_provider/driver.cpp ---------------------------------------------------------------------- diff --git a/src/resource_provider/driver.cpp b/src/resource_provider/driver.cpp index fdb35d4..62c4ca1 100644 --- a/src/resource_provider/driver.cpp +++ b/src/resource_provider/driver.cpp @@ -29,9 +29,9 @@ #include "resource_provider/http_connection.hpp" #include "resource_provider/validation.hpp" -using process::dispatch; using process::Future; using process::Owned; +using process::dispatch; using process::spawn; using process::terminate; using process::wait; http://git-wip-us.apache.org/repos/asf/mesos/blob/55678b41/src/resource_provider/http_connection.hpp ---------------------------------------------------------------------- diff --git a/src/resource_provider/http_connection.hpp b/src/resource_provider/http_connection.hpp index bc1f01a..47686a8 100644 --- a/src/resource_provider/http_connection.hpp +++ b/src/resource_provider/http_connection.hpp @@ -58,8 +58,8 @@ namespace internal { /** * HTTP connection handler. 
* - * Manages the connection to a Call/Event based v1 API like the resource - * provider API. + * Manages the connection to a Call/Event based v1 API like the + * resource provider API. */ template <typename Call, typename Event> class HttpConnectionProcess @@ -72,10 +72,10 @@ public: * @param prefix prefix of the actor. * @param _detector the endpoint detector. * @param _contentType the content type expected by this connection. - * @param validate a callback which will be invoked when a call needs - * to be validated. - * @param connected a callback which will be invoked when the connection - * is established. + * @param validate a callback which will be invoked when a call + * needs to be validated. + * @param connected a callback which will be invoked when the + * connection is established. * @param disconnected a callback which will be invoked when the * connection is disconnected. * @param received a callback which will be be invoked when events @@ -108,17 +108,19 @@ public: } if (call.type() == Call::SUBSCRIBE && state != State::CONNECTED) { - // It might be possible that the scheduler is retrying. We drop the - // request if we have an ongoing subscribe request in flight or if the - // scheduler is already subscribed. + // It might be possible that the client is retrying. We drop the + // request if we have an ongoing subscribe request in flight or + // if the client is already subscribed. return process::Failure( - "Resource provider is in state" + stringify(state)); + "Cannot process 'SUBSCRIBE' call as the driver is in " + "state " + stringify(state)); } if (call.type() != Call::SUBSCRIBE && state != State::SUBSCRIBED) { // We drop all non-subscribe calls if we are not currently subscribed. 
return process::Failure( - "Resource provider is in state " + stringify(state)); + "Cannot process '" + stringify(call.type()) + "' call " + "as the driver is in state " + stringify(state)); } CHECK(state == State::CONNECTED || state == State::SUBSCRIBED); @@ -151,11 +153,13 @@ public: } CHECK_SOME(connectionId); - return response.then(defer(self(), - &Self::_send, - connectionId.get(), - call, - lambda::_1)); + + return response.then( + defer(self(), + &Self::_send, + connectionId.get(), + call, + lambda::_1)); } protected: @@ -181,8 +185,8 @@ protected: LOG(WARNING) << "Failed to detect an endpoint: " << future.failure(); // TODO(nfnt): A non-retryable error might be the reason for the - // failed future. In that case the resource provider should be - // informed about this error and the URL dectection aborted. + // failed future. In that case the client should be informed + // about this error and the URL detection aborted. } // Invoke the disconnected callback if we were previously connected. @@ -240,7 +244,8 @@ protected: state = State::CONNECTING; // We create two persistent connections here, one for subscribe - // call/streaming response and another for non-subscribe calls/responses. + // call/streaming response and another for non-subscribe + // calls/responses. collect( process::http::connect(endpoint.get()), process::http::connect(endpoint.get())) @@ -291,8 +296,8 @@ protected: connectionId.get(), "Non-subscribe connection interrupted")); - // Invoke the connected callback once we have established both subscribe - // and non-subscribe connections with the master. + // Invoke the connected callback once we have established both + // subscribe and non-subscribe connections with the master. mutex.lock() .then(defer(self(), [this]() { return process::async(callbacks.connected); @@ -328,8 +333,8 @@ protected: } // We can reach here if we noticed a disconnection for either of - // subscribe/non-subscribe connections.
We discard the future here to - // trigger an endpoint re-detection. + // subscribe/non-subscribe connections. We discard the future here + // to trigger an endpoint re-detection. detection.discard(); } @@ -338,8 +343,8 @@ protected: const Call& call, const process::http::Response& response) { - // It is possible that we detected a new endpoint before a response - // could be received. + // It is possible that we detected a new endpoint before a + // response could be received. if (connectionId != _connectionId) { return process::Failure("Ignoring response from stale connection"); } @@ -430,6 +435,7 @@ protected: if (event.isFailed()) { LOG(ERROR) << "Failed to decode stream of events: " << event.failure(); + disconnected(connectionId.get(), event.failure()); return; } @@ -542,10 +548,11 @@ private: process::Owned<EndpointDetector> detector; std::queue<Event> events; - // There can be multiple simulataneous ongoing (re-)connection attempts with - // the remote endpoint (e.g., the endpoint failed over while an attempt was - // in progress). This helps us in uniquely identifying the current connection - // instance and ignoring the stale instance. + // There can be multiple simultaneous ongoing (re-)connection + // attempts with the remote endpoint (e.g., the endpoint failed over + // while an attempt was in progress). This helps us in uniquely + // identifying the current connection instance and ignoring the + // stale instance.
Option<UUID> connectionId; Option<UUID> streamId; http://git-wip-us.apache.org/repos/asf/mesos/blob/55678b41/src/resource_provider/manager.cpp ---------------------------------------------------------------------- diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp index 7072c36..da9dff1 100644 --- a/src/resource_provider/manager.cpp +++ b/src/resource_provider/manager.cpp @@ -252,7 +252,6 @@ Future<http::Response> ResourceProviderManagerProcess::api( return ok; } - if (!resourceProviders.contains(call.resource_provider_id())) { return BadRequest("Resource provider cannot be found"); }
