Updated RateLimiter to allow discard semantics. Review: https://reviews.apache.org/r/30192
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/572d117d Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/572d117d Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/572d117d Branch: refs/heads/master Commit: 572d117deeaf71c15b967fee8ed79558bc4c9f1a Parents: 311e15a Author: Vinod Kone <[email protected]> Authored: Thu Jan 22 13:00:09 2015 -0800 Committer: Vinod Kone <[email protected]> Committed: Wed Jan 28 14:25:39 2015 -0800 ---------------------------------------------------------------------- 3rdparty/libprocess/include/process/limiter.hpp | 44 ++++++++--- 3rdparty/libprocess/src/tests/limiter_tests.cpp | 78 ++++++++++++++++++++ 2 files changed, 111 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/572d117d/3rdparty/libprocess/include/process/limiter.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/include/process/limiter.hpp b/3rdparty/libprocess/include/process/limiter.hpp index a1c4f18..4bf24a8 100644 --- a/3rdparty/libprocess/include/process/limiter.hpp +++ b/3rdparty/libprocess/include/process/limiter.hpp @@ -3,6 +3,7 @@ #include <deque> +#include <process/defer.hpp> #include <process/delay.hpp> #include <process/dispatch.hpp> #include <process/id.hpp> @@ -34,6 +35,7 @@ public: ~RateLimiter(); // Returns a future that becomes ready when the permit is acquired. + // Discarding this future cancels this acquisition. Future<Nothing> acquire(); private: @@ -66,7 +68,7 @@ public: virtual void finalize() { foreach (Promise<Nothing>* promise, promises) { - promise->future().discard(); + promise->discard(); delete promise; } promises.clear(); @@ -78,13 +80,17 @@ public: // Need to wait for others to get permits first. 
Promise<Nothing>* promise = new Promise<Nothing>(); promises.push_back(promise); - return promise->future(); - } if (timeout.remaining() > Seconds(0)) { + return promise->future() + .onDiscard(defer(self(), &Self::discard, promise->future())); + } + + if (timeout.remaining() > Seconds(0)) { // Need to wait a bit longer, but first one in the queue. Promise<Nothing>* promise = new Promise<Nothing>(); promises.push_back(promise); delay(timeout.remaining(), self(), &Self::_acquire); - return promise->future(); + return promise->future() + .onDiscard(defer(self(), &Self::discard, promise->future())); } // No need to wait! @@ -101,13 +107,20 @@ private: { CHECK(!promises.empty()); - Promise<Nothing>* promise = promises.front(); - promises.pop_front(); - - promise->set(Nothing()); - delete promise; - - timeout = Seconds(1) / permitsPerSecond; + // Keep removing the top of the queue until we find a promise + // whose future is not discarded. + while (!promises.empty()) { + Promise<Nothing>* promise = promises.front(); + promises.pop_front(); + if (!promise->future().isDiscarded()) { + promise->set(Nothing()); + delete promise; + timeout = Seconds(1) / permitsPerSecond; + break; + } else { + delete promise; + } + } // Repeat if necessary. 
if (!promises.empty()) { @@ -115,6 +128,15 @@ private: } } + void discard(const Future<Nothing>& future) + { + foreach (Promise<Nothing>* promise, promises) { + if (promise->future() == future) { + promise->discard(); + } + } + } + double permitsPerSecond; Timeout timeout; http://git-wip-us.apache.org/repos/asf/mesos/blob/572d117d/3rdparty/libprocess/src/tests/limiter_tests.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/tests/limiter_tests.cpp b/3rdparty/libprocess/src/tests/limiter_tests.cpp index f27111c..00cc03d 100644 --- a/3rdparty/libprocess/src/tests/limiter_tests.cpp +++ b/3rdparty/libprocess/src/tests/limiter_tests.cpp @@ -37,3 +37,81 @@ TEST(Limiter, Acquire) AWAIT_READY(acquire3); ASSERT_LE(interval * 2, stopwatch.elapsed()); } + + +// In this test 4 permits are given, but the 2nd permit's acquire +// is immediately discarded. So, 1st, 3rd and 4th permits should +// be acquired according to the rate limit. +TEST(Limiter, DiscardMiddle) +{ + ASSERT_TRUE(GTEST_IS_THREADSAFE); + + int permits = 2; + Duration duration = Milliseconds(5); + + RateLimiter limiter(permits, duration); + Milliseconds interval = duration / permits; + + Stopwatch stopwatch; + stopwatch.start(); + + Future<Nothing> acquire1 = limiter.acquire(); + Future<Nothing> acquire2 = limiter.acquire(); + Future<Nothing> acquire3 = limiter.acquire(); + Future<Nothing> acquire4 = limiter.acquire(); + + AWAIT_READY(acquire1); + + // Discard 'acquire2'. + acquire2.discard(); + + // Wait until 'acquire3' is ready. + AWAIT_READY(acquire3); + + // 'acquire2' should be in 'DISCARDED' state. + AWAIT_DISCARDED(acquire2); + + // 'acquire3' should be satisfied within one 'interval'. + ASSERT_LE(interval, stopwatch.elapsed()); + ASSERT_GE(interval * 2, stopwatch.elapsed()); + + // 'acquire4' should be satisfied one 'interval' after + // 'acquire3' is satisfied. 
+ AWAIT_READY(acquire4); + ASSERT_LE(interval * 2, stopwatch.elapsed()); +} + + +// In this test 2 permits are initially given, but the 2nd permit's +// future is immediately discarded. Then the 3rd permit is given. So, +// 1st and 3rd permits should be acquired according to the rate limit. +TEST(Limiter, DiscardLast) +{ + ASSERT_TRUE(GTEST_IS_THREADSAFE); + + int permits = 2; + Duration duration = Milliseconds(5); + + RateLimiter limiter(permits, duration); + Milliseconds interval = duration / permits; + + Stopwatch stopwatch; + stopwatch.start(); + + Future<Nothing> acquire1 = limiter.acquire(); + Future<Nothing> acquire2 = limiter.acquire(); + + AWAIT_READY(acquire1); + + // Discard 'acquire2'. + acquire2.discard(); + + // Now acquire 'acquire3'. + Future<Nothing> acquire3 = limiter.acquire(); + + // 'acquire3' should be satisfied one 'interval' after + // 'acquire1' is satisfied. + AWAIT_READY(acquire3); + ASSERT_LE(interval, stopwatch.elapsed()); + ASSERT_GE(interval * 2, stopwatch.elapsed()); +}
