Add read() to Socket interface. Also used it when accepting connections in libprocess.
Review: https://reviews.apache.org/r/27960 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/383f5479 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/383f5479 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/383f5479 Branch: refs/heads/master Commit: 383f5479d71237c033acdda678571a1ad0b993b1 Parents: 149164b Author: Joris Van Remoortere <[email protected]> Authored: Sat Nov 15 16:46:34 2014 -0800 Committer: Benjamin Hindman <[email protected]> Committed: Sat Nov 15 17:38:21 2014 -0800 ---------------------------------------------------------------------- 3rdparty/libprocess/include/process/socket.hpp | 7 ++ 3rdparty/libprocess/src/process.cpp | 133 +++++++++++--------- 2 files changed, 84 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/383f5479/3rdparty/libprocess/include/process/socket.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp index fdad91f..a7c22a0 100644 --- a/3rdparty/libprocess/include/process/socket.hpp +++ b/3rdparty/libprocess/include/process/socket.hpp @@ -76,6 +76,8 @@ public: Future<Socket> connect(const Node& node); + Future<size_t> read(char* data, size_t size); + private: const Impl& create() const { @@ -122,6 +124,11 @@ public: return impl->connect(node); } + Future<size_t> read(char* data, size_t size) const + { + return impl->read(data, size); + } + private: explicit Socket(std::shared_ptr<Impl>&& that) : impl(std::move(that)) {} http://git-wip-us.apache.org/repos/asf/mesos/blob/383f5479/3rdparty/libprocess/src/process.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp index 73d41c9..c37d522 100644 --- a/3rdparty/libprocess/src/process.cpp +++ b/3rdparty/libprocess/src/process.cpp @@ -589,56 +589,6 @@ void handle_async(struct ev_loop* loop, ev_async* _, int revents) } -void recv_data(DataDecoder* decoder, int s) -{ - while (true) { - const ssize_t size = 80 * 1024; - ssize_t length = 0; - - char data[size]; - - length = recv(s, data, size, 0); - - if (length < 0 && (errno == EINTR)) { - // Interrupted, try again now. - continue; - } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) { - // Might block, try again later. - io::poll(s, io::READ) - .onAny(lambda::bind(&recv_data, decoder, s)); - break; - } else if (length <= 0) { - // Socket error or closed. - if (length < 0) { - const char* error = strerror(errno); - VLOG(1) << "Socket error while receiving: " << error; - } else { - VLOG(2) << "Socket closed while receiving"; - } - socket_manager->close(s); - delete decoder; - break; - } else { - CHECK(length > 0); - - // Decode as much of the data as possible into HTTP requests. - const deque<Request*>& requests = decoder->decode(data, length); - - if (!requests.empty()) { - foreach (Request* request, requests) { - process_manager->handle(decoder->socket(), request); - } - } else if (requests.empty() && decoder->failed()) { - VLOG(1) << "Decoder error while receiving"; - socket_manager->close(s); - delete decoder; - break; - } - } - } -} - - // A variant of 'recv_data' that doesn't do anything with the // data. Used by sockets created via SocketManager::link as well as // SocketManager::send(Message) where we don't care about the data @@ -800,6 +750,57 @@ void send_file(Encoder* e) } +namespace internal { + +void decode_read( + const Future<size_t>& length, + char* data, + size_t size, + Socket* socket, + DataDecoder* decoder) +{ + if (length.isDiscarded() || length.isFailed()) { + if (length.isFailed()) { + VLOG(1) << "Decode failure: " << length.failure(); + } + + socket_manager->close(*socket); + delete[] data; + delete decoder; + delete socket; + return; + } + + if (length.get() == 0) { + socket_manager->close(*socket); + delete[] data; + delete decoder; + delete socket; + return; + } + // Decode as much of the data as possible into HTTP requests. + const deque<Request*>& requests = decoder->decode(data, length.get()); + + if (!requests.empty()) { + foreach (Request* request, requests) { + process_manager->handle(decoder->socket(), request); + } + } else if (requests.empty() && decoder->failed()) { + VLOG(1) << "Decoder error while receiving"; + socket_manager->close(*socket); + delete[] data; + delete decoder; + delete socket; + return; + } + + socket->read(data, size) + .onAny(lambda::bind(&decode_read, lambda::_1, data, size, socket, decoder)); +} + +} // namespace internal { + + void accept(struct ev_loop* loop, ev_io* watcher, int revents) { CHECK_EQ(__s__, watcher->fd); @@ -837,11 +838,25 @@ void accept(struct ev_loop* loop, ev_io* watcher, int revents) os::close(s); } else { // Inform the socket manager for proper bookkeeping. - const Socket& socket = socket_manager->accepted(s); + Socket socket = socket_manager->accepted(s); + + // Allocate a buffer to read into. This can be replaced later + // when socket supports a read function that provides the + // buffered data in the resulting callback. + const size_t size = 80 * 1024; + char* data = new char[size]; + memset(data, 0, size); + + DataDecoder* decoder = new DataDecoder(socket); - // Start reading from the socket. - io::poll(s, io::READ) - .onAny(lambda::bind(&recv_data, new DataDecoder(socket), s)); + socket.read(data, size) + .onAny(lambda::bind( + &internal::decode_read, + lambda::_1, + data, + size, + new Socket(socket), + decoder)); } } @@ -1486,6 +1501,12 @@ Future<Socket> Socket::Impl::connect(const Node& node) } +Future<size_t> Socket::Impl::read(char* data, size_t size) +{ + return io::read(get(), data, size); +} + + SocketManager::SocketManager() { synchronizer(this) = SYNCHRONIZED_INITIALIZER_RECURSIVE; @@ -1711,8 +1732,8 @@ Encoder* SocketManager::next(int s) // We cannot assume 'sockets.count(s) > 0' here because it's // possible that 's' has been removed with a a call to // SocketManager::close. For example, it could be the case that a - // socket has gone to CLOSE_WAIT and the call to 'recv' in - // recv_data returned 0 causing SocketManager::close to get + // socket has gone to CLOSE_WAIT and the call to read in + // io::read returned 0 causing SocketManager::close to get // invoked. Later a call to 'send' or 'sendfile' (e.g., in // send_data or send_file) can "succeed" (because the socket is // not "closed" yet because there are still some Socket
