Repository: mesos
Updated Branches:
  refs/heads/master 81fc89d1d -> 7bf1e8a6b


Consolidated slave re-registration Timers into a single Timer.

Review: https://reviews.apache.org/r/19857


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/1a88f09c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/1a88f09c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/1a88f09c

Branch: refs/heads/master
Commit: 1a88f09c7188f5456d7c8aa1606b95b7cac5b650
Parents: 81fc89d
Author: Benjamin Mahler <[email protected]>
Authored: Fri Mar 28 12:39:48 2014 -0700
Committer: Benjamin Mahler <[email protected]>
Committed: Wed Apr 16 19:17:03 2014 -0700

----------------------------------------------------------------------
 src/master/master.cpp | 91 ++++++++++++++++++++++++----------------------
 src/master/master.hpp | 12 ++++--
 2 files changed, 56 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/1a88f09c/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 3c3c989..3803c60 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -609,14 +609,14 @@ void Master::finalize()
   }
   roles.clear();
 
-  foreachvalue (const Timer& timer, slaves.recovered) {
-    // NOTE: This is necessary during tests because we don't want the
-    // timer to fire in a different test and invoke the callback.
-    // The callback would be invoked because the master pid doesn't
-    // change across the tests.
-    // TODO(vinod): This seems to be a bug in libprocess or the
-    // testing infrastructure.
-    Timer::cancel(timer);
+  // NOTE: This is necessary during tests because we don't want the
+  // timer to fire in a different test and invoke the callback.
+  // The callback would be invoked because the master pid doesn't
+  // change across the tests.
+  // TODO(vinod): This seems to be a bug in libprocess or the
+  // testing infrastructure.
+  if (slaves.recoveredTimer.isSome()) {
+    Timer::cancel(slaves.recoveredTimer.get());
   }
 
   terminate(whitelistWatcher);
@@ -747,51 +747,59 @@ Future<Nothing> Master::recover()
 
 Future<Nothing> Master::_recover(const Registry& registry)
 {
-  const Registry::Slaves& slaves = registry.slaves();
-
-  foreach (const Registry::Slave& slave, slaves.slaves()) {
-    // Set up a timeout for this slave to re-register. This timeout
-    // is based on the maximum amount of time the SlaveObserver
-    // allows slaves to not respond to health checks. Re-registration
-    // of the slave will cancel this timer.
-    // XXX: What if there is a ZK issue that delays detection for slaves?
-    //      Should we be more conservative here to avoid a full shutdown?
-    this->slaves.recovered[slave.info().id()] =
-      delay(SLAVE_PING_TIMEOUT * MAX_SLAVE_PING_TIMEOUTS,
-            self(),
-            &Self::__recoverSlaveTimeout,
-            slave);
+  foreach (const Registry::Slave& slave, registry.slaves().slaves()) {
+    slaves.recovered.insert(slave.info().id());
   }
 
+  // Set up a timeout for slaves to re-register. This timeout is based
+  // on the maximum amount of time the SlaveObserver allows slaves to
+  // not respond to health checks.
+  // TODO(bmahler): Consider making this configurable.
+  slaves.recoveredTimer =
+    delay(SLAVE_PING_TIMEOUT * MAX_SLAVE_PING_TIMEOUTS,
+          self(),
+          &Self::recoveredSlavesTimeout,
+          registry);
+
   // Recovery is now complete!
-  LOG(INFO) << "Recovered " << slaves.slaves().size() << " slaves "
-            << " from the Registry (" << Bytes(registry.ByteSize()) << ")";
+  LOG(INFO) << "Recovered " << registry.slaves().slaves().size() << " slaves"
+            << " from the Registry (" << Bytes(registry.ByteSize()) << ")"
+            << " ; allowing " << SLAVE_PING_TIMEOUT * MAX_SLAVE_PING_TIMEOUTS
+            << " for slaves to re-register";
 
   return Nothing();
 }
 
 
-void Master::__recoverSlaveTimeout(const Registry::Slave& slave)
+void Master::recoveredSlavesTimeout(const Registry& registry)
 {
   CHECK(elected());
 
-  if (!slaves.recovered.contains(slave.info().id())) {
-    return; // Slave re-registered.
-  }
+  // TODO(bmahler): Provide a (configurable) limit on the number of
+  // slaves that can be removed here, e.g. maximum 10% of slaves can
+  // be removed after failover if they do not re-register.
+  // This can serve as a configurable safety net for operators of
+  // production environments.
 
-  LOG(WARNING) << "Slave " << slave.info().id()
-               << " (" << slave.info().hostname() << ") did not re-register "
-               << "within the timeout; Removing it from the registrar";
+  foreach (const Registry::Slave& slave, registry.slaves().slaves()) {
+    if (!slaves.recovered.contains(slave.info().id())) {
+      continue; // Slave re-registered.
+    }
 
-  slaves.recovered.erase(slave.info().id());
-  slaves.removing.insert(slave.info().id());
+    LOG(WARNING) << "Slave " << slave.info().id()
+                 << " (" << slave.info().hostname() << ") did not re-register "
+                 << "within the timeout; removing it from the registrar";
 
-  registrar->apply(Owned<Operation>(new RemoveSlave(slave.info())))
-    .onAny(defer(self(),
-                 &Self::_removeSlave,
-                 slave.info(),
-                 vector<StatusUpdate>(), // No TASK_LOST updates to send.
-                 lambda::_1));
+    slaves.recovered.erase(slave.info().id());
+    slaves.removing.insert(slave.info().id());
+
+    registrar->apply(Owned<Operation>(new RemoveSlave(slave.info())))
+      .onAny(defer(self(),
+                   &Self::_removeSlave,
+                   slave.info(),
+                   vector<StatusUpdate>(), // No TASK_LOST updates to send.
+                   lambda::_1));
+  }
 }
 
 
@@ -2156,10 +2164,7 @@ void Master::reregisterSlave(
 
   // Ensure we don't remove the slave for not re-registering after
   // we've recovered it from the registry.
-  if (slaves.recovered.contains(slaveInfo.id())) {
-    Timer::cancel(slaves.recovered[slaveInfo.id()]);
-    slaves.recovered.erase(slaveInfo.id());
-  }
+  slaves.recovered.erase(slaveInfo.id());
 
   // If we're already re-registering this slave, then no need to ask
   // the registrar again.

http://git-wip-us.apache.org/repos/asf/mesos/blob/1a88f09c/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index fef59c9..2fe0379 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -204,7 +204,7 @@ protected:
   // Recovers state from the registrar.
   process::Future<Nothing> recover();
   process::Future<Nothing> _recover(const Registry& registry);
-  void __recoverSlaveTimeout(const Registry::Slave& slave);
+  void recoveredSlavesTimeout(const Registry& registry);
 
   void _registerSlave(
       const SlaveInfo& slaveInfo,
@@ -387,10 +387,14 @@ private:
   {
     Slaves() : deactivated(MAX_DEACTIVATED_SLAVES) {}
 
+    // Imposes a time limit for slaves that we recover from the
+    // registry to re-register with the master.
+    Option<process::Timer> recoveredTimer;
+
     // Slaves that have been recovered from the registrar but have yet
-    // to re-register. We keep a Timer for the removal of these slaves
-    // so that we can cancel it to avoid unnecessary dispatches.
-    hashmap<SlaveID, process::Timer> recovered;
+    // to re-register. We keep a "reregistrationTimer" above to ensure
+    // we remove these slaves if they do not re-register.
+    hashset<SlaveID> recovered;
 
     // Slaves that are in the process of registering.
     hashset<process::UPID> registering;

Reply via email to