This is an automated email from the ASF dual-hosted git repository.

asekretenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 6a29061fb2f357d591f27d7541f92e0f73f0e944
Author: Andrei Sekretenko <asekrete...@mesosphere.com>
AuthorDate: Thu Jan 30 15:42:07 2020 +0100

    Removed code for tracking pending tasks.

    Now that ACCEPT is authorized synchronously, there are no pending
    tasks in-between dispatches on `Master` methods, thus the pending
    task tracking code is not needed anymore.

    Review: https://reviews.apache.org/r/72099
---
 include/mesos/master/master.proto     |   2 +-
 include/mesos/v1/master/master.proto  |   2 +-
 src/master/http.cpp                   |  13 +-
 src/master/master.cpp                 | 258 +++------------------------------
 src/master/master.hpp                 |  10 --
 src/master/readonly_handler.cpp       |  91 ------------
 src/master/validation.cpp             |  22 ---
 src/master/validation.hpp             |   1 -
 src/tests/master_validation_tests.cpp |  33 -----
 9 files changed, 22 insertions(+), 410 deletions(-)

diff --git a/include/mesos/master/master.proto b/include/mesos/master/master.proto
index 8c22802..021dadc 100644
--- a/include/mesos/master/master.proto
+++ b/include/mesos/master/master.proto
@@ -542,7 +542,7 @@ message Response {
   message GetTasks {
     // Tasks that are enqueued on the master waiting (e.g., authorizing)
     // to be launched.
-    repeated Task pending_tasks = 1;
+    repeated Task pending_tasks = 1 [deprecated=true];
 
     // Tasks that have been forwarded to the agent for launch. This
     // includes tasks that are staging or running; it also includes
diff --git a/include/mesos/v1/master/master.proto b/include/mesos/v1/master/master.proto
index 40de358..488fe29 100644
--- a/include/mesos/v1/master/master.proto
+++ b/include/mesos/v1/master/master.proto
@@ -542,7 +542,7 @@ message Response {
   message GetTasks {
     // Tasks that are enqueued on the master waiting (e.g., authorizing)
     // to be launched.
-    repeated Task pending_tasks = 1;
+    repeated Task pending_tasks = 1 [deprecated=true];
 
     // Tasks that have been forwarded to the agent for launch. This
     // includes tasks that are staging or running; it also includes
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 67572a3..f1be402 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -1029,8 +1029,7 @@ Future<Response> Master::Http::_destroyVolumes(
     error = validation::operation::validate(
         operation.destroy(),
         slave->checkpointedResources,
-        slave->usedResources,
-        slave->pendingTasks);
+        slave->usedResources);
 
     if (error.isSome()) {
       return BadRequest("Invalid DESTROY operation: " + error->message);
@@ -3782,13 +3781,6 @@ Future<Response> Master::Http::_drainAgent(
   // It's possible for the slave to be removed in the interim
   // if it is marked unreachable.
   if (slave != nullptr) {
-    hashmap<FrameworkID, hashset<TaskID>> pendingTaskIds;
-    foreachpair (const FrameworkID& frameworkId,
-                 const auto& tasks,
-                 slave->pendingTasks) {
-      pendingTaskIds[frameworkId] = tasks.keys();
-    }
-
     hashmap<FrameworkID, hashset<TaskID>> taskIds;
     foreachpair (const FrameworkID& frameworkId,
                  const auto& tasks,
@@ -3798,8 +3790,7 @@ Future<Response> Master::Http::_drainAgent(
 
     LOG(INFO) << "Transitioning agent " << slaveId
               << " to the DRAINING state"
-              << "; agent has (pending tasks, tasks, operations) == ("
-              << stringify(pendingTaskIds) << ", "
+              << "; agent has (tasks, operations) == ("
              << stringify(taskIds) << ", "
              << stringify(slave->operations.keys()) << ")";
 
diff --git a/src/master/master.cpp b/src/master/master.cpp
index b09ce8e..7662c56 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1178,10 +1178,6 @@ void Master::finalize()
       removeInverseOffer(inverseOffer);
     }
 
-    // Remove pending tasks from the slave. Don't bother
-    // recovering the resources in the allocator.
-    slave->pendingTasks.clear();
-
     // Terminate the slave observer.
     terminate(slave->observer);
     wait(slave->observer);
@@ -1198,10 +1194,6 @@ void Master::finalize()
   foreachvalue (Framework* framework, frameworks.registered) {
     allocator->removeFramework(framework->id());
 
-    // Remove pending tasks from the framework. Don't bother
-    // recovering the resources in the allocator.
-    framework->pendingTasks.clear();
-
     // No tasks/executors/offers should remain since the slaves
     // have been removed.
     CHECK(framework->tasks.empty());
@@ -4275,46 +4267,6 @@ void Master::accept(
   LOG(INFO) << "Processing ACCEPT call for offers: " << accept.offer_ids()
             << " on agent " << *slave << " for framework " << *framework;
 
-  auto getOperationTasks =
-    [](const Offer::Operation& operation) -> const RepeatedPtrField<TaskInfo>& {
-    if (operation.type() == Offer::Operation::LAUNCH) {
-      return operation.launch().task_infos();
-    }
-
-    if (operation.type() == Offer::Operation::LAUNCH_GROUP) {
-      return operation.launch_group().task_group().tasks();
-    }
-
-    UNREACHABLE();
-  };
-
-  // Add tasks to be launched to the framework's list of pending tasks
-  // before authorizing.
-  //
-  // NOTE: If two tasks have the same ID, the second one will
-  // not be put into 'framework->pendingTasks', therefore
-  // will not be launched (and TASK_ERROR will be sent).
-  // Unfortunately, we can't tell the difference between a
-  // duplicate TaskID and getting killed while pending
-  // (removed from the map). So it's possible that we send
-  // a TASK_ERROR after a TASK_KILLED (see _accept())!
-  for (const Offer::Operation& operation : accept.operations()) {
-    if (operation.type() == Offer::Operation::LAUNCH ||
-        operation.type() == Offer::Operation::LAUNCH_GROUP) {
-      for (const TaskInfo& task : getOperationTasks(operation)) {
-        if (!framework->pendingTasks.contains(task.task_id())) {
-          framework->pendingTasks[task.task_id()] = task;
-        }
-
-        // Add to the slave's list of pending tasks.
-        if (!slave->pendingTasks.contains(framework->id()) ||
-            !slave->pendingTasks[framework->id()].contains(task.task_id())) {
-          slave->pendingTasks[framework->id()][task.task_id()] = task;
-        }
-      }
-    }
-  }
-
   // TODO(asekretenko): Dismantle `_accept(...)` (which, before synchronous
   // authorization was introduced, used to be a deferred continuation of ACCEPT
   // call processing, but now is kept only for limiting variable scopes) and
@@ -4376,15 +4328,6 @@ void Master::_accept(
   }();
 
   foreach (const TaskInfo& task, tasks) {
-    // Remove the task from being pending.
-    framework->pendingTasks.erase(task.task_id());
-    if (slave != nullptr) {
-      slave->pendingTasks[framework->id()].erase(task.task_id());
-      if (slave->pendingTasks[framework->id()].empty()) {
-        slave->pendingTasks.erase(framework->id());
-      }
-    }
-
     const TaskStatus::Reason reason = slave == nullptr
       ? TaskStatus::REASON_SLAVE_REMOVED
      : TaskStatus::REASON_SLAVE_DISCONNECTED;
@@ -4664,8 +4607,7 @@ void Master::_accept(
         Option<Error> error = validation::operation::validate(
             operation.destroy(),
             slave->checkpointedResources,
-            slave->usedResources,
-            slave->pendingTasks);
+            slave->usedResources);
 
         error = error.isSome()
           ? Error(error->message + "; on agent " + stringify(*slave))
@@ -4844,29 +4786,6 @@ void Master::_accept(
 
       case Offer::Operation::LAUNCH: {
         foreach (const TaskInfo& task, operation.launch().task_infos()) {
-          // The task will not be in `pendingTasks` if it has been
-          // killed in the interim. No need to send TASK_KILLED in
-          // this case as it has already been sent. Note however that
-          // we cannot currently distinguish between the task being
-          // killed and the task having a duplicate TaskID within
-          // `pendingTasks`. Therefore we must still validate the task
-          // to ensure we send the TASK_ERROR in the case that it has a
-          // duplicate TaskID.
-          //
-          // TODO(bmahler): We may send TASK_ERROR after a TASK_KILLED
-          // if a task was killed (removed from `pendingTasks`) *and*
-          // the task is invalid or unauthorized here.
-          //
-          // TODO(asekretenko): Now that ACCEPT is authorized synchronously,
-          // master state cannot change while the task is being authorized,
-          // and all the code for tracking pending tasks can be removed.
-          bool pending = framework->pendingTasks.contains(task.task_id());
-          framework->pendingTasks.erase(task.task_id());
-          slave->pendingTasks[framework->id()].erase(task.task_id());
-          if (slave->pendingTasks[framework->id()].empty()) {
-            slave->pendingTasks.erase(framework->id());
-          }
-
           const Option<Error> authorizationError =
             authorized(ActionObject::taskLaunch(task, framework->info));
 
@@ -4931,7 +4850,7 @@ void Master::_accept(
           }
 
           // Add task.
-          if (pending) {
+          {
             Resources consumed;
 
             bool launchExecutor = true;
@@ -5044,26 +4963,9 @@ void Master::_accept(
       case Offer::Operation::LAUNCH_GROUP: {
         // We must ensure that the entire group can be launched. This
         // means all tasks in the group must be authorized and valid.
-        // If any tasks in the group have been killed in the interim
-        // we must kill the entire group.
         const ExecutorInfo& executor = operation.launch_group().executor();
         const TaskGroupInfo& taskGroup = operation.launch_group().task_group();
 
-        // Remove all the tasks from being pending.
-        hashset<TaskID> killed;
-        foreach (const TaskInfo& task, taskGroup.tasks()) {
-          bool pending = framework->pendingTasks.contains(task.task_id());
-          framework->pendingTasks.erase(task.task_id());
-          slave->pendingTasks[framework->id()].erase(task.task_id());
-          if (slave->pendingTasks[framework->id()].empty()) {
-            slave->pendingTasks.erase(framework->id());
-          }
-
-          if (!pending) {
-            killed.insert(task.task_id());
-          }
-        }
-
         // Note that we do not fill in the `ExecutorInfo.framework_id`
         // since we do not have to support backwards compatibility like
         // in the `Launch` operation case.
@@ -5110,10 +5012,6 @@ void Master::_accept(
 
         if (error.isSome()) {
           CHECK_SOME(reason);
-
-          // NOTE: If some of these invalid or unauthorized tasks were
-          // killed already, here we end up sending a TASK_ERROR after
-          // having already sent TASK_KILLED.
           foreach (const TaskInfo& task, taskGroup.tasks()) {
             const StatusUpdate& update = protobuf::createStatusUpdate(
                 framework->id(),
@@ -5136,39 +5034,6 @@ void Master::_accept(
           continue;
         }
 
-        // If task(s) were killed, send TASK_KILLED for
-        // all of the remaining tasks, since a TaskGroup must
-        // be delivered in its entirety.
-        //
-        // TODO(bmahler): Do this killing when processing
-        // the `Kill` call, rather than doing it here.
-        if (!killed.empty()) {
-          foreach (const TaskInfo& task, taskGroup.tasks()) {
-            if (!killed.contains(task.task_id())) {
-              const StatusUpdate& update = protobuf::createStatusUpdate(
-                  framework->id(),
-                  task.slave_id(),
-                  task.task_id(),
-                  TASK_KILLED,
-                  TaskStatus::SOURCE_MASTER,
-                  None(),
-                  "A task within the task group was killed before"
-                  " delivery to the agent",
-                  TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH);
-
-              metrics->tasks_killed++;
-
-              // TODO(bmahler): Increment the task state source metric,
-              // we currently cannot because it requires each source
-              // requires a reason.
-
-              forward(update, UPID(), framework);
-            }
-          }
-
-          continue;
-        }
-
         // Now launch the task group!
         RunTaskGroupMessage message;
         message.mutable_framework()->CopyFrom(framework->info);
@@ -5558,22 +5423,15 @@ void Master::checkAndTransitionDrainingAgent(Slave* slave)
   }
 
   // Check if the agent has any tasks running or operations pending.
-  if (!slave->pendingTasks.empty() ||
-      !slave->tasks.empty() ||
+  if (!slave->tasks.empty() ||
       !slave->operations.empty()) {
     size_t numTasks = 0u;
     foreachvalue (const auto& frameworkTasks, slave->tasks) {
       numTasks += frameworkTasks.size();
     }
 
-    size_t numPendingTasks = 0u;
-    foreachvalue (const auto& frameworkTasks, slave->pendingTasks) {
-      numPendingTasks += frameworkTasks.size();
-    }
-
     VLOG(1) << "DRAINING Agent " << slaveId << " has "
-            << numPendingTasks << " pending tasks, "
            << numTasks << " tasks, and "
            << slave->operations.size() << " operations";
     return;
 
@@ -5751,36 +5609,6 @@ void Master::kill(Framework* framework, const scheduler::Call::Kill& kill)
 
   ++metrics->messages_kill_task;
 
-  if (framework->pendingTasks.contains(taskId)) {
-    // Remove from pending tasks.
-    framework->pendingTasks.erase(taskId);
-
-    if (slaveId.isSome()) {
-      Slave* slave = slaves.registered.get(slaveId.get());
-
-      if (slave != nullptr) {
-        slave->pendingTasks[framework->id()].erase(taskId);
-        if (slave->pendingTasks[framework->id()].empty()) {
-          slave->pendingTasks.erase(framework->id());
-        }
-      }
-    }
-
-    const StatusUpdate& update = protobuf::createStatusUpdate(
-        framework->id(),
-        slaveId,
-        taskId,
-        TASK_KILLED,
-        TaskStatus::SOURCE_MASTER,
-        None(),
-        "Killed before delivery to the agent",
-        TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH);
-
-    forward(update, UPID(), framework);
-
-    return;
-  }
-
   Task* task = framework->getTask(taskId);
   if (task == nullptr) {
     LOG(WARNING) << "Cannot kill task " << taskId
@@ -9027,29 +8855,6 @@ void Master::reconcile(
   LOG(INFO) << "Performing implicit task state reconciliation"
               " for framework " << *framework;
 
-  foreachvalue (const TaskInfo& task, framework->pendingTasks) {
-    StatusUpdate update = protobuf::createStatusUpdate(
-        framework->id(),
-        task.slave_id(),
-        task.task_id(),
-        TASK_STAGING,
-        TaskStatus::SOURCE_MASTER,
-        None(),
-        "Reconciliation: Latest task state",
-        TaskStatus::REASON_RECONCILIATION);
-
-    VLOG(1) << "Sending implicit reconciliation state "
-            << update.status().state()
-            << " for task " << update.status().task_id()
-            << " of framework " << *framework;
-
-    // TODO(bmahler): Consider using forward(); might lead to too
-    // much logging.
-    StatusUpdateMessage message;
-    *message.mutable_update() = std::move(update);
-    framework->send(message);
-  }
-
   foreachvalue (Task* task, framework->tasks) {
     const TaskState& state = task->has_status_update_state()
       ? task->status_update_state()
@@ -9095,20 +8900,19 @@ void Master::reconcile(
             << " of framework " << *framework;
 
   // Explicit reconciliation occurs for the following cases:
-  //   (1) Task is known, but pending: TASK_STAGING.
-  //   (2) Task is known: send the latest state.
-  //   (3) Task is unknown, slave is recovered: no-op.
-  //   (4) Task is unknown, slave is registered: TASK_GONE.
-  //   (5) Task is unknown, slave is unreachable: TASK_UNREACHABLE.
-  //   (6) Task is unknown, slave is gone: TASK_GONE_BY_OPERATOR.
-  //   (7) Task is unknown, slave is unknown: TASK_UNKNOWN.
+  //   (1) Task is known: send the latest state.
+  //   (2) Task is unknown, slave is recovered: no-op.
+  //   (3) Task is unknown, slave is registered: TASK_GONE.
+  //   (4) Task is unknown, slave is unreachable: TASK_UNREACHABLE.
+  //   (5) Task is unknown, slave is gone: TASK_GONE_BY_OPERATOR.
+  //   (6) Task is unknown, slave is unknown: TASK_UNKNOWN.
   //
-  // For case (3), if the slave ID is not provided, we err on the
+  // For case (2), if the slave ID is not provided, we err on the
   // side of caution and do not reply if there are *any* recovered
   // slaves that haven't reregistered, since the task could reside
   // on one of these slaves.
   //
-  // For cases (4), (5), (6) and (7) TASK_LOST is sent instead if the
+  // For cases (3), (4), (5) and (6) TASK_LOST is sent instead if the
   // framework has not opted-in to the PARTITION_AWARE capability.
   foreach (const scheduler::Call::Reconcile::Task& t, reconcile.tasks()) {
     Option<SlaveID> slaveId = None();
@@ -9119,20 +8923,8 @@ void Master::reconcile(
     Option<StatusUpdate> update = None();
     Task* task = framework->getTask(t.task_id());
 
-    if (framework->pendingTasks.contains(t.task_id())) {
-      // (1) Task is known, but pending: TASK_STAGING.
-      const TaskInfo& task_ = framework->pendingTasks[t.task_id()];
-      update = protobuf::createStatusUpdate(
-          framework->id(),
-          task_.slave_id(),
-          task_.task_id(),
-          TASK_STAGING,
-          TaskStatus::SOURCE_MASTER,
-          None(),
-          "Reconciliation: Latest task state",
-          TaskStatus::REASON_RECONCILIATION);
-    } else if (task != nullptr) {
-      // (2) Task is known: send the latest status update state.
+    if (task != nullptr) {
+      // (1) Task is known: send the latest status update state.
       const TaskState& state = task->has_status_update_state()
         ? task->status_update_state()
         : task->state();
@@ -9157,7 +8949,7 @@ void Master::reconcile(
           protobuf::getTaskContainerStatus(*task));
     } else if ((slaveId.isSome() && slaves.recovered.contains(slaveId.get())) ||
                (slaveId.isNone() && !slaves.recovered.empty())) {
-      // (3) Task is unknown, slave is recovered: no-op. The framework
+      // (2) Task is unknown, slave is recovered: no-op. The framework
       // will have to retry this and will not receive a response until
       // the agent either registers, or is marked unreachable after the
       // timeout.
@@ -9168,7 +8960,7 @@ void Master::reconcile(
                    "some agents have")
                << " not yet reregistered with the master";
     } else if (slaveId.isSome() && slaves.registered.contains(slaveId.get())) {
-      // (4) Task is unknown, slave is registered: TASK_GONE. If the
+      // (3) Task is unknown, slave is registered: TASK_GONE. If the
       // framework does not have the PARTITION_AWARE capability, send
       // TASK_LOST for backward compatibility.
       TaskState taskState = TASK_GONE;
@@ -9186,7 +8978,7 @@ void Master::reconcile(
           "Reconciliation: Task is unknown to the agent",
           TaskStatus::REASON_RECONCILIATION);
     } else if (slaveId.isSome() && slaves.unreachable.contains(slaveId.get())) {
-      // (5) Slave is unreachable: TASK_UNREACHABLE. If the framework
+      // (4) Slave is unreachable: TASK_UNREACHABLE. If the framework
       // does not have the PARTITION_AWARE capability, send TASK_LOST
       // for backward compatibility. In either case, the status update
       // also includes the time when the slave was marked unreachable.
@@ -9213,7 +9005,7 @@ void Master::reconcile(
           None(),
           unreachableTime);
     } else if (slaveId.isSome() && slaves.gone.contains(slaveId.get())) {
-      // (6) Slave is gone: TASK_GONE_BY_OPERATOR. If the framework
+      // (5) Slave is gone: TASK_GONE_BY_OPERATOR. If the framework
      // does not have the PARTITION_AWARE capability, send TASK_LOST
      // for backward compatibility.
      TaskState taskState = TASK_GONE_BY_OPERATOR;
@@ -9231,7 +9023,7 @@ void Master::reconcile(
           "Reconciliation: Task is gone",
           TaskStatus::REASON_RECONCILIATION);
     } else {
-      // (7) Task is unknown, slave is unknown: TASK_UNKNOWN. If the
+      // (6) Task is unknown, slave is unknown: TASK_UNKNOWN. If the
       // framework does not have the PARTITION_AWARE capability, send
       // TASK_LOST for backward compatibility.
       TaskState taskState = TASK_UNKNOWN;
@@ -10504,18 +10296,12 @@ void Master::removeFramework(Framework* framework)
   CHECK(framework->inverseOffers.empty());
 
   foreachvalue (Slave* slave, slaves.registered) {
-    // Remove the pending tasks from the slave.
-    slave->pendingTasks.erase(framework->id());
-
     // Tell slaves to shutdown the framework.
     ShutdownFrameworkMessage message;
     message.mutable_framework_id()->MergeFrom(framework->id());
     send(slave->pid, message);
   }
 
-  // Remove the pending tasks from the framework.
-  framework->pendingTasks.clear();
-
   // Remove pointers to the framework's tasks in slaves and mark those
   // tasks as completed.
   foreachvalue (Task* task, utils::copy(framework->tasks)) {
@@ -11039,9 +10825,6 @@ void Master::_removeSlave(
     }
   }
 
-  // Remove the pending tasks from the slave.
-  slave->pendingTasks.clear();
-
   // Mark the slave as being removed.
   slaves.registered.remove(slave);
   slaves.removed.put(slave->id, Nothing());
@@ -12256,11 +12039,6 @@ double Master::_tasks_staging()
 {
   double count = 0.0;
 
-  // Add the tasks pending validation / authorization.
-  foreachvalue (Framework* framework, frameworks.registered) {
-    count += framework->pendingTasks.size();
-  }
-
   foreachvalue (Slave* slave, slaves.registered) {
     typedef hashmap<TaskID, Task*> TaskMap;
     foreachvalue (const TaskMap& tasks, slave->tasks) {
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 7281815..34ef2f1 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -237,12 +237,6 @@ Slave(Master* const _master,
   // the executors.
   hashmap<FrameworkID, hashmap<ExecutorID, ExecutorInfo>> executors;
 
-  // Tasks that have not yet been launched because they are currently
-  // being authorized. This is similar to Framework's pendingTasks but we
-  // track pendingTasks per agent separately to determine if any offer
-  // operation for this agent would change resources requested by these tasks.
-  hashmap<FrameworkID, hashmap<TaskID, TaskInfo>> pendingTasks;
-
   // Tasks present on this slave.
   //
   // TODO(bmahler): Make this private to enforce that `addTask()` and
@@ -2570,10 +2564,6 @@ struct Framework
   process::Time reregisteredTime;
   process::Time unregisteredTime;
 
-  // Tasks that have not yet been launched because they are currently
-  // being authorized.
-  hashmap<TaskID, TaskInfo> pendingTasks;
-
   // TODO(bmahler): Make this private to enforce that `addTask()` and
   // `removeTask()` are used, and provide a const view into the tasks.
   hashmap<TaskID, Task*> tasks;
diff --git a/src/master/readonly_handler.cpp b/src/master/readonly_handler.cpp
index f9c0006..e4a3134 100644
--- a/src/master/readonly_handler.cpp
+++ b/src/master/readonly_handler.cpp
@@ -191,47 +191,6 @@ void FullFrameworkWriter::operator()(JSON::ObjectWriter* writer) const
 
   // Model all of the tasks associated with a framework.
   writer->field("tasks", [this](JSON::ArrayWriter* writer) {
-    foreachvalue (const TaskInfo& taskInfo, framework_->pendingTasks) {
-      // Skip unauthorized tasks.
-      if (!approvers_->approved<VIEW_TASK>(taskInfo, framework_->info)) {
-        continue;
-      }
-
-      writer->element([this, &taskInfo](JSON::ObjectWriter* writer) {
-        writer->field("id", taskInfo.task_id().value());
-        writer->field("name", taskInfo.name());
-        writer->field("framework_id", framework_->id().value());
-
-        writer->field(
-            "executor_id",
-            taskInfo.executor().executor_id().value());
-
-        writer->field("slave_id", taskInfo.slave_id().value());
-        writer->field("state", TaskState_Name(TASK_STAGING));
-        writer->field("resources", taskInfo.resources());
-
-        // Tasks are not allowed to mix resources allocated to
-        // different roles, see MESOS-6636.
-        writer->field(
-            "role",
-            taskInfo.resources().begin()->allocation_info().role());
-
-        writer->field("statuses", std::initializer_list<TaskStatus>{});
-
-        if (taskInfo.has_labels()) {
-          writer->field("labels", taskInfo.labels());
-        }
-
-        if (taskInfo.has_discovery()) {
-          writer->field("discovery", JSON::Protobuf(taskInfo.discovery()));
-        }
-
-        if (taskInfo.has_container()) {
-          writer->field("container", JSON::Protobuf(taskInfo.container()));
-        }
-      });
-    }
-
     foreachvalue (Task* task, framework_->tasks) {
       // Skip unauthorized tasks.
       if (!approvers_->approved<VIEW_TASK>(*task, framework_->info)) {
@@ -511,11 +470,6 @@ public:
   foreachpair (const FrameworkID& frameworkId,
                const Framework* framework,
                frameworks) {
-    foreachvalue (const TaskInfo& taskInfo, framework->pendingTasks) {
-      frameworksToSlaves[frameworkId].insert(taskInfo.slave_id());
-      slavesToFrameworks[taskInfo.slave_id()].insert(frameworkId);
-    }
-
     foreachvalue (const Task* task, framework->tasks) {
       frameworksToSlaves[frameworkId].insert(task->slave_id());
       slavesToFrameworks[task->slave_id()].insert(frameworkId);
@@ -632,11 +586,6 @@ public:
   foreachpair (const FrameworkID& frameworkId,
                const Framework* framework,
                frameworks) {
-    foreachvalue (const TaskInfo& taskInfo, framework->pendingTasks) {
-      frameworkTaskSummaries[frameworkId].staging++;
-      slaveTaskSummaries[taskInfo.slave_id()].staging++;
-    }
-
     foreachvalue (const Task* task, framework->tasks) {
       frameworkTaskSummaries[frameworkId].count(*task);
       slaveTaskSummaries[task->slave_id()].count(*task);
@@ -1983,26 +1932,6 @@ function<void(JSON::ObjectWriter*)> Master::ReadOnlyHandler::jsonifyGetTasks(
 
       int field;
 
-      // Pending tasks.
-      field = v1::master::Response::GetTasks::kPendingTasksFieldNumber;
-      writer->field(
-          descriptor->FindFieldByNumber(field)->name(),
-          [&](JSON::ArrayWriter* writer) {
-            foreach (const Framework* framework, frameworks) {
-              foreachvalue (const TaskInfo& t, framework->pendingTasks) {
-                // Skip unauthorized tasks.
-                if (!approvers->approved<VIEW_TASK>(t, framework->info)) {
-                  continue;
-                }
-
-                Task task =
-                  protobuf::createTask(t, TASK_STAGING, framework->id());
-
-                writer->element(asV1Protobuf(task));
-              }
-            }
-          });
-
       // Active tasks.
       field = v1::master::Response::GetTasks::kTasksFieldNumber;
       writer->field(
@@ -2095,26 +2024,6 @@ string Master::ReadOnlyHandler::serializeGetTasks(
   google::protobuf::io::CodedOutputStream writer(&stream);
 
   foreach (const Framework* framework, frameworks) {
-    // Pending tasks.
-    foreachvalue (const TaskInfo& taskInfo, framework->pendingTasks) {
-      // Skip unauthorized tasks.
-      if (!approvers->approved<VIEW_TASK>(taskInfo, framework->info)) {
-        continue;
-      }
-
-      // TODO(bmahler): Consider not constructing the temporary task
-      // object and instead serialize directly. Since we don't expect
-      // a large number of pending tasks, we currently don't bother
-      // with the more efficient approach.
-      //
-      // *getTasks.add_pending_tasks() =
-      //   protobuf::createTask(taskInfo, TASK_STAGING, framework->id());
-      WireFormatLite2::WriteMessageWithoutCachedSizes(
-          mesos::v1::master::Response::GetTasks::kPendingTasksFieldNumber,
-          protobuf::createTask(taskInfo, TASK_STAGING, framework->id()),
-          &writer);
-    }
-
     // Active tasks.
     foreachvalue (const Task* task, framework->tasks) {
       // Skip unauthorized tasks.
diff --git a/src/master/validation.cpp b/src/master/validation.cpp
index 2f80536..084f281 100644
--- a/src/master/validation.cpp
+++ b/src/master/validation.cpp
@@ -2543,7 +2543,6 @@ Option<Error> validate(
     const Offer::Operation::Destroy& destroy,
     const Resources& checkpointedResources,
     const hashmap<FrameworkID, Resources>& usedResources,
-    const hashmap<FrameworkID, hashmap<TaskID, TaskInfo>>& pendingTasks,
     const Option<FrameworkInfo>& frameworkInfo)
 {
   // The operation can either contain allocated resources
@@ -2601,27 +2600,6 @@ Option<Error> validate(
     }
   }
 
-  // Ensure that the volumes being destroyed are not requested by any pending
-  // task. This check is mainly to validate destruction of shared volumes.
-  // Note that resource requirements in pending tasks are not validated yet
-  // so it is possible that the DESTROY validation fails due to invalid
-  // pending tasks.
-  typedef hashmap<TaskID, TaskInfo> TaskMap;
-  foreachvalue(const TaskMap& tasks, pendingTasks) {
-    foreachvalue (const TaskInfo& task, tasks) {
-      Resources resources = task.resources();
-      if (task.has_executor()) {
-        resources += task.executor().resources();
-      }
-
-      foreach (const Resource& volume, destroy.volumes()) {
-        if (unallocated(resources).contains(volume)) {
-          return Error("Persistent volume in pending tasks");
-        }
-      }
-    }
-  }
-
   return None();
 }
 
diff --git a/src/master/validation.hpp b/src/master/validation.hpp
index b289713..7fe8f08 100644
--- a/src/master/validation.hpp
+++ b/src/master/validation.hpp
@@ -326,7 +326,6 @@ Option<Error> validate(
     const Offer::Operation::Destroy& destroy,
     const Resources& checkpointedResources,
     const hashmap<FrameworkID, Resources>& usedResources,
-    const hashmap<FrameworkID, hashmap<TaskID, TaskInfo>>& pendingTasks,
     const Option<FrameworkInfo>& frameworkInfo = None());
 
 
diff --git a/src/tests/master_validation_tests.cpp b/src/tests/master_validation_tests.cpp
index e92ff59..8d5e74e 100644
--- a/src/tests/master_validation_tests.cpp
+++ b/src/tests/master_validation_tests.cpp
@@ -1627,39 +1627,6 @@ TEST_F(DestroyOperationValidationTest, SharedPersistentVolumeInUse)
 }
 
 
-// This test verifies that DESTROY for shared persistent volumes is valid
-// when the volumes are not assigned to any pending task.
-TEST_F(DestroyOperationValidationTest, SharedPersistentVolumeInPendingTasks)
-{
-  Resource cpus = Resources::parse("cpus", "1", "*").get();
-  Resource mem = Resources::parse("mem", "5", "*").get();
-  Resource sharedDisk = createDiskResource(
-      "50", "role1", "1", "path1", None(), true); // Shared.
-
-  SlaveID slaveId;
-  slaveId.set_value("S1");
-
-  // Add a task using shared volume as pending tasks.
-  TaskInfo task = createTask(slaveId, sharedDisk, "sleep 1000");
-
-  hashmap<FrameworkID, hashmap<TaskID, TaskInfo>> pendingTasks;
-  FrameworkID frameworkId1;
-  frameworkId1.set_value("id1");
-  pendingTasks[frameworkId1] = {{task.task_id(), task}};
-
-  // Destroy `sharedDisk` which is assigned to `task`.
-  Offer::Operation::Destroy destroy;
-  destroy.add_volumes()->CopyFrom(sharedDisk);
-
-  EXPECT_SOME(operation::validate(destroy, sharedDisk, {}, pendingTasks));
-
-  // Remove all pending tasks.
-  pendingTasks.clear();
-
-  EXPECT_NONE(operation::validate(destroy, sharedDisk, {}, pendingTasks));
-}
-
-
 TEST_F(DestroyOperationValidationTest, UnknownPersistentVolume)
 {
   Resource volume = Resources::parse("disk", "128", "role1").get();
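For context on the rationale in the commit message, here is a minimal, self-contained sketch (illustrative only; the Actor type, dispatch(), and the handlers below are hypothetical stand-ins, not libprocess or Mesos APIs) of why a synchronous ACCEPT leaves no window in which a KILL can interleave, which is what made the pending-task bookkeeping removable:

    // Toy single-threaded "actor" that processes one message at a time,
    // mirroring how calls are dispatched on the Master one after another.
    #include <deque>
    #include <functional>
    #include <iostream>
    #include <set>
    #include <string>

    struct Actor {
      std::deque<std::function<void()>> mailbox;

      void dispatch(std::function<void()> f) { mailbox.push_back(std::move(f)); }

      void run() {
        while (!mailbox.empty()) {
          auto f = std::move(mailbox.front());
          mailbox.pop_front();
          f();  // Each handler runs to completion before the next one starts.
        }
      }
    };

    int main() {
      Actor master;
      std::set<std::string> pendingTasks;  // Needed only in the async variant.

      // Old shape: ACCEPT authorizes asynchronously, so launching happens in a
      // second dispatch. A KILL can run in between, which is why the task had
      // to be remembered as "pending".
      master.dispatch([&] {
        pendingTasks.insert("task-1");     // accept(): remember the task
        master.dispatch([&] {              // _accept(): runs in a later dispatch
          if (pendingTasks.erase("task-1")) {
            std::cout << "launch task-1\n";
          } else {
            std::cout << "task-1 was killed while pending\n";
          }
        });
      });
      master.dispatch([&] { pendingTasks.erase("task-1"); });  // kill() interleaves

      // New shape: authorization completes inside the same dispatch, so no
      // other call can observe a half-processed ACCEPT and no pending-task
      // map is needed.
      master.dispatch([&] {
        bool authorized = true;  // synchronous authorization result
        if (authorized) {
          std::cout << "launch task-2\n";
        }
      });

      master.run();
    }

In the first variant the kill runs between the two dispatches, so the continuation has to consult the pending map; in the second variant nothing can run in the middle, which is the situation this commit relies on.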