Introduced a callback for timer expiration.

This enables separating the Clock-specific functionality from the
ProcessManager functionality so that the Clock implementation can
ultimately be completely isolated.

Review: https://reviews.apache.org/r/27499


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b1f2bb9b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b1f2bb9b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b1f2bb9b

Branch: refs/heads/master
Commit: b1f2bb9b7eabf74be19bcd642e94842a3a476f30
Parents: 84efa18
Author: Benjamin Hindman <[email protected]>
Authored: Sun Nov 2 15:50:36 2014 -0800
Committer: Benjamin Hindman <[email protected]>
Committed: Sat Nov 15 16:25:58 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/clock.hpp | 10 ++++
 3rdparty/libprocess/src/process.cpp           | 63 +++++++++++++---------
 2 files changed, 48 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b1f2bb9b/3rdparty/libprocess/include/process/clock.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/clock.hpp b/3rdparty/libprocess/include/process/clock.hpp
index d60742a..ae7d0fb 100644
--- a/3rdparty/libprocess/include/process/clock.hpp
+++ b/3rdparty/libprocess/include/process/clock.hpp
@@ -1,6 +1,8 @@
 #ifndef __PROCESS_CLOCK_HPP__
 #define __PROCESS_CLOCK_HPP__
 
+#include <list>
+
 #include <process/time.hpp>
 #include <process/timer.hpp>
 
@@ -17,6 +19,14 @@ class Timer;
 class Clock
 {
 public:
+  // Initialize the clock with the specified callback that will be
+  // invoked whenever a batch of timers has expired.
+  //
+  // TODO(benh): Introduce a "channel" or listener pattern for getting
+  // the expired Timers rather than passing in a callback. This might
+  // mean we don't need 'initialize' or 'shutdown'.
+  static void initialize(lambda::function<void(std::list<Timer>&&)>&& callback);
+
   static Time now();
   static Time now(ProcessBase* process);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/b1f2bb9b/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index e7e5520..9551d99 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -526,9 +526,18 @@ bool paused = false;
 // to settle (and we're paused).
 bool settling = false;
 
+// Lambda function to invoke when timers have expired.
+lambda::function<void(list<Timer>&&)> callback;
+
 } // namespace clock {
 
 
+void Clock::initialize(lambda::function<void(list<Timer>&&)>&& callback)
+{
+  clock::callback = callback;
+}
+
+
 Time Clock::now()
 {
   return now(__process__);
@@ -890,7 +899,7 @@ void handle_async(struct ev_loop* loop, ev_async* _, int revents)
 
 void handle_timeouts(struct ev_loop* loop, ev_timer* _, int revents)
 {
-  list<Timer> timedout;
+  list<Timer> timers;
 
   synchronized (timeouts) {
     Time now = Clock::now();
@@ -912,7 +921,7 @@ void handle_timeouts(struct ev_loop* loop, ev_timer* _, int revents)
       }
 
       foreach (const Timer& timer, (*timeouts)[timeout]) {
-        timedout.push_back(timer);
+        timers.push_back(timer);
       }
     }
 
@@ -948,29 +957,7 @@ void handle_timeouts(struct ev_loop* loop, ev_timer* _, int revents)
     update_timer = false; // Since we might have a queued update_timer.
   }
 
-  // Update current time of process (if it's present/valid). It might
-  // be necessary to actually add some more synchronization around
-  // this so that, for example, pausing and resuming the clock doesn't
-  // cause some processes to get thier current times updated and
-  // others not. Since ProcessManager::use acquires the 'processes'
-  // lock we had to move this out of the synchronized (timeouts) above
-  // since there was a deadlock with acquring 'processes' then
-  // 'timeouts' (reverse order) in ProcessManager::cleanup. Note that
-  // current time may be greater than the timeout if a local message
-  // was received (and happens-before kicks in).
-  if (Clock::paused()) {
-    foreach (const Timer& timer, timedout) {
-      if (ProcessReference process = process_manager->use(timer.creator())) {
-        Clock::update(process, timer.timeout().time());
-      }
-    }
-  }
-
-  // Invoke the timers that timed out (TODO(benh): Do this
-  // asynchronously so that we don't tie up the event thread!).
-  foreach (const Timer& timer, timedout) {
-    timer();
-  }
+  clock::callback(std::move(timers));
 
   // Mark 'settling' as false since there are not any more timeouts
   // that will expire before the paused time and we've finished
@@ -1412,6 +1399,27 @@ void* schedule(void* arg)
 }
 
 
+void timedout(list<Timer>&& timers)
+{
+  // Update current time of process (if it's present/valid). Note that
+  // current time may be greater than the timeout if a local message
+  // was received (and happens-before kicks in).
+  if (Clock::paused()) {
+    foreach (const Timer& timer, timers) {
+      if (ProcessReference process = process_manager->use(timer.creator())) {
+        Clock::update(process, timer.timeout().time());
+      }
+    }
+  }
+
+  // Invoke the timers that timed out (TODO(benh): Do this
+  // asynchronously so that we don't tie up the event thread!).
+  foreach (const Timer& timer, timers) {
+    timer();
+  }
+}
+
+
 // We might find value in catching terminating signals at some point.
 // However, for now, adding signal handlers freely is not allowed
 // because they will clash with Java and Python virtual machines and
@@ -1484,6 +1492,8 @@ void initialize(const string& delegate)
   process_manager = new ProcessManager(delegate);
   socket_manager = new SocketManager();
 
+  Clock::initialize(lambda::bind(&timedout, lambda::_1));
+
   // Setup processing threads.
   // We create no fewer than 8 threads because some tests require
   // more worker threads than 'sysconf(_SC_NPROCESSORS_ONLN)' on
@@ -1701,6 +1711,9 @@ void initialize(const string& delegate)
 void finalize()
 {
   delete process_manager;
+
+  // TODO(benh): Finalize/shutdown Clock so that it doesn't attempt
+  // to dereference 'process_manager' in the 'timedout' callback.
 }
 
 

Reply via email to