Used io::poll instead of libev for ignore_data.

Review: https://reviews.apache.org/r/27508
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9f94d9a8 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9f94d9a8 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9f94d9a8 Branch: refs/heads/master Commit: 9f94d9a8a42352345bffb0f1be0253952731b316 Parents: 4751167 Author: Benjamin Hindman <[email protected]> Authored: Sun Nov 2 20:43:29 2014 -0800 Committer: Benjamin Hindman <[email protected]> Committed: Sat Nov 15 16:25:58 2014 -0800 ---------------------------------------------------------------------- 3rdparty/libprocess/src/process.cpp | 54 ++++++++++++++------------------ 1 file changed, 23 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/9f94d9a8/3rdparty/libprocess/src/process.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp index 528cb88..bc2884b 100644 --- a/3rdparty/libprocess/src/process.cpp +++ b/3rdparty/libprocess/src/process.cpp @@ -644,12 +644,8 @@ void recv_data(DataDecoder* decoder, int s) // SocketManager::send(Message) where we don't care about the data // received we mostly just want to know when the socket has been // closed. -void ignore_data(struct ev_loop* loop, ev_io* watcher, int revents) +void ignore_data(Socket* socket, int s) { - Socket* socket = (Socket*) watcher->data; - - int s = watcher->fd; - while (true) { const ssize_t size = 80 * 1024; ssize_t length = 0; @@ -663,6 +659,8 @@ void ignore_data(struct ev_loop* loop, ev_io* watcher, int revents) continue; } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) { // Might block, try again later. + io::poll(s, io::READ) + .onAny(lambda::bind(&ignore_data, socket, s)); break; } else if (length <= 0) { // Socket error or closed. 
@@ -673,9 +671,7 @@ void ignore_data(struct ev_loop* loop, ev_io* watcher, int revents) VLOG(2) << "Socket closed while receiving"; } socket_manager->close(s); - ev_io_stop(loop, watcher); delete socket; - delete watcher; break; } else { VLOG(2) << "Ignoring " << length << " bytes of data received " @@ -861,9 +857,11 @@ void receiving_connect(struct ev_loop* loop, ev_io* watcher, int revents) delete watcher; } else { // We're connected! Now let's do some receiving. + Socket* socket = (Socket*) watcher->data; ev_io_stop(loop, watcher); - ev_io_init(watcher, ignore_data, s, EV_READ); - ev_io_start(loop, watcher); + delete watcher; + io::poll(s, io::READ) + .onAny(lambda::bind(&ignore_data, socket, s)); } } @@ -1591,17 +1589,18 @@ void SocketManager::link(ProcessBase* process, const UPID& to) // Wait for socket to be connected. ev_io_init(watcher, receiving_connect, s, EV_WRITE); - } else { - ev_io_init(watcher, ignore_data, s, EV_READ); - } - // Enqueue the watcher. - synchronized (watchers) { - watchers->push(watcher); - } + // Enqueue the watcher. + synchronized (watchers) { + watchers->push(watcher); + } - // Interrupt the loop. - ev_async_send(loop, &async_watcher); + // Interrupt the loop. + ev_async_send(loop, &async_watcher); + } else { + io::poll(s, io::READ) + .onAny(lambda::bind(&ignore_data, new Socket(sockets[s]), s)); + } } links[to].insert(process); @@ -1741,21 +1740,14 @@ void SocketManager::send(Message* message) // Initialize the outgoing queue. outgoing[s]; - // Allocate and initialize a watcher for reading data from this - // socket. Note that we don't expect to receive anything other - // than HTTP '202 Accepted' responses which we anyway ignore. - ev_io* watcher = new ev_io(); - watcher->data = new Socket(sockets[s]); - - ev_io_init(watcher, ignore_data, s, EV_READ); - - // Enqueue the watcher. - synchronized (watchers) { - watchers->push(watcher); - } + // Read and ignore data from this socket. 
Note that we don't + // expect to receive anything other than HTTP '202 Accepted' + // responses which we just ignore. + io::poll(s, io::READ) + .onAny(lambda::bind(&ignore_data, new Socket(sockets[s]), s)); // Allocate and initialize a watcher for sending the message. - watcher = new ev_io(); + ev_io* watcher = new ev_io(); watcher->data = new MessageEncoder(sockets[s], message); // Try and connect to the node using this socket.
