Updated RateLimiter to allow discard semantics.

Review: https://reviews.apache.org/r/30192


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/572d117d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/572d117d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/572d117d

Branch: refs/heads/master
Commit: 572d117deeaf71c15b967fee8ed79558bc4c9f1a
Parents: 311e15a
Author: Vinod Kone <[email protected]>
Authored: Thu Jan 22 13:00:09 2015 -0800
Committer: Vinod Kone <[email protected]>
Committed: Wed Jan 28 14:25:39 2015 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/limiter.hpp | 44 ++++++++---
 3rdparty/libprocess/src/tests/limiter_tests.cpp | 78 ++++++++++++++++++++
 2 files changed, 111 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/572d117d/3rdparty/libprocess/include/process/limiter.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/limiter.hpp 
b/3rdparty/libprocess/include/process/limiter.hpp
index a1c4f18..4bf24a8 100644
--- a/3rdparty/libprocess/include/process/limiter.hpp
+++ b/3rdparty/libprocess/include/process/limiter.hpp
@@ -3,6 +3,7 @@
 
 #include <deque>
 
+#include <process/defer.hpp>
 #include <process/delay.hpp>
 #include <process/dispatch.hpp>
 #include <process/id.hpp>
@@ -34,6 +35,7 @@ public:
   ~RateLimiter();
 
   // Returns a future that becomes ready when the permit is acquired.
+  // Discarding this future cancels this acquisition.
   Future<Nothing> acquire();
 
 private:
@@ -66,7 +68,7 @@ public:
   virtual void finalize()
   {
     foreach (Promise<Nothing>* promise, promises) {
-      promise->future().discard();
+      promise->discard();
       delete promise;
     }
     promises.clear();
@@ -78,13 +80,17 @@ public:
       // Need to wait for others to get permits first.
       Promise<Nothing>* promise = new Promise<Nothing>();
       promises.push_back(promise);
-      return promise->future();
-    } if (timeout.remaining() > Seconds(0)) {
+      return promise->future()
+        .onDiscard(defer(self(), &Self::discard, promise->future()));
+    }
+
+    if (timeout.remaining() > Seconds(0)) {
       // Need to wait a bit longer, but first one in the queue.
       Promise<Nothing>* promise = new Promise<Nothing>();
       promises.push_back(promise);
       delay(timeout.remaining(), self(), &Self::_acquire);
-      return promise->future();
+      return promise->future()
+        .onDiscard(defer(self(), &Self::discard, promise->future()));
     }
 
     // No need to wait!
@@ -101,13 +107,20 @@ private:
   {
     CHECK(!promises.empty());
 
-    Promise<Nothing>* promise = promises.front();
-    promises.pop_front();
-
-    promise->set(Nothing());
-    delete promise;
-
-    timeout = Seconds(1) / permitsPerSecond;
+    // Keep removing the top of the queue until we find a promise
+    // whose future is not discarded.
+    while (!promises.empty()) {
+      Promise<Nothing>* promise = promises.front();
+      promises.pop_front();
+      if (!promise->future().isDiscarded()) {
+        promise->set(Nothing());
+        delete promise;
+        timeout = Seconds(1) / permitsPerSecond;
+        break;
+      } else {
+        delete promise;
+      }
+    }
 
     // Repeat if necessary.
     if (!promises.empty()) {
@@ -115,6 +128,15 @@ private:
     }
   }
 
+  void discard(const Future<Nothing>& future)
+  {
+    foreach (Promise<Nothing>* promise, promises) {
+      if (promise->future() == future) {
+        promise->discard();
+      }
+    }
+  }
+
   double permitsPerSecond;
 
   Timeout timeout;

http://git-wip-us.apache.org/repos/asf/mesos/blob/572d117d/3rdparty/libprocess/src/tests/limiter_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/limiter_tests.cpp 
b/3rdparty/libprocess/src/tests/limiter_tests.cpp
index f27111c..00cc03d 100644
--- a/3rdparty/libprocess/src/tests/limiter_tests.cpp
+++ b/3rdparty/libprocess/src/tests/limiter_tests.cpp
@@ -37,3 +37,81 @@ TEST(Limiter, Acquire)
   AWAIT_READY(acquire3);
   ASSERT_LE(interval * 2, stopwatch.elapsed());
 }
+
+
+// In this test 4 permits are given, but the 2nd permit's acquire
+// is immediately discarded. So, 1st, 3rd and 4th permits should
+// be acquired according to the rate limit.
+TEST(Limiter, DiscardMiddle)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  int permits = 2;
+  Duration duration = Milliseconds(5);
+
+  RateLimiter limiter(permits, duration);
+  Milliseconds interval = duration / permits;
+
+  Stopwatch stopwatch;
+  stopwatch.start();
+
+  Future<Nothing> acquire1 = limiter.acquire();
+  Future<Nothing> acquire2 = limiter.acquire();
+  Future<Nothing> acquire3 = limiter.acquire();
+  Future<Nothing> acquire4 = limiter.acquire();
+
+  AWAIT_READY(acquire1);
+
+  // Discard 'acquire2.
+  acquire2.discard();
+
+  // Wait until 'acquire3' is ready.
+  AWAIT_READY(acquire3);
+
+  // 'acquire2' should be in 'DISCARDED' state.
+  AWAIT_DISCARDED(acquire2);
+
+  // 'acquire3' should be satisfied within one 'interval'.
+  ASSERT_LE(interval, stopwatch.elapsed());
+  ASSERT_GE(interval * 2, stopwatch.elapsed());
+
+  // 'acquire4' should be satisfied one 'interval' after
+  // 'acquire3' is satisfied.
+  AWAIT_READY(acquire4);
+  ASSERT_LE(interval * 2, stopwatch.elapsed());
+}
+
+
+// In this test 2 permits are initially given, but the 2nd permit's
+// future is immediately discarded. Then the 3rd permit is given. So,
+// 1st and 3rd permits should be acquired according to the rate limit.
+TEST(Limiter, DiscardLast)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  int permits = 2;
+  Duration duration = Milliseconds(5);
+
+  RateLimiter limiter(permits, duration);
+  Milliseconds interval = duration / permits;
+
+  Stopwatch stopwatch;
+  stopwatch.start();
+
+  Future<Nothing> acquire1 = limiter.acquire();
+  Future<Nothing> acquire2 = limiter.acquire();
+
+  AWAIT_READY(acquire1);
+
+  // Discard 'acquire2'.
+  acquire2.discard();
+
+  // Now acquire 'acquire3'.
+  Future<Nothing> acquire3 = limiter.acquire();
+
+  // 'acquire3' should be satisfied one 'interval' after
+  // 'acquire1' is satisfied.
+  AWAIT_READY(acquire3);
+  ASSERT_LE(interval, stopwatch.elapsed());
+  ASSERT_GE(interval * 2, stopwatch.elapsed());
+}

Reply via email to