Added pause() and resume() to the status update manager. Review: https://reviews.apache.org/r/26957
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/65c3c363 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/65c3c363 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/65c3c363 Branch: refs/heads/master Commit: 65c3c3639b385d880dbfe10bc4f652655695c8b3 Parents: e64dda4 Author: Vinod Kone <[email protected]> Authored: Fri Oct 17 15:26:52 2014 -0700 Committer: Vinod Kone <[email protected]> Committed: Tue Oct 21 15:47:08 2014 -0700 ---------------------------------------------------------------------- src/local/local.cpp | 2 +- src/slave/main.cpp | 2 +- src/slave/slave.cpp | 15 ++++--- src/slave/status_update_manager.cpp | 75 ++++++++++++++++++++++---------- src/slave/status_update_manager.hpp | 18 +++++--- src/tests/cluster.hpp | 2 +- src/tests/mesos.cpp | 8 +++- src/tests/mesos.hpp | 4 +- 8 files changed, 84 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/65c3c363/src/local/local.cpp ---------------------------------------------------------------------- diff --git a/src/local/local.cpp b/src/local/local.cpp index 66de798..2756d42 100644 --- a/src/local/local.cpp +++ b/src/local/local.cpp @@ -214,7 +214,7 @@ PID<Master> launch(const Flags& flags, Allocator* _allocator) } garbageCollectors->push_back(new GarbageCollector()); - statusUpdateManagers->push_back(new StatusUpdateManager()); + statusUpdateManagers->push_back(new StatusUpdateManager(flags)); Try<Containerizer*> containerizer = Containerizer::create(flags, true); if (containerizer.isError()) { http://git-wip-us.apache.org/repos/asf/mesos/blob/65c3c363/src/slave/main.cpp ---------------------------------------------------------------------- diff --git a/src/slave/main.cpp b/src/slave/main.cpp index b27cc32..bf56f69 100644 --- a/src/slave/main.cpp +++ b/src/slave/main.cpp @@ -165,7 +165,7 @@ int main(int argc, char** argv) Files 
files; GarbageCollector gc; - StatusUpdateManager statusUpdateManager; + StatusUpdateManager statusUpdateManager(flags); Slave* slave = new Slave( flags, http://git-wip-us.apache.org/repos/asf/mesos/blob/65c3c363/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index afcb669..a98e408 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -318,7 +318,7 @@ void Slave::initialize() LOG(INFO) << "Slave hostname: " << info.hostname(); LOG(INFO) << "Slave checkpoint: " << stringify(flags.checkpoint); - statusUpdateManager->initialize(flags, self()); + statusUpdateManager->initialize(defer(self(), &Slave::forward, lambda::_1)); // Start disk monitoring. // NOTE: We send a delayed message here instead of directly calling @@ -573,6 +573,9 @@ void Slave::detected(const Future<Option<MasterInfo> >& _master) state = DISCONNECTED; } + // Pause the status updates. + statusUpdateManager->pause(); + if (_master.isFailed()) { EXIT(1) << "Failed to detect a master: " << _master.failure(); } @@ -749,6 +752,9 @@ void Slave::registered(const UPID& from, const SlaveID& slaveId) << "; given slave ID " << slaveId; state = RUNNING; + + statusUpdateManager->resume(); // Resume status updates. + info.mutable_id()->CopyFrom(slaveId); // Store the slave id. if (flags.checkpoint) { @@ -813,10 +819,7 @@ void Slave::reregistered( LOG(INFO) << "Re-registered with master " << master.get(); state = RUNNING; - // Inform status update manager to immediately resend any - // pending updates. - statusUpdateManager->flush(); - + statusUpdateManager->resume(); // Resume status updates. break; case RUNNING: CHECK_SOME(master); @@ -1699,7 +1702,7 @@ void Slave::updateFramework(const FrameworkID& frameworkId, const string& pid) // Inform status update manager to immediately resend any pending // updates. 
- statusUpdateManager->flush(); + statusUpdateManager->resume(); break; } http://git-wip-us.apache.org/repos/asf/mesos/blob/65c3c363/src/slave/status_update_manager.cpp ---------------------------------------------------------------------- diff --git a/src/slave/status_update_manager.cpp b/src/slave/status_update_manager.cpp index fb35ace..9bdbf5e 100644 --- a/src/slave/status_update_manager.cpp +++ b/src/slave/status_update_manager.cpp @@ -24,6 +24,7 @@ #include <stout/foreach.hpp> #include <stout/hashmap.hpp> #include <stout/hashset.hpp> +#include <stout/lambda.hpp> #include <stout/option.hpp> #include <stout/protobuf.hpp> #include <stout/utils.hpp> @@ -37,6 +38,8 @@ #include "slave/state.hpp" #include "slave/status_update_manager.hpp" +using lambda::function; + using std::string; using process::wait; // Necessary on some OS's to disambiguate. @@ -61,16 +64,14 @@ class StatusUpdateManagerProcess : public ProtobufProcess<StatusUpdateManagerProcess> { public: - StatusUpdateManagerProcess() {} + StatusUpdateManagerProcess(const Flags& flags); virtual ~StatusUpdateManagerProcess(); // Explicitely use 'initialize' since we're overloading below. using process::ProcessBase::initialize; // StatusUpdateManager implementation. 
- void initialize( - const Flags& flags, - const PID<Slave>& slave); + void initialize(const function<void(const StatusUpdate&)>& forward); Future<Nothing> update( const StatusUpdate& update, @@ -91,7 +92,8 @@ public: const string& rootDir, const Option<SlaveState>& state); - void flush(); + void pause(); + void resume(); void cleanup(const FrameworkID& frameworkId); @@ -133,12 +135,19 @@ private: const TaskID& taskId, const FrameworkID& frameworkId); - Flags flags; - PID<Slave> slave; + const Flags flags; + bool paused; + + function<void(const StatusUpdate&)> forward_; + hashmap<FrameworkID, hashmap<TaskID, StatusUpdateStream*> > streams; }; +StatusUpdateManagerProcess::StatusUpdateManagerProcess(const Flags& _flags) + : flags(_flags), paused(false) {} + + StatusUpdateManagerProcess::~StatusUpdateManagerProcess() { foreachkey (const FrameworkID& frameworkId, streams) { @@ -151,21 +160,29 @@ StatusUpdateManagerProcess::~StatusUpdateManagerProcess() void StatusUpdateManagerProcess::initialize( - const Flags& _flags, - const PID<Slave>& _slave) + const function<void(const StatusUpdate&)>& forward) +{ + forward_ = forward; +} + + +void StatusUpdateManagerProcess::pause() { - flags = _flags; - slave = _slave; + LOG(INFO) << "Pausing sending status updates"; + paused = true; } -void StatusUpdateManagerProcess::flush() +void StatusUpdateManagerProcess::resume() { + LOG(INFO) << "Resuming sending status updates"; + paused = false; + foreachkey (const FrameworkID& frameworkId, streams) { foreachvalue (StatusUpdateStream* stream, streams[frameworkId]) { if (!stream->pending.empty()) { const StatusUpdate& update = stream->pending.front(); - LOG(WARNING) << "Flushing status update " << update; + LOG(WARNING) << "Resending status update " << update; stream->timeout = forward(update, STATUS_UPDATE_RETRY_INTERVAL_MIN); } } @@ -330,7 +347,7 @@ Future<Nothing> StatusUpdateManagerProcess::_update( // Forward the status update to the master if this is the first in the stream. 
// Subsequent status updates will get sent in 'acknowledgement()'. - if (stream->pending.size() == 1) { + if (!paused && stream->pending.size() == 1) { CHECK(stream->timeout.isNone()); const Result<StatusUpdate>& next = stream->next(); if (next.isError()) { @@ -349,10 +366,12 @@ Timeout StatusUpdateManagerProcess::forward( const StatusUpdate& update, const Duration& duration) { + CHECK(!paused); + VLOG(1) << "Forwarding update " << update << " to the slave"; - // Forward the update to the slave. - dispatch(slave, &Slave::forward, update); + // Forward the update. + forward_(update); // Send a message to self to resend after some delay if no ACK is received. return delay(duration, @@ -426,7 +445,7 @@ Future<bool> StatusUpdateManagerProcess::acknowledgement( << " but updates are still pending"; } cleanupStatusUpdateStream(taskId, frameworkId); - } else if (next.isSome()) { + } else if (!paused && next.isSome()) { // Forward the next queued status update. stream->timeout = forward(next.get(), STATUS_UPDATE_RETRY_INTERVAL_MIN); } @@ -438,6 +457,10 @@ Future<bool> StatusUpdateManagerProcess::acknowledgement( // TODO(vinod): There should be a limit on the retries. void StatusUpdateManagerProcess::timeout(const Duration& duration) { + if (paused) { + return; + } + // Check and see if we should resend any status updates. 
foreachkey (const FrameworkID& frameworkId, streams) { foreachvalue (StatusUpdateStream* stream, streams[frameworkId]) { @@ -520,9 +543,9 @@ void StatusUpdateManagerProcess::cleanupStatusUpdateStream( } -StatusUpdateManager::StatusUpdateManager() +StatusUpdateManager::StatusUpdateManager(const Flags& flags) { - process = new StatusUpdateManagerProcess(); + process = new StatusUpdateManagerProcess(flags); spawn(process); } @@ -536,10 +559,9 @@ StatusUpdateManager::~StatusUpdateManager() void StatusUpdateManager::initialize( - const Flags& flags, - const PID<Slave>& slave) + const function<void(const StatusUpdate&)>& forward) { - dispatch(process, &StatusUpdateManagerProcess::initialize, flags, slave); + dispatch(process, &StatusUpdateManagerProcess::initialize, forward); } @@ -594,10 +616,15 @@ Future<Nothing> StatusUpdateManager::recover( } +void StatusUpdateManager::pause() +{ + dispatch(process, &StatusUpdateManagerProcess::pause); +} + -void StatusUpdateManager::flush() +void StatusUpdateManager::resume() { - dispatch(process, &StatusUpdateManagerProcess::flush); + dispatch(process, &StatusUpdateManagerProcess::resume); } http://git-wip-us.apache.org/repos/asf/mesos/blob/65c3c363/src/slave/status_update_manager.hpp ---------------------------------------------------------------------- diff --git a/src/slave/status_update_manager.hpp b/src/slave/status_update_manager.hpp index 1c1a8a8..2852884 100644 --- a/src/slave/status_update_manager.hpp +++ b/src/slave/status_update_manager.hpp @@ -31,6 +31,7 @@ #include <stout/hashmap.hpp> #include <stout/hashset.hpp> +#include <stout/lambda.hpp> #include <stout/none.hpp> #include <stout/nothing.hpp> #include <stout/option.hpp> @@ -72,12 +73,12 @@ struct StatusUpdateStream; class StatusUpdateManager { public: - StatusUpdateManager(); + StatusUpdateManager(const Flags& flags); virtual ~StatusUpdateManager(); - void initialize( - const Flags& flags, - const process::PID<Slave>& slave); + // Expects a callback 'forward' which 
gets called whenever there is + // a new status update that needs to be forwarded to the master. + void initialize(const lambda::function<void(const StatusUpdate&)>& forward); // TODO(vinod): Come up with better names/signatures for the // checkpointing and non-checkpointing 'update()' functions. @@ -118,10 +119,15 @@ public: const Option<state::SlaveState>& state); - // Resend all the pending updates right away. + // Pause sending updates. + // This is useful when the slave is disconnected because a + // disconnected slave will drop the updates. + void pause(); + + // Unpause and resend all the pending updates right away. // This is useful when the updates were pending because there was // no master elected (e.g., during recovery) or framework failed over. - void flush(); + void resume(); // Closes all the status update streams corresponding to this framework. // NOTE: This stops retrying any pending status updates for this framework. http://git-wip-us.apache.org/repos/asf/mesos/blob/65c3c363/src/tests/cluster.hpp ---------------------------------------------------------------------- diff --git a/src/tests/cluster.hpp b/src/tests/cluster.hpp index ee194ad..fa5eeef 100644 --- a/src/tests/cluster.hpp +++ b/src/tests/cluster.hpp @@ -488,7 +488,7 @@ inline Try<process::PID<slave::Slave> > Cluster::Slaves::start( // Create a status update manager if one wasn't provided. 
if (statusUpdateManager.isNone()) { - slave.statusUpdateManager.reset(new slave::StatusUpdateManager()); + slave.statusUpdateManager.reset(new slave::StatusUpdateManager(flags)); } slave.flags = flags; http://git-wip-us.apache.org/repos/asf/mesos/blob/65c3c363/src/tests/mesos.cpp ---------------------------------------------------------------------- diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp index 147e23f..bff10d2 100644 --- a/src/tests/mesos.cpp +++ b/src/tests/mesos.cpp @@ -349,7 +349,7 @@ MockSlave::MockSlave(const slave::Flags& flags, containerizer, &files, &gc, - &statusUpdateManager) + statusUpdateManager = new slave::StatusUpdateManager(flags)) { // Set up default behaviors, calling the original methods. EXPECT_CALL(*this, runTask(_, _, _, _, _)). @@ -363,6 +363,12 @@ MockSlave::MockSlave(const slave::Flags& flags, } +MockSlave::~MockSlave() +{ + delete statusUpdateManager; +} + + void MockSlave::unmocked_runTask( const process::UPID& from, const FrameworkInfo& frameworkInfo, http://git-wip-us.apache.org/repos/asf/mesos/blob/65c3c363/src/tests/mesos.hpp ---------------------------------------------------------------------- diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp index e40575c..e36e138 100644 --- a/src/tests/mesos.hpp +++ b/src/tests/mesos.hpp @@ -548,7 +548,7 @@ public: MasterDetector* detector, slave::Containerizer* containerizer); - virtual ~MockSlave() {} + virtual ~MockSlave(); MOCK_METHOD5(runTask, void( const process::UPID& from, @@ -597,7 +597,7 @@ public: private: Files files; MockGarbageCollector gc; - slave::StatusUpdateManager statusUpdateManager; + slave::StatusUpdateManager* statusUpdateManager; };
