Repository: mesos
Updated Branches:
  refs/heads/master 49d4553a0 -> 83d90df12


Created accept, bind, connect and getsockname wrappers

Created accept, bind, connect and getsockname wrappers in socket.hpp for 
different protocol families

Review: https://reviews.apache.org/r/28716


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3f53f7f5
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3f53f7f5
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3f53f7f5

Branch: refs/heads/master
Commit: 3f53f7f50dba65c2eabec38d581acc31e2957548
Parents: 49d4553
Author: Evelina Dumitrescu <[email protected]>
Authored: Fri Dec 5 13:02:40 2014 -0800
Committer: Dominic Hamon <[email protected]>
Committed: Fri Dec 5 13:03:04 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/socket.hpp  | 73 +++++++++++++++++++-
 3rdparty/libprocess/src/http.cpp                | 12 ++--
 3rdparty/libprocess/src/process.cpp             | 47 ++++---------
 3rdparty/libprocess/src/tests/http_tests.cpp    | 13 ++--
 3rdparty/libprocess/src/tests/process_tests.cpp | 38 ++++------
 5 files changed, 109 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3f53f7f5/3rdparty/libprocess/include/process/socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp
index ab080c1..e237658 100644
--- a/3rdparty/libprocess/include/process/socket.hpp
+++ b/3rdparty/libprocess/include/process/socket.hpp
@@ -37,6 +37,75 @@ inline Try<int> socket(int family, int type, int protocol)
   return s;
 }
 
+// accept, bind, connect, getsockname wrappers for different protocol families
+inline Try<int> accept(int s, sa_family_t family)
+{
+  switch (family) {
+    case AF_INET: {
+      sockaddr_in addr;
+      memset(&addr, 0, sizeof(addr));
+
+      socklen_t addrlen = sizeof(addr);
+
+      int rc = ::accept(s, (sockaddr*) &addr, &addrlen);
+      if (rc < 0)
+         return ErrnoError("Failed to accept");
+
+      return rc;
+    }
+    default:
+      return Error("Unsupported family type: " + stringify(family));
+  }
+}
+
+inline Try<int> bind(int s, const Node& node)
+{
+  sockaddr_in addr;
+  memset(&addr, 0, sizeof(addr));
+  addr.sin_family = AF_INET;
+  addr.sin_addr.s_addr = node.ip;
+  addr.sin_port = htons(node.port);
+
+  int rc = ::bind(s, (sockaddr*) &addr, sizeof(addr));
+  if (rc < 0)
+    return ErrnoError("Failed to bind on " + stringify(node));
+
+  return rc;
+}
+
+inline Try<int> connect(int s, const Node& node)
+{
+  sockaddr_in addr;
+  memset(&addr, 0, sizeof(addr));
+  addr.sin_family = AF_INET;
+  addr.sin_port = htons(node.port);
+  addr.sin_addr.s_addr = node.ip;
+
+  int rc = ::connect(s, (sockaddr*) &addr, sizeof(addr));
+  if (rc < 0)
+    return ErrnoError("Failed to connect to " + stringify(node));
+
+  return rc;
+}
+
+inline Try<Node> getsockname(int s, sa_family_t family)
+{
+  switch (family) {
+    case AF_INET: {
+      sockaddr_in addr;
+      memset(&addr, 0, sizeof(addr));
+
+      socklen_t addrlen = sizeof(addr);
+
+      if(::getsockname(s, (sockaddr*) &addr, &addrlen) < 0)
+        return ErrnoError("Failed to getsockname");
+
+      return Node(addr.sin_addr.s_addr, ntohs(addr.sin_port));
+    }
+    default:
+      return Error("Unsupported family type: " + stringify(family));
+  }
+}
 
 // An abstraction around a socket (file descriptor) that provides
 // reference counting such that the socket is only closed (and thus,
@@ -96,13 +165,13 @@ public:
       // Supported in Linux >= 2.6.27.
 #if defined(SOCK_NONBLOCK) && defined(SOCK_CLOEXEC)
       Try<int> fd =
-        process::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
+        socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
 
       if (fd.isError()) {
         ABORT("Failed to create socket: " + fd.error());
       }
 #else
-      Try<int> fd = process::socket(AF_INET, SOCK_STREAM, 0);
+      Try<int> fd = socket(AF_INET, SOCK_STREAM, 0);
       if (fd.isError()) {
         ABORT("Failed to create socket: " + fd.error());
       }

