Repository: mesos Updated Branches: refs/heads/master 1a7ad5e35 -> b6e31887d
Join threads in libprocess when shutting down. Review: https://reviews.apache.org/r/37821 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b6e31887 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b6e31887 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b6e31887 Branch: refs/heads/master Commit: b6e31887df55e561bccdd0e06cb777d7812984fb Parents: 1a7ad5e Author: Greg Mann <[email protected]> Authored: Sun Sep 27 16:59:46 2015 -0700 Committer: Joris Van Remoortere <[email protected]> Committed: Sun Sep 27 17:38:25 2015 -0700 ---------------------------------------------------------------------- 3rdparty/libprocess/src/event_loop.hpp | 3 + 3rdparty/libprocess/src/libev.cpp | 22 ++++- 3rdparty/libprocess/src/libevent.cpp | 6 ++ 3rdparty/libprocess/src/process.cpp | 140 ++++++++++++++++++---------- 4 files changed, 120 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/b6e31887/3rdparty/libprocess/src/event_loop.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/event_loop.hpp b/3rdparty/libprocess/src/event_loop.hpp index 36a4cd2..296c3dc 100644 --- a/3rdparty/libprocess/src/event_loop.hpp +++ b/3rdparty/libprocess/src/event_loop.hpp @@ -41,6 +41,9 @@ public: // Runs the event loop. static void run(); + + // Asynchronously tells the event loop to stop and then returns. + static void stop(); }; } // namespace process { http://git-wip-us.apache.org/repos/asf/mesos/blob/b6e31887/3rdparty/libprocess/src/libev.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/libev.cpp b/3rdparty/libprocess/src/libev.cpp index 97a2694..d9cf3d9 100644 --- a/3rdparty/libprocess/src/libev.cpp +++ b/3rdparty/libprocess/src/libev.cpp @@ -27,12 +27,14 @@ namespace process { -// Defines the initial values for all of the declarations made in +ev_async async_watcher; +// We need an asynchronous watcher to receive the request to shutdown. +ev_async shutdown_watcher; + +// Define the initial values for all of the declarations made in // libev.hpp (since these need to live in the static data space). struct ev_loop* loop = NULL; -ev_async async_watcher; - std::queue<ev_io*>* watchers = new std::queue<ev_io*>(); std::mutex* watchers_mutex = new std::mutex(); @@ -74,12 +76,21 @@ void handle_async(struct ev_loop* loop, ev_async* _, int revents) } +void handle_shutdown(struct ev_loop* loop, ev_async* _, int revents) +{ + ev_unloop(loop, EVUNLOOP_ALL); +} + + void EventLoop::initialize() { loop = ev_default_loop(EVFLAG_AUTO); ev_async_init(&async_watcher, handle_async); + ev_async_init(&shutdown_watcher, handle_shutdown); + ev_async_start(loop, &async_watcher); + ev_async_start(loop, &shutdown_watcher); } @@ -150,4 +161,9 @@ void EventLoop::run() __in_event_loop__ = false; } +void EventLoop::stop() +{ + ev_async_send(loop, &shutdown_watcher); +} + } // namespace process { http://git-wip-us.apache.org/repos/asf/mesos/blob/b6e31887/3rdparty/libprocess/src/libevent.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/libevent.cpp b/3rdparty/libprocess/src/libevent.cpp index ee79064..00e0f08 100644 --- a/3rdparty/libprocess/src/libevent.cpp +++ b/3rdparty/libprocess/src/libevent.cpp @@ -129,6 +129,12 @@ void EventLoop::run() } +void EventLoop::stop() +{ + event_base_loopexit(base, NULL); +} + + namespace internal { struct Delay http://git-wip-us.apache.org/repos/asf/mesos/blob/b6e31887/3rdparty/libprocess/src/process.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp index 4afa305..c03fba4 100644 --- a/3rdparty/libprocess/src/process.cpp +++ b/3rdparty/libprocess/src/process.cpp @@ -377,6 +377,10 @@ public: explicit ProcessManager(const string& delegate); ~ProcessManager(); + // Initializes the processing threads and the event loop thread, + // and returns the number of processing threads created. + long init_threads(); + ProcessReference use(const UPID& pid); bool handle( @@ -429,6 +433,12 @@ private: // Number of running processes, to support Clock::settle operation. std::atomic_long running; + // Stores the thread handles so that we can join during shutdown. + vector<std::thread*> threads; + + // Boolean used to signal processing threads to stop running. + std::atomic_bool joining_threads; + // List of rules applied to all incoming HTTP requests. vector<Owned<FirewallRule>> firewallRules; std::recursive_mutex firewall_mutex; @@ -641,25 +651,6 @@ void decode_recv( .onAny(lambda::bind(&decode_recv, lambda::_1, data, size, socket, decoder)); } - -void schedule() -{ - do { - ProcessBase* process = process_manager->dequeue(); - if (process == NULL) { - Gate::state_t old = gate->approach(); - process = process_manager->dequeue(); - if (process == NULL) { - gate->arrive(old); // Wait at gate if idle. - continue; - } else { - gate->leave(); - } - } - process_manager->resume(process); - } while (true); -} - } // namespace internal { @@ -810,27 +801,12 @@ void initialize(const string& delegate) process_manager = new ProcessManager(delegate); socket_manager = new SocketManager(); - // Setup processing threads. - // We create no fewer than 8 threads because some tests require - // more worker threads than 'sysconf(_SC_NPROCESSORS_ONLN)' on - // computers with fewer cores. - // e.g. https://issues.apache.org/jira/browse/MESOS-818 - // - // TODO(xujyan): Use a smarter algorithm to allocate threads. - // Allocating a static number of threads can cause starvation if - // there are more waiting Processes than the number of worker - // threads. - long cpus = std::max(8L, sysconf(_SC_NPROCESSORS_ONLN)); - - for (int i = 0; i < cpus; i++) { - // We detach and forget the thread handle as we are not joining it - // for a clean shutdown. - std::thread* thread = new std::thread(&internal::schedule); - thread->detach(); - } - // Initialize the event loop. EventLoop::initialize(); + + // Setup processing threads. + long cpus = process_manager->init_threads(); + Clock::initialize(lambda::bind(&timedout, lambda::_1)); // ev_child_init(&child_watcher, child_exited, pid, 0); @@ -848,11 +824,6 @@ void initialize(const string& delegate) // sigaddset (&sa.sa_mask, w->signum); // sigprocmask (SIG_UNBLOCK, &sa.sa_mask, 0); - // We detach and forget the thread handle as we are not joining it - // for a clean shutdown. - std::thread* thread = new std::thread(&EventLoop::run); - thread->detach(); - __address__ = Address::LOCALHOST_ANY(); // Check environment for ip. @@ -2121,18 +2092,83 @@ ProcessManager::ProcessManager(const string& _delegate) ProcessManager::~ProcessManager() { ProcessBase* process = NULL; - // Pop a process off the top and terminate it. Don't hold the lock - // or process the whole map as terminating one process might - // trigger other terminations. Deal with them one at a time. + // Terminate the first process in the queue. Events are deleted + // and the process is erased in ProcessManager::cleanup(). Don't + // hold the lock or process the whole map as terminating one process + // might trigger other terminations. Deal with them one at a time. do { synchronized (processes_mutex) { process = !processes.empty() ? processes.begin()->second : NULL; } if (process != NULL) { - process::terminate(process); + // Terminate this process but do not inject the message, + // i.e. allow it to finish its work first. + process::terminate(process, false); process::wait(process); } } while (process != NULL); + + // Send signal to all processing threads to stop running. + joining_threads.store(true); + gate->open(); + EventLoop::stop(); + + // Join all threads. + foreach (std::thread* thread, threads) { + thread->join(); + delete thread; + } +} + + +long ProcessManager::init_threads() +{ + joining_threads.store(false); + + // We create no fewer than 8 threads because some tests require + // more worker threads than `sysconf(_SC_NPROCESSORS_ONLN)` on + // computers with fewer cores. + // e.g. https://issues.apache.org/jira/browse/MESOS-818 + // + // TODO(xujyan): Use a smarter algorithm to allocate threads. + // Allocating a static number of threads can cause starvation if + // there are more waiting Processes than the number of worker + // threads. + long cpus = std::max(8L, sysconf(_SC_NPROCESSORS_ONLN)); + threads.reserve(cpus+1); + + // Create processing threads. + for (long i = 0; i < cpus; i++) { + // Retain the thread handles so that we can join when shutting down. + threads.emplace_back( + // We pass a constant reference to `joining` to make it clear that this + // value is only being tested (read), and not manipulated. + new std::thread(std::bind([](const std::atomic_bool& joining) { + do { + ProcessBase* process = process_manager->dequeue(); + if (process == NULL) { + Gate::state_t old = gate->approach(); + process = process_manager->dequeue(); + if (process == NULL) { + if (joining.load()) { + break; + } + gate->arrive(old); // Wait at gate if idle. + continue; + } else { + gate->leave(); + } + } + process_manager->resume(process); + } while (true); + }, + std::cref(joining_threads)))); + } + + // Create a thread for the event loop. + threads.emplace_back(new std::thread(&EventLoop::run)); + + return cpus; } @@ -2190,7 +2226,6 @@ bool ProcessManager::handle( } delete request; - return accepted; } @@ -2754,6 +2789,15 @@ void ProcessManager::enqueue(ProcessBase* process) { CHECK(process != NULL); + // If libprocess is shutting down and the processing threads + // are currently joining, then do not enqueue the process. + if (joining_threads.load()) { + VLOG(1) << "Libprocess shutting down, cannot enqueue process: " + << process->pid.id; + + return; + } + // TODO(benh): Check and see if this process has it's own thread. If // it does, push it on that threads runq, and wake up that thread if // it's not running. Otherwise, check and see which thread this
