Added a StreamingResponseDecoder. Review: https://reviews.apache.org/r/32347
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6ac8eb1c Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6ac8eb1c Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6ac8eb1c Branch: refs/heads/master Commit: 6ac8eb1c524dc4adbc09d20dcb8e4e31d60eeb56 Parents: 8b0bba1 Author: Benjamin Mahler <[email protected]> Authored: Fri Mar 20 11:44:51 2015 -0700 Committer: Benjamin Mahler <[email protected]> Committed: Mon Mar 30 16:38:22 2015 -0700 ---------------------------------------------------------------------- 3rdparty/libprocess/src/decoder.hpp | 232 +++++++++++++++++++ 3rdparty/libprocess/src/tests/decoder_tests.cpp | 91 ++++++++ 2 files changed, 323 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/6ac8eb1c/3rdparty/libprocess/src/decoder.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/decoder.hpp b/3rdparty/libprocess/src/decoder.hpp index b3a667c..56adde0 100644 --- a/3rdparty/libprocess/src/decoder.hpp +++ b/3rdparty/libprocess/src/decoder.hpp @@ -14,6 +14,7 @@ #include <stout/foreach.hpp> #include <stout/gzip.hpp> +#include <stout/option.hpp> #include <stout/try.hpp> @@ -469,6 +470,237 @@ private: }; +// Provides a response decoder that returns 'PIPE' responses once +// the response headers are received, but before the body data +// is received. Callers are expected to read the body from the +// Pipe::Reader in the response. +// +// TODO(bmahler): Consolidate streaming and non-streaming response +// decoders. 
+class StreamingResponseDecoder
+{
+public:
+  // Registers the static callbacks below in the http-parser settings
+  // and initializes the parser for HTTP responses. 'parser.data' is
+  // pointed at this instance so the callbacks can find the decoder.
+  StreamingResponseDecoder()
+    : failure(false), header(HEADER_FIELD), response(NULL)
+  {
+    settings.on_message_begin =
+      &StreamingResponseDecoder::on_message_begin;
+
+#if !(HTTP_PARSER_VERSION_MAJOR >= 2)
+    settings.on_path =
+      &StreamingResponseDecoder::on_path;
+    settings.on_fragment =
+      &StreamingResponseDecoder::on_fragment;
+    settings.on_query_string =
+      &StreamingResponseDecoder::on_query_string;
+#endif
+
+    settings.on_url =
+      &StreamingResponseDecoder::on_url;
+    settings.on_header_field =
+      &StreamingResponseDecoder::on_header_field;
+    settings.on_header_value =
+      &StreamingResponseDecoder::on_header_value;
+    settings.on_headers_complete =
+      &StreamingResponseDecoder::on_headers_complete;
+    settings.on_body =
+      &StreamingResponseDecoder::on_body;
+    settings.on_message_complete =
+      &StreamingResponseDecoder::on_message_complete;
+
+    http_parser_init(&parser, HTTP_RESPONSE);
+
+    parser.data = this;
+  }
+
+  // Releases whatever the decoder still owns: a response whose
+  // headers never completed (or were rejected), any response that was
+  // queued but not yet returned by decode(), and the writer of a body
+  // that was never finished.
+  ~StreamingResponseDecoder()
+  {
+    delete response;
+
+    // Fail an unfinished body so the reader side learns that no more
+    // data will arrive.
+    if (writer.isSome()) {
+      http::Pipe::Writer writer_ = writer.get(); // Remove const.
+      writer_.fail("decoder destroyed before the body was complete");
+    }
+
+    for (size_t i = 0; i < responses.size(); ++i) {
+      delete responses[i];
+    }
+  }
+
+  // Feeds 'length' bytes starting at 'data' to the parser and returns
+  // the responses whose headers were completed by this call (possibly
+  // none). The returned pointers are no longer tracked by the
+  // decoder, so the caller is responsible for deleting them. Body
+  // bytes are not returned here; they are written to the pipe whose
+  // reader is stored in the corresponding response.
+  //
+  // If the parser does not consume all of the input, the decoder is
+  // marked as failed and a body that is still in progress is failed
+  // with "failed to decode body".
+  std::deque<http::Response*> decode(const char* data, size_t length)
+  {
+    size_t parsed = http_parser_execute(&parser, &settings, data, length);
+
+    if (parsed != length) {
+      // TODO(bmahler): joyent/http-parser exposes error reasons.
+      failure = true;
+
+      // If we're still writing the body, fail the writer!
+      if (writer.isSome()) {
+        http::Pipe::Writer writer_ = writer.get(); // Remove const.
+        writer_.fail("failed to decode body");
+        writer = None();
+      }
+    }
+
+    // Hand the queued responses over and leave the queue empty.
+    std::deque<http::Response*> result;
+    result.swap(responses);
+    return result;
+  }
+
+  // Returns true once decoding has failed.
+  bool failed() const
+  {
+    return failure;
+  }
+
+private:
+  // Not copyable (declared, never defined): 'parser.data' points back
+  // at this instance and the destructor deletes 'response', so a copy
+  // would alias both.
+  StreamingResponseDecoder(const StreamingResponseDecoder&);
+  StreamingResponseDecoder& operator=(const StreamingResponseDecoder&);
+
+  // Starts a new response: resets the header accumulation state and
+  // allocates a 'PIPE' response. The previous response must already
+  // have been handed off and its body completed.
+  static int on_message_begin(http_parser* p)
+  {
+    StreamingResponseDecoder* decoder =
+      static_cast<StreamingResponseDecoder*>(p->data);
+
+    CHECK(!decoder->failure);
+
+    decoder->header = HEADER_FIELD;
+    decoder->field.clear();
+    decoder->value.clear();
+
+    CHECK(decoder->response == NULL);
+    CHECK(decoder->writer.isNone());
+
+    decoder->response = new http::Response();
+    decoder->response->type = http::Response::PIPE;
+
+    return 0;
+  }
+
+#if !(HTTP_PARSER_VERSION_MAJOR >= 2)
+  // Request-only callbacks required by older parser versions; a
+  // response decoder has nothing to record for them.
+  static int on_path(http_parser* p, const char* data, size_t length)
+  {
+    return 0;
+  }
+
+  static int on_query_string(http_parser* p, const char* data, size_t length)
+  {
+    return 0;
+  }
+
+  static int on_fragment(http_parser* p, const char* data, size_t length)
+  {
+    return 0;
+  }
+#endif // !(HTTP_PARSER_VERSION_MAJOR >= 2)
+
+  // Nothing to record for a response.
+  static int on_url(http_parser* p, const char* data, size_t length)
+  {
+    return 0;
+  }
+
+  // Accumulates (a piece of) a header name. If the previous callback
+  // delivered a header value, that header is complete and is stored
+  // before the new name is started.
+  static int on_header_field(http_parser* p, const char* data, size_t length)
+  {
+    StreamingResponseDecoder* decoder =
+      static_cast<StreamingResponseDecoder*>(p->data);
+
+    CHECK_NOTNULL(decoder->response);
+
+    if (decoder->header != HEADER_FIELD) {
+      decoder->response->headers[decoder->field] = decoder->value;
+      decoder->field.clear();
+      decoder->value.clear();
+    }
+
+    decoder->field.append(data, length);
+    decoder->header = HEADER_FIELD;
+
+    return 0;
+  }
+
+  // Accumulates (a piece of) the value of the current header.
+  static int on_header_value(http_parser* p, const char* data, size_t length)
+  {
+    StreamingResponseDecoder* decoder =
+      static_cast<StreamingResponseDecoder*>(p->data);
+
+    CHECK_NOTNULL(decoder->response);
+
+    decoder->value.append(data, length);
+    decoder->header = HEADER_VALUE;
+    return 0;
+  }
+
+  // Finalizes the headers, resolves the status string, opens the body
+  // pipe and queues the response for the caller of decode(). Marks
+  // the decoder as failed for an unknown status code or a gzip
+  // encoded body.
+  static int on_headers_complete(http_parser* p)
+  {
+    StreamingResponseDecoder* decoder =
+      static_cast<StreamingResponseDecoder*>(p->data);
+
+    CHECK_NOTNULL(decoder->response);
+
+    // Add final header. A response without any headers leaves 'field'
+    // empty; do not store an empty-named header in that case.
+    if (!decoder->field.empty()) {
+      decoder->response->headers[decoder->field] = decoder->value;
+    }
+    decoder->field.clear();
+    decoder->value.clear();
+
+    // Get the response status string.
+    if (http::statuses.contains(decoder->parser.status_code)) {
+      decoder->response->status = http::statuses[decoder->parser.status_code];
+    } else {
+      decoder->failure = true;
+      return 1;
+    }
+
+    // We cannot provide streaming gzip decompression!
+    Option<std::string> encoding =
+      decoder->response->headers.get("Content-Encoding");
+    if (encoding.isSome() && encoding.get() == "gzip") {
+      decoder->failure = true;
+      return 1;
+    }
+
+    CHECK(decoder->writer.isNone());
+
+    http::Pipe pipe;
+    decoder->writer = pipe.writer();
+    decoder->response->reader = pipe.reader();
+
+    // Send the response to the caller, but keep a Pipe::Writer for
+    // streaming the body content into the response.
+    decoder->responses.push_back(decoder->response);
+    decoder->response = NULL;
+
+    return 0;
+  }
+
+  // Streams a chunk of body data into the response's pipe.
+  static int on_body(http_parser* p, const char* data, size_t length)
+  {
+    StreamingResponseDecoder* decoder =
+      static_cast<StreamingResponseDecoder*>(p->data);
+
+    CHECK_SOME(decoder->writer);
+
+    http::Pipe::Writer writer = decoder->writer.get(); // Remove const.
+    writer.write(std::string(data, length));
+
+    return 0;
+  }
+
+  // Closes the body pipe once the whole message has been parsed.
+  static int on_message_complete(http_parser* p)
+  {
+    StreamingResponseDecoder* decoder =
+      static_cast<StreamingResponseDecoder*>(p->data);
+
+    // http-parser treats a return value of 1 from
+    // on_headers_complete() as "do not expect a body" rather than as
+    // an error, so this callback still runs after the headers were
+    // rejected there. No pipe was opened in that case; report the
+    // error to the parser here instead of tripping a CHECK.
+    if (decoder->writer.isNone()) {
+      CHECK(decoder->failure);
+      return 1;
+    }
+
+    http::Pipe::Writer writer = decoder->writer.get(); // Remove const.
+    writer.close();
+
+    decoder->writer = None();
+
+    return 0;
+  }
+
+  // Set once decoding has failed; reported through failed().
+  bool failure;
+
+  http_parser parser;
+  http_parser_settings settings;
+
+  // Which kind of header callback ran last; used to detect where one
+  // header ends and the next begins.
+  enum {
+    HEADER_FIELD,
+    HEADER_VALUE
+  } header;
+
+  // Name and value of the header currently being accumulated.
+  std::string field;
+  std::string value;
+
+  // Response whose headers are still being parsed (owned), and the
+  // writer for the body of the most recently queued response.
+  http::Response* response;
+  Option<http::Pipe::Writer> writer;
+
+  // Responses with complete headers, waiting to be returned by
+  // decode().
+  std::deque<http::Response*> responses;
+};
+
 } // namespace process {
 
 #endif // __DECODER_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/6ac8eb1c/3rdparty/libprocess/src/tests/decoder_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/decoder_tests.cpp b/3rdparty/libprocess/src/tests/decoder_tests.cpp