http://git-wip-us.apache.org/repos/asf/mesos/blob/3f53f7f5/3rdparty/libprocess/src/http.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/http.cpp b/3rdparty/libprocess/src/http.cpp
index b00f333..789be4a 100644
--- a/3rdparty/libprocess/src/http.cpp
+++ b/3rdparty/libprocess/src/http.cpp
@@ -10,6 +10,7 @@
 #include <process/future.hpp>
 #include <process/http.hpp>
 #include <process/io.hpp>
+#include <process/socket.hpp>
 
 #include <stout/lambda.hpp>
 #include <stout/nothing.hpp>
@@ -81,15 +82,10 @@ Future<Response> request(
 
   const string host = stringify(upid.node);
 
-  sockaddr_in addr;
-  memset(&addr, 0, sizeof(addr));
-  addr.sin_family = AF_INET;
-  addr.sin_port = htons(upid.node.port);
-  addr.sin_addr.s_addr = upid.node.ip;
-
-  if (connect(s, (sockaddr*) &addr, sizeof(addr)) < 0) {
+  Try<int> tryConnect = process::connect(s, upid.node);
+  if (tryConnect.isError()) {
     os::close(s);
-    return Failure(ErrnoError("Failed to connect to '" + host + "'"));
+    return Failure(tryConnect.error());
   }
 
   std::ostringstream out;

http://git-wip-us.apache.org/repos/asf/mesos/blob/3f53f7f5/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 4db7d56..611889b 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -1272,19 +1272,14 @@ Future<Nothing> connect(const Socket& socket)
 
 Future<Nothing> Socket::Impl::connect(const Node& node)
 {
-  sockaddr_in addr;
-  memset(&addr, 0, sizeof(addr));
-  addr.sin_family = PF_INET;
-  addr.sin_port = htons(node.port);
-  addr.sin_addr.s_addr = node.ip;
-
-  if (::connect(get(), (sockaddr*) &addr, sizeof(addr)) < 0) {
-    if (errno != EINPROGRESS) {
-      return Failure(ErrnoError("Failed to connect socket"));
+  Try<int> tryConnect = process::connect(get(), node);
+  if (tryConnect.isError()) {
+    if (errno == EINPROGRESS) {
+      return io::poll(get(), io::WRITE)
+        .then(lambda::bind(&internal::connect, Socket(shared_from_this())));
     }
 
-    return io::poll(get(), io::WRITE)
-      .then(lambda::bind(&internal::connect, Socket(shared_from_this())));
+    return Failure(tryConnect.error());
   }
 
   return Nothing();
@@ -1389,24 +1384,13 @@ Future<size_t> Socket::Impl::sendfile(int fd, off_t offset, size_t size)
 
 Try<Node> Socket::Impl::bind(const Node& node)
 {
-  sockaddr_in addr;
-  memset(&addr, 0, sizeof(addr));
-  addr.sin_family = PF_INET;
-  addr.sin_addr.s_addr = node.ip;
-  addr.sin_port = htons(node.port);
-
-  if (::bind(get(), (sockaddr*) &addr, sizeof(addr)) < 0) {
-    return Error("Failed to bind: " + string(inet_ntoa(addr.sin_addr)) +
-                 ":" + stringify(node.port));
+  Try<int> tryBind = process::bind(get(), node);
+  if (tryBind.isError()) {
+    return Error(tryBind.error());
   }
 
   // Lookup and store assigned ip and assigned port.
-  socklen_t addrlen = sizeof(addr);
-  if (getsockname(get(), (sockaddr*) &addr, &addrlen) < 0) {
-    return ErrnoError("Failed to bind, getsockname");
-  }
-
-  return Node(addr.sin_addr.s_addr, ntohs(addr.sin_port));
+  return process::getsockname(get(), AF_INET);
 }
 
 
@@ -1423,15 +1407,12 @@ namespace internal {
 
 Future<Socket> accept(int fd)
 {
-  sockaddr_in addr;
-  socklen_t addrlen = sizeof(addr);
-
-  int s = ::accept(fd, (sockaddr*) &addr, &addrlen);
-
-  if (s < 0) {
-    return Failure(ErrnoError("Failed to accept"));
+  Try<int> tryAccept = process::accept(fd, AF_INET);
+  if (tryAccept.isError()) {
+    return Failure(tryAccept.error());
   }
 
+  int s = tryAccept.get();
   Try<Nothing> nonblock = os::nonblock(s);
   if (nonblock.isError()) {
     LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, nonblock: "

http://git-wip-us.apache.org/repos/asf/mesos/blob/3f53f7f5/3rdparty/libprocess/src/tests/http_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/http_tests.cpp b/3rdparty/libprocess/src/tests/http_tests.cpp
index a90e65f..7286682 100644
--- a/3rdparty/libprocess/src/tests/http_tests.cpp
+++ b/3rdparty/libprocess/src/tests/http_tests.cpp
@@ -12,6 +12,7 @@
 #include <process/gtest.hpp>
 #include <process/http.hpp>
 #include <process/io.hpp>
+#include <process/socket.hpp>
 
 #include <stout/base64.hpp>
 #include <stout/gtest.hpp>
@@ -113,17 +114,13 @@ TEST(HTTP, Endpoints)
   spawn(process);
 
   // First hit '/body' (using explicit sockets and HTTP/1.0).
-  int s = ::socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
+  Try<int> trySocket = process::socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
 
-  ASSERT_LE(0, s);
+  ASSERT_TRUE(trySocket.isSome());
 
-  sockaddr_in addr;
-  memset(&addr, 0, sizeof(addr));
-  addr.sin_family = PF_INET;
-  addr.sin_port = htons(process.self().node.port);
-  addr.sin_addr.s_addr = process.self().node.ip;
+  int s = trySocket.get();
 
-  ASSERT_EQ(0, connect(s, (sockaddr*) &addr, sizeof(addr)));
+  ASSERT_TRUE(process::connect(s, process.self().node).isSome());
 
   std::ostringstream out;
   out << "GET /" << process.self().id << "/body"

http://git-wip-us.apache.org/repos/asf/mesos/blob/3f53f7f5/3rdparty/libprocess/src/tests/process_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/process_tests.cpp b/3rdparty/libprocess/src/tests/process_tests.cpp
index dec62e8..03917c4 100644
--- a/3rdparty/libprocess/src/tests/process_tests.cpp
+++ b/3rdparty/libprocess/src/tests/process_tests.cpp
@@ -23,6 +23,7 @@
 #include <process/limiter.hpp>
 #include <process/process.hpp>
 #include <process/run.hpp>
+#include <process/socket.hpp>
 #include <process/time.hpp>
 
 #include <stout/duration.hpp>
@@ -1418,17 +1419,13 @@ TEST(Process, remote)
   EXPECT_CALL(process, handler(_, _))
     .WillOnce(FutureSatisfy(&handler));
 
-  int s = ::socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
+  Try<int> trySocket = process::socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
 
-  ASSERT_LE(0, s);
+  ASSERT_TRUE(trySocket.isSome());
 
-  sockaddr_in addr;
-  memset(&addr, 0, sizeof(addr));
-  addr.sin_family = PF_INET;
-  addr.sin_port = htons(process.self().node.port);
-  addr.sin_addr.s_addr = process.self().node.ip;
+  int s = trySocket.get();
 
-  ASSERT_EQ(0, connect(s, (sockaddr*) &addr, sizeof(addr)));
+  ASSERT_TRUE(process::connect(s, process.self().node).isSome());
 
   Message message;
   message.name = "handler";
@@ -1488,24 +1485,19 @@ TEST(Process, http2)
   spawn(process);
 
   // Create a receiving socket so we can get messages back.
-  int s = ::socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
-  ASSERT_LE(0, s);
+  Try<int> trySocket = process::socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
+  ASSERT_TRUE(trySocket.isSome());
 
-  // Set up socket.
-  sockaddr_in addr;
-  memset(&addr, 0, sizeof(addr));
-  addr.sin_family = PF_INET;
-  addr.sin_addr.s_addr = INADDR_ANY;
-  addr.sin_port = 0;
+  int s = trySocket.get();
 
-  ASSERT_EQ(0, ::bind(s, (sockaddr*) &addr, sizeof(addr)));
+  ASSERT_TRUE(process::bind(s, Node()).isSome());
 
   // Create a UPID for 'Libprocess-From' based on the IP and port we
   // got assigned.
-  socklen_t addrlen = sizeof(addr);
-  ASSERT_EQ(0, getsockname(s, (sockaddr*) &addr, &addrlen));
+  Try<Node> node = process::getsockname(s, AF_INET);
+  ASSERT_TRUE(node.isSome());
 
-  UPID from("", addr.sin_addr.s_addr, ntohs(addr.sin_port));
+  UPID from("", node.get());
 
   ASSERT_EQ(0, listen(s, 1));
 
@@ -1535,10 +1527,10 @@ TEST(Process, http2)
   post(process.self(), from, name);
 
   // Accept the incoming connection.
-  memset(&addr, 0, sizeof(addr));
-  addrlen = sizeof(addr);
+  Try<int> tryAccept = process::accept(s, AF_INET);
+  ASSERT_TRUE(tryAccept.isSome());
 
-  int c = ::accept(s, (sockaddr*) &addr, &addrlen);
+  int c = tryAccept.get();
   ASSERT_LT(0, c);
 
   const string data = "POST /" + name + " HTTP/1.1";

Reply via email to