Fix a race between SocketManager link() and send(). Review: https://reviews.apache.org/r/27967
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4b6e4010 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4b6e4010 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4b6e4010 Branch: refs/heads/master Commit: 4b6e4010a3d6461e249b11320e1d39d373da7c94 Parents: 3c6d431 Author: Joris Van Remoortere <[email protected]> Authored: Fri Nov 21 08:03:00 2014 -0800 Committer: Benjamin Hindman <[email protected]> Committed: Fri Nov 21 08:03:01 2014 -0800 ---------------------------------------------------------------------- 3rdparty/libprocess/src/process.cpp | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/4b6e4010/3rdparty/libprocess/src/process.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp index 00cd89f..cf75b51 100644 --- a/3rdparty/libprocess/src/process.cpp +++ b/3rdparty/libprocess/src/process.cpp @@ -1511,6 +1511,10 @@ void ignore_read_data( } +// Forward declaration. +void send(Encoder* encoder, Socket* socket); + + void link_connect(const Future<Nothing>& future, Socket* socket) { if (future.isDiscarded() || future.isFailed()) { @@ -1532,8 +1536,26 @@ void link_connect(const Future<Nothing>& future, Socket* socket) socket, data, size)); + + // In order to avoid a race condition where internal::send() is + // called after SocketManager::link() but before the socket is + // connected, we initialize the 'outgoing' queue in + // SocketManager::link() and then check if the queue has anything in + // it to send during this connection completion. When a subsequent + // call to SocketManager::send() occurs we'll now just add the + // encoder to the 'outgoing' queue, and when we complete the + // connection here we'll start sending, otherwise when we call + // SocketManager::next() the 'outgoing' queue will get removed and + // any subsequent call to SocketManager::send() will take care of + // setting it back up and sending. + Encoder* encoder = socket_manager->next(*socket); + + if (encoder != NULL) { + send(encoder, new Socket(*socket)); + } } + } // namespace internal { @@ -1563,6 +1585,13 @@ void SocketManager::link(ProcessBase* process, const UPID& to) persists[to.node] = s; + // Initialize 'outgoing' to prevent a race with + // SocketManager::send() while the socket is not yet connected. + // Initializing the 'outgoing' queue prevents + // SocketManager::send() from trying to write before it's + // connected. + outgoing[s]; + socket.connect(to.node) .onAny(lambda::bind( &internal::link_connect,
