Repository: mesos
Updated Branches:
  refs/heads/master 81d830f50 -> d47cf3960


Added a ping timeout in the slave to trigger a re-detection.

Review: https://reviews.apache.org/r/23868


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d47cf396
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d47cf396
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d47cf396

Branch: refs/heads/master
Commit: d47cf3960d9aeacaed1507a7ab61fcee5a62a86e
Parents: e20ea63
Author: Benjamin Mahler <[email protected]>
Authored: Wed Jul 23 12:01:09 2014 -0700
Committer: Benjamin Mahler <[email protected]>
Committed: Mon Aug 4 13:55:27 2014 -0700

----------------------------------------------------------------------
 src/master/constants.hpp  |  6 ++++
 src/slave/constants.cpp   |  4 +++
 src/slave/constants.hpp   |  3 ++
 src/slave/slave.cpp       | 62 +++++++++++++++++++++++++++------
 src/slave/slave.hpp       | 11 ++++++
 src/tests/slave_tests.cpp | 79 ++++++++++++++++++++++++++++++++++++++++++
 6 files changed, 154 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/d47cf396/src/master/constants.hpp
----------------------------------------------------------------------
diff --git a/src/master/constants.hpp b/src/master/constants.hpp
index 3b4d68b..eadc52b 100644
--- a/src/master/constants.hpp
+++ b/src/master/constants.hpp
@@ -23,6 +23,7 @@
 
 #include <string>
 
+#include <stout/bytes.hpp>
 #include <stout/duration.hpp>
 
 namespace mesos {
@@ -51,6 +52,11 @@ extern const double MIN_CPUS;
 extern const Bytes MIN_MEM;
 
 // Amount of time within which a slave PING should be received.
+// NOTE: The slave uses these PING constants to determine when
+// the master has stopped sending pings. If these are made
+// configurable, then we'll need to rely on upper/lower bounds
+// to ensure that the slave is not unnecessarily triggering
+// re-registrations.
 extern const Duration SLAVE_PING_TIMEOUT;
 
 // Maximum number of ping timeouts until slave is considered failed.

http://git-wip-us.apache.org/repos/asf/mesos/blob/d47cf396/src/slave/constants.cpp
----------------------------------------------------------------------
diff --git a/src/slave/constants.cpp b/src/slave/constants.cpp
index a75b1ef..f00300d 100644
--- a/src/slave/constants.cpp
+++ b/src/slave/constants.cpp
@@ -18,6 +18,8 @@
 
 #include <stdint.h>
 
+#include "master/constants.hpp"
+
 #include "slave/constants.hpp"
 
 namespace mesos {
@@ -30,6 +32,8 @@ const Duration EXECUTOR_REREGISTER_TIMEOUT = Seconds(2);
 const Duration EXECUTOR_SIGNAL_ESCALATION_TIMEOUT = Seconds(3);
 const Duration STATUS_UPDATE_RETRY_INTERVAL_MIN = Seconds(10);
 const Duration STATUS_UPDATE_RETRY_INTERVAL_MAX = Minutes(10);
+const Duration MASTER_PING_TIMEOUT =
+  master::SLAVE_PING_TIMEOUT * master::MAX_SLAVE_PING_TIMEOUTS;
 const Duration REGISTRATION_BACKOFF_FACTOR = Seconds(1);
 const Duration REGISTER_RETRY_INTERVAL_MAX = Minutes(1);
 const Duration GC_DELAY = Weeks(1);

http://git-wip-us.apache.org/repos/asf/mesos/blob/d47cf396/src/slave/constants.hpp
----------------------------------------------------------------------
diff --git a/src/slave/constants.hpp b/src/slave/constants.hpp
index 97dc1b3..bc16fe5 100644
--- a/src/slave/constants.hpp
+++ b/src/slave/constants.hpp
@@ -45,6 +45,9 @@ extern const Duration GC_DELAY;
 extern const Duration DISK_WATCH_INTERVAL;
 extern const Duration RESOURCE_MONITORING_INTERVAL;
 
+// If no pings received within this timeout, then the slave will
+// trigger a re-detection of the master to cause a re-registration.
+extern const Duration MASTER_PING_TIMEOUT;
 
 // Default backoff interval used by the slave to wait before registration.
 extern const Duration REGISTRATION_BACKOFF_FACTOR;

http://git-wip-us.apache.org/repos/asf/mesos/blob/d47cf396/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index df69b75..2f39d61 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -585,19 +585,24 @@ void Slave::detected(const Future<Option<MasterInfo> >& _master)
     state = DISCONNECTED;
   }
 
-  CHECK(!_master.isDiscarded());
-
   if (_master.isFailed()) {
     EXIT(1) << "Failed to detect a master: " << _master.failure();
   }
 
-  if (_master.get().isSome()) {
-    master = UPID(_master.get().get().pid());
-  } else {
+  Option<MasterInfo> latest;
+
+  if (_master.isDiscarded()) {
+    LOG(INFO) << "No pings from master received within " << MASTER_PING_TIMEOUT;
+    latest = None();
     master = None();
-  }
+  } else if (_master.get().isNone()) {
+    LOG(INFO) << "Lost leading master";
+    latest = None();
+    master = None();
+  } else {
+    latest = _master.get();
+    master = UPID(_master.get().get().pid());
 
-  if (master.isSome()) {
     LOG(INFO) << "New master detected at " << master.get();
     link(master.get());
 
@@ -642,13 +647,11 @@ void Slave::detected(const Future<Option<MasterInfo> >& _master)
             &Slave::doReliableRegistration,
             flags.registration_backoff_factor * 2); // Backoff
     }
-  } else {
-    LOG(INFO) << "Lost leading master";
   }
 
   // Keep detecting masters.
   LOG(INFO) << "Detecting new master";
