Improved framework rate limiting by imposing a maximum number of outstanding messages (the 'capacity') per framework principal.
Review: https://reviews.apache.org/r/24343 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4a831c4c Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4a831c4c Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4a831c4c Branch: refs/heads/tmp Commit: 4a831c4cb249ef1fcf8a47cc76e935ea87458da9 Parents: e5fcb43 Author: Jiang Yan Xu <[email protected]> Authored: Wed Aug 6 16:31:53 2014 -0700 Committer: Jiang Yan Xu <[email protected]> Committed: Thu Aug 7 00:11:33 2014 -0700 ---------------------------------------------------------------------- include/mesos/mesos.proto | 25 +++- src/examples/load_generator_framework.cpp | 7 +- src/master/master.cpp | 109 +++++++++++++---- src/master/master.hpp | 37 +++++- src/tests/rate_limiting_tests.cpp | 160 +++++++++++++++++++++++++ 5 files changed, 303 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/4a831c4c/include/mesos/mesos.proto ---------------------------------------------------------------------- diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto index 628cce1..efb4239 100644 --- a/include/mesos/mesos.proto +++ b/include/mesos/mesos.proto @@ -779,26 +779,39 @@ message ACLs { * principal. */ message RateLimit { - // Leaving QPS unset gives it unlimited rate (i.e., not throttled). + // Leaving QPS unset gives it unlimited rate (i.e., not throttled), + // which also implies unlimited capacity. optional double qps = 1; // Principal of framework(s) to be throttled. Should match // FrameworkInfo.princpal and Credential.principal (if using authentication). required string principal = 2; + + // Max number of outstanding messages from frameworks of this principal + // allowed by master before the next message is dropped and an error is sent + // back to the sender. 
Messages received before the capacity is reached are + // still going to be processed after the error is sent. + // If unspecified, this principal is assigned unlimited capacity. + // NOTE: This value is ignored if 'qps' is not set. + optional uint64 capacity = 3; } /** * Collection of RateLimit. - * Frameworks without rate limits defined here are not throttled - * unless 'aggregate_default_qps' is specified. + * Frameworks without rate limits defined here are not throttled unless + * 'aggregate_default_qps' is specified. */ message RateLimits { // Items should have unique principals. repeated RateLimit limits = 1; - // All the frameworks not specified in 'limits' get this default - // rate. This rate is an aggregate rate for all of them, i.e., - // their combined traffic is throttled together at this rate. + // All the frameworks not specified in 'limits' get this default rate. + // This rate is an aggregate rate for all of them, i.e., their combined + // traffic is throttled together at this rate. optional double aggregate_default_qps = 2; + + // All the frameworks not specified in 'limits' get this default capacity. + // This is an aggregate value similar to 'aggregate_default_qps'. + optional uint64 aggregate_default_capacity = 3; } http://git-wip-us.apache.org/repos/asf/mesos/blob/4a831c4c/src/examples/load_generator_framework.cpp ---------------------------------------------------------------------- diff --git a/src/examples/load_generator_framework.cpp b/src/examples/load_generator_framework.cpp index 7d94c49..01a567b 100644 --- a/src/examples/load_generator_framework.cpp +++ b/src/examples/load_generator_framework.cpp @@ -211,7 +211,12 @@ public: const SlaveID&, int) {} - virtual void error(SchedulerDriver*, const string&) {} + virtual void error(SchedulerDriver*, const string& error) + { + // Terminating process with EXIT here because we cannot interrupt + // LoadGenerator's long-running loop. 
+ EXIT(1) << "Error received: " << error; + } private: LoadGenerator* generator; http://git-wip-us.apache.org/repos/asf/mesos/blob/4a831c4c/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 97e4340..d279edb 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -84,7 +84,6 @@ using process::Owned; using process::PID; using process::Process; using process::Promise; -using process::RateLimiter; using process::Time; using process::Timer; using process::UPID; @@ -371,12 +370,18 @@ void Master::initialize() << ". It must be a positive number"; } - limiters.put( - limit_.principal(), - limit_.has_qps() - ? Option<Owned<RateLimiter> >::some( - Owned<RateLimiter>(new RateLimiter(limit_.qps()))) - : Option<Owned<RateLimiter> >::none()); + if (limit_.has_qps()) { + Option<uint64_t> capacity; + if (limit_.has_capacity()) { + capacity = limit_.capacity(); + } + limiters.put( + limit_.principal(), + Owned<BoundedRateLimiter>( + new BoundedRateLimiter(limit_.qps(), capacity))); + } else { + limiters.put(limit_.principal(), None()); + } } if (flags.rate_limits.get().has_aggregate_default_qps() && @@ -387,8 +392,13 @@ void Master::initialize() } if (flags.rate_limits.get().has_aggregate_default_qps()) { - defaultLimiter = Owned<RateLimiter>( - new RateLimiter(flags.rate_limits.get().aggregate_default_qps())); + Option<uint64_t> capacity; + if (flags.rate_limits.get().has_aggregate_default_capacity()) { + capacity = flags.rate_limits.get().aggregate_default_capacity(); + } + defaultLimiter = Owned<BoundedRateLimiter>( + new BoundedRateLimiter( + flags.rate_limits.get().aggregate_default_qps(), capacity)); } LOG(INFO) << "Framework rate limiting enabled"; @@ -851,9 +861,6 @@ void Master::visit(const MessageEvent& event) return; } - // Necessary to disambiguate below. 
- typedef void(Self::*F)(const MessageEvent&); - // Throttle the message if it's a framework message and a // RateLimiter is configured for the framework's principal. // The framework is throttled by the default RateLimiter if: @@ -864,30 +871,42 @@ void Master::visit(const MessageEvent& event) // 1) the default RateLimiter is not configured to handle case 2) // above. (or) // 2) the principal exists in RateLimits but 'qps' is not set. - if (principal.isSome() && - limiters.contains(principal.get()) && - limiters[principal.get()].isSome()) { - limiters[principal.get()].get()->acquire() - .onReady(defer(self(), static_cast<F>(&Self::_visit), event)); + Option<Owned<BoundedRateLimiter> > limiter; + if (principal.isSome() && limiters.contains(principal.get())) { + limiter = limiters[principal.get()]; } else if ((principal.isNone() || !limiters.contains(principal.get())) && - isRegisteredFramework && - defaultLimiter.isSome()) { - defaultLimiter.get()->acquire() - .onReady(defer(self(), static_cast<F>(&Self::_visit), event)); + isRegisteredFramework) { + limiter = defaultLimiter; + } + + // Now throttle the message if a limiter is found, unless its + // capacity is already reached. + if (limiter.isSome()) { + if (limiter.get()->capacity.isNone() || + limiter.get()->messages < limiter.get()->capacity.get()) { + limiter.get()->messages++; + limiter.get()->limiter->acquire() + .onReady(defer(self(), &Self::throttled, event, principal)); + } else { + exceededCapacity( + event, + principal, + limiter.get()->capacity.get()); + } } else { _visit(event); } } -void Master::visit(const process::ExitedEvent& event) +void Master::visit(const ExitedEvent& event) { // See comments in 'visit(const MessageEvent& event)' for which // RateLimiter is used to throttle this UPID and when it is not // throttled. // Note that throttling ExitedEvent is necessary so the order // between MessageEvents and ExitedEvents from the same PID is - // maintained. + // maintained. 
Also ExitedEvents are not subject to the capacity. bool isRegisteredFramework = frameworks.principals.contains(event.pid); const Option<string> principal = isRegisteredFramework ? frameworks.principals[event.pid] @@ -899,12 +918,12 @@ void Master::visit(const process::ExitedEvent& event) if (principal.isSome() && limiters.contains(principal.get()) && limiters[principal.get()].isSome()) { - limiters[principal.get()].get()->acquire() + limiters[principal.get()].get()->limiter->acquire() .onReady(defer(self(), static_cast<F>(&Self::_visit), event)); } else if ((principal.isNone() || !limiters.contains(principal.get())) && isRegisteredFramework && defaultLimiter.isSome()) { - defaultLimiter.get()->acquire() + defaultLimiter.get()->limiter->acquire() .onReady(defer(self(), static_cast<F>(&Self::_visit), event)); } else { _visit(event); @@ -912,6 +931,22 @@ void Master::visit(const process::ExitedEvent& event) } +void Master::throttled( + const MessageEvent& event, + const Option<std::string>& principal) +{ + // We already know a RateLimiter is used to throttle this event so + // here we only need to determine which. + if (principal.isSome()) { + limiters[principal.get()].get()->messages--; + } else { + defaultLimiter.get()->messages--; + } + + _visit(event); +} + + void Master::_visit(const MessageEvent& event) { // Obtain the principal before processing the Message because the @@ -936,6 +971,30 @@ void Master::_visit(const MessageEvent& event) } +void Master::exceededCapacity( + const MessageEvent& event, + const Option<string>& principal, + uint64_t capacity) +{ + LOG(WARNING) << "Dropping message " << event.message->name << " from " + << event.message->from + << (principal.isSome() ? "(" + principal.get() + ")" : "") + << ": capacity(" << capacity << ") exceeded"; + + // Send an error to the framework which will abort the scheduler + // driver. 
+ // NOTE: The scheduler driver will send back a + // DeactivateFrameworkMessage which may be dropped as well but this + // should be fine because the scheduler is already informed of an + // unrecoverable error and should take action to recover. + FrameworkErrorMessage message; + message.set_message( + "Message " + event.message->name + + " dropped: capacity(" + stringify(capacity) + ") exceeded"); + send(event.message->from, message); +} + + void Master::_visit(const ExitedEvent& event) { Process<Master>::visit(event); http://git-wip-us.apache.org/repos/asf/mesos/blob/4a831c4c/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index d8a4d9e..29e8f49 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -241,10 +241,25 @@ protected: virtual void visit(const process::MessageEvent& event); virtual void visit(const process::ExitedEvent& event); + // Invoked when the message is ready to be executed after + // being throttled. + // 'principal' being None indicates it is throttled by + // 'defaultLimiter'. + void throttled( + const process::MessageEvent& event, + const Option<std::string>& principal); + // Continuations of visit(). void _visit(const process::MessageEvent& event); void _visit(const process::ExitedEvent& event); + // Helper method invoked when the capacity for a framework + // principal is exceeded. + void exceededCapacity( + const process::MessageEvent& event, + const Option<std::string>& principal, + uint64_t capacity); + // Recovers state from the registrar. process::Future<Nothing> recover(); void recoveredSlavesTimeout(const Registry& registry); @@ -744,14 +759,30 @@ private: const FrameworkInfo& frameworkInfo, const process::UPID& from); - // RateLimiters keyed by the framework principal. 
+ struct BoundedRateLimiter + { + BoundedRateLimiter(double qps, Option<uint64_t> _capacity) + : limiter(new process::RateLimiter(qps)), + capacity(_capacity), + messages(0) {} + + process::Owned<process::RateLimiter> limiter; + const Option<uint64_t> capacity; + + // Number of outstanding messages for this RateLimiter. + // NOTE: ExitedEvents are throttled but not counted towards + // the capacity here. + uint64_t messages; + }; + + // BoundedRateLimiters keyed by the framework principal. // Like Metrics::Frameworks, all frameworks of the same principal // are throttled together at a common rate limit. - hashmap<std::string, Option<process::Owned<process::RateLimiter> > > limiters; + hashmap<std::string, Option<process::Owned<BoundedRateLimiter> > > limiters; // The default limiter is for frameworks not specified in // 'flags.rate_limits'. - Option<process::Owned<process::RateLimiter> > defaultLimiter; + Option<process::Owned<BoundedRateLimiter> > defaultLimiter; }; http://git-wip-us.apache.org/repos/asf/mesos/blob/4a831c4c/src/tests/rate_limiting_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/rate_limiting_tests.cpp b/src/tests/rate_limiting_tests.cpp index fc23a19..4adebd1 100644 --- a/src/tests/rate_limiting_tests.cpp +++ b/src/tests/rate_limiting_tests.cpp @@ -954,6 +954,166 @@ TEST_F(RateLimitingTest, SchedulerFailover) Shutdown(); } + +TEST_F(RateLimitingTest, CapacityReached) +{ + master::Flags flags = CreateMasterFlags(); + RateLimits limits; + RateLimit* limit = limits.mutable_limits()->Add(); + limit->set_principal(DEFAULT_CREDENTIAL.principal()); + limit->set_qps(1); + limit->set_capacity(2); + flags.rate_limits = limits; + + Try<PID<Master> > master = StartMaster(flags); + ASSERT_SOME(master); + + Clock::pause(); + + // Advance before the test so that the 1st call to Metrics endpoint + // is not throttled. 
MetricsProcess which hosts the endpoint + // throttles requests at 2qps and its singleton instance is shared + // across tests. + Clock::advance(Milliseconds(501)); + + MockScheduler sched; + + FrameworkInfo frameworkInfo; // Bug in gcc 4.1.*, must assign on next line. + frameworkInfo = DEFAULT_FRAMEWORK_INFO; + + // Use a long failover timeout so the master doesn't unregister the + // framework right away when it aborts. + frameworkInfo.set_failover_timeout(10); + + // Create MesosSchedulerDriver on the heap because of the need to + // destroy it during the test due to MESOS-1456. + MesosSchedulerDriver* driver = new MesosSchedulerDriver( + &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched, registered(driver, _, _)) + .Times(1); + + // Grab the stuff we need to replay the RegisterFrameworkMessage. + Future<RegisterFrameworkMessage> registerFrameworkMessage = FUTURE_PROTOBUF( + RegisterFrameworkMessage(), _, master.get()); + Future<process::Message> frameworkRegisteredMessage = FUTURE_MESSAGE( + Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _); + + ASSERT_EQ(DRIVER_RUNNING, driver->start()); + + AWAIT_READY(registerFrameworkMessage); + AWAIT_READY(frameworkRegisteredMessage); + + const process::UPID schedulerPid = frameworkRegisteredMessage.get().to; + + // Keep sending duplicate RegisterFrameworkMessages. Master sends + // FrameworkRegisteredMessage back after processing each of them. + { + Future<process::Message> duplicateFrameworkRegisteredMessage = + FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), + master.get(), + _); + + process::post(schedulerPid, master.get(), registerFrameworkMessage.get()); + + // The first message is not throttled because it's at the head of + // the queue. + AWAIT_READY(duplicateFrameworkRegisteredMessage); + + // Verify that one message is received and processed (after + // registration). 
+ JSON::Object metrics = METRICS_SNAPSHOT; + + const string& messages_received = + "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_received"; + EXPECT_EQ(1u, metrics.values.count(messages_received)); + EXPECT_EQ(1, metrics.values[messages_received].as<JSON::Number>().value); + + const string& messages_processed = + "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_processed"; + EXPECT_EQ(1u, metrics.values.count(messages_processed)); + EXPECT_EQ(1, metrics.values[messages_processed].as<JSON::Number>().value); + } + + // The subsequent messages are going to be throttled. + Future<process::Message> frameworkErrorMessage = + FUTURE_MESSAGE(Eq(FrameworkErrorMessage().GetTypeName()), + master.get(), + _); + + // Send two messages which will be queued up. This will reach but not + // exceed the capacity. + for (int i = 0; i < 2; i++) { + process::post(schedulerPid, master.get(), registerFrameworkMessage.get()); + } + + // Settle to make sure no error is sent just yet. + Clock::settle(); + EXPECT_TRUE(frameworkErrorMessage.isPending()); + + // The 3rd message results in an immediate error. + Future<Nothing> error; + EXPECT_CALL(sched, error( + driver, + "Message mesos.internal.RegisterFrameworkMessage dropped: capacity(2) " + "exceeded")) + .WillOnce(FutureSatisfy(&error)); + + process::post(schedulerPid, master.get(), registerFrameworkMessage.get()); + AWAIT_READY(frameworkErrorMessage); + + // Settle to make sure scheduler aborts and its + // DeactivateFrameworkMessage is received by master. + Clock::settle(); + + AWAIT_READY(error); + + // Stop the driver but indicate it wants to failover. + EXPECT_EQ(DRIVER_ABORTED, driver->stop(true)); + EXPECT_EQ(DRIVER_STOPPED, driver->join()); + delete driver; + + // Wait for half a second for metrics endpoint. 
+ Clock::advance(Milliseconds(501)); + + { + JSON::Object metrics = METRICS_SNAPSHOT; + + const string& messages_received = + "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_received"; + EXPECT_EQ(1u, metrics.values.count(messages_received)); + EXPECT_EQ(5, metrics.values[messages_received].as<JSON::Number>().value); + const string& messages_processed = + "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_processed"; + EXPECT_EQ(1u, metrics.values.count(messages_processed)); + // Four messages not processed, two in the queue and two dropped. + EXPECT_EQ(1, metrics.values[messages_processed].as<JSON::Number>().value); + } + + // Advance three times for the two pending messages and the exited + // event to be processed. + for (int i = 0; i < 3; i++) { + Clock::advance(Milliseconds(1001)); + Clock::settle(); + } + + // Counters are not removed because the scheduler is not + // unregistered and the master expects it to failover. + JSON::Object metrics = METRICS_SNAPSHOT; + + const string& messages_received = + "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_received"; + EXPECT_EQ(1u, metrics.values.count(messages_received)); + EXPECT_EQ(5, metrics.values[messages_received].as<JSON::Number>().value); + const string& messages_processed = + "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_processed"; + EXPECT_EQ(1u, metrics.values.count(messages_processed)); + // Two messages are dropped. + EXPECT_EQ(3, metrics.values[messages_processed].as<JSON::Number>().value); + + Shutdown(); +} + } // namespace master { } // namespace internal { } // namespace mesos {
