Some Master cleanups.

Review: https://reviews.apache.org/r/24576


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3ff1180d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3ff1180d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3ff1180d

Branch: refs/heads/master
Commit: 3ff1180d2ee0e1d219335dbad38b7537ecaf9311
Parents: 0e8fa7b
Author: Benjamin Mahler <[email protected]>
Authored: Mon Aug 11 13:44:13 2014 -0700
Committer: Benjamin Mahler <[email protected]>
Committed: Wed Aug 13 11:54:23 2014 -0700

----------------------------------------------------------------------
 src/master/master.cpp | 142 ++++++++++++++++-----------------------------
 1 file changed, 51 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3ff1180d/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index a8cf9ba..72494b5 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -2098,87 +2098,54 @@ void Master::launchTasks(
     return;
   }
 
-  // TODO(bmahler): This case can be caught during offer validation.
-  if (offerIds.empty()) {
-    LOG(WARNING) << "No offers to launch tasks on";
-
-    foreach (const TaskInfo& task, tasks) {
-      const StatusUpdate& update = protobuf::createStatusUpdate(
-          framework->id,
-          task.slave_id(),
-          task.task_id(),
-          TASK_LOST,
-          "Task launched without offers");
-
-      forward(update, UPID(), framework);
-    }
-    return;
-  }
-
-  // Common slave id for task validation.
-  Option<SlaveID> slaveId;
-
-  // Create offer visitors.
-  list<OfferVisitor*> offerVisitors;
-  offerVisitors.push_back(new ValidOfferChecker());
-  offerVisitors.push_back(new FrameworkChecker());
-  offerVisitors.push_back(new SlaveChecker());
-  offerVisitors.push_back(new UniqueOfferIDChecker());
-
-  // Verify and aggregate all offers.
-  // Abort offer and task processing if any offer validation failed.
-  Resources totalResources;
+  // TODO(bmahler): We currently only support using multiple offers
+  // for a single slave.
+  Resources used;
+  Option<SlaveID> slaveId = None();
   Option<Error> error = None();
