Repository: mesos
Updated Branches:
  refs/heads/master acd656c4e -> 8279b45ed


Abstract clock internals from ProcessManager::settle.

Review: https://reviews.apache.org/r/27498


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/84efa184
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/84efa184
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/84efa184

Branch: refs/heads/master
Commit: 84efa184159ebaa7f5110073358d35773d149f0e
Parents: 788a136
Author: Benjamin Hindman <[email protected]>
Authored: Sun Nov 2 15:03:22 2014 -0800
Committer: Benjamin Hindman <[email protected]>
Committed: Sat Nov 15 16:25:57 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/clock.hpp  |  22 +++-
 3rdparty/libprocess/include/process/future.hpp |  22 +++-
 3rdparty/libprocess/src/process.cpp            | 117 ++++++++++++++------
 3 files changed, 121 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/84efa184/3rdparty/libprocess/include/process/clock.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/clock.hpp b/3rdparty/libprocess/include/process/clock.hpp
index 80190ef..d60742a 100644
--- a/3rdparty/libprocess/include/process/clock.hpp
+++ b/3rdparty/libprocess/include/process/clock.hpp
@@ -39,11 +39,27 @@ public:
 
   static void order(ProcessBase* from, ProcessBase* to);
 
-  // When the clock is paused, settle() synchronously ensures that:
+  // When the clock is paused this returns only after
   //   (1) all expired timers are executed,
-  //   (2) no Processes are running, and
-  //   (3) no Processes are ready to run.
+  //   (2) no processes are running, and
+  //   (3) no processes are ready to run.
+  //
+  // In other words, this function blocks synchronously until no other
+  // execution on any processes will occur unless the clock is
+  // advanced.
+  //
+  // TODO(benh): Move this function elsewhere, for instance, to a
+  // top-level function in the 'process' namespace since it deals with
+  // both processes and the clock.
   static void settle();
+
+  // When the clock is paused this returns true if all timers that
+  // expire before the paused time have executed, otherwise false.
+  // Note that if the clock continually gets advanced concurrently
+  // this function may never return true because the "paused" time
+  // will continue to get pushed out farther in the future making more
+  // timers candidates for execution.
+  static bool settled();
 };
 
 } // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/84efa184/3rdparty/libprocess/include/process/future.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/future.hpp b/3rdparty/libprocess/include/process/future.hpp
