Repository: mesos Updated Branches: refs/heads/master 3da05548f -> df5b2cfc1
Added support for CREATE operation in master. Review: https://reviews.apache.org/r/30386 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/df5b2cfc Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/df5b2cfc Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/df5b2cfc Branch: refs/heads/master Commit: df5b2cfc184039adb9c32ee54ca5c9bcc8245de9 Parents: 3da0554 Author: Jie Yu <[email protected]> Authored: Wed Jan 28 16:11:12 2015 -0800 Committer: Jie Yu <[email protected]> Committed: Wed Feb 4 14:01:29 2015 -0800 ---------------------------------------------------------------------- src/master/master.cpp | 113 +++++++++++++++++++++++++++++++++++++++++---- src/master/master.hpp | 13 ++++++ 2 files changed, 117 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/df5b2cfc/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 69b945d..f4b6463 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -1339,6 +1339,23 @@ void Master::drop( } +void Master::drop( + Framework* framework, + const Offer::Operation& operation, + const string& message) +{ + // TODO(jieyu): Increment a metric. + + // NOTE: There is no direct feedback to the framework when an + // operation is dropped. The framework will find out that the + // operation was dropped through subsequent offers. + + LOG(ERROR) << "Dropping " << Offer::Operation::Type_Name(operation.type()) + << " offer operation from framework " << *framework + << ": " << message; +} + + void Master::receive( const UPID& from, const scheduler::Call& call) @@ -2237,29 +2254,46 @@ void Master::_accept( switch (operation.type()) { case Offer::Operation::RESERVE: { // TODO(jieyu): Provide implementation for RESERVE. - LOG(ERROR) << "Unsupported offer operation: " - << Offer::Operation::Type_Name(operation.type()); + drop(framework, operation, "Unimplemented"); break; } case Offer::Operation::UNRESERVE: { // TODO(jieyu): Provide implementation for UNRESERVE. - LOG(ERROR) << "Unsupported offer operation: " - << Offer::Operation::Type_Name(operation.type()); + drop(framework, operation, "Unimplemented"); break; } case Offer::Operation::CREATE: { - // TODO(jieyu): Provide implementation for CREATE. - LOG(ERROR) << "Unsupported offer operation: " - << Offer::Operation::Type_Name(operation.type()); + Option<Error> error = validation::operation::validate( + operation.create(), + slave->checkpointedResources); + + if (error.isSome()) { + drop(framework, operation, error.get().message); + continue; + } + + Try<Resources> resources = _offeredResources.apply(operation); + if (resources.isError()) { + drop(framework, operation, resources.error()); + continue; + } + + _offeredResources = resources.get(); + + allocator->updateAllocation( + frameworkId, + slaveId, + {operation}); + + updateCheckpointedResources(slave, operation); break; } case Offer::Operation::DESTROY: { // TODO(jieyu): Provide implementation for DESTROY. - LOG(ERROR) << "Unsupported offer operation: " - << Offer::Operation::Type_Name(operation.type()); + drop(framework, operation, "Unimplemented"); break; } @@ -4568,6 +4602,67 @@ void Master::removeOffer(Offer* offer, bool rescind) } +void Master::updateCheckpointedResources( + Slave* slave, + const Offer::Operation& operation) +{ + CHECK_NOTNULL(slave); + + switch (operation.type()) { + case Offer::Operation::RESERVE: { + // TODO(jieyu): Provide implementation. + LOG(ERROR) << "Failed to update checkpointed resources for slave " + << *slave << ": Unimplemented RESERVE operation"; + break; + } + + case Offer::Operation::UNRESERVE: { + // TODO(jieyu): Provide implementation. + LOG(ERROR) << "Failed to update checkpointed resources for slave " + << *slave << ": Unimplemented UNRESERVE operation"; + break; + } + + case Offer::Operation::CREATE: { + // If the CREATE operation applies to resources that have + // already been checkpointed on the slave, that means the + // volumes are created from dynamically reserved resources and + // we need to update the checkpointed resources. Otherwise, the + // volumes are created from regular resources and we need to add + // them to the checkpointed resources. + Try<Resources> resources = slave->checkpointedResources.apply(operation); + if (resources.isError()) { + slave->checkpointedResources += operation.create().volumes(); + } else { + slave->checkpointedResources = resources.get(); + } + break; + } + + case Offer::Operation::DESTROY: { + // TODO(jieyu): Provide implementation. + LOG(ERROR) << "Failed to update checkpointed resources for slave " + << *slave << ": Unimplemented DESTROY operation"; + break; + } + + default: + LOG(FATAL) << "Not expecting operation " << operation.type() + << " when updating slave checkpointed resources"; + break; + } + + LOG(INFO) << "Sending checkpointed resources " + << slave->checkpointedResources + << " to slave " << *slave; + + CheckpointResourcesMessage message; + message.mutable_resources()->CopyFrom(slave->checkpointedResources); + + send(slave->pid, message); +} + + // TODO(bmahler): Consider killing this. Framework* Master::getFramework(const FrameworkID& frameworkId) { http://git-wip-us.apache.org/repos/asf/mesos/blob/df5b2cfc/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index cd37ee9..9d8c508 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -390,6 +390,14 @@ protected: const FrameworkID& frameworkId, const ExecutorID& executorId); + // Certain offer operations (e.g., RESERVE, CREATE) may result in a + // change to the checkpointed resources on the slave. This method + // updates the checkpointed resources for the slave and sends + // CheckpointResourcesMessage to the slave accordingly. + void updateCheckpointedResources( + Slave* slave, + const Offer::Operation& operation); + // Forwards the update to the framework. void forward( const StatusUpdate& update, @@ -418,6 +426,11 @@ private: const scheduler::Call& call, const std::string& message); + void drop( + Framework* framework, + const Offer::Operation& operation, + const std::string& message); + // Call handlers. void receive( const process::UPID& from,
