Used io::poll instead of libev for recv_data.

Review: https://reviews.apache.org/r/27507


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/47511670
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/47511670
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/47511670

Branch: refs/heads/master
Commit: 47511670aaad85ab068902151c9c3f84573fbc99
Parents: 37bba65
Author: Benjamin Hindman <[email protected]>
Authored: Sun Nov 2 20:30:48 2014 -0800
Committer: Benjamin Hindman <[email protected]>
Committed: Sat Nov 15 16:25:58 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/src/process.cpp | 23 ++++++-----------------
 1 file changed, 6 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/47511670/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index aeaac0c..528cb88 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -589,12 +589,8 @@ void handle_async(struct ev_loop* loop, ev_async* _, int revents)
 }
 
 
-void recv_data(struct ev_loop* loop, ev_io* watcher, int revents)
+void recv_data(DataDecoder* decoder, int s)
 {
-  DataDecoder* decoder = (DataDecoder*) watcher->data;
-
-  int s = watcher->fd;
-
   while (true) {
     const ssize_t size = 80 * 1024;
     ssize_t length = 0;
@@ -608,6 +604,8 @@ void recv_data(struct ev_loop* loop, ev_io* watcher, int revents)
       continue;
     } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
       // Might block, try again later.
+      io::poll(s, io::READ)
+        .onAny(lambda::bind(&recv_data, decoder, s));
       break;
     } else if (length <= 0) {
       // Socket error or closed.
@@ -619,8 +617,6 @@ void recv_data(struct ev_loop* loop, ev_io* watcher, int revents)
       }
       socket_manager->close(s);
       delete decoder;
-      ev_io_stop(loop, watcher);
-      delete watcher;
       break;
     } else {
       CHECK(length > 0);
@@ -636,8 +632,6 @@ void recv_data(struct ev_loop* loop, ev_io* watcher, int revents)
         VLOG(1) << "Decoder error while receiving";
         socket_manager->close(s);
         delete decoder;
-        ev_io_stop(loop, watcher);
-        delete watcher;
         break;
       }
     }
@@ -913,14 +907,9 @@ void accept(struct ev_loop* loop, ev_io* watcher, int revents)
     // Inform the socket manager for proper bookkeeping.
     const Socket& socket = socket_manager->accepted(s);
 
-    // Allocate and initialize the decoder and watcher.
-    DataDecoder* decoder = new DataDecoder(socket);
-
-    ev_io* watcher = new ev_io();
-    watcher->data = decoder;
-
-    ev_io_init(watcher, recv_data, s, EV_READ);
-    ev_io_start(loop, watcher);
+    // Start reading from the socket.
+    io::poll(s, io::READ)
+      .onAny(lambda::bind(&recv_data, new DataDecoder(socket), s));
   }
 }
 

Reply via email to