Updated the ping message to embed the slave's connection status. Review: https://reviews.apache.org/r/25867
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/bef49064 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/bef49064 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/bef49064 Branch: refs/heads/master Commit: bef49064a830baea87891bfa61a68729a7c06029 Parents: 0b66d1d Author: Vinod Kone <[email protected]> Authored: Tue Sep 16 18:09:59 2014 -0700 Committer: Vinod Kone <[email protected]> Committed: Thu Sep 25 13:54:46 2014 -0700 ---------------------------------------------------------------------- src/Makefile.am | 1 + src/master/master.cpp | 45 ++++++++++++--- src/messages/messages.proto | 31 +++-------- src/slave/slave.cpp | 67 +++++++++++++++++++++-- src/slave/slave.hpp | 8 ++- src/tests/partition_tests.cpp | 109 +++++++++++++++++++++++++++++++++++++ 6 files changed, 226 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/bef49064/src/Makefile.am ---------------------------------------------------------------------- diff --git a/src/Makefile.am b/src/Makefile.am index b821a3b..27c42df 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1107,6 +1107,7 @@ mesos_tests_SOURCES = \ tests/master_tests.cpp \ tests/mesos.cpp \ tests/monitor_tests.cpp \ + tests/partition_tests.cpp \ tests/paths_tests.cpp \ tests/protobuf_io_tests.cpp \ tests/rate_limiting_tests.cpp \ http://git-wip-us.apache.org/repos/asf/mesos/blob/bef49064/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 92d93fe..a60308f 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -173,15 +173,42 @@ public: slaveId(_slaveId), master(_master), timeouts(0), - pinged(false) + pinged(false), + connected(true) { + // TODO(vinod): Deprecate this handler in 0.22.0 in favor of a + // new PongSlaveMessage handler. 
install("PONG", &SlaveObserver::pong); } + void reconnect() + { + connected = true; + } + + void disconnect() + { + connected = false; + } + protected: virtual void initialize() { - send(slave, "PING"); + ping(); + } + + void ping() + { + // TODO(vinod): In 0.22.0, master should send the PingSlaveMessage + // instead of sending "PING" with the encoded PingSlaveMessage. + // Currently we do not do this for backwards compatibility with + // slaves on 0.20.0. + PingSlaveMessage message; + message.set_connected(connected); + string data; + CHECK(message.SerializeToString(&data)); + send(slave, "PING", data.data(), data.size()); + pinged = true; delay(SLAVE_PING_TIMEOUT, self(), &SlaveObserver::timeout); } @@ -201,9 +228,7 @@ protected: } } - send(slave, "PING"); - pinged = true; - delay(SLAVE_PING_TIMEOUT, self(), &SlaveObserver::timeout); + ping(); } void shutdown() @@ -218,6 +243,7 @@ private: const PID<Master> master; uint32_t timeouts; bool pinged; + bool connected; }; @@ -1717,6 +1743,9 @@ void Master::disconnect(Slave* slave) slave->connected = false; + // Inform the slave observer. + dispatch(slave->observer, &SlaveObserver::disconnect); + // Remove the slave from authenticated. This is safe because // a slave will always reauthenticate before (re-)registering. authenticated.erase(slave->pid); @@ -3050,6 +3079,7 @@ void Master::reregisterSlave( // slave. if (!slave->connected) { slave->connected = true; + dispatch(slave->observer, &SlaveObserver::reconnect); slave->active = true; allocator->slaveActivated(slave->id); } @@ -3683,8 +3713,9 @@ void Master::authenticate(const UPID& from, const UPID& pid) // after restart; true for slave but not for framework. // If the PID doesn't change the master might mark the client // disconnected *after* the client re-registers. - // TODO(vinod): To ensure safety the client (slave) should be - // informed about this discrepancy so that it can re-register. 
+ // This is safe because the client (slave) will be informed + // about this discrepancy via ping messages so that it can + // re-register. authenticated.erase(pid); http://git-wip-us.apache.org/repos/asf/mesos/blob/bef49064/src/messages/messages.proto ---------------------------------------------------------------------- diff --git a/src/messages/messages.proto b/src/messages/messages.proto index 7cb3ce6..9ff06b3 100644 --- a/src/messages/messages.proto +++ b/src/messages/messages.proto @@ -256,11 +256,18 @@ message UnregisterSlaveMessage { } -message HeartbeatMessage { - required SlaveID slave_id = 1; +// This message is periodically sent by the master to the slave. +// If the slave is connected to the master, "connected" is true. +message PingSlaveMessage { + required bool connected = 1; } +// This message is sent by the slave to the master in response to the +// PingSlaveMessage. +message PongSlaveMessage {} + + // Tells a slave to shut down all executors of the given framework. message ShutdownFrameworkMessage { required FrameworkID framework_id = 1; @@ -320,26 +327,6 @@ message ReregisterExecutorMessage { } -message RegisterProjdMessage { - required string project = 1; -} - - -message ProjdReadyMessage { - required string project = 1; -} - - -message ProjdUpdateResourcesMessage { - optional Parameters parameters = 1; -} - - -message FrameworkExpiredMessage { - required FrameworkID framework_id = 1; -} - - message ShutdownMessage { optional string message = 1; } http://git-wip-us.apache.org/repos/asf/mesos/blob/bef49064/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index 9a6646f..c82d99f 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -419,7 +419,13 @@ void Slave::initialize() &ShutdownMessage::message); // Install the ping message handler. 
- install("PING", &Slave::ping); + // TODO(vinod): Remove this handler in 0.22.0 in favor of the + // new PingSlaveMessage handler. + install("PING", &Slave::pingOld); + + install<PingSlaveMessage>( + &Slave::ping, + &PingSlaveMessage::connected); // Setup HTTP routes. route("/health", @@ -584,9 +590,7 @@ void Slave::detected(const Future<Option<MasterInfo> >& _master) Option<MasterInfo> latest; if (_master.isDiscarded()) { - LOG(INFO) << "No pings from master received within " - << MASTER_PING_TIMEOUT(); - + LOG(INFO) << "Re-detecting master"; latest = None(); master = None(); } else if (_master.get().isNone()) { @@ -2342,10 +2346,28 @@ void Slave::executorMessage( } -void Slave::ping(const UPID& from, const string& body) +void Slave::pingOld(const UPID& from, const string& body) { VLOG(1) << "Received ping from " << from; + if (!body.empty()) { + // This must be a ping from 0.21.0 master. + PingSlaveMessage message; + CHECK(message.ParseFromString(body)) + << "Invalid ping message '" << body << "' from " << from; + + if (!message.connected() && state == RUNNING) { + // This could happen if there is a one way partition between + // the master and slave, causing the master to get an exited + // event and marking the slave disconnected but the slave + // thinking it is still connected. Force a re-registration with + // the master to reconcile. + LOG(INFO) << "Master marked the slave as disconnected but the slave" + << " considers itself registered! Forcing re-registration."; + detection.discard(); + } + } + // If we don't get a ping from the master, trigger a // re-registration. 
This can occur when the master no // longer considers the slave to be registered, so it is @@ -2363,12 +2385,47 @@ void Slave::ping(const UPID& from, const string& body) } +void Slave::ping(const UPID& from, bool connected) +{ + VLOG(1) << "Received ping from " << from; + + if (!connected && state == RUNNING) { + // This could happen if there is a one way partition between + // the master and slave, causing the master to get an exited + // event and marking the slave disconnected but the slave + // thinking it is still connected. Force a re-registration with + // the master to reconcile. + LOG(INFO) << "Master marked the slave as disconnected but the slave" + << " considers itself registered! Forcing re-registration."; + detection.discard(); + } + + // If we don't get a ping from the master, trigger a + // re-registration. This can occur when the master no + // longer considers the slave to be registered, so it is + // essential for the slave to attempt a re-registration + // when this occurs. + Timer::cancel(pingTimer); + + pingTimer = delay( + MASTER_PING_TIMEOUT(), + self(), + &Slave::pingTimeout, + detection); + + send(from, PongSlaveMessage()); +} + + void Slave::pingTimeout(Future<Option<MasterInfo> > future) { // It's possible that a new ping arrived since the timeout fired // and we were unable to cancel this timeout. If this occurs, don't // bother trying to re-detect. 
if (pingTimer.timeout().expired()) { + LOG(INFO) << "No pings from master received within " + << MASTER_PING_TIMEOUT(); + future.discard(); } } http://git-wip-us.apache.org/repos/asf/mesos/blob/bef49064/src/slave/slave.hpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp index 4f3df5c..2869710 100644 --- a/src/slave/slave.hpp +++ b/src/slave/slave.hpp @@ -155,7 +155,13 @@ public: const ExecutorID& executorId, const std::string& data); - void ping(const process::UPID& from, const std::string& body); + // TODO(vinod): Remove this in 0.23.0. + void pingOld(const process::UPID& from, const std::string& body); + + // NOTE: This handler is added to make it easy for upgrading slaves + // and masters to 0.22.0. A 0.22.0 master will send PingSlaveMessage + // which will call this method. + void ping(const process::UPID& from, bool connected); // Handles the status update. // NOTE: If 'pid' is a valid UPID an ACK is sent to this pid http://git-wip-us.apache.org/repos/asf/mesos/blob/bef49064/src/tests/partition_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/partition_tests.cpp b/src/tests/partition_tests.cpp new file mode 100644 index 0000000..8136a95 --- /dev/null +++ b/src/tests/partition_tests.cpp @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <gmock/gmock.h> + +#include <string> + +#include <process/clock.hpp> +#include <process/future.hpp> +#include <process/gmock.hpp> +#include <process/pid.hpp> + +#include <stout/try.hpp> + +#include "master/master.hpp" + +#include "slave/constants.hpp" +#include "slave/flags.hpp" +#include "slave/slave.hpp" + +#include "tests/flags.hpp" +#include "tests/mesos.hpp" + +using namespace mesos; +using namespace mesos::internal; +using namespace mesos::internal::tests; + +using mesos::internal::master::Master; + +using mesos::internal::master::allocator::AllocatorProcess; + +using mesos::internal::slave::Slave; + +using process::Clock; +using process::Future; +using process::Message; +using process::PID; + +using testing::_; +using testing::Eq; + + +class PartitionTest : public MesosTest {}; + + +// This test verifies that if master --> slave socket closes and the +// slave is not aware of it (i.e., one way network partition), slave +// will re-register with the master. +TEST_F(PartitionTest, OneWayPartitionMasterToSlave) +{ + // Start a master. + Try<PID<Master> > master = StartMaster(); + ASSERT_SOME(master); + + Future<Message> slaveRegisteredMessage = + FUTURE_MESSAGE(Eq(SlaveRegisteredMessage().GetTypeName()), _, _); + + // Ensure a ping reaches the slave. + Future<Message> ping = FUTURE_MESSAGE(Eq("PING"), _, _); + + // Start a checkpointing slave. 
+ slave::Flags flags = CreateSlaveFlags(); + flags.checkpoint = true; + Try<PID<Slave> > slave = StartSlave(flags); + ASSERT_SOME(slave); + + AWAIT_READY(slaveRegisteredMessage); + + AWAIT_READY(ping); + + Future<Nothing> slaveDeactivated = + FUTURE_DISPATCH(_, &AllocatorProcess::slaveDeactivated); + + // Inject a slave exited event at the master causing the master + // to mark the slave as disconnected. The slave should not notice + // it until the next ping is received. + process::inject::exited(slaveRegisteredMessage.get().to, master.get()); + + // Wait until master deactivates the slave. + AWAIT_READY(slaveDeactivated); + + Future<SlaveReregisteredMessage> slaveReregisteredMessage = + FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _); + + // Ensure the slave observer marked the slave as disconnected. + Clock::pause(); + Clock::settle(); + + // Let the slave observer send the next ping. + Clock::advance(slave::MASTER_PING_TIMEOUT()); + + // Slave should re-register. + AWAIT_READY(slaveReregisteredMessage); +}
