Repository: mesos Updated Branches: refs/heads/master acd656c4e -> 8279b45ed
Abstract clock internals from ProcessManager::settle. Review: https://reviews.apache.org/r/27498 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/84efa184 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/84efa184 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/84efa184 Branch: refs/heads/master Commit: 84efa184159ebaa7f5110073358d35773d149f0e Parents: 788a136 Author: Benjamin Hindman <[email protected]> Authored: Sun Nov 2 15:03:22 2014 -0800 Committer: Benjamin Hindman <[email protected]> Committed: Sat Nov 15 16:25:57 2014 -0800 ---------------------------------------------------------------------- 3rdparty/libprocess/include/process/clock.hpp | 22 +++- 3rdparty/libprocess/include/process/future.hpp | 22 +++- 3rdparty/libprocess/src/process.cpp | 117 ++++++++++++++------ 3 files changed, 121 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/84efa184/3rdparty/libprocess/include/process/clock.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/include/process/clock.hpp b/3rdparty/libprocess/include/process/clock.hpp index 80190ef..d60742a 100644 --- a/3rdparty/libprocess/include/process/clock.hpp +++ b/3rdparty/libprocess/include/process/clock.hpp @@ -39,11 +39,27 @@ public: static void order(ProcessBase* from, ProcessBase* to); - // When the clock is paused, settle() synchronously ensures that: + // When the clock is paused this returns only after // (1) all expired timers are executed, - // (2) no Processes are running, and - // (3) no Processes are ready to run. + // (2) no processes are running, and + // (3) no processes are ready to run. + // + // In other words, this function blocks synchronously until no other + // execution on any processes will occur unless the clock is + // advanced. 
+ // + // TODO(benh): Move this function elsewhere, for instance, to a + // top-level function in the 'process' namespace since it deals with + // both processes and the clock. static void settle(); + + // When the clock is paused this returns true if all timers that + // expire before the paused time have executed, otherwise false. + // Note that if the clock continually gets advanced concurrently + // this function may never return true because the "paused" time + // will continue to get pushed out farther in the future making more + // timers candidates for execution. + static bool settled(); }; } // namespace process { http://git-wip-us.apache.org/repos/asf/mesos/blob/84efa184/3rdparty/libprocess/include/process/future.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/include/process/future.hpp b/3rdparty/libprocess/include/process/future.hpp index 68e5f7b..2e4f9ef 100644 --- a/3rdparty/libprocess/include/process/future.hpp +++ b/3rdparty/libprocess/include/process/future.hpp @@ -1121,7 +1121,7 @@ bool Future<T>::hasDiscard() const namespace internal { -inline void awaited(const Owned<Latch>& latch) +inline void awaited(Owned<Latch> latch) { latch->trigger(); } @@ -1132,18 +1132,32 @@ inline void awaited(const Owned<Latch>& latch) template <typename T> bool Future<T>::await(const Duration& duration) const { - Owned<Latch> latch; + // NOTE: We need to preemptively allocate the Latch up front + // instead of lazily creating it in the critical section below because + // instantiating a Latch requires creating a new process (at the + // time of writing this comment) which might need to do some + // synchronization in libprocess which might deadlock if some other + // code in libprocess is already holding a lock and then attempts to + // do Promise::set (or something similar) that attempts to acquire + // the lock that we acquire here.
This is an artifact of using + // Future/Promise within the implementation of libprocess. + // + // We mostly only call 'await' in tests so this should not be a + // performance concern. + Owned<Latch> latch(new Latch()); + + bool pending = false; internal::acquire(&data->lock); { if (data->state == PENDING) { - latch.reset(new Latch()); + pending = true; data->onAnyCallbacks.push(lambda::bind(&internal::awaited, latch)); } } internal::release(&data->lock); - if (latch.get() != NULL) { + if (pending) { return latch->await(duration); } http://git-wip-us.apache.org/repos/asf/mesos/blob/84efa184/3rdparty/libprocess/src/process.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp index 9ebac08..e7e5520 100644 --- a/3rdparty/libprocess/src/process.cpp +++ b/3rdparty/libprocess/src/process.cpp @@ -43,6 +43,7 @@ #include <boost/shared_array.hpp> +#include <process/check.hpp> #include <process/clock.hpp> #include <process/defer.hpp> #include <process/delay.hpp> @@ -475,11 +476,6 @@ static queue<lambda::function<void(void)> >* functions = static map<Time, list<Timer> >* timeouts = new map<Time, list<Timer> >(); static synchronizable(timeouts) = SYNCHRONIZED_INITIALIZER_RECURSIVE; -// For supporting Clock::settle(), true if timers have been removed -// from 'timeouts' but may not have been executed yet. Protected by -// the timeouts lock. This is only used when the clock is paused. -static bool pending_timers = false; - // Flag to indicate whether or to update the timer on async interrupt. static bool update_timer = false; @@ -525,6 +521,11 @@ Duration advanced = Duration::zero(); bool paused = false; +// For supporting Clock::settled(), false if we're not currently +// settling (or we're not paused), true if we're currently attempting +// to settle (and we're paused). 
+bool settling = false; + } // namespace clock { @@ -596,6 +597,7 @@ void Clock::resume() if (clock::paused) { VLOG(2) << "Clock resumed at " << clock::current; clock::paused = false; + clock::settling = false; clock::currents->clear(); update_timer = true; ev_async_send(loop, &async_watcher); @@ -667,6 +669,7 @@ void Clock::update(ProcessBase* process, const Time& time) void Clock::order(ProcessBase* from, ProcessBase* to) { + VLOG(2) << "Clock of " << to->self() << " being updated to " << from->self(); update(to, now(from)); } @@ -678,6 +681,29 @@ void Clock::settle() } +bool Clock::settled() +{ + synchronized (timeouts) { + CHECK(clock::paused); + + if (update_timer) { + return false; + } else if (clock::settling) { + VLOG(3) << "Clock still not settled"; + return false; + } else if (timeouts->size() == 0 || + timeouts->begin()->first > clock::current) { + VLOG(3) << "Clock is settled"; + return true; + } + + VLOG(3) << "Clock is not settled"; + + return false; + } +} + + Try<Time> Time::create(double seconds) { Try<Duration> duration = Duration::create(seconds); @@ -878,9 +904,12 @@ void handle_timeouts(struct ev_loop* loop, ev_timer* _, int revents) VLOG(3) << "Have timeout(s) at " << timeout; - // Record that we have pending timers to execute so the - // Clock::settle() operation can wait until we're done. - pending_timers = true; + // Need to toggle 'settling' so that we don't prematurely say + // we're settled until after the timers are executed below, + // outside of the critical section. + if (clock::paused) { + clock::settling = true; + } foreach (const Timer& timer, (*timeouts)[timeout]) { timedout.push_back(timer); @@ -943,11 +972,16 @@ void handle_timeouts(struct ev_loop* loop, ev_timer* _, int revents) timer(); } - // Mark ourselves as done executing the timers since it's now safe - // for a call to Clock::settle() to check if there will be any - // future timeouts reached. 
+ // Mark 'settling' as false since there are not any more timeouts + // that will expire before the paused time and we've finished + // executing expired timers. synchronized (timeouts) { - pending_timers = false; + if (clock::paused && + (timeouts->size() == 0 || + timeouts->begin()->first > clock::current)) { + VLOG(3) << "Clock has settled"; + clock::settling = false; + } } } @@ -2961,7 +2995,15 @@ bool ProcessManager::wait(const UPID& pid) list<ProcessBase*>::iterator it = find(runq.begin(), runq.end(), process); if (it != runq.end()) { + // Found it! Remove it from the run queue since we'll be + // donating our thread and also increment 'running' before + // leaving this 'runq' protected critical section so that + // everyone that is waiting for the processes to settle + // continues to wait (otherwise they could see nothing in + // 'runq' and 'running' equal to 0 between when we exit + // this critical section and increment 'running'). runq.erase(it); + __sync_fetch_and_add(&running, 1); } else { // Another thread has resumed the process ... process = NULL; @@ -2977,7 +3019,6 @@ bool ProcessManager::wait(const UPID& pid) if (process != NULL) { VLOG(2) << "Donating thread to " << process->pid << " while waiting"; ProcessBase* donator = __process__; - __sync_fetch_and_add(&running, 1); process_manager->resume(process); __process__ = donator; } @@ -3046,30 +3087,39 @@ void ProcessManager::settle() { bool done = true; do { + // While refactoring in order to isolate libev behind abstractions + // it became evident that this os::sleep is vital for tests to + // pass. In particular, there are certain tests that assume too + // much before they attempt to do a settle. One such example is + // tests doing http::get followed by Clock::settle, where they + // expect the http::get will have properly enqueued a process on + // the run queue but http::get is just sending bytes on a + // socket.
Without sleeping at the beginning of this function we + // can get unlucky and appear settled when in actuality the + // kernel just hasn't copied the bytes to a socket or we haven't + // yet read the bytes and enqueued an event on a process (and the + // process on the run queue). os::sleep(Milliseconds(10)); - done = true; - // Hopefully this is the only place we acquire both these locks. - synchronized (runq) { - synchronized (timeouts) { - CHECK(Clock::paused()); // Since another thread could resume the clock! - if (!runq.empty()) { - done = false; - } + done = true; // Assume to start that we are settled. - __sync_synchronize(); // Read barrier for 'running'. - if (running > 0) { - done = false; - } + synchronized (runq) { + if (!runq.empty()) { + done = false; + continue; + } - if (timeouts->size() > 0 && - timeouts->begin()->first <= clock::current) { - done = false; - } + // Read barrier for 'running'. + __sync_synchronize(); - if (pending_timers) { - done = false; - } + if (running > 0) { + done = false; + continue; + } + + if (!Clock::settled()) { + done = false; + continue; } } } while (!done); @@ -3173,7 +3223,8 @@ Timer Clock::timer( Timer timer(__sync_fetch_and_add(&id, 1), timeout, pid, thunk); - VLOG(3) << "Created a timer for " << timeout.time(); + VLOG(3) << "Created a timer for " << pid << " in " << stringify(duration) + << " in the future (" << timeout.time() << ")"; // Add the timer. synchronized (timeouts) {
