Repository: mesos
Updated Branches:
  refs/heads/master 22471b83a -> 79d7a4bd2


Added authorization for storage operations.

Framework operations `CREATE_VOLUME`, `DESTROY_VOLUME`, `CREATE_BLOCK`,
`DESTROY_BLOCK` are authorized. Respective ACL actions have been added
to the local authorizer.

Review: https://reviews.apache.org/r/67501/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/79d7a4bd
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/79d7a4bd
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/79d7a4bd

Branch: refs/heads/master
Commit: 79d7a4bd2e6a5831cdf2b809d848ddc67c390402
Parents: 22471b8
Author: Jan Schlicht <[email protected]>
Authored: Thu Jun 28 10:51:37 2018 +0200
Committer: Benjamin Bannier <[email protected]>
Committed: Fri Jun 29 16:35:23 2018 +0200

----------------------------------------------------------------------
 docs/authorization.md                     |   24 +
 include/mesos/authorizer/acls.proto       |   40 +
 include/mesos/authorizer/authorizer.proto |   12 +
 src/authorizer/local/authorizer.cpp       |   43 +-
 src/master/master.cpp                     |  140 +++-
 src/master/master.hpp                     |   84 ++
 src/tests/authorization_tests.cpp         | 1040 ++++++++++++++++++++++++
 7 files changed, 1375 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/79d7a4bd/docs/authorization.md
----------------------------------------------------------------------
diff --git a/docs/authorization.md b/docs/authorization.md
index cd8622b..91dc03b 100644
--- a/docs/authorization.md
+++ b/docs/authorization.md
@@ -217,6 +217,30 @@ entries, each representing an authorizable action:
   </td>
 </tr>
 <tr>
+  <td><code>create_block_disks</code></td>
+  <td>Framework principal.</td>
+  <td>Resource role of the block disk.</td>
+  <td>Creating a block disk.</td>
+</tr>
+<tr>
+  <td><code>destroy_block_disks</code></td>
+  <td>Framework principal.</td>
+  <td>Resource role of the block disk.</td>
+  <td>Destroying a block disk.</td>
+</tr>
+<tr>
+  <td><code>create_mount_disks</code></td>
+  <td>Framework principal.</td>
+  <td>Resource role of the mount disk.</td>
+  <td>Creating a mount disk.</td>
+</tr>
+<tr>
+  <td><code>destroy_mount_disks</code></td>
+  <td>Framework principal.</td>
+  <td>Resource role of the mount disk.</td>
+  <td>Destroying a mount disk.</td>
+</tr>
+<tr>
   <td><code>get_quotas</code></td>
   <td>Operator username.</td>
   <td>Resource role whose quota status will be queried.</td>

http://git-wip-us.apache.org/repos/asf/mesos/blob/79d7a4bd/include/mesos/authorizer/acls.proto
----------------------------------------------------------------------
diff --git a/include/mesos/authorizer/acls.proto 
b/include/mesos/authorizer/acls.proto
index e488993..1777c04 100644
--- a/include/mesos/authorizer/acls.proto
+++ b/include/mesos/authorizer/acls.proto
@@ -520,6 +520,42 @@ message ACL {
     // SOME image reference.
     required Entity images = 2;
   }
+
+  // Which principals are authorized to create block disks.
+  message CreateBlockDisk {
+    // Subjects: Framework principal.
+    required Entity principals = 1;
+
+    // Objects: The list of roles for which block disks can be created.
+    required Entity roles = 2;
+  }
+
+  // Which principals are authorized to destroy block disks.
+  message DestroyBlockDisk {
+    // Subjects: Framework principal.
+    required Entity principals = 1;
+
+    // Objects: The list of roles for which block disks can be destroyed.
+    required Entity roles = 2;
+  }
+
+  // Which principals are authorized to create mount disks.
+  message CreateMountDisk {
+    // Subjects: Framework principal.
+    required Entity principals = 1;
+
+    // Objects: The list of roles for which mount disks can be created.
+    required Entity roles = 2;
+  }
+
+  // Which principals are authorized to destroy mount disks.
+  message DestroyMountDisk {
+    // Subjects: Framework principal.
+    required Entity principals = 1;
+
+    // Objects: The list of roles for which volume disks can be destroyed.
+    required Entity roles = 2;
+  }
 }
 
 
