This is an automated email from the ASF dual-hosted git repository.

asekretenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 6a29061fb2f357d591f27d7541f92e0f73f0e944
Author: Andrei Sekretenko <asekrete...@mesosphere.com>
AuthorDate: Thu Jan 30 15:42:07 2020 +0100

    Removed code for tracking pending tasks.
    
    Now that ACCEPT is authorized synchronously, no tasks remain
    pending between dispatches of `Master` methods, so the pending
    task tracking code is no longer needed.
    
    Review: https://reviews.apache.org/r/72099
---
 include/mesos/master/master.proto     |   2 +-
 include/mesos/v1/master/master.proto  |   2 +-
 src/master/http.cpp                   |  13 +-
 src/master/master.cpp                 | 258 +++-------------------------------
 src/master/master.hpp                 |  10 --
 src/master/readonly_handler.cpp       |  91 ------------
 src/master/validation.cpp             |  22 ---
 src/master/validation.hpp             |   1 -
 src/tests/master_validation_tests.cpp |  33 -----
 9 files changed, 22 insertions(+), 410 deletions(-)
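
Before/after sketch of the ACCEPT path (illustrative C++ only -- the
types and member names below are stand-ins, not the actual Mesos
master APIs):

    #include <functional>
    #include <string>
    #include <unordered_map>

    // Hypothetical stand-in for mesos::TaskInfo.
    struct TaskInfo { std::string taskId; };

    // Before: authorization resolved in a *later* dispatch, so tasks
    // were parked in a pending map that kill(), reconcile(), DESTROY
    // validation, metrics, and the HTTP handlers all had to consult.
    struct AsyncAcceptMaster {
      std::unordered_map<std::string, TaskInfo> pendingTasks;

      void accept(const TaskInfo& task) {
        pendingTasks.emplace(task.taskId, task);  // observable in-between
        bool authorized = true;  // stand-in for an authorization future
        _accept(task, authorized);  // in reality ran as a later dispatch
      }

      void _accept(const TaskInfo& task, bool authorized) {
        // Absence here meant "killed while pending" OR "duplicate
        // TaskID" -- indistinguishable, hence the possible TASK_ERROR
        // after TASK_KILLED mentioned in the removed comments below.
        bool wasPending = pendingTasks.erase(task.taskId) > 0;
        if (wasPending && authorized) { /* launch */ }
      }
    };

    // After: the authorization check returns within the same dispatch,
    // so no other Master method can observe a half-launched task, and
    // the tracking map (plus all its consumers) can be deleted.
    struct SyncAcceptMaster {
      void accept(const TaskInfo& task) {
        bool authorized = true;  // synchronous check
        if (authorized) { /* launch immediately */ }
      }
    };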

diff --git a/include/mesos/master/master.proto b/include/mesos/master/master.proto
index 8c22802..021dadc 100644
--- a/include/mesos/master/master.proto
+++ b/include/mesos/master/master.proto
@@ -542,7 +542,7 @@ message Response {
   message GetTasks {
     // Tasks that are enqueued on the master waiting (e.g., authorizing)
     // to be launched.
-    repeated Task pending_tasks = 1;
+    repeated Task pending_tasks = 1 [deprecated=true];
 
     // Tasks that have been forwarded to the agent for launch. This
     // includes tasks that are staging or running; it also includes
diff --git a/include/mesos/v1/master/master.proto b/include/mesos/v1/master/master.proto
index 40de358..488fe29 100644
--- a/include/mesos/v1/master/master.proto
+++ b/include/mesos/v1/master/master.proto
@@ -542,7 +542,7 @@ message Response {
   message GetTasks {
     // Tasks that are enqueued on the master waiting (e.g., authorizing)
     // to be launched.
-    repeated Task pending_tasks = 1;
+    repeated Task pending_tasks = 1 [deprecated=true];
 
     // Tasks that have been forwarded to the agent for launch. This
     // includes tasks that are staging or running; it also includes
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 67572a3..f1be402 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -1029,8 +1029,7 @@ Future<Response> Master::Http::_destroyVolumes(
   error = validation::operation::validate(
       operation.destroy(),
       slave->checkpointedResources,
-      slave->usedResources,
-      slave->pendingTasks);
+      slave->usedResources);
 
   if (error.isSome()) {
     return BadRequest("Invalid DESTROY operation: " + error->message);
@@ -3782,13 +3781,6 @@ Future<Response> Master::Http::_drainAgent(
           // It's possible for the slave to be removed in the interim
           // if it is marked unreachable.
           if (slave != nullptr) {
-            hashmap<FrameworkID, hashset<TaskID>> pendingTaskIds;
-            foreachpair (const FrameworkID& frameworkId,
-                         const auto& tasks,
-                         slave->pendingTasks) {
-              pendingTaskIds[frameworkId] = tasks.keys();
-            }
-
             hashmap<FrameworkID, hashset<TaskID>> taskIds;
             foreachpair (const FrameworkID& frameworkId,
                          const auto& tasks,
@@ -3798,8 +3790,7 @@ Future<Response> Master::Http::_drainAgent(
 
             LOG(INFO)
               << "Transitioning agent " << slaveId << " to the DRAINING state"
-              << "; agent has (pending tasks, tasks, operations) == ("
-              << stringify(pendingTaskIds) << ", "
+              << "; agent has (tasks, operations) == ("
               << stringify(taskIds) << ", "
               << stringify(slave->operations.keys()) << ")";
 
diff --git a/src/master/master.cpp b/src/master/master.cpp
index b09ce8e..7662c56 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1178,10 +1178,6 @@ void Master::finalize()
       removeInverseOffer(inverseOffer);
     }
 
-    // Remove pending tasks from the slave. Don't bother
-    // recovering the resources in the allocator.
-    slave->pendingTasks.clear();
-
     // Terminate the slave observer.
     terminate(slave->observer);
     wait(slave->observer);
@@ -1198,10 +1194,6 @@ void Master::finalize()
   foreachvalue (Framework* framework, frameworks.registered) {
     allocator->removeFramework(framework->id());
 
-    // Remove pending tasks from the framework. Don't bother
-    // recovering the resources in the allocator.
-    framework->pendingTasks.clear();
-
     // No tasks/executors/offers should remain since the slaves
     // have been removed.
     CHECK(framework->tasks.empty());
@@ -4275,46 +4267,6 @@ void Master::accept(
   LOG(INFO) << "Processing ACCEPT call for offers: " << accept.offer_ids()
             << " on agent " << *slave << " for framework " << *framework;
 
-  auto getOperationTasks =
-    [](const Offer::Operation& operation) -> const RepeatedPtrField<TaskInfo>& {
-    if (operation.type() == Offer::Operation::LAUNCH) {
-      return operation.launch().task_infos();
-    }
-
-    if (operation.type() == Offer::Operation::LAUNCH_GROUP) {
-      return operation.launch_group().task_group().tasks();
-    }
-
-    UNREACHABLE();
-  };
-
-  // Add tasks to be launched to the framework's list of pending tasks
-  // before authorizing.
-  //
-  // NOTE: If two tasks have the same ID, the second one will
-  // not be put into 'framework->pendingTasks', therefore
-  // will not be launched (and TASK_ERROR will be sent).
-  // Unfortunately, we can't tell the difference between a
-  // duplicate TaskID and getting killed while pending
-  // (removed from the map). So it's possible that we send
-  // a TASK_ERROR after a TASK_KILLED (see _accept())!
-  for (const Offer::Operation& operation : accept.operations()) {
-    if (operation.type() == Offer::Operation::LAUNCH ||
-        operation.type() == Offer::Operation::LAUNCH_GROUP) {
-      for (const TaskInfo& task : getOperationTasks(operation)) {
-        if (!framework->pendingTasks.contains(task.task_id())) {
-          framework->pendingTasks[task.task_id()] = task;
-        }
-
-        // Add to the slave's list of pending tasks.
-        if (!slave->pendingTasks.contains(framework->id()) ||
-            !slave->pendingTasks[framework->id()].contains(task.task_id())) {
-          slave->pendingTasks[framework->id()][task.task_id()] = task;
-        }
-      }
-    }
-  }
-
   // TODO(asekretenko): Dismantle `_accept(...)` (which, before synchronous
   // authorization was introduced, used to be a deferred continuation of ACCEPT
   // call processing, but now is kept only for limiting variable scopes) and
@@ -4376,15 +4328,6 @@ void Master::_accept(
       }();
 
       foreach (const TaskInfo& task, tasks) {
-        // Remove the task from being pending.
-        framework->pendingTasks.erase(task.task_id());
-        if (slave != nullptr) {
-          slave->pendingTasks[framework->id()].erase(task.task_id());
-          if (slave->pendingTasks[framework->id()].empty()) {
-            slave->pendingTasks.erase(framework->id());
-          }
-        }
-
         const TaskStatus::Reason reason =
             slave == nullptr ? TaskStatus::REASON_SLAVE_REMOVED
                              : TaskStatus::REASON_SLAVE_DISCONNECTED;
@@ -4664,8 +4607,7 @@ void Master::_accept(
         Option<Error> error = validation::operation::validate(
             operation.destroy(),
             slave->checkpointedResources,
-            slave->usedResources,
-            slave->pendingTasks);
+            slave->usedResources);
 
         error = error.isSome()
           ? Error(error->message + "; on agent " + stringify(*slave))
@@ -4844,29 +4786,6 @@ void Master::_accept(
 
       case Offer::Operation::LAUNCH: {
         foreach (const TaskInfo& task, operation.launch().task_infos()) {
-          // The task will not be in `pendingTasks` if it has been
-          // killed in the interim. No need to send TASK_KILLED in
-          // this case as it has already been sent. Note however that
-          // we cannot currently distinguish between the task being
-          // killed and the task having a duplicate TaskID within
-          // `pendingTasks`. Therefore we must still validate the task
-          // to ensure we send the TASK_ERROR in the case that it has a
-          // duplicate TaskID.
-          //
-          // TODO(bmahler): We may send TASK_ERROR after a TASK_KILLED
-          // if a task was killed (removed from `pendingTasks`) *and*
-          // the task is invalid or unauthorized here.
-          //
-          // TODO(asekretenko): Now that ACCEPT is authorized synchronously,
-          // master state cannot change while the task is being authorized,
-          // and all the code for tracking pending tasks can be removed.
-          bool pending = framework->pendingTasks.contains(task.task_id());
-          framework->pendingTasks.erase(task.task_id());
-          slave->pendingTasks[framework->id()].erase(task.task_id());
-          if (slave->pendingTasks[framework->id()].empty()) {
-            slave->pendingTasks.erase(framework->id());
-          }
-
           const Option<Error> authorizationError =
             authorized(ActionObject::taskLaunch(task, framework->info));
 
@@ -4931,7 +4850,7 @@ void Master::_accept(
           }
 
           // Add task.
-          if (pending) {
+          {
             Resources consumed;
 
             bool launchExecutor = true;
@@ -5044,26 +4963,9 @@ void Master::_accept(
       case Offer::Operation::LAUNCH_GROUP: {
         // We must ensure that the entire group can be launched. This
         // means all tasks in the group must be authorized and valid.
-        // If any tasks in the group have been killed in the interim
-        // we must kill the entire group.
         const ExecutorInfo& executor = operation.launch_group().executor();
         const TaskGroupInfo& taskGroup = operation.launch_group().task_group();
 
-        // Remove all the tasks from being pending.
-        hashset<TaskID> killed;
-        foreach (const TaskInfo& task, taskGroup.tasks()) {
-          bool pending = framework->pendingTasks.contains(task.task_id());
-          framework->pendingTasks.erase(task.task_id());
-          slave->pendingTasks[framework->id()].erase(task.task_id());
-          if (slave->pendingTasks[framework->id()].empty()) {
-            slave->pendingTasks.erase(framework->id());
-          }
-
-          if (!pending) {
-            killed.insert(task.task_id());
-          }
-        }
-
         // Note that we do not fill in the `ExecutorInfo.framework_id`
         // since we do not have to support backwards compatibility like
         // in the `Launch` operation case.
@@ -5110,10 +5012,6 @@ void Master::_accept(
 
         if (error.isSome()) {
           CHECK_SOME(reason);
-
-          // NOTE: If some of these invalid or unauthorized tasks were
-          // killed already, here we end up sending a TASK_ERROR after
-          // having already sent TASK_KILLED.
           foreach (const TaskInfo& task, taskGroup.tasks()) {
             const StatusUpdate& update = protobuf::createStatusUpdate(
                 framework->id(),
@@ -5136,39 +5034,6 @@ void Master::_accept(
           continue;
         }
 
-        // If task(s) were killed, send TASK_KILLED for
-        // all of the remaining tasks, since a TaskGroup must
-        // be delivered in its entirety.
-        //
-        // TODO(bmahler): Do this killing when processing
-        // the `Kill` call, rather than doing it here.
-        if (!killed.empty()) {
-          foreach (const TaskInfo& task, taskGroup.tasks()) {
-            if (!killed.contains(task.task_id())) {
-              const StatusUpdate& update = protobuf::createStatusUpdate(
-                  framework->id(),
-                  task.slave_id(),
-                  task.task_id(),
-                  TASK_KILLED,
-                  TaskStatus::SOURCE_MASTER,
-                  None(),
-                  "A task within the task group was killed before"
-                  " delivery to the agent",
-                  TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH);
-
-              metrics->tasks_killed++;
-
-              // TODO(bmahler): Increment the task state source metric,
-              // we currently cannot because it requires each source
-              // requires a reason.
-
-              forward(update, UPID(), framework);
-            }
-          }
-
-          continue;
-        }
-
         // Now launch the task group!
         RunTaskGroupMessage message;
         message.mutable_framework()->CopyFrom(framework->info);
@@ -5558,22 +5423,15 @@ void Master::checkAndTransitionDrainingAgent(Slave* slave)
   }
 
   // Check if the agent has any tasks running or operations pending.
-  if (!slave->pendingTasks.empty() ||
-      !slave->tasks.empty() ||
+  if (!slave->tasks.empty() ||
       !slave->operations.empty()) {
     size_t numTasks = 0u;
     foreachvalue (const auto& frameworkTasks, slave->tasks) {
       numTasks += frameworkTasks.size();
     }
 
-    size_t numPendingTasks = 0u;
-    foreachvalue (const auto& frameworkTasks, slave->pendingTasks) {
-      numPendingTasks += frameworkTasks.size();
-    }
-
     VLOG(1)
       << "DRAINING Agent " << slaveId << " has "
-      << numPendingTasks << " pending tasks, "
       << numTasks << " tasks, and "
       << slave->operations.size() << " operations";
     return;
@@ -5751,36 +5609,6 @@ void Master::kill(Framework* framework, const scheduler::Call::Kill& kill)
 
   ++metrics->messages_kill_task;
 
-  if (framework->pendingTasks.contains(taskId)) {
-    // Remove from pending tasks.
-    framework->pendingTasks.erase(taskId);
-
-    if (slaveId.isSome()) {
-      Slave* slave = slaves.registered.get(slaveId.get());
-
-      if (slave != nullptr) {
-        slave->pendingTasks[framework->id()].erase(taskId);
-        if (slave->pendingTasks[framework->id()].empty()) {
-          slave->pendingTasks.erase(framework->id());
-        }
-      }
-    }
-
-    const StatusUpdate& update = protobuf::createStatusUpdate(
-        framework->id(),
-        slaveId,
-        taskId,
-        TASK_KILLED,
-        TaskStatus::SOURCE_MASTER,
-        None(),
-        "Killed before delivery to the agent",
-        TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH);
-
-    forward(update, UPID(), framework);
-
-    return;
-  }
-
   Task* task = framework->getTask(taskId);
   if (task == nullptr) {
     LOG(WARNING) << "Cannot kill task " << taskId
@@ -9027,29 +8855,6 @@ void Master::reconcile(
     LOG(INFO) << "Performing implicit task state reconciliation"
                  " for framework " << *framework;
 
-    foreachvalue (const TaskInfo& task, framework->pendingTasks) {
-      StatusUpdate update = protobuf::createStatusUpdate(
-          framework->id(),
-          task.slave_id(),
-          task.task_id(),
-          TASK_STAGING,
-          TaskStatus::SOURCE_MASTER,
-          None(),
-          "Reconciliation: Latest task state",
-          TaskStatus::REASON_RECONCILIATION);
-
-      VLOG(1) << "Sending implicit reconciliation state "
-              << update.status().state()
-              << " for task " << update.status().task_id()
-              << " of framework " << *framework;
-
-      // TODO(bmahler): Consider using forward(); might lead to too
-      // much logging.
-      StatusUpdateMessage message;
-      *message.mutable_update() = std::move(update);
-      framework->send(message);
-    }
-
     foreachvalue (Task* task, framework->tasks) {
       const TaskState& state = task->has_status_update_state()
           ? task->status_update_state()
@@ -9095,20 +8900,19 @@ void Master::reconcile(
             << " of framework " << *framework;
 
   // Explicit reconciliation occurs for the following cases:
-  //   (1) Task is known, but pending: TASK_STAGING.
-  //   (2) Task is known: send the latest state.
-  //   (3) Task is unknown, slave is recovered: no-op.
-  //   (4) Task is unknown, slave is registered: TASK_GONE.
-  //   (5) Task is unknown, slave is unreachable: TASK_UNREACHABLE.
-  //   (6) Task is unknown, slave is gone: TASK_GONE_BY_OPERATOR.
-  //   (7) Task is unknown, slave is unknown: TASK_UNKNOWN.
+  //   (1) Task is known: send the latest state.
+  //   (2) Task is unknown, slave is recovered: no-op.
+  //   (3) Task is unknown, slave is registered: TASK_GONE.
+  //   (4) Task is unknown, slave is unreachable: TASK_UNREACHABLE.
+  //   (5) Task is unknown, slave is gone: TASK_GONE_BY_OPERATOR.
+  //   (6) Task is unknown, slave is unknown: TASK_UNKNOWN.
   //
-  // For case (3), if the slave ID is not provided, we err on the
+  // For case (2), if the slave ID is not provided, we err on the
   // side of caution and do not reply if there are *any* recovered
   // slaves that haven't reregistered, since the task could reside
   // on one of these slaves.
   //
-  // For cases (4), (5), (6) and (7) TASK_LOST is sent instead if the
+  // For cases (3), (4), (5) and (6) TASK_LOST is sent instead if the
   // framework has not opted-in to the PARTITION_AWARE capability.
   foreach (const scheduler::Call::Reconcile::Task& t, reconcile.tasks()) {
     Option<SlaveID> slaveId = None();
@@ -9119,20 +8923,8 @@ void Master::reconcile(
     Option<StatusUpdate> update = None();
     Task* task = framework->getTask(t.task_id());
 
-    if (framework->pendingTasks.contains(t.task_id())) {
-      // (1) Task is known, but pending: TASK_STAGING.
-      const TaskInfo& task_ = framework->pendingTasks[t.task_id()];
-      update = protobuf::createStatusUpdate(
-          framework->id(),
-          task_.slave_id(),
-          task_.task_id(),
-          TASK_STAGING,
-          TaskStatus::SOURCE_MASTER,
-          None(),
-          "Reconciliation: Latest task state",
-          TaskStatus::REASON_RECONCILIATION);
-    } else if (task != nullptr) {
-      // (2) Task is known: send the latest status update state.
+    if (task != nullptr) {
+      // (1) Task is known: send the latest status update state.
       const TaskState& state = task->has_status_update_state()
           ? task->status_update_state()
           : task->state();
@@ -9157,7 +8949,7 @@ void Master::reconcile(
           protobuf::getTaskContainerStatus(*task));
    } else if ((slaveId.isSome() && slaves.recovered.contains(slaveId.get())) ||
                (slaveId.isNone() && !slaves.recovered.empty())) {
-      // (3) Task is unknown, slave is recovered: no-op. The framework
+      // (2) Task is unknown, slave is recovered: no-op. The framework
       // will have to retry this and will not receive a response until
       // the agent either registers, or is marked unreachable after the
       // timeout.
@@ -9168,7 +8960,7 @@ void Master::reconcile(
                       "some agents have")
                 << " not yet reregistered with the master";
     } else if (slaveId.isSome() && slaves.registered.contains(slaveId.get())) {
-      // (4) Task is unknown, slave is registered: TASK_GONE. If the
+      // (3) Task is unknown, slave is registered: TASK_GONE. If the
       // framework does not have the PARTITION_AWARE capability, send
       // TASK_LOST for backward compatibility.
       TaskState taskState = TASK_GONE;
@@ -9186,7 +8978,7 @@ void Master::reconcile(
           "Reconciliation: Task is unknown to the agent",
           TaskStatus::REASON_RECONCILIATION);
    } else if (slaveId.isSome() && slaves.unreachable.contains(slaveId.get())) {
-      // (5) Slave is unreachable: TASK_UNREACHABLE. If the framework
+      // (4) Slave is unreachable: TASK_UNREACHABLE. If the framework
       // does not have the PARTITION_AWARE capability, send TASK_LOST
       // for backward compatibility. In either case, the status update
       // also includes the time when the slave was marked unreachable.
@@ -9213,7 +9005,7 @@ void Master::reconcile(
           None(),
           unreachableTime);
     } else if (slaveId.isSome() && slaves.gone.contains(slaveId.get())) {
-      // (6) Slave is gone: TASK_GONE_BY_OPERATOR. If the framework
+      // (5) Slave is gone: TASK_GONE_BY_OPERATOR. If the framework
       // does not have the PARTITION_AWARE capability, send TASK_LOST
       // for backward compatibility.
       TaskState taskState = TASK_GONE_BY_OPERATOR;
@@ -9231,7 +9023,7 @@ void Master::reconcile(
           "Reconciliation: Task is gone",
           TaskStatus::REASON_RECONCILIATION);
     } else {
-      // (7) Task is unknown, slave is unknown: TASK_UNKNOWN. If the
+      // (6) Task is unknown, slave is unknown: TASK_UNKNOWN. If the
       // framework does not have the PARTITION_AWARE capability, send
       // TASK_LOST for backward compatibility.
       TaskState taskState = TASK_UNKNOWN;
@@ -10504,18 +10296,12 @@ void Master::removeFramework(Framework* framework)
   CHECK(framework->inverseOffers.empty());
 
   foreachvalue (Slave* slave, slaves.registered) {
-    // Remove the pending tasks from the slave.
-    slave->pendingTasks.erase(framework->id());
-
     // Tell slaves to shutdown the framework.
     ShutdownFrameworkMessage message;
     message.mutable_framework_id()->MergeFrom(framework->id());
     send(slave->pid, message);
   }
 
-  // Remove the pending tasks from the framework.
-  framework->pendingTasks.clear();
-
   // Remove pointers to the framework's tasks in slaves and mark those
   // tasks as completed.
   foreachvalue (Task* task, utils::copy(framework->tasks)) {
@@ -11039,9 +10825,6 @@ void Master::_removeSlave(
     }
   }
 
-  // Remove the pending tasks from the slave.
-  slave->pendingTasks.clear();
-
   // Mark the slave as being removed.
   slaves.registered.remove(slave);
   slaves.removed.put(slave->id, Nothing());
@@ -12256,11 +12039,6 @@ double Master::_tasks_staging()
 {
   double count = 0.0;
 
-  // Add the tasks pending validation / authorization.
-  foreachvalue (Framework* framework, frameworks.registered) {
-    count += framework->pendingTasks.size();
-  }
-
   foreachvalue (Slave* slave, slaves.registered) {
     typedef hashmap<TaskID, Task*> TaskMap;
     foreachvalue (const TaskMap& tasks, slave->tasks) {
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 7281815..34ef2f1 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -237,12 +237,6 @@ Slave(Master* const _master,
   // the executors.
   hashmap<FrameworkID, hashmap<ExecutorID, ExecutorInfo>> executors;
 
-  // Tasks that have not yet been launched because they are currently
-  // being authorized. This is similar to Framework's pendingTasks but we
-  // track pendingTasks per agent separately to determine if any offer
-  // operation for this agent would change resources requested by these tasks.
-  hashmap<FrameworkID, hashmap<TaskID, TaskInfo>> pendingTasks;
-
   // Tasks present on this slave.
   //
   // TODO(bmahler): Make this private to enforce that `addTask()` and
@@ -2570,10 +2564,6 @@ struct Framework
   process::Time reregisteredTime;
   process::Time unregisteredTime;
 
-  // Tasks that have not yet been launched because they are currently
-  // being authorized.
-  hashmap<TaskID, TaskInfo> pendingTasks;
-
   // TODO(bmahler): Make this private to enforce that `addTask()` and
   // `removeTask()` are used, and provide a const view into the tasks.
   hashmap<TaskID, Task*> tasks;
diff --git a/src/master/readonly_handler.cpp b/src/master/readonly_handler.cpp
index f9c0006..e4a3134 100644
--- a/src/master/readonly_handler.cpp
+++ b/src/master/readonly_handler.cpp
@@ -191,47 +191,6 @@ void FullFrameworkWriter::operator()(JSON::ObjectWriter* writer) const
 
   // Model all of the tasks associated with a framework.
   writer->field("tasks", [this](JSON::ArrayWriter* writer) {
-    foreachvalue (const TaskInfo& taskInfo, framework_->pendingTasks) {
-      // Skip unauthorized tasks.
-      if (!approvers_->approved<VIEW_TASK>(taskInfo, framework_->info)) {
-        continue;
-      }
-
-      writer->element([this, &taskInfo](JSON::ObjectWriter* writer) {
-        writer->field("id", taskInfo.task_id().value());
-        writer->field("name", taskInfo.name());
-        writer->field("framework_id", framework_->id().value());
-
-        writer->field(
-            "executor_id",
-            taskInfo.executor().executor_id().value());
-
-        writer->field("slave_id", taskInfo.slave_id().value());
-        writer->field("state", TaskState_Name(TASK_STAGING));
-        writer->field("resources", taskInfo.resources());
-
-        // Tasks are not allowed to mix resources allocated to
-        // different roles, see MESOS-6636.
-        writer->field(
-            "role",
-            taskInfo.resources().begin()->allocation_info().role());
-
-        writer->field("statuses", std::initializer_list<TaskStatus>{});
-
-        if (taskInfo.has_labels()) {
-          writer->field("labels", taskInfo.labels());
-        }
-
-        if (taskInfo.has_discovery()) {
-          writer->field("discovery", JSON::Protobuf(taskInfo.discovery()));
-        }
-
-        if (taskInfo.has_container()) {
-          writer->field("container", JSON::Protobuf(taskInfo.container()));
-        }
-      });
-    }
-
     foreachvalue (Task* task, framework_->tasks) {
       // Skip unauthorized tasks.
       if (!approvers_->approved<VIEW_TASK>(*task, framework_->info)) {
@@ -511,11 +470,6 @@ public:
     foreachpair (const FrameworkID& frameworkId,
                  const Framework* framework,
                  frameworks) {
-      foreachvalue (const TaskInfo& taskInfo, framework->pendingTasks) {
-        frameworksToSlaves[frameworkId].insert(taskInfo.slave_id());
-        slavesToFrameworks[taskInfo.slave_id()].insert(frameworkId);
-      }
-
       foreachvalue (const Task* task, framework->tasks) {
         frameworksToSlaves[frameworkId].insert(task->slave_id());
         slavesToFrameworks[task->slave_id()].insert(frameworkId);
@@ -632,11 +586,6 @@ public:
     foreachpair (const FrameworkID& frameworkId,
                  const Framework* framework,
                  frameworks) {
-      foreachvalue (const TaskInfo& taskInfo, framework->pendingTasks) {
-        frameworkTaskSummaries[frameworkId].staging++;
-        slaveTaskSummaries[taskInfo.slave_id()].staging++;
-      }
-
       foreachvalue (const Task* task, framework->tasks) {
         frameworkTaskSummaries[frameworkId].count(*task);
         slaveTaskSummaries[task->slave_id()].count(*task);
@@ -1983,26 +1932,6 @@ function<void(JSON::ObjectWriter*)> Master::ReadOnlyHandler::jsonifyGetTasks(
 
     int field;
 
-    // Pending tasks.
-    field = v1::master::Response::GetTasks::kPendingTasksFieldNumber;
-    writer->field(
-        descriptor->FindFieldByNumber(field)->name(),
-        [&](JSON::ArrayWriter* writer) {
-          foreach (const Framework* framework, frameworks) {
-            foreachvalue (const TaskInfo& t, framework->pendingTasks) {
-              // Skip unauthorized tasks.
-              if (!approvers->approved<VIEW_TASK>(t, framework->info)) {
-                continue;
-              }
-
-              Task task =
-                  protobuf::createTask(t, TASK_STAGING, framework->id());
-
-              writer->element(asV1Protobuf(task));
-            }
-          }
-        });
-
     // Active tasks.
     field = v1::master::Response::GetTasks::kTasksFieldNumber;
     writer->field(
@@ -2095,26 +2024,6 @@ string Master::ReadOnlyHandler::serializeGetTasks(
   google::protobuf::io::CodedOutputStream writer(&stream);
 
   foreach (const Framework* framework, frameworks) {
-    // Pending tasks.
-    foreachvalue (const TaskInfo& taskInfo, framework->pendingTasks) {
-      // Skip unauthorized tasks.
-      if (!approvers->approved<VIEW_TASK>(taskInfo, framework->info)) {
-        continue;
-      }
-
-      // TODO(bmahler): Consider not constructing the temporary task
-      // object and instead serialize directly. Since we don't expect
-      // a large number of pending tasks, we currently don't bother
-      // with the more efficient approach.
-      //
-      // *getTasks.add_pending_tasks() =
-      //   protobuf::createTask(taskInfo, TASK_STAGING, framework->id());
-      WireFormatLite2::WriteMessageWithoutCachedSizes(
-          mesos::v1::master::Response::GetTasks::kPendingTasksFieldNumber,
-          protobuf::createTask(taskInfo, TASK_STAGING, framework->id()),
-          &writer);
-    }
-
     // Active tasks.
     foreachvalue (const Task* task, framework->tasks) {
       // Skip unauthorized tasks.
diff --git a/src/master/validation.cpp b/src/master/validation.cpp
index 2f80536..084f281 100644
--- a/src/master/validation.cpp
+++ b/src/master/validation.cpp
@@ -2543,7 +2543,6 @@ Option<Error> validate(
     const Offer::Operation::Destroy& destroy,
     const Resources& checkpointedResources,
     const hashmap<FrameworkID, Resources>& usedResources,
-    const hashmap<FrameworkID, hashmap<TaskID, TaskInfo>>& pendingTasks,
     const Option<FrameworkInfo>& frameworkInfo)
 {
   // The operation can either contain allocated resources
@@ -2601,27 +2600,6 @@ Option<Error> validate(
     }
   }
 
-  // Ensure that the volumes being destroyed are not requested by any pending
-  // task. This check is mainly to validate destruction of shared volumes.
-  // Note that resource requirements in pending tasks are not validated yet
-  // so it is possible that the DESTROY validation fails due to invalid
-  // pending tasks.
-  typedef hashmap<TaskID, TaskInfo> TaskMap;
-  foreachvalue(const TaskMap& tasks, pendingTasks) {
-    foreachvalue (const TaskInfo& task, tasks) {
-      Resources resources = task.resources();
-      if (task.has_executor()) {
-        resources += task.executor().resources();
-      }
-
-      foreach (const Resource& volume, destroy.volumes()) {
-        if (unallocated(resources).contains(volume)) {
-          return Error("Persistent volume in pending tasks");
-        }
-      }
-    }
-  }
-
   return None();
 }
 
diff --git a/src/master/validation.hpp b/src/master/validation.hpp
index b289713..7fe8f08 100644
--- a/src/master/validation.hpp
+++ b/src/master/validation.hpp
@@ -326,7 +326,6 @@ Option<Error> validate(
     const Offer::Operation::Destroy& destroy,
     const Resources& checkpointedResources,
     const hashmap<FrameworkID, Resources>& usedResources,
-    const hashmap<FrameworkID, hashmap<TaskID, TaskInfo>>& pendingTasks,
     const Option<FrameworkInfo>& frameworkInfo = None());
 
 
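With the `pendingTasks` parameter gone, both DESTROY validation call
sites shrink to the three-argument form. For reference, the updated
call in `Master::Http::_destroyVolumes` (verbatim from the http.cpp
hunk above) now reads:

    error = validation::operation::validate(
        operation.destroy(),
        slave->checkpointedResources,
        slave->usedResources);

    if (error.isSome()) {
      return BadRequest("Invalid DESTROY operation: " + error->message);
    }
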
diff --git a/src/tests/master_validation_tests.cpp b/src/tests/master_validation_tests.cpp
index e92ff59..8d5e74e 100644
--- a/src/tests/master_validation_tests.cpp
+++ b/src/tests/master_validation_tests.cpp
@@ -1627,39 +1627,6 @@ TEST_F(DestroyOperationValidationTest, SharedPersistentVolumeInUse)
 }
 
 
-// This test verifies that DESTROY for shared persistent volumes is valid
-// when the volumes are not assigned to any pending task.
-TEST_F(DestroyOperationValidationTest, SharedPersistentVolumeInPendingTasks)
-{
-  Resource cpus = Resources::parse("cpus", "1", "*").get();
-  Resource mem = Resources::parse("mem", "5", "*").get();
-  Resource sharedDisk = createDiskResource(
-      "50", "role1", "1", "path1", None(), true); // Shared.
-
-  SlaveID slaveId;
-  slaveId.set_value("S1");
-
-  // Add a task using shared volume as pending tasks.
-  TaskInfo task = createTask(slaveId, sharedDisk, "sleep 1000");
-
-  hashmap<FrameworkID, hashmap<TaskID, TaskInfo>> pendingTasks;
-  FrameworkID frameworkId1;
-  frameworkId1.set_value("id1");
-  pendingTasks[frameworkId1] = {{task.task_id(), task}};
-
-  // Destroy `sharedDisk` which is assigned to `task`.
-  Offer::Operation::Destroy destroy;
-  destroy.add_volumes()->CopyFrom(sharedDisk);
-
-  EXPECT_SOME(operation::validate(destroy, sharedDisk, {}, pendingTasks));
-
-  // Remove all pending tasks.
-  pendingTasks.clear();
-
-  EXPECT_NONE(operation::validate(destroy, sharedDisk, {}, pendingTasks));
-}
-
-
 TEST_F(DestroyOperationValidationTest, UnknownPersistentVolume)
 {
   Resource volume = Resources::parse("disk", "128", "role1").get();

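A note for operator-API consumers: `GetTasks.pending_tasks` stays in
the schema for wire compatibility but, with all of the population code
above removed, it will always be empty. A minimal sketch of what a
consumer sees, assuming the usual protoc-generated C++ accessors (the
header path below is an assumption):

    #include <iostream>

    #include <mesos/v1/master/master.pb.h>  // assumed generated header

    // `pending_tasks` is deprecated, not removed, so old consumers
    // still compile (possibly with deprecation warnings) and simply
    // observe an empty list; `tasks` is the field to rely on.
    void printTaskCounts(
        const mesos::v1::master::Response::GetTasks& getTasks) {
      std::cout << "pending tasks (always empty now): "
                << getTasks.pending_tasks_size() << std::endl
                << "tasks: " << getTasks.tasks_size() << std::endl;
    }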