Use Socket::read for ignore_data. Review: https://reviews.apache.org/r/27961
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/541e8e5b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/541e8e5b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/541e8e5b

Branch: refs/heads/master
Commit: 541e8e5b2b8353c62fbe42d6e63ca15809b2fdb7
Parents: 383f547
Author: Joris Van Remoortere <[email protected]>
Authored: Sat Nov 15 16:47:18 2014 -0800
Committer: Benjamin Hindman <[email protected]>
Committed: Sat Nov 15 17:38:21 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/src/process.cpp | 89 +++++++++++++++-----------------
 1 file changed, 43 insertions(+), 46 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/mesos/blob/541e8e5b/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index c37d522..63f9df4 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -589,48 +589,6 @@ void handle_async(struct ev_loop* loop, ev_async* _, int revents)
 }
 
 
-// A variant of 'recv_data' that doesn't do anything with the
-// data. Used by sockets created via SocketManager::link as well as
-// SocketManager::send(Message) where we don't care about the data
-// received we mostly just want to know when the socket has been
-// closed.
-void ignore_data(Socket* socket, int s)
-{
-  while (true) {
-    const ssize_t size = 80 * 1024;
-    ssize_t length = 0;
-
-    char data[size];
-
-    length = recv(s, data, size, 0);
-
-    if (length < 0 && (errno == EINTR)) {
-      // Interrupted, try again now.
-      continue;
-    } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
-      // Might block, try again later.
-      io::poll(s, io::READ)
-        .onAny(lambda::bind(&ignore_data, socket, s));
-      break;
-    } else if (length <= 0) {
-      // Socket error or closed.
-      if (length < 0) {
-        const char* error = strerror(errno);
-        VLOG(1) << "Socket error while receiving: " << error;
-      } else {
-        VLOG(2) << "Socket closed while receiving";
-      }
-      socket_manager->close(s);
-      delete socket;
-      break;
-    } else {
-      VLOG(2) << "Ignoring " << length << " bytes of data received "
-              << "on socket used only for sending";
-    }
-  }
-}
-
-
 void send_data(Encoder* e)
 {
   DataEncoder* encoder = CHECK_NOTNULL(dynamic_cast<DataEncoder*>(e));
@@ -1528,6 +1486,31 @@ Socket SocketManager::accepted(int s)
 
 namespace internal {
 
+void ignore_read_data(
+    const Future<size_t>& length,
+    Socket* socket,
+    char* data,
+    size_t size)
+{
+  if (length.isDiscarded() || length.isFailed()) {
+    socket_manager->close(*socket);
+    delete[] data;
+    delete socket;
+    return;
+  }
+
+  if (length.get() == 0) {
+    socket_manager->close(*socket);
+    delete[] data;
+    delete socket;
+    return;
+  }
+
+  socket->read(data, size)
+    .onAny(lambda::bind(&ignore_read_data, lambda::_1, socket, data, size));
+}
+
+
 void link_connect(const Future<Socket>& socket)
 {
   if (socket.isDiscarded() || socket.isFailed()) {
@@ -1538,8 +1521,15 @@ void link_connect(const Future<Socket>& socket)
     return;
   }
 
-  io::poll(socket.get(), io::READ)
-    .onAny(lambda::bind(&ignore_data, new Socket(socket.get()), socket.get()));
+  size_t size = 80 * 1024;
+  char* data = new char[size];
+  socket.get().read(data, size)
+    .onAny(lambda::bind(
+        &ignore_read_data,
+        lambda::_1,
+        new Socket(socket.get()),
+        data,
+        size));
 }
 
 } // namespace internal {
@@ -1677,8 +1667,15 @@ void send_connect(const Future<Socket>& socket, Message* message)
   // Read and ignore data from this socket. Note that we don't
   // expect to receive anything other than HTTP '202 Accepted'
   // responses which we just ignore.
-  io::poll(socket.get(), io::READ)
-    .onAny(lambda::bind(&ignore_data, new Socket(socket.get()), socket.get()));
+  size_t size = 80 * 1024;
+  char* data = new char[size];
+  socket.get().read(data, size)
+    .onAny(lambda::bind(
+        &ignore_read_data,
+        lambda::_1,
+        new Socket(socket.get()),
+        data,
+        size));
 
   // Start polling in order to send data.
   io::poll(socket.get(), io::WRITE)
