Moved Clock implementation into clock.cpp. Review: https://reviews.apache.org/r/27503
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0e19796d Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0e19796d Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0e19796d Branch: refs/heads/master Commit: 0e19796dec8db07e2a60df15b88970dc387fbe21 Parents: 18cc45f Author: Benjamin Hindman <[email protected]> Authored: Sun Nov 2 17:04:25 2014 -0800 Committer: Benjamin Hindman <[email protected]> Committed: Sat Nov 15 16:25:58 2014 -0800 ---------------------------------------------------------------------- 3rdparty/libprocess/Makefile.am | 1 + 3rdparty/libprocess/src/clock.cpp | 437 +++++++++++++++++++++++++++++++ 3rdparty/libprocess/src/process.cpp | 410 +---------------------------- 3 files changed, 444 insertions(+), 404 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/0e19796d/3rdparty/libprocess/Makefile.am ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am index a55d562..0008e68 100644 --- a/3rdparty/libprocess/Makefile.am +++ b/3rdparty/libprocess/Makefile.am @@ -30,6 +30,7 @@ PICOJSON = 3rdparty/picojson-$(PICOJSON_VERSION) noinst_LTLIBRARIES = libprocess.la libprocess_la_SOURCES = \ + src/clock.cpp \ src/config.hpp \ src/decoder.hpp \ src/encoder.hpp \ http://git-wip-us.apache.org/repos/asf/mesos/blob/0e19796d/3rdparty/libprocess/src/clock.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/clock.cpp b/3rdparty/libprocess/src/clock.cpp new file mode 100644 index 0000000..5623b58 --- /dev/null +++ b/3rdparty/libprocess/src/clock.cpp @@ -0,0 +1,437 @@ +#include <ev.h> + +#include <glog/logging.h> + +#include <list> +#include <map> + +#include <process/clock.hpp> +#include <process/pid.hpp> +#include <process/process.hpp> +#include <process/time.hpp> +#include <process/timeout.hpp> + +#include <stout/duration.hpp> +#include <stout/foreach.hpp> +#include <stout/lambda.hpp> +#include <stout/try.hpp> + +#include "synchronized.hpp" + +using std::list; +using std::map; + +namespace process { + +// Event loop. +extern struct ev_loop* loop; + +// Asynchronous watcher for interrupting loop to specifically deal +// with updating timers. +static ev_async async_update_timer_watcher; + +// Watcher for timeouts. +static ev_timer timeouts_watcher; + +// We store the timers in a map of lists indexed by the timeout of the +// timer so that we can have two timers that have the same timeout. We +// exploit that the map is SORTED! +static map<Time, list<Timer>>* timeouts = new map<Time, list<Timer>>(); +static synchronizable(timeouts) = SYNCHRONIZED_INITIALIZER_RECURSIVE; + +// Flag to indicate whether or to update the timer on async interrupt. +static bool update_timer = false; + + +// We namespace the clock related variables to keep them well +// named. In the future we'll probably want to associate a clock with +// a specific ProcessManager/SocketManager instance pair, so this will +// likely change. +namespace clock { + +map<ProcessBase*, Time>* currents = new map<ProcessBase*, Time>(); + +// TODO(dhamon): These static non-POD instances should be replaced by pointers +// or functions. +Time initial = Time::epoch(); +Time current = Time::epoch(); + +Duration advanced = Duration::zero(); + +bool paused = false; + +// For supporting Clock::settled(), false if we're not currently +// settling (or we're not paused), true if we're currently attempting +// to settle (and we're paused). +bool settling = false; + +// Lambda function to invoke when timers have expired. +lambda::function<void(list<Timer>&&)> callback; + +} // namespace clock { + + +void handle_async_update_timer(struct ev_loop* loop, ev_async* _, int revents) +{ + synchronized (timeouts) { + if (update_timer) { + if (!timeouts->empty()) { + // Determine when the next timer should fire. + timeouts_watcher.repeat = + (timeouts->begin()->first - Clock::now()).secs(); + + if (timeouts_watcher.repeat <= 0) { + // Feed the event now! + timeouts_watcher.repeat = 0; + ev_timer_again(loop, &timeouts_watcher); + ev_feed_event(loop, &timeouts_watcher, EV_TIMEOUT); + } else { + // Don't fire the timer if the clock is paused since we + // don't want time to advance (instead a call to + // clock::advance() will handle the timer). + if (Clock::paused() && timeouts_watcher.repeat > 0) { + timeouts_watcher.repeat = 0; + } + + ev_timer_again(loop, &timeouts_watcher); + } + } + + update_timer = false; + } + } +} + + +void handle_timeouts(struct ev_loop* loop, ev_timer* _, int revents) +{ + list<Timer> timers; + + synchronized (timeouts) { + Time now = Clock::now(); + + VLOG(3) << "Handling timeouts up to " << now; + + foreachkey (const Time& timeout, *timeouts) { + if (timeout > now) { + break; + } + + VLOG(3) << "Have timeout(s) at " << timeout; + + // Need to toggle 'settling' so that we don't prematurely say + // we're settled until after the timers are executed below, + // outside of the critical section. + if (clock::paused) { + clock::settling = true; + } + + foreach (const Timer& timer, (*timeouts)[timeout]) { + timers.push_back(timer); + } + } + + // Now erase the range of timeouts that timed out. + timeouts->erase(timeouts->begin(), timeouts->upper_bound(now)); + + // Okay, so the timeout for the next timer should not have fired. + CHECK(timeouts->empty() || (timeouts->begin()->first > now)); + + // Update the timer as necessary. + if (!timeouts->empty()) { + // Determine when the next timer should fire. + timeouts_watcher.repeat = + (timeouts->begin()->first - Clock::now()).secs(); + + if (timeouts_watcher.repeat <= 0) { + // Feed the event now! + timeouts_watcher.repeat = 0; + ev_timer_again(loop, &timeouts_watcher); + ev_feed_event(loop, &timeouts_watcher, EV_TIMEOUT); + } else { + // Don't fire the timer if the clock is paused since we don't + // want time to advance (instead a call to Clock::advance() + // will handle the timer). + if (Clock::paused() && timeouts_watcher.repeat > 0) { + timeouts_watcher.repeat = 0; + } + + ev_timer_again(loop, &timeouts_watcher); + } + } + + update_timer = false; // Since we might have a queued update_timer. + } + + clock::callback(std::move(timers)); + + // Mark 'settling' as false since there are not any more timeouts + // that will expire before the paused time and we've finished + // executing expired timers. + synchronized (timeouts) { + if (clock::paused && + (timeouts->size() == 0 || + timeouts->begin()->first > clock::current)) { + VLOG(3) << "Clock has settled"; + clock::settling = false; + } + } +} + + +void Clock::initialize(lambda::function<void(list<Timer>&&)>&& callback) +{ + // TODO(benh): Currently this function is expected to get called + // just after initializing libev in process::initialize. But that is + // too tightly coupled so and we really need to move libev specific + // intialization outside of process::initialize that both + // process::initialize and Clock::initialize can depend on (and thus + // call). + + clock::callback = callback; + + ev_async_init(&async_update_timer_watcher, handle_async_update_timer); + ev_async_start(loop, &async_update_timer_watcher); + + ev_timer_init(&timeouts_watcher, handle_timeouts, 0., 2100000.0); + ev_timer_again(loop, &timeouts_watcher); +} + + +Time Clock::now() +{ + return now(__process__); +} + + +Time Clock::now(ProcessBase* process) +{ + synchronized (timeouts) { + if (Clock::paused()) { + if (process != NULL) { + if (clock::currents->count(process) != 0) { + return (*clock::currents)[process]; + } else { + return (*clock::currents)[process] = clock::initial; + } + } else { + return clock::current; + } + } + } + + // TODO(benh): Versus ev_now()? + double d = ev_time(); + Try<Time> time = Time::create(d); // Compensates for clock::advanced. + + // TODO(xujyan): Move CHECK_SOME to libprocess and add CHECK_SOME + // here. + if (time.isError()) { + LOG(FATAL) << "Failed to create a Time from " << d << ": " + << time.error(); + } + return time.get(); +} + + +Timer Clock::timer( + const Duration& duration, + const lambda::function<void(void)>& thunk) +{ + static uint64_t id = 1; // Start at 1 since Timer() instances use id 0. + + // Assumes Clock::now() does Clock::now(__process__). + Timeout timeout = Timeout::in(duration); + + UPID pid = __process__ != NULL ? __process__->self() : UPID(); + + Timer timer(__sync_fetch_and_add(&id, 1), timeout, pid, thunk); + + VLOG(3) << "Created a timer for " << pid << " in " << stringify(duration) + << " in the future (" << timeout.time() << ")"; + + // Add the timer. + synchronized (timeouts) { + if (timeouts->size() == 0 || + timer.timeout().time() < timeouts->begin()->first) { + // Need to interrupt the loop to update/set timer repeat. + (*timeouts)[timer.timeout().time()].push_back(timer); + update_timer = true; + ev_async_send(loop, &async_update_timer_watcher); + } else { + // Timer repeat is adequate, just add the timeout. + CHECK(timeouts->size() >= 1); + (*timeouts)[timer.timeout().time()].push_back(timer); + } + } + + return timer; +} + + +bool Clock::cancel(const Timer& timer) +{ + bool canceled = false; + synchronized (timeouts) { + // Check if the timeout is still pending, and if so, erase it. In + // addition, erase an empty list if we just removed the last + // timeout. + Time time = timer.timeout().time(); + if (timeouts->count(time) > 0) { + canceled = true; + (*timeouts)[time].remove(timer); + if ((*timeouts)[time].empty()) { + timeouts->erase(time); + } + } + } + + return canceled; +} + + +void Clock::pause() +{ + process::initialize(); // To make sure the libev watchers are ready. + + synchronized (timeouts) { + if (!clock::paused) { + clock::initial = clock::current = now(); + clock::paused = true; + VLOG(2) << "Clock paused at " << clock::initial; + } + } + + // Note that after pausing the clock an existing libev timer might + // still fire (invoking handle_timeout), but since paused == true no + // "time" will actually have passed, so no timer will actually fire. +} + + +bool Clock::paused() +{ + return clock::paused; +} + + +void Clock::resume() +{ + process::initialize(); // To make sure the libev watchers are ready. + + synchronized (timeouts) { + if (clock::paused) { + VLOG(2) << "Clock resumed at " << clock::current; + clock::paused = false; + clock::settling = false; + clock::currents->clear(); + update_timer = true; + ev_async_send(loop, &async_update_timer_watcher); + } + } +} + + +void Clock::advance(const Duration& duration) +{ + synchronized (timeouts) { + if (clock::paused) { + clock::advanced += duration; + clock::current += duration; + VLOG(2) << "Clock advanced (" << duration << ") to " << clock::current; + if (!update_timer) { + update_timer = true; + ev_async_send(loop, &async_update_timer_watcher); + } + } + } +} + + +void Clock::advance(ProcessBase* process, const Duration& duration) +{ + synchronized (timeouts) { + if (clock::paused) { + Time current = now(process); + current += duration; + (*clock::currents)[process] = current; + VLOG(2) << "Clock of " << process->self() << " advanced (" << duration + << ") to " << current; + } + } +} + + +void Clock::update(const Time& time) +{ + synchronized (timeouts) { + if (clock::paused) { + if (clock::current < time) { + clock::advanced += (time - clock::current); + clock::current = Time(time); + VLOG(2) << "Clock updated to " << clock::current; + if (!update_timer) { + update_timer = true; + ev_async_send(loop, &async_update_timer_watcher); + } + } + } + } +} + + +void Clock::update(ProcessBase* process, const Time& time, Update update) +{ + synchronized (timeouts) { + if (clock::paused) { + if (now(process) < time || update == Clock::FORCE) { + VLOG(2) << "Clock of " << process->self() << " updated to " << time; + (*clock::currents)[process] = Time(time); + } + } + } +} + + +void Clock::order(ProcessBase* from, ProcessBase* to) +{ + VLOG(2) << "Clock of " << to->self() << " being updated to " << from->self(); + update(to, now(from)); +} + + +bool Clock::settled() +{ + synchronized (timeouts) { + CHECK(clock::paused); + + if (update_timer) { + return false; + } else if (clock::settling) { + VLOG(3) << "Clock still not settled"; + return false; + } else if (timeouts->size() == 0 || + timeouts->begin()->first > clock::current) { + VLOG(3) << "Clock is settled"; + return true; + } + + VLOG(3) << "Clock is not settled"; + return false; + } +} + + +// TODO(benh): Introduce a Clock::time(seconds) that replaces this +// function for getting a 'Time' value. +Try<Time> Time::create(double seconds) +{ + Try<Duration> duration = Duration::create(seconds); + if (duration.isSome()) { + // In production code, clock::advanced will always be zero! + return Time(duration.get() + clock::advanced); + } else { + return Error("Argument too large for Time: " + duration.error()); + } +} + +} // namespace process { http://git-wip-us.apache.org/repos/asf/mesos/blob/0e19796d/3rdparty/libprocess/src/process.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp index a0b4ca0..bac4200 100644 --- a/3rdparty/libprocess/src/process.cpp +++ b/3rdparty/libprocess/src/process.cpp @@ -447,19 +447,12 @@ static SocketManager* socket_manager = NULL; static ProcessManager* process_manager = NULL; // Event loop. -static struct ev_loop* loop = NULL; +struct ev_loop* loop = NULL; // Asynchronous watcher for interrupting loop to specifically deal // with IO watchers and functions (via run_in_event_loop). static ev_async async_watcher; -// Asynchronous watcher for interrupting loop to specifically deal -// with updating timers. -static ev_async async_update_timer_watcher; - -// Watcher for timeouts. -static ev_timer timeouts_watcher; - // Server watcher for accepting connections. static ev_io server_watcher; @@ -475,15 +468,6 @@ static synchronizable(watchers) = SYNCHRONIZED_INITIALIZER; static queue<lambda::function<void(void)> >* functions = new queue<lambda::function<void(void)> >(); -// We store the timers in a map of lists indexed by the timeout of the -// timer so that we can have two timers that have the same timeout. We -// exploit that the map is SORTED! -static map<Time, list<Timer> >* timeouts = new map<Time, list<Timer> >(); -static synchronizable(timeouts) = SYNCHRONIZED_INITIALIZER_RECURSIVE; - -// Flag to indicate whether or to update the timer on async interrupt. -static bool update_timer = false; - // Scheduling gate that threads wait at when there is nothing to run. static Gate* gate = new Gate(); @@ -509,227 +493,15 @@ ThreadLocal<Executor>* _executor_ = new ThreadLocal<Executor>(); // const Duration LIBPROCESS_STATISTICS_WINDOW = Days(1); -// We namespace the clock related variables to keep them well -// named. In the future we'll probably want to associate a clock with -// a specific ProcessManager/SocketManager instance pair, so this will -// likely change. -namespace clock { - -map<ProcessBase*, Time>* currents = new map<ProcessBase*, Time>(); - -// TODO(dhamon): These static non-POD instances should be replaced by pointers -// or functions. -Time initial = Time::epoch(); -Time current = Time::epoch(); - -Duration advanced = Duration::zero(); - -bool paused = false; - -// For supporting Clock::settled(), false if we're not currently -// settling (or we're not paused), true if we're currently attempting -// to settle (and we're paused). -bool settling = false; - -// Lambda function to invoke when timers have expired. -lambda::function<void(list<Timer>&&)> callback; - -} // namespace clock { - - -void Clock::initialize(lambda::function<void(list<Timer>&&)>&& callback) -{ - clock::callback = callback; -} - - -Time Clock::now() -{ - return now(__process__); -} - - -Time Clock::now(ProcessBase* process) -{ - synchronized (timeouts) { - if (Clock::paused()) { - if (process != NULL) { - if (clock::currents->count(process) != 0) { - return (*clock::currents)[process]; - } else { - return (*clock::currents)[process] = clock::initial; - } - } else { - return clock::current; - } - } - } - - // TODO(benh): Versus ev_now()? - double d = ev_time(); - Try<Time> time = Time::create(d); // Compensates for clock::advanced. - - // TODO(xujyan): Move CHECK_SOME to libprocess and add CHECK_SOME - // here. - if (time.isError()) { - LOG(FATAL) << "Failed to create a Time from " << d << ": " - << time.error(); - } - return time.get(); -} - - -void Clock::pause() -{ - process::initialize(); // To make sure the libev watchers are ready. - - synchronized (timeouts) { - if (!clock::paused) { - clock::initial = clock::current = now(); - clock::paused = true; - VLOG(2) << "Clock paused at " << clock::initial; - } - } - - // Note that after pausing the clock an existing libev timer might - // still fire (invoking handle_timeout), but since paused == true no - // "time" will actually have passed, so no timer will actually fire. -} - - -bool Clock::paused() -{ - return clock::paused; -} - - -void Clock::resume() -{ - process::initialize(); // To make sure the libev watchers are ready. - - synchronized (timeouts) { - if (clock::paused) { - VLOG(2) << "Clock resumed at " << clock::current; - clock::paused = false; - clock::settling = false; - clock::currents->clear(); - update_timer = true; - ev_async_send(loop, &async_update_timer_watcher); - } - } -} - - -void Clock::advance(const Duration& duration) -{ - synchronized (timeouts) { - if (clock::paused) { - clock::advanced += duration; - clock::current += duration; - VLOG(2) << "Clock advanced (" << duration << ") to " << clock::current; - if (!update_timer) { - update_timer = true; - ev_async_send(loop, &async_update_timer_watcher); - } - } - } -} - - -void Clock::advance(ProcessBase* process, const Duration& duration) -{ - synchronized (timeouts) { - if (clock::paused) { - Time current = now(process); - current += duration; - (*clock::currents)[process] = current; - VLOG(2) << "Clock of " << process->self() << " advanced (" << duration - << ") to " << current; - } - } -} - - -void Clock::update(const Time& time) -{ - synchronized (timeouts) { - if (clock::paused) { - if (clock::current < time) { - clock::advanced += (time - clock::current); - clock::current = Time(time); - VLOG(2) << "Clock updated to " << clock::current; - if (!update_timer) { - update_timer = true; - ev_async_send(loop, &async_update_timer_watcher); - } - } - } - } -} - - -void Clock::update(ProcessBase* process, const Time& time, Update update) -{ - synchronized (timeouts) { - if (clock::paused) { - if (now(process) < time || update == Clock::FORCE) { - VLOG(2) << "Clock of " << process->self() << " updated to " << time; - (*clock::currents)[process] = Time(time); - } - } - } -} - - -void Clock::order(ProcessBase* from, ProcessBase* to) -{ - VLOG(2) << "Clock of " << to->self() << " being updated to " << from->self(); - update(to, now(from)); -} - - +// NOTE: Clock::* implementations are in clock.cpp except for +// Clock::settle which currently has a dependency on +// 'process_manager'. void Clock::settle() { - CHECK(clock::paused); // TODO(benh): Consider returning a bool instead. process_manager->settle(); } -bool Clock::settled() -{ - synchronized (timeouts) { - CHECK(clock::paused); - - if (update_timer) { - return false; - } else if (clock::settling) { - VLOG(3) << "Clock still not settled"; - return false; - } else if (timeouts->size() == 0 || - timeouts->begin()->first > clock::current) { - VLOG(3) << "Clock is settled"; - return true; - } - - VLOG(3) << "Clock is not settled"; - - return false; - } -} - - -Try<Time> Time::create(double seconds) -{ - Try<Duration> duration = Duration::create(seconds); - if (duration.isSome()) { - // In production code, clock::advanced will always be zero! - return Time(duration.get() + clock::advanced); - } else { - return Error("Argument too large for Time: " + duration.error()); - } -} - - static Message* encode(const UPID& from, const UPID& to, const string& name, @@ -874,114 +646,6 @@ void handle_async(struct ev_loop* loop, ev_async* _, int revents) } -void handle_async_update_timer(struct ev_loop* loop, ev_async* _, int revents) -{ - synchronized (timeouts) { - if (update_timer) { - if (!timeouts->empty()) { - // Determine when the next timer should fire. - timeouts_watcher.repeat = - (timeouts->begin()->first - Clock::now()).secs(); - - if (timeouts_watcher.repeat <= 0) { - // Feed the event now! - timeouts_watcher.repeat = 0; - ev_timer_again(loop, &timeouts_watcher); - ev_feed_event(loop, &timeouts_watcher, EV_TIMEOUT); - } else { - // Don't fire the timer if the clock is paused since we - // don't want time to advance (instead a call to - // clock::advance() will handle the timer). - if (Clock::paused() && timeouts_watcher.repeat > 0) { - timeouts_watcher.repeat = 0; - } - - ev_timer_again(loop, &timeouts_watcher); - } - } - - update_timer = false; - } - } -} - - -void handle_timeouts(struct ev_loop* loop, ev_timer* _, int revents) -{ - list<Timer> timers; - - synchronized (timeouts) { - Time now = Clock::now(); - - VLOG(3) << "Handling timeouts up to " << now; - - foreachkey (const Time& timeout, *timeouts) { - if (timeout > now) { - break; - } - - VLOG(3) << "Have timeout(s) at " << timeout; - - // Need to toggle 'settling' so that we don't prematurely say - // we're settled until after the timers are executed below, - // outside of the critical section. - if (clock::paused) { - clock::settling = true; - } - - foreach (const Timer& timer, (*timeouts)[timeout]) { - timers.push_back(timer); - } - } - - // Now erase the range of timeouts that timed out. - timeouts->erase(timeouts->begin(), timeouts->upper_bound(now)); - - // Okay, so the timeout for the next timer should not have fired. - CHECK(timeouts->empty() || (timeouts->begin()->first > now)); - - // Update the timer as necessary. - if (!timeouts->empty()) { - // Determine when the next timer should fire. - timeouts_watcher.repeat = - (timeouts->begin()->first - Clock::now()).secs(); - - if (timeouts_watcher.repeat <= 0) { - // Feed the event now! - timeouts_watcher.repeat = 0; - ev_timer_again(loop, &timeouts_watcher); - ev_feed_event(loop, &timeouts_watcher, EV_TIMEOUT); - } else { - // Don't fire the timer if the clock is paused since we don't - // want time to advance (instead a call to Clock::advance() - // will handle the timer). - if (Clock::paused() && timeouts_watcher.repeat > 0) { - timeouts_watcher.repeat = 0; - } - - ev_timer_again(loop, &timeouts_watcher); - } - } - - update_timer = false; // Since we might have a queued update_timer. - } - - clock::callback(std::move(timers)); - - // Mark 'settling' as false since there are not any more timeouts - // that will expire before the paused time and we've finished - // executing expired timers. - synchronized (timeouts) { - if (clock::paused && - (timeouts->size() == 0 || - timeouts->begin()->first > clock::current)) { - VLOG(3) << "Clock has settled"; - clock::settling = false; - } - } -} - - void recv_data(struct ev_loop* loop, ev_io* watcher, int revents) { DataDecoder* decoder = (DataDecoder*) watcher->data; @@ -1501,8 +1165,6 @@ void initialize(const string& delegate) process_manager = new ProcessManager(delegate); socket_manager = new SocketManager(); - Clock::initialize(lambda::bind(&timedout, lambda::_1)); - // Setup processing threads. // We create no fewer than 8 threads because some tests require // more worker threads than 'sysconf(_SC_NPROCESSORS_ONLN)' on @@ -1628,15 +1290,11 @@ void initialize(const string& delegate) ev_async_init(&async_watcher, handle_async); ev_async_start(loop, &async_watcher); - ev_async_init(&async_update_timer_watcher, handle_async_update_timer); - ev_async_start(loop, &async_update_timer_watcher); - - ev_timer_init(&timeouts_watcher, handle_timeouts, 0., 2100000.0); - ev_timer_again(loop, &timeouts_watcher); - ev_io_init(&server_watcher, accept, __s__, EV_READ); ev_io_start(loop, &server_watcher); + Clock::initialize(lambda::bind(&timedout, lambda::_1)); + // ev_child_init(&child_watcher, child_exited, pid, 0); // ev_child_start(loop, &cw); @@ -3215,62 +2873,6 @@ Future<Response> ProcessManager::__processes__(const Request&) } -Timer Clock::timer( - const Duration& duration, - const lambda::function<void(void)>& thunk) -{ - static uint64_t id = 1; // Start at 1 since Timer() instances use id 0. - - // Assumes Clock::now() does Clock::now(__process__). - Timeout timeout = Timeout::in(duration); - - UPID pid = __process__ != NULL ? __process__->self() : UPID(); - - Timer timer(__sync_fetch_and_add(&id, 1), timeout, pid, thunk); - - VLOG(3) << "Created a timer for " << pid << " in " << stringify(duration) - << " in the future (" << timeout.time() << ")"; - - // Add the timer. - synchronized (timeouts) { - if (timeouts->size() == 0 || - timer.timeout().time() < timeouts->begin()->first) { - // Need to interrupt the loop to update/set timer repeat. - (*timeouts)[timer.timeout().time()].push_back(timer); - update_timer = true; - ev_async_send(loop, &async_update_timer_watcher); - } else { - // Timer repeat is adequate, just add the timeout. - CHECK(timeouts->size() >= 1); - (*timeouts)[timer.timeout().time()].push_back(timer); - } - } - - return timer; -} - - -bool Clock::cancel(const Timer& timer) -{ - bool canceled = false; - synchronized (timeouts) { - // Check if the timeout is still pending, and if so, erase it. In - // addition, erase an empty list if we just removed the last - // timeout. - Time time = timer.timeout().time(); - if (timeouts->count(time) > 0) { - canceled = true; - (*timeouts)[time].remove(timer); - if ((*timeouts)[time].empty()) { - timeouts->erase(time); - } - } - } - - return canceled; -} - - ProcessBase::ProcessBase(const string& id) { process::initialize();
