Rate limited the removal of slaves failing health checks. Review: https://reviews.apache.org/r/30514
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/886efefc Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/886efefc Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/886efefc Branch: refs/heads/master Commit: 886efefc5f294b3ea22c1fa2ce70a9e9324eca19 Parents: fafccbd Author: Vinod Kone <[email protected]> Authored: Thu Jan 29 11:51:02 2015 -0800 Committer: Vinod Kone <[email protected]> Committed: Wed Feb 4 16:27:49 2015 -0800 ---------------------------------------------------------------------- src/master/flags.hpp | 10 ++ src/master/master.cpp | 104 +++++++++++++++++-- src/master/master.hpp | 6 ++ src/tests/partition_tests.cpp | 2 +- src/tests/slave_tests.cpp | 204 +++++++++++++++++++++++++++++++------ 5 files changed, 287 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/886efefc/src/master/flags.hpp ---------------------------------------------------------------------- diff --git a/src/master/flags.hpp b/src/master/flags.hpp index 6c18a1a..e9f6fff 100644 --- a/src/master/flags.hpp +++ b/src/master/flags.hpp @@ -147,6 +147,15 @@ public: "Values: [0%-100%]", stringify(RECOVERY_SLAVE_REMOVAL_PERCENT_LIMIT * 100.0) + "%"); + // TODO(vinod): Add a 'Rate' abstraction in stout and the + // corresponding parser for flags. + add(&Flags::slave_removal_rate_limit, + "slave_removal_rate_limit", + "The maximum rate (e.g., 1/10mins, 2/3hrs, etc) at which slaves will\n" + "be removed from the master when they fail health checks. 
By default\n" + "slaves will be removed as soon as they fail the health checks.\n" + "The value is of the form <Number of slaves>/<Duration>."); + add(&Flags::webui_dir, "webui_dir", "Directory path of the webui files/assets", @@ -377,6 +386,7 @@ public: bool log_auto_initialize; Duration slave_reregister_timeout; std::string recovery_slave_removal_limit; + Option<std::string> slave_removal_rate_limit; std::string webui_dir; std::string whitelist; std::string user_sorter; http://git-wip-us.apache.org/repos/asf/mesos/blob/886efefc/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index d5c4beb..e42b922 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -94,6 +94,7 @@ using process::Owned; using process::PID; using process::Process; using process::Promise; +using process::RateLimiter; using process::Shared; using process::Time; using process::Timer; @@ -116,12 +117,14 @@ public: SlaveObserver(const UPID& _slave, const SlaveInfo& _slaveInfo, const SlaveID& _slaveId, - const PID<Master>& _master) + const PID<Master>& _master, + const Option<shared_ptr<RateLimiter>>& _limiter) : ProcessBase(process::ID::generate("slave-observer")), slave(_slave), slaveInfo(_slaveInfo), slaveId(_slaveId), master(_master), + limiter(_limiter), timeouts(0), pinged(false), connected(true) @@ -167,23 +170,75 @@ protected: { timeouts = 0; pinged = false; + + // Cancel any pending shutdown. + if (shuttingDown.isSome()) { + // Need a copy for non-const access. + Future<Nothing> future = shuttingDown.get(); + future.discard(); + } } void timeout() { - if (pinged) { // So we haven't got back a pong yet ... - if (++timeouts >= MAX_SLAVE_PING_TIMEOUTS) { + if (pinged) { + timeouts++; // No pong has been received before the timeout. + if (timeouts >= MAX_SLAVE_PING_TIMEOUTS) { + // No pong has been received for the last + // 'MAX_SLAVE_PING_TIMEOUTS' pings. 
shutdown(); - return; } } + // NOTE: We keep pinging even if we schedule a shutdown. This is + // because if the slave eventually responds to a ping, we can + // cancel the shutdown. ping(); } + // NOTE: The shutdown of the slave is rate limited and can be + // canceled if a pong was received before the actual shutdown is + // called. void shutdown() { - dispatch(master, &Master::shutdownSlave, slaveId, "health check timed out"); + if (shuttingDown.isSome()) { + return; // Shutdown is already in progress. + } + + Future<Nothing> acquire = Nothing(); + + if (limiter.isSome()) { + LOG(INFO) << "Scheduling shutdown of slave " << slaveId + << " due to health check timeout"; + + acquire = limiter.get()->acquire(); + } + + shuttingDown = acquire.onAny(defer(self(), &Self::_shutdown)); + } + + void _shutdown() + { + CHECK_SOME(shuttingDown); + + const Future<Nothing>& future = shuttingDown.get(); + + CHECK(!future.isFailed()); + + if (future.isReady()) { + LOG(INFO) << "Shutting down slave " << slaveId + << " due to health check timeout"; + + dispatch(master, + &Master::shutdownSlave, + slaveId, + "health check timed out"); + } else if (future.isDiscarded()) { + LOG(INFO) << "Canceling shutdown of slave " << slaveId + << " since a pong is received!"; + } + + shuttingDown = None(); } private: @@ -191,6 +246,8 @@ private: const SlaveInfo slaveInfo; const SlaveID slaveId; const PID<Master> master; + const Option<shared_ptr<RateLimiter>> limiter; + Option<Future<Nothing>> shuttingDown; uint32_t timeouts; bool pinged; bool connected; @@ -423,6 +480,41 @@ void Master::initialize() LOG(INFO) << "Framework rate limiting enabled"; } + if (flags.slave_removal_rate_limit.isSome()) { + LOG(INFO) << "Slave removal is rate limited to " + << flags.slave_removal_rate_limit.get(); + + // Parse the flag value. + // TODO(vinod): Move this parsing logic to flags once we have a + // 'Rate' abstraction in stout. 
+ vector<string> tokens = + strings::tokenize(flags.slave_removal_rate_limit.get(), "/"); + + if (tokens.size() != 2) { + EXIT(1) << "Invalid slave_removal_rate_limit: " + << flags.slave_removal_rate_limit.get() + << ". Format is <Number of slaves>/<Duration>"; + } + + Try<int> permits = numify<int>(tokens[0]); + if (permits.isError()) { + EXIT(1) << "Invalid slave_removal_rate_limit: " + << flags.slave_removal_rate_limit.get() + << ". Format is <Number of slaves>/<Duration>" + << ": " << permits.error(); + } + + Try<Duration> duration = Duration::parse(tokens[1]); + if (duration.isError()) { + EXIT(1) << "Invalid slave_removal_rate_limit: " + << flags.slave_removal_rate_limit.get() + << ". Format is <Number of slaves>/<Duration>" + << ": " << duration.error(); + } + + slaves.limiter = new RateLimiter(permits.get(), duration.get()); + } + hashmap<string, RoleInfo> roleInfos; // Add the default role. @@ -4229,7 +4321,7 @@ void Master::addSlave( // Set up an observer for the slave. slave->observer = new SlaveObserver( - slave->pid, slave->info, slave->id, self()); + slave->pid, slave->info, slave->id, self(), slaves.limiter); spawn(slave->observer); http://git-wip-us.apache.org/repos/asf/mesos/blob/886efefc/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index d98e1b6..5a5c86f 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -585,6 +585,12 @@ private: // TODO(bmahler): Ideally we could use a cache with set semantics. Cache<SlaveID, Nothing> removed; + // This rate limiter is used to limit the removal of slaves failing + // health checks. + // NOTE: Using a 'shared_ptr' here is OK because 'RateLimiter' is + // a wrapper around libprocess process which is thread safe. 
+ Option<memory::shared_ptr<process::RateLimiter>> limiter; + bool transitioning(const Option<SlaveID>& slaveId) { if (slaveId.isSome()) { http://git-wip-us.apache.org/repos/asf/mesos/blob/886efefc/src/tests/partition_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/partition_tests.cpp b/src/tests/partition_tests.cpp index fea7801..b3af282 100644 --- a/src/tests/partition_tests.cpp +++ b/src/tests/partition_tests.cpp @@ -64,7 +64,7 @@ class PartitionTest : public MesosTest {}; // This test checks that a scheduler gets a slave lost -// message for a partioned slave. +// message for a partitioned slave. TEST_F(PartitionTest, PartitionedSlave) { Try<PID<Master> > master = StartMaster(); http://git-wip-us.apache.org/repos/asf/mesos/blob/886efefc/src/tests/slave_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp index e7e2af6..956ce64 100644 --- a/src/tests/slave_tests.cpp +++ b/src/tests/slave_tests.cpp @@ -64,6 +64,8 @@ using namespace mesos; using namespace mesos::internal; using namespace mesos::internal::tests; +using namespace process; + using mesos::internal::master::Master; using mesos::internal::slave::Containerizer; @@ -73,12 +75,6 @@ using mesos::internal::slave::MesosContainerizer; using mesos::internal::slave::MesosContainerizerProcess; using mesos::internal::slave::Slave; -using process::Clock; -using process::Future; -using process::Message; -using process::Owned; -using process::PID; - using std::map; using std::string; using std::vector; @@ -151,7 +147,7 @@ TEST_F(SlaveTest, ShutdownUnregisteredExecutor) tasks.push_back(task); // Drop the registration message from the executor to the slave. 
- Future<process::Message> registerExecutor = + Future<Message> registerExecutor = DROP_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _); driver.launchTasks(offers.get()[0].id(), tasks); @@ -234,7 +230,7 @@ TEST_F(SlaveTest, RemoveUnregisteredTerminatedExecutor) tasks.push_back(task); // Drop the registration message from the executor to the slave. - Future<process::Message> registerExecutorMessage = + Future<Message> registerExecutorMessage = DROP_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _); driver.launchTasks(offers.get()[0].id(), tasks); @@ -322,7 +318,7 @@ TEST_F(SlaveTest, CommandExecutorWithOverride) // Expect wait after launch is called but don't return anything // until after we've finished everything below. Future<Nothing> wait; - process::Promise<containerizer::Termination> promise; + Promise<containerizer::Termination> promise; EXPECT_CALL(containerizer, wait(_)) .WillOnce(DoAll(FutureSatisfy(&wait), Return(promise.future()))); @@ -359,12 +355,12 @@ TEST_F(SlaveTest, CommandExecutorWithOverride) .WillOnce(FutureArg<1>(&status1)) .WillOnce(FutureArg<1>(&status2)); - Try<process::Subprocess> executor = - process::subprocess( + Try<Subprocess> executor = + subprocess( executorCommand, - process::Subprocess::PIPE(), - process::Subprocess::PIPE(), - process::Subprocess::PIPE(), + Subprocess::PIPE(), + Subprocess::PIPE(), + Subprocess::PIPE(), environment); ASSERT_SOME(executor); @@ -728,13 +724,13 @@ TEST_F(SlaveTest, IgnoreNonLeaderStatusUpdateAcknowledgement) .WillRepeatedly(Return()); // Ignore subsequent offers. // We need to grab this message to get the scheduler's pid. 
- Future<process::Message> frameworkRegisteredMessage = FUTURE_MESSAGE( + Future<Message> frameworkRegisteredMessage = FUTURE_MESSAGE( Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _); schedDriver.start(); AWAIT_READY(frameworkRegisteredMessage); - const process::UPID schedulerPid = frameworkRegisteredMessage.get().to; + const UPID schedulerPid = frameworkRegisteredMessage.get().to; AWAIT_READY(offers); EXPECT_NE(0u, offers.get().size()); @@ -776,10 +772,9 @@ TEST_F(SlaveTest, IgnoreNonLeaderStatusUpdateAcknowledgement) AWAIT_READY(acknowledgementMessage); // Send the acknowledgement to the slave with a non-leading master. - process::post( - process::UPID("master@localhost:1"), - slave.get(), - acknowledgementMessage.get()); + post(process::UPID("master@localhost:1"), + slave.get(), + acknowledgementMessage.get()); // Make sure the acknowledgement was ignored. Clock::settle(); @@ -821,8 +816,8 @@ TEST_F(SlaveTest, MetricsInStatsEndpoint) Try<PID<Slave> > slave = StartSlave(); ASSERT_SOME(slave); - Future<process::http::Response> response = - process::http::get(slave.get(), "stats.json"); + Future<http::Response> response = + http::get(slave.get(), "stats.json"); AWAIT_READY(response); @@ -891,8 +886,8 @@ TEST_F(SlaveTest, StateEndpoint) Try<PID<Slave> > slave = StartSlave(flags); ASSERT_SOME(slave); - Future<process::http::Response> response = - process::http::get(slave.get(), "state.json"); + Future<http::Response> response = + http::get(slave.get(), "state.json"); AWAIT_READY(response); @@ -1009,7 +1004,7 @@ TEST_F(SlaveTest, TerminatingSlaveDoesNotReregister) // Send a ShutdownMessage instead of calling Stop() directly // to avoid blocking. - process::post(master.get(), slave.get(), ShutdownMessage()); + post(master.get(), slave.get(), ShutdownMessage()); // Advance the clock to trigger doReliableRegistration(). 
Clock::advance(slave::REGISTER_RETRY_INTERVAL_MAX * 2); @@ -1096,7 +1091,7 @@ TEST_F(SlaveTest, TerminalTaskContainerizerUpdateFails) // Set up the containerizer so the next update() will fail. EXPECT_CALL(containerizer, update(_, _)) - .WillOnce(Return(process::Failure("update() failed"))) + .WillOnce(Return(Failure("update() failed"))) .WillRepeatedly(Return(Nothing())); EXPECT_CALL(exec, killTask(_, _)) @@ -1201,6 +1196,151 @@ TEST_F(SlaveTest, PingTimeoutSomePings) } +// This test ensures that when slave removal rate limit is specified +// a slave that fails health checks is removed after acquiring permit +// from the rate limiter. +TEST_F(SlaveTest, RateLimitSlaveShutdown) +{ + // Start a master. + master::Flags flags = CreateMasterFlags(); + flags.slave_removal_rate_limit = "1/1secs"; + Try<PID<Master> > master = StartMaster(flags); + ASSERT_SOME(master); + + // Set these expectations up before we spawn the slave so that we + // don't miss the first PING. + Future<Message> ping = FUTURE_MESSAGE(Eq("PING"), _, _); + + // Drop all the PONGs to simulate health check timeout. + DROP_MESSAGES(Eq("PONG"), _, _); + + Future<SlaveRegisteredMessage> slaveRegisteredMessage = + FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _); + + // Start a slave. + Try<PID<Slave> > slave = StartSlave(); + ASSERT_SOME(slave); + + AWAIT_READY(slaveRegisteredMessage); + + Future<Nothing> acquire = + FUTURE_DISPATCH(_, &RateLimiterProcess::acquire); + + Future<ShutdownMessage> shutdown = FUTURE_PROTOBUF(ShutdownMessage(), _, _); + + // Now advance through the PINGs. + Clock::pause(); + uint32_t pings = 0; + while (true) { + AWAIT_READY(ping); + pings++; + if (pings == master::MAX_SLAVE_PING_TIMEOUTS) { + break; + } + ping = FUTURE_MESSAGE(Eq("PING"), _, _); + Clock::advance(master::SLAVE_PING_TIMEOUT); + } + + Clock::advance(master::SLAVE_PING_TIMEOUT); + Clock::settle(); + + // Master should acquire the permit for shutting down the slave. 
+ AWAIT_READY(acquire); + + // Master should shutdown the slave. + AWAIT_READY(shutdown); +} + + +// This test verifies that when a slave responds to pings after the +// slave observer has scheduled it for shutdown (due to health check +// failure), the shutdown is cancelled. +TEST_F(SlaveTest, CancelSlaveShutdown) +{ + // Start a master. + master::Flags flags = CreateMasterFlags(); + // Interval between slave removals. + Duration interval = master::SLAVE_PING_TIMEOUT * 10; + flags.slave_removal_rate_limit = "1/" + stringify(interval); + Try<PID<Master> > master = StartMaster(flags); + ASSERT_SOME(master); + + // Set these expectations up before we spawn the slave so that we + // don't miss the first PING. + Future<Message> ping = FUTURE_MESSAGE(Eq("PING"), _, _); + + // Drop all the PONGs to simulate health check timeout. + DROP_MESSAGES(Eq("PONG"), _, _); + + // NOTE: We start two slaves in this test so that the rate limiter + // used by the slave observers gives out 2 permits. The first permit + // gets satisfied immediately. And since the 2nd permit will be + // enqueued, its corresponding future can be discarded before it + // becomes ready. + // TODO(vinod): Inject a rate limiter into 'Master' instead to + // simplify the test. + + // Start the first slave. + Future<SlaveRegisteredMessage> slaveRegisteredMessage1 = + FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _); + + Try<PID<Slave> > slave1 = StartSlave(); + ASSERT_SOME(slave1); + + AWAIT_READY(slaveRegisteredMessage1); + + // Start the second slave. 
+ Future<SlaveRegisteredMessage> slaveRegisteredMessage2 = + FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _); + + Try<PID<Slave> > slave2 = StartSlave(); + ASSERT_SOME(slave2); + + AWAIT_READY(slaveRegisteredMessage2); + + Future<Nothing> acquire1 = + FUTURE_DISPATCH(_, &RateLimiterProcess::acquire); + + Future<Nothing> acquire2 = + FUTURE_DISPATCH(_, &RateLimiterProcess::acquire); + + Future<ShutdownMessage> shutdown = FUTURE_PROTOBUF(ShutdownMessage(), _, _); + + // Now advance through the PINGs until shutdown permits are given + // out for both the slaves. + Clock::pause(); + while (true) { + AWAIT_READY(ping); + if (acquire1.isReady() && acquire2.isReady()) { + break; + } + ping = FUTURE_MESSAGE(Eq("PING"), _, _); + Clock::advance(master::SLAVE_PING_TIMEOUT); + } + Clock::settle(); + + // The slave that first timed out should be shutdown right away. + AWAIT_READY(shutdown); + + // Ensure the 2nd slave's shutdown is canceled. + EXPECT_NO_FUTURE_PROTOBUFS(ShutdownMessage(), _, _); + + // Reset the filters to allow pongs from the 2nd slave. + filter(NULL); + + // Advance clock enough to do a ping pong. + Clock::advance(master::SLAVE_PING_TIMEOUT); + Clock::settle(); + + // Now advance the clock to the time the 2nd permit is acquired. + Clock::advance(interval); + + // Settle the clock to give a chance for the master to shutdown + // the 2nd slave (it shouldn't in this test). + Clock::settle(); +} + + // This test ensures that a killTask() can happen between runTask() // and _runTask() and then gets "handled properly". This means that // the task never gets started, but also does not get lost. 
The end @@ -1220,7 +1360,7 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts) StandaloneMasterDetector detector(master.get()); MockSlave slave(CreateSlaveFlags(), &detector, &containerizer); - process::spawn(slave); + spawn(slave); MockScheduler sched; MesosSchedulerDriver driver( @@ -1311,8 +1451,8 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts) driver.stop(); driver.join(); - process::terminate(slave); - process::wait(slave); + terminate(slave); + wait(slave); Shutdown(); // Must shutdown before 'containerizer' gets deallocated. } @@ -1497,8 +1637,8 @@ TEST_F(SlaveTest, TaskLabels) AWAIT_READY(update); // Verify label key and value in slave state.json. - Future<process::http::Response> response = - process::http::get(slave.get(), "state.json"); + Future<http::Response> response = + http::get(slave.get(), "state.json"); AWAIT_READY(response); EXPECT_SOME_EQ( @@ -1685,7 +1825,7 @@ TEST_F(SlaveTest, CommandExecutorGracefulShutdown) // Ensure that a reap will occur within the grace period. Duration timeout = slave::getExecutorGracePeriod( flags.executor_shutdown_grace_period); - EXPECT_GT(timeout, process::MAX_REAP_INTERVAL()); + EXPECT_GT(timeout, MAX_REAP_INTERVAL()); Fetcher fetcher; Try<MesosContainerizer*> containerizer = MesosContainerizer::create(
