Repository: mesos Updated Branches: refs/heads/master b05e2f0bb -> 3048e5e16
Moved framework related rate limiters into Master::Frameworks. Review: https://reviews.apache.org/r/30511 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/fafccbd9 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/fafccbd9 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/fafccbd9 Branch: refs/heads/master Commit: fafccbd9dcb92d4db27bc272d7443ac6ebecbae8 Parents: b05e2f0 Author: Vinod Kone <[email protected]> Authored: Thu Jan 29 12:44:53 2015 -0800 Committer: Vinod Kone <[email protected]> Committed: Wed Feb 4 16:27:48 2015 -0800 ---------------------------------------------------------------------- src/master/master.cpp | 71 ++++++++++++++++++++++++++++++---------------- src/master/master.hpp | 35 +++++++---------------- 2 files changed, 57 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/fafccbd9/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 46caa55..d5c4beb 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -257,6 +257,25 @@ Master::Master( Master::~Master() {} +// TODO(vinod): Update this interface to return failed futures when +// capacity is reached. +struct BoundedRateLimiter +{ + BoundedRateLimiter(double qps, Option<uint64_t> _capacity) + : limiter(new process::RateLimiter(qps)), + capacity(_capacity), + messages(0) {} + + process::Owned<process::RateLimiter> limiter; + const Option<uint64_t> capacity; + + // Number of outstanding messages for this RateLimiter. + // NOTE: ExitedEvents are throttled but not counted towards + // the capacity here. + uint64_t messages; +}; + + void Master::initialize() { LOG(INFO) << "Master " << info_.id() << " (" << info_.hostname() << ")" @@ -360,7 +379,7 @@ void Master::initialize() if (flags.rate_limits.isSome()) { // Add framework rate limiters. foreach (const RateLimit& limit_, flags.rate_limits.get().limits()) { - if (limiters.contains(limit_.principal())) { + if (frameworks.limiters.contains(limit_.principal())) { EXIT(1) << "Duplicate principal " << limit_.principal() << " found in RateLimits configuration"; } @@ -375,12 +394,12 @@ void Master::initialize() if (limit_.has_capacity()) { capacity = limit_.capacity(); } - limiters.put( + frameworks.limiters.put( limit_.principal(), Owned<BoundedRateLimiter>( new BoundedRateLimiter(limit_.qps(), capacity))); } else { - limiters.put(limit_.principal(), None()); + frameworks.limiters.put(limit_.principal(), None()); } } @@ -396,7 +415,7 @@ void Master::initialize() if (flags.rate_limits.get().has_aggregate_default_capacity()) { capacity = flags.rate_limits.get().aggregate_default_capacity(); } - defaultLimiter = Owned<BoundedRateLimiter>( + frameworks.defaultLimiter = Owned<BoundedRateLimiter>( new BoundedRateLimiter( flags.rate_limits.get().aggregate_default_qps(), capacity)); } @@ -905,9 +924,10 @@ void Master::visit(const MessageEvent& event) // above. (or) // 2) the principal exists in RateLimits but 'qps' is not set. if (principal.isSome() && - limiters.contains(principal.get()) && - limiters[principal.get()].isSome()) { - const Owned<BoundedRateLimiter>& limiter = limiters[principal.get()].get(); + frameworks.limiters.contains(principal.get()) && + frameworks.limiters[principal.get()].isSome()) { + const Owned<BoundedRateLimiter>& limiter = + frameworks.limiters[principal.get()].get(); if (limiter->capacity.isNone() || limiter->messages < limiter->capacity.get()) { @@ -920,19 +940,21 @@ void Master::visit(const MessageEvent& event) principal, limiter->capacity.get()); } - } else if ((principal.isNone() || !limiters.contains(principal.get())) && + } else if ((principal.isNone() || + !frameworks.limiters.contains(principal.get())) && isRegisteredFramework && - defaultLimiter.isSome()) { - if (defaultLimiter.get()->capacity.isNone() || - defaultLimiter.get()->messages < defaultLimiter.get()->capacity.get()) { - defaultLimiter.get()->messages++; - defaultLimiter.get()->limiter->acquire() + frameworks.defaultLimiter.isSome()) { + if (frameworks.defaultLimiter.get()->capacity.isNone() || + frameworks.defaultLimiter.get()->messages < + frameworks.defaultLimiter.get()->capacity.get()) { + frameworks.defaultLimiter.get()->messages++; + frameworks.defaultLimiter.get()->limiter->acquire() .onReady(defer(self(), &Self::throttled, event, None())); } else { exceededCapacity( event, principal, - defaultLimiter.get()->capacity.get()); + frameworks.defaultLimiter.get()->capacity.get()); } } else { _visit(event); @@ -957,14 +979,15 @@ void Master::visit(const ExitedEvent& event) typedef void(Self::*F)(const ExitedEvent&); if (principal.isSome() && - limiters.contains(principal.get()) && - limiters[principal.get()].isSome()) { - limiters[principal.get()].get()->limiter->acquire() + frameworks.limiters.contains(principal.get()) && + frameworks.limiters[principal.get()].isSome()) { + frameworks.limiters[principal.get()].get()->limiter->acquire() .onReady(defer(self(), static_cast<F>(&Self::_visit), event)); - } else if ((principal.isNone() || !limiters.contains(principal.get())) && + } else if ((principal.isNone() || + !frameworks.limiters.contains(principal.get())) && isRegisteredFramework && - defaultLimiter.isSome()) { - defaultLimiter.get()->limiter->acquire() + frameworks.defaultLimiter.isSome()) { + frameworks.defaultLimiter.get()->limiter->acquire() .onReady(defer(self(), static_cast<F>(&Self::_visit), event)); } else { _visit(event); @@ -979,11 +1002,11 @@ void Master::throttled( // We already know a RateLimiter is used to throttle this event so // here we only need to determine which. if (principal.isSome()) { - CHECK_SOME(limiters[principal.get()]); - limiters[principal.get()].get()->messages--; + CHECK_SOME(frameworks.limiters[principal.get()]); + frameworks.limiters[principal.get()].get()->messages--; } else { - CHECK_SOME(defaultLimiter); - defaultLimiter.get()->messages--; + CHECK_SOME(frameworks.defaultLimiter); + frameworks.defaultLimiter.get()->messages--; } _visit(event); http://git-wip-us.apache.org/repos/asf/mesos/blob/fafccbd9/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index 9d8c508..d98e1b6 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -86,6 +86,7 @@ class Allocator; class Repairer; class SlaveObserver; +struct BoundedRateLimiter; struct Framework; struct Role; struct Slave; @@ -615,6 +616,15 @@ private: // allows them) if they have principals specified in // FrameworkInfo. hashmap<process::UPID, Option<std::string>> principals; + + // BoundedRateLimiters keyed by the framework principal. + // Like Metrics::Frameworks, all frameworks of the same principal + // are throttled together at a common rate limit. + hashmap<std::string, Option<process::Owned<BoundedRateLimiter>>> limiters; + + // The default limiter is for frameworks not specified in + // 'flags.rate_limits'. + Option<process::Owned<BoundedRateLimiter>> defaultLimiter; } frameworks; hashmap<OfferID, Offer*> offers; @@ -712,31 +722,6 @@ private: process::Future<Option<Error>> validate( const FrameworkInfo& frameworkInfo, const process::UPID& from); - - struct BoundedRateLimiter - { - BoundedRateLimiter(double qps, Option<uint64_t> _capacity) - : limiter(new process::RateLimiter(qps)), - capacity(_capacity), - messages(0) {} - - process::Owned<process::RateLimiter> limiter; - const Option<uint64_t> capacity; - - // Number of outstanding messages for this RateLimiter. - // NOTE: ExitedEvents are throttled but not counted towards - // the capacity here. - uint64_t messages; - }; - - // BoundedRateLimiters keyed by the framework principal. - // Like Metrics::Frameworks, all frameworks of the same principal - // are throttled together at a common rate limit. - hashmap<std::string, Option<process::Owned<BoundedRateLimiter>>> limiters; - - // The default limiter is for frameworks not specified in - // 'flags.rate_limits'. - Option<process::Owned<BoundedRateLimiter>> defaultLimiter; };
