Some Master cleanups. Review: https://reviews.apache.org/r/24576
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3ff1180d Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3ff1180d Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3ff1180d Branch: refs/heads/master Commit: 3ff1180d2ee0e1d219335dbad38b7537ecaf9311 Parents: 0e8fa7b Author: Benjamin Mahler <[email protected]> Authored: Mon Aug 11 13:44:13 2014 -0700 Committer: Benjamin Mahler <[email protected]> Committed: Wed Aug 13 11:54:23 2014 -0700 ---------------------------------------------------------------------- src/master/master.cpp | 142 ++++++++++++++++----------------------------- 1 file changed, 51 insertions(+), 91 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/3ff1180d/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index a8cf9ba..72494b5 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -2098,87 +2098,54 @@ void Master::launchTasks( return; } - // TODO(bmahler): This case can be caught during offer validation. - if (offerIds.empty()) { - LOG(WARNING) << "No offers to launch tasks on"; - - foreach (const TaskInfo& task, tasks) { - const StatusUpdate& update = protobuf::createStatusUpdate( - framework->id, - task.slave_id(), - task.task_id(), - TASK_LOST, - "Task launched without offers"); - - forward(update, UPID(), framework); - } - return; - } - - // Common slave id for task validation. - Option<SlaveID> slaveId; - - // Create offer visitors. - list<OfferVisitor*> offerVisitors; - offerVisitors.push_back(new ValidOfferChecker()); - offerVisitors.push_back(new FrameworkChecker()); - offerVisitors.push_back(new SlaveChecker()); - offerVisitors.push_back(new UniqueOfferIDChecker()); - - // Verify and aggregate all offers. - // Abort offer and task processing if any offer validation failed. - Resources totalResources; + // TODO(bmahler): We currently only support using multiple offers + // for a single slave. + Resources used; + Option<SlaveID> slaveId = None(); Option<Error> error = None(); - foreach (const OfferID& offerId, offerIds) { - foreach (OfferVisitor* visitor, offerVisitors) { - error = (*visitor)(offerId, *framework, this); - if (error.isSome()) { - break; - } - } - // Offer validation error needs to be propagated from visitor - // loop above. - if (error.isSome()) { - break; - } - // If offer validation succeeds, we need to pass along the common - // slave. So optimistically, we store the first slave id we see. - // In case of invalid offers (different slaves for example), we - // report error and return from launchTask before slaveId is used. - if (slaveId.isNone()) { - slaveId = getOffer(offerId)->slave_id(); + if (offerIds.empty()) { + error = Error("No offers specified"); + } else { + list<Owned<OfferVisitor> > offerVisitors; + offerVisitors.push_back(Owned<OfferVisitor>(new ValidOfferChecker())); + offerVisitors.push_back(Owned<OfferVisitor>(new FrameworkChecker())); + offerVisitors.push_back(Owned<OfferVisitor>(new SlaveChecker())); + offerVisitors.push_back(Owned<OfferVisitor>(new UniqueOfferIDChecker())); + + // Validate the offers. + foreach (const OfferID& offerId, offerIds) { + foreach (const Owned<OfferVisitor>& visitor, offerVisitors) { + if (error.isNone()) { + error = (*visitor)(offerId, *framework, this); + } + } } - totalResources += getOffer(offerId)->resources(); - } - - // Cleanup visitors. - while (!offerVisitors.empty()) { - OfferVisitor* visitor = offerVisitors.front(); - offerVisitors.pop_front(); - delete visitor; - }; - - // Remove offers and recover resources if any of the offers are - // invalid. - foreach (const OfferID& offerId, offerIds) { - Offer* offer = getOffer(offerId); - if (offer != NULL) { - if (error.isSome()) { - allocator->resourcesRecovered( - offer->framework_id(), - offer->slave_id(), - offer->resources(), - None()); + // Compute used resources and remove the offers. If the + // validation failed, return resources to the allocator. + foreach (const OfferID& offerId, offerIds) { + Offer* offer = getOffer(offerId); + if (offer != NULL) { + slaveId = offer->slave_id(); + used += offer->resources(); + + if (error.isSome()) { + allocator->resourcesRecovered( + offer->framework_id(), + offer->slave_id(), + offer->resources(), + None()); + } + removeOffer(offer); } - removeOffer(offer); } } + // If invalid, send TASK_LOST for the launch attempts. if (error.isSome()) { - LOG(WARNING) << "Failed to validate offer " << stringify(offerIds) - << ": " << error.get().message; + LOG(WARNING) << "Launch tasks message used invalid offers '" + << stringify(offerIds) << "': " << error.get().message; foreach (const TaskInfo& task, tasks) { const StatusUpdate& update = protobuf::createStatusUpdate( @@ -2193,7 +2160,7 @@ void Master::launchTasks( return; } - CHECK(slaveId.isSome()) << "Slave id not found"; + CHECK_SOME(slaveId); Slave* slave = CHECK_NOTNULL(getSlave(slaveId.get())); LOG(INFO) << "Processing reply for offers: " @@ -2204,7 +2171,7 @@ void Master::launchTasks( // Validate each task and launch if valid. list<Future<Option<Error> > > futures; foreach (const TaskInfo& task, tasks) { - futures.push_back(validateTask(task, framework, slave, totalResources)); + futures.push_back(validateTask(task, framework, slave, used)); // Add to pending tasks. // NOTE: We need to do this here after validation because of the @@ -2221,7 +2188,7 @@ void Master::launchTasks( framework->id, slaveId.get(), tasks, - totalResources, + used, filters, lambda::_1)); } @@ -2237,15 +2204,15 @@ Future<Option<Error> > Master::validateTask( CHECK_NOTNULL(slave); // Create task visitors. - // TODO(vinod): Create the visitors on the heap and make the visit + // TODO(vinod): Create the visitors on the stack and make the visit // operation const. - list<TaskInfoVisitor*> taskVisitors; - taskVisitors.push_back(new TaskIDChecker()); - taskVisitors.push_back(new SlaveIDChecker()); - taskVisitors.push_back(new UniqueTaskIDChecker()); - taskVisitors.push_back(new ResourceUsageChecker()); - taskVisitors.push_back(new ExecutorInfoChecker()); - taskVisitors.push_back(new CheckpointChecker()); + list<Owned<TaskInfoVisitor> > taskVisitors; + taskVisitors.push_back(Owned<TaskInfoVisitor>(new TaskIDChecker())); + taskVisitors.push_back(Owned<TaskInfoVisitor>(new SlaveIDChecker())); + taskVisitors.push_back(Owned<TaskInfoVisitor>(new UniqueTaskIDChecker())); + taskVisitors.push_back(Owned<TaskInfoVisitor>(new ResourceUsageChecker())); + taskVisitors.push_back(Owned<TaskInfoVisitor>(new ExecutorInfoChecker())); + taskVisitors.push_back(Owned<TaskInfoVisitor>(new CheckpointChecker())); // TODO(benh): Add a HealthCheckChecker visitor. @@ -2253,20 +2220,13 @@ Future<Option<Error> > Master::validateTask( // Invoke each visitor. Option<Error> error = None(); - foreach (TaskInfoVisitor* visitor, taskVisitors) { + foreach (const Owned<TaskInfoVisitor>& visitor, taskVisitors) { error = (*visitor)(task, totalResources, *framework, *slave); if (error.isSome()) { break; } } - // Cleanup visitors. - while (!taskVisitors.empty()) { - TaskInfoVisitor* visitor = taskVisitors.front(); - taskVisitors.pop_front(); - delete visitor; - }; - if (error.isSome()) { return Error(error.get().message); }
