Repository: mesos Updated Branches: refs/heads/master f98f26fa5 -> f7fccce2e
Added failure semantics for http::Pipe::Writer. Review: https://reviews.apache.org/r/32346 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8b0bba19 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8b0bba19 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8b0bba19 Branch: refs/heads/master Commit: 8b0bba195058c9f209e9b4bb9716fb805161e847 Parents: f98f26f Author: Benjamin Mahler <[email protected]> Authored: Fri Mar 20 11:32:04 2015 -0700 Committer: Benjamin Mahler <[email protected]> Committed: Mon Mar 30 16:38:21 2015 -0700 ---------------------------------------------------------------------- 3rdparty/libprocess/include/process/http.hpp | 46 +++++++++++++++------ 3rdparty/libprocess/src/http.cpp | 50 +++++++++++++++++++---- 3rdparty/libprocess/src/tests/http_tests.cpp | 32 +++++++++++++++ 3 files changed, 106 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/8b0bba19/3rdparty/libprocess/include/process/http.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/include/process/http.hpp b/3rdparty/libprocess/include/process/http.hpp index faffae7..a9ef5b7 100644 --- a/3rdparty/libprocess/include/process/http.hpp +++ b/3rdparty/libprocess/include/process/http.hpp @@ -117,17 +117,26 @@ public: public: // Returns data written to the pipe. // Returns an empty read when end-of-file is reached. - // Returns Failure if the read-end of the pipe is closed. + // Returns Failure if the writer failed, or the read-end + // is closed. Future<std::string> read(); // Closing the read-end of the pipe before the write-end closes - // will notify the writer that the reader is no longer interested. - // Returns false if the read-end of the pipe was already closed. + // or fails will notify the writer that the reader is no longer + // interested. Returns false if the read-end was already closed. bool close(); private: friend class Pipe; + + enum State + { + OPEN, + CLOSED, + }; + explicit Reader(const memory::shared_ptr<Data>& _data) : data(_data) {} + memory::shared_ptr<Data> data; }; @@ -141,9 +150,14 @@ public: // Closing the write-end of the pipe will send end-of-file // to the reader. Returns false if the write-end of the pipe - // was already closed. + // was already closed or failed. bool close(); + // Closes the write-end of the pipe but sends a failure + // to the reader rather than end-of-file. Returns false + // if the write-end of the pipe was already closed or failed. + bool fail(const std::string& message); + // Returns Nothing when the read-end of the pipe is closed // before the write-end is closed, which means the reader // was unable to continue reading! @@ -151,7 +165,16 @@ public: private: friend class Pipe; + + enum State + { + OPEN, + CLOSED, + FAILED, + }; + explicit Writer(const memory::shared_ptr<Data>& _data) : data(_data) {} + memory::shared_ptr<Data> data; }; @@ -161,23 +184,17 @@ public: Writer writer() const; private: - enum State - { - OPEN, - CLOSED, - }; - struct Data { - Data() : lock(0), readEnd(OPEN), writeEnd(OPEN) {} + Data() : lock(0), readEnd(Reader::OPEN), writeEnd(Writer::OPEN) {} // Rather than use a process to serialize access to the pipe's // internal data we use a low-level "lock" which we acquire and // release using atomic builtins. int lock; - State readEnd; - State writeEnd; + Reader::State readEnd; + Writer::State writeEnd; // Represents readers waiting for data from the pipe. std::queue<Owned<Promise<std::string>>> reads; @@ -188,6 +205,9 @@ private: // Signals when the read-end is closed before the write-end. Promise<Nothing> readerClosure; + + // Failure reason when the 'writeEnd' is FAILED. + Option<Failure> failure; }; memory::shared_ptr<Data> data; http://git-wip-us.apache.org/repos/asf/mesos/blob/8b0bba19/3rdparty/libprocess/src/http.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/http.cpp b/3rdparty/libprocess/src/http.cpp index 150ff33..cd52cc8 100644 --- a/3rdparty/libprocess/src/http.cpp +++ b/3rdparty/libprocess/src/http.cpp @@ -180,13 +180,16 @@ Future<string> Pipe::Reader::read() process::internal::acquire(&data->lock); { - if (data->readEnd == CLOSED) { + if (data->readEnd == Reader::CLOSED) { future = Failure("closed"); } else if (!data->writes.empty()) { future = data->writes.front(); data->writes.pop(); - } else if (data->writeEnd == CLOSED) { + } else if (data->writeEnd == Writer::CLOSED) { future = ""; // End-of-file. + } else if (data->writeEnd == Writer::FAILED) { + CHECK_SOME(data->failure); + future = data->failure.get(); } else { data->reads.push(Owned<Promise<string>>(new Promise<string>())); future = data->reads.back()->future(); @@ -206,7 +209,7 @@ bool Pipe::Reader::close() process::internal::acquire(&data->lock); { - if (data->readEnd == OPEN) { + if (data->readEnd == Reader::OPEN) { // Throw away outstanding data. while (!data->writes.empty()) { data->writes.pop(); @@ -216,10 +219,10 @@ bool Pipe::Reader::close() std::swap(data->reads, reads); closed = true; - data->readEnd = CLOSED; + data->readEnd = Reader::CLOSED; // Notify if write-end is still open! - notify = data->writeEnd == OPEN; + notify = data->writeEnd == Writer::OPEN; } } process::internal::release(&data->lock); @@ -248,8 +251,8 @@ bool Pipe::Writer::write(const string& s) process::internal::acquire(&data->lock); { - // Ignore writes if either end of the pipe is closed! - if (data->writeEnd == OPEN && data->readEnd == OPEN) { + // Ignore writes if either end of the pipe is closed or failed! + if (data->writeEnd == Writer::OPEN && data->readEnd == Reader::OPEN) { // Don't bother surfacing empty writes to the readers. if (!s.empty()) { if (data->reads.empty()) { @@ -281,11 +284,11 @@ bool Pipe::Writer::close() process::internal::acquire(&data->lock); { - if (data->writeEnd == OPEN) { + if (data->writeEnd == Writer::OPEN) { // Extract all the pending reads so we can complete them. std::swap(data->reads, reads); - data->writeEnd = CLOSED; + data->writeEnd = Writer::CLOSED; closed = true; } } @@ -302,6 +305,35 @@ bool Pipe::Writer::close() } +bool Pipe::Writer::fail(const string& message) +{ + bool failed = false; + queue<Owned<Promise<string>>> reads; + + process::internal::acquire(&data->lock); + { + if (data->writeEnd == Writer::OPEN) { + // Extract all the pending reads so we can fail them. + std::swap(data->reads, reads); + + data->writeEnd = Writer::FAILED; + data->failure = Failure(message); + failed = true; + } + } + process::internal::release(&data->lock); + + // NOTE: We set the promises outside the critical section to avoid + // triggering callbacks that try to reacquire the lock. + while (!reads.empty()) { + reads.front()->fail(message); + reads.pop(); + } + + return failed; +} + + Future<Nothing> Pipe::Writer::readerClosed() { return data->readerClosure.future(); http://git-wip-us.apache.org/repos/asf/mesos/blob/8b0bba19/3rdparty/libprocess/src/tests/http_tests.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/tests/http_tests.cpp b/3rdparty/libprocess/src/tests/http_tests.cpp index e254506..83219da 100644 --- a/3rdparty/libprocess/src/tests/http_tests.cpp +++ b/3rdparty/libprocess/src/tests/http_tests.cpp @@ -230,6 +230,38 @@ TEST(HTTP, PipeEOF) } +TEST(HTTP, PipeFailure) +{ + http::Pipe pipe; + http::Pipe::Reader reader = pipe.reader(); + http::Pipe::Writer writer = pipe.writer(); + + // Fail the writer after writing some data. + EXPECT_TRUE(writer.write("hello")); + EXPECT_TRUE(writer.write("world")); + + EXPECT_TRUE(writer.fail("disconnected!")); + + // The reader should read the data, followed by the failure. + AWAIT_EQ("hello", reader.read()); + AWAIT_EQ("world", reader.read()); + + Future<string> read = reader.read(); + EXPECT_TRUE(read.isFailed()); + EXPECT_EQ("disconnected!", read.failure()); + + // The writer cannot close or fail an already failed pipe. + EXPECT_FALSE(writer.close()); + EXPECT_FALSE(writer.fail("not again")); + + // The writer shouldn't be notified of the reader closing, + // since the writer had already failed. + EXPECT_TRUE(reader.close()); + EXPECT_TRUE(writer.readerClosed().isPending()); +} + + + TEST(HTTP, PipeReaderCloses) { http::Pipe pipe;
