Use Socket::send() for temporary connections. And removed send_data and send_file now that it is not being used.
Review: https://reviews.apache.org/r/27965 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8edab655 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8edab655 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8edab655 Branch: refs/heads/master Commit: 8edab655c990a310ef6ac66e64f116addcaf147c Parents: 71de11e Author: Joris Van Remoortere <[email protected]> Authored: Sat Nov 15 16:50:57 2014 -0800 Committer: Benjamin Hindman <[email protected]> Committed: Sat Nov 15 17:38:22 2014 -0800 ---------------------------------------------------------------------- 3rdparty/libprocess/src/encoder.hpp | 17 ----- 3rdparty/libprocess/src/process.cpp | 123 +------------------------------ 2 files changed, 1 insertion(+), 139 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/8edab655/3rdparty/libprocess/src/encoder.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/encoder.hpp b/3rdparty/libprocess/src/encoder.hpp index 7afde0e..2b0d83f 100644 --- a/3rdparty/libprocess/src/encoder.hpp +++ b/3rdparty/libprocess/src/encoder.hpp @@ -28,11 +28,6 @@ const uint32_t GZIP_MINIMUM_BODY_LENGTH = 1024; // Forward declarations. class Encoder; -extern void send_data(Encoder*); -extern void send_file(Encoder*); - -typedef void (*Sender)(Encoder*); - class Encoder { @@ -45,8 +40,6 @@ public: explicit Encoder(const Socket& _s) : s(_s) {} virtual ~Encoder() {} - virtual Sender sender() = 0; - virtual Kind kind() const = 0; virtual void backup(size_t length) = 0; @@ -71,11 +64,6 @@ public: virtual ~DataEncoder() {} - virtual Sender sender() - { - return send_data; - } - virtual Kind kind() const { return Encoder::DATA; @@ -254,11 +242,6 @@ public: os::close(fd); } - virtual Sender sender() - { - return send_file; - } - virtual Kind kind() const { return Encoder::FILE; http://git-wip-us.apache.org/repos/asf/mesos/blob/8edab655/3rdparty/libprocess/src/process.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp index b062b85..9f91020 100644 --- a/3rdparty/libprocess/src/process.cpp +++ b/3rdparty/libprocess/src/process.cpp @@ -589,125 +589,6 @@ void handle_async(struct ev_loop* loop, ev_async* _, int revents) } -void send_data(Encoder* e) -{ - DataEncoder* encoder = CHECK_NOTNULL(dynamic_cast<DataEncoder*>(e)); - - int s = encoder->socket(); - - while (true) { - const void* data; - size_t size; - - data = encoder->next(&size); - CHECK(size > 0); - - ssize_t length = send(s, data, size, MSG_NOSIGNAL); - - if (length < 0 && (errno == EINTR)) { - // Interrupted, try again now. - encoder->backup(size); - continue; - } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) { - // Might block, try again later. - encoder->backup(size); - io::poll(s, io::WRITE) - .onAny(lambda::bind(&send_data, e)); - break; - } else if (length <= 0) { - // Socket error or closed. - if (length < 0) { - const char* error = strerror(errno); - VLOG(1) << "Socket error while sending: " << error; - } else { - VLOG(1) << "Socket closed while sending"; - } - socket_manager->close(s); - delete encoder; - break; - } else { - CHECK(length > 0); - - // Update the encoder with the amount sent. - encoder->backup(size - length); - - // See if there is any more of the message to send. - if (encoder->remaining() == 0) { - delete encoder; - - // Check for more stuff to send on socket. - Encoder* next = socket_manager->next(s); - if (next != NULL) { - io::poll(s, io::WRITE) - .onAny(lambda::bind(next->sender(), next)); - } - break; - } - } - } -} - - -void send_file(Encoder* e) -{ - FileEncoder* encoder = CHECK_NOTNULL(dynamic_cast<FileEncoder*>(e)); - - int s = encoder->socket(); - - while (true) { - int fd; - off_t offset; - size_t size; - - fd = encoder->next(&offset, &size); - CHECK(size > 0); - - ssize_t length = os::sendfile(s, fd, offset, size); - - if (length < 0 && (errno == EINTR)) { - // Interrupted, try again now. - encoder->backup(size); - continue; - } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) { - // Might block, try again later. - encoder->backup(size); - io::poll(s, io::WRITE) - .onAny(lambda::bind(&send_file, e)); - break; - } else if (length <= 0) { - // Socket error or closed. - if (length < 0) { - const char* error = strerror(errno); - VLOG(1) << "Socket error while sending: " << error; - } else { - VLOG(1) << "Socket closed while sending"; - } - socket_manager->close(s); - delete encoder; - break; - } else { - CHECK(length > 0); - - // Update the encoder with the amount sent. - encoder->backup(size - length); - - // See if there is any more of the message to send. - if (encoder->remaining() == 0) { - delete encoder; - - // Check for more stuff to send on socket. - Encoder* next = socket_manager->next(s); - if (next != NULL) { - io::poll(s, io::WRITE) - .onAny(lambda::bind(next->sender(), next)); - } - break; - } - } - } -} - - namespace internal { void decode_read( @@ -1841,9 +1722,7 @@ void send_connect(const Future<Socket>& socket, Message* message) data, size)); - // Start polling in order to send data. - io::poll(socket.get(), io::WRITE) - .onAny(lambda::bind(&send_data, encoder)); + internal::send(encoder, new Socket(socket.get())); } } // namespace internal {
