Updated the master to properly maintain each task's unacknowledged status update state. Review: https://reviews.apache.org/r/26701
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3c4e3fdf Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3c4e3fdf Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3c4e3fdf Branch: refs/heads/master Commit: 3c4e3fdf73fdbb2081e58fe3e9831b15d67bd440 Parents: da66970 Author: Vinod Kone <[email protected]> Authored: Fri Oct 10 18:50:07 2014 -0700 Committer: Vinod Kone <[email protected]> Committed: Tue Oct 21 15:47:09 2014 -0700 ---------------------------------------------------------------------- src/master/master.cpp | 94 +++++++++++++++++++++++++++++++---------- src/master/master.hpp | 2 +- src/tests/master_tests.cpp | 84 ++++++++++++++++++++++++++++++++++++ 3 files changed, 156 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/3c4e3fdf/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index f04c085..9743eab 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -2844,19 +2844,39 @@ void Master::statusUpdateAcknowledgement( Task* task = slave->getTask(frameworkId, taskId); - if (task != NULL && protobuf::isTerminalState(task->state())) { - removeTask(task); + if (task != NULL) { + // Status update state and uuid should be either set or unset + // together. + CHECK_EQ(task->has_status_update_uuid(), task->has_status_update_state()); + + if (!task->has_status_update_state()) { + // Task should have status update state set because it must have + // been set when the update corresponding to this + // acknowledgement was processed by the master. But in case this + // acknowledgement was intended for the old run of the master + // and the task belongs to a 0.20.0 slave, we could be here. 
+ // Dropping the acknowledgement is safe because the slave will + // retry the update, at which point the master will set the + // status update state. + LOG(ERROR) + << "Ignoring status update acknowledgement message for task " << taskId + << " of framework " << *framework << " to slave " << *slave + << " because it no update was sent by this master"; + metrics.invalid_status_update_acknowledgements++; + return; + } + + // Remove the task once the terminal update is acknowledged. + if (protobuf::isTerminalState(task->status_update_state()) && + task->status_update_uuid() == uuid) { + removeTask(task); + } } LOG(INFO) << "Forwarding status update acknowledgement " << UUID::fromBytes(uuid) << " for task " << taskId << " of framework " << *framework << " to slave " << *slave; - // TODO(bmahler): Once we store terminal unacknowledged updates in - // the master per MESOS-1410, this is where we'll find the - // unacknowledged task and remove it if present. - // Also, be sure to confirm Master::reconcile is still correct! - StatusUpdateAcknowledgementMessage message; message.mutable_slave_id()->CopyFrom(slaveId); message.mutable_framework_id()->CopyFrom(frameworkId); @@ -3294,12 +3314,8 @@ void Master::unregisterSlave(const UPID& from, const SlaveID& slaveId) } -// TODO(bmahler): The master will not release resources until the -// slave receives acknowlegements for all non-terminal updates. This -// means if a framework is down, the resources will remain allocated -// even though the tasks are terminal on the slaves! -// TODO(vinod): Since 0.22.0, we can use 'from' instead of 'pid' because -// the status updates will be sent by the slave. +// TODO(vinod): Since 0.22.0, we can use 'from' instead of 'pid' +// because the status updates will be sent by the slave. 
void Master::statusUpdate(const StatusUpdate& update, const UPID& pid) { ++metrics.messages_status_update; @@ -3359,7 +3375,7 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid) LOG(INFO) << "Status update " << update << " from slave " << *slave; - updateTask(task, update.status()); + updateTask(task, update); // If the task is terminal and no acknowledgement is needed, // then remove the task now. @@ -3947,7 +3963,7 @@ void Master::reconcile( TASK_LOST, "Task is unknown to the slave"); - updateTask(task, update.status()); + updateTask(task, update); removeTask(task); Framework* framework = getFramework(frameworkId); @@ -4279,7 +4295,7 @@ void Master::removeFramework(Slave* slave, Framework* framework) (task->has_executor_id() ? Option<ExecutorID>(task->executor_id()) : None())); - updateTask(task, update.status()); + updateTask(task, update); removeTask(task); forward(update, UPID(), framework); } @@ -4396,7 +4412,7 @@ void Master::removeSlave(Slave* slave) (task->has_executor_id() ? Option<ExecutorID>(task->executor_id()) : None())); - updateTask(task, update.status()); + updateTask(task, update); removeTask(task); updates.push_back(update); @@ -4495,12 +4511,15 @@ void Master::_removeSlave( } -void Master::updateTask(Task* task, const TaskStatus& status) +void Master::updateTask(Task* task, const StatusUpdate& update) { CHECK_NOTNULL(task); + // Get the unacknowledged status. + const TaskStatus& status = update.status(); + // Out-of-order updates should not occur, however in case they - // do (e.g. MESOS-1799), prevent them here to ensure that the + // do (e.g., due to bugs), prevent them here to ensure that the // resource accounting is not affected. 
if (protobuf::isTerminalState(task->state()) && !protobuf::isTerminalState(status.state())) { @@ -4511,9 +4530,33 @@ void Master::updateTask(Task* task, const TaskStatus& status) return; } - bool terminated = - !protobuf::isTerminalState(task->state()) && - protobuf::isTerminalState(status.state()); + // Get the latest state. + Option<TaskState> latestState; + if (update.has_latest_state()) { + latestState = update.latest_state(); + } + + // Set 'terminated' to true if this is the first time the task + // transitioned to terminal state. Also set the latest state. + bool terminated; + if (latestState.isSome()) { + // This update must be from >= 0.21.0 slave. + terminated = !protobuf::isTerminalState(task->state()) && + protobuf::isTerminalState(latestState.get()); + + task->set_state(latestState.get()); + } else { + // This update must be from a pre 0.21.0 slave or generated by the + // master. + terminated = !protobuf::isTerminalState(task->state()) && + protobuf::isTerminalState(status.state()); + + task->set_state(status.state()); + } + + // Set the status update state and uuid for the task. + task->set_status_update_state(status.state()); + task->set_status_update_uuid(update.uuid()); // TODO(brenden) Consider wiping the `message` field? if (task->statuses_size() > 0 && @@ -4530,7 +4573,12 @@ void Master::updateTask(Task* task, const TaskStatus& status) // MESOS-1746. task->mutable_statuses(task->statuses_size() - 1)->clear_data(); - task->set_state(status.state()); + LOG(INFO) << "Updating the latest state of task " << task->task_id() + << " of framework " << task->framework_id() + << " to " << task->state() + << (task->state() != status.state() + ? 
" (status update state: " + stringify(status.state()) + ")" + : ""); stats.tasks[status.state()]++; http://git-wip-us.apache.org/repos/asf/mesos/blob/3c4e3fdf/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index 18898e9..b1a2cd0 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -378,7 +378,7 @@ protected: // Transitions the task, and recovers resources if the task becomes // terminal. - void updateTask(Task* task, const TaskStatus& status); + void updateTask(Task* task, const StatusUpdate& update); // Removes the task. void removeTask(Task* task); http://git-wip-us.apache.org/repos/asf/mesos/blob/3c4e3fdf/src/tests/master_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp index d9dc40c..f60e376 100644 --- a/src/tests/master_tests.cpp +++ b/src/tests/master_tests.cpp @@ -62,6 +62,7 @@ using namespace mesos::internal::tests; using mesos::internal::master::Master; using mesos::internal::master::allocator::AllocatorProcess; +using mesos::internal::master::allocator::HierarchicalDRFAllocatorProcess; using mesos::internal::slave::GarbageCollectorProcess; using mesos::internal::slave::Slave; @@ -2359,6 +2360,89 @@ TEST_F(MasterTest, UnacknowledgedTerminalTask) } +// This test ensures that the master releases resources for a +// terminated task even when it receives a non-terminal update (with +// latest state set). 
+TEST_F(MasterTest, ReleaseResourcesForTerminalTaskWithPendingUpdates) +{ + Try<PID<Master> > master = StartMaster(); + ASSERT_SOME(master); + + MockExecutor exec(DEFAULT_EXECUTOR_ID); + + TestContainerizer containerizer(&exec); + + slave::Flags slaveFlags = CreateSlaveFlags(); + slaveFlags.resources = "cpus:1;mem:64"; + Try<PID<Slave> > slave = StartSlave(&containerizer, slaveFlags); + ASSERT_SOME(slave); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched, registered(&driver, _, _)); + + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 1, 64, "*")) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + ExecutorDriver* execDriver; + EXPECT_CALL(exec, registered(_, _, _, _)) + .WillOnce(SaveArg<0>(&execDriver)); + + EXPECT_CALL(exec, launchTask(_, _)) + .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING)); + + // Drop all the updates from master to scheduler. + DROP_PROTOBUFS(StatusUpdateMessage(), master.get(), _); + + Future<StatusUpdateMessage> statusUpdateMessage = + FUTURE_PROTOBUF(StatusUpdateMessage(), _, master.get()); + + Future<Nothing> __statusUpdate = FUTURE_DISPATCH(_, &Slave::__statusUpdate); + + driver.start(); + + // Wait until TASK_RUNNING is sent to the master. + AWAIT_READY(statusUpdateMessage); + + // Ensure status update manager handles TASK_RUNNING update. + AWAIT_READY(__statusUpdate); + + Future<Nothing> __statusUpdate2 = FUTURE_DISPATCH(_, &Slave::__statusUpdate); + + // Now send TASK_FINISHED update. + TaskStatus finishedStatus; + finishedStatus = statusUpdateMessage.get().update().status(); + finishedStatus.set_state(TASK_FINISHED); + execDriver->sendStatusUpdate(finishedStatus); + + // Ensure status update manager handles TASK_FINISHED update. 
+ AWAIT_READY(__statusUpdate2); + + Future<Nothing> resourcesRecovered = FUTURE_DISPATCH( + _, &HierarchicalDRFAllocatorProcess::resourcesRecovered); + + // Advance the clock so that the status update manager resends + // TASK_RUNNING update with 'latest_state' as TASK_FINISHED. + Clock::pause(); + Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN); + Clock::resume(); + + // Ensure the resources are recovered. + AWAIT_READY(resourcesRecovered); + + EXPECT_CALL(exec, shutdown(_)) + .Times(AtMost(1)); + + driver.stop(); + driver.join(); + + Shutdown(); // Must shutdown before 'containerizer' gets deallocated. +} + + // This test ensures that the web UI of a framework is included in the // state.json endpoint, if provided by the framework. TEST_F(MasterTest, FrameworkWebUIUrl)
