Used io::poll instead of libev for receiving_connect. Review: https://reviews.apache.org/r/27509
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/bc23da1b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/bc23da1b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/bc23da1b

Branch: refs/heads/master
Commit: bc23da1b04aeda9d35ee2f9952fc82248b1ff313
Parents: 9f94d9a
Author: Benjamin Hindman <[email protected]>
Authored: Sun Nov 2 20:49:51 2014 -0800
Committer: Benjamin Hindman <[email protected]>
Committed: Sat Nov 15 16:25:58 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/src/process.cpp | 36 ++++++++------------------------
 1 file changed, 9 insertions(+), 27 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/mesos/blob/bc23da1b/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index bc2884b..a33a201 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -839,10 +839,8 @@ void sending_connect(struct ev_loop* loop, ev_io* watcher, int revents)
 }
 
 
-void receiving_connect(struct ev_loop* loop, ev_io* watcher, int revents)
+void receiving_connect(Socket* socket, int s)
 {
-  int s = watcher->fd;
-
   // Now check that a successful connection was made.
   int opt;
   socklen_t optlen = sizeof(opt);
@@ -851,15 +849,9 @@ void receiving_connect(struct ev_loop* loop, ev_io* watcher, int revents)
     // Connect failure.
     VLOG(1) << "Socket error while connecting";
     socket_manager->close(s);
-    Socket* socket = (Socket*) watcher->data;
     delete socket;
-    ev_io_stop(loop, watcher);
-    delete watcher;
   } else {
     // We're connected! Now let's do some receiving.
-    Socket* socket = (Socket*) watcher->data;
-    ev_io_stop(loop, watcher);
-    delete watcher;
     io::poll(s, io::READ)
       .onAny(lambda::bind(&ignore_data, socket, s));
   }
@@ -1567,15 +1559,12 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
 
       persists[to.node] = s;
 
-      // Allocate and initialize a watcher for reading data from this
-      // socket. Note that we don't expect to receive anything other
-      // than HTTP '202 Accepted' responses which we anyway ignore.
-      // We do, however, want to react when it gets closed so we can
-      // generate appropriate lost events (since this is a 'link').
-      ev_io* watcher = new ev_io();
-      watcher->data = new Socket(sockets[s]);
-
-      // Try and connect to the node using this socket.
+      // Try and connect to the node using this socket in order to
+      // start reading data. Note that we don't expect to receive
+      // anything other than HTTP '202 Accepted' responses which we
+      // anyway ignore. We do, however, want to react when it gets
+      // closed so we can generate appropriate lost events (since this
+      // is a 'link').
       sockaddr_in addr;
       memset(&addr, 0, sizeof(addr));
       addr.sin_family = PF_INET;
@@ -1588,15 +1577,8 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
         }
 
         // Wait for socket to be connected.
-        ev_io_init(watcher, receiving_connect, s, EV_WRITE);
-
-        // Enqueue the watcher.
-        synchronized (watchers) {
-          watchers->push(watcher);
-        }
-
-        // Interrupt the loop.
-        ev_async_send(loop, &async_watcher);
+        io::poll(s, io::WRITE)
+          .onAny(lambda::bind(&receiving_connect, new Socket(sockets[s]), s));
       } else {
         io::poll(s, io::READ)
           .onAny(lambda::bind(&ignore_data, new Socket(sockets[s]), s));