-  foreach (const OfferID& offerId, offerIds) {
-    foreach (OfferVisitor* visitor, offerVisitors) {
-      error = (*visitor)(offerId, *framework, this);
-      if (error.isSome()) {
-        break;
-      }
-    }
-    // Offer validation error needs to be propagated from visitor
-    // loop above.
-    if (error.isSome()) {
-      break;
-    }
 
-    // If offer validation succeeds, we need to pass along the common
-    // slave. So optimistically, we store the first slave id we see.
-    // In case of invalid offers (different slaves for example), we
-    // report error and return from launchTask before slaveId is used.
-    if (slaveId.isNone()) {
-      slaveId = getOffer(offerId)->slave_id();
+  if (offerIds.empty()) {
+    error = Error("No offers specified");
+  } else {
+    list<Owned<OfferVisitor> > offerVisitors;
+    offerVisitors.push_back(Owned<OfferVisitor>(new ValidOfferChecker()));
+    offerVisitors.push_back(Owned<OfferVisitor>(new FrameworkChecker()));
+    offerVisitors.push_back(Owned<OfferVisitor>(new SlaveChecker()));
+    offerVisitors.push_back(Owned<OfferVisitor>(new UniqueOfferIDChecker()));
+
+    // Validate the offers.
+    foreach (const OfferID& offerId, offerIds) {
+      foreach (const Owned<OfferVisitor>& visitor, offerVisitors) {
+        if (error.isNone()) {
+          error = (*visitor)(offerId, *framework, this);
+        }
+      }
     }
 
-    totalResources += getOffer(offerId)->resources();
-  }
-
-  // Cleanup visitors.
-  while (!offerVisitors.empty()) {
-    OfferVisitor* visitor = offerVisitors.front();
-    offerVisitors.pop_front();
-    delete visitor;
-  };
-
-  // Remove offers and recover resources if any of the offers are
-  // invalid.
-  foreach (const OfferID& offerId, offerIds) {
-    Offer* offer = getOffer(offerId);
-    if (offer != NULL) {
-      if (error.isSome()) {
-        allocator->resourcesRecovered(
-            offer->framework_id(),
-            offer->slave_id(),
-            offer->resources(),
-            None());
+    // Compute used resources and remove the offers. If the
+    // validation failed, return resources to the allocator.
+    foreach (const OfferID& offerId, offerIds) {
+      Offer* offer = getOffer(offerId);
+      if (offer != NULL) {
+        slaveId = offer->slave_id();
+        used += offer->resources();
+
+        if (error.isSome()) {
+          allocator->resourcesRecovered(
+              offer->framework_id(),
+              offer->slave_id(),
+              offer->resources(),
+              None());
+        }
+        removeOffer(offer);
       }
-      removeOffer(offer);
     }
   }
 
+  // If invalid, send TASK_LOST for the launch attempts.
   if (error.isSome()) {
-    LOG(WARNING) << "Failed to validate offer " << stringify(offerIds)
-                   << ": " << error.get().message;
+    LOG(WARNING) << "Launch tasks message used invalid offers '"
+                 << stringify(offerIds) << "': " << error.get().message;
 
     foreach (const TaskInfo& task, tasks) {
       const StatusUpdate& update = protobuf::createStatusUpdate(
@@ -2193,7 +2160,7 @@ void Master::launchTasks(
     return;
   }
 
-  CHECK(slaveId.isSome()) << "Slave id not found";
+  CHECK_SOME(slaveId);
   Slave* slave = CHECK_NOTNULL(getSlave(slaveId.get()));
 
   LOG(INFO) << "Processing reply for offers: "
@@ -2204,7 +2171,7 @@ void Master::launchTasks(
   // Validate each task and launch if valid.
   list<Future<Option<Error> > > futures;
   foreach (const TaskInfo& task, tasks) {
-    futures.push_back(validateTask(task, framework, slave, totalResources));
+    futures.push_back(validateTask(task, framework, slave, used));
 
     // Add to pending tasks.
     // NOTE: We need to do this here after validation because of the
@@ -2221,7 +2188,7 @@ void Master::launchTasks(
                  framework->id,
                  slaveId.get(),
                  tasks,
-                 totalResources,
+                 used,
                  filters,
                  lambda::_1));
 }
@@ -2237,15 +2204,15 @@ Future<Option<Error> > Master::validateTask(
   CHECK_NOTNULL(slave);
 
   // Create task visitors.
-  // TODO(vinod): Create the visitors on the heap and make the visit
+  // TODO(vinod): Create the visitors on the stack and make the visit
   // operation const.
-  list<TaskInfoVisitor*> taskVisitors;
-  taskVisitors.push_back(new TaskIDChecker());
-  taskVisitors.push_back(new SlaveIDChecker());
-  taskVisitors.push_back(new UniqueTaskIDChecker());
-  taskVisitors.push_back(new ResourceUsageChecker());
-  taskVisitors.push_back(new ExecutorInfoChecker());
-  taskVisitors.push_back(new CheckpointChecker());
+  list<Owned<TaskInfoVisitor> > taskVisitors;
+  taskVisitors.push_back(Owned<TaskInfoVisitor>(new TaskIDChecker()));
+  taskVisitors.push_back(Owned<TaskInfoVisitor>(new SlaveIDChecker()));
+  taskVisitors.push_back(Owned<TaskInfoVisitor>(new UniqueTaskIDChecker()));
+  taskVisitors.push_back(Owned<TaskInfoVisitor>(new ResourceUsageChecker()));
+  taskVisitors.push_back(Owned<TaskInfoVisitor>(new ExecutorInfoChecker()));
+  taskVisitors.push_back(Owned<TaskInfoVisitor>(new CheckpointChecker()));
 
   // TODO(benh): Add a HealthCheckChecker visitor.
 
@@ -2253,20 +2220,13 @@ Future<Option<Error> > Master::validateTask(
 
   // Invoke each visitor.
   Option<Error> error = None();
-  foreach (TaskInfoVisitor* visitor, taskVisitors) {
+  foreach (const Owned<TaskInfoVisitor>& visitor, taskVisitors) {
     error = (*visitor)(task, totalResources, *framework, *slave);
     if (error.isSome()) {
       break;
     }
   }
 
-  // Cleanup visitors.
-  while (!taskVisitors.empty()) {
-    TaskInfoVisitor* visitor = taskVisitors.front();
-    taskVisitors.pop_front();
-    delete visitor;
-  };
-
   if (error.isSome()) {
     return Error(error.get().message);
   }

Reply via email to