Used io::poll instead of libev for send*.

Updated sending_connect, send_data, and send_file.

Review: https://reviews.apache.org/r/27510


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b774ecb2
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b774ecb2
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b774ecb2

Branch: refs/heads/master
Commit: b774ecb245288c2a0a33b94e9c92ed0ee806b9c1
Parents: bc23da1
Author: Benjamin Hindman <[email protected]>
Authored: Sun Nov 2 21:21:51 2014 -0800
Committer: Benjamin Hindman <[email protected]>
Committed: Sat Nov 15 16:26:49 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/src/encoder.hpp |  9 ++--
 3rdparty/libprocess/src/process.cpp | 90 ++++++++++----------------------
 2 files changed, 35 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b774ecb2/3rdparty/libprocess/src/encoder.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/encoder.hpp 
b/3rdparty/libprocess/src/encoder.hpp
index 9c5aa81..800f324 100644
--- a/3rdparty/libprocess/src/encoder.hpp
+++ b/3rdparty/libprocess/src/encoder.hpp
@@ -25,10 +25,13 @@ namespace process {
 
 const uint32_t GZIP_MINIMUM_BODY_LENGTH = 1024;
 
-typedef void (*Sender)(struct ev_loop*, ev_io*, int);
+// Forward declarations.
+class Encoder;
 
-extern void send_data(struct ev_loop*, ev_io*, int);
-extern void send_file(struct ev_loop*, ev_io*, int);
+extern void send_data(Encoder*);
+extern void send_file(Encoder*);
+
+typedef void (*Sender)(Encoder*);
 
 
 class Encoder

http://git-wip-us.apache.org/repos/asf/mesos/blob/b774ecb2/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp 
b/3rdparty/libprocess/src/process.cpp
index a33a201..ac12876 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -681,11 +681,11 @@ void ignore_data(Socket* socket, int s)
 }
 
 
-void send_data(struct ev_loop* loop, ev_io* watcher, int revents)
+void send_data(Encoder* e)
 {
-  DataEncoder* encoder = (DataEncoder*) watcher->data;
+  DataEncoder* encoder = CHECK_NOTNULL(dynamic_cast<DataEncoder*>(e));
 
-  int s = watcher->fd;
+  int s = encoder->socket();
 
   while (true) {
     const void* data;
@@ -703,6 +703,8 @@ void send_data(struct ev_loop* loop, ev_io* watcher, int 
revents)
     } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
       // Might block, try again later.
       encoder->backup(size);
+      io::poll(s, io::WRITE)
+        .onAny(lambda::bind(&send_data, e));
       break;
     } else if (length <= 0) {
       // Socket error or closed.
@@ -714,8 +716,6 @@ void send_data(struct ev_loop* loop, ev_io* watcher, int 
revents)
       }
       socket_manager->close(s);
       delete encoder;
-      ev_io_stop(loop, watcher);
-      delete watcher;
       break;
     } else {
       CHECK(length > 0);
@@ -727,18 +727,11 @@ void send_data(struct ev_loop* loop, ev_io* watcher, int 
revents)
       if (encoder->remaining() == 0) {
         delete encoder;
 
-        // Stop this watcher for now.
-        ev_io_stop(loop, watcher);
-
         // Check for more stuff to send on socket.
         Encoder* next = socket_manager->next(s);
         if (next != NULL) {
-          watcher->data = next;
-          ev_io_init(watcher, next->sender(), s, EV_WRITE);
-          ev_io_start(loop, watcher);
-        } else {
-          // Nothing more to send right now, clean up.
-          delete watcher;
+          io::poll(s, io::WRITE)
+            .onAny(lambda::bind(next->sender(), next));
         }
         break;
       }
@@ -747,11 +740,11 @@ void send_data(struct ev_loop* loop, ev_io* watcher, int 
revents)
 }
 
 
-void send_file(struct ev_loop* loop, ev_io* watcher, int revents)
+void send_file(Encoder* e)
 {
-  FileEncoder* encoder = (FileEncoder*) watcher->data;
+  FileEncoder* encoder = CHECK_NOTNULL(dynamic_cast<FileEncoder*>(e));
 
-  int s = watcher->fd;
+  int s = encoder->socket();
 
   while (true) {
     int fd;
@@ -770,6 +763,8 @@ void send_file(struct ev_loop* loop, ev_io* watcher, int 
revents)
     } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
       // Might block, try again later.
       encoder->backup(size);
+      io::poll(s, io::WRITE)
+        .onAny(lambda::bind(&send_file, e));
       break;
     } else if (length <= 0) {
       // Socket error or closed.
@@ -781,8 +776,6 @@ void send_file(struct ev_loop* loop, ev_io* watcher, int 
revents)
       }
       socket_manager->close(s);
       delete encoder;
