Fix a race between SocketManager link() and send().

Review: https://reviews.apache.org/r/27967


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4b6e4010
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4b6e4010
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4b6e4010

Branch: refs/heads/master
Commit: 4b6e4010a3d6461e249b11320e1d39d373da7c94
Parents: 3c6d431
Author: Joris Van Remoortere <[email protected]>
Authored: Fri Nov 21 08:03:00 2014 -0800
Committer: Benjamin Hindman <[email protected]>
Committed: Fri Nov 21 08:03:01 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/src/process.cpp | 29 +++++++++++++++++++++++++++++
 1 file changed, 29 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/4b6e4010/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp 
b/3rdparty/libprocess/src/process.cpp
index 00cd89f..cf75b51 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -1511,6 +1511,10 @@ void ignore_read_data(
 }
 
 
+// Forward declaration.
+void send(Encoder* encoder, Socket* socket);
+
+
 void link_connect(const Future<Nothing>& future, Socket* socket)
 {
   if (future.isDiscarded() || future.isFailed()) {
@@ -1532,8 +1536,26 @@ void link_connect(const Future<Nothing>& future, Socket* 
socket)
         socket,
         data,
         size));
+
+  // In order to avoid a race condition where internal::send() is
+  // called after SocketManager::link() but before the socket is
+  // connected, we initialize the 'outgoing' queue in
+  // SocketManager::link() and then check if the queue has anything in
+  // it to send during this connection completion. When a subsequent
+  // call to SocketManager::send() occurs we'll now just add the
+  // encoder to the 'outgoing' queue, and when we complete the
+  // connection here we'll start sending, otherwise when we call
+  // SocketManager::next() the 'outgoing' queue will get removed and
+  // any subsequent call to SocketManager::send() will take care of
+  // setting it back up and sending.
+  Encoder* encoder = socket_manager->next(*socket);
+
+  if (encoder != NULL) {
+    send(encoder, new Socket(*socket));
+  }
 }
 
+
 } // namespace internal {
 
 
@@ -1563,6 +1585,13 @@ void SocketManager::link(ProcessBase* process, const 
UPID& to)
 
       persists[to.node] = s;
 
+      // Initialize 'outgoing' to prevent a race with
+      // SocketManager::send() while the socket is not yet connected.
+      // Initializing the 'outgoing' queue prevents
+      // SocketManager::send() from trying to write before it's
+      // connected.
+      outgoing[s];
+
       socket.connect(to.node)
         .onAny(lambda::bind(
             &internal::link_connect,

Reply via email to