Repository: mesos Updated Branches: refs/heads/master 49d4553a0 -> 83d90df12
Created accept, bind, connect and getsockname wrappers Created accept, bind, connect and getsockname wrappers in socket.hpp for different protocol families Review: https://reviews.apache.org/r/28716 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3f53f7f5 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3f53f7f5 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3f53f7f5 Branch: refs/heads/master Commit: 3f53f7f50dba65c2eabec38d581acc31e2957548 Parents: 49d4553 Author: Evelina Dumitrescu <[email protected]> Authored: Fri Dec 5 13:02:40 2014 -0800 Committer: Dominic Hamon <[email protected]> Committed: Fri Dec 5 13:03:04 2014 -0800 ---------------------------------------------------------------------- 3rdparty/libprocess/include/process/socket.hpp | 73 +++++++++++++++++++- 3rdparty/libprocess/src/http.cpp | 12 ++-- 3rdparty/libprocess/src/process.cpp | 47 ++++--------- 3rdparty/libprocess/src/tests/http_tests.cpp | 13 ++-- 3rdparty/libprocess/src/tests/process_tests.cpp | 38 ++++------ 5 files changed, 109 insertions(+), 74 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/3f53f7f5/3rdparty/libprocess/include/process/socket.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp index ab080c1..e237658 100644 --- a/3rdparty/libprocess/include/process/socket.hpp +++ b/3rdparty/libprocess/include/process/socket.hpp @@ -37,6 +37,75 @@ inline Try<int> socket(int family, int type, int protocol) return s; } +// accept, bind, connect, getsockname wrappers for different protocol families +inline Try<int> accept(int s, sa_family_t family) +{ + switch (family) { + case AF_INET: { + sockaddr_in addr; + memset(&addr, 0, sizeof(addr)); + + socklen_t addrlen = sizeof(addr); + + int rc = ::accept(s, (sockaddr*) &addr, &addrlen); + if (rc < 0) + return ErrnoError("Failed to accept"); + + return rc; + } + default: + return Error("Unsupported family type: " + stringify(family)); + } +} + +inline Try<int> bind(int s, const Node& node) +{ + sockaddr_in addr; + memset(&addr, 0, sizeof(addr)); + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = node.ip; + addr.sin_port = htons(node.port); + + int rc = ::bind(s, (sockaddr*) &addr, sizeof(addr)); + if (rc < 0) + return ErrnoError("Failed to bind on " + stringify(node)); + + return rc; +} + +inline Try<int> connect(int s, const Node& node) +{ + sockaddr_in addr; + memset(&addr, 0, sizeof(addr)); + addr.sin_family = AF_INET; + addr.sin_port = htons(node.port); + addr.sin_addr.s_addr = node.ip; + + int rc = ::connect(s, (sockaddr*) &addr, sizeof(addr)); + if (rc < 0) + return ErrnoError("Failed to connect to " + stringify(node)); + + return rc; +} + +inline Try<Node> getsockname(int s, sa_family_t family) +{ + switch (family) { + case AF_INET: { + sockaddr_in addr; + memset(&addr, 0, sizeof(addr)); + + socklen_t addrlen = sizeof(addr); + + if(::getsockname(s, (sockaddr*) &addr, &addrlen) < 0) + return ErrnoError("Failed to getsockname"); + + return Node(addr.sin_addr.s_addr, ntohs(addr.sin_port)); + } + default: + return Error("Unsupported family type: " + stringify(family)); + } +} // An abstraction around a socket (file descriptor) that provides // reference counting such that the socket is only closed (and thus, @@ -96,13 +165,13 @@ public: // Supported in Linux >= 2.6.27. #if defined(SOCK_NONBLOCK) && defined(SOCK_CLOEXEC) Try<int> fd = - process::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0); + socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0); if (fd.isError()) { ABORT("Failed to create socket: " + fd.error()); } #else - Try<int> fd = process::socket(AF_INET, SOCK_STREAM, 0); + Try<int> fd = socket(AF_INET, SOCK_STREAM, 0); if (fd.isError()) { ABORT("Failed to create socket: " + fd.error()); } http://git-wip-us.apache.org/repos/asf/mesos/blob/3f53f7f5/3rdparty/libprocess/src/http.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/http.cpp b/3rdparty/libprocess/src/http.cpp index b00f333..789be4a 100644 --- a/3rdparty/libprocess/src/http.cpp +++ b/3rdparty/libprocess/src/http.cpp @@ -10,6 +10,7 @@ #include <process/future.hpp> #include <process/http.hpp> #include <process/io.hpp> +#include <process/socket.hpp> #include <stout/lambda.hpp> #include <stout/nothing.hpp> @@ -81,15 +82,10 @@ Future<Response> request( const string host = stringify(upid.node); - sockaddr_in addr; - memset(&addr, 0, sizeof(addr)); - addr.sin_family = AF_INET; - addr.sin_port = htons(upid.node.port); - addr.sin_addr.s_addr = upid.node.ip; - - if (connect(s, (sockaddr*) &addr, sizeof(addr)) < 0) { + Try<int> tryConnect = process::connect(s, upid.node); + if (tryConnect.isError()) { os::close(s); - return Failure(ErrnoError("Failed to connect to '" + host + "'")); + return Failure(tryConnect.error()); } std::ostringstream out; http://git-wip-us.apache.org/repos/asf/mesos/blob/3f53f7f5/3rdparty/libprocess/src/process.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp index 4db7d56..611889b 100644 --- a/3rdparty/libprocess/src/process.cpp +++ b/3rdparty/libprocess/src/process.cpp @@ -1272,19 +1272,14 @@ Future<Nothing> connect(const Socket& socket) Future<Nothing> Socket::Impl::connect(const Node& node) { - sockaddr_in addr; - memset(&addr, 0, sizeof(addr)); - addr.sin_family = PF_INET; - addr.sin_port = htons(node.port); - addr.sin_addr.s_addr = node.ip; - - if (::connect(get(), (sockaddr*) &addr, sizeof(addr)) < 0) { - if (errno != EINPROGRESS) { - return Failure(ErrnoError("Failed to connect socket")); + Try<int> tryConnect = process::connect(get(), node); + if (tryConnect.isError()) { + if (errno == EINPROGRESS) { + return io::poll(get(), io::WRITE) + .then(lambda::bind(&internal::connect, Socket(shared_from_this()))); } - return io::poll(get(), io::WRITE) - .then(lambda::bind(&internal::connect, Socket(shared_from_this()))); + return Failure(tryConnect.error()); } return Nothing(); @@ -1389,24 +1384,13 @@ Future<size_t> Socket::Impl::sendfile(int fd, off_t offset, size_t size) Try<Node> Socket::Impl::bind(const Node& node) { - sockaddr_in addr; - memset(&addr, 0, sizeof(addr)); - addr.sin_family = PF_INET; - addr.sin_addr.s_addr = node.ip; - addr.sin_port = htons(node.port); - - if (::bind(get(), (sockaddr*) &addr, sizeof(addr)) < 0) { - return Error("Failed to bind: " + string(inet_ntoa(addr.sin_addr)) + - ":" + stringify(node.port)); + Try<int> tryBind = process::bind(get(), node); + if (tryBind.isError()) { + return Error(tryBind.error()); } // Lookup and store assigned ip and assigned port. - socklen_t addrlen = sizeof(addr); - if (getsockname(get(), (sockaddr*) &addr, &addrlen) < 0) { - return ErrnoError("Failed to bind, getsockname"); - } - - return Node(addr.sin_addr.s_addr, ntohs(addr.sin_port)); + return process::getsockname(get(), AF_INET); } @@ -1423,15 +1407,12 @@ namespace internal { Future<Socket> accept(int fd) { - sockaddr_in addr; - socklen_t addrlen = sizeof(addr); - - int s = ::accept(fd, (sockaddr*) &addr, &addrlen); - - if (s < 0) { - return Failure(ErrnoError("Failed to accept")); + Try<int> tryAccept = process::accept(fd, AF_INET); + if (tryAccept.isError()) { + return Failure(tryAccept.error()); } + int s = tryAccept.get(); Try<Nothing> nonblock = os::nonblock(s); if (nonblock.isError()) { LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, nonblock: " http://git-wip-us.apache.org/repos/asf/mesos/blob/3f53f7f5/3rdparty/libprocess/src/tests/http_tests.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/tests/http_tests.cpp b/3rdparty/libprocess/src/tests/http_tests.cpp index a90e65f..7286682 100644 --- a/3rdparty/libprocess/src/tests/http_tests.cpp +++ b/3rdparty/libprocess/src/tests/http_tests.cpp @@ -12,6 +12,7 @@ #include <process/gtest.hpp> #include <process/http.hpp> #include <process/io.hpp> +#include <process/socket.hpp> #include <stout/base64.hpp> #include <stout/gtest.hpp> @@ -113,17 +114,13 @@ TEST(HTTP, Endpoints) spawn(process); // First hit '/body' (using explicit sockets and HTTP/1.0). - int s = ::socket(AF_INET, SOCK_STREAM, IPPROTO_IP); + Try<int> trySocket = process::socket(AF_INET, SOCK_STREAM, IPPROTO_IP); - ASSERT_LE(0, s); + ASSERT_TRUE(trySocket.isSome()); - sockaddr_in addr; - memset(&addr, 0, sizeof(addr)); - addr.sin_family = PF_INET; - addr.sin_port = htons(process.self().node.port); - addr.sin_addr.s_addr = process.self().node.ip; + int s = trySocket.get(); - ASSERT_EQ(0, connect(s, (sockaddr*) &addr, sizeof(addr))); + ASSERT_TRUE(process::connect(s, process.self().node).isSome()); std::ostringstream out; out << "GET /" << process.self().id << "/body" http://git-wip-us.apache.org/repos/asf/mesos/blob/3f53f7f5/3rdparty/libprocess/src/tests/process_tests.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/tests/process_tests.cpp b/3rdparty/libprocess/src/tests/process_tests.cpp index dec62e8..03917c4 100644 --- a/3rdparty/libprocess/src/tests/process_tests.cpp +++ b/3rdparty/libprocess/src/tests/process_tests.cpp @@ -23,6 +23,7 @@ #include <process/limiter.hpp> #include <process/process.hpp> #include <process/run.hpp> +#include <process/socket.hpp> #include <process/time.hpp> #include <stout/duration.hpp> @@ -1418,17 +1419,13 @@ TEST(Process, remote) EXPECT_CALL(process, handler(_, _)) .WillOnce(FutureSatisfy(&handler)); - int s = ::socket(AF_INET, SOCK_STREAM, IPPROTO_IP); + Try<int> trySocket = process::socket(AF_INET, SOCK_STREAM, IPPROTO_IP); - ASSERT_LE(0, s); + ASSERT_TRUE(trySocket.isSome()); - sockaddr_in addr; - memset(&addr, 0, sizeof(addr)); - addr.sin_family = PF_INET; - addr.sin_port = htons(process.self().node.port); - addr.sin_addr.s_addr = process.self().node.ip; + int s = trySocket.get(); - ASSERT_EQ(0, connect(s, (sockaddr*) &addr, sizeof(addr))); + ASSERT_TRUE(process::connect(s, process.self().node).isSome()); Message message; message.name = "handler"; @@ -1488,24 +1485,19 @@ TEST(Process, http2) spawn(process); // Create a receiving socket so we can get messages back. - int s = ::socket(AF_INET, SOCK_STREAM, IPPROTO_IP); - ASSERT_LE(0, s); + Try<int> trySocket = process::socket(AF_INET, SOCK_STREAM, IPPROTO_IP); + ASSERT_TRUE(trySocket.isSome()); - // Set up socket. - sockaddr_in addr; - memset(&addr, 0, sizeof(addr)); - addr.sin_family = PF_INET; - addr.sin_addr.s_addr = INADDR_ANY; - addr.sin_port = 0; + int s = trySocket.get(); - ASSERT_EQ(0, ::bind(s, (sockaddr*) &addr, sizeof(addr))); + ASSERT_TRUE(process::bind(s, Node()).isSome()); // Create a UPID for 'Libprocess-From' based on the IP and port we // got assigned. - socklen_t addrlen = sizeof(addr); - ASSERT_EQ(0, getsockname(s, (sockaddr*) &addr, &addrlen)); + Try<Node> node = process::getsockname(s, AF_INET); + ASSERT_TRUE(node.isSome()); - UPID from("", addr.sin_addr.s_addr, ntohs(addr.sin_port)); + UPID from("", node.get()); ASSERT_EQ(0, listen(s, 1)); @@ -1535,10 +1527,10 @@ TEST(Process, http2) post(process.self(), from, name); // Accept the incoming connection. - memset(&addr, 0, sizeof(addr)); - addrlen = sizeof(addr); + Try<int> tryAccept = process::accept(s, AF_INET); + ASSERT_TRUE(tryAccept.isSome()); - int c = ::accept(s, (sockaddr*) &addr, &addrlen); + int c = tryAccept.get(); ASSERT_LT(0, c); const string data = "POST /" + name + " HTTP/1.1";