index b996f00..efe364a 100644
--- a/3rdparty/libprocess/src/tests/decoder_tests.cpp
+++ b/3rdparty/libprocess/src/tests/decoder_tests.cpp
@@ -133,3 +133,94 @@ TEST(Decoder, Response)
   delete response;
 }
+
+
+// The response is returned as soon as the headers are decoded; the
+// body arrives afterwards through the pipe reader.
+TEST(Decoder, StreamingResponse)
+{
+  StreamingResponseDecoder decoder;
+
+  const string headers =
+    "HTTP/1.1 200 OK\r\n"
+    "Date: Fri, 31 Dec 1999 23:59:59 GMT\r\n"
+    "Content-Type: text/plain\r\n"
+    "Content-Length: 2\r\n"
+    "\r\n";
+
+  const string body = "hi";
+
+  deque<Response*> responses = decoder.decode(headers.data(), headers.length());
+  ASSERT_FALSE(decoder.failed());
+  ASSERT_EQ(1u, responses.size());
+
+  Response* response = responses[0];
+
+  EXPECT_EQ("200 OK", response->status);
+  EXPECT_EQ(3u, response->headers.size());
+
+  ASSERT_EQ(Response::PIPE, response->type);
+  ASSERT_SOME(response->reader);
+
+  http::Pipe::Reader reader = response->reader.get();
+  Future<string> read = reader.read();
+  EXPECT_TRUE(read.isPending());
+
+  decoder.decode(body.data(), body.length());
+
+  // Feeding EOF to the decoder should be ok.
+  decoder.decode("", 0);
+
+  EXPECT_TRUE(read.isReady());
+  EXPECT_EQ("hi", read.get());
+
+  // Response should be complete.
+  read = reader.read();
+  EXPECT_TRUE(read.isReady());
+  EXPECT_EQ("", read.get());
+
+  // The decoder hands ownership of the response to the caller.
+  delete response;
+}
+
+
+// An EOF before 'Content-Length' bytes of body were received must
+// fail the pending read on the pipe.
+TEST(Decoder, StreamingResponseFailure)
+{
+  StreamingResponseDecoder decoder;
+
+  const string headers =
+    "HTTP/1.1 200 OK\r\n"
+    "Date: Fri, 31 Dec 1999 23:59:59 GMT\r\n"
+    "Content-Type: text/plain\r\n"
+    "Content-Length: 2\r\n"
+    "\r\n";
+
+  // The body is shorter than the content length!
+  const string body = "1";
+
+  deque<Response*> responses = decoder.decode(headers.data(), headers.length());
+  ASSERT_FALSE(decoder.failed());
+  ASSERT_EQ(1u, responses.size());
+
+  Response* response = responses[0];
+
+  EXPECT_EQ("200 OK", response->status);
+  EXPECT_EQ(3u, response->headers.size());
+
+  ASSERT_EQ(Response::PIPE, response->type);
+  ASSERT_SOME(response->reader);
+
+  http::Pipe::Reader reader = response->reader.get();
+  Future<string> read = reader.read();
+  EXPECT_TRUE(read.isPending());
+
+  decoder.decode(body.data(), body.length());
+
+  EXPECT_TRUE(read.isReady());
+  EXPECT_EQ("1", read.get());
+
+  // Body is not yet complete.
+  read = reader.read();
+  EXPECT_TRUE(read.isPending());
+
+  // Feeding EOF to the decoder should trigger a failure!
+  decoder.decode("", 0);
+
+  EXPECT_TRUE(read.isFailed());
+  EXPECT_EQ("failed to decode body", read.failure());
+
+  // The decoder hands ownership of the response to the caller.
+  delete response;
+}
