Repository: mesos
Updated Branches:
  refs/heads/master b05e2f0bb -> 3048e5e16


Moved framework related rate limiters into Master::Frameworks.

Review: https://reviews.apache.org/r/30511


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/fafccbd9
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/fafccbd9
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/fafccbd9

Branch: refs/heads/master
Commit: fafccbd9dcb92d4db27bc272d7443ac6ebecbae8
Parents: b05e2f0
Author: Vinod Kone <[email protected]>
Authored: Thu Jan 29 12:44:53 2015 -0800
Committer: Vinod Kone <[email protected]>
Committed: Wed Feb 4 16:27:48 2015 -0800

----------------------------------------------------------------------
 src/master/master.cpp | 71 ++++++++++++++++++++++++++++++----------------
 src/master/master.hpp | 35 +++++++----------------
 2 files changed, 57 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/fafccbd9/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 46caa55..d5c4beb 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -257,6 +257,25 @@ Master::Master(
 Master::~Master() {}
 
 
+// TODO(vinod): Update this interface to return failed futures when
+// capacity is reached.
+struct BoundedRateLimiter
+{
+  BoundedRateLimiter(double qps, Option<uint64_t> _capacity)
+    : limiter(new process::RateLimiter(qps)),
+      capacity(_capacity),
+      messages(0) {}
+
+  process::Owned<process::RateLimiter> limiter;
+  const Option<uint64_t> capacity;
+
+  // Number of outstanding messages for this RateLimiter.
+  // NOTE: ExitedEvents are throttled but not counted towards
+  // the capacity here.
+  uint64_t messages;
+};
+
+
 void Master::initialize()
 {
   LOG(INFO) << "Master " << info_.id() << " (" << info_.hostname() << ")"
@@ -360,7 +379,7 @@ void Master::initialize()
   if (flags.rate_limits.isSome()) {
     // Add framework rate limiters.
     foreach (const RateLimit& limit_, flags.rate_limits.get().limits()) {
-      if (limiters.contains(limit_.principal())) {
+      if (frameworks.limiters.contains(limit_.principal())) {
         EXIT(1) << "Duplicate principal " << limit_.principal()
                 << " found in RateLimits configuration";
       }
@@ -375,12 +394,12 @@ void Master::initialize()
         if (limit_.has_capacity()) {
           capacity = limit_.capacity();
         }
-        limiters.put(
+        frameworks.limiters.put(
             limit_.principal(),
             Owned<BoundedRateLimiter>(
                 new BoundedRateLimiter(limit_.qps(), capacity)));
       } else {
-        limiters.put(limit_.principal(), None());
+        frameworks.limiters.put(limit_.principal(), None());
       }
     }
 
@@ -396,7 +415,7 @@ void Master::initialize()
       if (flags.rate_limits.get().has_aggregate_default_capacity()) {
         capacity = flags.rate_limits.get().aggregate_default_capacity();
       }
-      defaultLimiter = Owned<BoundedRateLimiter>(
+      frameworks.defaultLimiter = Owned<BoundedRateLimiter>(
           new BoundedRateLimiter(
               flags.rate_limits.get().aggregate_default_qps(), capacity));
     }
@@ -905,9 +924,10 @@ void Master::visit(const MessageEvent& event)
   //    above. (or)
   // 2) the principal exists in RateLimits but 'qps' is not set.
   if (principal.isSome() &&
-      limiters.contains(principal.get()) &&
-      limiters[principal.get()].isSome()) {
-    const Owned<BoundedRateLimiter>& limiter = limiters[principal.get()].get();
+      frameworks.limiters.contains(principal.get()) &&
+      frameworks.limiters[principal.get()].isSome()) {
+    const Owned<BoundedRateLimiter>& limiter =
+      frameworks.limiters[principal.get()].get();
 
     if (limiter->capacity.isNone() ||
         limiter->messages < limiter->capacity.get()) {
@@ -920,19 +940,21 @@ void Master::visit(const MessageEvent& event)
           principal,
           limiter->capacity.get());
     }
-  } else if ((principal.isNone() || !limiters.contains(principal.get())) &&
+  } else if ((principal.isNone() ||
+              !frameworks.limiters.contains(principal.get())) &&
              isRegisteredFramework &&
-             defaultLimiter.isSome()) {
-    if (defaultLimiter.get()->capacity.isNone() ||
-        defaultLimiter.get()->messages < defaultLimiter.get()->capacity.get()) {
-      defaultLimiter.get()->messages++;
-      defaultLimiter.get()->limiter->acquire()
+             frameworks.defaultLimiter.isSome()) {
+    if (frameworks.defaultLimiter.get()->capacity.isNone() ||
+        frameworks.defaultLimiter.get()->messages <
+          frameworks.defaultLimiter.get()->capacity.get()) {
+      frameworks.defaultLimiter.get()->messages++;
+      frameworks.defaultLimiter.get()->limiter->acquire()
         .onReady(defer(self(), &Self::throttled, event, None()));
     } else {
       exceededCapacity(
           event,
           principal,
-          defaultLimiter.get()->capacity.get());
+          frameworks.defaultLimiter.get()->capacity.get());
     }
   } else {
     _visit(event);
@@ -957,14 +979,15 @@ void Master::visit(const ExitedEvent& event)
   typedef void(Self::*F)(const ExitedEvent&);
 
   if (principal.isSome() &&
-      limiters.contains(principal.get()) &&
-      limiters[principal.get()].isSome()) {
-    limiters[principal.get()].get()->limiter->acquire()
+      frameworks.limiters.contains(principal.get()) &&
+      frameworks.limiters[principal.get()].isSome()) {
+    frameworks.limiters[principal.get()].get()->limiter->acquire()
       .onReady(defer(self(), static_cast<F>(&Self::_visit), event));
-  } else if ((principal.isNone() || !limiters.contains(principal.get())) &&
+  } else if ((principal.isNone() ||
+              !frameworks.limiters.contains(principal.get())) &&
              isRegisteredFramework &&
-             defaultLimiter.isSome()) {
-    defaultLimiter.get()->limiter->acquire()
+             frameworks.defaultLimiter.isSome()) {
+    frameworks.defaultLimiter.get()->limiter->acquire()
       .onReady(defer(self(), static_cast<F>(&Self::_visit), event));
   } else {
     _visit(event);
@@ -979,11 +1002,11 @@ void Master::throttled(
   // We already know a RateLimiter is used to throttle this event so
   // here we only need to determine which.
   if (principal.isSome()) {
-    CHECK_SOME(limiters[principal.get()]);
-    limiters[principal.get()].get()->messages--;
+    CHECK_SOME(frameworks.limiters[principal.get()]);
+    frameworks.limiters[principal.get()].get()->messages--;
   } else {
-    CHECK_SOME(defaultLimiter);
-    defaultLimiter.get()->messages--;
+    CHECK_SOME(frameworks.defaultLimiter);
+    frameworks.defaultLimiter.get()->messages--;
   }
 
   _visit(event);

http://git-wip-us.apache.org/repos/asf/mesos/blob/fafccbd9/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 9d8c508..d98e1b6 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -86,6 +86,7 @@ class Allocator;
 class Repairer;
 class SlaveObserver;
 
+struct BoundedRateLimiter;
 struct Framework;
 struct Role;
 struct Slave;
@@ -615,6 +616,15 @@ private:
     //    allows them) if they have principals specified in
     //    FrameworkInfo.
     hashmap<process::UPID, Option<std::string>> principals;
+
+    // BoundedRateLimiters keyed by the framework principal.
+    // Like Metrics::Frameworks, all frameworks of the same principal
+    // are throttled together at a common rate limit.
+    hashmap<std::string, Option<process::Owned<BoundedRateLimiter>>> limiters;
+
+    // The default limiter is for frameworks not specified in
+    // 'flags.rate_limits'.
+    Option<process::Owned<BoundedRateLimiter>> defaultLimiter;
   } frameworks;
 
   hashmap<OfferID, Offer*> offers;
@@ -712,31 +722,6 @@ private:
   process::Future<Option<Error>> validate(
       const FrameworkInfo& frameworkInfo,
       const process::UPID& from);
-
-  struct BoundedRateLimiter
-  {
-    BoundedRateLimiter(double qps, Option<uint64_t> _capacity)
-      : limiter(new process::RateLimiter(qps)),
-        capacity(_capacity),
-        messages(0) {}
-
-    process::Owned<process::RateLimiter> limiter;
-    const Option<uint64_t> capacity;
-
-    // Number of outstanding messages for this RateLimiter.
-    // NOTE: ExitedEvents are throttled but not counted towards
-    // the capacity here.
-    uint64_t messages;
-  };
-
-  // BoundedRateLimiters keyed by the framework principal.
-  // Like Metrics::Frameworks, all frameworks of the same principal
-  // are throttled together at a common rate limit.
-  hashmap<std::string, Option<process::Owned<BoundedRateLimiter>>> limiters;
-
-  // The default limiter is for frameworks not specified in
-  // 'flags.rate_limits'.
-  Option<process::Owned<BoundedRateLimiter>> defaultLimiter;
 };
 
 

Reply via email to