Introduce Socket::send() and Socket::sendfile(). Also used these when linking sockets.
Review: https://reviews.apache.org/r/27964 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/71de11e9 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/71de11e9 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/71de11e9 Branch: refs/heads/master Commit: 71de11e95f1945031e3ea1308445eddc0850c2be Parents: 3564c44 Author: Joris Van Remoortere <[email protected]> Authored: Sat Nov 15 16:49:44 2014 -0800 Committer: Benjamin Hindman <[email protected]> Committed: Sat Nov 15 17:38:22 2014 -0800 ---------------------------------------------------------------------- 3rdparty/libprocess/include/process/socket.hpp | 14 ++ 3rdparty/libprocess/src/process.cpp | 180 +++++++++++++++++++- 2 files changed, 186 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/71de11e9/3rdparty/libprocess/include/process/socket.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp index a7c22a0..5fd8d1b 100644 --- a/3rdparty/libprocess/include/process/socket.hpp +++ b/3rdparty/libprocess/include/process/socket.hpp @@ -78,6 +78,10 @@ public: Future<size_t> read(char* data, size_t size); + Future<size_t> send(const char* data, size_t size); + + Future<size_t> sendfile(int fd, off_t offset, size_t size); + private: const Impl& create() const { @@ -129,6 +133,16 @@ public: return impl->read(data, size); } + Future<size_t> send(const char* data, size_t size) const + { + return impl->send(data, size); + } + + Future<size_t> sendfile(int fd, off_t offset, size_t size) const + { + return impl->sendfile(fd, offset, size); + } + private: explicit Socket(std::shared_ptr<Impl>&& that) : impl(std::move(that)) {} http://git-wip-us.apache.org/repos/asf/mesos/blob/71de11e9/3rdparty/libprocess/src/process.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp index 63f9df4..b062b85 100644 --- a/3rdparty/libprocess/src/process.cpp +++ b/3rdparty/libprocess/src/process.cpp @@ -1465,6 +1465,96 @@ Future<size_t> Socket::Impl::read(char* data, size_t size) } +namespace internal { + +Future<size_t> socket_send_data(int s, const char* data, size_t size) +{ + CHECK(size > 0); + + while (true) { + ssize_t length = send(s, data, size, MSG_NOSIGNAL); + + if (length < 0 && (errno == EINTR)) { + // Interrupted, try again now. + continue; + } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) { + // Might block, try again later. + return io::poll(s, io::WRITE) + .then(lambda::bind(&internal::socket_send_data, s, data, size)); + } else if (length <= 0) { + // Socket error or closed. + if (length < 0) { + const char* error = strerror(errno); + VLOG(1) << "Socket error while sending: " << error; + } else { + VLOG(1) << "Socket closed while sending"; + } + if (length == 0) { + return length; + } else { + return Failure(ErrnoError("Socket send failed")); + } + } else { + CHECK(length > 0); + + return length; + } + } +} + + +Future<size_t> socket_send_file(int s, int fd, off_t offset, size_t size) +{ + CHECK(size > 0); + + while (true) { + ssize_t length = os::sendfile(s, fd, offset, size); + + if (length < 0 && (errno == EINTR)) { + // Interrupted, try again now. + continue; + } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) { + // Might block, try again later. + return io::poll(s, io::WRITE) + .then(lambda::bind(&internal::socket_send_file, s, fd, offset, size)); + } else if (length <= 0) { + // Socket error or closed. + if (length < 0) { + const char* error = strerror(errno); + VLOG(1) << "Socket error while sending: " << error; + } else { + VLOG(1) << "Socket closed while sending"; + } + if (length == 0) { + return length; + } else { + return Failure(ErrnoError("Socket sendfile failed")); + } + } else { + CHECK(length > 0); + + return length; + } + } +} + +} // namespace internal { + + +Future<size_t> Socket::Impl::send(const char* data, size_t size) +{ + return io::poll(get(), io::WRITE) + .then(lambda::bind(&internal::socket_send_data, get(), data, size)); +} + + +Future<size_t> Socket::Impl::sendfile(int fd, off_t offset, size_t size) +{ + return io::poll(get(), io::WRITE) + .then(lambda::bind(&internal::socket_send_file, get(), fd, offset, size)); +} + + SocketManager::SocketManager() { synchronizer(this) = SYNCHRONIZED_INITIALIZER_RECURSIVE; @@ -1600,27 +1690,101 @@ PID<HttpProxy> SocketManager::proxy(const Socket& socket) } +namespace internal { + +void _send( + const Future<size_t>& result, + Socket* socket, + Encoder* encoder, + size_t size); + + +void send(Encoder* encoder, Socket* socket) +{ + switch (encoder->kind()) { + case Encoder::DATA: { + size_t size; + const char* data = reinterpret_cast<DataEncoder*>(encoder)->next(&size); + socket->send(data, size) + .onAny(lambda::bind( + &internal::_send, + lambda::_1, + socket, + encoder, + size)); + break; + } + case Encoder::FILE: { + off_t offset; + size_t size; + int fd = reinterpret_cast<FileEncoder*>(encoder)->next(&offset, &size); + socket->sendfile(fd, offset, size) + .onAny(lambda::bind( + &internal::_send, + lambda::_1, + socket, + encoder, + size)); + break; + } + } +} + + +void _send( + const Future<size_t>& length, + Socket* socket, + Encoder* encoder, + size_t size) +{ + if (length.isDiscarded() || length.isFailed()) { + socket_manager->close(*socket); + delete socket; + delete encoder; + } else { + // Update the encoder with the amount sent. + encoder->backup(size - length.get()); + + // See if there is any more of the message to send. + if (encoder->remaining() == 0) { + delete encoder; + + // Check for more stuff to send on socket. + Encoder* next = socket_manager->next(*socket); + if (next != NULL) { + send(next, socket); + } else { + delete socket; + } + } else { + send(encoder, socket); + } + } +} + +} // namespace internal { + + void SocketManager::send(Encoder* encoder, bool persist) { CHECK(encoder != NULL); synchronized (this) { - if (sockets.count(encoder->socket()) > 0) { + Socket socket = encoder->socket(); + if (sockets.count(socket) > 0) { // Update whether or not this socket should get disposed after // there is no more data to send. if (!persist) { - dispose.insert(encoder->socket()); + dispose.insert(socket); } - if (outgoing.count(encoder->socket()) > 0) { - outgoing[encoder->socket()].push(encoder); + if (outgoing.count(socket) > 0) { + outgoing[socket].push(encoder); } else { // Initialize the outgoing queue. - outgoing[encoder->socket()]; + outgoing[socket]; - // Start polling in order to send with this encoder. - io::poll(encoder->socket(), io::WRITE) - .onAny(lambda::bind(encoder->sender(), encoder)); + internal::send(encoder, new Socket(socket)); } } else { VLOG(1) << "Attempting to send on a no longer valid socket!";
