Updated ping message to embed the slave connected status.

Review: https://reviews.apache.org/r/25867


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/bef49064
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/bef49064
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/bef49064

Branch: refs/heads/master
Commit: bef49064a830baea87891bfa61a68729a7c06029
Parents: 0b66d1d
Author: Vinod Kone <[email protected]>
Authored: Tue Sep 16 18:09:59 2014 -0700
Committer: Vinod Kone <[email protected]>
Committed: Thu Sep 25 13:54:46 2014 -0700

----------------------------------------------------------------------
 src/Makefile.am               |   1 +
 src/master/master.cpp         |  45 ++++++++++++---
 src/messages/messages.proto   |  31 +++--------
 src/slave/slave.cpp           |  67 +++++++++++++++++++++--
 src/slave/slave.hpp           |   8 ++-
 src/tests/partition_tests.cpp | 109 +++++++++++++++++++++++++++++++++++++
 6 files changed, 226 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/bef49064/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index b821a3b..27c42df 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1107,6 +1107,7 @@ mesos_tests_SOURCES =                             \
   tests/master_tests.cpp                       \
   tests/mesos.cpp                              \
   tests/monitor_tests.cpp                      \
+  tests/partition_tests.cpp                    \
   tests/paths_tests.cpp                                \
   tests/protobuf_io_tests.cpp                  \
   tests/rate_limiting_tests.cpp                        \

http://git-wip-us.apache.org/repos/asf/mesos/blob/bef49064/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 92d93fe..a60308f 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -173,15 +173,42 @@ public:
       slaveId(_slaveId),
       master(_master),
       timeouts(0),
-      pinged(false)
+      pinged(false),
+      connected(true)
   {
+    // TODO(vinod): Deprecate this handler in 0.22.0 in favor of a
+    // new PongSlaveMessage handler.
     install("PONG", &SlaveObserver::pong);
   }
 
+  void reconnect()
+  {
+    connected = true;
+  }
+
+  void disconnect()
+  {
+    connected = false;
+  }
+
 protected:
   virtual void initialize()
   {
-    send(slave, "PING");
+    ping();
+  }
+
+  void ping()
+  {
+    // TODO(vinod): In 0.22.0, master should send the PingSlaveMessage
+    // instead of sending "PING" with the encoded PingSlaveMessage.
+    // Currently we do not do this for backwards compatibility with
+    // slaves on 0.20.0.
+    PingSlaveMessage message;
+    message.set_connected(connected);
+    string data;
+    CHECK(message.SerializeToString(&data));
+    send(slave, "PING", data.data(), data.size());
+
     pinged = true;
     delay(SLAVE_PING_TIMEOUT, self(), &SlaveObserver::timeout);
   }
@@ -201,9 +228,7 @@ protected:
       }
     }
 
-    send(slave, "PING");
-    pinged = true;
-    delay(SLAVE_PING_TIMEOUT, self(), &SlaveObserver::timeout);
+    ping();
   }
 
   void shutdown()
@@ -218,6 +243,7 @@ private:
   const PID<Master> master;
   uint32_t timeouts;
   bool pinged;
+  bool connected;
 };
 
 
@@ -1717,6 +1743,9 @@ void Master::disconnect(Slave* slave)
 
   slave->connected = false;
 
+  // Inform the slave observer.
+  dispatch(slave->observer, &SlaveObserver::disconnect);
+
   // Remove the slave from authenticated. This is safe because
   // a slave will always reauthenticate before (re-)registering.
   authenticated.erase(slave->pid);
