Updated status update manager to forward updates via slave.

Review: https://reviews.apache.org/r/26846
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e64dda41 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e64dda41 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e64dda41 Branch: refs/heads/master Commit: e64dda411bc83963179c92ae71caefa8d21b54b4 Parents: 616d401 Author: Vinod Kone <[email protected]> Authored: Wed Oct 15 18:22:46 2014 -0700 Committer: Vinod Kone <[email protected]> Committed: Tue Oct 21 15:47:08 2014 -0700 ---------------------------------------------------------------------- src/master/master.cpp | 6 +- src/slave/slave.cpp | 122 ++++++++++++++++++------------- src/slave/slave.hpp | 4 + src/slave/status_update_manager.cpp | 47 ++---------- src/slave/status_update_manager.hpp | 2 - src/tests/slave_recovery_tests.cpp | 19 +++-- 6 files changed, 94 insertions(+), 106 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/e64dda41/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index be910d9..f04c085 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -3294,14 +3294,12 @@ void Master::unregisterSlave(const UPID& from, const SlaveID& slaveId) } -// NOTE: We cannot use 'from' here to identify the slave as this is -// now sent by the StatusUpdateManagerProcess and master itself when -// it generates TASK_LOST messages. Only 'pid' can be used to identify -// the slave. // TODO(bmahler): The master will not release resources until the // slave receives acknowlegements for all non-terminal updates. This // means if a framework is down, the resources will remain allocated // even though the tasks are terminal on the slaves! +// TODO(vinod): Since 0.22.0, we can use 'from' instead of 'pid' because +// the status updates will be sent by the slave. 
void Master::statusUpdate(const StatusUpdate& update, const UPID& pid) { ++metrics.messages_status_update; http://git-wip-us.apache.org/repos/asf/mesos/blob/e64dda41/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index 7b5474a..afcb669 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -497,7 +497,7 @@ void Slave::finalize() } } - if (state == TERMINATING || flags.recover == "cleanup") { + if (state == TERMINATING) { // We remove the "latest" symlink in meta directory, so that the // slave doesn't recover the state when it restarts and registers // as a new slave with the master. @@ -533,14 +533,8 @@ void Slave::shutdown(const UPID& from, const string& message) if (frameworks.empty()) { // Terminate slave if there are no frameworks. terminate(self()); } else { - // NOTE: The slave will terminate after all - // executors have terminated. - // TODO(vinod): Wait until all updates have been acknowledged. - // This is tricky without persistent state at master because the - // slave might wait forever for status update acknowledgements, - // since it cannot reliably know when a framework has shut down. - // A short-term fix could be to wait for a certain time for ACKs - // and then shutdown. + // NOTE: The slave will terminate after all the executors have + // terminated. // NOTE: We use 'frameworks.keys()' here because 'shutdownFramework' // can potentially remove a framework from 'frameworks'. foreach (const FrameworkID& frameworkId, frameworks.keys()) { @@ -600,22 +594,11 @@ void Slave::detected(const Future<Option<MasterInfo> >& _master) LOG(INFO) << "New master detected at " << master.get(); link(master.get()); - // Inform the status updates manager about the new master. 
- statusUpdateManager->newMasterDetected(master.get()); - if (state == TERMINATING) { LOG(INFO) << "Skipping registration because slave is terminating"; return; } - // The slave does not (re-)register if it is in the cleanup mode - // because we do not want to accept new tasks. - if (flags.recover == "cleanup") { - LOG(INFO) - << "Skipping registration because slave was started in cleanup mode"; - return; - } - // Wait for a random amount of time before authentication or // registration. Duration duration = @@ -829,6 +812,11 @@ void Slave::reregistered( CHECK_SOME(master); LOG(INFO) << "Re-registered with master " << master.get(); state = RUNNING; + + // Inform status update manager to immediately resend any + // pending updates. + statusUpdateManager->flush(); + break; case RUNNING: CHECK_SOME(master); @@ -931,6 +919,8 @@ void Slave::doReliableRegistration(const Duration& duration) CHECK(state == DISCONNECTED) << state; + CHECK_NE("cleanup", flags.recover); + if (!info.has_id()) { // Registering for the first time. RegisterSlaveMessage message; @@ -2373,6 +2363,33 @@ void Slave::__statusUpdate( } +// NOTE: An acknowledgement for this update might have already been +// processed by the slave but not the status update manager. +void Slave::forward(const StatusUpdate& update) +{ + CHECK(state == RECOVERING || state == DISCONNECTED || + state == RUNNING || state == TERMINATING) + << state; + + if (state != RUNNING) { + LOG(WARNING) << "Dropping status update " << update + << " sent by status update manager because the slave" + << " is in " << state << " state"; + return; + } + + CHECK_SOME(master); + LOG(INFO) << "Forwarding the update " << update << " to " << master.get(); + + // Forward the update to master. + StatusUpdateMessage message; + message.mutable_update()->MergeFrom(update); + message.set_pid(self()); // The ACK will be first received by the slave. 
+ + send(master.get(), message); +} + + void Slave::executorMessage( const SlaveID& slaveId, const FrameworkID& frameworkId, @@ -2866,9 +2883,10 @@ void Slave::executorTerminated( if (master.isSome()) { send(master.get(), message); } } - // Remove the executor if either the framework is terminating or - // there are no incomplete tasks. - if (framework->state == Framework::TERMINATING || + // Remove the executor if either the slave or framework is + // terminating or there are no incomplete tasks. + if (state == TERMINATING || + framework->state == Framework::TERMINATING || !executor->incompleteTasks()) { removeExecutor(framework, executor); } @@ -2900,13 +2918,15 @@ void Slave::removeExecutor(Framework* framework, Executor* executor) framework->state == Framework::TERMINATING) << framework->state; - // Check that this executor has terminated and either has no - // pending updates or the framework is terminating. We don't - // care for pending updates when a framework is terminating - // because the framework cannot ACK them. + // Check that this executor has terminated. CHECK(executor->state == Executor::TERMINATED) << executor->state; - CHECK(framework->state == Framework::TERMINATING || - !executor->incompleteTasks()); + + // Check that either 1) the executor has no tasks with pending + // updates or 2) the slave/framework is terminating, because no + // acknowledgements might be received. + CHECK(!executor->incompleteTasks() || + state == TERMINATING || + framework->state == Framework::TERMINATING); // Write a sentinel file to indicate that this executor // is completed. @@ -3006,17 +3026,8 @@ void Slave::removeFramework(Framework* framework) // Pass ownership of the framework pointer. completedFrameworks.push_back(Owned<Framework>(framework)); - if (frameworks.empty()) { - // Terminate the slave if - // 1) it's being shut down or - // 2) it's started in cleanup mode and recovery finished. 
- // TODO(vinod): Instead of doing it this way, shutdownFramework() - // and shutdownExecutor() could return Futures and a slave could - // shutdown when all the Futures are satisfied (e.g., collect()). - if (state == TERMINATING || - (flags.recover == "cleanup" && !recovered.future().isPending())) { - terminate(self()); - } + if (state == TERMINATING && frameworks.empty()) { + terminate(self()); } } @@ -3362,7 +3373,6 @@ void Slave::__recover(const Future<Nothing>& future) LOG(INFO) << "Finished recovery"; CHECK_EQ(RECOVERING, state); - state = DISCONNECTED; // Checkpoint boot ID. Try<string> bootId = os::bootId(); @@ -3413,18 +3423,30 @@ void Slave::__recover(const Future<Nothing>& future) } } - recovered.set(Nothing()); // Signal recovery. + if (flags.recover == "reconnect") { + state = DISCONNECTED; - // Terminate slave, if it has no active frameworks and is started - // in 'cleanup' mode. - if (frameworks.empty() && flags.recover == "cleanup") { - terminate(self()); - return; + // Start detecting masters. + detection = detector->detect() + .onAny(defer(self(), &Slave::detected, lambda::_1)); + } else { + // Slave started in cleanup mode. + CHECK_EQ("cleanup", flags.recover); + state = TERMINATING; + + if (frameworks.empty()) { + terminate(self()); + } + + // If there are active executors/frameworks, the slave will + // shutdown when all the executors are terminated. Note that + // the executors are guaranteed to terminate because they + // are sent shutdown signal in '_recover()' which results in + // 'Containerizer::destroy()' being called if the termination + // doesn't happen within a timeout. } - // Start detecting masters. - detection = detector->detect() - .onAny(defer(self(), &Slave::detected, lambda::_1)); + recovered.set(Nothing()); // Signal recovery. 
} http://git-wip-us.apache.org/repos/asf/mesos/blob/e64dda41/src/slave/slave.hpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp index ccc0e03..439052e 100644 --- a/src/slave/slave.hpp +++ b/src/slave/slave.hpp @@ -197,6 +197,10 @@ public: const StatusUpdate& update, const process::UPID& pid); + // This is called by status update manager to forward a + // status update to the master. + void forward(const StatusUpdate& update); + void statusUpdateAcknowledgement( const process::UPID& from, const SlaveID& slaveId, http://git-wip-us.apache.org/repos/asf/mesos/blob/e64dda41/src/slave/status_update_manager.cpp ---------------------------------------------------------------------- diff --git a/src/slave/status_update_manager.cpp b/src/slave/status_update_manager.cpp index 5d5cf23..fb35ace 100644 --- a/src/slave/status_update_manager.cpp +++ b/src/slave/status_update_manager.cpp @@ -91,8 +91,6 @@ public: const string& rootDir, const Option<SlaveState>& state); - void newMasterDetected(const UPID& pid); - void flush(); void cleanup(const FrameworkID& frameworkId); @@ -135,7 +133,6 @@ private: const TaskID& taskId, const FrameworkID& frameworkId); - UPID master; Flags flags; PID<Slave> slave; hashmap<FrameworkID, hashmap<TaskID, StatusUpdateStream*> > streams; @@ -162,23 +159,13 @@ void StatusUpdateManagerProcess::initialize( } -void StatusUpdateManagerProcess::newMasterDetected(const UPID& pid) -{ - LOG(INFO) << "New master detected at " << pid; - master = pid; - - // Retry any pending status updates. 
- flush(); -} - - void StatusUpdateManagerProcess::flush() { foreachkey (const FrameworkID& frameworkId, streams) { foreachvalue (StatusUpdateStream* stream, streams[frameworkId]) { if (!stream->pending.empty()) { const StatusUpdate& update = stream->pending.front(); - LOG(WARNING) << "Resending status update " << update; + LOG(WARNING) << "Flushing status update " << update; stream->timeout = forward(update, STATUS_UPDATE_RETRY_INTERVAL_MIN); } } @@ -256,18 +243,11 @@ Future<Nothing> StatusUpdateManagerProcess::recover( } // At the end of the replay, the stream is either terminated or - // contains only unacknowledged, if any, pending updates. + // contains only unacknowledged, if any, pending updates. The + // pending updates will be flushed after the slave + // re-registers with the master. if (stream->terminated) { cleanupStatusUpdateStream(task.id, framework.id); - } else { - // If a stream has pending updates after the replay, - // send the first pending update. - const Result<StatusUpdate>& next = stream->next(); - CHECK(!next.isError()); - if (next.isSome()) { - stream->timeout = - forward(next.get(), STATUS_UPDATE_RETRY_INTERVAL_MIN); - } } } } @@ -369,18 +349,10 @@ Timeout StatusUpdateManagerProcess::forward( const StatusUpdate& update, const Duration& duration) { - if (master) { - LOG(INFO) << "Forwarding status update " << update << " to " << master; + VLOG(1) << "Forwarding update " << update << " to the slave"; - StatusUpdateMessage message; - message.mutable_update()->MergeFrom(update); - message.set_pid(slave); // The ACK will be first received by the slave. - - send(master, message); - } else { - LOG(WARNING) << "Not forwarding status update " << update - << " because no master is elected yet"; - } + // Forward the update to the slave. + dispatch(slave, &Slave::forward, update); // Send a message to self to resend after some delay if no ACK is received. 
return delay(duration, @@ -622,11 +594,6 @@ Future<Nothing> StatusUpdateManager::recover( } -void StatusUpdateManager::newMasterDetected(const UPID& pid) -{ - dispatch(process, &StatusUpdateManagerProcess::newMasterDetected, pid); -} - void StatusUpdateManager::flush() { http://git-wip-us.apache.org/repos/asf/mesos/blob/e64dda41/src/slave/status_update_manager.hpp ---------------------------------------------------------------------- diff --git a/src/slave/status_update_manager.hpp b/src/slave/status_update_manager.hpp index c371e55..1c1a8a8 100644 --- a/src/slave/status_update_manager.hpp +++ b/src/slave/status_update_manager.hpp @@ -117,8 +117,6 @@ public: const std::string& rootDir, const Option<state::SlaveState>& state); - // TODO(vinod): Remove this hack once the new leader detector code is merged. - void newMasterDetected(const process::UPID& pid); // Resend all the pending updates right away. // This is useful when the updates were pending because there was http://git-wip-us.apache.org/repos/asf/mesos/blob/e64dda41/src/tests/slave_recovery_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp index 4fb357b..813e2d6 100644 --- a/src/tests/slave_recovery_tests.cpp +++ b/src/tests/slave_recovery_tests.cpp @@ -912,7 +912,7 @@ TYPED_TEST(SlaveRecoveryTest, RecoverCompletedExecutor) // The slave is stopped after a non-terminal update is received. // Slave is restarted in recovery=cleanup mode. It kills the command -// executor, and transitions the task to FAILED. +// executor, and terminates. Master should then send TASK_LOST. TYPED_TEST(SlaveRecoveryTest, CleanupExecutor) { Try<PID<Master> > master = this->StartMaster(); @@ -965,8 +965,8 @@ TYPED_TEST(SlaveRecoveryTest, CleanupExecutor) this->Stop(slave.get()); delete containerizer1.get(); - // Slave in cleanup mode shouldn't reregister with slave and hence - // no offers should be made by the master. 
+ // Slave in cleanup mode shouldn't re-register with the master and + // hence no offers should be made by the master. EXPECT_CALL(sched, resourceOffers(_, _)) .Times(0); @@ -976,12 +976,14 @@ TYPED_TEST(SlaveRecoveryTest, CleanupExecutor) Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover); + EXPECT_CALL(sched, slaveLost(_, _)) + .Times(AtMost(1)); + // Restart the slave in 'cleanup' recovery mode with a new isolator. Try<TypeParam*> containerizer2 = TypeParam::create(flags, true); ASSERT_SOME(containerizer2); flags.recover = "cleanup"; - slave = this->StartSlave(containerizer2.get(), flags); ASSERT_SOME(slave); @@ -993,15 +995,12 @@ TYPED_TEST(SlaveRecoveryTest, CleanupExecutor) Clock::settle(); } - // Scheduler should receive the TASK_FAILED update. - AWAIT_READY(status); - ASSERT_EQ(TASK_FAILED, status.get().state()); - // Wait for recovery to complete. AWAIT_READY(__recover); - Clock::settle(); - Clock::resume(); + // Scheduler should receive the TASK_LOST update. + AWAIT_READY(status); + ASSERT_EQ(TASK_LOST, status.get().state()); driver.stop(); driver.join();
