Repository: mesos
Updated Branches:
  refs/heads/master 1a7ad5e35 -> b6e31887d


Join threads in libprocess when shutting down.

Review: https://reviews.apache.org/r/37821


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b6e31887
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b6e31887
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b6e31887

Branch: refs/heads/master
Commit: b6e31887df55e561bccdd0e06cb777d7812984fb
Parents: 1a7ad5e
Author: Greg Mann <[email protected]>
Authored: Sun Sep 27 16:59:46 2015 -0700
Committer: Joris Van Remoortere <[email protected]>
Committed: Sun Sep 27 17:38:25 2015 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/src/event_loop.hpp |   3 +
 3rdparty/libprocess/src/libev.cpp      |  22 ++++-
 3rdparty/libprocess/src/libevent.cpp   |   6 ++
 3rdparty/libprocess/src/process.cpp    | 140 ++++++++++++++++++----------
 4 files changed, 120 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b6e31887/3rdparty/libprocess/src/event_loop.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/event_loop.hpp b/3rdparty/libprocess/src/event_loop.hpp
index 36a4cd2..296c3dc 100644
--- a/3rdparty/libprocess/src/event_loop.hpp
+++ b/3rdparty/libprocess/src/event_loop.hpp
@@ -41,6 +41,9 @@ public:
 
   // Runs the event loop.
   static void run();
+
+  // Asynchronously tells the event loop to stop and then returns.
+  static void stop();
 };
 
 } // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/b6e31887/3rdparty/libprocess/src/libev.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libev.cpp b/3rdparty/libprocess/src/libev.cpp
index 97a2694..d9cf3d9 100644
--- a/3rdparty/libprocess/src/libev.cpp
+++ b/3rdparty/libprocess/src/libev.cpp
@@ -27,12 +27,14 @@
 
 namespace process {
 
-// Defines the initial values for all of the declarations made in
+ev_async async_watcher;
+// We need an asynchronous watcher to receive the request to shutdown.
+ev_async shutdown_watcher;
+
+// Define the initial values for all of the declarations made in
 // libev.hpp (since these need to live in the static data space).
 struct ev_loop* loop = NULL;
 
-ev_async async_watcher;
-
 std::queue<ev_io*>* watchers = new std::queue<ev_io*>();
 
 std::mutex* watchers_mutex = new std::mutex();
@@ -74,12 +76,21 @@ void handle_async(struct ev_loop* loop, ev_async* _, int revents)
 }
 
 
+void handle_shutdown(struct ev_loop* loop, ev_async* _, int revents)
+{
+  ev_unloop(loop, EVUNLOOP_ALL);
+}
+
+
 void EventLoop::initialize()
 {
   loop = ev_default_loop(EVFLAG_AUTO);
 
   ev_async_init(&async_watcher, handle_async);
+  ev_async_init(&shutdown_watcher, handle_shutdown);
+
   ev_async_start(loop, &async_watcher);
+  ev_async_start(loop, &shutdown_watcher);
 }
 
 
@@ -150,4 +161,9 @@ void EventLoop::run()
   __in_event_loop__ = false;
 }
 
+void EventLoop::stop()
+{
+  ev_async_send(loop, &shutdown_watcher);
+}
+
 } // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/b6e31887/3rdparty/libprocess/src/libevent.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libevent.cpp b/3rdparty/libprocess/src/libevent.cpp
index ee79064..00e0f08 100644
--- a/3rdparty/libprocess/src/libevent.cpp
+++ b/3rdparty/libprocess/src/libevent.cpp
@@ -129,6 +129,12 @@ void EventLoop::run()
 }
 
 