index 68e5f7b..2e4f9ef 100644
--- a/3rdparty/libprocess/include/process/future.hpp
+++ b/3rdparty/libprocess/include/process/future.hpp
@@ -1121,7 +1121,7 @@ bool Future<T>::hasDiscard() const
 
 namespace internal {
 
-inline void awaited(const Owned<Latch>& latch)
+inline void awaited(Owned<Latch> latch)
 {
   latch->trigger();
 }
@@ -1132,18 +1132,32 @@ inline void awaited(const Owned<Latch>& latch)
 template <typename T>
 bool Future<T>::await(const Duration& duration) const
 {
-  Owned<Latch> latch;
+  // NOTE: We need to preemptively allocate the Latch up front
+  // instead of lazily creating it in the critical section below because
+  // instantiating a Latch requires creating a new process (at the
+  // time of writing this comment) which might need to do some
+  // synchronization in libprocess which might deadlock if some other
+  // code in libprocess is already holding a lock and then attempts to
+  // do Promise::set (or something similar) that attempts to acquire
+  // the lock that we acquire here. This is an artifact of using
+  // Future/Promise within the implementation of libprocess.
+  //
+  // We mostly only call 'await' in tests so this should not be a
+  // performance concern.
+  Owned<Latch> latch(new Latch());
+
+  bool pending = false;
 
   internal::acquire(&data->lock);
   {
     if (data->state == PENDING) {
-      latch.reset(new Latch());
+      pending = true;
       data->onAnyCallbacks.push(lambda::bind(&internal::awaited, latch));
     }
   }
   internal::release(&data->lock);
 
-  if (latch.get() != NULL) {
+  if (pending) {
     return latch->await(duration);
   }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/84efa184/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 9ebac08..e7e5520 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -43,6 +43,7 @@
 
 #include <boost/shared_array.hpp>
 
+#include <process/check.hpp>
 #include <process/clock.hpp>
 #include <process/defer.hpp>
 #include <process/delay.hpp>
@@ -475,11 +476,6 @@ static queue<lambda::function<void(void)> >* functions =
 static map<Time, list<Timer> >* timeouts = new map<Time, list<Timer> >();
 static synchronizable(timeouts) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
 
-// For supporting Clock::settle(), true if timers have been removed
-// from 'timeouts' but may not have been executed yet. Protected by
-// the timeouts lock. This is only used when the clock is paused.
-static bool pending_timers = false;
-
 // Flag to indicate whether or to update the timer on async interrupt.
 static bool update_timer = false;
 
@@ -525,6 +521,11 @@ Duration advanced = Duration::zero();
 
 bool paused = false;
 
+// For supporting Clock::settled(), false if we're not currently
+// settling (or we're not paused), true if we're currently attempting
+// to settle (and we're paused).
+bool settling = false;
+
 } // namespace clock {
 
 
@@ -596,6 +597,7 @@ void Clock::resume()
     if (clock::paused) {
       VLOG(2) << "Clock resumed at " << clock::current;
       clock::paused = false;
+      clock::settling = false;
       clock::currents->clear();
       update_timer = true;
       ev_async_send(loop, &async_watcher);
@@ -667,6 +669,7 @@ void Clock::update(ProcessBase* process, const Time& time)
 
 void Clock::order(ProcessBase* from, ProcessBase* to)
 {
+  VLOG(2) << "Clock of " << to->self() << " being updated to " << from->self();
   update(to, now(from));
 }
 
@@ -678,6 +681,29 @@ void Clock::settle()
 }
 
 
+bool Clock::settled()
+{
+  synchronized (timeouts) {
+    CHECK(clock::paused);
+
+    if (update_timer) {
+      return false;
+    } else if (clock::settling) {
+      VLOG(3) << "Clock still not settled";
+      return false;
+    } else if (timeouts->size() == 0 ||
+               timeouts->begin()->first > clock::current) {
+      VLOG(3) << "Clock is settled";
+      return true;
+    }
+
+    VLOG(3) << "Clock is not settled";
+
+    return false;
+  }
+}
+
+
 Try<Time> Time::create(double seconds)
 {
   Try<Duration> duration = Duration::create(seconds);
@@ -878,9 +904,12 @@ void handle_timeouts(struct ev_loop* loop, ev_timer* _, int revents)
 
       VLOG(3) << "Have timeout(s) at " << timeout;
 
-      // Record that we have pending timers to execute so the
-      // Clock::settle() operation can wait until we're done.
-      pending_timers = true;
+      // Need to set 'settling' so that we don't prematurely say
+      // we're settled until after the timers are executed below,
+      // outside of the critical section.
+      if (clock::paused) {
+        clock::settling = true;
+      }
 
       foreach (const Timer& timer, (*timeouts)[timeout]) {
         timedout.push_back(timer);
@@ -943,11 +972,16 @@ void handle_timeouts(struct ev_loop* loop, ev_timer* _, int revents)
     timer();
   }
 
-  // Mark ourselves as done executing the timers since it's now safe
-  // for a call to Clock::settle() to check if there will be any
-  // future timeouts reached.
+  // Mark 'settling' as false if there are no more timeouts that
+  // will expire before the paused time, now that we've finished
+  // executing the expired timers.
   synchronized (timeouts) {
-    pending_timers = false;
+    if (clock::paused &&
+        (timeouts->size() == 0 ||
+         timeouts->begin()->first > clock::current)) {
+      VLOG(3) << "Clock has settled";
+      clock::settling = false;
+    }
   }
 }
 
@@ -2961,7 +2995,15 @@ bool ProcessManager::wait(const UPID& pid)
           list<ProcessBase*>::iterator it =
             find(runq.begin(), runq.end(), process);
           if (it != runq.end()) {
+            // Found it! Remove it from the run queue since we'll be
+            // donating our thread and also increment 'running' before
+            // leaving this 'runq' protected critical section so that
+            // anyone waiting for the processes to settle
+            // continues to wait (otherwise they could see nothing in
+            // 'runq' and 'running' equal to 0 between when we exit
+            // this critical section and increment 'running').
             runq.erase(it);
+            __sync_fetch_and_add(&running, 1);
           } else {
             // Another thread has resumed the process ...
             process = NULL;
@@ -2977,7 +3019,6 @@ bool ProcessManager::wait(const UPID& pid)
   if (process != NULL) {
     VLOG(2) << "Donating thread to " << process->pid << " while waiting";
     ProcessBase* donator = __process__;
-    __sync_fetch_and_add(&running, 1);
     process_manager->resume(process);
     __process__ = donator;
   }
@@ -3046,30 +3087,39 @@ void ProcessManager::settle()
 {
   bool done = true;
   do {
+    // While refactoring in order to isolate libev behind abstractions
+    // it became evident that this os::sleep is vital for tests to
+    // pass. In particular, there are certain tests that assume too
+    // much before they attempt to do a settle. One such example is
+    // tests doing http::get followed by Clock::settle, where they
+    // expect the http::get will have properly enqueued a process on
+    // the run queue but http::get is just sending bytes on a
+    // socket. Without sleeping at the beginning of this function we
+    // can get unlucky and appear settled when in actuality the
+    // kernel just hasn't copied the bytes to a socket or we haven't
+    // yet read the bytes and enqueued an event on a process (and the
+    // process on the run queue).
     os::sleep(Milliseconds(10));
-    done = true;
-    // Hopefully this is the only place we acquire both these locks.
-    synchronized (runq) {
-      synchronized (timeouts) {
-        CHECK(Clock::paused()); // Since another thread could resume the clock!
 
-        if (!runq.empty()) {
-          done = false;
-        }
+    done = true; // Assume to start that we are settled.
 
-        __sync_synchronize(); // Read barrier for 'running'.
-        if (running > 0) {
-          done = false;
-        }
+    synchronized (runq) {
+      if (!runq.empty()) {
+        done = false;
+        continue;
+      }
 
-        if (timeouts->size() > 0 &&
-            timeouts->begin()->first <= clock::current) {
-          done = false;
-        }
+      // Read barrier for 'running'.
+      __sync_synchronize();
 
-        if (pending_timers) {
-          done = false;
-        }
+      if (running > 0) {
+        done = false;
+        continue;
+      }
+
+      if (!Clock::settled()) {
+        done = false;
+        continue;
       }
     }
   } while (!done);
@@ -3173,7 +3223,8 @@ Timer Clock::timer(
 
   Timer timer(__sync_fetch_and_add(&id, 1), timeout, pid, thunk);
 
-  VLOG(3) << "Created a timer for " << timeout.time();
+  VLOG(3) << "Created a timer for " << pid << " in " << stringify(duration)
+          << " in the future (" << timeout.time() << ")";
 
   // Add the timer.
   synchronized (timeouts) {

Reply via email to