Repository: mesos
Updated Branches:
  refs/heads/master d9dd07960 -> 33befcb17


Fix deadlock between SocketManager::send() and internal::send().

This was a deadlock between the 'this' and 'processes' synchronized
blocks that occurred because the .then() on socket futures can be
invoked immediately with the new implementation of io::poll().

Review: https://reviews.apache.org/r/28367


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0a84e441
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0a84e441
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0a84e441

Branch: refs/heads/master
Commit: 0a84e441e51bd81fead2aaad5b7fbd2b2e8ad590
Parents: d9dd079
Author: Joris Van Remoortere <[email protected]>
Authored: Sun Nov 23 15:57:22 2014 -0800
Committer: Benjamin Hindman <[email protected]>
Committed: Sun Nov 23 15:57:23 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/src/process.cpp | 67 ++++++++++++++++++++++++--------
 1 file changed, 51 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/0a84e441/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp 
b/3rdparty/libprocess/src/process.cpp
index cf75b51..7799bd8 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -1571,13 +1571,15 @@ void SocketManager::link(ProcessBase* process, const 
UPID& to)
 
   CHECK(process != NULL);
 
+  Socket socket;
+  bool connect = false;
+
   synchronized (this) {
     links[to].insert(process);
 
     // Check if node is remote and there isn't a persistant link.
     if (to.node != __node__  && persists.count(to.node) == 0) {
       // Okay, no link, let's create a socket.
-      Socket socket;
       int s = socket;
 
       sockets[s] = socket;
@@ -1592,13 +1594,17 @@ void SocketManager::link(ProcessBase* process, const 
UPID& to)
       // connected.
       outgoing[s];
 
-      socket.connect(to.node)
-        .onAny(lambda::bind(
-            &internal::link_connect,
-            lambda::_1,
-            new Socket(socket)));
+      connect = true;
     }
   }
+
+  if (connect) {
+    socket.connect(to.node)
+      .onAny(lambda::bind(
+          &internal::link_connect,
+          lambda::_1,
+          new Socket(socket)));
+  }
 }
 
 
@@ -1724,17 +1730,21 @@ void SocketManager::send(Encoder* encoder, bool persist)
 
       if (outgoing.count(socket) > 0) {
         outgoing[socket].push(encoder);
+        encoder = NULL;
       } else {
         // Initialize the outgoing queue.
         outgoing[socket];
-
-        internal::send(encoder, new Socket(socket));
       }
     } else {
       VLOG(1) << "Attempting to send on a no longer valid socket!";
       delete encoder;
+      encoder = NULL;
     }
   }
+
+  if (encoder != NULL) {
+    internal::send(encoder, new Socket(encoder->socket()));
+  }
 }
 
 
@@ -1802,6 +1812,9 @@ void SocketManager::send(Message* message)
 
   const Node& node = message->to.node;
 
+  Socket socket;
+  bool connect = false;
+
   synchronized (this) {
     // Check if there is already a socket.
     bool persist = persists.count(node) > 0;
@@ -1809,11 +1822,25 @@ void SocketManager::send(Message* message)
     if (persist || temp) {
       int s = persist ? persists[node] : temps[node];
       CHECK(sockets.count(s) > 0);
-      send(new MessageEncoder(sockets[s], message), persist);
+      socket = sockets[s];
+
+      // Update whether or not this socket should get disposed after
+      // there is no more data to send.
+      if (!persist) {
+        dispose.insert(socket);
+      }
+
+      if (outgoing.count(socket) > 0) {
+        outgoing[socket].push(new MessageEncoder(socket, message));
+        return;
+      } else {
+        // Initialize the outgoing queue.
+        outgoing[socket];
+      }
+
     } else {
       // No peristent or temporary socket to the node currently
       // exists, so we create a temporary one.
-      Socket socket;
       int s = socket;
 
       sockets[s] = socket;
@@ -1825,14 +1852,22 @@ void SocketManager::send(Message* message)
       // Initialize the outgoing queue.
       outgoing[s];
 
-      socket.connect(node)
-        .onAny(lambda::bind(
-            &internal::send_connect,
-            lambda::_1,
-            new Socket(socket),
-            message));
+      connect = true;
     }
   }
+
+  if (connect) {
+    socket.connect(node)
+      .onAny(lambda::bind(
+          &internal::send_connect,
+          lambda::_1,
+          new Socket(socket),
+          message));
+  } else {
+    // If we're not connecting and we haven't added the encoder to
+    // the 'outgoing' queue then schedule it to be sent.
+    internal::send(new MessageEncoder(socket, message), new Socket(socket));
+  }
 }
 
 

Reply via email to