Add connect() to the Socket interface. Review: https://reviews.apache.org/r/27958
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4d616f8d Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4d616f8d Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4d616f8d Branch: refs/heads/master Commit: 4d616f8d7194be9b251aa5a3ddc51783c4d4411a Parents: 702b382 Author: Joris Van Remoortere <[email protected]> Authored: Sat Nov 15 16:45:49 2014 -0800 Committer: Benjamin Hindman <[email protected]> Committed: Sat Nov 15 17:38:21 2014 -0800 ---------------------------------------------------------------------- 3rdparty/libprocess/include/process/socket.hpp | 41 ++++- 3rdparty/libprocess/src/process.cpp | 193 +++++++++++--------- 2 files changed, 138 insertions(+), 96 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/4d616f8d/3rdparty/libprocess/include/process/socket.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp index 9f4302e..fdad91f 100644 --- a/3rdparty/libprocess/include/process/socket.hpp +++ b/3rdparty/libprocess/include/process/socket.hpp @@ -3,8 +3,12 @@ #include <assert.h> +#include <memory> + +#include <process/future.hpp> +#include <process/node.hpp> + #include <stout/abort.hpp> -#include <stout/memory.hpp> #include <stout/nothing.hpp> #include <stout/os.hpp> #include <stout/try.hpp> @@ -14,7 +18,8 @@ namespace process { // Returns a socket fd for the specified options. Note that on OS X, // the returned socket will have the SO_NOSIGPIPE option set. -inline Try<int> socket(int family, int type, int protocol) { +inline Try<int> socket(int family, int type, int protocol) +{ int s; if ((s = ::socket(family, type, protocol)) == -1) { return ErrnoError(); @@ -41,7 +46,12 @@ inline Try<int> socket(int family, int type, int protocol) { class Socket { public: - class Impl + // Each socket is a reference counted, shared by default, concurrent + // object. However, since we want to support multiple + // implementations we use the Pimpl pattern (often called the + // compilation firewall) rather than forcing each Socket + // implementation to do this themselves. + class Impl : public std::enable_shared_from_this<Impl> { public: Impl() : s(-1) {} @@ -53,8 +63,8 @@ public: if (s >= 0) { Try<Nothing> close = os::close(s); if (close.isError()) { - ABORT( - "Failed to close socket " + stringify(s) + ": " + close.error()); + ABORT("Failed to close socket " + + stringify(s) + ": " + close.error()); } } } @@ -64,6 +74,8 @@ public: return s >= 0 ? s : create().get(); } + Future<Socket> connect(const Node& node); + private: const Impl& create() const { @@ -78,6 +90,11 @@ public: } // Mutable so that the socket can be lazily created. + // + // TODO(benh): Create a factory for sockets and don't lazily + // create but instead return a Try<Socket> from the factory in + // order to eliminate the need for a mutable member or the call to + // ABORT above. mutable int s; }; @@ -95,8 +112,20 @@ public: return impl->get(); } + int get() const + { + return impl->get(); + } + + Future<Socket> connect(const Node& node) + { + return impl->connect(node); + } + private: - memory::shared_ptr<Impl> impl; + explicit Socket(std::shared_ptr<Impl>&& that) : impl(std::move(that)) {} + + std::shared_ptr<Impl> impl; }; } // namespace process { http://git-wip-us.apache.org/repos/asf/mesos/blob/4d616f8d/3rdparty/libprocess/src/process.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp index 48e5486..6916cbb 100644 --- a/3rdparty/libprocess/src/process.cpp +++ b/3rdparty/libprocess/src/process.cpp @@ -1484,6 +1484,48 @@ void HttpProxy::stream(const Future<short>& poll, const Request& request) } +namespace internal { + +Future<Socket> connect(const Socket& socket) +{ + // Now check that a successful connection was made. + int opt; + socklen_t optlen = sizeof(opt); + int s = socket.get(); + + if (getsockopt(s, SOL_SOCKET, SO_ERROR, &opt, &optlen) < 0 || opt != 0) { + // Connect failure. + VLOG(1) << "Socket error while connecting"; + return Failure("Socket error while connecting"); + } + + return socket; +} + +} // namespace internal { + + +Future<Socket> Socket::Impl::connect(const Node& node) +{ + sockaddr_in addr; + memset(&addr, 0, sizeof(addr)); + addr.sin_family = PF_INET; + addr.sin_port = htons(node.port); + addr.sin_addr.s_addr = node.ip; + + if (::connect(get(), (sockaddr*) &addr, sizeof(addr)) < 0) { + if (errno != EINPROGRESS) { + return Failure(ErrnoError("Failed to connect socket")); + } + + return io::poll(get(), io::WRITE) + .then(lambda::bind(&internal::connect, Socket(shared_from_this()))); + } + + return Socket(shared_from_this()); +} + + SocketManager::SocketManager() { synchronizer(this) = SYNCHRONIZED_INITIALIZER_RECURSIVE; @@ -1503,6 +1545,25 @@ Socket SocketManager::accepted(int s) } +namespace internal { + +void link_connect(const Future<Socket>& socket) +{ + if (socket.isDiscarded() || socket.isFailed()) { + if (socket.isFailed()) { + VLOG(1) << "Failed to link, connect: " << socket.failure(); + } + socket_manager->close(socket.get()); + return; + } + + io::poll(socket.get(), io::READ) + .onAny(lambda::bind(&ignore_data, new Socket(socket.get()), socket.get())); +} + +} // namespace internal { + + void SocketManager::link(ProcessBase* process, const UPID& to) { // TODO(benh): The semantics we want to support for link are such @@ -1516,58 +1577,22 @@ void SocketManager::link(ProcessBase* process, const UPID& to) CHECK(process != NULL); synchronized (this) { + links[to].insert(process); + // Check if node is remote and there isn't a persistant link. if (to.node != __node__ && persists.count(to.node) == 0) { // Okay, no link, let's create a socket. - Try<int> socket = process::socket(AF_INET, SOCK_STREAM, 0); - if (socket.isError()) { - LOG(FATAL) << "Failed to link, socket: " << socket.error(); - } - - int s = socket.get(); - - Try<Nothing> nonblock = os::nonblock(s); - if (nonblock.isError()) { - LOG(FATAL) << "Failed to link, nonblock: " << nonblock.error(); - } - - Try<Nothing> cloexec = os::cloexec(s); - if (cloexec.isError()) { - LOG(FATAL) << "Failed to link, cloexec: " << cloexec.error(); - } + Socket socket; + int s = socket; - sockets[s] = Socket(s); + sockets[s] = socket; nodes[s] = to.node; persists[to.node] = s; - // Try and connect to the node using this socket in order to - // start reading data. Note that we don't expect to receive - // anything other than HTTP '202 Accepted' responses which we - // anyway ignore. We do, however, want to react when it gets - // closed so we can generate appropriate lost events (since this - // is a 'link'). - sockaddr_in addr; - memset(&addr, 0, sizeof(addr)); - addr.sin_family = PF_INET; - addr.sin_port = htons(to.node.port); - addr.sin_addr.s_addr = to.node.ip; - - if (connect(s, (sockaddr*) &addr, sizeof(addr)) < 0) { - if (errno != EINPROGRESS) { - PLOG(FATAL) << "Failed to link, connect"; - } - - // Wait for socket to be connected. - io::poll(s, io::WRITE) - .onAny(lambda::bind(&receiving_connect, new Socket(sockets[s]), s)); - } else { - io::poll(s, io::READ) - .onAny(lambda::bind(&ignore_data, new Socket(sockets[s]), s)); - } + socket.connect(to.node) + .onAny(lambda::bind(&internal::link_connect, lambda::_1)); } - - links[to].insert(process); } } @@ -1653,11 +1678,40 @@ void SocketManager::send( } +namespace internal { + +void send_connect(const Future<Socket>& socket, Message* message) +{ + if (socket.isDiscarded() || socket.isFailed()) { + if (socket.isFailed()) { + VLOG(1) << "Failed to send, connect: " << socket.failure(); + } + socket_manager->close(socket.get()); + delete message; + return; + } + + Encoder* encoder = new MessageEncoder(socket.get(), message); + + // Read and ignore data from this socket. Note that we don't + // expect to receive anything other than HTTP '202 Accepted' + // responses which we just ignore. + io::poll(socket.get(), io::READ) + .onAny(lambda::bind(&ignore_data, new Socket(socket.get()), socket.get())); + + // Start polling in order to send data. + io::poll(socket.get(), io::WRITE) + .onAny(lambda::bind(&send_data, encoder)); +} + +} // namespace internal { + + void SocketManager::send(Message* message) { CHECK(message != NULL); - Node node(message->to.node); + const Node& node = message->to.node; synchronized (this) { // Check if there is already a socket. @@ -1670,24 +1724,10 @@ void SocketManager::send(Message* message) } else { // No peristent or temporary socket to the node currently // exists, so we create a temporary one. - Try<int> socket = process::socket(AF_INET, SOCK_STREAM, 0); - if (socket.isError()) { - LOG(FATAL) << "Failed to send, socket: " << socket.error(); - } - - int s = socket.get(); + Socket socket; + int s = socket; - Try<Nothing> nonblock = os::nonblock(s); - if (nonblock.isError()) { - LOG(FATAL) << "Failed to send, nonblock: " << nonblock.error(); - } - - Try<Nothing> cloexec = os::cloexec(s); - if (cloexec.isError()) { - LOG(FATAL) << "Failed to send, cloexec: " << cloexec.error(); - } - - sockets[s] = Socket(s); + sockets[s] = socket; nodes[s] = node; temps[node] = s; @@ -1696,35 +1736,8 @@ void SocketManager::send(Message* message) // Initialize the outgoing queue. outgoing[s]; - // Create a message encoder to handle sending this message. - Encoder* encoder = new MessageEncoder(sockets[s], message); - - // Read and ignore data from this socket. Note that we don't - // expect to receive anything other than HTTP '202 Accepted' - // responses which we just ignore. - io::poll(s, io::READ) - .onAny(lambda::bind(&ignore_data, new Socket(sockets[s]), s)); - - // Try and connect to the node using this socket. - sockaddr_in addr; - memset(&addr, 0, sizeof(addr)); - addr.sin_family = PF_INET; - addr.sin_port = htons(node.port); - addr.sin_addr.s_addr = node.ip; - - if (connect(s, (sockaddr*) &addr, sizeof(addr)) < 0) { - if (errno != EINPROGRESS) { - PLOG(FATAL) << "Failed to send, connect"; - } - - // Start polling in order to wait for being connected. - io::poll(s, io::WRITE) - .onAny(lambda::bind(&sending_connect, encoder)); - } else { - // Start polling in order to send data. - io::poll(s, io::WRITE) - .onAny(lambda::bind(&send_data, encoder)); - } + socket.connect(node) + .onAny(lambda::bind(&internal::send_connect, lambda::_1, message)); } } }
