Repository: mesos Updated Branches: refs/heads/master d9dd07960 -> 33befcb17
Fix deadlock between SocketManager::send() and internal::send(). This was a deadlock between the 'this' and 'processes' synchronized blocks that occurred because the .then() on socket futures can be invoked immediately with the new implementation of io::poll(). Review: https://reviews.apache.org/r/28367 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0a84e441 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0a84e441 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0a84e441 Branch: refs/heads/master Commit: 0a84e441e51bd81fead2aaad5b7fbd2b2e8ad590 Parents: d9dd079 Author: Joris Van Remoortere <[email protected]> Authored: Sun Nov 23 15:57:22 2014 -0800 Committer: Benjamin Hindman <[email protected]> Committed: Sun Nov 23 15:57:23 2014 -0800 ---------------------------------------------------------------------- 3rdparty/libprocess/src/process.cpp | 67 ++++++++++++++++++++++++-------- 1 file changed, 51 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/0a84e441/3rdparty/libprocess/src/process.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp index cf75b51..7799bd8 100644 --- a/3rdparty/libprocess/src/process.cpp +++ b/3rdparty/libprocess/src/process.cpp @@ -1571,13 +1571,15 @@ void SocketManager::link(ProcessBase* process, const UPID& to) CHECK(process != NULL); + Socket socket; + bool connect = false; + synchronized (this) { links[to].insert(process); // Check if node is remote and there isn't a persistant link. if (to.node != __node__ && persists.count(to.node) == 0) { // Okay, no link, let's create a socket. - Socket socket; int s = socket; sockets[s] = socket; @@ -1592,13 +1594,17 @@ void SocketManager::link(ProcessBase* process, const UPID& to) // connected. 
outgoing[s]; - socket.connect(to.node) - .onAny(lambda::bind( - &internal::link_connect, - lambda::_1, - new Socket(socket))); + connect = true; } } + + if (connect) { + socket.connect(to.node) + .onAny(lambda::bind( + &internal::link_connect, + lambda::_1, + new Socket(socket))); + } } @@ -1724,17 +1730,21 @@ void SocketManager::send(Encoder* encoder, bool persist) if (outgoing.count(socket) > 0) { outgoing[socket].push(encoder); + encoder = NULL; } else { // Initialize the outgoing queue. outgoing[socket]; - - internal::send(encoder, new Socket(socket)); } } else { VLOG(1) << "Attempting to send on a no longer valid socket!"; delete encoder; + encoder = NULL; } } + + if (encoder != NULL) { + internal::send(encoder, new Socket(encoder->socket())); + } } @@ -1802,6 +1812,9 @@ void SocketManager::send(Message* message) const Node& node = message->to.node; + Socket socket; + bool connect = false; + synchronized (this) { // Check if there is already a socket. bool persist = persists.count(node) > 0; @@ -1809,11 +1822,25 @@ void SocketManager::send(Message* message) if (persist || temp) { int s = persist ? persists[node] : temps[node]; CHECK(sockets.count(s) > 0); - send(new MessageEncoder(sockets[s], message), persist); + socket = sockets[s]; + + // Update whether or not this socket should get disposed after + // there is no more data to send. + if (!persist) { + dispose.insert(socket); + } + + if (outgoing.count(socket) > 0) { + outgoing[socket].push(new MessageEncoder(socket, message)); + return; + } else { + // Initialize the outgoing queue. + outgoing[socket]; + } + } else { // No peristent or temporary socket to the node currently // exists, so we create a temporary one. - Socket socket; int s = socket; sockets[s] = socket; @@ -1825,14 +1852,22 @@ void SocketManager::send(Message* message) // Initialize the outgoing queue. 
outgoing[s]; - socket.connect(node) - .onAny(lambda::bind( - &internal::send_connect, - lambda::_1, - new Socket(socket), - message)); + connect = true; } } + + if (connect) { + socket.connect(node) + .onAny(lambda::bind( + &internal::send_connect, + lambda::_1, + new Socket(socket), + message)); + } else { + // If we're not connecting and we haven't added the encoder to + // the 'outgoing' queue then schedule it to be sent. + internal::send(new MessageEncoder(socket, message), new Socket(socket)); + } }
