Repository: mesos
Updated Branches:
  refs/heads/master f98f26fa5 -> f7fccce2e


Added failure semantics for http::Pipe::Writer.

Review: https://reviews.apache.org/r/32346


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8b0bba19
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8b0bba19
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8b0bba19

Branch: refs/heads/master
Commit: 8b0bba195058c9f209e9b4bb9716fb805161e847
Parents: f98f26f
Author: Benjamin Mahler <[email protected]>
Authored: Fri Mar 20 11:32:04 2015 -0700
Committer: Benjamin Mahler <[email protected]>
Committed: Mon Mar 30 16:38:21 2015 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/http.hpp | 46 +++++++++++++++------
 3rdparty/libprocess/src/http.cpp             | 50 +++++++++++++++++++----
 3rdparty/libprocess/src/tests/http_tests.cpp | 32 +++++++++++++++
 3 files changed, 106 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/8b0bba19/3rdparty/libprocess/include/process/http.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/http.hpp 
b/3rdparty/libprocess/include/process/http.hpp
index faffae7..a9ef5b7 100644
--- a/3rdparty/libprocess/include/process/http.hpp
+++ b/3rdparty/libprocess/include/process/http.hpp
@@ -117,17 +117,26 @@ public:
   public:
     // Returns data written to the pipe.
     // Returns an empty read when end-of-file is reached.
-    // Returns Failure if the read-end of the pipe is closed.
+    // Returns Failure if the writer failed, or the read-end
+    // is closed.
     Future<std::string> read();
 
     // Closing the read-end of the pipe before the write-end closes
-    // will notify the writer that the reader is no longer interested.
-    // Returns false if the read-end of the pipe was already closed.
+    // or fails will notify the writer that the reader is no longer
+    // interested. Returns false if the read-end was already closed.
     bool close();
 
   private:
     friend class Pipe;
+
+    enum State
+    {
+      OPEN,
+      CLOSED,
+    };
+
     explicit Reader(const memory::shared_ptr<Data>& _data) : data(_data) {}
+
     memory::shared_ptr<Data> data;
   };
 
@@ -141,9 +150,14 @@ public:
 
     // Closing the write-end of the pipe will send end-of-file
     // to the reader. Returns false if the write-end of the pipe
-    // was already closed.
+    // was already closed or failed.
     bool close();
 
+    // Closes the write-end of the pipe but sends a failure
+    // to the reader rather than end-of-file. Returns false
+    // if the write-end of the pipe was already closed or failed.
+    bool fail(const std::string& message);
+
     // Returns Nothing when the read-end of the pipe is closed
     // before the write-end is closed, which means the reader
     // was unable to continue reading!
@@ -151,7 +165,16 @@ public:
 
   private:
     friend class Pipe;
+
+    enum State
+    {
+      OPEN,
+      CLOSED,
+      FAILED,
+    };
+
     explicit Writer(const memory::shared_ptr<Data>& _data) : data(_data) {}
+
     memory::shared_ptr<Data> data;
   };
 
@@ -161,23 +184,17 @@ public:
   Writer writer() const;
 
 private:
-  enum State
-  {
-    OPEN,
-    CLOSED,
-  };
-
   struct Data
   {
-    Data() : lock(0), readEnd(OPEN), writeEnd(OPEN) {}
+    Data() : lock(0), readEnd(Reader::OPEN), writeEnd(Writer::OPEN) {}
 
     // Rather than use a process to serialize access to the pipe's
     // internal data we use a low-level "lock" which we acquire and
     // release using atomic builtins.
     int lock;
 
-    State readEnd;
-    State writeEnd;
+    Reader::State readEnd;
+    Writer::State writeEnd;
 
     // Represents readers waiting for data from the pipe.
     std::queue<Owned<Promise<std::string>>> reads;
@@ -188,6 +205,9 @@ private:
 
     // Signals when the read-end is closed before the write-end.
     Promise<Nothing> readerClosure;
+
+    // Failure reason when the 'writeEnd' is FAILED.
+    Option<Failure> failure;
   };
 
   memory::shared_ptr<Data> data;

