Repository: mesos Updated Branches: refs/heads/master a1cf25cfa -> 16a2e7317
Enabled the master to handle reservation operations. Review: https://reviews.apache.org/r/32150 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/16a2e731 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/16a2e731 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/16a2e731 Branch: refs/heads/master Commit: 16a2e731719aad73d1435a10115462491b8992ac Parents: 7cbd524 Author: Michael Park <[email protected]> Authored: Sun May 3 12:48:07 2015 -0700 Committer: Jie Yu <[email protected]> Committed: Sun May 3 13:08:43 2015 -0700 ---------------------------------------------------------------------- include/mesos/resources.hpp | 3 +- src/master/master.cpp | 54 ++++++- src/master/validation.cpp | 104 ++++++++++++- src/master/validation.hpp | 13 ++ src/tests/master_validation_tests.cpp | 240 +++++++++++++++++++++++++++++ 5 files changed, 407 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/16a2e731/include/mesos/resources.hpp ---------------------------------------------------------------------- diff --git a/include/mesos/resources.hpp b/include/mesos/resources.hpp index fd20574..4c036d3 100644 --- a/include/mesos/resources.hpp +++ b/include/mesos/resources.hpp @@ -76,7 +76,8 @@ public: // Validates the given Resource object. Returns Error if it is not // valid. A Resource object is valid if it has a name, a valid type, - // i.e. scalar, range, or set, and has the appropriate value set. + // i.e. scalar, range, or set, has the appropriate value set, and + // a valid (role, reservation) pair for dynamic reservation. static Option<Error> validate(const Resource& resource); // Validates the given protobufs. 
http://git-wip-us.apache.org/repos/asf/mesos/blob/16a2e731/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index d42a6f3..bee8425 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -2367,6 +2367,9 @@ void Master::accept( // TODO(jieyu): Currently, we only do authorization for the LAUNCH // operation. In the future, we might want to introduce // authorizations for other offer operations as well. + // + // TODO(mpark): Add authorization logic for RESERVE and UNRESERVE + // when "reserve" and "unreserve" ACLs are being introduced. list<Future<bool>> futures; foreach (const Offer::Operation& operation, accept.operations()) { if (operation.type() != Offer::Operation::LAUNCH) { @@ -2478,17 +2481,58 @@ void Master::_accept( list<Future<bool>> authorizations = _authorizations.get(); foreach (const Offer::Operation& operation, accept.operations()) { - // TODO(jieyu): Validate each operation! switch (operation.type()) { case Offer::Operation::RESERVE: { - // TODO(jieyu): Provide implementation for RESERVE. - drop(framework, operation, "Unimplemented"); + Option<string> principal = framework->info.has_principal() + ? 
framework->info.principal() + : Option<string>::none(); + + Option<Error> error = validation::operation::validate( + operation.reserve(), framework->info.role(), principal); + + if (error.isSome()) { + drop(framework, operation, error.get().message); + continue; + } + + Try<Resources> resources = _offeredResources.apply(operation); + if (resources.isError()) { + drop(framework, operation, resources.error()); + continue; + } + + _offeredResources = resources.get(); + + LOG(INFO) << "Applying RESERVE operation for resources " + << operation.reserve().resources() << " from framework " + << *framework << " to slave " << *slave; + + applyOfferOperation(framework, slave, operation); break; } case Offer::Operation::UNRESERVE: { - // TODO(jieyu): Provide implementation for UNRESERVE. - drop(framework, operation, "Unimplemented"); + Option<Error> error = validation::operation::validate( + operation.unreserve(), framework->info.has_principal()); + + if (error.isSome()) { + drop(framework, operation, error.get().message); + continue; + } + + Try<Resources> resources = _offeredResources.apply(operation); + if (resources.isError()) { + drop(framework, operation, resources.error()); + continue; + } + + _offeredResources = resources.get(); + + LOG(INFO) << "Applying UNRESERVE operation for resources " + << operation.unreserve().resources() << " from framework " + << *framework << " to slave " << *slave; + + applyOfferOperation(framework, slave, operation); break; } http://git-wip-us.apache.org/repos/asf/mesos/blob/16a2e731/src/master/validation.cpp ---------------------------------------------------------------------- diff --git a/src/master/validation.cpp b/src/master/validation.cpp index dc25995..c3e96ae 100644 --- a/src/master/validation.cpp +++ b/src/master/validation.cpp @@ -53,6 +53,22 @@ static bool invalid(char c) namespace resource { +// Validates that all the given resources are dynamically-reserved. 
+Option<Error> validateDynamicReservation( + const RepeatedPtrField<Resource>& resources) +{ + foreach (const Resource& resource, resources) { + if (!resource.has_reservation()) { + return Error( + "Resource " + stringify(resource) + + " does not have the 'reservation' field set"); + } + } + + return None(); +} + + // Validates the DiskInfos specified in the given resources (if // exist). Returns error if any DiskInfo is found invalid or // unsupported. @@ -66,7 +82,7 @@ Option<Error> validateDiskInfo(const RepeatedPtrField<Resource>& resources) if (resource.disk().has_persistence()) { if (Resources::isUnreserved(resource)) { return Error( - "Persistent volumes cannot be created from unreserved resources."); + "Persistent volumes cannot be created from unreserved resources"); } if (!resource.disk().has_volume()) { return Error("Expecting 'volume' to be set for persistent volume"); @@ -533,6 +549,92 @@ Option<Error> validate( namespace operation { Option<Error> validate( + const Offer::Operation::Reserve& reserve, + const string& role, + const Option<string>& principal) +{ + Option<Error> error = resource::validate(reserve.resources()); + if (error.isSome()) { + return Error("Invalid resources: " + error.get().message); + } + + error = resource::validateDynamicReservation(reserve.resources()); + if (error.isSome()) { + return Error("Not a dynamic reservation: " + error.get().message); + } + + if (principal.isNone()) { + return Error("A framework without a principal cannot reserve resources."); + } + + foreach (const Resource& resource, reserve.resources()) { + if (resource.role() != role) { + return Error( + "The reserved resource's role '" + resource.role() + + "' does not match the framework's role '" + role + "'"); + } + + if (resource.reservation().principal() != principal.get()) { + return Error( + "The reserved resource's principal '" + + stringify(resource.reservation().principal()) + + "' does not match the framework's principal '" + + 
stringify(principal.get()) + "'"); + } + + // NOTE: This check would be covered by 'contains' since there + // shouldn't be any unreserved resources with 'disk' set. + // However, we keep this check since it will be a more useful + // error message than what contains would produce. + if (Resources::isPersistentVolume(resource)) { + return Error("A persistent volume " + stringify(resource) + + " must already be reserved"); + } + } + + return None(); +} + + +Option<Error> validate( + const Offer::Operation::Unreserve& unreserve, + bool hasPrincipal) +{ + Option<Error> error = resource::validate(unreserve.resources()); + if (error.isSome()) { + return Error("Invalid resources: " + error.get().message); + } + + error = resource::validateDynamicReservation(unreserve.resources()); + if (error.isSome()) { + return Error("Not a dynamic reservation: " + error.get().message); + } + + if (!hasPrincipal) { + return Error("A framework without a principal cannot unreserve resources."); + } + + // NOTE: We don't check that 'FrameworkInfo.principal' matches + // 'Resource.ReservationInfo.principal' here because the authorization + // depends on the "unreserve" ACL which specifies which 'principal' can + // unreserve which 'principal's resources. In the absence of an ACL, we allow + // any 'principal' to unreserve any other 'principal's resources. + + foreach (const Resource& resource, unreserve.resources()) { + if (Resources::isPersistentVolume(resource)) { + return Error( + "A dynamically reserved persistent volume " + + stringify(resource) + + " cannot be unreserved directly. 
Please destroy the persistent" + " volume first then unreserve the resource"); + } + } + + return None(); +} + + +Option<Error> validate( const Offer::Operation::Create& create, const Resources& checkpointedResources) { http://git-wip-us.apache.org/repos/asf/mesos/blob/16a2e731/src/master/validation.hpp ---------------------------------------------------------------------- diff --git a/src/master/validation.hpp b/src/master/validation.hpp index 2d7416c..a74e844 100644 --- a/src/master/validation.hpp +++ b/src/master/validation.hpp @@ -81,6 +81,19 @@ Option<Error> validate( namespace operation { +// Validates the RESERVE operation. +Option<Error> validate( + const Offer::Operation::Reserve& reserve, + const std::string& role, + const Option<std::string>& principal); + + +// Validates the UNRESERVE operation. +Option<Error> validate( + const Offer::Operation::Unreserve& unreserve, + bool hasPrincipal); + + // Validates the CREATE operation. We need slave's checkpointed // resources so that we can validate persistence ID uniqueness. 
Option<Error> validate( http://git-wip-us.apache.org/repos/asf/mesos/blob/16a2e731/src/tests/master_validation_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/master_validation_tests.cpp b/src/tests/master_validation_tests.cpp index 4f2ad58..1366bcd 100644 --- a/src/tests/master_validation_tests.cpp +++ b/src/tests/master_validation_tests.cpp @@ -77,6 +77,31 @@ protected: }; +TEST_F(ResourceValidationTest, StaticReservation) +{ + Resource resource = Resources::parse("cpus", "8", "role").get(); + EXPECT_NONE(resource::validate(CreateResources(resource))); +} + + +TEST_F(ResourceValidationTest, DynamicReservation) +{ + Resource resource = Resources::parse("cpus", "8", "role").get(); + resource.mutable_reservation()->CopyFrom(createReservationInfo("principal")); + + EXPECT_NONE(resource::validate(CreateResources(resource))); +} + + +TEST_F(ResourceValidationTest, InvalidRoleReservationPair) +{ + Resource resource = Resources::parse("cpus", "8", "*").get(); + resource.mutable_reservation()->CopyFrom(createReservationInfo("principal")); + + EXPECT_SOME(resource::validate(CreateResources(resource))); +} + + TEST_F(ResourceValidationTest, PersistentVolume) { Resource volume = Resources::parse("disk", "128", "role1").get(); @@ -141,6 +166,221 @@ TEST_F(ResourceValidationTest, NonPersistentVolume) } +class ReserveOperationValidationTest : public MesosTest {}; + + +// This test verifies that the 'role' specified in the resources of +// the RESERVE operation needs to match the framework's 'role'. 
+TEST_F(ReserveOperationValidationTest, MatchingRole) +{ + Resource resource = Resources::parse("cpus", "8", "role").get(); + resource.mutable_reservation()->CopyFrom(createReservationInfo("principal")); + + Offer::Operation::Reserve reserve; + reserve.add_resources()->CopyFrom(resource); + + EXPECT_NONE(operation::validate(reserve, "role", "principal")); +} + + +// This test verifies that validation fails if the framework has a +// "*" role even if the role matches. +TEST_F(ReserveOperationValidationTest, DisallowStarRoleFrameworks) +{ + // The role "*" matches, but is invalid since frameworks with + // "*" role cannot reserve resources. + Resource resource = Resources::parse("cpus", "8", "*").get(); + resource.mutable_reservation()->CopyFrom(createReservationInfo("principal")); + + Offer::Operation::Reserve reserve; + reserve.add_resources()->CopyFrom(resource); + + EXPECT_SOME(operation::validate(reserve, "*", "principal")); +} + + +// This test verifies that validation fails if the 'role' +// specified in the resources of the RESERVE operation does not +// match the framework's 'role'. +TEST_F(ReserveOperationValidationTest, NonMatchingRole) +{ + { + // Non-matching role, "role" reserving for "*". + Resource resource = Resources::parse("cpus", "8", "*").get(); + resource.mutable_reservation()->CopyFrom( + createReservationInfo("principal")); + + Offer::Operation::Reserve reserve; + reserve.add_resources()->CopyFrom(resource); + + EXPECT_SOME(operation::validate(reserve, "role", "principal")); + } + + { + // Non-matching role, "*" reserving for "role". + Resource resource = Resources::parse("cpus", "8", "role").get(); + resource.mutable_reservation()->CopyFrom( + createReservationInfo("principal")); + + Offer::Operation::Reserve reserve; + reserve.add_resources()->CopyFrom(resource); + + EXPECT_SOME(operation::validate(reserve, "*", "principal")); + } + + { + // Non-matching role, "role1" reserving for "role2". 
+ Resource resource = Resources::parse("cpus", "8", "role2").get(); + resource.mutable_reservation()->CopyFrom( + createReservationInfo("principal")); + + Offer::Operation::Reserve reserve; + reserve.add_resources()->CopyFrom(resource); + + EXPECT_SOME(operation::validate(reserve, "role1", "principal")); + } +} + + +// This test verifies that the 'principal' specified in the resources +// of the RESERVE operation needs to match the framework's 'principal'. +TEST_F(ReserveOperationValidationTest, MatchingPrincipal) +{ + Resource resource = Resources::parse("cpus", "8", "role").get(); + resource.mutable_reservation()->CopyFrom(createReservationInfo("principal")); + + Offer::Operation::Reserve reserve; + reserve.add_resources()->CopyFrom(resource); + + EXPECT_NONE(operation::validate(reserve, "role", "principal")); +} + + +// This test verifies that validation fails if the 'principal' +// specified in the resources of the RESERVE operation does not match +// the framework's 'principal'. +TEST_F(ReserveOperationValidationTest, NonMatchingPrincipal) +{ + Resource resource = Resources::parse("cpus", "8", "role").get(); + resource.mutable_reservation()->CopyFrom(createReservationInfo("principal2")); + + Offer::Operation::Reserve reserve; + reserve.add_resources()->CopyFrom(resource); + + EXPECT_SOME(operation::validate(reserve, "role", "principal1")); +} + + +// This test verifies that validation fails if the framework's +// 'principal' is not set. +TEST_F(ReserveOperationValidationTest, FrameworkMissingPrincipal) +{ + Resource resource = Resources::parse("cpus", "8", "role").get(); + resource.mutable_reservation()->CopyFrom(createReservationInfo("principal")); + + Offer::Operation::Reserve reserve; + reserve.add_resources()->CopyFrom(resource); + + EXPECT_SOME(operation::validate(reserve, "role", None())); +} + + +// This test verifies that the resources specified in the RESERVE +// operation cannot be persistent volumes. 
+TEST_F(ReserveOperationValidationTest, NoPersistentVolumes) +{ + Resource reserved = Resources::parse("cpus", "8", "role").get(); + reserved.mutable_reservation()->CopyFrom(createReservationInfo("principal")); + + Offer::Operation::Reserve reserve; + reserve.add_resources()->CopyFrom(reserved); + + EXPECT_NONE(operation::validate(reserve, "role", "principal")); +} + + +// This test verifies that validation fails if there are persistent +// volumes specified in the resources of the RESERVE operation. +TEST_F(ReserveOperationValidationTest, PersistentVolumes) +{ + Resource reserved = Resources::parse("cpus", "8", "role").get(); + reserved.mutable_reservation()->CopyFrom(createReservationInfo("principal")); + + Resource volume = Resources::parse("disk", "128", "role").get(); + volume.mutable_disk()->CopyFrom(createDiskInfo("id1", "path1")); + + Offer::Operation::Reserve reserve; + reserve.add_resources()->CopyFrom(reserved); + reserve.add_resources()->CopyFrom(volume); + + EXPECT_SOME(operation::validate(reserve, "role", "principal")); +} + + +class UnreserveOperationValidationTest : public MesosTest {}; + + +// This test verifies that any resources can be unreserved by any +// framework with a principal. +// TODO(mpark): Introduce the "unreserve" ACL to prevent this. +TEST_F(UnreserveOperationValidationTest, WithoutACL) +{ + Resource resource = Resources::parse("cpus", "8", "role").get(); + resource.mutable_reservation()->CopyFrom(createReservationInfo("principal")); + + Offer::Operation::Unreserve unreserve; + unreserve.add_resources()->CopyFrom(resource); + + EXPECT_NONE(operation::validate(unreserve, true)); +} + + +// This test verifies that validation fails if the framework's +// 'principal' is not set. 
+TEST_F(UnreserveOperationValidationTest, FrameworkMissingPrincipal) +{ + Resource resource = Resources::parse("cpus", "8", "role").get(); + resource.mutable_reservation()->CopyFrom(createReservationInfo("principal")); + + Offer::Operation::Unreserve unreserve; + unreserve.add_resources()->CopyFrom(resource); + + EXPECT_SOME(operation::validate(unreserve, false)); +} + + +// This test verifies that the resources specified in the UNRESERVE +// operation cannot be persistent volumes. +TEST_F(UnreserveOperationValidationTest, NoPersistentVolumes) +{ + Resource reserved = Resources::parse("cpus", "8", "role").get(); + reserved.mutable_reservation()->CopyFrom(createReservationInfo("principal")); + + Offer::Operation::Unreserve unreserve; + unreserve.add_resources()->CopyFrom(reserved); + + EXPECT_NONE(operation::validate(unreserve, true)); +} + + +// This test verifies that validation fails if there are persistent +// volumes specified in the resources of the UNRESERVE operation. +TEST_F(UnreserveOperationValidationTest, PersistentVolumes) +{ + Resource reserved = Resources::parse("cpus", "8", "role").get(); + reserved.mutable_reservation()->CopyFrom(createReservationInfo("principal")); + + Resource volume = Resources::parse("disk", "128", "role").get(); + volume.mutable_disk()->CopyFrom(createDiskInfo("id1", "path1")); + + Offer::Operation::Unreserve unreserve; + unreserve.add_resources()->CopyFrom(reserved); + unreserve.add_resources()->CopyFrom(volume); + + EXPECT_SOME(operation::validate(unreserve, true)); +} + + class CreateOperationValidationTest : public MesosTest {};
