Used io::poll instead of libev for send*. Updated sending_connect, send_data, and send_file.
Review: https://reviews.apache.org/r/27510 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b774ecb2 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b774ecb2 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b774ecb2 Branch: refs/heads/master Commit: b774ecb245288c2a0a33b94e9c92ed0ee806b9c1 Parents: bc23da1 Author: Benjamin Hindman <[email protected]> Authored: Sun Nov 2 21:21:51 2014 -0800 Committer: Benjamin Hindman <[email protected]> Committed: Sat Nov 15 16:26:49 2014 -0800 ---------------------------------------------------------------------- 3rdparty/libprocess/src/encoder.hpp | 9 ++-- 3rdparty/libprocess/src/process.cpp | 90 ++++++++++---------------------- 2 files changed, 35 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/b774ecb2/3rdparty/libprocess/src/encoder.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/encoder.hpp b/3rdparty/libprocess/src/encoder.hpp index 9c5aa81..800f324 100644 --- a/3rdparty/libprocess/src/encoder.hpp +++ b/3rdparty/libprocess/src/encoder.hpp @@ -25,10 +25,13 @@ namespace process { const uint32_t GZIP_MINIMUM_BODY_LENGTH = 1024; -typedef void (*Sender)(struct ev_loop*, ev_io*, int); +// Forward declarations. +class Encoder; -extern void send_data(struct ev_loop*, ev_io*, int); -extern void send_file(struct ev_loop*, ev_io*, int); +extern void send_data(Encoder*); +extern void send_file(Encoder*); + +typedef void (*Sender)(Encoder*); class Encoder http://git-wip-us.apache.org/repos/asf/mesos/blob/b774ecb2/3rdparty/libprocess/src/process.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp index a33a201..ac12876 100644 --- a/3rdparty/libprocess/src/process.cpp +++ b/3rdparty/libprocess/src/process.cpp @@ -681,11 +681,11 @@ void ignore_data(Socket* socket, int s) } -void send_data(struct ev_loop* loop, ev_io* watcher, int revents) +void send_data(Encoder* e) { - DataEncoder* encoder = (DataEncoder*) watcher->data; + DataEncoder* encoder = CHECK_NOTNULL(dynamic_cast<DataEncoder*>(e)); - int s = watcher->fd; + int s = encoder->socket(); while (true) { const void* data; @@ -703,6 +703,8 @@ void send_data(struct ev_loop* loop, ev_io* watcher, int revents) } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) { // Might block, try again later. encoder->backup(size); + io::poll(s, io::WRITE) + .onAny(lambda::bind(&send_data, e)); break; } else if (length <= 0) { // Socket error or closed. @@ -714,8 +716,6 @@ void send_data(struct ev_loop* loop, ev_io* watcher, int revents) } socket_manager->close(s); delete encoder; - ev_io_stop(loop, watcher); - delete watcher; break; } else { CHECK(length > 0); @@ -727,18 +727,11 @@ void send_data(struct ev_loop* loop, ev_io* watcher, int revents) if (encoder->remaining() == 0) { delete encoder; - // Stop this watcher for now. - ev_io_stop(loop, watcher); - // Check for more stuff to send on socket. Encoder* next = socket_manager->next(s); if (next != NULL) { - watcher->data = next; - ev_io_init(watcher, next->sender(), s, EV_WRITE); - ev_io_start(loop, watcher); - } else { - // Nothing more to send right now, clean up. - delete watcher; + io::poll(s, io::WRITE) + .onAny(lambda::bind(next->sender(), next)); } break; } @@ -747,11 +740,11 @@ void send_data(struct ev_loop* loop, ev_io* watcher, int revents) } -void send_file(struct ev_loop* loop, ev_io* watcher, int revents) +void send_file(Encoder* e) { - FileEncoder* encoder = (FileEncoder*) watcher->data; + FileEncoder* encoder = CHECK_NOTNULL(dynamic_cast<FileEncoder*>(e)); - int s = watcher->fd; + int s = encoder->socket(); while (true) { int fd; @@ -770,6 +763,8 @@ void send_file(struct ev_loop* loop, ev_io* watcher, int revents) } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) { // Might block, try again later. encoder->backup(size); + io::poll(s, io::WRITE) + .onAny(lambda::bind(&send_file, e)); break; } else if (length <= 0) { // Socket error or closed. @@ -781,8 +776,6 @@ void send_file(struct ev_loop* loop, ev_io* watcher, int revents) } socket_manager->close(s); delete encoder; - ev_io_stop(loop, watcher); - delete watcher; break; } else { CHECK(length > 0); @@ -794,18 +787,11 @@ void send_file(struct ev_loop* loop, ev_io* watcher, int revents) if (encoder->remaining() == 0) { delete encoder; - // Stop this watcher for now. - ev_io_stop(loop, watcher); - // Check for more stuff to send on socket. Encoder* next = socket_manager->next(s); if (next != NULL) { - watcher->data = next; - ev_io_init(watcher, next->sender(), s, EV_WRITE); - ev_io_start(loop, watcher); - } else { - // Nothing more to send right now, clean up. - delete watcher; + io::poll(s, io::WRITE) + .onAny(lambda::bind(next->sender(), next)); } break; } @@ -814,9 +800,9 @@ void send_file(struct ev_loop* loop, ev_io* watcher, int revents) } -void sending_connect(struct ev_loop* loop, ev_io* watcher, int revents) +void sending_connect(Encoder* encoder) { - int s = watcher->fd; + int s = encoder->socket(); // Now check that a successful connection was made. int opt; @@ -826,15 +812,11 @@ void sending_connect(struct ev_loop* loop, ev_io* watcher, int revents) // Connect failure. VLOG(1) << "Socket error while connecting"; socket_manager->close(s); - MessageEncoder* encoder = (MessageEncoder*) watcher->data; delete encoder; - ev_io_stop(loop, watcher); - delete watcher; } else { // We're connected! Now let's do some sending. - ev_io_stop(loop, watcher); - ev_io_init(watcher, send_data, s, EV_WRITE); - ev_io_start(loop, watcher); + io::poll(s, io::WRITE) + .onAny(lambda::bind(&send_data, encoder)); } } @@ -1640,17 +1622,9 @@ void SocketManager::send(Encoder* encoder, bool persist) // Initialize the outgoing queue. outgoing[encoder->socket()]; - // Allocate and initialize the watcher. - ev_io* watcher = new ev_io(); - watcher->data = encoder; - - ev_io_init(watcher, encoder->sender(), encoder->socket(), EV_WRITE); - - synchronized (watchers) { - watchers->push(watcher); - } - - ev_async_send(loop, &async_watcher); + // Start polling in order to send with this encoder. + io::poll(encoder->socket(), io::WRITE) + .onAny(lambda::bind(encoder->sender(), encoder)); } } else { VLOG(1) << "Attempting to send on a no longer valid socket!"; @@ -1728,9 +1702,8 @@ void SocketManager::send(Message* message) io::poll(s, io::READ) .onAny(lambda::bind(&ignore_data, new Socket(sockets[s]), s)); - // Allocate and initialize a watcher for sending the message. - ev_io* watcher = new ev_io(); - watcher->data = new MessageEncoder(sockets[s], message); + // Create a message encoder to handle sending this message. + Encoder* encoder = new MessageEncoder(sockets[s], message); // Try and connect to the node using this socket. sockaddr_in addr; @@ -1744,19 +1717,14 @@ void SocketManager::send(Message* message) PLOG(FATAL) << "Failed to send, connect"; } - // Initialize watcher for connecting. - ev_io_init(watcher, sending_connect, s, EV_WRITE); + // Start polling in order to wait for being connected. + io::poll(s, io::WRITE) + .onAny(lambda::bind(&sending_connect, encoder)); } else { - // Initialize watcher for sending. - ev_io_init(watcher, send_data, s, EV_WRITE); - } - - // Enqueue the watcher. - synchronized (watchers) { - watchers->push(watcher); + // Start polling in order to send data. + io::poll(s, io::WRITE) + .onAny(lambda::bind(&send_data, encoder)); } - - ev_async_send(loop, &async_watcher); } } }
