Refactored http::internal::request to use http::connect. Review: https://reviews.apache.org/r/38609
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2199a599 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2199a599 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2199a599 Branch: refs/heads/master Commit: 2199a599d4e57cce0c9209660e488f530156e07b Parents: 0064543 Author: Benjamin Mahler <[email protected]> Authored: Mon Sep 21 18:45:21 2015 -0700 Committer: Benjamin Mahler <[email protected]> Committed: Mon Oct 5 16:41:16 2015 -0700 ---------------------------------------------------------------------- 3rdparty/libprocess/include/process/http.hpp | 3 + 3rdparty/libprocess/src/http.cpp | 173 ++-------------------- 2 files changed, 19 insertions(+), 157 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/2199a599/3rdparty/libprocess/include/process/http.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/include/process/http.hpp b/3rdparty/libprocess/include/process/http.hpp index dfcc188..591c1a9 100644 --- a/3rdparty/libprocess/include/process/http.hpp +++ b/3rdparty/libprocess/include/process/http.hpp @@ -769,6 +769,9 @@ Future<Connection> connect(const URL& url); // TODO(bmahler): Consolidate these functions into a single // http::request function that takes a 'Request' object. +// TODO(bmahler): Support discarding the future responses; +// discarding should disconnect from the server. + // TODO(joerg84): Make names consistent (see Mesos-3256). 
// Asynchronously sends an HTTP GET request to the specified URL http://git-wip-us.apache.org/repos/asf/mesos/blob/2199a599/3rdparty/libprocess/src/http.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/http.cpp b/3rdparty/libprocess/src/http.cpp index d1ff13e..ebf7609 100644 --- a/3rdparty/libprocess/src/http.cpp +++ b/3rdparty/libprocess/src/http.cpp @@ -1070,169 +1070,28 @@ Future<Connection> connect(const URL& url) namespace internal { -// Forward declaration. -void _decode( - Socket socket, - Owned<StreamingResponseDecoder> decoder, - const Future<string>& data); - - -Future<Response> decode( - Socket socket, - Owned<StreamingResponseDecoder> decoder, - const string& data) -{ - deque<Response*> responses = decoder->decode(data.c_str(), data.length()); - - if (decoder->failed() || responses.size() > 1) { - foreach (Response* response, responses) { - delete response; - } - return Failure(string("Failed to decode HTTP response") + - (responses.size() > 1 ? ": more than one response received" : "")); - } - - if (responses.empty()) { - // Keep reading until the headers are complete. - return socket.recv(None()) - .then(lambda::bind(&decode, socket, decoder, lambda::_1)); - } - - // Keep feeding data to the decoder until EOF or a 'recv' failure. - if (!data.empty()) { - socket.recv(None()) - .onAny(lambda::bind(&_decode, socket, decoder, lambda::_1)); - } - - Response response = *responses[0]; - delete responses[0]; - return response; -} - - -void _decode( - Socket socket, - Owned<StreamingResponseDecoder> decoder, - const Future<string>& data) -{ - deque<Response*> responses; - - if (!data.isReady()) { - // Let the decoder process EOF if a failure - // or discard is encountered. - responses = decoder->decode("", 0); - } else { - responses = decoder->decode(data.get().c_str(), data.get().length()); - } - - // We're not expecting more responses to arrive on this socket! 
- if (!responses.empty() || decoder->failed()) { - VLOG(1) << "Failed to decode HTTP response: " - << (responses.size() > 1 - ? ": more than one response received" - : ""); - - foreach (Response* response, responses) { - delete response; - } - - return; - } - - // Keep reading if the socket has more data. - if (data.isReady() && !data.get().empty()) { - socket.recv(None()) - .onAny(lambda::bind(&_decode, socket, decoder, lambda::_1)); - } -} - - -// Forward declaration. -Future<Response> _request( - Socket socket, - const Address& address, - const Request& request, - bool streamedResponse); - - Future<Response> request(const Request& request, bool streamedResponse) { - Try<Socket> socket = [&request]() -> Try<Socket> { - // Default to 'http' if no scheme was specified. - if (request.url.scheme.isNone() || - request.url.scheme == string("http")) { - return Socket::create(Socket::POLL); - } - - if (request.url.scheme == string("https")) { -#ifdef USE_SSL_SOCKET - return Socket::create(Socket::SSL); -#else - return Error("'https' scheme requires SSL enabled"); -#endif - } - - return Error("Unsupported URL scheme"); - }(); - - if (socket.isError()) { - return Failure("Failed to create socket: " + socket.error()); - } - - Address address; - - if (request.url.ip.isSome()) { - address.ip = request.url.ip.get(); - } else if (request.url.domain.isNone()) { - return Failure("Expecting url.ip or url.domain to be set"); - } else { - Try<net::IP> ip = net::getIP(request.url.domain.get(), AF_INET); - - if (ip.isError()) { - return Failure("Failed to determine IP of domain '" + - request.url.domain.get() + "': " + ip.error()); - } - - address.ip = ip.get(); - } - - if (request.url.port.isNone()) { - return Failure("Expecting url.port to be set"); - } - - address.port = request.url.port.get(); - - return socket->connect(address) - .then(lambda::bind(&_request, - socket.get(), - address, - request, - streamedResponse)); -} + // We rely on the connection closing after the response. 
+ CHECK(!request.keepAlive); + // This is a one-time request which will close the connection when + // the response is received. Since 'Connection' is reference-counted, + // we must keep a copy around until the disconnection occurs. Note + // that in order to avoid a deadlock (Connection destruction occurring + // from the ConnectionProcess execution context), we use 'async'. + return http::connect(request.url) + .then([=](Connection connection) { + Future<Response> response = connection.send(request, streamedResponse); -Future<Response> _request( - Socket socket, - const Address& address, - const Request& request, - bool streamedResponse) -{ - // Need to disambiguate the Socket::recv for binding below. - Future<string> (Socket::*recv)(const Option<ssize_t>&) = &Socket::recv; + Connection* copy = new Connection(std::move(connection)); + auto deleter = [copy](){ delete copy; }; - Owned<StreamingResponseDecoder> decoder(new StreamingResponseDecoder()); + copy->disconnected() + .onAny([=]() { async(deleter); }); - Future<Response> pipeResponse = socket.send(encode(request)) - .then(lambda::function<Future<string>(void)>( - lambda::bind(recv, socket, None()))) - .then(lambda::bind(&internal::decode, socket, decoder, lambda::_1)); - - if (streamedResponse) { - return pipeResponse; - } else { - return pipeResponse - .then(lambda::bind(&internal::convert, lambda::_1)); - } + return response; + }); } } // namespace internal {
