Repository: mesos Updated Branches: refs/heads/master b810250fa -> c96ba8f60
Introduced Master <-> Slave reconciliation. Review: https://reviews.apache.org/r/26206 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/cc4444eb Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/cc4444eb Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/cc4444eb Branch: refs/heads/master Commit: cc4444eb47705649a4e93a4045b4d130dd4b3354 Parents: b810250 Author: Benjamin Mahler <[email protected]> Authored: Tue Sep 30 11:31:53 2014 -0700 Committer: Benjamin Mahler <[email protected]> Committed: Wed Oct 8 18:09:04 2014 -0700 ---------------------------------------------------------------------- src/master/master.cpp | 95 +++++++++++++++++++------------- src/master/master.hpp | 5 +- src/messages/messages.proto | 2 + src/slave/slave.cpp | 79 +++++++++++++++++++++----- src/slave/slave.hpp | 7 ++- src/tests/fault_tolerance_tests.cpp | 12 +++- 6 files changed, 142 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/cc4444eb/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index cc48b96..cb46cec 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -3092,21 +3092,7 @@ void Master::reregisterSlave( // For now, we assume this slave is not nefarious (eventually // this will be handled by orthogonal security measures like key // based authentication). - LOG(WARNING) << "Slave at " << from << " (" << slave->info.hostname() - << ") is being allowed to re-register with an already" - << " in use id (" << slave->id << ")"; - - // TODO(bmahler): There's an implicit assumption here that when - // the master already knows about this slave, the slave cannot - // have tasks unknown to the master. 
This _should_ be the case - // since the causal relationship is: - // slave removes task -> master removes task - // We should enforce this via a CHECK (dangerous), or by shutting - // down slaves that are found to violate this assumption. - - SlaveReregisteredMessage message; - message.mutable_slave_id()->MergeFrom(slave->id); - send(from, message); + LOG(INFO) << "Re-registering slave " << *slave; // Update the slave pid and relink to it. // NOTE: Re-linking the slave here always rather than only when @@ -3119,8 +3105,8 @@ void Master::reregisterSlave( link(slave->pid); // Reconcile tasks between master and the slave. - // NOTE: This needs to be done after the registration message is - // sent to the slave and the new pid is linked. + // NOTE: This sends the re-registered message, including tasks + // that need to be reconciled by the slave. reconcile(slave, executorInfos, tasks); // If this is a disconnected slave, add it back to the allocator. @@ -3871,44 +3857,79 @@ void Master::reconcile( { CHECK_NOTNULL(slave); + // TODO(bmahler): There's an implicit assumption here that the slave + // cannot have tasks unknown to the master. This _should_ be the + // case since the causal relationship is: + // slave removes task -> master removes task + // Add error logging for any violations of this assumption! + // We convert the 'tasks' into a map for easier lookup below. - // TODO(vinod): Check if the tasks are known to the master. multihashmap<FrameworkID, TaskID> slaveTasks; foreach (const Task& task, tasks) { slaveTasks.put(task.framework_id(), task.task_id()); } - // Send TASK_LOST updates for tasks present in the master but - // missing from the slave. This could happen if the task was - // dropped by the slave (e.g., slave exited before getting the - // task or the task was launched while slave was in recovery). - // NOTE: copies are needed because removeTask modifies slave->tasks. + // Look for tasks missing in the slave's re-registration message.
+ // This can occur when: + // (1) a launch message was dropped (e.g. slave failed over), or + // (2) the slave re-registration raced with a launch message, in + // which case the slave actually received the task. + // To resolve both cases correctly, we must reconcile through the + // slave. For slaves that do not support reconciliation, we keep + // the old semantics and cover only case (1) via TASK_LOST. + SlaveReregisteredMessage reregistered; + reregistered.mutable_slave_id()->MergeFrom(slave->id); + + // NOTE: copies are needed because removeTask modifies slave->tasks. foreachkey (const FrameworkID& frameworkId, utils::copy(slave->tasks)) { + ReconcileTasksMessage reconcile; + reconcile.mutable_framework_id()->CopyFrom(frameworkId); + foreachvalue (Task* task, utils::copy(slave->tasks[frameworkId])) { if (!slaveTasks.contains(task->framework_id(), task->task_id())) { LOG(WARNING) << "Task " << task->task_id() << " of framework " << task->framework_id() << " unknown to the slave " << *slave - << " during re-registration"; - - const StatusUpdate& update = protobuf::createStatusUpdate( - task->framework_id(), - slave->id, - task->task_id(), - TASK_LOST, - "Task is unknown to the slave"); + << " during re-registration" + << (slave->version.isSome() + ? ": reconciling with the slave" + : ": sending TASK_LOST"); + + if (slave->version.isSome()) { + TaskStatus* status = reconcile.add_statuses(); + status->mutable_task_id()->CopyFrom(task->task_id()); + status->mutable_slave_id()->CopyFrom(slave->id); + status->set_state(task->state()); + status->set_message("Reconciliation request"); + status->set_timestamp(Clock::now().secs()); + } else { + // TODO(bmahler): Remove this case in 0.22.0.
+ const StatusUpdate& update = protobuf::createStatusUpdate( + task->framework_id(), + slave->id, + task->task_id(), + TASK_LOST, + "Task is unknown to the slave"); + + updateTask(task, update.status()); + removeTask(task); - updateTask(task, update.status()); - removeTask(task); - - Framework* framework = getFramework(frameworkId); - if (framework != NULL) { - forward(update, UPID(), framework); + Framework* framework = getFramework(frameworkId); + if (framework != NULL) { + forward(update, UPID(), framework); + } } } } + + if (slave->version.isSome() && reconcile.statuses_size() > 0) { + reregistered.add_reconciliations()->CopyFrom(reconcile); + } } + // Re-register the slave. + send(slave->pid, reregistered); + // Likewise, any executors that are present in the master but // not present in the slave must be removed to correctly account // for resources. First we index the executors for fast lookup below. http://git-wip-us.apache.org/repos/asf/mesos/blob/cc4444eb/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index 49589f4..14f1d0f 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -292,9 +292,8 @@ protected: // Invoked when the contender has entered the contest. void contended(const process::Future<process::Future<Nothing> >& candidacy); - // Reconciles a re-registering slave's tasks / executors and sends - // TASK_LOST updates for tasks known to the master but unknown to - // the slave. + // Handles a known re-registering slave by reconciling the master's + // view of the slave's tasks and executors. 
void reconcile( Slave* slave, const std::vector<ExecutorInfo>& executors, http://git-wip-us.apache.org/repos/asf/mesos/blob/cc4444eb/src/messages/messages.proto ---------------------------------------------------------------------- diff --git a/src/messages/messages.proto b/src/messages/messages.proto index edf1e4e..77515d9 100644 --- a/src/messages/messages.proto +++ b/src/messages/messages.proto @@ -259,6 +259,8 @@ message SlaveRegisteredMessage { message SlaveReregisteredMessage { required SlaveID slave_id = 1; + + repeated ReconcileTasksMessage reconciliations = 2; } http://git-wip-us.apache.org/repos/asf/mesos/blob/cc4444eb/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index 809b008..cb37599 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -347,7 +347,8 @@ void Slave::initialize() install<SlaveReregisteredMessage>( &Slave::reregistered, - &SlaveReregisteredMessage::slave_id); + &SlaveReregisteredMessage::slave_id, + &SlaveReregisteredMessage::reconciliations); install<RunTaskMessage>( &Slave::runTask, @@ -810,7 +811,10 @@ void Slave::registered(const UPID& from, const SlaveID& slaveId) } -void Slave::reregistered(const UPID& from, const SlaveID& slaveId) +void Slave::reregistered( + const UPID& from, + const SlaveID& slaveId, + const vector<ReconcileTasksMessage>& reconciliations) { if (master != from) { LOG(WARNING) << "Ignoring re-registration message from " << from @@ -823,25 +827,15 @@ void Slave::reregistered(const UPID& from, const SlaveID& slaveId) case DISCONNECTED: CHECK_SOME(master); LOG(INFO) << "Re-registered with master " << master.get(); - state = RUNNING; - if (!(info.id() == slaveId)) { - EXIT(1) << "Re-registered but got wrong id: " << slaveId - << "(expected: " << info.id() << "). Committing suicide"; - } break; case RUNNING: - // Already re-registered! 
- if (!(info.id() == slaveId)) { - EXIT(1) << "Re-registered but got wrong id: " << slaveId - << "(expected: " << info.id() << "). Committing suicide"; - } CHECK_SOME(master); LOG(WARNING) << "Already re-registered with master " << master.get(); break; case TERMINATING: LOG(WARNING) << "Ignoring re-registration because slave is terminating"; - break; + return; case RECOVERING: // It's possible to receive a message intended for the previous // run of the slave here. Short term we can leave this as is and @@ -851,7 +845,64 @@ void Slave::reregistered(const UPID& from, const SlaveID& slaveId) // https://issues.apache.org/jira/browse/MESOS-677 default: LOG(FATAL) << "Unexpected slave state " << state; - break; + return;; + } + + if (!(info.id() == slaveId)) { + EXIT(1) << "Re-registered but got wrong id: " << slaveId + << "(expected: " << info.id() << "). Committing suicide"; + } + + // Reconcile any tasks per the master's request. + foreach (const ReconcileTasksMessage& reconcile, reconciliations) { + Framework* framework = getFramework(reconcile.framework_id()); + + foreach (const TaskStatus& status, reconcile.statuses()) { + const TaskID& taskId = status.task_id(); + + bool known = false; + + // Try to locate the task. + if (framework != NULL) { + foreachkey (const ExecutorID& executorId, framework->pending) { + if (framework->pending[executorId].contains(taskId)) { + known = true; + } + } + foreachvalue (Executor* executor, framework->executors) { + if (executor->queuedTasks.contains(taskId) || + executor->launchedTasks.contains(taskId) || + executor->terminatedTasks.contains(taskId)) { + known = true; + } + } + } + + // We only need to send a TASK_LOST update when the task is + // unknown (so that the master removes it). Otherwise, the + // master correctly holds the task and will receive updates. 
+ if (!known) { + LOG(WARNING) << "Slave reconciling task " << taskId + << " of framework " << reconcile.framework_id() + << " in state TASK_LOST: task unknown to the slave"; + + const StatusUpdate& update = protobuf::createStatusUpdate( + reconcile.framework_id(), + info.id(), + taskId, + TASK_LOST, + "Reconciliation: task unknown to the slave"); + + // NOTE: We can't use statusUpdate() here because it drops + // updates for unknown frameworks. + statusUpdateManager->update(update, info.id()) + .onAny(defer(self(), + &Slave::__statusUpdate, + lambda::_1, + update, + UPID())); + } + } } } http://git-wip-us.apache.org/repos/asf/mesos/blob/cc4444eb/src/slave/slave.hpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp index 2869710..76d505c 100644 --- a/src/slave/slave.hpp +++ b/src/slave/slave.hpp @@ -97,7 +97,12 @@ public: void shutdown(const process::UPID& from, const std::string& message); void registered(const process::UPID& from, const SlaveID& slaveId); - void reregistered(const process::UPID& from, const SlaveID& slaveId); + + void reregistered( + const process::UPID& from, + const SlaveID& slaveId, + const std::vector<ReconcileTasksMessage>& reconciliations); + void doReliableRegistration(const Duration& duration); void runTask( http://git-wip-us.apache.org/repos/asf/mesos/blob/cc4444eb/src/tests/fault_tolerance_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp index e8f5322..c34a9d6 100644 --- a/src/tests/fault_tolerance_tests.cpp +++ b/src/tests/fault_tolerance_tests.cpp @@ -2233,9 +2233,9 @@ TEST_F(FaultToleranceTest, FrameworkReregisterEmptyExecutor) } -// This test verifies that the master sends TASK_LOST updates -// for tasks in the master absent from the re-registered slave. -// We do this by dropping RunTaskMessage from master to the slave. 
+// This test verifies that the master reconciles tasks that are +// missing from a re-registering slave. In this case, we drop the +// RunTaskMessage so the slave should send TASK_LOST. TEST_F(FaultToleranceTest, ReconcileLostTasks) { Try<PID<Master> > master = StartMaster(); @@ -2285,6 +2285,9 @@ TEST_F(FaultToleranceTest, ReconcileLostTasks) Future<SlaveReregisteredMessage> slaveReregisteredMessage = FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _); + Future<StatusUpdateMessage> statusUpdateMessage = + FUTURE_PROTOBUF(StatusUpdateMessage(), _, master.get()); + Future<TaskStatus> status; EXPECT_CALL(sched, statusUpdate(&driver, _)) .WillOnce(FutureArg<1>(&status)); @@ -2295,6 +2298,9 @@ TEST_F(FaultToleranceTest, ReconcileLostTasks) AWAIT_READY(slaveReregisteredMessage); + // Make sure the slave generated the TASK_LOST. + AWAIT_READY(statusUpdateMessage); + AWAIT_READY(status); ASSERT_EQ(task.task_id(), status.get().task_id());
