Repository: mesos
Updated Branches:
  refs/heads/master a1cf25cfa -> 16a2e7317


Enabled the master to handle reservation operations.

Review: https://reviews.apache.org/r/32150


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/16a2e731
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/16a2e731
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/16a2e731

Branch: refs/heads/master
Commit: 16a2e731719aad73d1435a10115462491b8992ac
Parents: 7cbd524
Author: Michael Park <[email protected]>
Authored: Sun May 3 12:48:07 2015 -0700
Committer: Jie Yu <[email protected]>
Committed: Sun May 3 13:08:43 2015 -0700

----------------------------------------------------------------------
 include/mesos/resources.hpp           |   3 +-
 src/master/master.cpp                 |  54 ++++++-
 src/master/validation.cpp             | 104 ++++++++++++-
 src/master/validation.hpp             |  13 ++
 src/tests/master_validation_tests.cpp | 240 +++++++++++++++++++++++++++++
 5 files changed, 407 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/16a2e731/include/mesos/resources.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/resources.hpp b/include/mesos/resources.hpp
index fd20574..4c036d3 100644
--- a/include/mesos/resources.hpp
+++ b/include/mesos/resources.hpp
@@ -76,7 +76,8 @@ public:
 
   // Validates the given Resource object. Returns Error if it is not
   // valid. A Resource object is valid if it has a name, a valid type,
-  // i.e. scalar, range, or set, and has the appropriate value set.
+  // i.e. scalar, range, or set, has the appropriate value set, and
+  // a valid (role, reservation) pair for dynamic reservation.
   static Option<Error> validate(const Resource& resource);
 
   // Validates the given protobufs.

http://git-wip-us.apache.org/repos/asf/mesos/blob/16a2e731/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index d42a6f3..bee8425 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -2367,6 +2367,9 @@ void Master::accept(
   // TODO(jieyu): Currently, we only do authorization for the LAUNCH
   // operation. In the future, we might want to introduce
   // authorizations for other offer operations as well.
