Repository: mesos Updated Branches: refs/heads/master 81d830f50 -> d47cf3960
Added a ping timeout in the slave to trigger a re-detection. Review: https://reviews.apache.org/r/23868 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d47cf396 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d47cf396 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d47cf396 Branch: refs/heads/master Commit: d47cf3960d9aeacaed1507a7ab61fcee5a62a86e Parents: e20ea63 Author: Benjamin Mahler <[email protected]> Authored: Wed Jul 23 12:01:09 2014 -0700 Committer: Benjamin Mahler <[email protected]> Committed: Mon Aug 4 13:55:27 2014 -0700 ---------------------------------------------------------------------- src/master/constants.hpp | 6 ++++ src/slave/constants.cpp | 4 +++ src/slave/constants.hpp | 3 ++ src/slave/slave.cpp | 62 +++++++++++++++++++++++++++------ src/slave/slave.hpp | 11 ++++++ src/tests/slave_tests.cpp | 79 ++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 154 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/d47cf396/src/master/constants.hpp ---------------------------------------------------------------------- diff --git a/src/master/constants.hpp b/src/master/constants.hpp index 3b4d68b..eadc52b 100644 --- a/src/master/constants.hpp +++ b/src/master/constants.hpp @@ -23,6 +23,7 @@ #include <string> +#include <stout/bytes.hpp> #include <stout/duration.hpp> namespace mesos { @@ -51,6 +52,11 @@ extern const double MIN_CPUS; extern const Bytes MIN_MEM; // Amount of time within which a slave PING should be received. +// NOTE: The slave uses these PING constants to determine when +// the master has stopped sending pings. If these are made +// configurable, then we'll need to rely on upper/lower bounds +// to ensure that the slave is not unnecessarily triggering +// re-registrations. extern const Duration SLAVE_PING_TIMEOUT; // Maximum number of ping timeouts until slave is considered failed. http://git-wip-us.apache.org/repos/asf/mesos/blob/d47cf396/src/slave/constants.cpp ---------------------------------------------------------------------- diff --git a/src/slave/constants.cpp b/src/slave/constants.cpp index a75b1ef..f00300d 100644 --- a/src/slave/constants.cpp +++ b/src/slave/constants.cpp @@ -18,6 +18,8 @@ #include <stdint.h> +#include "master/constants.hpp" + #include "slave/constants.hpp" namespace mesos { @@ -30,6 +32,8 @@ const Duration EXECUTOR_REREGISTER_TIMEOUT = Seconds(2); const Duration EXECUTOR_SIGNAL_ESCALATION_TIMEOUT = Seconds(3); const Duration STATUS_UPDATE_RETRY_INTERVAL_MIN = Seconds(10); const Duration STATUS_UPDATE_RETRY_INTERVAL_MAX = Minutes(10); +const Duration MASTER_PING_TIMEOUT = + master::SLAVE_PING_TIMEOUT * master::MAX_SLAVE_PING_TIMEOUTS; const Duration REGISTRATION_BACKOFF_FACTOR = Seconds(1); const Duration REGISTER_RETRY_INTERVAL_MAX = Minutes(1); const Duration GC_DELAY = Weeks(1); http://git-wip-us.apache.org/repos/asf/mesos/blob/d47cf396/src/slave/constants.hpp ---------------------------------------------------------------------- diff --git a/src/slave/constants.hpp b/src/slave/constants.hpp index 97dc1b3..bc16fe5 100644 --- a/src/slave/constants.hpp +++ b/src/slave/constants.hpp @@ -45,6 +45,9 @@ extern const Duration GC_DELAY; extern const Duration DISK_WATCH_INTERVAL; extern const Duration RESOURCE_MONITORING_INTERVAL; +// If no pings received within this timeout, then the slave will +// trigger a re-detection of the master to cause a re-registration. +extern const Duration MASTER_PING_TIMEOUT; // Default backoff interval used by the slave to wait before registration. extern const Duration REGISTRATION_BACKOFF_FACTOR; http://git-wip-us.apache.org/repos/asf/mesos/blob/d47cf396/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index df69b75..2f39d61 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -585,19 +585,24 @@ void Slave::detected(const Future<Option<MasterInfo> >& _master) state = DISCONNECTED; } - CHECK(!_master.isDiscarded()); - if (_master.isFailed()) { EXIT(1) << "Failed to detect a master: " << _master.failure(); } - if (_master.get().isSome()) { - master = UPID(_master.get().get().pid()); - } else { + Option<MasterInfo> latest; + + if (_master.isDiscarded()) { + LOG(INFO) << "No pings from master received within " << MASTER_PING_TIMEOUT; + latest = None(); master = None(); - } + } else if (_master.get().isNone()) { + LOG(INFO) << "Lost leading master"; + latest = None(); + master = None(); + } else { + latest = _master.get(); + master = UPID(_master.get().get().pid()); - if (master.isSome()) { LOG(INFO) << "New master detected at " << master.get(); link(master.get()); @@ -642,13 +647,11 @@ void Slave::detected(const Future<Option<MasterInfo> >& _master) &Slave::doReliableRegistration, flags.registration_backoff_factor * 2); // Backoff } - } else { - LOG(INFO) << "Lost leading master"; } // Keep detecting masters. LOG(INFO) << "Detecting new master"; - detector->detect(_master.get()) + detection = detector->detect(latest) .onAny(defer(self(), &Slave::detected, lambda::_1)); } @@ -781,6 +784,18 @@ void Slave::registered(const UPID& from, const SlaveID& slaveId) LOG(INFO) << "Checkpointing SlaveInfo to '" << path << "'"; CHECK_SOME(state::checkpoint(path, info)); } + + // If we don't get a ping from the master, trigger a + // re-registration. This needs to be done once registered, + // in case we never receive an initial ping. + Timer::cancel(pingTimer); + + pingTimer = delay( + MASTER_PING_TIMEOUT, + self(), + &Slave::pingTimeout, + detection); + break; } case RUNNING: @@ -2324,10 +2339,35 @@ void Slave::executorMessage( void Slave::ping(const UPID& from, const string& body) { VLOG(1) << "Received ping from " << from; + + // If we don't get a ping from the master, trigger a + // re-registration. This can occur when the master no + // longer considers the slave to be registered, so it is + // essential for the slave to attempt a re-registration + // when this occurs. + Timer::cancel(pingTimer); + + pingTimer = delay( + MASTER_PING_TIMEOUT, + self(), + &Slave::pingTimeout, + detection); + send(from, "PONG"); } +void Slave::pingTimeout(Future<Option<MasterInfo> > future) +{ + // It's possible that a new ping arrived since the timeout fired + // and we were unable to cancel this timeout. If this occurs, don't + // bother trying to re-detect. + if (pingTimer.timeout().expired()) { + future.discard(); + } +} + + void Slave::exited(const UPID& pid) { LOG(INFO) << pid << " exited"; @@ -3211,7 +3251,7 @@ void Slave::__recover(const Future<Nothing>& future) } // Start detecting masters. - detector->detect() + detection = detector->detect() .onAny(defer(self(), &Slave::detected, lambda::_1)); } http://git-wip-us.apache.org/repos/asf/mesos/blob/d47cf396/src/slave/slave.hpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp index 9ef597e..c12cd0a 100644 --- a/src/slave/slave.hpp +++ b/src/slave/slave.hpp @@ -246,6 +246,10 @@ public: Nothing detachFile(const std::string& path); + // Triggers a re-detection of the master when the slave does + // not receive a ping. + void pingTimeout(process::Future<Option<MasterInfo> > future); + void authenticate(); // Helper routine to lookup a framework. @@ -431,6 +435,13 @@ private: StatusUpdateManager* statusUpdateManager; + // Master detection future. + process::Future<Option<MasterInfo> > detection; + + // Timer for triggering re-detection when no ping is received from + // the master. + process::Timer pingTimer; + // Flag to indicate if recovery, including reconciling (i.e., reconnect/kill) // with executors is finished. process::Promise<Nothing> recovered; http://git-wip-us.apache.org/repos/asf/mesos/blob/d47cf396/src/tests/slave_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp index b4f9f30..069fbda 100644 --- a/src/tests/slave_tests.cpp +++ b/src/tests/slave_tests.cpp @@ -67,6 +67,7 @@ using mesos::internal::slave::MesosContainerizerProcess; using process::Clock; using process::Future; +using process::Message; using process::Owned; using process::PID; @@ -897,3 +898,81 @@ TEST_F(SlaveTest, TerminalTaskContainerizerUpdateFails) Shutdown(); } + + +// This test ensures that the slave will re-register with the master +// if it does not receive any pings after registering. +TEST_F(SlaveTest, PingTimeoutNoPings) +{ + // Start a master. + Try<PID<Master> > master = StartMaster(); + ASSERT_SOME(master); + + // Block all pings to the slave. + DROP_MESSAGES(Eq("PING"), _, _); + + Future<SlaveRegisteredMessage> slaveRegisteredMessage = + FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _); + + // Start a slave. + Try<PID<Slave> > slave = StartSlave(); + ASSERT_SOME(slave); + + AWAIT_READY(slaveRegisteredMessage); + + // Advance to the ping timeout to trigger a re-detection and + // re-registration. + Future<Nothing> detected = FUTURE_DISPATCH(_, &Slave::detected); + + Future<SlaveReregisteredMessage> slaveReregisteredMessage = + FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _); + + Clock::pause(); + Clock::advance(slave::MASTER_PING_TIMEOUT); + + AWAIT_READY(detected); + AWAIT_READY(slaveReregisteredMessage); +} + + +// This test ensures that the slave will re-register with the master +// if it stops receiving pings. +TEST_F(SlaveTest, PingTimeoutSomePings) +{ + // Start a master. + Try<PID<Master> > master = StartMaster(); + ASSERT_SOME(master); + + Future<SlaveRegisteredMessage> slaveRegisteredMessage = + FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _); + + // Start a slave. + Try<PID<Slave> > slave = StartSlave(); + ASSERT_SOME(slave); + + AWAIT_READY(slaveRegisteredMessage); + + Clock::pause(); + + // Ensure a ping reaches the slave. + Future<Message> ping = FUTURE_MESSAGE(Eq("PING"), _, _); + + Clock::advance(master::SLAVE_PING_TIMEOUT); + + AWAIT_READY(ping); + + // Now block further pings from the master and advance + // the clock to trigger a re-detection and re-registration on + // the slave. + DROP_MESSAGES(Eq("PING"), _, _); + + Future<Nothing> detected = FUTURE_DISPATCH(_, &Slave::detected); + + Future<SlaveReregisteredMessage> slaveReregisteredMessage = + FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _); + + Clock::advance(slave::MASTER_PING_TIMEOUT); + + AWAIT_READY(detected); + AWAIT_READY(slaveReregisteredMessage); +}
