Repository: mesos Updated Branches: refs/heads/master 9f8ab2866 -> e76954abb
Introduced an http::Pipe abstraction to simplify streaming HTTP Responses. Review: https://reviews.apache.org/r/31930 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e76954ab Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e76954ab Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e76954ab Branch: refs/heads/master Commit: e76954abb37a30da5bb211829d7033e53d830a7f Parents: 9f8ab28 Author: Benjamin Mahler <[email protected]> Authored: Thu Mar 5 18:33:28 2015 -0800 Committer: Benjamin Mahler <[email protected]> Committed: Thu Mar 19 23:12:24 2015 -0700 ---------------------------------------------------------------------- 3rdparty/libprocess/include/process/http.hpp | 148 +++++++++++++++++++-- 3rdparty/libprocess/src/http.cpp | 150 ++++++++++++++++++++++ 3rdparty/libprocess/src/process.cpp | 110 +++++++--------- 3rdparty/libprocess/src/tests/http_tests.cpp | 102 +++++++++++++-- 4 files changed, 427 insertions(+), 83 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/e76954ab/3rdparty/libprocess/include/process/http.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/include/process/http.hpp b/3rdparty/libprocess/include/process/http.hpp index 10143fd..2b36698 100644 --- a/3rdparty/libprocess/include/process/http.hpp +++ b/3rdparty/libprocess/include/process/http.hpp @@ -8,10 +8,13 @@ #include <cctype> #include <cstdlib> #include <iomanip> +#include <queue> #include <sstream> #include <string> #include <vector> +#include <process/future.hpp> +#include <process/owned.hpp> #include <process/pid.hpp> #include <stout/error.hpp> @@ -19,7 +22,9 @@ #include <stout/hashmap.hpp> #include <stout/ip.hpp> #include <stout/json.hpp> +#include <stout/memory.hpp> #include <stout/none.hpp> +#include <stout/nothing.hpp> #include <stout/numify.hpp> #include 
<stout/option.hpp> #include <stout/stringify.hpp> @@ -121,6 +126,130 @@ struct Request }; +// Represents an asynchronous in-memory unbuffered Pipe, currently +// used for streaming HTTP responses via chunked encoding. Note that +// being an in-memory pipe means that this cannot be used across OS +// processes. +// +// Much like unix pipes, data is read until end-of-file is +// encountered; this occurs when the write-end of the pipe is +// closed and there is no outstanding data left to read. +// +// Unlike unix pipes, if the read-end of the pipe is closed before +// the write-end is closed, rather than receiving SIGPIPE or EPIPE +// during a write, the writer is notified via a future. Like unix +// pipes, we are not notified if the read-end is closed after the +// write-end is closed, even if data is remaining in the pipe! +// +// No buffering means that each non-empty write to the pipe will +// correspond to an equivalent read from the pipe, and the +// reader must "keep up" with the writer in order to avoid +// unbounded memory growth. +// +// TODO(bmahler): The writer needs to be able to induce a failure +// on the reader to signal an error has occurred. For example, if +// we are receiving a response but a disconnection occurs before +// the response is completed, we want the reader to detect that a +// disconnection occurred! +// +// TODO(bmahler): Consider aggregating writes into larger reads to +// help the reader keep up (a process::Stream abstraction with +// backpressure would obviate the need for this). +// +// TODO(bmahler): Add a more general process::Stream<T> abstraction +// to represent asynchronous finite/infinite streams (possibly +// with "backpressure" on the writer). This is broadly useful +// (e.g. allocator can expose Stream<Allocation>, http::Pipe +// becomes Stream<string>, process::Queue<T> is just an infinite +// Stream<T> (i.e. completion and error semantics hidden)). +class Pipe +{ +private: + struct Data; // Forward declaration. 
+ +public: + class Reader + { + public: + // Returns data written to the pipe. + // Returns an empty read when end-of-file is reached. + // Returns Failure if the read-end of the pipe is closed. + Future<std::string> read(); + + // Closing the read-end of the pipe before the write-end closes + // will notify the writer that the reader is no longer interested. + // Returns false if the read-end of the pipe was already closed. + bool close(); + + private: + friend class Pipe; + explicit Reader(const memory::shared_ptr<Data>& _data) : data(_data) {} + memory::shared_ptr<Data> data; + }; + + class Writer + { + public: + // Returns false if the data could not be written because + // either end of the pipe was already closed. Note that an + // empty write has no effect. + bool write(const std::string& s); + + // Closing the write-end of the pipe will send end-of-file + // to the reader. Returns false if the write-end of the pipe + // was already closed. + bool close(); + + // Returns Nothing when the read-end of the pipe is closed + // before the write-end is closed, which means the reader + // was unable to continue reading! + Future<Nothing> readerClosed(); + + private: + friend class Pipe; + explicit Writer(const memory::shared_ptr<Data>& _data) : data(_data) {} + memory::shared_ptr<Data> data; + }; + + Pipe() : data(new Data()) {} + + Reader reader() const; + Writer writer() const; + +private: + enum State + { + OPEN, + CLOSED, + }; + + struct Data + { + Data() : lock(0), readEnd(OPEN), writeEnd(OPEN) {} + + // Rather than use a process to serialize access to the pipe's + // internal data we use a low-level "lock" which we acquire and + // release using atomic builtins. + int lock; + + State readEnd; + State writeEnd; + + // Represents readers waiting for data from the pipe. + std::queue<Owned<Promise<std::string>>> reads; + + // Represents unread writes in the pipe. Note that we omit + // empty strings as they serve as a signal for end-of-file. 
+ std::queue<std::string> writes; + + // Signals when the read-end is closed before the write-end. + Promise<Nothing> readerClosure; + }; + + memory::shared_ptr<Data> data; +}; + + struct Response { Response() @@ -138,8 +267,8 @@ struct Response std::string status; hashmap<std::string, std::string> headers; - // Either provide a "body", an absolute "path" to a file, or a - // "pipe" for streaming a response. Distinguish between the cases + // Either provide a 'body', an absolute 'path' to a file, or a + // 'pipe' for streaming a response. Distinguish between the cases // using 'type' below. // // BODY: Uses 'body' as the body of the response. These may be @@ -149,13 +278,12 @@ struct Response // PATH: Attempts to perform a 'sendfile' operation on the file // found at 'path'. // - // PIPE: Splices data from 'pipe' using 'Transfer-Encoding=chunked'. - // Note that the read end of the pipe will be closed by libprocess - // either after the write end has been closed or if the socket the - // data is being spliced to has been closed (i.e., nobody is - // listening any longer). This can cause writes to the pipe to - // generate a SIGPIPE (which will terminate your program unless you - // explicitly ignore them or handle them). + // PIPE: Splices data from the Pipe 'reader' using a "chunked" + // 'Transfer-Encoding'. The writer uses a Pipe::Writer to + // perform writes and to detect a closed read-end of the Pipe + // (i.e. nobody is listening any longer). Once the writer is + // finished, it will close its end of the pipe to signal end + // of file to the Reader. // // In all cases (BODY, PATH, PIPE), you are expected to properly // specify the 'Content-Type' header, but the 'Content-Length' and @@ -169,7 +297,7 @@ struct Response std::string body; std::string path; - int pipe; // See comment above regarding the semantics for closing. 
+ Option<Pipe::Reader> reader; }; http://git-wip-us.apache.org/repos/asf/mesos/blob/e76954ab/3rdparty/libprocess/src/http.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/http.cpp b/3rdparty/libprocess/src/http.cpp index 7c0cee4..276cecd 100644 --- a/3rdparty/libprocess/src/http.cpp +++ b/3rdparty/libprocess/src/http.cpp @@ -2,14 +2,17 @@ #include <stdint.h> +#include <algorithm> #include <cstring> #include <deque> #include <iostream> +#include <queue> #include <string> #include <vector> #include <process/future.hpp> #include <process/http.hpp> +#include <process/internal.hpp> #include <process/owned.hpp> #include <process/socket.hpp> @@ -23,6 +26,7 @@ #include "decoder.hpp" using std::deque; +using std::queue; using std::string; using std::vector; @@ -35,6 +39,152 @@ using process::network::Socket; namespace process { namespace http { +Pipe::Reader Pipe::reader() const +{ + return Pipe::Reader(data); +} + + +Pipe::Writer Pipe::writer() const +{ + return Pipe::Writer(data); +} + + +Future<string> Pipe::Reader::read() +{ + Future<string> future; + + process::internal::acquire(&data->lock); + { + if (data->readEnd == CLOSED) { + future = Failure("closed"); + } else if (!data->writes.empty()) { + future = data->writes.front(); + data->writes.pop(); + } else if (data->writeEnd == CLOSED) { + future = ""; // End-of-file. + } else { + data->reads.push(Owned<Promise<string>>(new Promise<string>())); + future = data->reads.back()->future(); + } + } + process::internal::release(&data->lock); + + return future; +} + + +bool Pipe::Reader::close() +{ + bool closed = false; + bool notify = false; + queue<Owned<Promise<string>>> reads; + + process::internal::acquire(&data->lock); + { + if (data->readEnd == OPEN) { + // Throw away outstanding data. + while (!data->writes.empty()) { + data->writes.pop(); + } + + // Extract the pending reads so we can fail them. 
+ std::swap(data->reads, reads); + + closed = true; + data->readEnd = CLOSED; + + // Notify if write-end is still open! + notify = data->writeEnd == OPEN; + } + } + process::internal::release(&data->lock); + + // NOTE: We transition the promises outside the critical section + // to avoid triggering callbacks that try to reacquire the lock. + if (closed) { + while (!reads.empty()) { + reads.front()->fail("closed"); + reads.pop(); + } + + if (notify) { + data->readerClosure.set(Nothing()); + } + } + + return closed; +} + + +bool Pipe::Writer::write(const string& s) +{ + bool written = false; + Owned<Promise<string>> read; + + process::internal::acquire(&data->lock); + { + // Ignore writes if either end of the pipe is closed! + if (data->writeEnd == OPEN && data->readEnd == OPEN) { + // Don't bother surfacing empty writes to the readers. + if (!s.empty()) { + if (data->reads.empty()) { + data->writes.push(s); + } else { + read = data->reads.front(); + data->reads.pop(); + } + } + written = true; + } + } + process::internal::release(&data->lock); + + // NOTE: We set the promise outside the critical section to avoid + // triggering callbacks that try to reacquire the lock. + if (read.get() != NULL) { + read->set(s); + } + + return written; +} + + +bool Pipe::Writer::close() +{ + bool closed = false; + queue<Owned<Promise<string>>> reads; + + process::internal::acquire(&data->lock); + { + if (data->writeEnd == OPEN) { + // Extract all the pending reads so we can complete them. + std::swap(data->reads, reads); + + data->writeEnd = CLOSED; + closed = true; + } + } + process::internal::release(&data->lock); + + // NOTE: We set the promises outside the critical section to avoid + // triggering callbacks that try to reacquire the lock. + while (!reads.empty()) { + reads.front()->set(string("")); // End-of-file. 
+ reads.pop(); + } + + return closed; +} + + +Future<Nothing> Pipe::Writer::readerClosed() +{ + return data->readerClosure.future(); +} + + hashmap<uint16_t, string> statuses; namespace query { http://git-wip-us.apache.org/repos/asf/mesos/blob/e76954ab/3rdparty/libprocess/src/process.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp index e7b029b..10ad670 100644 --- a/3rdparty/libprocess/src/process.cpp +++ b/3rdparty/libprocess/src/process.cpp @@ -175,8 +175,8 @@ private: // Demuxes and handles a response. bool process(const Future<Response>& future, const Request& request); - // Handles stream (i.e., pipe) based responses. - void stream(const Future<short>& poll, const Request& request); + // Handles stream based responses. + void stream(const Request& request, const Future<string>& chunk); Socket socket; // Wrap the socket to keep it from getting closed. @@ -194,12 +194,14 @@ private: delete future; } - // Helper for cleaning up a response (i.e., closing any open pipes + // Helper for cleaning up a response (i.e., closing any open Pipes // in the event Response::type is PIPE). static void cleanup(const Response& response) { if (response.type == Response::PIPE) { - os::close(response.pipe); + CHECK_SOME(response.reader); + http::Pipe::Reader reader = response.reader.get(); // Remove const. + reader.close(); } } @@ -209,7 +211,7 @@ private: queue<Item*> items; - Option<int> pipe; // Current pipe, if streaming. + Option<http::Pipe::Reader> pipe; // Current pipe, if streaming. }; @@ -939,7 +941,8 @@ HttpProxy::~HttpProxy() // Need to make sure response producers know not to continue to // create a response (streaming or otherwise). 
if (pipe.isSome()) { - os::close(pipe.get()); + http::Pipe::Reader reader = pipe.get(); + reader.close(); } pipe = None(); @@ -1074,29 +1077,23 @@ bool HttpProxy::process(const Future<Response>& future, const Request& request) // should be reported and no response sent. response.body.clear(); - // Make sure the pipe is nonblocking. - Try<Nothing> nonblock = os::nonblock(response.pipe); - if (nonblock.isError()) { - const char* error = strerror(errno); - VLOG(1) << "Failed make pipe nonblocking: " << error; - socket_manager->send(InternalServerError(), request, socket); - return true; // All done, can process next response. - } - // While the user is expected to properly set a 'Content-Type' // header, we fill in (or overwrite) 'Transfer-Encoding' header. response.headers["Transfer-Encoding"] = "chunked"; - VLOG(1) << "Starting \"chunked\" streaming"; + VLOG(3) << "Starting \"chunked\" streaming"; socket_manager->send( new HttpResponseEncoder(socket, response, request), true); - pipe = response.pipe; + CHECK_SOME(response.reader); + http::Pipe::Reader reader = response.reader.get(); - io::poll(pipe.get(), io::READ).onAny( - defer(self(), &Self::stream, lambda::_1, request)); + pipe = reader; + + reader.read() + .onAny(defer(self(), &Self::stream, request, lambda::_1)); return false; // Streaming, don't process next response (yet)! } else { @@ -1107,66 +1104,51 @@ bool HttpProxy::process(const Future<Response>& future, const Request& request) } -void HttpProxy::stream(const Future<short>& poll, const Request& request) +void HttpProxy::stream( + const Request& request, + const Future<string>& chunk) { - // TODO(benh): Use 'splice' on Linux. + CHECK_SOME(pipe); - CHECK(pipe.isSome()); + http::Pipe::Reader reader = pipe.get(); bool finished = false; // Whether we're done streaming. - if (poll.isReady()) { - // Read and write. - CHECK(poll.get() == io::READ); - const size_t size = 4 * 1024; // 4K. 
- char data[size]; - while (!finished) { - ssize_t length = ::read(pipe.get(), data, size); - if (length < 0 && (errno == EINTR)) { - // Interrupted, try again now. - continue; - } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) { - // Might block, try again later. - io::poll(pipe.get(), io::READ).onAny( - defer(self(), &Self::stream, lambda::_1, request)); - break; - } else { - std::ostringstream out; - if (length <= 0) { - // Error or closed, treat both as closed. - if (length < 0) { - // Error. - const char* error = strerror(errno); - VLOG(1) << "Read error while streaming: " << error; - } - out << "0\r\n" << "\r\n"; - finished = true; - } else { - // Data! - out << std::hex << length << "\r\n"; - out.write(data, length); - out << "\r\n"; - } + if (chunk.isReady()) { + std::ostringstream out; - // We always persist the connection when we're not finished - // streaming. - socket_manager->send( - new DataEncoder(socket, out.str()), - finished ? request.keepAlive : true); - } + if (chunk.get().empty()) { + // Finished reading. + out << "0\r\n" << "\r\n"; + finished = true; + } else { + out << std::hex << chunk.get().size() << "\r\n"; + out << chunk.get(); + out << "\r\n"; + + // Keep reading. + reader.read() + .onAny(defer(self(), &Self::stream, request, lambda::_1)); } - } else if (poll.isFailed()) { - VLOG(1) << "Failed to poll: " << poll.failure(); + + // Always persist the connection when streaming is not finished. + socket_manager->send( + new DataEncoder(socket, out.str()), + finished ? request.keepAlive : true); + } else if (chunk.isFailed()) { + VLOG(1) << "Failed to read from stream: " << chunk.failure(); + // TODO(bmahler): Have to close connection if headers were sent! socket_manager->send(InternalServerError(), request, socket); finished = true; } else { - VLOG(1) << "Unexpected discarded future while polling"; + VLOG(1) << "Failed to read from stream: discarded"; + // TODO(bmahler): Have to close connection if headers were sent! 
socket_manager->send(InternalServerError(), request, socket); finished = true; } if (finished) { - os::close(pipe.get()); + reader.close(); pipe = None(); next(); } http://git-wip-us.apache.org/repos/asf/mesos/blob/e76954ab/3rdparty/libprocess/src/tests/http_tests.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/tests/http_tests.cpp b/3rdparty/libprocess/src/tests/http_tests.cpp index 800752a..17fb092 100644 --- a/3rdparty/libprocess/src/tests/http_tests.cpp +++ b/3rdparty/libprocess/src/tests/http_tests.cpp @@ -143,24 +143,24 @@ TEST(HTTP, Endpoints) AWAIT_EXPECT_EQ(response, socket.recv(response.size())); // Now hit '/pipe' (by using http::get). - int pipes[2]; - ASSERT_NE(-1, ::pipe(pipes)); - + http::Pipe pipe; http::OK ok; ok.type = http::Response::PIPE; - ok.pipe = pipes[0]; + ok.reader = pipe.reader(); - Future<Nothing> pipe; + Future<Nothing> request; EXPECT_CALL(process, pipe(_)) - .WillOnce(DoAll(FutureSatisfy(&pipe), + .WillOnce(DoAll(FutureSatisfy(&request), Return(ok))); Future<http::Response> future = http::get(process.self(), "pipe"); - AWAIT_READY(pipe); + AWAIT_READY(request); - ASSERT_SOME(os::write(pipes[1], "Hello World\n")); - ASSERT_SOME(os::close(pipes[1])); + // Write the response. + http::Pipe::Writer writer = pipe.writer(); + EXPECT_TRUE(writer.write("Hello World\n")); + EXPECT_TRUE(writer.close()); AWAIT_READY(future); EXPECT_EQ(http::statuses[200], future.get().status); @@ -172,6 +172,90 @@ TEST(HTTP, Endpoints) } +TEST(HTTP, PipeEOF) +{ + http::Pipe pipe; + http::Pipe::Reader reader = pipe.reader(); + http::Pipe::Writer writer = pipe.writer(); + + // A 'read' on an empty pipe should block. + Future<string> read = reader.read(); + EXPECT_TRUE(read.isPending()); + + // Writing an empty string should have no effect. + EXPECT_TRUE(writer.write("")); + EXPECT_TRUE(read.isPending()); + + // After a 'write' the pending 'read' should complete. 
+ EXPECT_TRUE(writer.write("hello")); + ASSERT_TRUE(read.isReady()); + EXPECT_EQ("hello", read.get()); + + // After a 'write' a call to 'read' should be completed immediately. + ASSERT_TRUE(writer.write("world")); + + read = reader.read(); + ASSERT_TRUE(read.isReady()); + EXPECT_EQ("world", read.get()); + + // Close the write end of the pipe and ensure the remaining + // data can be read. + EXPECT_TRUE(writer.write("!")); + EXPECT_TRUE(writer.close()); + AWAIT_EQ("!", reader.read()); + + // End of file should be reached. + AWAIT_EQ("", reader.read()); + AWAIT_EQ("", reader.read()); + + // Writes to a pipe with the write end closed are ignored. + EXPECT_FALSE(writer.write("!")); + AWAIT_EQ("", reader.read()); + + // The write end cannot be closed twice. + EXPECT_FALSE(writer.close()); + + // Close the read end, this should not notify the writer + // since the write end was already closed. + EXPECT_TRUE(reader.close()); + EXPECT_TRUE(writer.readerClosed().isPending()); +} + + +TEST(HTTP, PipeReaderCloses) +{ + http::Pipe pipe; + http::Pipe::Reader reader = pipe.reader(); + http::Pipe::Writer writer = pipe.writer(); + + // If the read end of the pipe is closed, + // it should discard any unread data. + EXPECT_TRUE(writer.write("hello")); + EXPECT_TRUE(writer.write("world")); + + // The writer should discover the closure. + Future<Nothing> closed = writer.readerClosed(); + EXPECT_TRUE(reader.close()); + EXPECT_TRUE(closed.isReady()); + + // The read end is closed, subsequent reads will fail. + AWAIT_FAILED(reader.read()); + + // The read end is closed, writes are ignored. + EXPECT_FALSE(writer.write("!")); + AWAIT_FAILED(reader.read()); + + // The read end cannot be closed twice. + EXPECT_FALSE(reader.close()); + + // Close the write end. + EXPECT_TRUE(writer.close()); + + // Reads should fail since the read end is closed. + AWAIT_FAILED(reader.read()); +} + + TEST(HTTP, Encode) { string unencoded = "a$&+,/:;=?@ \"<>#%{}|\\^~[]`\x19\x80\xFF";
