Added an http::Connection for connection re-use and pipelining. Review: https://reviews.apache.org/r/38608
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/00645436 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/00645436 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/00645436 Branch: refs/heads/master Commit: 00645436616ccdc96be9810904fa6b4476a53925 Parents: 5fc2025 Author: Benjamin Mahler <[email protected]> Authored: Mon Sep 21 18:27:46 2015 -0700 Committer: Benjamin Mahler <[email protected]> Committed: Mon Oct 5 16:41:15 2015 -0700 ---------------------------------------------------------------------- 3rdparty/libprocess/include/process/http.hpp | 51 ++++ 3rdparty/libprocess/src/http.cpp | 322 +++++++++++++++++++++ 3rdparty/libprocess/src/tests/http_tests.cpp | 331 ++++++++++++++++++++++ 3 files changed, 704 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/00645436/3rdparty/libprocess/include/process/http.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/include/process/http.hpp b/3rdparty/libprocess/include/process/http.hpp index ba3f0bc..dfcc188 100644 --- a/3rdparty/libprocess/include/process/http.hpp +++ b/3rdparty/libprocess/include/process/http.hpp @@ -50,6 +50,10 @@ namespace process { template <typename T> class Future; +namespace network { +class Socket; +} // namespace network { + namespace http { // Status code reason strings, from the HTTP1.1 RFC: @@ -715,6 +719,53 @@ std::string encode(const hashmap<std::string, std::string>& query); } // namespace query { +/** + * Represents a connection to an HTTP server. Pipelining will be + * used when there are multiple requests in-flight. + * + * TODO(bmahler): This does not prevent pipelining with HTTP/1.0. + */ +class Connection +{ +public: + Connection() = delete; + + /** + * Sends a request to the server. If there are additional requests + * in flight, pipelining will occur. If 'streamedResponse' is set, + * the response body will be of type 'PIPE'. Note that if the + * request or response has a 'Connection: close' header, the + * connection will close after the response completes. + */ + Future<Response> send(const Request& request, bool streamedResponse = false); + + /** + * Disconnects from the server. + */ + Future<Nothing> disconnect(); + + /** + * Returns a future that is satisfied when a disconnection occurs. + */ + Future<Nothing> disconnected(); + + bool operator==(const Connection& c) const { return data == c.data; } + bool operator!=(const Connection& c) const { return !(*this == c); } + +private: + Connection(const network::Socket& s); + friend Future<Connection> connect(const URL&); + + // Forward declaration. + struct Data; + + std::shared_ptr<Data> data; +}; + + +Future<Connection> connect(const URL& url); + + // TODO(bmahler): Consolidate these functions into a single // http::request function that takes a 'Request' object. http://git-wip-us.apache.org/repos/asf/mesos/blob/00645436/3rdparty/libprocess/src/http.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/http.cpp b/3rdparty/libprocess/src/http.cpp index 3dd7898..d1ff13e 100644 --- a/3rdparty/libprocess/src/http.cpp +++ b/3rdparty/libprocess/src/http.cpp @@ -28,11 +28,17 @@ #include <queue> #include <string> #include <sstream> +#include <tuple> #include <vector> +#include <process/async.hpp> +#include <process/defer.hpp> +#include <process/dispatch.hpp> #include <process/future.hpp> #include <process/http.hpp> +#include <process/id.hpp> #include <process/owned.hpp> +#include <process/process.hpp> #include <process/socket.hpp> #include <stout/foreach.hpp> @@ -55,6 +61,7 @@ using std::ostream; using std::ostringstream; using std::queue; using std::string; +using std::tuple; using std::vector; using process::http::Request; @@ -748,6 +755,321 @@ Response __convert(const Response& pipeResponse, const string& body) } +class ConnectionProcess : public Process<ConnectionProcess> +{ +public: + ConnectionProcess(const Socket& _socket) + : ProcessBase(ID::generate("__http_connection__")), + socket(_socket), + sendChain(Nothing()), + close(false) {} + + Future<Response> send(const Request& request, bool streamedResponse) + { + if (!disconnection.future().isPending()) { + return Failure("Disconnected"); + } + + if (close) { + return Failure("Cannot pipeline after 'Connection: close'"); + } + + if (!request.keepAlive) { + close = true; + } + + // We must chain the calls to Socket::send as it + // otherwise interleaves data across calls. + Socket socket_ = socket; + + sendChain = sendChain + .then([socket_, request]() mutable { + return socket_.send(encode(request)); + }); + + // If we can't write to the socket, disconnect. + sendChain + .onFailed(defer(self(), [this](const string& failure) { + disconnect(failure); + })); + + Promise<Response> promise; + Future<Response> response = promise.future(); + + pipeline.push(std::make_tuple(streamedResponse, std::move(promise))); + + return response; + } + + Future<Nothing> disconnect(const Option<std::string>& message = None()) + { + Try<Nothing> shutdown = socket.shutdown(); + + disconnection.set(Nothing()); + + // If a response is still streaming, we send EOF to + // the decoder in order to fail the pipe reader. + if (decoder.writingBody()) { + decoder.decode("", 0); + } + + // Fail any remaining pipelined responses. + while (!pipeline.empty()) { + std::get<1>(pipeline.front()).fail( + message.isSome() ? message.get() : "Disconnected"); + pipeline.pop(); + } + + return shutdown; + } + + Future<Nothing> disconnected() + { + return disconnection.future(); + } + +protected: + virtual void initialize() + { + // Start the read loop on the socket. We read independently + // of the requests being sent in order to detect socket + // closure at any time. + read(); + } + + virtual void finalize() + { + disconnect("Connection object was destructed"); + } + +private: + void read() + { + socket.recv() + .onAny(defer(self(), &Self::_read, lambda::_1)); + } + + void _read(const Future<string>& data) + { + deque<Response*> responses; + + if (!data.isReady() || data->empty()) { + // Process EOF. Also send EOF to the decoder if a failure + // or discard is encountered. + responses = decoder.decode("", 0); + } else { + // We should only receive data if we're expecting a response + // in the pipeline, or if a response body is still streaming. + if (pipeline.empty() && !decoder.writingBody()) { + disconnect("Received data when none is expected"); + return; + } + + responses = decoder.decode(data->data(), data->length()); + } + + // Process any decoded responses. + while (!responses.empty()) { + // We do not expect any responses when the pipeline is empty. + // Note that this may occur when a 'Connection: close' header + // prematurely terminates the pipeline. + if (pipeline.empty()) { + while (!responses.empty()) { + delete responses.front(); + responses.pop_front(); + } + + disconnect("Received response without a request"); + return; + } + + Response* response = responses.front(); + responses.pop_front(); + + tuple<bool, Promise<Response>> t = std::move(pipeline.front()); + pipeline.pop(); + + bool streamedResponse = std::get<0>(t); + Promise<Response> promise = std::move(std::get<1>(t)); + + if (streamedResponse) { + promise.set(*response); + } else { + // If the response should not be streamed, we convert + // the PIPE response into a BODY response. + promise.associate(convert(*response)); + } + + if (response->headers.contains("Connection") && + response->headers.at("Connection") == "close") { + // This is the last response the server will send! + close = true; + + // Fail the remainder of the pipeline. + while (!pipeline.empty()) { + std::get<1>(pipeline.front()).fail( + "Received 'Connection: close' from the server"); + pipeline.pop(); + } + } + + delete response; + } + + // We keep reading and feeding data to the decoder until + // EOF or a failure is encountered. + if (!data.isReady()) { + disconnect(data.isFailed() ? data.failure() : "discarded"); + return; + } else if (data->empty()) { + disconnect(); // EOF. + return; + } else if (decoder.failed()) { + disconnect("Failed to decode response"); + return; + } + + // Close the connection if a 'Connection: close' header + // was found and we're done reading the last response. + if (close && pipeline.empty() && !decoder.writingBody()) { + disconnect(); + return; + } + + read(); + } + + Socket socket; + StreamingResponseDecoder decoder; + Future<Nothing> sendChain; + Promise<Nothing> disconnection; + + // For each response in the pipeline, we store a bool for + // whether the caller wants the response to be streamed. + queue<tuple<bool, Promise<Response>>> pipeline; + + // Whether the connection should be closed upon the + // completion of the last outstanding response. + bool close; +}; + +} // namespace internal { + + +struct Connection::Data +{ + Data(const Socket& s) + : process(new internal::ConnectionProcess(s)) + { + spawn(process.get()); + } + + ~Data() + { + // Note that we pass 'false' here to avoid injecting the + // termination event at the front of the queue. This is + // to ensure we don't drop any queued request dispatches + // which would leave the caller with a future stuck in + // a pending state. + terminate(process.get(), false); + wait(process.get()); + } + + Owned<internal::ConnectionProcess> process; +}; + + +Connection::Connection(const Socket& s) + : data(std::make_shared<Connection::Data>(s)) {} + + +Future<Response> Connection::send( + const http::Request& request, + bool streamedResponse) +{ + return dispatch( + data->process.get(), + &internal::ConnectionProcess::send, + request, + streamedResponse); +} + + +Future<Nothing> Connection::disconnect() +{ + return dispatch( + data->process.get(), + &internal::ConnectionProcess::disconnect, + None()); +} + + +Future<Nothing> Connection::disconnected() +{ + return dispatch( + data->process.get(), + &internal::ConnectionProcess::disconnected); +} + + +Future<Connection> connect(const URL& url) +{ + // TODO(bmahler): Move address resolution into the URL class? + Address address; + + if (url.ip.isNone() && url.domain.isNone()) { + return Failure("Expected URL.ip or URL.domain to be set"); + } + + if (url.ip.isSome()) { + address.ip = url.ip.get(); + } else { + Try<net::IP> ip = net::getIP(url.domain.get(), AF_INET); + + if (ip.isError()) { + return Failure("Failed to determine IP of domain '" + + url.domain.get() + "': " + ip.error()); + } + + address.ip = ip.get(); + } + + if (url.port.isNone()) { + return Failure("Expecting url.port to be set"); + } + + address.port = url.port.get(); + + Try<Socket> socket = [&url]() -> Try<Socket> { + // Default to 'http' if no scheme was specified. + if (url.scheme.isNone() || url.scheme == string("http")) { + return Socket::create(Socket::POLL); + } + + if (url.scheme == string("https")) { +#ifdef USE_SSL_SOCKET + return Socket::create(Socket::SSL); +#else + return Error("'https' scheme requires SSL enabled"); +#endif + } + + return Error("Unsupported URL scheme"); + }(); + + if (socket.isError()) { + return Failure("Failed to create socket: " + socket.error()); + } + + return socket->connect(address) + .then([socket]() { + return Connection(socket.get()); + }); +} + + +namespace internal { + // Forward declaration. void _decode( Socket socket, http://git-wip-us.apache.org/repos/asf/mesos/blob/00645436/3rdparty/libprocess/src/tests/http_tests.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/tests/http_tests.cpp b/3rdparty/libprocess/src/tests/http_tests.cpp index c380f35..38f3ad7 100644 --- a/3rdparty/libprocess/src/tests/http_tests.cpp +++ b/3rdparty/libprocess/src/tests/http_tests.cpp @@ -28,6 +28,7 @@ #include <process/gtest.hpp> #include <process/http.hpp> #include <process/io.hpp> +#include <process/owned.hpp> #include <process/socket.hpp> #include <stout/base64.hpp> @@ -41,6 +42,8 @@ using namespace process; +using process::Owned; + using process::http::URL; using process::network::Socket; @@ -646,6 +649,334 @@ TEST(HTTPTest, Delete) } +TEST(HTTPConnectionTest, Serial) +{ + Http http; + + http::URL url = http::URL( + "http", + http.process->self().address.ip, + http.process->self().address.port, + http.process->self().id + "/get"); + + Future<http::Connection> connect = http::connect(url); + AWAIT_READY(connect); + + http::Connection connection = connect.get(); + + // First test a regular (non-streaming) request. + Promise<http::Response> promise1; + Future<http::Request> get1; + + EXPECT_CALL(*http.process, get(_)) + .WillOnce(DoAll(FutureArg<0>(&get1), Return(promise1.future()))); + + http::Request request1; + request1.method = "GET"; + request1.url = url; + request1.body = "1"; + request1.keepAlive = true; + + Future<http::Response> response1 = connection.send(request1); + + AWAIT_READY(get1); + EXPECT_EQ("1", get1->body); + + promise1.set(http::OK("1")); + + AWAIT_EXPECT_RESPONSE_BODY_EQ("1", response1); + + // Now test a streaming response. + Promise<http::Response> promise2; + Future<http::Request> get2; + + EXPECT_CALL(*http.process, get(_)) + .WillOnce(DoAll(FutureArg<0>(&get2), Return(promise2.future()))); + + http::Request request2 = request1; + request2.body = "2"; + + Future<http::Response> response2 = connection.send(request2, true); + + AWAIT_READY(get2); + EXPECT_EQ("2", get2->body); + + promise2.set(http::OK("2")); + + AWAIT_READY(response2); + ASSERT_SOME(response2->reader); + + http::Pipe::Reader reader = response2->reader.get(); + AWAIT_EQ("2", reader.read()); + AWAIT_EQ("", reader.read()); + + // Disconnect. + AWAIT_READY(connection.disconnect()); + AWAIT_READY(connection.disconnected()); + + // After disconnection, sends should fail. + AWAIT_FAILED(connection.send(request1)); +} + + +TEST(HTTPConnectionTest, Pipeline) +{ + Http http; + + http::URL url = http::URL( + "http", + http.process->self().address.ip, + http.process->self().address.port, + http.process->self().id + "/get"); + + Future<http::Connection> connect = http::connect(url); + AWAIT_READY(connect); + + http::Connection connection = connect.get(); + + // Send three pipelined requests. + Promise<http::Response> promise1, promise2, promise3; + Future<http::Request> get1, get2, get3; + + EXPECT_CALL(*http.process, get(_)) + .WillOnce(DoAll(FutureArg<0>(&get1), + Return(promise1.future()))) + .WillOnce(DoAll(FutureArg<0>(&get2), + Return(promise2.future()))) + .WillOnce(DoAll(FutureArg<0>(&get3), + Return(promise3.future()))); + + http::Request request1, request2, request3; + + request1.method = "GET"; + request2.method = "GET"; + request3.method = "GET"; + + request1.url = url; + request2.url = url; + request3.url = url; + + request1.body = "1"; + request2.body = "2"; + request3.body = "3"; + + request1.keepAlive = true; + request2.keepAlive = true; + request3.keepAlive = true; + + Future<http::Response> response1 = connection.send(request1); + Future<http::Response> response2 = connection.send(request2, true); + Future<http::Response> response3 = connection.send(request3); + + // Ensure the requests are all received before any + // responses have been sent. + AWAIT_READY(get1); + AWAIT_READY(get2); + AWAIT_READY(get3); + + EXPECT_EQ("1", get1->body); + EXPECT_EQ("2", get2->body); + EXPECT_EQ("3", get3->body); + + // Complete the responses in the opposite order, and ensure + // that the pipelining in libprocess sends the responses in + // the same order as the requests were received. + promise3.set(http::OK("3")); + promise2.set(http::OK("2")); + + EXPECT_TRUE(response1.isPending()); + EXPECT_TRUE(response2.isPending()); + EXPECT_TRUE(response3.isPending()); + + promise1.set(http::OK("1")); + + AWAIT_READY(response1); + AWAIT_READY(response2); + AWAIT_READY(response3); + + EXPECT_EQ("1", response1->body); + + ASSERT_SOME(response2->reader); + + http::Pipe::Reader reader = response2->reader.get(); + AWAIT_EQ("2", reader.read()); + AWAIT_EQ("", reader.read()); + + EXPECT_EQ("3", response3->body); + + // Disconnect. + AWAIT_READY(connection.disconnect()); + AWAIT_READY(connection.disconnected()); + + // After disconnection, sends should fail. + AWAIT_FAILED(connection.send(request1)); +} + + +TEST(HTTPConnectionTest, ClosingRequest) +{ + Http http; + + http::URL url = http::URL( + "http", + http.process->self().address.ip, + http.process->self().address.port, + http.process->self().id + "/get"); + + Future<http::Connection> connect = http::connect(url); + AWAIT_READY(connect); + + http::Connection connection = connect.get(); + + // Issue two pipelined requests, the second will not have + // 'keepAlive' set. This prevents further requests and leads + // to a disconnection upon receiving the second response. + Promise<http::Response> promise1, promise2; + Future<http::Request> get1, get2; + + EXPECT_CALL(*http.process, get(_)) + .WillOnce(DoAll(FutureArg<0>(&get1), + Return(promise1.future()))) + .WillOnce(DoAll(FutureArg<0>(&get2), + Return(promise2.future()))); + + http::Request request1, request2; + + request1.method = "GET"; + request2.method = "GET"; + + request1.url = url; + request2.url = url; + + request1.keepAlive = true; + request2.keepAlive = false; + + Future<http::Response> response1 = connection.send(request1); + Future<http::Response> response2 = connection.send(request2); + + // After a closing request, sends should fail. + AWAIT_FAILED(connection.send(request1)); + + // Complete the responses. + promise1.set(http::OK("body")); + promise2.set(http::OK("body")); + + AWAIT_READY(response1); + AWAIT_READY(response2); + + AWAIT_READY(connection.disconnected()); +} + + +TEST(HTTPConnectionTest, ClosingResponse) +{ + Http http; + + http::URL url = http::URL( + "http", + http.process->self().address.ip, + http.process->self().address.port, + http.process->self().id + "/get"); + + Future<http::Connection> connect = http::connect(url); + AWAIT_READY(connect); + + http::Connection connection = connect.get(); + + // Issue two pipelined requests, the server will respond + // with a 'Connection: close' for the first response, which + // will lead to a disconnection. + Promise<http::Response> promise1, promise2; + Future<http::Request> get1, get2; + + EXPECT_CALL(*http.process, get(_)) + .WillOnce(DoAll(FutureArg<0>(&get1), Return(promise1.future()))) + .WillOnce(DoAll(FutureArg<0>(&get2), Return(promise2.future()))); + + http::Request request1, request2; + + request1.method = "GET"; + request2.method = "GET"; + + request1.url = url; + request2.url = url; + + request1.keepAlive = true; + request2.keepAlive = true; + + Future<http::Response> response1 = connection.send(request1); + Future<http::Response> response2 = connection.send(request2); + + // The first response will close the connection. + http::Response ok = http::OK("body"); + ok.headers["Connection"] = "close"; + + promise1.set(ok); + + AWAIT_READY(response1); + AWAIT_FAILED(response2); + + AWAIT_READY(connection.disconnected()); +} + + +TEST(HTTPConnectionTest, ReferenceCounting) +{ + Http http; + + http::URL url = http::URL( + "http", + http.process->self().address.ip, + http.process->self().address.port, + http.process->self().id + "/get"); + + // Capture the connection as a Owned in order to test that + // when the last copy of the Connection is destructed, a + // disconnection occurs. + auto connect = Owned<Future<http::Connection>>( + new Future<http::Connection>(http::connect(url))); + + AWAIT_READY(*connect); + + auto connection = Owned<http::Connection>( + new http::Connection(connect->get())); + + connect.reset(); + + Future<Nothing> disconnected = connection->disconnected(); + + // This should be the last remaining copy of the connection. + connection.reset(); + + AWAIT_READY(disconnected); +} + + +TEST(HTTPConnectionTest, Equality) +{ + Http http; + + http::URL url = http::URL( + "http", + http.process->self().address.ip, + http.process->self().address.port, + http.process->self().id + "/get"); + + Future<http::Connection> connect = http::connect(url); + AWAIT_READY(connect); + + http::Connection connection1 = connect.get(); + + connect = http::connect(url); + AWAIT_READY(connect); + + http::Connection connection2 = connect.get(); + + EXPECT_NE(connection1, connection2); + EXPECT_EQ(connection2, connection2); +} + + TEST(HTTPTest, QueryEncodeDecode) { // If we use Type<a, b> directly inside a macro without surrounding
