Used io::poll instead of a libev ev_io watcher for recv_data.

Review: https://reviews.apache.org/r/27507
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/47511670 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/47511670 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/47511670 Branch: refs/heads/master Commit: 47511670aaad85ab068902151c9c3f84573fbc99 Parents: 37bba65 Author: Benjamin Hindman <[email protected]> Authored: Sun Nov 2 20:30:48 2014 -0800 Committer: Benjamin Hindman <[email protected]> Committed: Sat Nov 15 16:25:58 2014 -0800 ---------------------------------------------------------------------- 3rdparty/libprocess/src/process.cpp | 23 ++++++----------------- 1 file changed, 6 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/47511670/3rdparty/libprocess/src/process.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp index aeaac0c..528cb88 100644 --- a/3rdparty/libprocess/src/process.cpp +++ b/3rdparty/libprocess/src/process.cpp @@ -589,12 +589,8 @@ void handle_async(struct ev_loop* loop, ev_async* _, int revents) } -void recv_data(struct ev_loop* loop, ev_io* watcher, int revents) +void recv_data(DataDecoder* decoder, int s) { - DataDecoder* decoder = (DataDecoder*) watcher->data; - - int s = watcher->fd; - while (true) { const ssize_t size = 80 * 1024; ssize_t length = 0; @@ -608,6 +604,8 @@ void recv_data(struct ev_loop* loop, ev_io* watcher, int revents) continue; } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) { // Might block, try again later. + io::poll(s, io::READ) + .onAny(lambda::bind(&recv_data, decoder, s)); break; } else if (length <= 0) { // Socket error or closed. 
@@ -619,8 +617,6 @@ void recv_data(struct ev_loop* loop, ev_io* watcher, int revents) } socket_manager->close(s); delete decoder; - ev_io_stop(loop, watcher); - delete watcher; break; } else { CHECK(length > 0); @@ -636,8 +632,6 @@ void recv_data(struct ev_loop* loop, ev_io* watcher, int revents) VLOG(1) << "Decoder error while receiving"; socket_manager->close(s); delete decoder; - ev_io_stop(loop, watcher); - delete watcher; break; } } @@ -913,14 +907,9 @@ void accept(struct ev_loop* loop, ev_io* watcher, int revents) // Inform the socket manager for proper bookkeeping. const Socket& socket = socket_manager->accepted(s); - // Allocate and initialize the decoder and watcher. - DataDecoder* decoder = new DataDecoder(socket); - - ev_io* watcher = new ev_io(); - watcher->data = decoder; - - ev_io_init(watcher, recv_data, s, EV_READ); - ev_io_start(loop, watcher); + // Start reading from the socket. + io::poll(s, io::READ) + .onAny(lambda::bind(&recv_data, new DataDecoder(socket), s)); } }
