Repository: mesos
Updated Branches:
  refs/heads/master e021a7b8b -> a06da4e48


Moved all resources validation logic to ResourceUsageChecker.

Review: https://reviews.apache.org/r/27843/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a06da4e4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a06da4e4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a06da4e4

Branch: refs/heads/master
Commit: a06da4e48b58e99bb431bff6d16db502a2342026
Parents: a0d2b58
Author: Jie Yu <[email protected]>
Authored: Mon Nov 10 14:59:01 2014 -0800
Committer: Jie Yu <[email protected]>
Committed: Thu Nov 13 10:45:13 2014 -0800

----------------------------------------------------------------------
 src/master/master.cpp               | 245 ++++++++++++++++---------------
 src/master/master.hpp               |   8 +-
 src/tests/resource_offers_tests.cpp |   3 +-
 3 files changed, 135 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a06da4e4/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 00fb3e3..0f89d1f 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1851,37 +1851,40 @@ void Master::resourceRequest(
 
 // We use the visitor pattern to abstract the process of performing
 // any validations, aggregations, etc. of tasks that a framework
-// attempts to run within the resources provided by offers. A
-// visitor can return an optional error (typedef'ed as an option of a
-// string) which will cause the master to send a failed status update
-// back to the framework for only that task description. An instance
-// will be reused for each task description from same 'launchTasks()',
-// but not for task descriptions from different offers.
+// attempts to run within the resources provided by offers. A visitor
+// can return an optional error (typedef'ed as an option of a string)
+// which will cause the master to send a failed status update back to
+// the framework for only that task description. An instance will be
+// reused for each task description from same 'launchTasks()', but not
+// for task descriptions from different offers.
 struct TaskInfoVisitor
 {
   virtual Option<Error> operator () (
       const TaskInfo& task,
-      const Resources& resources,
       const Framework& framework,
-      const Slave& slave) = 0;
+      const Slave& slave,
+      const Resources& totalResources,
+      const Resources& usedResources) = 0;
 
   virtual ~TaskInfoVisitor() {}
 };
 
 
