Repository: mesos Updated Branches: refs/heads/master 3f693f23a -> 76bfb4930
libprocess: Replaced the ip and port pairs in the UPID class and the process namespace with the Node class. At the moment, the Node class is only used to keep a mapping from a socket to the ip & port pair in the process namespace. This change extends its use by replacing the separate ip & port fields in the UPID class and the process namespace with this type. Review: https://reviews.apache.org/r/27446 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f64562fa Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f64562fa Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f64562fa Branch: refs/heads/master Commit: f64562fa66a272b695560971d0e548d131f42682 Parents: 3f693f2 Author: Evelina Dumitrescu <[email protected]> Authored: Wed Nov 12 12:56:23 2014 -0800 Committer: Dominic Hamon <[email protected]> Committed: Wed Nov 12 12:56:47 2014 -0800 ---------------------------------------------------------------------- 3rdparty/libprocess/include/process/node.hpp | 28 ++++++- 3rdparty/libprocess/include/process/pid.hpp | 49 +++++------ 3rdparty/libprocess/include/process/process.hpp | 11 +-- 3rdparty/libprocess/src/http.cpp | 13 +-- 3rdparty/libprocess/src/pid.cpp | 28 +++---- 3rdparty/libprocess/src/process.cpp | 88 ++++++++------------ 3rdparty/libprocess/src/tests/benchmarks.cpp | 4 +- 3rdparty/libprocess/src/tests/http_tests.cpp | 4 +- 3rdparty/libprocess/src/tests/metrics_tests.cpp | 6 +- 3rdparty/libprocess/src/tests/process_tests.cpp | 6 +- 10 files changed, 103 insertions(+), 134 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/include/process/node.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/include/process/node.hpp b/3rdparty/libprocess/include/process/node.hpp index 7a96894..24132a5 100644 --- 
a/3rdparty/libprocess/include/process/node.hpp +++ b/3rdparty/libprocess/include/process/node.hpp @@ -1,17 +1,22 @@ #ifndef __PROCESS_NODE_HPP__ #define __PROCESS_NODE_HPP__ +#include <arpa/inet.h> #include <unistd.h> #include <sstream> +#include <glog/logging.h> + namespace process { // Represents a remote "node" (encapsulates IP address and port). class Node { public: - Node(uint32_t _ip = 0, uint16_t _port = 0) : ip(_ip), port(_port) {} + Node() : ip(0), port(0) {} + + Node(uint32_t _ip, uint16_t _port) : ip(_ip), port(_port) {} bool operator < (const Node& that) const { @@ -22,16 +27,31 @@ public: } } - std::ostream& operator << (std::ostream& stream) const + bool operator == (const Node& that) const + { + return (ip == that.ip && port == that.port); + } + + bool operator != (const Node& that) const { - stream << ip << ":" << port; - return stream; + return !(*this == that); } uint32_t ip; uint16_t port; }; +inline std::ostream& operator << (std::ostream & stream, const Node & node) +{ + char ip[INET_ADDRSTRLEN]; + if (inet_ntop(AF_INET, (in_addr*) &node.ip, ip, INET_ADDRSTRLEN) == NULL) { + PLOG(FATAL) << "Failed to get human-readable IP address for '" + << node.ip << "'"; + } + stream << ip << ":" << node.port; + return stream; +} + } // namespace process { #endif // __PROCESS_NODE_HPP__ http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/include/process/pid.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/include/process/pid.hpp b/3rdparty/libprocess/include/process/pid.hpp index 2345322..7dccf29 100644 --- a/3rdparty/libprocess/include/process/pid.hpp +++ b/3rdparty/libprocess/include/process/pid.hpp @@ -7,6 +7,7 @@ #include <sstream> #include <string> +#include <process/node.hpp> namespace process { @@ -16,17 +17,22 @@ class ProcessBase; struct UPID { - UPID() - : ip(0), port(0) {} + UPID() = default; UPID(const UPID& that) - : id(that.id), ip(that.ip), port(that.port) {} 
+ : id(that.id), node(that.node) {} UPID(const char* id_, uint32_t ip_, uint16_t port_) - : id(id_), ip(ip_), port(port_) {} + : id(id_), node(ip_, port_) {} + + UPID(const char* id_, const Node& node_) + : id(id_), node(node_) {} UPID(const std::string& id_, uint32_t ip_, uint16_t port_) - : id(id_), ip(ip_), port(port_) {} + : id(id_), node(ip_, port_) {} + + UPID(const std::string& id_, const Node& node_) + : id(id_), node(node_) {} /*implicit*/ UPID(const char* s); @@ -38,47 +44,33 @@ struct UPID operator bool () const { - return id != "" && ip != 0 && port != 0; + return id != "" && node.ip != 0 && node.port != 0; } bool operator ! () const // NOLINT(whitespace/operators) { - return id == "" && ip == 0 && port == 0; + return id == "" && node.ip == 0 && node.port == 0; } bool operator < (const UPID& that) const { - if (this != &that) { - if (ip == that.ip && port == that.port) - return id < that.id; - else if (ip == that.ip && port != that.port) - return port < that.port; - else - return ip < that.ip; - } - - return false; + if (node == that.node) + return id < that.id; + else return node < that.node; } bool operator == (const UPID& that) const { - if (this != &that) { - return (id == that.id && - ip == that.ip && - port == that.port); - } - - return true; + return (id == that.id && node == that.node); } bool operator != (const UPID& that) const { - return !(this->operator == (that)); + return !(*this == that); } std::string id; - uint32_t ip; - uint16_t port; + Node node; }; @@ -99,8 +91,7 @@ struct PID : UPID (void)base; // Eliminate unused base warning. 
PID<Base> pid; pid.id = id; - pid.ip = ip; - pid.port = port; + pid.node = node; return pid; } }; http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/include/process/process.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/include/process/process.hpp b/3rdparty/libprocess/include/process/process.hpp index 81a1f7a..cb3e0a6 100644 --- a/3rdparty/libprocess/include/process/process.hpp +++ b/3rdparty/libprocess/include/process/process.hpp @@ -276,16 +276,9 @@ void finalize(); /** - * Returns the IP address associated with this instance of the - * library. + * Returns the node associated with this instance of the library. */ -uint32_t ip(); - - -/** - * Returns the port associated with this instance of the library. - */ -uint16_t port(); +Node node(); /** http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/src/http.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/http.cpp b/3rdparty/libprocess/src/http.cpp index 4ef00d1..b00f333 100644 --- a/3rdparty/libprocess/src/http.cpp +++ b/3rdparty/libprocess/src/http.cpp @@ -79,20 +79,13 @@ Future<Response> request( return Failure("Failed to cloexec: " + cloexec.error()); } - // We use inet_ntop since inet_ntoa is not thread-safe! 
- char ip[INET_ADDRSTRLEN]; - if (inet_ntop(AF_INET, (in_addr*) &upid.ip, ip, INET_ADDRSTRLEN) == NULL) { - return Failure(ErrnoError("Failed to get human-readable IP address for '" + - stringify(upid.ip) + "'")); - } - - const string host = string(ip) + ":" + stringify(upid.port); + const string host = stringify(upid.node); sockaddr_in addr; memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; - addr.sin_port = htons(upid.port); - addr.sin_addr.s_addr = upid.ip; + addr.sin_port = htons(upid.node.port); + addr.sin_addr.s_addr = upid.node.ip; if (connect(s, (sockaddr*) &addr, sizeof(addr)) < 0) { os::close(s); http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/src/pid.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/pid.cpp b/3rdparty/libprocess/src/pid.cpp index 20ff25c..a2c620e 100644 --- a/3rdparty/libprocess/src/pid.cpp +++ b/3rdparty/libprocess/src/pid.cpp @@ -47,8 +47,7 @@ UPID::UPID(const std::string& s) UPID::UPID(const ProcessBase& process) { id = process.self().id; - ip = process.self().ip; - port = process.self().port; + node = process.self().node; } @@ -62,12 +61,7 @@ UPID::operator std::string() const ostream& operator << (ostream& stream, const UPID& pid) { - // Call inet_ntop since inet_ntoa is not thread-safe! 
- char ip[INET_ADDRSTRLEN]; - if (inet_ntop(AF_INET, (in_addr *) &pid.ip, ip, INET_ADDRSTRLEN) == NULL) - memset(ip, 0, INET_ADDRSTRLEN); - - stream << pid.id << "@" << ip << ":" << pid.port; + stream << pid.id << "@" << pid.node; return stream; } @@ -75,8 +69,8 @@ ostream& operator << (ostream& stream, const UPID& pid) istream& operator >> (istream& stream, UPID& pid) { pid.id = ""; - pid.ip = 0; - pid.port = 0; + pid.node.ip = 0; + pid.node.port = 0; string str; if (!(stream >> str)) { @@ -93,8 +87,7 @@ istream& operator >> (istream& stream, UPID& pid) string id; string host; - uint32_t ip; - uint16_t port; + Node node; size_t index = str.find('@'); @@ -149,20 +142,19 @@ istream& operator >> (istream& stream, UPID& pid) return stream; } - ip = *((uint32_t*) hep->h_addr_list[0]); + node.ip = *((uint32_t*) hep->h_addr_list[0]); delete[] temp; str = str.substr(index + 1); - if (sscanf(str.c_str(), "%hu", &port) != 1) { + if (sscanf(str.c_str(), "%hu", &node.port) != 1) { stream.setstate(std::ios_base::badbit); return stream; } pid.id = id; - pid.ip = ip; - pid.port = port; + pid.node = node; return stream; } @@ -172,8 +164,8 @@ size_t hash_value(const UPID& pid) { size_t seed = 0; boost::hash_combine(seed, pid.id); - boost::hash_combine(seed, pid.ip); - boost::hash_combine(seed, pid.port); + boost::hash_combine(seed, pid.node.ip); + boost::hash_combine(seed, pid.node.port); return seed; } http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/src/process.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp index 85fb995..a34b870 100644 --- a/3rdparty/libprocess/src/process.cpp +++ b/3rdparty/libprocess/src/process.cpp @@ -436,11 +436,8 @@ static uint32_t __id__ = 0; // Local server socket. static int __s__ = -1; -// Local IP address. -static uint32_t __ip__ = 0; - -// Local port. -static uint16_t __port__ = 0; +// Local node. 
+static Node __node__; // Active SocketManager (eventually will probably be thread-local). static SocketManager* socket_manager = NULL; @@ -709,7 +706,7 @@ static Message* encode(const UPID& from, static void transport(Message* message, ProcessBase* sender = NULL) { - if (message->to.ip == __ip__ && message->to.port == __port__) { + if (message->to.node == __node__) { // Local message. process_manager->deliver(message->to, new MessageEvent(message), sender); } else { @@ -766,7 +763,7 @@ static Message* parse(Request* request) return NULL; } - const UPID to(decode.get(), __ip__, __port__); + const UPID to(decode.get(), __node__); // And now determine 'name'. index = index != string::npos ? index + 2: request->path.size(); @@ -1472,15 +1469,15 @@ void initialize(const string& delegate) } } - __ip__ = 0; - __port__ = 0; + __node__.ip = 0; + __node__.port = 0; char* value; // Check environment for ip. value = getenv("LIBPROCESS_IP"); if (value != NULL) { - int result = inet_pton(AF_INET, value, &__ip__); + int result = inet_pton(AF_INET, value, &__node__.ip); if (result == 0) { LOG(FATAL) << "LIBPROCESS_IP=" << value << " was unparseable"; } else if (result < 0) { @@ -1495,7 +1492,7 @@ void initialize(const string& delegate) if (result < 0 || result > USHRT_MAX) { LOG(FATAL) << "LIBPROCESS_PORT=" << value << " is not a valid port"; } - __port__ = result; + __node__.port = result; } // Create a "server" socket for communicating with other nodes. 
@@ -1525,12 +1522,11 @@ void initialize(const string& delegate) sockaddr_in addr; memset(&addr, 0, sizeof(addr)); addr.sin_family = PF_INET; - addr.sin_addr.s_addr = __ip__; - addr.sin_port = htons(__port__); + addr.sin_addr.s_addr = __node__.ip; + addr.sin_port = htons(__node__.port); if (bind(__s__, (sockaddr*) &addr, sizeof(addr)) < 0) { - PLOG(FATAL) << "Failed to initialize, bind " - << inet_ntoa(addr.sin_addr) << ":" << __port__; + PLOG(FATAL) << "Failed to initialize, bind " << __node__; } // Lookup and store assigned ip and assigned port. @@ -1539,14 +1535,14 @@ void initialize(const string& delegate) PLOG(FATAL) << "Failed to initialize, getsockname"; } - __ip__ = addr.sin_addr.s_addr; - __port__ = ntohs(addr.sin_port); + __node__.ip = addr.sin_addr.s_addr; + __node__.port = ntohs(addr.sin_port); // Lookup hostname if missing ip or if ip is 127.0.0.1 in case we // actually have a valid external ip address. Note that we need only // one ip address, so that other processes can send and receive and // don't get confused as to whom they are sending to. 
- if (__ip__ == 0 || __ip__ == 2130706433) { + if (__node__.ip == 0 || __node__.ip == 2130706433) { char hostname[512]; if (gethostname(hostname, sizeof(hostname)) < 0) { @@ -1562,7 +1558,7 @@ void initialize(const string& delegate) << hstrerror(h_errno); } - __ip__ = *((uint32_t *) he->h_addr_list[0]); + __node__.ip = *((uint32_t *) he->h_addr_list[0]); } if (listen(__s__, 500000) < 0) { @@ -1663,13 +1659,8 @@ void initialize(const string& delegate) new Route("/__processes__", None(), __processes__); - char temp[INET_ADDRSTRLEN]; - if (inet_ntop(AF_INET, (in_addr*) &__ip__, temp, INET_ADDRSTRLEN) == NULL) { - PLOG(FATAL) << "Failed to initialize, inet_ntop"; - } - - VLOG(1) << "libprocess is initialized on " << temp << ":" << __port__ - << " for " << cpus << " cpus"; + VLOG(1) << "libprocess is initialized on " << node << " for " << cpus + << " cpus"; } @@ -1679,17 +1670,10 @@ void finalize() } -uint32_t ip() -{ - process::initialize(); - return __ip__; -} - - -uint16_t port() +Node node() { process::initialize(); - return __port__; + return __node__; } @@ -1968,12 +1952,9 @@ void SocketManager::link(ProcessBase* process, const UPID& to) CHECK(process != NULL); - Node node(to.ip, to.port); - synchronized (this) { // Check if node is remote and there isn't a persistant link. - if ((node.ip != __ip__ || node.port != __port__) - && persists.count(node) == 0) { + if (to.node != __node__ && persists.count(to.node) == 0) { // Okay, no link, let's create a socket. Try<int> socket = process::socket(AF_INET, SOCK_STREAM, 0); if (socket.isError()) { @@ -1993,9 +1974,9 @@ void SocketManager::link(ProcessBase* process, const UPID& to) } sockets[s] = Socket(s); - nodes[s] = node; + nodes[s] = to.node; - persists[node] = s; + persists[to.node] = s; // Allocate and initialize a watcher for reading data from this // socket. 
Note that we don't expect to receive anything other @@ -2009,8 +1990,8 @@ void SocketManager::link(ProcessBase* process, const UPID& to) sockaddr_in addr; memset(&addr, 0, sizeof(addr)); addr.sin_family = PF_INET; - addr.sin_port = htons(to.port); - addr.sin_addr.s_addr = to.ip; + addr.sin_port = htons(to.node.port); + addr.sin_addr.s_addr = to.node.ip; if (connect(s, (sockaddr*) &addr, sizeof(addr)) < 0) { if (errno != EINPROGRESS) { @@ -2130,7 +2111,7 @@ void SocketManager::send(Message* message) { CHECK(message != NULL); - Node node(message->to.ip, message->to.port); + Node node(message->to.node); synchronized (this) { // Check if there is already a socket. @@ -2190,8 +2171,8 @@ void SocketManager::send(Message* message) sockaddr_in addr; memset(&addr, 0, sizeof(addr)); addr.sin_family = PF_INET; - addr.sin_port = htons(message->to.port); - addr.sin_addr.s_addr = message->to.ip; + addr.sin_port = htons(node.port); + addr.sin_addr.s_addr = node.ip; if (connect(s, (sockaddr*) &addr, sizeof(addr)) < 0) { if (errno != EINPROGRESS) { @@ -2380,7 +2361,7 @@ void SocketManager::exited(const Node& node) list<UPID> removed; // Look up all linked processes. 
foreachpair (const UPID& linkee, set<ProcessBase*>& processes, links) { - if (linkee.ip == node.ip && linkee.port == node.port) { + if (linkee.node == node) { foreach (ProcessBase* linker, processes) { linker->enqueue(new ExitedEvent(linkee)); } @@ -2461,7 +2442,7 @@ ProcessManager::~ProcessManager() ProcessReference ProcessManager::use(const UPID& pid) { - if (pid.ip == __ip__ && pid.port == __port__) { + if (pid.node == __node__) { synchronized (processes) { if (processes.count(pid.id) > 0) { // Note that the ProcessReference constructor _must_ get @@ -2567,12 +2548,12 @@ bool ProcessManager::handle( if (tokens.size() == 0 && delegate != "") { request->path = "/" + delegate; - receiver = use(UPID(delegate, __ip__, __port__)); + receiver = use(UPID(delegate, __node__)); } else if (tokens.size() > 0) { // Decode possible percent-encoded path. Try<string> decode = http::decode(tokens[0]); if (!decode.isError()) { - receiver = use(UPID(decode.get(), __ip__, __port__)); + receiver = use(UPID(decode.get(), __node__)); } else { VLOG(1) << "Failed to decode URL path: " << decode.error(); } @@ -2581,7 +2562,7 @@ bool ProcessManager::handle( if (!receiver && delegate != "") { // Try and delegate the request. request->path = "/" + delegate + request->path; - receiver = use(UPID(delegate, __ip__, __port__)); + receiver = use(UPID(delegate, __node__)); } if (receiver) { @@ -2900,7 +2881,7 @@ void ProcessManager::cleanup(ProcessBase* process) void ProcessManager::link(ProcessBase* process, const UPID& to) { // Check if the pid is local. - if (!(to.ip == __ip__ && to.port == __port__)) { + if (to.node != __node__) { socket_manager->link(process, to); } else { // Since the pid is local we want to get a reference to it's @@ -3249,8 +3230,7 @@ ProcessBase::ProcessBase(const string& id) refs = 0; pid.id = id != "" ? 
id : ID::generate(); - pid.ip = __ip__; - pid.port = __port__; + pid.node = __node__; // If using a manual clock, try and set current time of process // using happens before relationship between creator and createe! http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/src/tests/benchmarks.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/tests/benchmarks.cpp b/3rdparty/libprocess/src/tests/benchmarks.cpp index 3177a8e..227b8e7 100644 --- a/3rdparty/libprocess/src/tests/benchmarks.cpp +++ b/3rdparty/libprocess/src/tests/benchmarks.cpp @@ -102,9 +102,9 @@ public: private: void ping(const UPID& from, const string& body) { - if (linkedPorts.find(from.port) == linkedPorts.end()) { + if (linkedPorts.find(from.node.port) == linkedPorts.end()) { setLink(from); - linkedPorts.insert(from.port); + linkedPorts.insert(from.node.port); } static const string message("hi"); send(from, "pong", message.c_str(), message.size()); http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/src/tests/http_tests.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/tests/http_tests.cpp b/3rdparty/libprocess/src/tests/http_tests.cpp index a1c3685..a90e65f 100644 --- a/3rdparty/libprocess/src/tests/http_tests.cpp +++ b/3rdparty/libprocess/src/tests/http_tests.cpp @@ -120,8 +120,8 @@ TEST(HTTP, Endpoints) sockaddr_in addr; memset(&addr, 0, sizeof(addr)); addr.sin_family = PF_INET; - addr.sin_port = htons(process.self().port); - addr.sin_addr.s_addr = process.self().ip; + addr.sin_port = htons(process.self().node.port); + addr.sin_addr.s_addr = process.self().node.ip; ASSERT_EQ(0, connect(s, (sockaddr*) &addr, sizeof(addr))); http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/src/tests/metrics_tests.cpp ---------------------------------------------------------------------- diff --git 
a/3rdparty/libprocess/src/tests/metrics_tests.cpp b/3rdparty/libprocess/src/tests/metrics_tests.cpp index 33539e4..0c80c69 100644 --- a/3rdparty/libprocess/src/tests/metrics_tests.cpp +++ b/3rdparty/libprocess/src/tests/metrics_tests.cpp @@ -147,7 +147,7 @@ TEST(Metrics, Snapshot) { ASSERT_TRUE(GTEST_IS_THREADSAFE); - UPID upid("metrics", process::ip(), process::port()); + UPID upid("metrics", process::node()); Clock::pause(); @@ -219,7 +219,7 @@ TEST(Metrics, SnapshotTimeout) { ASSERT_TRUE(GTEST_IS_THREADSAFE); - UPID upid("metrics", process::ip(), process::port()); + UPID upid("metrics", process::node()); Clock::pause(); @@ -320,7 +320,7 @@ TEST(Metrics, SnapshotTimeout) // Ensures that the aggregate statistics are correct in the snapshot. TEST(Metrics, SnapshotStatistics) { - UPID upid("metrics", process::ip(), process::port()); + UPID upid("metrics", process::node()); Clock::pause(); http://git-wip-us.apache.org/repos/asf/mesos/blob/f64562fa/3rdparty/libprocess/src/tests/process_tests.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/tests/process_tests.cpp b/3rdparty/libprocess/src/tests/process_tests.cpp index b985fb7..902d4d3 100644 --- a/3rdparty/libprocess/src/tests/process_tests.cpp +++ b/3rdparty/libprocess/src/tests/process_tests.cpp @@ -1425,8 +1425,8 @@ TEST(Process, remote) sockaddr_in addr; memset(&addr, 0, sizeof(addr)); addr.sin_family = PF_INET; - addr.sin_port = htons(process.self().port); - addr.sin_addr.s_addr = process.self().ip; + addr.sin_port = htons(process.self().node.port); + addr.sin_addr.s_addr = process.self().node.ip; ASSERT_EQ(0, connect(s, (sockaddr*) &addr, sizeof(addr))); @@ -1866,7 +1866,7 @@ TEST(Process, PercentEncodedURLs) spawn(process); // Construct the PID using percent-encoding. - UPID pid("id%2842%29", process.self().ip, process.self().port); + UPID pid("id%2842%29", process.self().node); // Mimic a libprocess message sent to an installed handler. 
Future<Nothing> handler1;
