Added http::streaming::get/post for client-side streaming responses. Review: https://reviews.apache.org/r/32351
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f7fccce2 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f7fccce2 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f7fccce2 Branch: refs/heads/master Commit: f7fccce2e3208cfc6481151c2bb016727e5ebdfc Parents: 6ac8eb1 Author: Benjamin Mahler <[email protected]> Authored: Fri Mar 20 14:45:28 2015 -0700 Committer: Benjamin Mahler <[email protected]> Committed: Mon Mar 30 16:59:19 2015 -0700 ---------------------------------------------------------------------- 3rdparty/libprocess/include/process/http.hpp | 44 ++++ 3rdparty/libprocess/src/http.cpp | 237 ++++++++++++++++++++-- 3rdparty/libprocess/src/tests/http_tests.cpp | 84 ++++++++ 3 files changed, 344 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/f7fccce2/3rdparty/libprocess/include/process/http.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/include/process/http.hpp b/3rdparty/libprocess/include/process/http.hpp index a9ef5b7..07825b2 100644 --- a/3rdparty/libprocess/include/process/http.hpp +++ b/3rdparty/libprocess/include/process/http.hpp @@ -583,6 +583,50 @@ Future<Response> post( const Option<std::string>& body = None(), const Option<std::string>& contentType = None()); + +namespace streaming { + +// Asynchronously sends an HTTP GET request to the specified URL +// and returns the HTTP response of type 'PIPE' once the response +// headers are received. The caller must read the response body +// from the Pipe::Reader. +Future<Response> get( + const URL& url, + const Option<hashmap<std::string, std::string>>& headers = None()); + +// Asynchronously sends an HTTP GET request to the process with the +// given UPID and returns the HTTP response of type 'PIPE' once the +// response headers are received. The caller must read the response +// body from the Pipe::Reader. +Future<Response> get( + const UPID& upid, + const Option<std::string>& path = None(), + const Option<std::string>& query = None(), + const Option<hashmap<std::string, std::string>>& headers = None()); + +// Asynchronously sends an HTTP POST request to the specified URL +// and returns the HTTP response of type 'PIPE' once the response +// headers are received. The caller must read the response body +// from the Pipe::Reader. +Future<Response> post( + const URL& url, + const Option<hashmap<std::string, std::string>>& headers = None(), + const Option<std::string>& body = None(), + const Option<std::string>& contentType = None()); + +// Asynchronously sends an HTTP POST request to the process with the +// given UPID and returns the HTTP response of type 'PIPE' once the +// response headers are received. The caller must read the response +// body from the Pipe::Reader. +Future<Response> post( + const UPID& upid, + const Option<std::string>& path = None(), + const Option<hashmap<std::string, std::string>>& headers = None(), + const Option<std::string>& body = None(), + const Option<std::string>& contentType = None()); + +} // namespace streaming { + } // namespace http { } // namespace process { http://git-wip-us.apache.org/repos/asf/mesos/blob/f7fccce2/3rdparty/libprocess/src/http.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/http.cpp b/3rdparty/libprocess/src/http.cpp index cd52cc8..7e6cbd3 100644 --- a/3rdparty/libprocess/src/http.cpp +++ b/3rdparty/libprocess/src/http.cpp @@ -548,35 +548,143 @@ ostream& operator << ( namespace internal { -Future<Response> decode(const string& buffer) +// Forward declarations. +Future<string> _convert( + Pipe::Reader reader, + const memory::shared_ptr<string>& buffer, + const string& read); +Response __convert( + const Response& pipeResponse, + const string& body); + + +// Returns a 'BODY' response once the body of the provided +// 'PIPE' response can be read completely. +Future<Response> convert(const Response& pipeResponse) { - ResponseDecoder decoder; - deque<Response*> responses = decoder.decode(buffer.c_str(), buffer.length()); + memory::shared_ptr<string> buffer(new string()); - if (decoder.failed() || responses.empty()) { - for (size_t i = 0; i < responses.size(); ++i) { - delete responses[i]; + CHECK_EQ(Response::PIPE, pipeResponse.type); + CHECK_SOME(pipeResponse.reader); + + Pipe::Reader reader = pipeResponse.reader.get(); + + return reader.read() + .then(lambda::bind(&_convert, reader, buffer, lambda::_1)) + .then(lambda::bind(&__convert, pipeResponse, lambda::_1)); +} + + +Future<string> _convert( + Pipe::Reader reader, + const memory::shared_ptr<string>& buffer, + const string& read) +{ + if (read.empty()) { // EOF. + return *buffer; + } + + buffer->append(read); + + return reader.read() + .then(lambda::bind(&_convert, reader, buffer, lambda::_1)); +} + + +Response __convert(const Response& pipeResponse, const string& body) +{ + Response bodyResponse = pipeResponse; + bodyResponse.type = Response::BODY; + bodyResponse.body = body; + bodyResponse.reader = None(); // Remove the reader. + return bodyResponse; +} + + +// Forward declaration. +void _decode( + Socket socket, + Owned<StreamingResponseDecoder> decoder, + const Future<string>& data); + + +Future<Response> decode( + Socket socket, + Owned<StreamingResponseDecoder> decoder, + const string& data) +{ + deque<Response*> responses = decoder->decode(data.c_str(), data.length()); + + if (decoder->failed() || responses.size() > 1) { + foreach (Response* response, responses) { + delete response; } - return Failure("Failed to decode HTTP response:\n" + buffer + "\n"); - } else if (responses.size() > 1) { - PLOG(ERROR) << "Received more than 1 HTTP Response"; + return Failure(string("Failed to decode HTTP response") + + (responses.size() > 1 ? ": more than one response received" : "")); } - Response response = *responses[0]; - for (size_t i = 0; i < responses.size(); ++i) { - delete responses[i]; + if (responses.empty()) { + // Keep reading until the headers are complete. + return socket.recv(None()) + .then(lambda::bind(&decode, socket, decoder, lambda::_1)); + } + + // Keep feeding data to the decoder until EOF or a 'recv' failure. + if (!data.empty()) { + socket.recv(None()) + .onAny(lambda::bind(&_decode, socket, decoder, lambda::_1)); } + Response response = *responses[0]; + delete responses[0]; return response; } +void _decode( + Socket socket, + Owned<StreamingResponseDecoder> decoder, + const Future<string>& data) +{ + deque<Response*> responses; + + if (!data.isReady()) { + // Let the decoder process EOF if a failure + // or discard is encountered. + responses = decoder->decode("", 0); + } else { + responses = decoder->decode(data.get().c_str(), data.get().length()); + } + + // We're not expecting more responses to arrive on this socket! + if (!responses.empty() || decoder->failed()) { + VLOG(1) << "Failed to decode HTTP response: " + << (responses.size() > 1 + ? ": more than one response received" + : ""); + + foreach (Response* response, responses) { + delete response; + } + + return; + } + + // Keep reading if the socket has more data. + if (data.isReady() && !data.get().empty()) { + socket.recv(None()) + .onAny(lambda::bind(&_decode, socket, decoder, lambda::_1)); + } +} + + // Forward declaration. Future<Response> _request( Socket socket, const Address& address, const URL& url, const string& method, + bool streamingResponse, const Option<hashmap<string, string>>& _headers, const Option<string>& body, const Option<string>& contentType); @@ -585,6 +693,7 @@ Future<Response> _request( Future<Response> request( const URL& url, const string& method, + bool streamedResponse, const Option<hashmap<string, string>>& headers, const Option<string>& body, const Option<string>& contentType) @@ -626,6 +735,7 @@ Future<Response> request( address, url, method, + streamedResponse, headers, body, contentType)); @@ -637,6 +747,7 @@ Future<Response> _request( const Address& address, const URL& url, const string& method, + bool streamedResponse, const Option<hashmap<string, string>>& _headers, const Option<string>& body, const Option<string>& contentType) @@ -685,6 +796,13 @@ Future<Response> _request( headers["Content-Length"] = stringify(body.get().length()); } + // TODO(bmahler): Use a 'Request' and a 'RequestEncoder' here! + // Currently this does not handle 'gzip' content encoding, + // unless the caller manually compresses the 'body'. For + // streaming requests we must wipe 'gzip' as an acceptable + // encoding as we don't currently have streaming gzip utilities + // to support decoding a streaming gzip response! + // Emit the headers. foreachpair (const string& key, const string& value, headers) { out << key << ": " << value << "\r\n"; @@ -699,13 +817,18 @@ Future<Response> _request( // Need to disambiguate the Socket::recv for binding below. Future<string> (Socket::*recv)(const Option<ssize_t>&) = &Socket::recv; - // TODO(bmahler): For efficiency, this should properly use the - // ResponseDecoder when reading, rather than parsing the full string - // response. - return socket.send(out.str()) - .then(lambda::function<Future<string>(void)>( - lambda::bind(recv, socket, -1))) - .then(lambda::bind(&internal::decode, lambda::_1)); + Owned<StreamingResponseDecoder> decoder(new StreamingResponseDecoder()); + + Future<Response> pipeResponse = socket.send(out.str()) + .then(lambda::bind(recv, socket, None())) + .then(lambda::bind(&internal::decode, socket, decoder, lambda::_1)); + + if (streamedResponse) { + return pipeResponse; + } else { + return pipeResponse + .then(lambda::bind(&internal::convert, lambda::_1)); + } } } // namespace internal { @@ -715,7 +838,7 @@ Future<Response> get( const URL& url, const Option<hashmap<string, string>>& headers) { - return internal::request(url, "GET", headers, None(), None()); + return internal::request(url, "GET", false, headers, None(), None()); } @@ -757,7 +880,7 @@ Future<Response> post( return Failure("Attempted to do a POST with a Content-Type but no body"); } - return internal::request(url, "POST", headers, body, contentType); + return internal::request(url, "POST", false, headers, body, contentType); } @@ -778,5 +901,77 @@ Future<Response> post( return post(url, headers, body, contentType); } + +namespace streaming { + +Future<Response> get( + const URL& url, + const Option<hashmap<string, string>>& headers) +{ + return internal::request(url, "GET", true, headers, None(), None()); +} + + +Future<Response> get( + const UPID& upid, + const Option<string>& path, + const Option<string>& query, + const Option<hashmap<string, string>>& headers) +{ + URL url("http", net::IP(upid.address.ip), upid.address.port, upid.id); + + if (path.isSome()) { + // TODO(benh): Get 'query' and/or 'fragment' out of 'path'. + url.path = strings::join("/", url.path, path.get()); + } + + if (query.isSome()) { + Try<hashmap<string, string>> decode = http::query::decode( + strings::remove(query.get(), "?", strings::PREFIX)); + + if (decode.isError()) { + return Failure("Failed to decode HTTP query string: " + decode.error()); + } + + url.query = decode.get(); + } + + return streaming::get(url, headers); +} + + +Future<Response> post( + const URL& url, + const Option<hashmap<string, string>>& headers, + const Option<string>& body, + const Option<string>& contentType) +{ + if (body.isNone() && contentType.isSome()) { + return Failure("Attempted to do a POST with a Content-Type but no body"); + } + + return internal::request(url, "POST", true, headers, body, contentType); +} + + +Future<Response> post( + const UPID& upid, + const Option<string>& path, + const Option<hashmap<string, string>>& headers, + const Option<string>& body, + const Option<string>& contentType) +{ + URL url("http", net::IP(upid.address.ip), upid.address.port, upid.id); + + if (path.isSome()) { + // TODO(benh): Get 'query' and/or 'fragment' out of 'path'. + url.path = strings::join("/", url.path, path.get()); + } + + return streaming::post(url, headers, body, contentType); +} + +} // namespace streaming { + } // namespace http { } // namespace process { http://git-wip-us.apache.org/repos/asf/mesos/blob/f7fccce2/3rdparty/libprocess/src/tests/http_tests.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/tests/http_tests.cpp b/3rdparty/libprocess/src/tests/http_tests.cpp index 83219da..dfdb233 100644 --- a/3rdparty/libprocess/src/tests/http_tests.cpp +++ b/3rdparty/libprocess/src/tests/http_tests.cpp @@ -414,6 +414,90 @@ TEST(HTTP, Get) } +TEST(HTTP, StreamingGetComplete) +{ + Http http; + + http::Pipe pipe; + http::OK ok; + ok.type = http::Response::PIPE; + ok.reader = pipe.reader(); + + EXPECT_CALL(*http.process, pipe(_)) + .WillOnce(Return(ok)); + + Future<http::Response> response = + http::streaming::get(http.process->self(), "pipe"); + + // The response should be ready since the headers were sent. + AWAIT_READY(response); + + EXPECT_SOME_EQ("chunked", response.get().headers.get("Transfer-Encoding")); + ASSERT_EQ(http::Response::PIPE, response.get().type); + ASSERT_SOME(response.get().reader); + + http::Pipe::Reader reader = response.get().reader.get(); + + // There is no data to read yet. + Future<string> read = reader.read(); + EXPECT_TRUE(read.isPending()); + + // Stream data into the body and read it from the response. + http::Pipe::Writer writer = pipe.writer(); + EXPECT_TRUE(writer.write("hello")); + AWAIT_EQ("hello", read); + + EXPECT_TRUE(writer.write("goodbye")); + AWAIT_EQ("goodbye", reader.read()); + + // Complete the response. + EXPECT_TRUE(writer.close()); + AWAIT_EQ("", reader.read()); // EOF. +} + + +TEST(HTTP, StreamingGetFailure) +{ + Http http; + + http::Pipe pipe; + http::OK ok; + ok.type = http::Response::PIPE; + ok.reader = pipe.reader(); + + EXPECT_CALL(*http.process, pipe(_)) + .WillOnce(Return(ok)); + + Future<http::Response> response = + http::streaming::get(http.process->self(), "pipe"); + + // The response should be ready since the headers were sent. + AWAIT_READY(response); + + EXPECT_SOME_EQ("chunked", response.get().headers.get("Transfer-Encoding")); + ASSERT_EQ(http::Response::PIPE, response.get().type); + ASSERT_SOME(response.get().reader); + + http::Pipe::Reader reader = response.get().reader.get(); + + // There is no data to read yet. + Future<string> read = reader.read(); + EXPECT_TRUE(read.isPending()); + + // Stream data into the body and read it from the response. + http::Pipe::Writer writer = pipe.writer(); + EXPECT_TRUE(writer.write("hello")); + AWAIT_EQ("hello", read); + + EXPECT_TRUE(writer.write("goodbye")); + AWAIT_EQ("goodbye", reader.read()); + + // Fail the response. + EXPECT_TRUE(writer.fail("oops")); + AWAIT_FAILED(reader.read()); +} + + http::Response validatePost(const http::Request& request) { EXPECT_EQ("POST", request.method);