-  detector->detect(_master.get())
+  detection = detector->detect(latest)
     .onAny(defer(self(), &Slave::detected, lambda::_1));
 }
 
@@ -781,6 +784,18 @@ void Slave::registered(const UPID& from, const SlaveID& slaveId)
         LOG(INFO) << "Checkpointing SlaveInfo to '" << path << "'";
         CHECK_SOME(state::checkpoint(path, info));
       }
+
+      // If we don't get a ping from the master, trigger a
+      // re-registration. This needs to be done once registered,
+      // in case we never receive an initial ping.
+      Timer::cancel(pingTimer);
+
+      pingTimer = delay(
+          MASTER_PING_TIMEOUT,
+          self(),
+          &Slave::pingTimeout,
+          detection);
+
       break;
     }
     case RUNNING:
@@ -2324,10 +2339,35 @@ void Slave::executorMessage(
 void Slave::ping(const UPID& from, const string& body)
 {
   VLOG(1) << "Received ping from " << from;
+
+  // If we don't get a ping from the master, trigger a
+  // re-registration. This can occur when the master no
+  // longer considers the slave to be registered, so it is
+  // essential for the slave to attempt a re-registration
+  // when this occurs.
+  Timer::cancel(pingTimer);
+
+  pingTimer = delay(
+      MASTER_PING_TIMEOUT,
+      self(),
+      &Slave::pingTimeout,
+      detection);
+
   send(from, "PONG");
 }
 
 
+void Slave::pingTimeout(Future<Option<MasterInfo> > future)
+{
+  // It's possible that a new ping arrived since the timeout fired
+  // and we were unable to cancel this timeout. If this occurs, don't
+  // bother trying to re-detect.
+  if (pingTimer.timeout().expired()) {
+    future.discard();
+  }
+}
+
+
 void Slave::exited(const UPID& pid)
 {
   LOG(INFO) << pid << " exited";
@@ -3211,7 +3251,7 @@ void Slave::__recover(const Future<Nothing>& future)
   }
 
   // Start detecting masters.
-  detector->detect()
+  detection = detector->detect()
     .onAny(defer(self(), &Slave::detected, lambda::_1));
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/d47cf396/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 9ef597e..c12cd0a 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -246,6 +246,10 @@ public:
 
   Nothing detachFile(const std::string& path);
 
+  // Triggers a re-detection of the master when the slave does
+  // not receive a ping.
+  void pingTimeout(process::Future<Option<MasterInfo> > future);
+
   void authenticate();
 
   // Helper routine to lookup a framework.
@@ -431,6 +435,13 @@ private:
 
   StatusUpdateManager* statusUpdateManager;
 
+  // Master detection future.
+  process::Future<Option<MasterInfo> > detection;
+
+  // Timer for triggering re-detection when no ping is received from
+  // the master.
+  process::Timer pingTimer;
+
   // Flag to indicate if recovery, including reconciling (i.e., reconnect/kill)
   // with executors is finished.
   process::Promise<Nothing> recovered;

http://git-wip-us.apache.org/repos/asf/mesos/blob/d47cf396/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index b4f9f30..069fbda 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -67,6 +67,7 @@ using mesos::internal::slave::MesosContainerizerProcess;
 
 using process::Clock;
 using process::Future;
+using process::Message;
 using process::Owned;
 using process::PID;
 
@@ -897,3 +898,81 @@ TEST_F(SlaveTest, TerminalTaskContainerizerUpdateFails)
 
   Shutdown();
 }
+
+
+// This test ensures that the slave will re-register with the master
+// if it does not receive any pings after registering.
+TEST_F(SlaveTest, PingTimeoutNoPings)
+{
+  // Start a master.
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  // Block all pings to the slave.
+  DROP_MESSAGES(Eq("PING"), _, _);
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  // Start a slave.
+  Try<PID<Slave> > slave = StartSlave();
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  // Advance to the ping timeout to trigger a re-detection and
+  // re-registration.
+  Future<Nothing> detected = FUTURE_DISPATCH(_, &Slave::detected);
+
+  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+  Clock::pause();
+  Clock::advance(slave::MASTER_PING_TIMEOUT);
+
+  AWAIT_READY(detected);
+  AWAIT_READY(slaveReregisteredMessage);
+}
+
+
+// This test ensures that the slave will re-register with the master
+// if it stops receiving pings.
+TEST_F(SlaveTest, PingTimeoutSomePings)
+{
+  // Start a master.
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  // Start a slave.
+  Try<PID<Slave> > slave = StartSlave();
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  Clock::pause();
+
+  // Ensure a ping reaches the slave.
+  Future<Message> ping = FUTURE_MESSAGE(Eq("PING"), _, _);
+
+  Clock::advance(master::SLAVE_PING_TIMEOUT);
+
+  AWAIT_READY(ping);
+
+  // Now block further pings from the master and advance
+  // the clock to trigger a re-detection and re-registration on
+  // the slave.
+  DROP_MESSAGES(Eq("PING"), _, _);
+
+  Future<Nothing> detected = FUTURE_DISPATCH(_, &Slave::detected);
+
+  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+  Clock::advance(slave::MASTER_PING_TIMEOUT);
+
+  AWAIT_READY(detected);
+  AWAIT_READY(slaveReregisteredMessage);
+}

Reply via email to