+  //
+  // TODO(mpark): Add authorization logic for RESERVE and UNRESERVE
+  // when "reserve" and "unreserve" ACLs are being introduced.
   list<Future<bool>> futures;
   foreach (const Offer::Operation& operation, accept.operations()) {
     if (operation.type() != Offer::Operation::LAUNCH) {
@@ -2478,17 +2481,58 @@ void Master::_accept(
   list<Future<bool>> authorizations = _authorizations.get();
 
   foreach (const Offer::Operation& operation, accept.operations()) {
-    // TODO(jieyu): Validate each operation!
     switch (operation.type()) {
       case Offer::Operation::RESERVE: {
-        // TODO(jieyu): Provide implementation for RESERVE.
-        drop(framework, operation, "Unimplemented");
+        Option<string> principal = framework->info.has_principal()
+          ? framework->info.principal()
+          : Option<string>::none();
+
+        Option<Error> error = validation::operation::validate(
+            operation.reserve(), framework->info.role(), principal);
+
+        if (error.isSome()) {
+          drop(framework, operation, error.get().message);
+          continue;
+        }
+
+        Try<Resources> resources = _offeredResources.apply(operation);
+        if (resources.isError()) {
+          drop(framework, operation, resources.error());
+          continue;
+        }
+
+        _offeredResources = resources.get();
+
+        LOG(INFO) << "Applying RESERVE operation for resources "
+                  << operation.reserve().resources() << " from framework "
+                  << *framework << " to slave " << *slave;
+
+        applyOfferOperation(framework, slave, operation);
         break;
       }
 
       case Offer::Operation::UNRESERVE: {
-        // TODO(jieyu): Provide implementation for UNRESERVE.
-        drop(framework, operation, "Unimplemented");
+        Option<Error> error = validation::operation::validate(
+            operation.unreserve(), framework->info.has_principal());
+
+        if (error.isSome()) {
+          drop(framework, operation, error.get().message);
+          continue;
+        }
+
+        Try<Resources> resources = _offeredResources.apply(operation);
+        if (resources.isError()) {
+          drop(framework, operation, resources.error());
+          continue;
+        }
+
+        _offeredResources = resources.get();
+
+        LOG(INFO) << "Applying UNRESERVE operation for resources "
+                  << operation.unreserve().resources() << " from framework "
+                  << *framework << " to slave " << *slave;
+
+        applyOfferOperation(framework, slave, operation);
         break;
       }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/16a2e731/src/master/validation.cpp
----------------------------------------------------------------------
diff --git a/src/master/validation.cpp b/src/master/validation.cpp
index dc25995..c3e96ae 100644
--- a/src/master/validation.cpp
+++ b/src/master/validation.cpp
@@ -53,6 +53,22 @@ static bool invalid(char c)
 
 namespace resource {
 
+// Validates that all the given resources are dynamically-reserved.
+Option<Error> validateDynamicReservation(
+    const RepeatedPtrField<Resource>& resources)
+{
+  foreach (const Resource& resource, resources) {
+    if (!resource.has_reservation()) {
+      return Error(
+          "Resource " + stringify(resource) +
+          " does not have the 'reservation' field set");
+    }
+  }
+
+  return None();
+}
+
+
 // Validates the DiskInfos specified in the given resources (if
 // exist). Returns error if any DiskInfo is found invalid or
 // unsupported.
@@ -66,7 +82,7 @@ Option<Error> validateDiskInfo(const RepeatedPtrField<Resource>& resources)
     if (resource.disk().has_persistence()) {
       if (Resources::isUnreserved(resource)) {
         return Error(
-            "Persistent volumes cannot be created from unreserved resources.");
+            "Persistent volumes cannot be created from unreserved resources");
       }
       if (!resource.disk().has_volume()) {
         return Error("Expecting 'volume' to be set for persistent volume");
@@ -533,6 +549,92 @@ Option<Error> validate(
 namespace operation {
 
 Option<Error> validate(
+    const Offer::Operation::Reserve& reserve,
+    const string& role,
+    const Option<string>& principal)
+{
+  Option<Error> error = resource::validate(reserve.resources());
+  if (error.isSome()) {
+    return Error("Invalid resources: " + error.get().message);
+  }
+
+  error = resource::validateDynamicReservation(reserve.resources());
+  if (error.isSome()) {
+    return Error("Not a dynamic reservation: " + error.get().message);
+  }
+
+  if (principal.isNone()) {
+    return Error("A framework without a principal cannot reserve resources.");
+  }
+
+  foreach (const Resource& resource, reserve.resources()) {
+    if (resource.role() != role) {
+      return Error(
+          "The reserved resource's role '" + resource.role() +
+          "' does not match the framework's role '" + role + "'");
+    }
+
+    if (resource.reservation().principal() != principal.get()) {
+      return Error(
+          "The reserved resource's principal '" +
+          stringify(resource.reservation().principal()) +
+          "' does not match the framework's principal '" +
+          stringify(principal.get()) + "'");
+    }
+
+    // NOTE: This check would be covered by 'contains' since there
+    // shouldn't be any unreserved resources with 'disk' set.
+    // However, we keep this check since it will be a more useful
+    // error message than what contains would produce.
+    if (Resources::isPersistentVolume(resource)) {
+      return Error("A persistent volume " + stringify(resource) +
+                   " must already be reserved");
+    }
+  }
+
+  return None();
+}
+
+
+Option<Error> validate(
+    const Offer::Operation::Unreserve& unreserve,
+    bool hasPrincipal)
+{
+  Option<Error> error = resource::validate(unreserve.resources());
+  if (error.isSome()) {
+    return Error("Invalid resources: " + error.get().message);
+  }
+
+  error = resource::validateDynamicReservation(unreserve.resources());
+  if (error.isSome()) {
+    return Error("Not a dynamic reservation: " + error.get().message);
+  }
+
+  if (!hasPrincipal) {
+    return Error("A framework without a principal cannot unreserve resources.");
+  }
+
+  // NOTE: We don't check that 'FrameworkInfo.principal' matches
+  // 'Resource.ReservationInfo.principal' here because the authorization
+  // depends on the "unreserve" ACL which specifies which 'principal' can
+  // unreserve which 'principal's resources. In the absence of an ACL, we allow
+  // any 'principal' to unreserve any other 'principal's resources.
+
+  foreach (const Resource& resource, unreserve.resources()) {
+    if (Resources::isPersistentVolume(resource)) {
+      return Error(
+          "A dynamically reserved persistent volume " +
+          stringify(resource) +
+          " cannot be unreserved directly. Please destroy the persistent"
+          " volume first then unreserve the resource");
+    }
+  }
+
+  return None();
+}
+
+
+Option<Error> validate(
     const Offer::Operation::Create& create,
     const Resources& checkpointedResources)
 {

http://git-wip-us.apache.org/repos/asf/mesos/blob/16a2e731/src/master/validation.hpp
----------------------------------------------------------------------
diff --git a/src/master/validation.hpp b/src/master/validation.hpp
index 2d7416c..a74e844 100644
--- a/src/master/validation.hpp
+++ b/src/master/validation.hpp
@@ -81,6 +81,19 @@ Option<Error> validate(
 
 namespace operation {
 
+// Validates the RESERVE operation.
+Option<Error> validate(
+    const Offer::Operation::Reserve& reserve,
+    const std::string& role,
+    const Option<std::string>& principal);
+
+
+// Validates the UNRESERVE operation.
+Option<Error> validate(
+    const Offer::Operation::Unreserve& unreserve,
+    bool hasPrincipal);
+
+
 // Validates the CREATE operation. We need slave's checkpointed
 // resources so that we can validate persistence ID uniqueness.
 Option<Error> validate(

http://git-wip-us.apache.org/repos/asf/mesos/blob/16a2e731/src/tests/master_validation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_validation_tests.cpp b/src/tests/master_validation_tests.cpp
index 4f2ad58..1366bcd 100644
--- a/src/tests/master_validation_tests.cpp
+++ b/src/tests/master_validation_tests.cpp
@@ -77,6 +77,31 @@ protected:
 };
 
 
+TEST_F(ResourceValidationTest, StaticReservation)
+{
+  Resource resource = Resources::parse("cpus", "8", "role").get();
+  EXPECT_NONE(resource::validate(CreateResources(resource)));
+}
+
+
+TEST_F(ResourceValidationTest, DynamicReservation)
+{
+  Resource resource = Resources::parse("cpus", "8", "role").get();
+  resource.mutable_reservation()->CopyFrom(createReservationInfo("principal"));
+
+  EXPECT_NONE(resource::validate(CreateResources(resource)));
+}
+
+
+TEST_F(ResourceValidationTest, InvalidRoleReservationPair)
+{
+  Resource resource = Resources::parse("cpus", "8", "*").get();
+  resource.mutable_reservation()->CopyFrom(createReservationInfo("principal"));
+
+  EXPECT_SOME(resource::validate(CreateResources(resource)));
+}
+
+
 TEST_F(ResourceValidationTest, PersistentVolume)
 {
   Resource volume = Resources::parse("disk", "128", "role1").get();
@@ -141,6 +166,221 @@ TEST_F(ResourceValidationTest, NonPersistentVolume)
 }
 
 
+class ReserveOperationValidationTest : public MesosTest {};
+
+
+// This test verifies that the 'role' specified in the resources of
+// the RESERVE operation needs to match the framework's 'role'.
+TEST_F(ReserveOperationValidationTest, MatchingRole)
+{
+  Resource resource = Resources::parse("cpus", "8", "role").get();
+  resource.mutable_reservation()->CopyFrom(createReservationInfo("principal"));
+
+  Offer::Operation::Reserve reserve;
+  reserve.add_resources()->CopyFrom(resource);
+
+  EXPECT_NONE(operation::validate(reserve, "role", "principal"));
+}
+
+
+// This test verifies that validation fails if the framework has a
+// "*" role even if the role matches.
+TEST_F(ReserveOperationValidationTest, DisallowStarRoleFrameworks)
+{
+  // The role "*" matches, but is invalid since frameworks with
+  // "*" role cannot reserve resources.
+  Resource resource = Resources::parse("cpus", "8", "*").get();
+  resource.mutable_reservation()->CopyFrom(createReservationInfo("principal"));
+
+  Offer::Operation::Reserve reserve;
+  reserve.add_resources()->CopyFrom(resource);
+
+  EXPECT_SOME(operation::validate(reserve, "*", "principal"));
+}
+
+
+// This test verifies that validation fails if the 'role'
+// specified in the resources of the RESERVE operation does not
+// match the framework's 'role'.
+TEST_F(ReserveOperationValidationTest, NonMatchingRole)
+{
+  {
+    // Non-matching role, "role" reserving for "*".
+    Resource resource = Resources::parse("cpus", "8", "*").get();
+    resource.mutable_reservation()->CopyFrom(
+        createReservationInfo("principal"));
+
+    Offer::Operation::Reserve reserve;
+    reserve.add_resources()->CopyFrom(resource);
+
+    EXPECT_SOME(operation::validate(reserve, "role", "principal"));
+  }
+
+  {
+    // Non-matching role, "*" reserving for "role".
+    Resource resource = Resources::parse("cpus", "8", "role").get();
+    resource.mutable_reservation()->CopyFrom(
+        createReservationInfo("principal"));
+
+    Offer::Operation::Reserve reserve;
+    reserve.add_resources()->CopyFrom(resource);
+
+    EXPECT_SOME(operation::validate(reserve, "*", "principal"));
+  }
+
+  {
+    // Non-matching role, "role1" reserving for "role2".
+    Resource resource = Resources::parse("cpus", "8", "role2").get();
+    resource.mutable_reservation()->CopyFrom(
+        createReservationInfo("principal"));
+
+    Offer::Operation::Reserve reserve;
+    reserve.add_resources()->CopyFrom(resource);
+
+    EXPECT_SOME(operation::validate(reserve, "role1", "principal"));
+  }
+}
+
+
+// This test verifies that the 'principal' specified in the resources
+// of the RESERVE operation needs to match the framework's 'principal'.
+TEST_F(ReserveOperationValidationTest, MatchingPrincipal)
+{
+  Resource resource = Resources::parse("cpus", "8", "role").get();
+  resource.mutable_reservation()->CopyFrom(createReservationInfo("principal"));
+
+  Offer::Operation::Reserve reserve;
+  reserve.add_resources()->CopyFrom(resource);
+
+  EXPECT_NONE(operation::validate(reserve, "role", "principal"));
+}
+
+
+// This test verifies that validation fails if the 'principal'
+// specified in the resources of the RESERVE operation does not match
+// the framework's 'principal'.
+TEST_F(ReserveOperationValidationTest, NonMatchingPrincipal)
+{
+  Resource resource = Resources::parse("cpus", "8", "role").get();
+  resource.mutable_reservation()->CopyFrom(createReservationInfo("principal2"));
+
+  Offer::Operation::Reserve reserve;
+  reserve.add_resources()->CopyFrom(resource);
+
+  EXPECT_SOME(operation::validate(reserve, "role", "principal1"));
+}
+
+
+// This test verifies that validation fails if the framework's
+// 'principal' is not set.
+TEST_F(ReserveOperationValidationTest, FrameworkMissingPrincipal)
+{
+  Resource resource = Resources::parse("cpus", "8", "role").get();
+  resource.mutable_reservation()->CopyFrom(createReservationInfo("principal"));
+
+  Offer::Operation::Reserve reserve;
+  reserve.add_resources()->CopyFrom(resource);
+
+  EXPECT_SOME(operation::validate(reserve, "role", None()));
+}
+
+
+// This test verifies that the resources specified in the RESERVE
+// operation cannot be persistent volumes.
+TEST_F(ReserveOperationValidationTest, NoPersistentVolumes)
+{
+  Resource reserved = Resources::parse("cpus", "8", "role").get();
+  reserved.mutable_reservation()->CopyFrom(createReservationInfo("principal"));
+
+  Offer::Operation::Reserve reserve;
+  reserve.add_resources()->CopyFrom(reserved);
+
+  EXPECT_NONE(operation::validate(reserve, "role", "principal"));
+}
+
+
+// This test verifies that validation fails if there are persistent
+// volumes specified in the resources of the RESERVE operation.
+TEST_F(ReserveOperationValidationTest, PersistentVolumes)
+{
+  Resource reserved = Resources::parse("cpus", "8", "role").get();
+  reserved.mutable_reservation()->CopyFrom(createReservationInfo("principal"));
+
+  Resource volume = Resources::parse("disk", "128", "role").get();
+  volume.mutable_disk()->CopyFrom(createDiskInfo("id1", "path1"));
+
+  Offer::Operation::Reserve reserve;
+  reserve.add_resources()->CopyFrom(reserved);
+  reserve.add_resources()->CopyFrom(volume);
+
+  EXPECT_SOME(operation::validate(reserve, "role", "principal"));
+}
+
+
+class UnreserveOperationValidationTest : public MesosTest {};
+
+
+// This test verifies that any resources can be unreserved by any
+// framework with a principal.
+// TODO(mpark): Introduce the "unreserve" ACL to prevent this.
+TEST_F(UnreserveOperationValidationTest, WithoutACL)
+{
+  Resource resource = Resources::parse("cpus", "8", "role").get();
+  resource.mutable_reservation()->CopyFrom(createReservationInfo("principal"));
+
+  Offer::Operation::Unreserve unreserve;
+  unreserve.add_resources()->CopyFrom(resource);
+
+  EXPECT_NONE(operation::validate(unreserve, true));
+}
+
+
+// This test verifies that validation fails if the framework's
+// 'principal' is not set.
+TEST_F(UnreserveOperationValidationTest, FrameworkMissingPrincipal)
+{
+  Resource resource = Resources::parse("cpus", "8", "role").get();
+  resource.mutable_reservation()->CopyFrom(createReservationInfo("principal"));
+
+  Offer::Operation::Unreserve unreserve;
+  unreserve.add_resources()->CopyFrom(resource);
+
+  EXPECT_SOME(operation::validate(unreserve, false));
+}
+
+
+// This test verifies that the resources specified in the UNRESERVE
+// operation cannot be persistent volumes.
+TEST_F(UnreserveOperationValidationTest, NoPersistentVolumes)
+{
+  Resource reserved = Resources::parse("cpus", "8", "role").get();
+  reserved.mutable_reservation()->CopyFrom(createReservationInfo("principal"));
+
+  Offer::Operation::Unreserve unreserve;
+  unreserve.add_resources()->CopyFrom(reserved);
+
+  EXPECT_NONE(operation::validate(unreserve, true));
+}
+
+
+// This test verifies that validation fails if there are persistent
+// volumes specified in the resources of the UNRESERVE operation.
+TEST_F(UnreserveOperationValidationTest, PersistentVolumes)
+{
+  Resource reserved = Resources::parse("cpus", "8", "role").get();
+  reserved.mutable_reservation()->CopyFrom(createReservationInfo("principal"));
+
+  Resource volume = Resources::parse("disk", "128", "role").get();
+  volume.mutable_disk()->CopyFrom(createDiskInfo("id1", "path1"));
+
+  Offer::Operation::Unreserve unreserve;
+  unreserve.add_resources()->CopyFrom(reserved);
+  unreserve.add_resources()->CopyFrom(volume);
+
+  EXPECT_SOME(operation::validate(unreserve, true));
+}
+
+
 class CreateOperationValidationTest : public MesosTest {};
 
 

Reply via email to