-// Checks that a task id is valid, i.e., contains only valid characters.
+// Checks that a task id is valid, i.e., contains only valid
+// characters.
 struct TaskIDChecker : TaskInfoVisitor
 {
   virtual Option<Error> operator () (
       const TaskInfo& task,
-      const Resources& resources,
       const Framework& framework,
-      const Slave& slave)
+      const Slave& slave,
+      const Resources& totalResources,
+      const Resources& usedResources)
   {
     const string& id = task.task_id().value();
 
     if (std::count_if(id.begin(), id.end(), invalid) > 0) {
-      return "TaskID '" + id + "' contains invalid characters";
+      return Error("TaskID '" + id + "' contains invalid characters");
     }
 
     return None();
@@ -1899,13 +1902,15 @@ struct SlaveIDChecker : TaskInfoVisitor
 {
   virtual Option<Error> operator () (
       const TaskInfo& task,
-      const Resources& resources,
       const Framework& framework,
-      const Slave& slave)
+      const Slave& slave,
+      const Resources& totalResources,
+      const Resources& usedResources)
   {
     if (!(task.slave_id() == slave.id)) {
-      return "Task uses invalid slave " + task.slave_id().value() +
-             " while slave " + slave.id.value() + " is expected";
+      return Error(
+          "Task uses invalid slave " + task.slave_id().value() +
+          " while slave " + slave.id.value() + " is expected");
     }
 
     return None();
@@ -1921,15 +1926,17 @@ struct UniqueTaskIDChecker : TaskInfoVisitor
 {
   virtual Option<Error> operator () (
       const TaskInfo& task,
-      const Resources& resources,
       const Framework& framework,
-      const Slave& slave)
+      const Slave& slave,
+      const Resources& totalResources,
+      const Resources& usedResources)
   {
     const TaskID& taskId = task.task_id();
 
     if (framework.tasks.contains(taskId)) {
-      return "Task has duplicate ID: " + taskId.value();
+      return Error("Task has duplicate ID: " + taskId.value());
     }
+
     return None();
   }
 };
@@ -1945,43 +1952,38 @@ struct ResourceUsageChecker : TaskInfoVisitor
 {
   virtual Option<Error> operator () (
       const TaskInfo& task,
-      const Resources& resources,
       const Framework& framework,
-      const Slave& slave)
+      const Slave& slave,
+      const Resources& totalResources,
+      const Resources& usedResources)
   {
     if (task.resources().size() == 0) {
-      return stringify("Task uses no resources");
+      return Error("Task uses no resources");
     }
 
     foreach (const Resource& resource, task.resources()) {
       if (!Resources::isAllocatable(resource)) {
-        return "Task uses invalid resources: " + stringify(resource);
+        return Error("Task uses invalid resources: " + stringify(resource));
       }
     }
 
-    // Check if this task uses more resources than offered.
-    const Resources& taskResources = task.resources();
-
-    if (!(taskResources <= resources)) {
-      return "Task " + stringify(task.task_id()) + " attempted to use " +
-             stringify(taskResources) + " which is greater than offered " +
-             stringify(resources);
-    }
-
     // Check this task's executor's resources.
-    if (task.has_executor()) {
-      const Resources& executorResources = task.executor().resources();
+    Resources executorResources;
 
-      foreach (const Resource& resource, executorResources) {
+    if (task.has_executor()) {
+      foreach (const Resource& resource, task.executor().resources()) {
         if (!Resources::isAllocatable(resource)) {
           // TODO(benh): Send back the invalid resources?
-          return "Executor for task " + stringify(task.task_id()) +
-                 " uses invalid resources " + stringify(resource);
+          return Error(
+              "Executor for task " + stringify(task.task_id()) +
+              " uses invalid resources " + stringify(resource));
         }
       }
 
-      // Check minimal cpus and memory resources of executor
-      // and log warnings if not set.
+      executorResources = task.executor().resources();
+
+      // Check minimal cpus and memory resources of executor and log
+      // warnings if not set.
       // TODO(martin): MESOS-1807. Return Error instead of logging a
       // warning in 0.22.0.
       Option<double> cpus =  executorResources.cpus();
@@ -1995,6 +1997,7 @@ struct ResourceUsageChecker : TaskInfoVisitor
           << "). Please update your executor, as this will be mandatory "
           << "in future releases.";
       }
+
       Option<Bytes> mem = executorResources.mem();
       if (mem.isNone() || mem.get() < MIN_MEM) {
         LOG(WARNING)
@@ -2008,6 +2011,20 @@ struct ResourceUsageChecker : TaskInfoVisitor
       }
     }
 
+    // Check if resources needed by the task (and its executor in case
+    // the executor is new) are available.
+    Resources resources = task.resources();
+
+    if (!slave.hasExecutor(framework.id, task.executor().executor_id())) {
+      resources += task.executor().resources();
+    }
+
+    if (!(resources + usedResources <= totalResources)) {
+      return Error(
+          "Task uses more resources " + stringify(resources) +
+          " than available " + stringify(totalResources - usedResources));
+    }
+
     return None();
   }
 };
@@ -2019,33 +2036,35 @@ struct ExecutorInfoChecker : TaskInfoVisitor
 {
   virtual Option<Error> operator () (
       const TaskInfo& task,
-      const Resources& resources,
       const Framework& framework,
-      const Slave& slave)
+      const Slave& slave,
+      const Resources& totalResources,
+      const Resources& usedResources)
   {
     if (task.has_executor() == task.has_command()) {
-      return stringify(
-          "Task should have at least one (but not both) of CommandInfo or"
-          " ExecutorInfo present");
+      return Error(
+          "Task should have at least one (but not both) of CommandInfo or "
+          "ExecutorInfo present");
     }
 
     if (task.has_executor()) {
-      // The master currently expects ExecutorInfo.framework_id
-      // to be set even though it is an optional field.
-      // Currently, the scheduler driver ensures that the field
-      // is set. For schedulers not using the driver, we need to
-      // do the validation here.
+      // The master currently expects ExecutorInfo.framework_id to be
+      // set even though it is an optional field. Currently, the
+      // scheduler driver ensures that the field is set. For
+      // schedulers not using the driver, we need to do the validation
+      // here.
       // TODO(bmahler): Set this field in the master instead of
       // depending on the scheduler driver do it.
       if (!task.executor().has_framework_id()) {
-        return stringify(
+        return Error(
             "Task has invalid ExecutorInfo: missing field 'framework_id'");
       }
 
       if (!(task.executor().framework_id() == framework.id)) {
-        return string("ExecutorInfo has an invalid FrameworkID") +
-               " (Actual: " + stringify(task.executor().framework_id()) +
-               " vs Expected: " + stringify(framework.id) + ")";
+        return Error(
+            "ExecutorInfo has an invalid FrameworkID"
+            " (Actual: " + stringify(task.executor().framework_id()) +
+            " vs Expected: " + stringify(framework.id) + ")");
       }
 
       const ExecutorID& executorId = task.executor().executor_id();
@@ -2056,15 +2075,16 @@ struct ExecutorInfoChecker : TaskInfoVisitor
       }
 
       if (executorInfo.isSome() && !(task.executor() == executorInfo.get())) {
-        return "Task has invalid ExecutorInfo (existing ExecutorInfo"
-               " with same ExecutorID is not compatible).\n"
-               "------------------------------------------------------------\n"
-               "Existing ExecutorInfo:\n" +
-               stringify(executorInfo.get()) + "\n"
-               "------------------------------------------------------------\n"
-               "Task's ExecutorInfo:\n" +
-               stringify(task.executor()) + "\n"
-               "------------------------------------------------------------\n";
+        return Error(
+            "Task has invalid ExecutorInfo (existing ExecutorInfo"
+            " with same ExecutorID is not compatible).\n"
+            "------------------------------------------------------------\n"
+            "Existing ExecutorInfo:\n" +
+            stringify(executorInfo.get()) + "\n"
+            "------------------------------------------------------------\n"
+            "Task's ExecutorInfo:\n" +
+            stringify(task.executor()) + "\n"
+            "------------------------------------------------------------\n");
       }
     }
 
@@ -2079,21 +2099,24 @@ struct CheckpointChecker : TaskInfoVisitor
 {
   virtual Option<Error> operator () (
       const TaskInfo& task,
-      const Resources& resources,
       const Framework& framework,
-      const Slave& slave)
+      const Slave& slave,
+      const Resources& totalResources,
+      const Resources& usedResources)
   {
     if (framework.info.checkpoint() && !slave.info.checkpoint()) {
-      return "Task asked to be checkpointed but slave " +
-          stringify(slave.id) + " has checkpointing disabled";
+      return Error(
+          "Task asked to be checkpointed but slave " +
+          stringify(slave.id) + " has checkpointing disabled");
     }
+
     return None();
   }
 };
 
 
-// OfferVisitors are similar to the TaskInfoVisitor pattern and
-// are used for validation and aggregation of offers.
+// OfferVisitors are similar to the TaskInfoVisitor pattern and are
+// used for validation and aggregation of offers.
 // The error reporting scheme is also similar to TaskInfoVisitor.
 // However, offer processing (and subsequent task processing) is
 // aborted altogether if offer visitor reports an error.
@@ -2150,9 +2173,10 @@ struct FrameworkChecker : OfferVisitor {
     }
 
     if (!(framework.id == offer->framework_id())) {
-      return "Offer " + stringify(offer->id()) +
+      return Error(
+          "Offer " + stringify(offer->id()) +
           " has invalid framework " + stringify(offer->framework_id()) +
-          " while framework " + stringify(framework.id) + " is expected";
+          " while framework " + stringify(framework.id) + " is expected");
     }
 
     return None();
@@ -2160,8 +2184,8 @@ struct FrameworkChecker : OfferVisitor {
 };
 
 
-// Checks that the slave is valid and ensures that all offers belong to
-// the same slave.
+// Checks that the slave is valid and ensures that all offers belong
+// to the same slave.
 struct SlaveChecker : OfferVisitor
 {
   virtual Option<Error> operator () (
@@ -2171,7 +2195,7 @@ struct SlaveChecker : OfferVisitor
   {
     Offer* offer = getOffer(master, offerId);
     if (offer == NULL) {
-      return "Offer " + stringify(offerId) + " is no longer valid";
+      return Error("Offer " + stringify(offerId) + " is no longer valid");
     }
 
     Slave* slave = getSlave(master, offer->slave_id());
@@ -2190,10 +2214,11 @@ struct SlaveChecker : OfferVisitor
       // Set slave id and use as base case for validation.
       slaveId = slave->id;
     } else if (!(slave->id == slaveId.get())) {
-      return "Aggregated offers must belong to one single slave. Offer " +
+      return Error(
+          "Aggregated offers must belong to one single slave. Offer " +
           stringify(offerId) + " uses slave " +
           stringify(slave->id) + " and slave " +
-          stringify(slaveId.get());
+          stringify(slaveId.get()));
     }
 
     return None();
@@ -2212,7 +2237,7 @@ struct UniqueOfferIDChecker : OfferVisitor
       Master* master)
   {
     if (offers.contains(offerId)) {
-      return "Duplicate offer " + stringify(offerId) + " in offer list";
+      return Error("Duplicate offer " + stringify(offerId) + " in offer list");
     }
     offers.insert(offerId);
 
@@ -2243,6 +2268,7 @@ void Master::launchTasks(
       << "Ignoring launch tasks message for offers " << stringify(offerIds)
       << " of framework " << frameworkId
       << " because the framework cannot be found";
+
     return;
   }
 
@@ -2252,6 +2278,7 @@ void Master::launchTasks(
       << " of framework " << frameworkId << " from '" << from
       << "' because it is not from the registered framework '"
       << framework->pid << "'";
+
     return;
   }
 
@@ -2319,6 +2346,7 @@ void Master::launchTasks(
 
       forward(update, UPID(), framework);
     }
+
     return;
   }
 
@@ -2365,21 +2393,25 @@ Option<Error> Master::validateTask(
     const TaskInfo& task,
     Framework* framework,
     Slave* slave,
-    const Resources& totalResources)
+    const Resources& totalResources,
+    const Resources& usedResources)
 {
   CHECK_NOTNULL(framework);
   CHECK_NOTNULL(slave);
 
-  // Create task visitors.
+  // Create task visitors. The order in which the following checkers
+  // are executed does matter! For example, ResourceUsageChecker
+  // assumes that ExecutorInfo is valid which is verified by
+  // ExecutorInfoChecker.
   // TODO(vinod): Create the visitors on the stack and make the visit
   // operation const.
   list<Owned<TaskInfoVisitor>> taskVisitors;
   taskVisitors.push_back(Owned<TaskInfoVisitor>(new TaskIDChecker()));
   taskVisitors.push_back(Owned<TaskInfoVisitor>(new SlaveIDChecker()));
   taskVisitors.push_back(Owned<TaskInfoVisitor>(new UniqueTaskIDChecker()));
-  taskVisitors.push_back(Owned<TaskInfoVisitor>(new ResourceUsageChecker()));
-  taskVisitors.push_back(Owned<TaskInfoVisitor>(new ExecutorInfoChecker()));
   taskVisitors.push_back(Owned<TaskInfoVisitor>(new CheckpointChecker()));
+  taskVisitors.push_back(Owned<TaskInfoVisitor>(new ExecutorInfoChecker()));
+  taskVisitors.push_back(Owned<TaskInfoVisitor>(new ResourceUsageChecker()));
 
   // TODO(benh): Add a HealthCheckChecker visitor.
 
@@ -2388,7 +2420,7 @@ Option<Error> Master::validateTask(
   // Invoke each visitor.
   Option<Error> error = None();
   foreach (const Owned<TaskInfoVisitor>& visitor, taskVisitors) {
-    error = (*visitor)(task, totalResources, *framework, *slave);
+    error = (*visitor)(task, *framework, *slave, totalResources, usedResources);
     if (error.isSome()) {
       break;
     }
@@ -2436,7 +2468,7 @@ Future<bool> Master::authorizeTask(
 }
 
 
-void Master::launchTask(
+Resources Master::launchTask(
     const TaskInfo& task,
     Framework* framework,
     Slave* slave)
@@ -2446,6 +2478,9 @@ void Master::launchTask(
   CHECK(slave->connected) << "Launching task " << task.task_id()
                           << " on disconnected slave " << *slave;
 
+  // The resources consumed.
+  Resources resources = task.resources();
+
   // Determine if this task launches an executor, and if so make sure
   // the slave and framework state has been updated accordingly.
   Option<ExecutorID> executorId;
@@ -2460,6 +2495,8 @@ void Master::launchTask(
 
       slave->addExecutor(framework->id, task.executor());
       framework->addExecutor(slave->id, task.executor());
+
+      resources += task.executor().resources();
     }
 
     executorId = task.executor().executor_id();
@@ -2494,7 +2531,7 @@ void Master::launchTask(
   message.mutable_task()->MergeFrom(task);
   send(slave->pid, message);
 
-  return;
+  return resources;
 }
 
 
@@ -2593,8 +2630,12 @@ void Master::_launchTasks(
     }
 
     // Validate the task.
-    const Option<Error>& validation =
-      validateTask(task, framework, slave, totalResources);
+    const Option<Error>& validation = validateTask(
+        task,
+        framework,
+        slave,
+        totalResources,
+        usedResources);
 
     if (validation.isSome()) {
       const StatusUpdate& update = protobuf::createStatusUpdate(
@@ -2614,40 +2655,8 @@ void Master::_launchTasks(
       continue;
     }
 
-    // Check if resources needed by the task (and its executor in case
-    // the executor is new) are available. These resources will be
-    // added by 'launchTask()' below.
-    Resources resources = task.resources();
-    if (task.has_executor() &&
-        !slave->hasExecutor(framework->id, task.executor().executor_id())) {
-      resources += task.executor().resources();
-    }
-
-    if (!(usedResources + resources <= totalResources)) {
-      const string error =
-        "Task uses more resources " + stringify(resources) +
-        " than available " + stringify(totalResources - usedResources);
-
-      const StatusUpdate& update = protobuf::createStatusUpdate(
-          framework->id,
-          task.slave_id(),
-          task.task_id(),
-          TASK_LOST,
-          TaskStatus::SOURCE_MASTER,
-          error,
-          TaskStatus::REASON_TASK_INVALID);
-
-      metrics.tasks_lost++;
-      stats.tasks[TASK_LOST]++;
-
-      forward(update, UPID(), framework);
-
-      continue;
-    }
-
     // Launch task.
-    launchTask(task, framework, slave);
-    usedResources += resources;
+    usedResources += launchTask(task, framework, slave);
   }
 
   // All used resources should be allocatable, enforced by our validators.

http://git-wip-us.apache.org/repos/asf/mesos/blob/a06da4e4/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index a5e8e08..47f3bc9 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -350,7 +350,8 @@ protected:
       const TaskInfo& task,
       Framework* framework,
       Slave* slave,
-      const Resources& totalResources);
+      const Resources& totalResources,
+      const Resources& usedResources);
 
   // Authorizes the task.
   // Returns true if task is authorized.
@@ -361,7 +362,10 @@ protected:
       Framework* framework);
 
   // Launch a task from a task description.
-  void launchTask(const TaskInfo& task, Framework* framework, Slave* slave);
+  Resources launchTask(
+      const TaskInfo& task,
+      Framework* framework,
+      Slave* slave);
 
   // 'launchTasks()' continuation.
   void _launchTasks(

http://git-wip-us.apache.org/repos/asf/mesos/blob/a06da4e4/src/tests/resource_offers_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_offers_tests.cpp b/src/tests/resource_offers_tests.cpp
index 3c78a21..21cb5ad 100644
--- a/src/tests/resource_offers_tests.cpp
+++ b/src/tests/resource_offers_tests.cpp
@@ -354,8 +354,9 @@ TEST_F(ResourceOffersTest, TaskUsesMoreResourcesThanOffered)
   EXPECT_EQ(TASK_ERROR, status.get().state());
   EXPECT_EQ(TaskStatus::REASON_TASK_INVALID, status.get().reason());
   EXPECT_TRUE(status.get().has_message());
+
   EXPECT_TRUE(strings::contains(
-      status.get().message(), "greater than offered"));
+      status.get().message(), "Task uses more resources"));
 
   driver.stop();
   driver.join();

Reply via email to