+void EventLoop::stop()
+{
+  event_base_loopexit(base, NULL);
+}
+
+
 namespace internal {
 
 struct Delay

http://git-wip-us.apache.org/repos/asf/mesos/blob/b6e31887/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 4afa305..c03fba4 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -377,6 +377,10 @@ public:
   explicit ProcessManager(const string& delegate);
   ~ProcessManager();
 
+  // Initializes the processing threads and the event loop thread,
+  // and returns the number of processing threads created.
+  long init_threads();
+
   ProcessReference use(const UPID& pid);
 
   bool handle(
@@ -429,6 +433,12 @@ private:
   // Number of running processes, to support Clock::settle operation.
   std::atomic_long running;
 
+  // Stores the thread handles so that we can join during shutdown.
+  vector<std::thread*> threads;
+
+  // Boolean used to signal processing threads to stop running.
+  std::atomic_bool joining_threads;
+
   // List of rules applied to all incoming HTTP requests.
   vector<Owned<FirewallRule>> firewallRules;
   std::recursive_mutex firewall_mutex;
@@ -641,25 +651,6 @@ void decode_recv(
     .onAny(lambda::bind(&decode_recv, lambda::_1, data, size, socket, decoder));
 }
 
-
-void schedule()
-{
-  do {
-    ProcessBase* process = process_manager->dequeue();
-    if (process == NULL) {
-      Gate::state_t old = gate->approach();
-      process = process_manager->dequeue();
-      if (process == NULL) {
-        gate->arrive(old); // Wait at gate if idle.
-        continue;
-      } else {
-        gate->leave();
-      }
-    }
-    process_manager->resume(process);
-  } while (true);
-}
-
 } // namespace internal {
 
 
@@ -810,27 +801,12 @@ void initialize(const string& delegate)
   process_manager = new ProcessManager(delegate);
   socket_manager = new SocketManager();
 
-  // Setup processing threads.
-  // We create no fewer than 8 threads because some tests require
-  // more worker threads than 'sysconf(_SC_NPROCESSORS_ONLN)' on
-  // computers with fewer cores.
-  // e.g. https://issues.apache.org/jira/browse/MESOS-818
-  //
-  // TODO(xujyan): Use a smarter algorithm to allocate threads.
-  // Allocating a static number of threads can cause starvation if
-  // there are more waiting Processes than the number of worker
-  // threads.
-  long cpus = std::max(8L, sysconf(_SC_NPROCESSORS_ONLN));
-
-  for (int i = 0; i < cpus; i++) {
-    // We detach and forget the thread handle as we are not joining it
-    // for a clean shutdown.
-    std::thread* thread = new std::thread(&internal::schedule);
-    thread->detach();
-  }
-
   // Initialize the event loop.
   EventLoop::initialize();
+
+  // Setup processing threads.
+  long cpus = process_manager->init_threads();
+
   Clock::initialize(lambda::bind(&timedout, lambda::_1));
 
 //   ev_child_init(&child_watcher, child_exited, pid, 0);
@@ -848,11 +824,6 @@ void initialize(const string& delegate)
 //   sigaddset (&sa.sa_mask, w->signum);
 //   sigprocmask (SIG_UNBLOCK, &sa.sa_mask, 0);
 
-  // We detach and forget the thread handle as we are not joining it
-  // for a clean shutdown.
-  std::thread* thread = new std::thread(&EventLoop::run);
-  thread->detach();
-
   __address__ = Address::LOCALHOST_ANY();
 
   // Check environment for ip.
@@ -2121,18 +2092,83 @@ ProcessManager::ProcessManager(const string& _delegate)
 ProcessManager::~ProcessManager()
 {
   ProcessBase* process = NULL;
-  // Pop a process off the top and terminate it. Don't hold the lock
-  // or process the whole map as terminating one process might
-  // trigger other terminations. Deal with them one at a time.
+  // Terminate the first process in the queue. Events are deleted
+  // and the process is erased in ProcessManager::cleanup(). Don't
+  // hold the lock or process the whole map as terminating one process
+  // might trigger other terminations. Deal with them one at a time.
   do {
     synchronized (processes_mutex) {
       process = !processes.empty() ? processes.begin()->second : NULL;
     }
     if (process != NULL) {
-      process::terminate(process);
+      // Terminate this process but do not inject the message,
+      // i.e. allow it to finish its work first.
+      process::terminate(process, false);
       process::wait(process);
     }
   } while (process != NULL);
+
+  // Send signal to all processing threads to stop running.
+  joining_threads.store(true);
+  gate->open();
+  EventLoop::stop();
+
+  // Join all threads.
+  foreach (std::thread* thread, threads) {
+    thread->join();
+    delete thread;
+  }
+}
+
+
+long ProcessManager::init_threads()
+{
+  joining_threads.store(false);
+
+  // We create no fewer than 8 threads because some tests require
+  // more worker threads than `sysconf(_SC_NPROCESSORS_ONLN)` on
+  // computers with fewer cores.
+  // e.g. https://issues.apache.org/jira/browse/MESOS-818
+  //
+  // TODO(xujyan): Use a smarter algorithm to allocate threads.
+  // Allocating a static number of threads can cause starvation if
+  // there are more waiting Processes than the number of worker
+  // threads.
+  long cpus = std::max(8L, sysconf(_SC_NPROCESSORS_ONLN));
+  threads.reserve(cpus+1);
+
+  // Create processing threads.
+  for (long i = 0; i < cpus; i++) {
+    // Retain the thread handles so that we can join when shutting down.
+    threads.emplace_back(
+        // We pass a constant reference to `joining` to make it clear that this
+        // value is only being tested (read), and not manipulated.
+        new std::thread(std::bind([](const std::atomic_bool& joining) {
+          do {
+            ProcessBase* process = process_manager->dequeue();
+            if (process == NULL) {
+              Gate::state_t old = gate->approach();
+              process = process_manager->dequeue();
+              if (process == NULL) {
+                if (joining.load()) {
+                  break;
+                }
+                gate->arrive(old); // Wait at gate if idle.
+                continue;
+              } else {
+                gate->leave();
+              }
+            }
+            process_manager->resume(process);
+          } while (true);
+        },
+        std::cref(joining_threads))));
+  }
+
+  // Create a thread for the event loop.
+  threads.emplace_back(new std::thread(&EventLoop::run));
+
+  return cpus;
 }
 
 
@@ -2190,7 +2226,6 @@ bool ProcessManager::handle(
       }
 
       delete request;
-
       return accepted;
     }
 
@@ -2754,6 +2789,15 @@ void ProcessManager::enqueue(ProcessBase* process)
 {
   CHECK(process != NULL);
 
+  // If libprocess is shutting down and the processing threads
+  // are currently joining, then do not enqueue the process.
+  if (joining_threads.load()) {
+    VLOG(1) << "Libprocess shutting down, cannot enqueue process: "
+            << process->pid.id;
+
+    return;
+  }
+
   // TODO(benh): Check and see if this process has it's own thread. If
   // it does, push it on that threads runq, and wake up that thread if
   // it's not running. Otherwise, check and see which thread this

Reply via email to