Added /reserve HTTP endpoint to the master. Review: https://reviews.apache.org/r/35702
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/57a7e7d0 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/57a7e7d0 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/57a7e7d0 Branch: refs/heads/master Commit: 57a7e7d0283aa08455d6572ade75a11d914c6962 Parents: e758d24 Author: Michael Park <[email protected]> Authored: Wed Aug 5 00:04:03 2015 -0700 Committer: Michael Park <[email protected]> Committed: Wed Sep 9 15:28:29 2015 -0700 ---------------------------------------------------------------------- src/master/http.cpp | 126 +++++++++++++++++++++++++++++++++++++++++ src/master/master.cpp | 36 +++++++++--- src/master/master.hpp | 24 ++++++-- src/master/validation.cpp | 14 ++--- src/master/validation.hpp | 2 +- 5 files changed, 181 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/57a7e7d0/src/master/http.cpp ---------------------------------------------------------------------- diff --git a/src/master/http.cpp b/src/master/http.cpp index 94e97a2..bcf7f93 100644 --- a/src/master/http.cpp +++ b/src/master/http.cpp @@ -78,6 +78,7 @@ using process::USAGE; using process::http::Accepted; using process::http::BadRequest; +using process::http::Conflict; using process::http::Forbidden; using process::http::InternalServerError; using process::http::MethodNotAllowed; @@ -672,6 +673,131 @@ Future<Response> Master::Http::redirect(const Request& request) const } +Future<Response> Master::Http::reserve(const Request& request) const +{ + if (request.method != "POST") { + return BadRequest("Expecting POST"); + } + + // Parse the query string in the request body. 
+ Try<hashmap<string, string>> decode = + process::http::query::decode(request.body); + + if (decode.isError()) { + return BadRequest("Unable to decode query string: " + decode.error()); + } + + const hashmap<string, string>& values = decode.get(); + + if (values.get("slaveId").isNone()) { + return BadRequest("Missing 'slaveId' query parameter"); + } + + SlaveID slaveId; + slaveId.set_value(values.get("slaveId").get()); + + Slave* slave = master->slaves.registered.get(slaveId); + if (slave == NULL) { + return BadRequest("No slave found with specified ID"); + } + + if (values.get("resources").isNone()) { + return BadRequest("Missing 'resources' query parameter"); + } + + Try<JSON::Array> parse = + JSON::parse<JSON::Array>(values.get("resources").get()); + + if (parse.isError()) { + return BadRequest( + "Error in parsing 'resources' query parameter: " + parse.error()); + } + + Resources resources; + foreach (const JSON::Value& value, parse.get().values) { + Try<Resource> resource = ::protobuf::parse<Resource>(value); + if (resource.isError()) { + return BadRequest( + "Error in parsing 'resources' query parameter: " + resource.error()); + } + resources += resource.get(); + } + + Result<Credential> credential = authenticate(request); + if (credential.isError()) { + return Unauthorized("Mesos master", credential.error()); + } + + // Create an offer operation. + Offer::Operation operation; + operation.set_type(Offer::Operation::RESERVE); + operation.mutable_reserve()->mutable_resources()->CopyFrom(resources); + + Option<string> principal = + credential.isSome() ? credential.get().principal() : Option<string>::none(); + + Option<Error> validate = + validation::operation::validate(operation.reserve(), None(), principal); + + if (validate.isSome()) { + return BadRequest("Invalid RESERVE operation: " + validate.get().message); + } + + // TODO(mpark): Add a reserve ACL for authorization. + + // The resources recovered by rescinding outstanding offers. 
+ Resources recovered; + + // The unreserved resources needed to satisfy the RESERVE operation. + // This is used in an optimization where we try to only rescind + // offers that would contribute to satisfying the Reserve operation. + Resources remaining = resources.flatten(); + + // We pessimistically assume that what seems like "available" + // resources in the allocator will be gone. This can happen due to + // the race between the allocator scheduling an 'allocate' call to + // itself vs master's request to schedule 'updateAvailable'. + // We greedily rescind one offer at a time until we've rescinded + // enough offers to cover for 'resources'. + foreach (Offer* offer, utils::copy(slave->offers)) { + // If rescinding the offer would not contribute to satisfying + // the remaining resources, skip it. + if (remaining == remaining - offer->resources()) { + continue; + } + + recovered += offer->resources(); + remaining -= offer->resources(); + + // We explicitly pass 'Filters()' which has a default 'refuse_sec' + // of 5 seconds rather than 'None()' here, so that we can + // virtually always win the race against 'allocate'. + master->allocator->recoverResources( + offer->framework_id(), + offer->slave_id(), + offer->resources(), + Filters()); + + master->removeOffer(offer, true); // Rescind! + + // If we've rescinded enough offers to cover for 'resources', + // we're done. + Try<Resources> updatedRecovered = recovered.apply(operation); + if (updatedRecovered.isSome()) { + break; + } + } + + // Propagate the 'Future<Nothing>' as 'Future<Response>' where + // 'Nothing' -> 'OK' and Failed -> 'Conflict'. 
+ return master->apply(slave, operation) + .then([]() -> Response { return OK(); }) + .repair([](const Future<Response>& result) { + return Conflict(result.failure()); + }); +} + + const string Master::Http::SLAVES_HELP = HELP( TLDR( "Information about registered slaves."), http://git-wip-us.apache.org/repos/asf/mesos/blob/57a7e7d0/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 5589eca..ea7d613 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -780,6 +780,11 @@ void Master::initialize() [http](const process::http::Request& request) { return http.redirect(request); }); + route("/reserve", + None(), // TODO(mpark): Add an Http::RESERVE_HELP, + [http](const process::http::Request& request) { + return http.reserve(request); + }); route("/roles.json", Http::ROLES_HELP, [http](const process::http::Request& request) { @@ -2921,7 +2926,7 @@ void Master::_accept( << operation.reserve().resources() << " from framework " << *framework << " to slave " << *slave; - applyOfferOperation(framework, slave, operation); + apply(framework, slave, operation); break; } @@ -2946,7 +2951,7 @@ void Master::_accept( << operation.unreserve().resources() << " from framework " << *framework << " to slave " << *slave; - applyOfferOperation(framework, slave, operation); + apply(framework, slave, operation); break; } @@ -2972,7 +2977,7 @@ void Master::_accept( << operation.create().volumes() << " from framework " << *framework << " to slave " << *slave; - applyOfferOperation(framework, slave, operation); + apply(framework, slave, operation); break; } @@ -2998,7 +3003,7 @@ void Master::_accept( << operation.create().volumes() << " from framework " << *framework << " to slave " << *slave; - applyOfferOperation(framework, slave, operation); + apply(framework, slave, operation); break; } @@ -5721,7 +5726,7 @@ void Master::removeExecutor( } -void Master::applyOfferOperation( 
+void Master::apply( Framework* framework, Slave* slave, const Offer::Operation& operation) @@ -5729,10 +5734,23 @@ void Master::applyOfferOperation( CHECK_NOTNULL(framework); CHECK_NOTNULL(slave); - allocator->updateAllocation( - framework->id(), - slave->id, - {operation}); + allocator->updateAllocation(framework->id(), slave->id, {operation}); + + _apply(slave, operation); +} + + +Future<Nothing> Master::apply(Slave* slave, const Offer::Operation& operation) +{ + CHECK_NOTNULL(slave); + + return allocator->updateAvailable(slave->id, {operation}) + .onReady(defer(self(), &Master::_apply, slave, operation)); +} + + +void Master::_apply(Slave* slave, const Offer::Operation& operation) { + CHECK_NOTNULL(slave); slave->apply(operation); http://git-wip-us.apache.org/repos/asf/mesos/blob/57a7e7d0/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index e133185..7849d68 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -672,14 +672,24 @@ protected: const FrameworkID& frameworkId, const ExecutorID& executorId); - // Updates slave's resources by applying the given operation. It - // also updates the allocator and sends a CheckpointResourcesMessage - // to the slave with slave's current checkpointed resources. - void applyOfferOperation( + // Updates the allocator and updates the slave's resources by + // applying the given operation. It also sends a + // 'CheckpointResourcesMessage' to the slave with the updated + // checkpointed resources. + void apply( Framework* framework, Slave* slave, const Offer::Operation& operation); + // Attempts to update the allocator by applying the given operation. + // If successful, updates the slave's resources, sends a + // 'CheckpointResourcesMessage' to the slave with the updated + // checkpointed resources, and returns a 'Future' with 'Nothing'. + // Otherwise, no action is taken and returns a failed 'Future'. 
+ process::Future<Nothing> apply( + Slave* slave, + const Offer::Operation& operation); + // Forwards the update to the framework. void forward( const StatusUpdate& update, @@ -702,6 +712,8 @@ protected: Option<Credentials> credentials; private: + void _apply(Slave* slave, const Offer::Operation& operation); + void drop( const process::UPID& from, const scheduler::Call& call, @@ -810,6 +822,10 @@ private: process::Future<process::http::Response> redirect( const process::http::Request& request) const; + // /master/reserve + process::Future<process::http::Response> reserve( + const process::http::Request& request) const; + // /master/roles.json process::Future<process::http::Response> roles( const process::http::Request& request) const; http://git-wip-us.apache.org/repos/asf/mesos/blob/57a7e7d0/src/master/validation.cpp ---------------------------------------------------------------------- diff --git a/src/master/validation.cpp b/src/master/validation.cpp index ffb7bf0..0361d1f 100644 --- a/src/master/validation.cpp +++ b/src/master/validation.cpp @@ -668,7 +668,7 @@ namespace operation { Option<Error> validate( const Offer::Operation::Reserve& reserve, - const string& role, + const Option<string>& role, const Option<string>& principal) { Option<Error> error = resource::validate(reserve.resources()); @@ -677,7 +677,7 @@ Option<Error> validate( } if (principal.isNone()) { - return Error("A framework without a principal cannot reserve resources."); + return Error("Cannot reserve resources without a principal."); } foreach (const Resource& resource, reserve.resources()) { @@ -686,18 +686,18 @@ Option<Error> validate( "Resource " + stringify(resource) + " is not dynamically reserved"); } - if (resource.role() != role) { + if (role.isSome() && resource.role() != role.get()) { return Error( "The reserved resource's role '" + resource.role() + - "' does not match the framework's role '" + role + "'"); + "' does not match the framework's role '" + role.get() + "'"); } if 
(resource.reservation().principal() != principal.get()) { return Error( "The reserved resource's principal '" + - stringify(resource.reservation().principal()) + - "' does not match the framework's principal '" + - stringify(principal.get()) + "'"); + resource.reservation().principal() + + "' does not match the principal '" + + principal.get() + "'"); } // NOTE: This check would be covered by 'contains' since there http://git-wip-us.apache.org/repos/asf/mesos/blob/57a7e7d0/src/master/validation.hpp ---------------------------------------------------------------------- diff --git a/src/master/validation.hpp b/src/master/validation.hpp index 43b8d84..3434868 100644 --- a/src/master/validation.hpp +++ b/src/master/validation.hpp @@ -104,7 +104,7 @@ namespace operation { // Validates the RESERVE operation. Option<Error> validate( const Offer::Operation::Reserve& reserve, - const std::string& role, + const Option<std::string>& role, const Option<std::string>& principal);
