Add bind(), listen(), and accept() to Socket interface. Review: https://reviews.apache.org/r/27966
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8279b45e Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8279b45e Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8279b45e Branch: refs/heads/master Commit: 8279b45ed325375bbf8cc5919fd7009c7d2335cf Parents: 8edab65 Author: Joris Van Remoortere <[email protected]> Authored: Sat Nov 15 16:55:07 2014 -0800 Committer: Benjamin Hindman <[email protected]> Committed: Sat Nov 15 17:38:22 2014 -0800 ---------------------------------------------------------------------- 3rdparty/libprocess/include/process/socket.hpp | 21 ++ 3rdparty/libprocess/src/libev.cpp | 1 - 3rdparty/libprocess/src/libev.hpp | 3 - 3rdparty/libprocess/src/process.cpp | 235 +++++++++++--------- 4 files changed, 153 insertions(+), 107 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/8279b45e/3rdparty/libprocess/include/process/socket.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp index 5fd8d1b..c022924 100644 --- a/3rdparty/libprocess/include/process/socket.hpp +++ b/3rdparty/libprocess/include/process/socket.hpp @@ -82,6 +82,12 @@ public: Future<size_t> sendfile(int fd, off_t offset, size_t size); + Try<Node> bind(const Node& node); + + Try<Nothing> listen(int backlog); + + Future<Socket> accept(); + private: const Impl& create() const { @@ -143,6 +149,21 @@ public: return impl->sendfile(fd, offset, size); } + Try<Node> bind(const Node& node) + { + return impl->bind(node); + } + + Try<Nothing> listen(int backlog) + { + return impl->listen(backlog); + } + + Future<Socket> accept() + { + return impl->accept(); + } + private: explicit Socket(std::shared_ptr<Impl>&& that) : impl(std::move(that)) {} http://git-wip-us.apache.org/repos/asf/mesos/blob/8279b45e/3rdparty/libprocess/src/libev.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/libev.cpp b/3rdparty/libprocess/src/libev.cpp index efc89d8..6560050 100644 --- a/3rdparty/libprocess/src/libev.cpp +++ b/3rdparty/libprocess/src/libev.cpp @@ -12,7 +12,6 @@ namespace process { // libev.hpp (since these need to live in the static data space). struct ev_loop* loop = NULL; ev_async async_watcher; -ev_io server_watcher; std::queue<ev_io*>* watchers = new std::queue<ev_io*>(); synchronizable(watchers); std::queue<lambda::function<void(void)>>* functions = http://git-wip-us.apache.org/repos/asf/mesos/blob/8279b45e/3rdparty/libprocess/src/libev.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/libev.hpp b/3rdparty/libprocess/src/libev.hpp index bac8b6a..04847e3 100644 --- a/3rdparty/libprocess/src/libev.hpp +++ b/3rdparty/libprocess/src/libev.hpp @@ -18,9 +18,6 @@ extern struct ev_loop* loop; // with IO watchers and functions (via run_in_event_loop). extern ev_async async_watcher; -// Server watcher for accepting connections. -extern ev_io server_watcher; - // Queue of I/O watchers to be asynchronously added to the event loop // (protected by 'watchers' below). // TODO(benh): Replace this queue with functions that we put in http://git-wip-us.apache.org/repos/asf/mesos/blob/8279b45e/3rdparty/libprocess/src/process.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp index 9f91020..5d3b947 100644 --- a/3rdparty/libprocess/src/process.cpp +++ b/3rdparty/libprocess/src/process.cpp @@ -267,7 +267,7 @@ public: SocketManager(); ~SocketManager(); - Socket accepted(int s); + void accepted(const Socket& socket); void link(ProcessBase* process, const UPID& to); @@ -433,8 +433,11 @@ const string Profiler::STOP_HELP = HELP( // Unique id that can be assigned to each process. static uint32_t __id__ = 0; +// Server socket listen backlog. +static const int LISTEN_BACKLOG = 500000; + // Local server socket. -static int __s__ = -1; +static Socket __s__; // Local node. static Node __node__; @@ -640,66 +643,6 @@ void decode_read( } // namespace internal { -void accept(struct ev_loop* loop, ev_io* watcher, int revents) -{ - CHECK_EQ(__s__, watcher->fd); - - sockaddr_in addr; - socklen_t addrlen = sizeof(addr); - - int s = ::accept(__s__, (sockaddr*) &addr, &addrlen); - - if (s < 0) { - return; - } - - Try<Nothing> nonblock = os::nonblock(s); - if (nonblock.isError()) { - LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, nonblock: " - << nonblock.error(); - os::close(s); - return; - } - - Try<Nothing> cloexec = os::cloexec(s); - if (cloexec.isError()) { - LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, cloexec: " - << cloexec.error(); - os::close(s); - return; - } - - // Turn off Nagle (TCP_NODELAY) so pipelined requests don't wait. - int on = 1; - if (setsockopt(s, SOL_TCP, TCP_NODELAY, &on, sizeof(on)) < 0) { - const char* error = strerror(errno); - VLOG(1) << "Failed to turn off the Nagle algorithm: " << error; - os::close(s); - } else { - // Inform the socket manager for proper bookkeeping. - Socket socket = socket_manager->accepted(s); - - // Allocate a buffer to read into. This can be replaced later - // when socket supports a read function that provides the - // buffered data in the resulting callback. - const size_t size = 80 * 1024; - char* data = new char[size]; - memset(data, 0, size); - - DataDecoder* decoder = new DataDecoder(socket); - - socket.read(data, size) - .onAny(lambda::bind( - &internal::decode_read, - lambda::_1, - data, - size, - new Socket(socket), - decoder)); - } -} - - void* serve(void* arg) { ev_loop(((struct ev_loop*) arg), 0); @@ -765,6 +708,37 @@ void timedout(list<Timer>&& timers) // } +namespace internal { + +void on_accept(const Future<Socket>& socket) +{ + if (socket.isReady()) { + // Inform the socket manager for proper bookkeeping. + socket_manager->accepted(socket.get()); + + const size_t size = 80 * 1024; + char* data = new char[size]; + memset(data, 0, size); + + DataDecoder* decoder = new DataDecoder(socket.get()); + + socket.get().read(data, size) + .onAny(lambda::bind( + &internal::decode_read, + lambda::_1, + data, + size, + new Socket(socket.get()), + decoder)); + } + + __s__.accept() + .onAny(lambda::bind(&on_accept, lambda::_1)); +} + +} // namespace internal { + + void initialize(const string& delegate) { // TODO(benh): Return an error if attempting to initialize again @@ -866,21 +840,6 @@ void initialize(const string& delegate) } // Create a "server" socket for communicating with other nodes. - if ((__s__ = ::socket(AF_INET, SOCK_STREAM, 0)) < 0) { - PLOG(FATAL) << "Failed to initialize, socket"; - } - - // Make socket non-blocking. - Try<Nothing> nonblock = os::nonblock(__s__); - if (nonblock.isError()) { - LOG(FATAL) << "Failed to initialize, nonblock: " << nonblock.error(); - } - - // Set FD_CLOEXEC flag. - Try<Nothing> cloexec = os::cloexec(__s__); - if (cloexec.isError()) { - LOG(FATAL) << "Failed to initialize, cloexec: " << cloexec.error(); - } // Allow address reuse. int on = 1; @@ -888,25 +847,12 @@ void initialize(const string& delegate) PLOG(FATAL) << "Failed to initialize, setsockopt(SO_REUSEADDR)"; } - // Set up socket. - sockaddr_in addr; - memset(&addr, 0, sizeof(addr)); - addr.sin_family = PF_INET; - addr.sin_addr.s_addr = __node__.ip; - addr.sin_port = htons(__node__.port); - - if (bind(__s__, (sockaddr*) &addr, sizeof(addr)) < 0) { - PLOG(FATAL) << "Failed to initialize, bind " << __node__; - } - - // Lookup and store assigned ip and assigned port. - socklen_t addrlen = sizeof(addr); - if (getsockname(__s__, (sockaddr*) &addr, &addrlen) < 0) { - PLOG(FATAL) << "Failed to initialize, getsockname"; + Try<Node> bind = __s__.bind(__node__); + if (bind.isError()) { + PLOG(FATAL) << "Failed to initialize: " << bind.error(); } - __node__.ip = addr.sin_addr.s_addr; - __node__.port = ntohs(addr.sin_port); + __node__ = bind.get(); // Lookup hostname if missing ip or if ip is 127.0.0.1 in case we // actually have a valid external ip address. Note that we need only @@ -931,8 +877,9 @@ void initialize(const string& delegate) __node__.ip = *((uint32_t *) he->h_addr_list[0]); } - if (listen(__s__, 500000) < 0) { - PLOG(FATAL) << "Failed to initialize, listen"; + Try<Nothing> listen = __s__.listen(LISTEN_BACKLOG); + if (listen.isError()) { + PLOG(FATAL) << "Failed to initialize: " << listen.error(); } // Initialize libev. @@ -950,9 +897,6 @@ void initialize(const string& delegate) ev_async_init(&async_watcher, handle_async); ev_async_start(loop, &async_watcher); - ev_io_init(&server_watcher, accept, __s__, EV_READ); - ev_io_start(loop, &server_watcher); - Clock::initialize(lambda::bind(&timedout, lambda::_1)); // ev_child_init(&child_watcher, child_exited, pid, 0); @@ -979,6 +923,9 @@ void initialize(const string& delegate) // 'spawn' below for the garbage collector. initializing = false; + __s__.accept() + .onAny(lambda::bind(&internal::on_accept, lambda::_1)); + // TODO(benh): Make sure creating the garbage collector, logging // process, and profiler always succeeds and use supervisors to make // sure that none terminate. @@ -1436,6 +1383,90 @@ Future<size_t> Socket::Impl::sendfile(int fd, off_t offset, size_t size) } +Try<Node> Socket::Impl::bind(const Node& node) +{ + sockaddr_in addr; + memset(&addr, 0, sizeof(addr)); + addr.sin_family = PF_INET; + addr.sin_addr.s_addr = node.ip; + addr.sin_port = htons(node.port); + + if (::bind(get(), (sockaddr*) &addr, sizeof(addr)) < 0) { + return Error("Failed to bind: " + string(inet_ntoa(addr.sin_addr)) + + ":" + stringify(node.port)); + } + + // Lookup and store assigned ip and assigned port. + socklen_t addrlen = sizeof(addr); + if (getsockname(get(), (sockaddr*) &addr, &addrlen) < 0) { + return ErrnoError("Failed to bind, getsockname"); + } + + return Node(addr.sin_addr.s_addr, ntohs(addr.sin_port)); +} + + +Try<Nothing> Socket::Impl::listen(int backlog) +{ + if (::listen(get(), backlog) < 0) { + return ErrnoError(); + } + return Nothing(); +} + + +namespace internal { + +Future<Socket> accept(int fd) +{ + sockaddr_in addr; + socklen_t addrlen = sizeof(addr); + + int s = ::accept(fd, (sockaddr*) &addr, &addrlen); + + if (s < 0) { + return Failure(ErrnoError("Failed to accept")); + } + + Try<Nothing> nonblock = os::nonblock(s); + if (nonblock.isError()) { + LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, nonblock: " + << nonblock.error(); + os::close(s); + return Failure("Failed to accept, nonblock: " + nonblock.error()); + } + + Try<Nothing> cloexec = os::cloexec(s); + if (cloexec.isError()) { + LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, cloexec: " + << cloexec.error(); + os::close(s); + return Failure("Failed to accept, cloexec: " + cloexec.error()); + } + + // Turn off Nagle (TCP_NODELAY) so pipelined requests don't wait. + int on = 1; + if (setsockopt(s, SOL_TCP, TCP_NODELAY, &on, sizeof(on)) < 0) { + const char* error = strerror(errno); + VLOG(1) << "Failed to turn off the Nagle algorithm: " << error; + os::close(s); + return Failure( + "Failed to turn off the Nagle algorithm: " + stringify(error)); + } + + return Socket(s); +} + +} // namespace internal { + + +Future<Socket> Socket::Impl::accept() +{ + return io::poll(get(), io::READ) + .then(lambda::bind(&internal::accept, get())); +} + + SocketManager::SocketManager() { synchronizer(this) = SYNCHRONIZED_INITIALIZER_RECURSIVE; @@ -1445,13 +1476,11 @@ SocketManager::SocketManager() SocketManager::~SocketManager() {} -Socket SocketManager::accepted(int s) +void SocketManager::accepted(const Socket& socket) { synchronized (this) { - return sockets[s] = Socket(s); + sockets[socket] = socket; } - - UNREACHABLE(); }