http://git-wip-us.apache.org/repos/asf/mesos/blob/8b0bba19/3rdparty/libprocess/src/http.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/http.cpp b/3rdparty/libprocess/src/http.cpp
index 150ff33..cd52cc8 100644
--- a/3rdparty/libprocess/src/http.cpp
+++ b/3rdparty/libprocess/src/http.cpp
@@ -180,13 +180,16 @@ Future<string> Pipe::Reader::read()
 
   process::internal::acquire(&data->lock);
   {
-    if (data->readEnd == CLOSED) {
+    if (data->readEnd == Reader::CLOSED) {
       future = Failure("closed");
     } else if (!data->writes.empty()) {
       future = data->writes.front();
       data->writes.pop();
-    } else if (data->writeEnd == CLOSED) {
+    } else if (data->writeEnd == Writer::CLOSED) {
       future = ""; // End-of-file.
+    } else if (data->writeEnd == Writer::FAILED) {
+      CHECK_SOME(data->failure);
+      future = data->failure.get();
     } else {
       data->reads.push(Owned<Promise<string>>(new Promise<string>()));
       future = data->reads.back()->future();
@@ -206,7 +209,7 @@ bool Pipe::Reader::close()
 
   process::internal::acquire(&data->lock);
   {
-    if (data->readEnd == OPEN) {
+    if (data->readEnd == Reader::OPEN) {
       // Throw away outstanding data.
       while (!data->writes.empty()) {
         data->writes.pop();
@@ -216,10 +219,10 @@ bool Pipe::Reader::close()
       std::swap(data->reads, reads);
 
       closed = true;
-      data->readEnd = CLOSED;
+      data->readEnd = Reader::CLOSED;
 
       // Notify if write-end is still open!
-      notify = data->writeEnd == OPEN;
+      notify = data->writeEnd == Writer::OPEN;
     }
   }
   process::internal::release(&data->lock);
@@ -248,8 +251,8 @@ bool Pipe::Writer::write(const string& s)
 
   process::internal::acquire(&data->lock);
   {
-    // Ignore writes if either end of the pipe is closed!
-    if (data->writeEnd == OPEN && data->readEnd == OPEN) {
+    // Ignore writes if either end of the pipe is closed or failed!
+    if (data->writeEnd == Writer::OPEN && data->readEnd == Reader::OPEN) {
       // Don't bother surfacing empty writes to the readers.
       if (!s.empty()) {
         if (data->reads.empty()) {
@@ -281,11 +284,11 @@ bool Pipe::Writer::close()
 
   process::internal::acquire(&data->lock);
   {
-    if (data->writeEnd == OPEN) {
+    if (data->writeEnd == Writer::OPEN) {
       // Extract all the pending reads so we can complete them.
       std::swap(data->reads, reads);
 
-      data->writeEnd = CLOSED;
+      data->writeEnd = Writer::CLOSED;
       closed = true;
     }
   }
@@ -302,6 +305,35 @@ bool Pipe::Writer::close()
 }
 
 
+bool Pipe::Writer::fail(const string& message)
+{
+  bool failed = false;
+  queue<Owned<Promise<string>>> reads;
+
+  process::internal::acquire(&data->lock);
+  {
+    if (data->writeEnd == Writer::OPEN) {
+      // Extract all the pending reads so we can fail them.
+      std::swap(data->reads, reads);
+
+      data->writeEnd = Writer::FAILED;
+      data->failure = Failure(message);
+      failed = true;
+    }
+  }
+  process::internal::release(&data->lock);
+
+  // NOTE: We set the promises outside the critical section to avoid
+  // triggering callbacks that try to reacquire the lock.
+  while (!reads.empty()) {
+    reads.front()->fail(message);
+    reads.pop();
+  }
+
+  return failed;
+}
+
+
 Future<Nothing> Pipe::Writer::readerClosed()
 {
   return data->readerClosure.future();

http://git-wip-us.apache.org/repos/asf/mesos/blob/8b0bba19/3rdparty/libprocess/src/tests/http_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/http_tests.cpp 
b/3rdparty/libprocess/src/tests/http_tests.cpp
index e254506..83219da 100644
--- a/3rdparty/libprocess/src/tests/http_tests.cpp
+++ b/3rdparty/libprocess/src/tests/http_tests.cpp
@@ -230,6 +230,38 @@ TEST(HTTP, PipeEOF)
 }
 
 
+TEST(HTTP, PipeFailure)
+{
+  http::Pipe pipe;
+  http::Pipe::Reader reader = pipe.reader();
+  http::Pipe::Writer writer = pipe.writer();
+
+  // Fail the writer after writing some data.
+  EXPECT_TRUE(writer.write("hello"));
+  EXPECT_TRUE(writer.write("world"));
+
+  EXPECT_TRUE(writer.fail("disconnected!"));
+
+  // The reader should read the data, followed by the failure.
+  AWAIT_EQ("hello", reader.read());
+  AWAIT_EQ("world", reader.read());
+
+  Future<string> read = reader.read();
+  EXPECT_TRUE(read.isFailed());
+  EXPECT_EQ("disconnected!", read.failure());
+
+  // The writer cannot close or fail an already failed pipe.
+  EXPECT_FALSE(writer.close());
+  EXPECT_FALSE(writer.fail("not again"));
+
+  // The writer shouldn't be notified of the reader closing,
+  // since the writer had already failed.
+  EXPECT_TRUE(reader.close());
+  EXPECT_TRUE(writer.readerClosed().isPending());
+}
+
+
+
 TEST(HTTP, PipeReaderCloses)
 {
   http::Pipe pipe;

Reply via email to