Made http::internal::ConnectionProcess a managed Process.

Per the details in MESOS-4658, the `http::Connection` abstraction is
prone to a deadlock in which the `http::internal::ConnectionProcess`
ends up `process::wait`ing on itself. This occurs when the
`http::internal::ConnectionProcess` exposes a Future to which the
caller binds a callback holding a copy of `http::Connection`. When
completing the Future, the Future clears its callbacks within the
execution context of the `http::internal::ConnectionProcess`. If the
last copy of the `http::Connection` was held by one of those
callbacks, destroying it makes the `http::internal::ConnectionProcess`
wait on itself, leading to a deadlock.

This surfaces a general pattern of having libprocess manage a
Process when it cannot be guaranteed that the Process will be
waited on by another execution context.

Review: https://reviews.apache.org/r/47091


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/92cff922
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/92cff922
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/92cff922

Branch: refs/heads/master
Commit: 92cff92246761d417d1e1dd8b263b98845380009
Parents: 40a5b23
Author: Benjamin Mahler <[email protected]>
Authored: Sat May 7 15:23:29 2016 -0700
Committer: Benjamin Mahler <[email protected]>
Committed: Mon May 9 17:29:53 2016 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/src/http.cpp | 43 ++++++++++++++++++-----------------
 1 file changed, 22 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/92cff922/3rdparty/libprocess/src/http.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/http.cpp b/3rdparty/libprocess/src/http.cpp
index 48f91d0..b7839b1 100644
--- a/3rdparty/libprocess/src/http.cpp
+++ b/3rdparty/libprocess/src/http.cpp
@@ -31,7 +31,6 @@
 #include <tuple>
 #include <vector>
 
-#include <process/async.hpp>
 #include <process/defer.hpp>
 #include <process/dispatch.hpp>
 #include <process/future.hpp>
@@ -1158,11 +1157,18 @@ private:
 
 struct Connection::Data
 {
+  // We spawn `ConnectionProcess` as a managed process to guarantee
+  // that it does not wait on itself (this would cause a deadlock!).
+  // See MESOS-4658 for details.
+  //
+  // TODO(bmahler): This surfaces a general pattern that we
+  // should enforce: have libprocess manage a Process when
+  // it cannot be guaranteed that the Process will be waited
+  // on within a different execution context. More generally,
+  // we should be passing Process ownership to libprocess to
+  // ensure all interaction with a Process occurs through a PID.
   Data(const Socket& s)
-    : process(new internal::ConnectionProcess(s))
-  {
-    spawn(process.get());
-  }
+    : process(spawn(new internal::ConnectionProcess(s), true)) {}
 
   ~Data()
   {
@@ -1171,11 +1177,10 @@ struct Connection::Data
     // to ensure we don't drop any queued request dispatches
     // which would leave the caller with a future stuck in
     // a pending state.
-    terminate(process.get(), false);
-    wait(process.get());
+    terminate(process, false);
   }
 
-  Owned<internal::ConnectionProcess> process;
+  PID<internal::ConnectionProcess> process;
 };
 
 
@@ -1188,7 +1193,7 @@ Future<Response> Connection::send(
     bool streamedResponse)
 {
   return dispatch(
-      data->process.get(),
+      data->process,
       &internal::ConnectionProcess::send,
       request,
       streamedResponse);
@@ -1198,7 +1203,7 @@ Future<Response> Connection::send(
 Future<Nothing> Connection::disconnect()
 {
   return dispatch(
-      data->process.get(),
+      data->process,
       &internal::ConnectionProcess::disconnect,
       None());
 }
@@ -1207,7 +1212,7 @@ Future<Nothing> Connection::disconnect()
 Future<Nothing> Connection::disconnected()
 {
   return dispatch(
-      data->process.get(),
+      data->process,
       &internal::ConnectionProcess::disconnected);
 }
 
@@ -1321,20 +1326,16 @@ Future<Response> request(const Request& request, bool streamedResponse)
   // We rely on the connection closing after the response.
   CHECK(!request.keepAlive);
 
-  // This is a one time request which will close the connection when
-  // the response is received. Since 'Connection' is reference-counted,
-  // we must keep a copy around until the disconnection occurs. Note
-  // that in order to avoid a deadlock (Connection destruction occurring
-  // from the ConnectionProcess execution context), we use 'async'.
   return http::connect(request.url)
     .then([=](Connection connection) {
       Future<Response> response = connection.send(request, streamedResponse);
 
-      Connection* copy = new Connection(std::move(connection));
-      auto deleter = [copy](){ delete copy; };
-
-      copy->disconnected()
-        .onAny([=]() { async(deleter); });
+      // This is a non Keep-Alive request which means the connection
+      // will be closed when the response is received. Since the
+      // 'Connection' is reference-counted, we must maintain a copy
+      // until the disconnection occurs.
+      connection.disconnected()
+        .onAny([connection]() {});
 
       return response;
     });

Reply via email to