-      ev_io_stop(loop, watcher);
-      delete watcher;
       break;
     } else {
       CHECK(length > 0);
@@ -794,18 +787,11 @@ void send_file(struct ev_loop* loop, ev_io* watcher, int 
revents)
       if (encoder->remaining() == 0) {
         delete encoder;
 
-        // Stop this watcher for now.
-        ev_io_stop(loop, watcher);
-
         // Check for more stuff to send on socket.
         Encoder* next = socket_manager->next(s);
         if (next != NULL) {
-          watcher->data = next;
-          ev_io_init(watcher, next->sender(), s, EV_WRITE);
-          ev_io_start(loop, watcher);
-        } else {
-          // Nothing more to send right now, clean up.
-          delete watcher;
+          io::poll(s, io::WRITE)
+            .onAny(lambda::bind(next->sender(), next));
         }
         break;
       }
@@ -814,9 +800,9 @@ void send_file(struct ev_loop* loop, ev_io* watcher, int 
revents)
 }
 
 
-void sending_connect(struct ev_loop* loop, ev_io* watcher, int revents)
+void sending_connect(Encoder* encoder)
 {
-  int s = watcher->fd;
+  int s = encoder->socket();
 
   // Now check that a successful connection was made.
   int opt;
@@ -826,15 +812,11 @@ void sending_connect(struct ev_loop* loop, ev_io* 
watcher, int revents)
     // Connect failure.
     VLOG(1) << "Socket error while connecting";
     socket_manager->close(s);
-    MessageEncoder* encoder = (MessageEncoder*) watcher->data;
     delete encoder;
-    ev_io_stop(loop, watcher);
-    delete watcher;
   } else {
     // We're connected! Now let's do some sending.
-    ev_io_stop(loop, watcher);
-    ev_io_init(watcher, send_data, s, EV_WRITE);
-    ev_io_start(loop, watcher);
+    io::poll(s, io::WRITE)
+      .onAny(lambda::bind(&send_data, encoder));
   }
 }
 
@@ -1640,17 +1622,9 @@ void SocketManager::send(Encoder* encoder, bool persist)
         // Initialize the outgoing queue.
         outgoing[encoder->socket()];
 
-        // Allocate and initialize the watcher.
-        ev_io* watcher = new ev_io();
-        watcher->data = encoder;
-
-        ev_io_init(watcher, encoder->sender(), encoder->socket(), EV_WRITE);
-
-        synchronized (watchers) {
-          watchers->push(watcher);
-        }
-
-        ev_async_send(loop, &async_watcher);
+        // Start polling in order to send with this encoder.
+        io::poll(encoder->socket(), io::WRITE)
+          .onAny(lambda::bind(encoder->sender(), encoder));
       }
     } else {
       VLOG(1) << "Attempting to send on a no longer valid socket!";
@@ -1728,9 +1702,8 @@ void SocketManager::send(Message* message)
       io::poll(s, io::READ)
         .onAny(lambda::bind(&ignore_data, new Socket(sockets[s]), s));
 
-      // Allocate and initialize a watcher for sending the message.
-      ev_io* watcher = new ev_io();
-      watcher->data = new MessageEncoder(sockets[s], message);
+      // Create a message encoder to handle sending this message.
+      Encoder* encoder = new MessageEncoder(sockets[s], message);
 
       // Try and connect to the node using this socket.
       sockaddr_in addr;
@@ -1744,19 +1717,14 @@ void SocketManager::send(Message* message)
           PLOG(FATAL) << "Failed to send, connect";
         }
 
-        // Initialize watcher for connecting.
-        ev_io_init(watcher, sending_connect, s, EV_WRITE);
+        // Start polling in order to wait for being connected.
+        io::poll(s, io::WRITE)
+          .onAny(lambda::bind(&sending_connect, encoder));
       } else {
-        // Initialize watcher for sending.
-        ev_io_init(watcher, send_data, s, EV_WRITE);
-      }
-
-      // Enqueue the watcher.
-      synchronized (watchers) {
-        watchers->push(watcher);
+        // Start polling in order to send data.
+        io::poll(s, io::WRITE)
+          .onAny(lambda::bind(&send_data, encoder));
       }
-
-      ev_async_send(loop, &async_watcher);
     }
   }
 }

Reply via email to