Add read() to Socket interface.

Also used it when accepting connections in libprocess.

Review: https://reviews.apache.org/r/27960


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/383f5479
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/383f5479
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/383f5479

Branch: refs/heads/master
Commit: 383f5479d71237c033acdda678571a1ad0b993b1
Parents: 149164b
Author: Joris Van Remoortere <[email protected]>
Authored: Sat Nov 15 16:46:34 2014 -0800
Committer: Benjamin Hindman <[email protected]>
Committed: Sat Nov 15 17:38:21 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/socket.hpp |   7 ++
 3rdparty/libprocess/src/process.cpp            | 133 +++++++++++---------
 2 files changed, 84 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/383f5479/3rdparty/libprocess/include/process/socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/socket.hpp 
b/3rdparty/libprocess/include/process/socket.hpp
index fdad91f..a7c22a0 100644
--- a/3rdparty/libprocess/include/process/socket.hpp
+++ b/3rdparty/libprocess/include/process/socket.hpp
@@ -76,6 +76,8 @@ public:
 
     Future<Socket> connect(const Node& node);
 
+    Future<size_t> read(char* data, size_t size);
+
   private:
     const Impl& create() const
     {
@@ -122,6 +124,11 @@ public:
     return impl->connect(node);
   }
 
+  Future<size_t> read(char* data, size_t size) const
+  {
+    return impl->read(data, size);
+  }
+
 private:
   explicit Socket(std::shared_ptr<Impl>&& that) : impl(std::move(that)) {}
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/383f5479/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp 
b/3rdparty/libprocess/src/process.cpp
index 73d41c9..c37d522 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -589,56 +589,6 @@ void handle_async(struct ev_loop* loop, ev_async* _, int 
revents)
 }
 
 
-void recv_data(DataDecoder* decoder, int s)
-{
-  while (true) {
-    const ssize_t size = 80 * 1024;
-    ssize_t length = 0;
-
-    char data[size];
-
-    length = recv(s, data, size, 0);
-
-    if (length < 0 && (errno == EINTR)) {
-      // Interrupted, try again now.
-      continue;
-    } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
-      // Might block, try again later.
-      io::poll(s, io::READ)
-        .onAny(lambda::bind(&recv_data, decoder, s));
-      break;
-    } else if (length <= 0) {
-      // Socket error or closed.
-      if (length < 0) {
-        const char* error = strerror(errno);
-        VLOG(1) << "Socket error while receiving: " << error;
-      } else {
-        VLOG(2) << "Socket closed while receiving";
-      }
-      socket_manager->close(s);
-      delete decoder;
-      break;
-    } else {
-      CHECK(length > 0);
-
-      // Decode as much of the data as possible into HTTP requests.
-      const deque<Request*>& requests = decoder->decode(data, length);
-
-      if (!requests.empty()) {
-        foreach (Request* request, requests) {
-          process_manager->handle(decoder->socket(), request);
-        }
-      } else if (requests.empty() && decoder->failed()) {
-        VLOG(1) << "Decoder error while receiving";
-        socket_manager->close(s);
-        delete decoder;
-        break;
-      }
-    }
-  }
-}
-
-
 // A variant of 'recv_data' that doesn't do anything with the
 // data. Used by sockets created via SocketManager::link as well as
 // SocketManager::send(Message) where we don't care about the data
@@ -800,6 +750,57 @@ void send_file(Encoder* e)
 }
 
 
+namespace internal {
+
+void decode_read(
+    const Future<size_t>& length,
+    char* data,
+    size_t size,
+    Socket* socket,
+    DataDecoder* decoder)
+{
+  if (length.isDiscarded() || length.isFailed()) {
+    if (length.isFailed()) {
+      VLOG(1) << "Decode failure: " << length.failure();
+    }
+
+    socket_manager->close(*socket);
+    delete[] data;
+    delete decoder;
+    delete socket;
+    return;
+  }
+
+  if (length.get() == 0) {
+    socket_manager->close(*socket);
+    delete[] data;
+    delete decoder;
+    delete socket;
+    return;
+  }
+  // Decode as much of the data as possible into HTTP requests.
+  const deque<Request*>& requests = decoder->decode(data, length.get());
+
+  if (!requests.empty()) {
+    foreach (Request* request, requests) {
+      process_manager->handle(decoder->socket(), request);
+    }
+  } else if (requests.empty() && decoder->failed()) {
+    VLOG(1) << "Decoder error while receiving";
+    socket_manager->close(*socket);
+    delete[] data;
+    delete decoder;
+    delete socket;
+    return;
+  }
+
+  socket->read(data, size)
+    .onAny(lambda::bind(&decode_read, lambda::_1, data, size, socket, 
decoder));
+}
+
+} // namespace internal {
+
+
 void accept(struct ev_loop* loop, ev_io* watcher, int revents)
 {
   CHECK_EQ(__s__, watcher->fd);
@@ -837,11 +838,25 @@ void accept(struct ev_loop* loop, ev_io* watcher, int 
revents)
     os::close(s);
   } else {
     // Inform the socket manager for proper bookkeeping.
-    const Socket& socket = socket_manager->accepted(s);
+    Socket socket = socket_manager->accepted(s);
+
+    // Allocate a buffer to read into. This can be replaced later
+    // when socket supports a read function that provides the
+    // buffered data in the resulting callback.
+    const size_t size = 80 * 1024;
+    char* data = new char[size];
+    memset(data, 0, size);
+
+    DataDecoder* decoder = new DataDecoder(socket);
 
-    // Start reading from the socket.
-    io::poll(s, io::READ)
-      .onAny(lambda::bind(&recv_data, new DataDecoder(socket), s));
+    socket.read(data, size)
+      .onAny(lambda::bind(
+          &internal::decode_read,
+          lambda::_1,
+          data,
+          size,
+          new Socket(socket),
+          decoder));
   }
 }
 
@@ -1486,6 +1501,12 @@ Future<Socket> Socket::Impl::connect(const Node& node)
 }
 
 
+Future<size_t> Socket::Impl::read(char* data, size_t size)
+{
+  return io::read(get(), data, size);
+}
+
+
 SocketManager::SocketManager()
 {
   synchronizer(this) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
@@ -1711,8 +1732,8 @@ Encoder* SocketManager::next(int s)
     // We cannot assume 'sockets.count(s) > 0' here because it's
     // possible that 's' has been removed with a a call to
     // SocketManager::close. For example, it could be the case that a
-    // socket has gone to CLOSE_WAIT and the call to 'recv' in
-    // recv_data returned 0 causing SocketManager::close to get
+    // socket has gone to CLOSE_WAIT and the call to read in
+    // io::read returned 0 causing SocketManager::close to get
     // invoked. Later a call to 'send' or 'sendfile' (e.g., in
     // send_data or send_file) can "succeed" (because the socket is
     // not "closed" yet because there are still some Socket

Reply via email to