Introduce Socket::send() and Socket::sendfile().

Also use these when linking sockets.

Review: https://reviews.apache.org/r/27964


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/71de11e9
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/71de11e9
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/71de11e9

Branch: refs/heads/master
Commit: 71de11e95f1945031e3ea1308445eddc0850c2be
Parents: 3564c44
Author: Joris Van Remoortere <[email protected]>
Authored: Sat Nov 15 16:49:44 2014 -0800
Committer: Benjamin Hindman <[email protected]>
Committed: Sat Nov 15 17:38:22 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/socket.hpp |  14 ++
 3rdparty/libprocess/src/process.cpp            | 180 +++++++++++++++++++-
 2 files changed, 186 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/71de11e9/3rdparty/libprocess/include/process/socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp
index a7c22a0..5fd8d1b 100644
--- a/3rdparty/libprocess/include/process/socket.hpp
+++ b/3rdparty/libprocess/include/process/socket.hpp
@@ -78,6 +78,10 @@ public:
 
     Future<size_t> read(char* data, size_t size);
 
+    Future<size_t> send(const char* data, size_t size);
+
+    Future<size_t> sendfile(int fd, off_t offset, size_t size);
+
   private:
     const Impl& create() const
     {
@@ -129,6 +133,16 @@ public:
     return impl->read(data, size);
   }
 
+  Future<size_t> send(const char* data, size_t size) const
+  {
+    return impl->send(data, size);
+  }
+
+  Future<size_t> sendfile(int fd, off_t offset, size_t size) const
+  {
+    return impl->sendfile(fd, offset, size);
+  }
+
 private:
   explicit Socket(std::shared_ptr<Impl>&& that) : impl(std::move(that)) {}
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/71de11e9/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 63f9df4..b062b85 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -1465,6 +1465,96 @@ Future<size_t> Socket::Impl::read(char* data, size_t size)
 }
 
 
+namespace internal {
+
+Future<size_t> socket_send_data(int s, const char* data, size_t size)
+{
+  CHECK(size > 0);
+
+  while (true) {
+    ssize_t length = send(s, data, size, MSG_NOSIGNAL);
+
+    if (length < 0 && (errno == EINTR)) {
+      // Interrupted, try again now.
+      continue;
+    } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
+      // Might block, try again later.
+      return io::poll(s, io::WRITE)
+        .then(lambda::bind(&internal::socket_send_data, s, data, size));
+    } else if (length <= 0) {
+      // Socket error or closed.
+      if (length < 0) {
+        const char* error = strerror(errno);
+        VLOG(1) << "Socket error while sending: " << error;
+      } else {
+        VLOG(1) << "Socket closed while sending";
+      }
+      if (length == 0) {
+        return length;
+      } else {
+        return Failure(ErrnoError("Socket send failed"));
+      }
+    } else {
+      CHECK(length > 0);
+
+      return length;
+    }
+  }
+}
+
+
+Future<size_t> socket_send_file(int s, int fd, off_t offset, size_t size)
+{
+  CHECK(size > 0);
+
+  while (true) {
+    ssize_t length = os::sendfile(s, fd, offset, size);
+
+    if (length < 0 && (errno == EINTR)) {
+      // Interrupted, try again now.
+      continue;
+    } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
+      // Might block, try again later.
+      return io::poll(s, io::WRITE)
+        .then(lambda::bind(&internal::socket_send_file, s, fd, offset, size));
+    } else if (length <= 0) {
+      // Socket error or closed.
+      if (length < 0) {
+        const char* error = strerror(errno);
+        VLOG(1) << "Socket error while sending: " << error;
+      } else {
+        VLOG(1) << "Socket closed while sending";
+      }
+      if (length == 0) {
+        return length;
+      } else {
+        return Failure(ErrnoError("Socket sendfile failed"));
+      }
+    } else {
+      CHECK(length > 0);
+
+      return length;
+    }
+  }
+}
+
+} // namespace internal {
+
+
+Future<size_t> Socket::Impl::send(const char* data, size_t size)
+{
+  return io::poll(get(), io::WRITE)
+    .then(lambda::bind(&internal::socket_send_data, get(), data, size));
+}
+
+
+Future<size_t> Socket::Impl::sendfile(int fd, off_t offset, size_t size)
+{
+  return io::poll(get(), io::WRITE)
+    .then(lambda::bind(&internal::socket_send_file, get(), fd, offset, size));
+}
+
+
 SocketManager::SocketManager()
 {
   synchronizer(this) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
@@ -1600,27 +1690,101 @@ PID<HttpProxy> SocketManager::proxy(const Socket& socket)
 }
 
 
+namespace internal {
+
+void _send(
+    const Future<size_t>& result,
+    Socket* socket,
+    Encoder* encoder,
+    size_t size);
+
+
+void send(Encoder* encoder, Socket* socket)
+{
+  switch (encoder->kind()) {
+    case Encoder::DATA: {
+      size_t size;
+      const char* data = reinterpret_cast<DataEncoder*>(encoder)->next(&size);
+      socket->send(data, size)
+        .onAny(lambda::bind(
+            &internal::_send,
+            lambda::_1,
+            socket,
+            encoder,
+            size));
+      break;
+    }
+    case Encoder::FILE: {
+      off_t offset;
+      size_t size;
+      int fd = reinterpret_cast<FileEncoder*>(encoder)->next(&offset, &size);
+      socket->sendfile(fd, offset, size)
+        .onAny(lambda::bind(
+            &internal::_send,
+            lambda::_1,
+            socket,
+            encoder,
+            size));
+      break;
+    }
+  }
+}
+
+
+void _send(
+    const Future<size_t>& length,
+    Socket* socket,
+    Encoder* encoder,
+    size_t size)
+{
+  if (length.isDiscarded() || length.isFailed()) {
+    socket_manager->close(*socket);
+    delete socket;
+    delete encoder;
+  } else {
+    // Update the encoder with the amount sent.
+    encoder->backup(size - length.get());
+
+    // See if there is any more of the message to send.
+    if (encoder->remaining() == 0) {
+      delete encoder;
+
+      // Check for more stuff to send on socket.
+      Encoder* next = socket_manager->next(*socket);
+      if (next != NULL) {
+        send(next, socket);
+      } else {
+        delete socket;
+      }
+    } else {
+      send(encoder, socket);
+    }
+  }
+}
+
+} // namespace internal {
+
+
 void SocketManager::send(Encoder* encoder, bool persist)
 {
   CHECK(encoder != NULL);
 
   synchronized (this) {
-    if (sockets.count(encoder->socket()) > 0) {
+    Socket socket = encoder->socket();
+    if (sockets.count(socket) > 0) {
       // Update whether or not this socket should get disposed after
       // there is no more data to send.
       if (!persist) {
-        dispose.insert(encoder->socket());
+        dispose.insert(socket);
       }
 
-      if (outgoing.count(encoder->socket()) > 0) {
-        outgoing[encoder->socket()].push(encoder);
+      if (outgoing.count(socket) > 0) {
+        outgoing[socket].push(encoder);
       } else {
         // Initialize the outgoing queue.
-        outgoing[encoder->socket()];
+        outgoing[socket];
 
-        // Start polling in order to send with this encoder.
-        io::poll(encoder->socket(), io::WRITE)
-          .onAny(lambda::bind(encoder->sender(), encoder));
+        internal::send(encoder, new Socket(socket));
       }
     } else {
       VLOG(1) << "Attempting to send on a no longer valid socket!";

Reply via email to