@@ -3050,6 +3079,7 @@ void Master::reregisterSlave(
     // slave.
     if (!slave->connected) {
       slave->connected = true;
+      dispatch(slave->observer, &SlaveObserver::reconnect);
       slave->active = true;
       allocator->slaveActivated(slave->id);
     }
@@ -3683,8 +3713,9 @@ void Master::authenticate(const UPID& from, const UPID& 
pid)
   //       after restart; true for slave but not for framework.
   //       If the PID doesn't change the master might mark the client
   //       disconnected *after* the client re-registers.
-  // TODO(vinod): To ensure safety the client (slave) should be
-  // informed about this discrepancy so that it can re-register.
+  //       This is safe because the client (slave) will be informed
+  //       about this discrepancy via ping messages so that it can
+  //       re-register.
 
   authenticated.erase(pid);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/bef49064/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 7cb3ce6..9ff06b3 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -256,11 +256,18 @@ message UnregisterSlaveMessage {
 }
 
 
-message HeartbeatMessage {
-  required SlaveID slave_id = 1;
+// This message is periodically sent by the master to the slave.
+// "connected" is true iff the master considers the slave connected.
+message PingSlaveMessage {
+  required bool connected = 1;
 }
 
 
+// This message is sent by the slave to the master in response to the
+// PingSlaveMessage.
+message PongSlaveMessage {}
+
+
 // Tells a slave to shut down all executors of the given framework.
 message ShutdownFrameworkMessage {
   required FrameworkID framework_id = 1;
@@ -320,26 +327,6 @@ message ReregisterExecutorMessage {
 }
 
 
-message RegisterProjdMessage {
-  required string project = 1;
-}
-
-
-message ProjdReadyMessage {
-  required string project = 1;
-}
-
-
-message ProjdUpdateResourcesMessage {
-  optional Parameters parameters = 1;
-}
-
-
-message FrameworkExpiredMessage {
-  required FrameworkID framework_id = 1;
-}
-
-
 message ShutdownMessage {
   optional string message = 1;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/bef49064/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 9a6646f..c82d99f 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -419,7 +419,13 @@ void Slave::initialize()
       &ShutdownMessage::message);
 
   // Install the ping message handler.
-  install("PING", &Slave::ping);
+  // TODO(vinod): Remove this handler in 0.22.0 in favor of the
+  // new PingSlaveMessage handler.
+  install("PING", &Slave::pingOld);
+
+  install<PingSlaveMessage>(
+      &Slave::ping,
+      &PingSlaveMessage::connected);
 
   // Setup HTTP routes.
   route("/health",
@@ -584,9 +590,7 @@ void Slave::detected(const Future<Option<MasterInfo> >& 
_master)
   Option<MasterInfo> latest;
 
   if (_master.isDiscarded()) {
-    LOG(INFO) << "No pings from master received within "
-              << MASTER_PING_TIMEOUT();
-
+    LOG(INFO) << "Re-detecting master";
     latest = None();
     master = None();
   } else if (_master.get().isNone()) {
@@ -2342,10 +2346,28 @@ void Slave::executorMessage(
 }
 
 
-void Slave::ping(const UPID& from, const string& body)
+void Slave::pingOld(const UPID& from, const string& body)
 {
   VLOG(1) << "Received ping from " << from;
 
+  if (!body.empty()) {
+    // This must be a ping from a 0.21.0 master.
+    PingSlaveMessage message;
+    CHECK(message.ParseFromString(body))
+      << "Invalid ping message '" << body << "' from " << from;
+
+    if (!message.connected() && state == RUNNING) {
+      // This could happen if there is a one way partition between
+      // the master and slave, causing the master to get an exited
+      // event and marking the slave disconnected but the slave
+      // thinking it is still connected. Force a re-registration with
+      // the master to reconcile.
+      LOG(INFO) << "Master marked the slave as disconnected but the slave"
+                << " considers itself registered! Forcing re-registration.";
+      detection.discard();
+    }
+  }
+
   // If we don't get a ping from the master, trigger a
   // re-registration. This can occur when the master no
   // longer considers the slave to be registered, so it is
@@ -2363,12 +2385,47 @@ void Slave::ping(const UPID& from, const string& body)
 }
 
 
+void Slave::ping(const UPID& from, bool connected)
+{
+  VLOG(1) << "Received ping from " << from;
+
+  if (!connected && state == RUNNING) {
+    // This could happen if there is a one way partition between
+    // the master and slave, causing the master to get an exited
+    // event and marking the slave disconnected but the slave
+    // thinking it is still connected. Force a re-registration with
+    // the master to reconcile.
+    LOG(INFO) << "Master marked the slave as disconnected but the slave"
+              << " considers itself registered! Forcing re-registration.";
+    detection.discard();
+  }
+
+  // If we don't get a ping from the master, trigger a
+  // re-registration. This can occur when the master no
+  // longer considers the slave to be registered, so it is
+  // essential for the slave to attempt a re-registration
+  // when this occurs.
+  Timer::cancel(pingTimer);
+
+  pingTimer = delay(
+      MASTER_PING_TIMEOUT(),
+      self(),
+      &Slave::pingTimeout,
+      detection);
+
+  send(from, PongSlaveMessage());
+}
+
+
 void Slave::pingTimeout(Future<Option<MasterInfo> > future)
 {
   // It's possible that a new ping arrived since the timeout fired
   // and we were unable to cancel this timeout. If this occurs, don't
   // bother trying to re-detect.
   if (pingTimer.timeout().expired()) {
+    LOG(INFO) << "No pings from master received within "
+              << MASTER_PING_TIMEOUT();
+
     future.discard();
   }
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/bef49064/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 4f3df5c..2869710 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -155,7 +155,13 @@ public:
       const ExecutorID& executorId,
       const std::string& data);
 
-  void ping(const process::UPID& from, const std::string& body);
+  // TODO(vinod): Remove this in 0.23.0.
+  void pingOld(const process::UPID& from, const std::string& body);
+
+  // NOTE: This handler is added to make it easy for upgrading slaves
+  // and masters to 0.22.0. A 0.22.0 master will send PingSlaveMessage
+  // which will call this method.
+  void ping(const process::UPID& from, bool connected);
 
   // Handles the status update.
   // NOTE: If 'pid' is a valid UPID an ACK is sent to this pid

http://git-wip-us.apache.org/repos/asf/mesos/blob/bef49064/src/tests/partition_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/partition_tests.cpp b/src/tests/partition_tests.cpp
new file mode 100644
index 0000000..8136a95
--- /dev/null
+++ b/src/tests/partition_tests.cpp
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gmock/gmock.h>
+
+#include <string>
+
+#include <process/clock.hpp>
+#include <process/future.hpp>
+#include <process/gmock.hpp>
+#include <process/pid.hpp>
+
+#include <stout/try.hpp>
+
+#include "master/master.hpp"
+
+#include "slave/constants.hpp"
+#include "slave/flags.hpp"
+#include "slave/slave.hpp"
+
+#include "tests/flags.hpp"
+#include "tests/mesos.hpp"
+
+using namespace mesos;
+using namespace mesos::internal;
+using namespace mesos::internal::tests;
+
+using mesos::internal::master::Master;
+
+using mesos::internal::master::allocator::AllocatorProcess;
+
+using mesos::internal::slave::Slave;
+
+using process::Clock;
+using process::Future;
+using process::Message;
+using process::PID;
+
+using testing::_;
+using testing::Eq;
+
+
+class PartitionTest : public MesosTest {};
+
+
+// This test verifies that if the master --> slave socket closes and
+// the slave is not aware of it (i.e., a one-way network partition),
+// the slave will re-register with the master.
+TEST_F(PartitionTest, OneWayPartitionMasterToSlave)
+{
+  // Start a master.
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  Future<Message> slaveRegisteredMessage =
+    FUTURE_MESSAGE(Eq(SlaveRegisteredMessage().GetTypeName()), _, _);
+
+  // Ensure a ping reaches the slave.
+  Future<Message> ping = FUTURE_MESSAGE(Eq("PING"), _, _);
+
+  // Start a checkpointing slave.
+  slave::Flags flags = CreateSlaveFlags();
+  flags.checkpoint = true;
+  Try<PID<Slave> > slave = StartSlave(flags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  AWAIT_READY(ping);
+
+  Future<Nothing> slaveDeactivated =
+    FUTURE_DISPATCH(_, &AllocatorProcess::slaveDeactivated);
+
+  // Inject a slave exited event at the master causing the master
+  // to mark the slave as disconnected. The slave should not notice
+  // it until the next ping is received.
+  process::inject::exited(slaveRegisteredMessage.get().to, master.get());
+
+  // Wait until master deactivates the slave.
+  AWAIT_READY(slaveDeactivated);
+
+  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+  // Ensure the slave observer has marked the slave as disconnected.
+  Clock::pause();
+  Clock::settle();
+
+  // Trigger only the observer's next ping, not the slave's ping timeout.
+  Clock::advance(master::SLAVE_PING_TIMEOUT);
+
+  // Slave should re-register.
+  AWAIT_READY(slaveReregisteredMessage);
+}

Reply via email to