@@ -599,4 +635,8 @@ message ACLs {
   repeated ACL.ModifyResourceProviderConfig modify_resource_provider_configs = 
45;
   repeated ACL.PruneImages prune_images = 47;
   repeated ACL.ResizeVolume resize_volumes = 48;
+  repeated ACL.CreateBlockDisk create_block_disks = 49;
+  repeated ACL.DestroyBlockDisk destroy_block_disks = 50;
+  repeated ACL.CreateMountDisk create_mount_disks = 51;
+  repeated ACL.DestroyMountDisk destroy_mount_disks = 52;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/79d7a4bd/include/mesos/authorizer/authorizer.proto
----------------------------------------------------------------------
diff --git a/include/mesos/authorizer/authorizer.proto 
b/include/mesos/authorizer/authorizer.proto
index bb1010d..8b5fa09 100644
--- a/include/mesos/authorizer/authorizer.proto
+++ b/include/mesos/authorizer/authorizer.proto
@@ -257,6 +257,18 @@ enum Action {
 
   // `RESIZE_VOLUME` will have an object with `Resource` set.
   RESIZE_VOLUME = 42;
+
+  // `CREATE_BLOCK_DISK` will have an object with `Resource` set.
+  CREATE_BLOCK_DISK = 43;
+
+  // `DESTROY_BLOCK_DISK` will have an object with `Resource` set.
+  DESTROY_BLOCK_DISK = 44;
+
+  // `CREATE_MOUNT_DISK` will have an object with `Resource` set.
+  CREATE_MOUNT_DISK = 45;
+
+  // `DESTROY_MOUNT_DISK` will have an object with `Resource` set.
+  DESTROY_MOUNT_DISK = 46;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/79d7a4bd/src/authorizer/local/authorizer.cpp
----------------------------------------------------------------------
diff --git a/src/authorizer/local/authorizer.cpp 
b/src/authorizer/local/authorizer.cpp
index 61e9ab5..056b172 100644
--- a/src/authorizer/local/authorizer.cpp
+++ b/src/authorizer/local/authorizer.cpp
@@ -424,6 +424,10 @@ public:
         case authorization::UPDATE_WEIGHT:
         case authorization::VIEW_ROLE:
         case authorization::REGISTER_FRAMEWORK:
+        case authorization::CREATE_BLOCK_DISK:
+        case authorization::DESTROY_BLOCK_DISK:
+        case authorization::CREATE_MOUNT_DISK:
+        case authorization::DESTROY_MOUNT_DISK:
           return Error("Authorization for action " + stringify(action_) +
                        " requires a specialized approver object.");
         case authorization::UNKNOWN:
@@ -596,7 +600,11 @@ public:
       switch (action_) {
         case authorization::CREATE_VOLUME:
         case authorization::RESIZE_VOLUME:
-        case authorization::RESERVE_RESOURCES: {
+        case authorization::RESERVE_RESOURCES:
+        case authorization::CREATE_BLOCK_DISK:
+        case authorization::DESTROY_BLOCK_DISK:
+        case authorization::CREATE_MOUNT_DISK:
+        case authorization::DESTROY_MOUNT_DISK: {
           entityObject.set_type(ACL::Entity::SOME);
           if (object->resource) {
             if (object->resource->reservations_size() > 0) {
@@ -912,6 +920,26 @@ public:
             createHierarchicalRoleACLs(acls.update_quotas());
         break;
       }
+      case authorization::CREATE_BLOCK_DISK: {
+        hierarchicalRoleACLs =
+          createHierarchicalRoleACLs(acls.create_block_disks());
+        break;
+      }
+      case authorization::DESTROY_BLOCK_DISK: {
+        hierarchicalRoleACLs =
+          createHierarchicalRoleACLs(acls.destroy_block_disks());
+        break;
+      }
+      case authorization::CREATE_MOUNT_DISK: {
+        hierarchicalRoleACLs =
+          createHierarchicalRoleACLs(acls.create_mount_disks());
+        break;
+      }
+      case authorization::DESTROY_MOUNT_DISK: {
+        hierarchicalRoleACLs =
+          createHierarchicalRoleACLs(acls.destroy_mount_disks());
+        break;
+      }
       case authorization::ACCESS_MESOS_LOG:
       case authorization::ACCESS_SANDBOX:
       case authorization::ATTACH_CONTAINER_INPUT:
@@ -1126,7 +1154,11 @@ public:
       case authorization::VIEW_ROLE:
       case authorization::GET_QUOTA:
       case authorization::REGISTER_FRAMEWORK:
-      case authorization::UPDATE_QUOTA: {
+      case authorization::UPDATE_QUOTA:
+      case authorization::CREATE_BLOCK_DISK:
+      case authorization::DESTROY_BLOCK_DISK:
+      case authorization::CREATE_MOUNT_DISK:
+      case authorization::DESTROY_MOUNT_DISK: {
         return getHierarchicalRoleApprover(subject, action);
       }
       case authorization::ACCESS_MESOS_LOG:
@@ -1536,6 +1568,10 @@ private:
       case authorization::UPDATE_QUOTA:
       case authorization::LAUNCH_NESTED_CONTAINER_SESSION:
       case authorization::LAUNCH_NESTED_CONTAINER:
+      case authorization::CREATE_BLOCK_DISK:
+      case authorization::DESTROY_BLOCK_DISK:
+      case authorization::CREATE_MOUNT_DISK:
+      case authorization::DESTROY_MOUNT_DISK:
         return Error("Extracting ACLs for " + stringify(action) + " requires "
                      "a specialized function");
       case authorization::UNKNOWN:
@@ -1653,8 +1689,7 @@ Option<Error> LocalAuthorizer::validate(const ACLs& acls)
   foreach (const ACL::GetMaintenanceStatus& acl,
            acls.get_maintenance_statuses()) {
     if (acl.machines().type() == ACL::Entity::SOME) {
-      return Error(
-          "ACL.GetMaintenanceStatus type must be either NONE or ANY");
+      return Error("ACL.GetMaintenanceStatus type must be either NONE or ANY");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/79d7a4bd/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 4ade16f..ddc8df0 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -3882,6 +3882,110 @@ Future<bool> Master::authorizeResizeVolume(
 }
 
 
+Future<bool> Master::authorizeCreateVolume(
+    const Resource& source,
+    const Option<Principal>& principal)
+{
+  if (authorizer.isNone()) {
+    return true; // Authorization is disabled.
+  }
+
+  authorization::Request request;
+  request.set_action(authorization::CREATE_MOUNT_DISK);
+
+  Option<authorization::Subject> subject = createSubject(principal);
+  if (subject.isSome()) {
+    request.mutable_subject()->CopyFrom(subject.get());
+  }
+
+  request.mutable_object()->mutable_resource()->CopyFrom(source);
+
+  LOG(INFO) << "Authorizing principal '"
+            << (principal.isSome() ? stringify(principal.get()) : "ANY")
+            << "' to create a volume from '" << source << "'";
+
+  return authorizer.get()->authorized(request);
+}
+
+
+Future<bool> Master::authorizeDestroyVolume(
+    const Resource& volume,
+    const Option<Principal>& principal)
+{
+  if (authorizer.isNone()) {
+    return true; // Authorization is disabled.
+  }
+
+  authorization::Request request;
+  request.set_action(authorization::DESTROY_MOUNT_DISK);
+
+  Option<authorization::Subject> subject = createSubject(principal);
+  if (subject.isSome()) {
+    request.mutable_subject()->CopyFrom(subject.get());
+  }
+
+  request.mutable_object()->mutable_resource()->CopyFrom(volume);
+
+  LOG(INFO) << "Authorizing principal '"
+            << (principal.isSome() ? stringify(principal.get()) : "ANY")
+            << "' to destroy volume '" << volume << "'";
+
+  return authorizer.get()->authorized(request);
+}
+
+
+Future<bool> Master::authorizeCreateBlock(
+    const Resource& source,
+    const Option<Principal>& principal)
+{
+  if (authorizer.isNone()) {
+    return true; // Authorization is disabled.
+  }
+
+  authorization::Request request;
+  request.set_action(authorization::CREATE_BLOCK_DISK);
+
+  Option<authorization::Subject> subject = createSubject(principal);
+  if (subject.isSome()) {
+    request.mutable_subject()->CopyFrom(subject.get());
+  }
+
+  request.mutable_object()->mutable_resource()->CopyFrom(source);
+
+  LOG(INFO) << "Authorizing principal '"
+            << (principal.isSome() ? stringify(principal.get()) : "ANY")
+            << "' to create a block from '" << source << "'";
+
+  return authorizer.get()->authorized(request);
+}
+
+
+Future<bool> Master::authorizeDestroyBlock(
+    const Resource& block,
+    const Option<Principal>& principal)
+{
+  if (authorizer.isNone()) {
+    return true; // Authorization is disabled.
+  }
+
+  authorization::Request request;
+  request.set_action(authorization::DESTROY_BLOCK_DISK);
+
+  Option<authorization::Subject> subject = createSubject(principal);
+  if (subject.isSome()) {
+    request.mutable_subject()->CopyFrom(subject.get());
+  }
+
+  request.mutable_object()->mutable_resource()->CopyFrom(block);
+
+  LOG(INFO) << "Authorizing principal '"
+            << (principal.isSome() ? stringify(principal.get()) : "ANY")
+            << "' to destroy block '" << block << "'";
+
+  return authorizer.get()->authorized(request);
+}
+
+
 Future<bool> Master::authorizeSlave(
     const SlaveInfo& slaveInfo,
     const Option<Principal>& principal)
@@ -4523,22 +4627,50 @@ void Master::accept(
       }
 
       case Offer::Operation::CREATE_VOLUME: {
-        // TODO(nfnt): Implement authorization for 'CREATE_VOLUME'.
+        Option<Principal> principal = framework->info.has_principal()
+          ? Principal(framework->info.principal())
+          : Option<Principal>::none();
+
+        futures.push_back(
+            authorizeCreateVolume(
+                operation.create_volume().source(), principal));
+
         break;
       }
 
       case Offer::Operation::DESTROY_VOLUME: {
-        // TODO(nfnt): Implement authorization for 'DESTROY_VOLUME'.
+        Option<Principal> principal = framework->info.has_principal()
+          ? Principal(framework->info.principal())
+          : Option<Principal>::none();
+
+        futures.push_back(
+            authorizeDestroyVolume(
+                operation.destroy_volume().volume(), principal));
+
         break;
       }
 
       case Offer::Operation::CREATE_BLOCK: {
-        // TODO(nfnt): Implement authorization for 'CREATE_BLOCK'.
+        Option<Principal> principal = framework->info.has_principal()
+          ? Principal(framework->info.principal())
+          : Option<Principal>::none();
+
+        futures.push_back(
+            authorizeCreateBlock(
+                operation.create_block().source(), principal));
+
         break;
       }
 
       case Offer::Operation::DESTROY_BLOCK: {
-        // TODO(nfnt): Implement authorization for 'DESTROY_BLOCK'.
+        Option<Principal> principal = framework->info.has_principal()
+          ? Principal(framework->info.principal())
+          : Option<Principal>::none();
+
+        futures.push_back(
+            authorizeDestroyBlock(
+                operation.destroy_block().block(), principal));
+
         break;
       }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/79d7a4bd/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 4180341..2ce71dc 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -895,6 +895,90 @@ protected:
       const Option<process::http::authentication::Principal>& principal);
 
 
+  /**
+   * Authorizes a `CREATE_VOLUME` operation.
+   *
+   * Returns whether the `CREATE_VOLUME` operation is authorized with the
+   * provided principal. This function is used for authorization of operations
+   * originating from frameworks. Note that operations may be validated AFTER
+   * authorization, so it's possible that the operation could be malformed.
+   *
+   * @param source The source from which a volume will be created.
+   * @param principal An `Option` containing the principal attempting this
+   *     operation.
+   *
+   * @return A `Future` containing a boolean value representing the success or
+   *     failure of this authorization. A failed `Future` implies that
+   *     validation of the operation did not succeed.
+   */
+  process::Future<bool> authorizeCreateVolume(
+      const Resource& source,
+      const Option<process::http::authentication::Principal>& principal);
+
+
+  /**
+   * Authorizes a `DESTROY_VOLUME` operation.
+   *
+   * Returns whether the `DESTROY_VOLUME` operation is authorized with the
+   * provided principal. This function is used for authorization of operations
+   * originating from frameworks. Note that operations may be validated AFTER
+   * authorization, so it's possible that the operation could be malformed.
+   *
+   * @param volume The volume being destroyed.
+   * @param principal An `Option` containing the principal attempting this
+   *     operation.
+   *
+   * @return A `Future` containing a boolean value representing the success or
+   *     failure of this authorization. A failed `Future` implies that
+   *     validation of the operation did not succeed.
+   */
+  process::Future<bool> authorizeDestroyVolume(
+      const Resource& volume,
+      const Option<process::http::authentication::Principal>& principal);
+
+
+  /**
+   * Authorizes a `CREATE_BLOCK` operation.
+   *
+   * Returns whether the `CREATE_BLOCK` operation is authorized with the
+   * provided principal. This function is used for authorization of operations
+   * originating from frameworks. Note that operations may be validated AFTER
+   * authorization, so it's possible that the operation could be malformed.
+   *
+   * @param source The source from which a block will be created.
+   * @param principal An `Option` containing the principal attempting this
+   *     operation.
+   *
+   * @return A `Future` containing a boolean value representing the success or
+   *     failure of this authorization. A failed `Future` implies that
+   *     validation of the operation did not succeed.
+   */
+  process::Future<bool> authorizeCreateBlock(
+      const Resource& source,
+      const Option<process::http::authentication::Principal>& principal);
+
+
+  /**
+   * Authorizes a `DESTROY_BLOCK` operation.
+   *
+   * Returns whether the `DESTROY_BLOCK` operation is authorized with the
+   * provided principal. This function is used for authorization of operations
+   * originating from frameworks. Note that operations may be validated AFTER
+   * authorization, so it's possible that the operation could be malformed.
+   *
+   * @param block The block being destroyed.
+   * @param principal An `Option` containing the principal attempting this
+   *     operation.
+   *
+   * @return A `Future` containing a boolean value representing the success or
+   *     failure of this authorization. A failed `Future` implies that
+   *     validation of the operation did not succeed.
+   */
+  process::Future<bool> authorizeDestroyBlock(
+      const Resource& block,
+      const Option<process::http::authentication::Principal>& principal);
+
+
   // Determine if a new executor needs to be launched.
   bool isLaunchExecutor (
       const ExecutorID& executorId,

http://git-wip-us.apache.org/repos/asf/mesos/blob/79d7a4bd/src/tests/authorization_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/authorization_tests.cpp 
b/src/tests/authorization_tests.cpp
index f6f7769..41ecac2 100644
--- a/src/tests/authorization_tests.cpp
+++ b/src/tests/authorization_tests.cpp
@@ -5844,6 +5844,1046 @@ TYPED_TEST(AuthorizationTest, PruneImages)
   }
 }
 
+
+// This tests the authorization to create block disks.
+TYPED_TEST(AuthorizationTest, CreateBlockDisk)
+{
+  ACLs acls;
+
+  {
+    // Principal "foo" can create block disks for any role.
+    mesos::ACL::CreateBlockDisk* acl = acls.add_create_block_disks();
+    acl->mutable_principals()->add_values("foo");
+    acl->mutable_roles()->set_type(mesos::ACL::Entity::ANY);
+  }
+
+  {
+    // Principal "bar" can only create block disks for the "panda" role.
+    mesos::ACL::CreateBlockDisk* acl = acls.add_create_block_disks();
+    acl->mutable_principals()->add_values("bar");
+    acl->mutable_roles()->add_values("panda");
+  }
+
+  {
+    // Principal "baz" cannot create block disks.
+    mesos::ACL::CreateBlockDisk* acl = acls.add_create_block_disks();
+    acl->mutable_principals()->add_values("baz");
+    acl->mutable_roles()->set_type(mesos::ACL::Entity::NONE);
+  }
+
+  {
+    // Principal "elizabeth-ii" can create block disks for the "king" role
+    // and its nested ones.
+    mesos::ACL::CreateBlockDisk* acl = acls.add_create_block_disks();
+    acl->mutable_principals()->add_values("elizabeth-ii");
+    acl->mutable_roles()->add_values("king/%");
+    acl->mutable_roles()->add_values("king");
+  }
+
+  {
+    // Principal "charles" can create block disks for any role below the
+    // "king/" role. Not in "king" itself.
+    mesos::ACL::CreateBlockDisk* acl = acls.add_create_block_disks();
+    acl->mutable_principals()->add_values("charles");
+    acl->mutable_roles()->add_values("king/%");
+  }
+
+  {
+    // Principal "j-welby" can create block disks only for the "king"
+    // role but not in any nested one.
+    mesos::ACL::CreateBlockDisk* acl = acls.add_create_block_disks();
+    acl->mutable_principals()->add_values("j-welby");
+    acl->mutable_roles()->add_values("king");
+  }
+
+  {
+    // No other principals can create block disks.
+    mesos::ACL::CreateBlockDisk* acl = acls.add_create_block_disks();
+    acl->mutable_principals()->set_type(mesos::ACL::Entity::ANY);
+    acl->mutable_roles()->set_type(mesos::ACL::Entity::NONE);
+  }
+
+  {
+    // No other principals can create block disks.
+    mesos::ACL::CreateBlockDisk* acl = acls.add_create_block_disks();
+    acl->mutable_principals()->set_type(mesos::ACL::Entity::ANY);
+    acl->mutable_roles()->set_type(mesos::ACL::Entity::NONE);
+  }
+
+  // Create an `Authorizer` with the ACLs.
+  Try<Authorizer*> create = TypeParam::create(parameterize(acls));
+  ASSERT_SOME(create);
+  Owned<Authorizer> authorizer(create.get());
+
+  // Principal "foo" can create block disks for any role, so this
+  // request will pass.
+  {
+    authorization::Request request;
+    request.set_action(authorization::CREATE_BLOCK_DISK);
+    request.mutable_subject()->set_value("foo");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("awesome_role");
+    AWAIT_EXPECT_TRUE(authorizer->authorized(request));
+  }
+
+  // Principal "bar" can create block disks for the "panda" role,
+  // so this request will pass.
+  {
+    authorization::Request request;
+    request.set_action(authorization::CREATE_BLOCK_DISK);
+    request.mutable_subject()->set_value("bar");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("panda");
+    AWAIT_EXPECT_TRUE(authorizer->authorized(request));
+  }
+
+  // Principal "bar" cannot create block disks for the "giraffe" role,
+  // so this request will fail.
+  {
+    authorization::Request request;
+    request.set_action(authorization::CREATE_BLOCK_DISK);
+    request.mutable_subject()->set_value("bar");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("giraffe");
+    AWAIT_EXPECT_FALSE(authorizer->authorized(request));
+  }
+
+  // Principal "baz" cannot create block disks for any role,
+  // so this request will fail.
+  {
+    authorization::Request request;
+    request.set_action(authorization::CREATE_BLOCK_DISK);
+    request.mutable_subject()->set_value("baz");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("panda");
+    AWAIT_EXPECT_FALSE(authorizer->authorized(request));
+  }
+
+  // Principal "zelda" is not mentioned in the ACLs of the Authorizer, so it
+  // will be caught by the final ACL, which provides a default case that denies
+  // access for all other principals. This request will fail.
+  {
+    authorization::Request request;
+    request.set_action(authorization::CREATE_BLOCK_DISK);
+    request.mutable_subject()->set_value("zelda");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("panda");
+    AWAIT_EXPECT_FALSE(authorizer->authorized(request));
+  }
+
+  // `elizabeth-ii` has full permissions for the `king` role as well as all
+  // its nested roles. She should be able to create block disks in the next
+  // three blocks.
+  {
+    authorization::Request request;
+    request.set_action(authorization::CREATE_BLOCK_DISK);
+    request.mutable_subject()->set_value("elizabeth-ii");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("king");
+    AWAIT_EXPECT_TRUE(authorizer->authorized(request));
+  }
+
+  {
+    authorization::Request request;
+    request.set_action(authorization::CREATE_BLOCK_DISK);
+    request.mutable_subject()->set_value("elizabeth-ii");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("king/prince");
+    AWAIT_EXPECT_TRUE(authorizer->authorized(request));
+  }
+
+  {
+    authorization::Request request;
+    request.set_action(authorization::CREATE_BLOCK_DISK);
+    request.mutable_subject()->set_value("elizabeth-ii");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("king/prince/duke");
+    AWAIT_EXPECT_TRUE(authorizer->authorized(request));
+  }
+
+  // `charles` doesn't have permissions for the `king` role, so the first
+  // test should fail. However he has permissions for `king`'s nested roles
+  // so the next two tests should succeed.
+  {
+    authorization::Request request;
+    request.set_action(authorization::CREATE_BLOCK_DISK);
+    request.mutable_subject()->set_value("charles");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("king");
+    AWAIT_EXPECT_FALSE(authorizer->authorized(request));
+  }
+
+  {
+    authorization::Request request;
+    request.set_action(authorization::CREATE_BLOCK_DISK);
+    request.mutable_subject()->set_value("charles");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("king/prince");
+    AWAIT_EXPECT_TRUE(authorizer->authorized(request));
+  }
+
+  {
+    authorization::Request request;
+    request.set_action(authorization::CREATE_BLOCK_DISK);
+    request.mutable_subject()->set_value("charles");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("king/prince/duke");
+    AWAIT_EXPECT_TRUE(authorizer->authorized(request));
+  }
+
+  // `j-welby` only has permissions for the role `king` itself, but not
+  // for its nested roles, therefore only the first of the following three
+  // tests should succeed.
+  {
+    authorization::Request request;
+    request.set_action(authorization::CREATE_BLOCK_DISK);
+    request.mutable_subject()->set_value("j-welby");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("king");
+    AWAIT_EXPECT_TRUE(authorizer->authorized(request));
+  }
+
+  {
+    authorization::Request request;
+    request.set_action(authorization::CREATE_BLOCK_DISK);
+    request.mutable_subject()->set_value("j-welby");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("king/prince");
+    AWAIT_EXPECT_FALSE(authorizer->authorized(request));
+  }
+
+  {
+    authorization::Request request;
+    request.set_action(authorization::CREATE_BLOCK_DISK);
+    request.mutable_subject()->set_value("j-welby");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("king/prince/duke");
+    AWAIT_EXPECT_FALSE(authorizer->authorized(request));
+  }
+}
+
+
+// This tests the authorization to destroy block disks.
+TYPED_TEST(AuthorizationTest, DestroyBlockDisk)
+{
+  ACLs acls;
+
+  {
+    // Principal "foo" can destroy block disks for any role.
+    mesos::ACL::DestroyBlockDisk* acl = acls.add_destroy_block_disks();
+    acl->mutable_principals()->add_values("foo");
+    acl->mutable_roles()->set_type(mesos::ACL::Entity::ANY);
+  }
+
+  {
+    // Principal "bar" can only destroy block disks for the "panda" role.
+    mesos::ACL::DestroyBlockDisk* acl = acls.add_destroy_block_disks();
+    acl->mutable_principals()->add_values("bar");
+    acl->mutable_roles()->add_values("panda");
+  }
+
+  {
+    // Principal "baz" cannot destroy block disks.
+    mesos::ACL::DestroyBlockDisk* acl = acls.add_destroy_block_disks();
+    acl->mutable_principals()->add_values("baz");
+    acl->mutable_roles()->set_type(mesos::ACL::Entity::NONE);
+  }
+
+  {
+    // Principal "elizabeth-ii" can destroy block disks for the "king" role
+    // and its nested ones.
+    mesos::ACL::DestroyBlockDisk* acl = acls.add_destroy_block_disks();
+    acl->mutable_principals()->add_values("elizabeth-ii");
+    acl->mutable_roles()->add_values("king/%");
+    acl->mutable_roles()->add_values("king");
+  }
+
+  {
+    // Principal "charles" can destroy block disks for any role below the
+    // "king/" role. Not in "king" itself.
+    mesos::ACL::DestroyBlockDisk* acl = acls.add_destroy_block_disks();
+    acl->mutable_principals()->add_values("charles");
+    acl->mutable_roles()->add_values("king/%");
+  }
+
+  {
+    // Principal "j-welby" can destroy block disks only for the "king"
+    // role but not in any nested one.
+    mesos::ACL::DestroyBlockDisk* acl = acls.add_destroy_block_disks();
+    acl->mutable_principals()->add_values("j-welby");
+    acl->mutable_roles()->add_values("king");
+  }
+
+  {
+    // No other principals can destroy block disks.
+    mesos::ACL::DestroyBlockDisk* acl = acls.add_destroy_block_disks();
+    acl->mutable_principals()->set_type(mesos::ACL::Entity::ANY);
+    acl->mutable_roles()->set_type(mesos::ACL::Entity::NONE);
+  }
+
+  {
+    // No other principals can destroy block disks.
+    mesos::ACL::DestroyBlockDisk* acl = acls.add_destroy_block_disks();
+    acl->mutable_principals()->set_type(mesos::ACL::Entity::ANY);
+    acl->mutable_roles()->set_type(mesos::ACL::Entity::NONE);
+  }
+
+  // Create an `Authorizer` with the ACLs.
+  Try<Authorizer*> create = TypeParam::create(parameterize(acls));
+  ASSERT_SOME(create);
+  Owned<Authorizer> authorizer(create.get());
+
+  // Principal "foo" can destroy block disks for any role, so this
+  // request will pass.
+  {
+    authorization::Request request;
+    request.set_action(authorization::DESTROY_BLOCK_DISK);
+    request.mutable_subject()->set_value("foo");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("awesome_role");
+    AWAIT_EXPECT_TRUE(authorizer->authorized(request));
+  }
+
+  // Principal "bar" can destroy block disks for the "panda" role,
+  // so this request will pass.
+  {
+    authorization::Request request;
+    request.set_action(authorization::DESTROY_BLOCK_DISK);
+    request.mutable_subject()->set_value("bar");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("panda");
+    AWAIT_EXPECT_TRUE(authorizer->authorized(request));
+  }
+
+  // Principal "bar" cannot destroy block disks for the "giraffe" role,
+  // so this request will fail.
+  {
+    authorization::Request request;
+    request.set_action(authorization::DESTROY_BLOCK_DISK);
+    request.mutable_subject()->set_value("bar");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("giraffe");
+    AWAIT_EXPECT_FALSE(authorizer->authorized(request));
+  }
+
+  // Principal "baz" cannot destroy block disks for any role,
+  // so this request will fail.
+  {
+    authorization::Request request;
+    request.set_action(authorization::DESTROY_BLOCK_DISK);
+    request.mutable_subject()->set_value("baz");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("panda");
+    AWAIT_EXPECT_FALSE(authorizer->authorized(request));
+  }
+
+  // Principal "zelda" is not mentioned in the ACLs of the Authorizer, so it
+  // will be caught by the final ACL, which provides a default case that denies
+  // access for all other principals. This request will fail.
+  {
+    authorization::Request request;
+    request.set_action(authorization::DESTROY_BLOCK_DISK);
+    request.mutable_subject()->set_value("zelda");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("panda");
+    AWAIT_EXPECT_FALSE(authorizer->authorized(request));
+  }
+
+  // `elizabeth-ii` has full permissions for the `king` role as well as all
+  // its nested roles. She should be able to destroy block disks in the next
+  // three blocks.
+  {
+    authorization::Request request;
+    request.set_action(authorization::DESTROY_BLOCK_DISK);
+    request.mutable_subject()->set_value("elizabeth-ii");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("king");
+    AWAIT_EXPECT_TRUE(authorizer->authorized(request));
+  }
+
+  {
+    authorization::Request request;
+    request.set_action(authorization::DESTROY_BLOCK_DISK);
+    request.mutable_subject()->set_value("elizabeth-ii");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("king/prince");
+    AWAIT_EXPECT_TRUE(authorizer->authorized(request));
+  }
+
+  {
+    authorization::Request request;
+    request.set_action(authorization::DESTROY_BLOCK_DISK);
+    request.mutable_subject()->set_value("elizabeth-ii");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("king/prince/duke");
+    AWAIT_EXPECT_TRUE(authorizer->authorized(request));
+  }
+
+  // `charles` doesn't have permissions for the `king` role, so the first
+  // test should fail. However he has permissions for `king`'s nested roles
+  // so the next two tests should succeed.
+  {
+    authorization::Request request;
+    request.set_action(authorization::DESTROY_BLOCK_DISK);
+    request.mutable_subject()->set_value("charles");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("king");
+    AWAIT_EXPECT_FALSE(authorizer->authorized(request));
+  }
+
+  {
+    authorization::Request request;
+    request.set_action(authorization::DESTROY_BLOCK_DISK);
+    request.mutable_subject()->set_value("charles");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("king/prince");
+    AWAIT_EXPECT_TRUE(authorizer->authorized(request));
+  }
+
+  {
+    authorization::Request request;
+    request.set_action(authorization::DESTROY_BLOCK_DISK);
+    request.mutable_subject()->set_value("charles");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("king/prince/duke");
+    AWAIT_EXPECT_TRUE(authorizer->authorized(request));
+  }
+
+  // `j-welby` only has permissions for the role `king` itself, but not
+  // for its nested roles, therefore only the first of the following three
+  // tests should succeed.
+  {
+    authorization::Request request;
+    request.set_action(authorization::DESTROY_BLOCK_DISK);
+    request.mutable_subject()->set_value("j-welby");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("king");
+    AWAIT_EXPECT_TRUE(authorizer->authorized(request));
+  }
+
+  {
+    authorization::Request request;
+    request.set_action(authorization::DESTROY_BLOCK_DISK);
+    request.mutable_subject()->set_value("j-welby");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("king/prince");
+    AWAIT_EXPECT_FALSE(authorizer->authorized(request));
+  }
+
+  {
+    authorization::Request request;
+    request.set_action(authorization::DESTROY_BLOCK_DISK);
+    request.mutable_subject()->set_value("j-welby");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("king/prince/duke");
+    AWAIT_EXPECT_FALSE(authorizer->authorized(request));
+  }
+}
+
+
+// This tests the authorization to create mount disks.
+TYPED_TEST(AuthorizationTest, CreateMountDisk)
+{
+  ACLs acls;
+
+  {
+    // Principal "foo" can create mount disks for any role.
+    mesos::ACL::CreateMountDisk* acl = acls.add_create_mount_disks();
+    acl->mutable_principals()->add_values("foo");
+    acl->mutable_roles()->set_type(mesos::ACL::Entity::ANY);
+  }
+
+  {
+    // Principal "bar" can only create mount disks for the "panda" role.
+    mesos::ACL::CreateMountDisk* acl = acls.add_create_mount_disks();
+    acl->mutable_principals()->add_values("bar");
+    acl->mutable_roles()->add_values("panda");
+  }
+
+  {
+    // Principal "baz" cannot create mount disks.
+    mesos::ACL::CreateMountDisk* acl = acls.add_create_mount_disks();
+    acl->mutable_principals()->add_values("baz");
+    acl->mutable_roles()->set_type(mesos::ACL::Entity::NONE);
+  }
+
+  {
+    // Principal "elizabeth-ii" can create mount disks for the "king" role
+    // and its nested ones.
+    mesos::ACL::CreateMountDisk* acl = acls.add_create_mount_disks();
+    acl->mutable_principals()->add_values("elizabeth-ii");
+    acl->mutable_roles()->add_values("king/%");
+    acl->mutable_roles()->add_values("king");
+  }
+
+  {
+    // Principal "charles" can create mount disks for any role below the
+    // "king/" role. Not in "king" itself.
+    mesos::ACL::CreateMountDisk* acl = acls.add_create_mount_disks();
+    acl->mutable_principals()->add_values("charles");
+    acl->mutable_roles()->add_values("king/%");
+  }
+
+  {
+    // Principal "j-welby" can create mount disks only for the "king" role
+    // but not in any nested one.
+    mesos::ACL::CreateMountDisk* acl = acls.add_create_mount_disks();
+    acl->mutable_principals()->add_values("j-welby");
+    acl->mutable_roles()->add_values("king");
+  }
+
+  {
+    // No other principals can create mount disks.
+    mesos::ACL::CreateMountDisk* acl = acls.add_create_mount_disks();
+    acl->mutable_principals()->set_type(mesos::ACL::Entity::ANY);
+    acl->mutable_roles()->set_type(mesos::ACL::Entity::NONE);
+  }
+
+  {
+    // No other principals can create mount disks.
+    mesos::ACL::CreateMountDisk* acl = acls.add_create_mount_disks();
+    acl->mutable_principals()->set_type(mesos::ACL::Entity::ANY);
+    acl->mutable_roles()->set_type(mesos::ACL::Entity::NONE);
+  }
+
+  // Create an `Authorizer` with the ACLs.
+  Try<Authorizer*> create = TypeParam::create(parameterize(acls));
+  ASSERT_SOME(create);
+  Owned<Authorizer> authorizer(create.get());
+
+  // Principal "foo" can create mount disks for any role,
+  // so this request will pass.
+  {
+    authorization::Request request;
+    request.set_action(authorization::CREATE_MOUNT_DISK);
+    request.mutable_subject()->set_value("foo");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("awesome_role");
+    AWAIT_EXPECT_TRUE(authorizer->authorized(request));
+  }
+
+  // Principal "bar" can create mount disks for the "panda" role,
+  // so this request will pass.
+  {
+    authorization::Request request;
+    request.set_action(authorization::CREATE_MOUNT_DISK);
+    request.mutable_subject()->set_value("bar");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("panda");
+    AWAIT_EXPECT_TRUE(authorizer->authorized(request));
+  }
+
+  // Principal "bar" cannot create mount disks for the "giraffe" role,
+  // so this request will fail.
+  {
+    authorization::Request request;
+    request.set_action(authorization::CREATE_MOUNT_DISK);
+    request.mutable_subject()->set_value("bar");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("giraffe");
+    AWAIT_EXPECT_FALSE(authorizer->authorized(request));
+  }
+
+  // Principal "baz" cannot create mount disks for any role,
+  // so this request will fail.
+  {
+    authorization::Request request;
+    request.set_action(authorization::CREATE_MOUNT_DISK);
+    request.mutable_subject()->set_value("baz");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("panda");
+    AWAIT_EXPECT_FALSE(authorizer->authorized(request));
+  }
+
+  // Principal "zelda" is not mentioned in the ACLs of the Authorizer, so it
+  // will be caught by the final ACL, which provides a default case that denies
+  // access for all other principals. This request will fail.
+  {
+    authorization::Request request;
+    request.set_action(authorization::CREATE_MOUNT_DISK);
+    request.mutable_subject()->set_value("zelda");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("panda");
+    AWAIT_EXPECT_FALSE(authorizer->authorized(request));
+  }
+
+  // `elizabeth-ii` has full permissions for the `king` role as well as all
+  // its nested roles. She should be able to create mount disks in the next
+  // three blocks.
+  {
+    authorization::Request request;
+    request.set_action(authorization::CREATE_MOUNT_DISK);
+    request.mutable_subject()->set_value("elizabeth-ii");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("king");
+    AWAIT_EXPECT_TRUE(authorizer->authorized(request));
+  }
+
+  {
+    authorization::Request request;
+    request.set_action(authorization::CREATE_MOUNT_DISK);
+    request.mutable_subject()->set_value("elizabeth-ii");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("king/prince");
+    AWAIT_EXPECT_TRUE(authorizer->authorized(request));
+  }
+
+  {
+    authorization::Request request;
+    request.set_action(authorization::CREATE_MOUNT_DISK);
+    request.mutable_subject()->set_value("elizabeth-ii");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("king/prince/duke");
+    AWAIT_EXPECT_TRUE(authorizer->authorized(request));
+  }
+
+  // `charles` doesn't have permissions for the `king` role, so the first
+  // test should fail. However he has permissions for `king`'s nested roles
+  // so the next two tests should succeed.
+  {
+    authorization::Request request;
+    request.set_action(authorization::CREATE_MOUNT_DISK);
+    request.mutable_subject()->set_value("charles");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("king");
+    AWAIT_EXPECT_FALSE(authorizer->authorized(request));
+  }
+
+  {
+    authorization::Request request;
+    request.set_action(authorization::CREATE_MOUNT_DISK);
+    request.mutable_subject()->set_value("charles");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("king/prince");
+    AWAIT_EXPECT_TRUE(authorizer->authorized(request));
+  }
+
+  {
+    authorization::Request request;
+    request.set_action(authorization::CREATE_MOUNT_DISK);
+    request.mutable_subject()->set_value("charles");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("king/prince/duke");
+    AWAIT_EXPECT_TRUE(authorizer->authorized(request));
+  }
+
+  // `j-welby` only has permissions for the role `king` itself, but not
+  // for its nested roles, therefore only the first of the following three
+  // tests should succeed.
+  {
+    authorization::Request request;
+    request.set_action(authorization::CREATE_MOUNT_DISK);
+    request.mutable_subject()->set_value("j-welby");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("king");
+    AWAIT_EXPECT_TRUE(authorizer->authorized(request));
+  }
+
+  {
+    authorization::Request request;
+    request.set_action(authorization::CREATE_MOUNT_DISK);
+    request.mutable_subject()->set_value("j-welby");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("king/prince");
+    AWAIT_EXPECT_FALSE(authorizer->authorized(request));
+  }
+
+  {
+    authorization::Request request;
+    request.set_action(authorization::CREATE_MOUNT_DISK);
+    request.mutable_subject()->set_value("j-welby");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("king/prince/duke");
+    AWAIT_EXPECT_FALSE(authorizer->authorized(request));
+  }
+}
+
+
+// This tests the authorization to destroy mount disks.
+TYPED_TEST(AuthorizationTest, DestroyMountDisk)
+{
+  ACLs acls;
+
+  {
+    // Principal "foo" can destroy mount disks for any role.
+    mesos::ACL::DestroyMountDisk* acl = acls.add_destroy_mount_disks();
+    acl->mutable_principals()->add_values("foo");
+    acl->mutable_roles()->set_type(mesos::ACL::Entity::ANY);
+  }
+
+  {
+    // Principal "bar" can only destroy mount disks for the "panda" role.
+    mesos::ACL::DestroyMountDisk* acl = acls.add_destroy_mount_disks();
+    acl->mutable_principals()->add_values("bar");
+    acl->mutable_roles()->add_values("panda");
+  }
+
+  {
+    // Principal "baz" cannot destroy mount disks.
+    mesos::ACL::DestroyMountDisk* acl = acls.add_destroy_mount_disks();
+    acl->mutable_principals()->add_values("baz");
+    acl->mutable_roles()->set_type(mesos::ACL::Entity::NONE);
+  }
+
+  {
+    // Principal "elizabeth-ii" can destroy mount disks for the "king" role
+    // and its nested ones.
+    mesos::ACL::DestroyMountDisk* acl = acls.add_destroy_mount_disks();
+    acl->mutable_principals()->add_values("elizabeth-ii");
+    acl->mutable_roles()->add_values("king/%");
+    acl->mutable_roles()->add_values("king");
+  }
+
+  {
+    // Principal "charles" can destroy mount disks for any role below the
+    // "king/" role. Not in "king" itself.
+    mesos::ACL::DestroyMountDisk* acl = acls.add_destroy_mount_disks();
+    acl->mutable_principals()->add_values("charles");
+    acl->mutable_roles()->add_values("king/%");
+  }
+
+  {
+    // Principal "j-welby" can destroy mount disks only for the "king"
+    // role but not in any nested one.
+    mesos::ACL::DestroyMountDisk* acl = acls.add_destroy_mount_disks();
+    acl->mutable_principals()->add_values("j-welby");
+    acl->mutable_roles()->add_values("king");
+  }
+
+  {
+    // No other principals can destroy mount disks.
+    mesos::ACL::DestroyMountDisk* acl = acls.add_destroy_mount_disks();
+    acl->mutable_principals()->set_type(mesos::ACL::Entity::ANY);
+    acl->mutable_roles()->set_type(mesos::ACL::Entity::NONE);
+  }
+
+  {
+    // No other principals can destroy mount disks.
+    mesos::ACL::DestroyMountDisk* acl = acls.add_destroy_mount_disks();
+    acl->mutable_principals()->set_type(mesos::ACL::Entity::ANY);
+    acl->mutable_roles()->set_type(mesos::ACL::Entity::NONE);
+  }
+
+  // Create an `Authorizer` with the ACLs.
+  Try<Authorizer*> create = TypeParam::create(parameterize(acls));
+  ASSERT_SOME(create);
+  Owned<Authorizer> authorizer(create.get());
+
+  // Principal "foo" can destroy mount disks for any role, so this
+  // request will pass.
+  {
+    authorization::Request request;
+    request.set_action(authorization::DESTROY_MOUNT_DISK);
+    request.mutable_subject()->set_value("foo");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("awesome_role");
+    AWAIT_EXPECT_TRUE(authorizer->authorized(request));
+  }
+
+  // Principal "bar" can destroy mount disks for the "panda" role,
+  // so this request will pass.
+  {
+    authorization::Request request;
+    request.set_action(authorization::DESTROY_MOUNT_DISK);
+    request.mutable_subject()->set_value("bar");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("panda");
+    AWAIT_EXPECT_TRUE(authorizer->authorized(request));
+  }
+
+  // Principal "bar" cannot destroy mount disks for the "giraffe" role,
+  // so this request will fail.
+  {
+    authorization::Request request;
+    request.set_action(authorization::DESTROY_MOUNT_DISK);
+    request.mutable_subject()->set_value("bar");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("giraffe");
+    AWAIT_EXPECT_FALSE(authorizer->authorized(request));
+  }
+
+  // Principal "baz" cannot destroy mount disks for any role,
+  // so this request will fail.
+  {
+    authorization::Request request;
+    request.set_action(authorization::DESTROY_MOUNT_DISK);
+    request.mutable_subject()->set_value("baz");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("panda");
+    AWAIT_EXPECT_FALSE(authorizer->authorized(request));
+  }
+
+  // Principal "zelda" is not mentioned in the ACLs of the Authorizer, so it
+  // will be caught by the final ACL, which provides a default case that denies
+  // access for all other principals. This request will fail.
+  {
+    authorization::Request request;
+    request.set_action(authorization::DESTROY_MOUNT_DISK);
+    request.mutable_subject()->set_value("zelda");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("panda");
+    AWAIT_EXPECT_FALSE(authorizer->authorized(request));
+  }
+
+  // `elizabeth-ii` has full permissions for the `king` role as well as all
+  // its nested roles. She should be able to destroy mount disks in the next
+  // three blocks.
+  {
+    authorization::Request request;
+    request.set_action(authorization::DESTROY_MOUNT_DISK);
+    request.mutable_subject()->set_value("elizabeth-ii");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("king");
+    AWAIT_EXPECT_TRUE(authorizer->authorized(request));
+  }
+
+  {
+    authorization::Request request;
+    request.set_action(authorization::DESTROY_MOUNT_DISK);
+    request.mutable_subject()->set_value("elizabeth-ii");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("king/prince");
+    AWAIT_EXPECT_TRUE(authorizer->authorized(request));
+  }
+
+  {
+    authorization::Request request;
+    request.set_action(authorization::DESTROY_MOUNT_DISK);
+    request.mutable_subject()->set_value("elizabeth-ii");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("king/prince/duke");
+    AWAIT_EXPECT_TRUE(authorizer->authorized(request));
+  }
+
+  // `charles` doesn't have permissions for the `king` role, so the first
+  // test should fail. However he has permissions for `king`'s nested roles
+  // so the next two tests should succeed.
+  {
+    authorization::Request request;
+    request.set_action(authorization::DESTROY_MOUNT_DISK);
+    request.mutable_subject()->set_value("charles");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("king");
+    AWAIT_EXPECT_FALSE(authorizer->authorized(request));
+  }
+
+  {
+    authorization::Request request;
+    request.set_action(authorization::DESTROY_MOUNT_DISK);
+    request.mutable_subject()->set_value("charles");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("king/prince");
+    AWAIT_EXPECT_TRUE(authorizer->authorized(request));
+  }
+
+  {
+    authorization::Request request;
+    request.set_action(authorization::DESTROY_MOUNT_DISK);
+    request.mutable_subject()->set_value("charles");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("king/prince/duke");
+    AWAIT_EXPECT_TRUE(authorizer->authorized(request));
+  }
+
+  // `j-welby` only has permissions for the role `king` itself, but not
+  // for its nested roles, therefore only the first of the following three
+  // tests should succeed.
+  {
+    authorization::Request request;
+    request.set_action(authorization::DESTROY_MOUNT_DISK);
+    request.mutable_subject()->set_value("j-welby");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("king");
+    AWAIT_EXPECT_TRUE(authorizer->authorized(request));
+  }
+
+  {
+    authorization::Request request;
+    request.set_action(authorization::DESTROY_MOUNT_DISK);
+    request.mutable_subject()->set_value("j-welby");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("king/prince");
+    AWAIT_EXPECT_FALSE(authorizer->authorized(request));
+  }
+
+  {
+    authorization::Request request;
+    request.set_action(authorization::DESTROY_MOUNT_DISK);
+    request.mutable_subject()->set_value("j-welby");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("king/prince/duke");
+    AWAIT_EXPECT_FALSE(authorizer->authorized(request));
+  }
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {

Reply via email to