Introduced a callback for timer expiration. This enables seperating the Clock specific functionality from the ProcessManager functionality so that the Clock implementation can ultimately be completely isolated.
Review: https://reviews.apache.org/r/27499 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b1f2bb9b Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b1f2bb9b Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b1f2bb9b Branch: refs/heads/master Commit: b1f2bb9b7eabf74be19bcd642e94842a3a476f30 Parents: 84efa18 Author: Benjamin Hindman <[email protected]> Authored: Sun Nov 2 15:50:36 2014 -0800 Committer: Benjamin Hindman <[email protected]> Committed: Sat Nov 15 16:25:58 2014 -0800 ---------------------------------------------------------------------- 3rdparty/libprocess/include/process/clock.hpp | 10 ++++ 3rdparty/libprocess/src/process.cpp | 63 +++++++++++++--------- 2 files changed, 48 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/b1f2bb9b/3rdparty/libprocess/include/process/clock.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/include/process/clock.hpp b/3rdparty/libprocess/include/process/clock.hpp index d60742a..ae7d0fb 100644 --- a/3rdparty/libprocess/include/process/clock.hpp +++ b/3rdparty/libprocess/include/process/clock.hpp @@ -1,6 +1,8 @@ #ifndef __PROCESS_CLOCK_HPP__ #define __PROCESS_CLOCK_HPP__ +#include <list> + #include <process/time.hpp> #include <process/timer.hpp> @@ -17,6 +19,14 @@ class Timer; class Clock { public: + // Initialize the clock with the specified callback that will be + // invoked whenever a batch of timers has expired. + // + // TODO(benh): Introduce a "channel" or listener pattern for getting + // the expired Timers rather than passing in a callback. This might + // mean we don't need 'initialize' or 'shutdown'. + static void initialize(lambda::function<void(std::list<Timer>&&)>&& callback); + static Time now(); static Time now(ProcessBase* process); http://git-wip-us.apache.org/repos/asf/mesos/blob/b1f2bb9b/3rdparty/libprocess/src/process.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp index e7e5520..9551d99 100644 --- a/3rdparty/libprocess/src/process.cpp +++ b/3rdparty/libprocess/src/process.cpp @@ -526,9 +526,18 @@ bool paused = false; // to settle (and we're paused). bool settling = false; +// Lambda function to invoke when timers have expired. +lambda::function<void(list<Timer>&&)> callback; + } // namespace clock { +void Clock::initialize(lambda::function<void(list<Timer>&&)>&& callback) +{ + clock::callback = callback; +} + + Time Clock::now() { return now(__process__); @@ -890,7 +899,7 @@ void handle_async(struct ev_loop* loop, ev_async* _, int revents) void handle_timeouts(struct ev_loop* loop, ev_timer* _, int revents) { - list<Timer> timedout; + list<Timer> timers; synchronized (timeouts) { Time now = Clock::now(); @@ -912,7 +921,7 @@ void handle_timeouts(struct ev_loop* loop, ev_timer* _, int revents) } foreach (const Timer& timer, (*timeouts)[timeout]) { - timedout.push_back(timer); + timers.push_back(timer); } } @@ -948,29 +957,7 @@ void handle_timeouts(struct ev_loop* loop, ev_timer* _, int revents) update_timer = false; // Since we might have a queued update_timer. } - // Update current time of process (if it's present/valid). It might - // be necessary to actually add some more synchronization around - // this so that, for example, pausing and resuming the clock doesn't - // cause some processes to get thier current times updated and - // others not. Since ProcessManager::use acquires the 'processes' - // lock we had to move this out of the synchronized (timeouts) above - // since there was a deadlock with acquring 'processes' then - // 'timeouts' (reverse order) in ProcessManager::cleanup. Note that - // current time may be greater than the timeout if a local message - // was received (and happens-before kicks in). - if (Clock::paused()) { - foreach (const Timer& timer, timedout) { - if (ProcessReference process = process_manager->use(timer.creator())) { - Clock::update(process, timer.timeout().time()); - } - } - } - - // Invoke the timers that timed out (TODO(benh): Do this - // asynchronously so that we don't tie up the event thread!). - foreach (const Timer& timer, timedout) { - timer(); - } + clock::callback(std::move(timers)); // Mark 'settling' as false since there are not any more timeouts // that will expire before the paused time and we've finished @@ -1412,6 +1399,27 @@ void* schedule(void* arg) } +void timedout(list<Timer>&& timers) +{ + // Update current time of process (if it's present/valid). Note that + // current time may be greater than the timeout if a local message + // was received (and happens-before kicks in). + if (Clock::paused()) { + foreach (const Timer& timer, timers) { + if (ProcessReference process = process_manager->use(timer.creator())) { + Clock::update(process, timer.timeout().time()); + } + } + } + + // Invoke the timers that timed out (TODO(benh): Do this + // asynchronously so that we don't tie up the event thread!). + foreach (const Timer& timer, timers) { + timer(); + } +} + + // We might find value in catching terminating signals at some point. // However, for now, adding signal handlers freely is not allowed // because they will clash with Java and Python virtual machines and @@ -1484,6 +1492,8 @@ void initialize(const string& delegate) process_manager = new ProcessManager(delegate); socket_manager = new SocketManager(); + Clock::initialize(lambda::bind(&timedout, lambda::_1)); + // Setup processing threads. // We create no fewer than 8 threads because some tests require // more worker threads than 'sysconf(_SC_NPROCESSORS_ONLN)' on @@ -1701,6 +1711,9 @@ void initialize(const string& delegate) void finalize() { delete process_manager; + + // TODO(benh): Finialize/shutdown Clock so that it doesn't attempt + // to dereference 'process_manager' in the 'timedout' callback. }
