Repository: mesos Updated Branches: refs/heads/master 81fc89d1d -> 7bf1e8a6b
Consolidated slave re-registration Timers into a single Timer. Review: https://reviews.apache.org/r/19857 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/1a88f09c Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/1a88f09c Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/1a88f09c Branch: refs/heads/master Commit: 1a88f09c7188f5456d7c8aa1606b95b7cac5b650 Parents: 81fc89d Author: Benjamin Mahler <[email protected]> Authored: Fri Mar 28 12:39:48 2014 -0700 Committer: Benjamin Mahler <[email protected]> Committed: Wed Apr 16 19:17:03 2014 -0700 ---------------------------------------------------------------------- src/master/master.cpp | 91 ++++++++++++++++++++++++---------------------- src/master/master.hpp | 12 ++++-- 2 files changed, 56 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/1a88f09c/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 3c3c989..3803c60 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -609,14 +609,14 @@ void Master::finalize() } roles.clear(); - foreachvalue (const Timer& timer, slaves.recovered) { - // NOTE: This is necessary during tests because we don't want the - // timer to fire in a different test and invoke the callback. - // The callback would be invoked because the master pid doesn't - // change across the tests. - // TODO(vinod): This seems to be a bug in libprocess or the - // testing infrastructure. - Timer::cancel(timer); + // NOTE: This is necessary during tests because we don't want the + // timer to fire in a different test and invoke the callback. + // The callback would be invoked because the master pid doesn't + // change across the tests. 
+ // TODO(vinod): This seems to be a bug in libprocess or the + // testing infrastructure. + if (slaves.recoveredTimer.isSome()) { + Timer::cancel(slaves.recoveredTimer.get()); } terminate(whitelistWatcher); @@ -747,51 +747,59 @@ Future<Nothing> Master::recover() Future<Nothing> Master::_recover(const Registry& registry) { - const Registry::Slaves& slaves = registry.slaves(); - - foreach (const Registry::Slave& slave, slaves.slaves()) { - // Set up a timeout for this slave to re-register. This timeout - // is based on the maximum amount of time the SlaveObserver - // allows slaves to not respond to health checks. Re-registration - // of the slave will cancel this timer. - // XXX: What if there is a ZK issue that delays detection for slaves? - // Should we be more conservative here to avoid a full shutdown? - this->slaves.recovered[slave.info().id()] = - delay(SLAVE_PING_TIMEOUT * MAX_SLAVE_PING_TIMEOUTS, - self(), - &Self::__recoverSlaveTimeout, - slave); + foreach (const Registry::Slave& slave, registry.slaves().slaves()) { + slaves.recovered.insert(slave.info().id()); } + // Set up a timeout for slaves to re-register. This timeout is based + // on the maximum amount of time the SlaveObserver allows slaves to + // not respond to health checks. + // TODO(bmahler): Consider making this configurable. + slaves.recoveredTimer = + delay(SLAVE_PING_TIMEOUT * MAX_SLAVE_PING_TIMEOUTS, + self(), + &Self::recoveredSlavesTimeout, + registry); + // Recovery is now complete! 
- LOG(INFO) << "Recovered " << slaves.slaves().size() << " slaves " - << " from the Registry (" << Bytes(registry.ByteSize()) << ")"; + LOG(INFO) << "Recovered " << registry.slaves().slaves().size() << " slaves" + << " from the Registry (" << Bytes(registry.ByteSize()) << ")" + << " ; allowing " << SLAVE_PING_TIMEOUT * MAX_SLAVE_PING_TIMEOUTS + << " for slaves to re-register"; return Nothing(); } -void Master::__recoverSlaveTimeout(const Registry::Slave& slave) +void Master::recoveredSlavesTimeout(const Registry& registry) { CHECK(elected()); - if (!slaves.recovered.contains(slave.info().id())) { - return; // Slave re-registered. - } + // TODO(bmahler): Provide a (configurable) limit on the number of + // slaves that can be removed here, e.g. maximum 10% of slaves can + // be removed after failover if they do not re-register. + // This can serve as a configurable safety net for operators of + // production environments. - LOG(WARNING) << "Slave " << slave.info().id() - << " (" << slave.info().hostname() << ") did not re-register " - << "within the timeout; Removing it from the registrar"; + foreach (const Registry::Slave& slave, registry.slaves().slaves()) { + if (!slaves.recovered.contains(slave.info().id())) { + continue; // Slave re-registered. + } - slaves.recovered.erase(slave.info().id()); - slaves.removing.insert(slave.info().id()); + LOG(WARNING) << "Slave " << slave.info().id() + << " (" << slave.info().hostname() << ") did not re-register " + << "within the timeout; removing it from the registrar"; - registrar->apply(Owned<Operation>(new RemoveSlave(slave.info()))) - .onAny(defer(self(), - &Self::_removeSlave, - slave.info(), - vector<StatusUpdate>(), // No TASK_LOST updates to send. 
- lambda::_1)); + slaves.recovered.erase(slave.info().id()); + slaves.removing.insert(slave.info().id()); + + registrar->apply(Owned<Operation>(new RemoveSlave(slave.info()))) + .onAny(defer(self(), + &Self::_removeSlave, + slave.info(), + vector<StatusUpdate>(), // No TASK_LOST updates to send. + lambda::_1)); + } } @@ -2156,10 +2164,7 @@ void Master::reregisterSlave( // Ensure we don't remove the slave for not re-registering after // we've recovered it from the registry. - if (slaves.recovered.contains(slaveInfo.id())) { - Timer::cancel(slaves.recovered[slaveInfo.id()]); - slaves.recovered.erase(slaveInfo.id()); - } + slaves.recovered.erase(slaveInfo.id()); // If we're already re-registering this slave, then no need to ask // the registrar again. http://git-wip-us.apache.org/repos/asf/mesos/blob/1a88f09c/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index fef59c9..2fe0379 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -204,7 +204,7 @@ protected: // Recovers state from the registrar. process::Future<Nothing> recover(); process::Future<Nothing> _recover(const Registry& registry); - void __recoverSlaveTimeout(const Registry::Slave& slave); + void recoveredSlavesTimeout(const Registry& registry); void _registerSlave( const SlaveInfo& slaveInfo, @@ -387,10 +387,14 @@ private: { Slaves() : deactivated(MAX_DEACTIVATED_SLAVES) {} + // Imposes a time limit for slaves that we recover from the + // registry to re-register with the master. + Option<process::Timer> recoveredTimer; + // Slaves that have been recovered from the registrar but have yet - // to re-register. We keep a Timer for the removal of these slaves - // so that we can cancel it to avoid unnecessary dispatches. - hashmap<SlaveID, process::Timer> recovered; + // to re-register. We keep a "recoveredTimer" above to ensure + // we remove these slaves if they do not re-register. 
+ hashset<SlaveID> recovered; // Slaves that are in the process of registering. hashset<process::UPID> registering;
