This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit bf07bbd1cf103b6e8c23ff3831a11dacf9dfc398
Author: Chun-Hung Hsiao <[email protected]>
AuthorDate: Thu Jan 17 18:48:59 2019 -0800

    Added a unit test for master operation authorization.
    
    This test verifies that allowing or denying an action will only result
    in a success or failure on specific operations but not other operations
    in an accept call. This is a regression test for MESOS-9474 and
    MESOS-9480.
    
    Review: https://reviews.apache.org/r/70686
---
 src/slave/slave.hpp                      |   3 +-
 src/tests/master_authorization_tests.cpp | 597 +++++++++++++++++++++++++++++++
 src/tests/mesos.hpp                      | 123 +++++--
 src/tests/mock_slave.cpp                 |   7 +
 src/tests/mock_slave.hpp                 |   6 +
 5 files changed, 709 insertions(+), 27 deletions(-)

diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index c740bf7..6954f53 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -268,7 +268,8 @@ public:
   void checkpointResourcesMessage(
       const std::vector<Resource>& resources);
 
-  void applyOperation(const ApplyOperationMessage& message);
+  // Made 'virtual' for Slave mocking.
+  virtual void applyOperation(const ApplyOperationMessage& message);
 
   // Reconciles pending operations with the master. This is necessary to handle
   // cases in which operations were dropped in transit, or in which an agent's
diff --git a/src/tests/master_authorization_tests.cpp b/src/tests/master_authorization_tests.cpp
index f65b621..21e450c 100644
--- a/src/tests/master_authorization_tests.cpp
+++ b/src/tests/master_authorization_tests.cpp
@@ -14,7 +14,10 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#include <atomic>
+#include <set>
 #include <string>
+#include <utility>
 #include <vector>
 
 #include <gmock/gmock.h>
@@ -29,6 +32,7 @@
 #include <mesos/module/authorizer.hpp>
 
 #include <process/clock.hpp>
+#include <process/collect.hpp>
 #include <process/future.hpp>
 #include <process/http.hpp>
 #include <process/pid.hpp>
@@ -36,9 +40,16 @@
 
 #include <stout/gtest.hpp>
 #include <stout/try.hpp>
+#include <stout/unreachable.hpp>
 
 #include "authorizer/local/authorizer.hpp"
 
+#include "common/protobuf_utils.hpp"
+#include "common/resources_utils.hpp"
+
+#include "internal/devolve.hpp"
+#include "internal/evolve.hpp"
+
 #include "master/master.hpp"
 
 #include "master/allocator/mesos/allocator.hpp"
@@ -57,6 +68,8 @@
 
 namespace http = process::http;
 
+using google::protobuf::RepeatedPtrField;
+
 using mesos::internal::master::Master;
 
 using mesos::internal::master::allocator::MesosAllocatorProcess;
@@ -81,11 +94,14 @@ using std::string;
 using std::vector;
 
 using testing::_;
+using testing::AllOf;
 using testing::An;
 using testing::AtMost;
 using testing::DoAll;
 using testing::Eq;
+using testing::Invoke;
 using testing::Return;
+using testing::Truly;
 
 namespace mesos {
 namespace internal {
@@ -2748,6 +2764,587 @@ TEST_F(MasterAuthorizationTest, AuthorizedToRegisterNoStaticReservations)
   AWAIT_READY(slaveRegisteredMessage);
 }
 
+
+class MasterOperationAuthorizationTest
+  : public MesosTest,
+    public ::testing::WithParamInterface<authorization::Action>
+{
+public:
+  static Resources createAgentResources(const Resources& resources)
+  {
+    Resources agentResources;
+    foreach (
+        Resource resource,
+        resources - resources.filter(&Resources::hasResourceProvider)) {
+      if (Resources::isPersistentVolume(resource)) {
+        if (resource.disk().has_source()) {
+          resource.mutable_disk()->clear_persistence();
+          resource.mutable_disk()->clear_volume();
+        } else {
+          resource.clear_disk();
+        }
+      }
+
+      agentResources += resource;
+    }
+
+    return agentResources;
+  }
+
+  static vector<v1::Offer::Operation> createOperations(
+      const v1::FrameworkID& frameworkId,
+      const v1::AgentID& agentId,
+      const authorization::Action& action)
+  {
+    switch (action) {
+      case authorization::RUN_TASK: {
+        const v1::Resources taskResources =
+          v1::Resources::parse("cpus:1;mem:32").get();
+
+        v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
+            v1::DEFAULT_EXECUTOR_ID,
+            None(),
+            v1::Resources::parse("cpus:1;mem:32;disk:32").get(),
+            v1::ExecutorInfo::DEFAULT,
+            frameworkId);
+
+        return {v1::LAUNCH(
+                    {v1::createTask(agentId, taskResources, ""),
+                     v1::createTask(agentId, taskResources, "")}),
+                v1::LAUNCH_GROUP(
+                    executorInfo,
+                    v1::createTaskGroupInfo(
+                        {v1::createTask(agentId, taskResources, ""),
+                         v1::createTask(agentId, taskResources, "")}))};
+      }
+      case authorization::RESERVE_RESOURCES: {
+        v1::OperationID operationId;
+        operationId.set_value(id::UUID::random().toString());
+
+        v1::Resources reserved;
+        reserved += v1::createReservedResource(
+            "cpus", "1", v1::createDynamicReservationInfo(
+                "role", v1::DEFAULT_CREDENTIAL.principal()));
+        reserved += v1::createReservedResource(
+            "mem", "32", v1::createDynamicReservationInfo(
+                "role", v1::DEFAULT_CREDENTIAL.principal()));
+
+        return {v1::RESERVE(reserved, std::move(operationId))};
+      }
+      case authorization::UNRESERVE_RESOURCES: {
+        v1::OperationID operationId;
+        operationId.set_value(id::UUID::random().toString());
+
+        v1::Resources reserved;
+        reserved += v1::createReservedResource(
+            "cpus", "1", v1::createDynamicReservationInfo("role"));
+        reserved += v1::createReservedResource(
+            "mem", "32", v1::createDynamicReservationInfo("role"));
+
+        return {v1::UNRESERVE(reserved, std::move(operationId))};
+      }
+      case authorization::CREATE_VOLUME: {
+        v1::OperationID operationId;
+        operationId.set_value(id::UUID::random().toString());
+
+        v1::Resources volumes;
+        volumes += v1::createPersistentVolume(
+            Megabytes(32),
+            "role",
+            id::UUID::random().toString(),
+            "path",
+            None(),
+            None(),
+            v1::DEFAULT_CREDENTIAL.principal());
+        volumes += v1::createPersistentVolume(
+            Megabytes(32),
+            "role",
+            id::UUID::random().toString(),
+            "path",
+            None(),
+            None(),
+            v1::DEFAULT_CREDENTIAL.principal());
+
+        return {v1::CREATE(volumes, std::move(operationId))};
+      }
+      case authorization::DESTROY_VOLUME: {
+        v1::OperationID operationId;
+        operationId.set_value(id::UUID::random().toString());
+
+        v1::Resources volumes;
+        volumes += v1::createPersistentVolume(
+            Megabytes(32), "role", id::UUID::random().toString(), "path");
+        volumes += v1::createPersistentVolume(
+            Megabytes(32), "role", id::UUID::random().toString(), "path");
+
+        return {v1::DESTROY(volumes, std::move(operationId))};
+      }
+      case authorization::RESIZE_VOLUME: {
+        v1::OperationID operationId1;
+        operationId1.set_value(id::UUID::random().toString());
+
+        v1::OperationID operationId2;
+        operationId2.set_value(id::UUID::random().toString());
+
+        return {
+          v1::GROW_VOLUME(
+              v1::createPersistentVolume(
+                  Megabytes(32), "role", id::UUID::random().toString(), "path"),
+              v1::Resources::parse("disk", "32", "role").get(),
+              std::move(operationId1)),
+          v1::SHRINK_VOLUME(
+              v1::createPersistentVolume(
+                  Megabytes(64), "role", id::UUID::random().toString(), "path"),
+              mesos::v1::internal::values::parse("32")->scalar(),
+              std::move(operationId2))};
+      }
+      case authorization::CREATE_BLOCK_DISK: {
+        v1::OperationID operationId;
+        operationId.set_value(id::UUID::random().toString());
+
+        v1::Resource raw = v1::createDiskResource(
+            "32", "*", None(), None(), v1::createDiskSourceRaw(
+                None(), "profile"));
+        raw.mutable_provider_id()->set_value("provider");
+
+        return {v1::CREATE_DISK(
+            raw,
+            v1::Resource::DiskInfo::Source::BLOCK,
+            None(),
+            std::move(operationId))};
+      }
+      case authorization::DESTROY_BLOCK_DISK: {
+        v1::OperationID operationId;
+        operationId.set_value(id::UUID::random().toString());
+
+        v1::Resource block = v1::createDiskResource(
+            "32", "*", None(), None(), v1::createDiskSourceBlock(
+                id::UUID::random().toString(), "profile"));
+        block.mutable_provider_id()->set_value("provider");
+
+        return {v1::DESTROY_DISK(block, std::move(operationId))};
+      }
+      case authorization::CREATE_MOUNT_DISK: {
+        v1::OperationID operationId;
+        operationId.set_value(id::UUID::random().toString());
+
+        v1::Resource raw = v1::createDiskResource(
+            "32", "*", None(), None(), v1::createDiskSourceRaw(
+                None(), "profile"));
+        raw.mutable_provider_id()->set_value("provider");
+
+        return {v1::CREATE_DISK(
+            raw,
+            v1::Resource::DiskInfo::Source::MOUNT,
+            None(),
+            std::move(operationId))};
+      }
+      case authorization::DESTROY_MOUNT_DISK: {
+        v1::OperationID operationId;
+        operationId.set_value(id::UUID::random().toString());
+
+        v1::Resource mount = v1::createDiskResource(
+            "32", "*", None(), None(), v1::createDiskSourceMount(
+                None(), id::UUID::random().toString(), "profile"));
+        mount.mutable_provider_id()->set_value("provider");
+
+        return {v1::DESTROY_DISK(mount, std::move(operationId))};
+      }
+      case authorization::DESTROY_RAW_DISK: {
+        v1::OperationID operationId;
+        operationId.set_value(id::UUID::random().toString());
+
+        v1::Resource raw = v1::createDiskResource(
+            "32", "*", None(), None(), v1::createDiskSourceRaw(
+                id::UUID::random().toString(), "profile"));
+        raw.mutable_provider_id()->set_value("provider");
+
+        return {v1::DESTROY_DISK(raw, std::move(operationId))};
+      }
+      case authorization::UNKNOWN:
+      case authorization::REGISTER_FRAMEWORK:
+      case authorization::TEARDOWN_FRAMEWORK:
+      case authorization::GET_ENDPOINT_WITH_PATH:
+      case authorization::VIEW_ROLE:
+      case authorization::UPDATE_WEIGHT:
+      case authorization::GET_QUOTA:
+      case authorization::UPDATE_QUOTA:
+      case authorization::VIEW_FRAMEWORK:
+      case authorization::VIEW_TASK:
+      case authorization::VIEW_EXECUTOR:
+      case authorization::ACCESS_SANDBOX:
+      case authorization::ACCESS_MESOS_LOG:
+      case authorization::VIEW_FLAGS:
+      case authorization::LAUNCH_NESTED_CONTAINER:
+      case authorization::KILL_NESTED_CONTAINER:
+      case authorization::WAIT_NESTED_CONTAINER:
+      case authorization::LAUNCH_NESTED_CONTAINER_SESSION:
+      case authorization::ATTACH_CONTAINER_INPUT:
+      case authorization::ATTACH_CONTAINER_OUTPUT:
+      case authorization::VIEW_CONTAINER:
+      case authorization::SET_LOG_LEVEL:
+      case authorization::REMOVE_NESTED_CONTAINER:
+      case authorization::REGISTER_AGENT:
+      case authorization::UPDATE_MAINTENANCE_SCHEDULE:
+      case authorization::GET_MAINTENANCE_SCHEDULE:
+      case authorization::START_MAINTENANCE:
+      case authorization::STOP_MAINTENANCE:
+      case authorization::GET_MAINTENANCE_STATUS:
+      case authorization::MARK_AGENT_GONE:
+      case authorization::LAUNCH_STANDALONE_CONTAINER:
+      case authorization::KILL_STANDALONE_CONTAINER:
+      case authorization::WAIT_STANDALONE_CONTAINER:
+      case authorization::REMOVE_STANDALONE_CONTAINER:
+      case authorization::VIEW_STANDALONE_CONTAINER:
+      case authorization::MODIFY_RESOURCE_PROVIDER_CONFIG:
+      case authorization::MARK_RESOURCE_PROVIDER_GONE:
+      case authorization::VIEW_RESOURCE_PROVIDER:
+      case authorization::PRUNE_IMAGES:
+        return {};
+    }
+
+    UNREACHABLE();
+  }
+};
+
+
+INSTANTIATE_TEST_CASE_P(
+    AllowedAction,
+    MasterOperationAuthorizationTest,
+    ::testing::Values(
+        authorization::RUN_TASK,
+        authorization::RESERVE_RESOURCES,
+        authorization::UNRESERVE_RESOURCES,
+        authorization::CREATE_VOLUME,
+        authorization::DESTROY_VOLUME,
+        authorization::RESIZE_VOLUME,
+        authorization::CREATE_BLOCK_DISK,
+        authorization::DESTROY_BLOCK_DISK,
+        authorization::CREATE_MOUNT_DISK,
+        authorization::DESTROY_MOUNT_DISK),
+    [](const testing::TestParamInfo<authorization::Action>& action) {
+      return authorization::Action_Name(action.param);
+    });
+
+
+// This test verifies that allowing or denying an action will only result in a
+// success or failure on specific operations but not other operations in an
+// accept call. This is a regression test for MESOS-9474 and MESOS-9480.
+TEST_P(MasterOperationAuthorizationTest, Accept)
+{
+  Clock::pause();
+
+  // We use this flag to control when the mock authorizer starts to deny
+  // disallowed actions.
+  std::atomic_bool permissive(true);
+
+  MockAuthorizer authorizer;
+  EXPECT_CALL(authorizer, authorized(_))
+    .WillRepeatedly(Invoke([&](const authorization::Request& request) {
+      return permissive || request.action() == GetParam();
+    }));
+
+  Try<Owned<cluster::Master>> master = StartMaster(&authorizer);
+  ASSERT_SOME(master);
+
+  // First, we create a list of operations to exercise all authorization
+  // actions, and compute the total resources needed by these operations and set
+  // up their expected terminal states.
+  //
+  // NOTE: We create some `RUN_TASK` operations in the beginning and some in the
+  // end for two reasons: 1. These operations don't have operation IDs so they
+  // need to be handled differently. 2. By adding some operations for the
+  // `RUN_TASK` action in the end, we can verify that their results are not
+  // affected by preceding authorizations.
+  vector<v1::Offer::Operation> operations;
+  v1::Resources totalResources;
+  hashmap<v1::TaskID, v1::TaskState> expectedTaskStates;
+  hashmap<v1::OperationID, v1::OperationState> expectedOperationStates;
+
+  // NOTE: Because we create operations before getting an offer, we synthesize
+  // the framework ID and agent ID here and check them later.
+  v1::FrameworkID frameworkId;
+  frameworkId.set_value(master.get()->getMasterInfo().id() + "-0000");
+  v1::AgentID agentId;
+  agentId.set_value(master.get()->getMasterInfo().id() + "-S0");
+
+  auto addRunTaskOperations = [&] {
+    foreach (
+        v1::Offer::Operation& operation,
+        createOperations(frameworkId, agentId, authorization::RUN_TASK)) {
+      if (operation.type() == v1::Offer::Operation::LAUNCH) {
+        foreach (const v1::TaskInfo& task, operation.launch().task_infos()) {
+          totalResources += task.resources();
+          totalResources += task.executor().resources();
+
+          expectedTaskStates.put(
+              task.task_id(),
+              authorization::RUN_TASK == GetParam()
+                ? v1::TASK_FINISHED : v1::TASK_ERROR);
+        }
+      } else if (operation.type() == v1::Offer::Operation::LAUNCH_GROUP) {
+        totalResources += operation.launch_group().executor().resources();
+
+        foreach (
+            const v1::TaskInfo& task,
+            operation.launch_group().task_group().tasks()) {
+          totalResources += task.resources();
+
+          expectedTaskStates.put(
+              task.task_id(),
+              authorization::RUN_TASK == GetParam()
+                ? v1::TASK_FINISHED : v1::TASK_ERROR);
+        }
+      }
+
+      operations.push_back(std::move(operation));
+    }
+  };
+
+  addRunTaskOperations();
+
+  for (int i = 0; i < authorization::Action_descriptor()->value_count(); i++) {
+    const authorization::Action action = static_cast<authorization::Action>(
+        authorization::Action_descriptor()->value(i)->number());
+
+    // Skip `RUN_TASK` operations since they are handled separately.
+    if (action == authorization::RUN_TASK) {
+      continue;
+    }
+
+    foreach (
+        v1::Offer::Operation& operation,
+        createOperations(frameworkId, agentId, action)) {
+      Try<Resources> consumed =
+        protobuf::getConsumedResources(devolve(operation));
+      ASSERT_SOME(consumed);
+      totalResources += evolve(consumed.get());
+
+      ASSERT_TRUE(operation.has_id());
+      expectedOperationStates.put(
+          operation.id(),
+          action == GetParam() ? v1::OPERATION_FINISHED : v1::OPERATION_ERROR);
+
+      operations.push_back(std::move(operation));
+    }
+  }
+
+  addRunTaskOperations();
+
+  // Then, we register a mock agent that has sufficient resources to exercise
+  // all operations and simply replies `TASK_FINISHED` or `OPERATION_FINISHED`
+  // for all tasks or operations it receives.
+  //
+  // NOTE: Since dynamic reservations, persistent volumes and resource
+  // provider resources cannot be specified through the `--resources` flag, we
+  // intercept the `RegisterSlaveMessage` and `UpdateSlaveMessage` to inject
+  // resources required by all operations then forward them to the master.
+  Future<RegisterSlaveMessage> registerSlaveMessage =
+    DROP_PROTOBUF(RegisterSlaveMessage(), _, _);
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+  Future<UpdateSlaveMessage> updateSlaveMessage =
+    DROP_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  Try<Owned<cluster::Slave>> slave =
+    StartSlave(detector.get(), slaveFlags, true);
+  ASSERT_SOME(slave);
+  ASSERT_NE(nullptr, slave.get()->mock());
+
+  EXPECT_CALL(*slave.get()->mock(), runTask(_, _, _, _, _, _, _))
+    .WillRepeatedly(Invoke([&](
+        const process::UPID& from,
+        const FrameworkInfo& frameworkInfo,
+        const FrameworkID& frameworkId,
+        const process::UPID& pid,
+        const TaskInfo& task,
+        const std::vector<ResourceVersionUUID>& resourceVersionUuids,
+        const Option<bool>& launchExecutor) {
+      ASSERT_TRUE(frameworkInfo.has_id());
+
+      StatusUpdateMessage message;
+      message.set_pid(slave.get()->pid);
+      *message.mutable_update() = protobuf::createStatusUpdate(
+          frameworkInfo.id(),
+          devolve(agentId),
+          task.task_id(),
+          TASK_FINISHED,
+          TaskStatus::SOURCE_SLAVE,
+          id::UUID::random());
+
+      process::post(slave.get()->pid, master.get()->pid, message);
+    }));
+
+  EXPECT_CALL(*slave.get()->mock(), runTaskGroup(_, _, _, _, _, _))
+    .WillRepeatedly(Invoke([&](
+        const process::UPID& from,
+        const FrameworkInfo& frameworkInfo,
+        const ExecutorInfo& executorInfo,
+        const TaskGroupInfo& taskGroup,
+        const std::vector<ResourceVersionUUID>& resourceVersionUuids,
+        const Option<bool>& launchExecutor) {
+      ASSERT_TRUE(frameworkInfo.has_id());
+
+      foreach (const TaskInfo& task, taskGroup.tasks()) {
+        StatusUpdateMessage message;
+        message.set_pid(slave.get()->pid);
+        *message.mutable_update() = protobuf::createStatusUpdate(
+            frameworkInfo.id(),
+            devolve(agentId),
+            task.task_id(),
+            TASK_FINISHED,
+            TaskStatus::SOURCE_SLAVE,
+            id::UUID::random());
+
+        process::post(slave.get()->pid, master.get()->pid, message);
+      }
+    }));
+
+  EXPECT_CALL(*slave.get()->mock(), applyOperation(_))
+    .WillRepeatedly(Invoke([&](const ApplyOperationMessage& message) {
+      ASSERT_TRUE(message.has_framework_id());
+      ASSERT_TRUE(message.operation_info().has_id());
+
+      process::post(
+          slave.get()->pid,
+          master.get()->pid,
+          protobuf::createUpdateOperationStatusMessage(
+              message.operation_uuid(),
+              protobuf::createOperationStatus(
+                  OPERATION_FINISHED,
+                  message.operation_info().id(),
+                  None(),
+                  None(),
+                  None(),
+                  devolve(agentId)),
+              None(),
+              message.framework_id(),
+              devolve(agentId)));
+    }));
+
+  slave.get()->start();
+
+  // Settle the clock to ensure that the master has been detected by the agent,
+  // then advance the clock to trigger an authentication and a registration.
+  Clock::settle();
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  AWAIT_READY(registerSlaveMessage);
+
+  {
+    RegisterSlaveMessage message = registerSlaveMessage.get();
+    *message.mutable_slave()->mutable_resources() =
+      createAgentResources(devolve(totalResources));
+    *message.mutable_checkpointed_resources() =
+      devolve(totalResources).filter(needCheckpointing);
+
+    process::post(slave.get()->pid, master.get()->pid, message);
+  }
+
+  AWAIT_READY(slaveRegisteredMessage);
+  EXPECT_EQ(devolve(agentId), slaveRegisteredMessage->slave_id());
+
+  AWAIT_READY(updateSlaveMessage);
+
+  {
+    UpdateSlaveMessage message = updateSlaveMessage.get();
+    UpdateSlaveMessage::ResourceProvider* resourceProvider =
+      message.mutable_resource_providers()->add_providers();
+    resourceProvider->mutable_info()->mutable_id()->set_value("provider");
+    resourceProvider->mutable_info()->set_type("resource_provider_type");
+    resourceProvider->mutable_info()->set_name("resource_provider_name");
+    *resourceProvider->mutable_total_resources() =
+      devolve(totalResources).filter(&Resources::hasResourceProvider);
+    resourceProvider->mutable_operations();
+    resourceProvider->mutable_resource_version_uuid()->set_value(
+        id::UUID::random().toBytes());
+
+    process::post(slave.get()->pid, master.get()->pid, message);
+  }
+
+  // Settle the clock to ensure that the resource providers have been updated.
+  Clock::settle();
+
+  // Finally, we register a framework to exercise all operations, and check that
+  // only authorized tasks and operations are finished.
+  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_roles(0, "role");
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _)).WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid, ContentType::PROTOBUF, scheduler);
+
+  AWAIT_READY(subscribed);
+  EXPECT_EQ(frameworkId, subscribed->framework_id());
+
+  // Start to deny disallowed actions.
+  permissive = false;
+
+  AWAIT_READY(offers);
+  ASSERT_EQ(1u, offers->offers_size());
+
+  hashmap<v1::TaskID, Future<v1::scheduler::Event::Update>>
+    actualTaskStatusUpdates;
+  hashmap<v1::OperationID, Future<v1::scheduler::Event::UpdateOperationStatus>>
+    actualOperationStatusUpdates;
+
+  foreachkey (const v1::TaskID& taskId, expectedTaskStates) {
+    EXPECT_CALL(*scheduler, update(_, AllOf(
+        TaskStatusUpdateTaskIdEq(taskId),
+        Truly([](const v1::scheduler::Event::Update& update) {
+          return protobuf::isTerminalState(devolve(update.status()).state());
+        }))))
+      .WillOnce(FutureArg<1>(&actualTaskStatusUpdates[taskId]));
+  }
+
+  foreachkey (const v1::OperationID& operationId, expectedOperationStates) {
+    EXPECT_CALL(*scheduler, updateOperationStatus(_, AllOf(
+        v1::scheduler::OperationStatusUpdateOperationIdEq(operationId),
+        Truly([](const v1::scheduler::Event::UpdateOperationStatus& update) {
+          return protobuf::isTerminalState(devolve(update.status()).state());
+        }))))
+      .WillOnce(FutureArg<1>(&actualOperationStatusUpdates[operationId]));
+  }
+
+  mesos.send(v1::createCallAccept(
+      subscribed->framework_id(),
+      offers->offers(0),
+      operations));
+
+  foreachpair (
+      const v1::TaskID& taskId,
+      const Future<v1::scheduler::Event::Update>& update,
+      actualTaskStatusUpdates) {
+    AWAIT_READY(update);
+    EXPECT_EQ(expectedTaskStates[taskId], update->status().state());
+  }
+
+  foreachpair (
+      const v1::OperationID& operationId,
+      const Future<v1::scheduler::Event::UpdateOperationStatus>& update,
+      actualOperationStatusUpdates) {
+    AWAIT_READY(update);
+    EXPECT_EQ(expectedOperationStates[operationId], update->status().state());
+  }
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index c886789..d0c82fa 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -1321,65 +1321,102 @@ inline typename TOffer::Operation RESERVE(
   operation.mutable_reserve()->mutable_resources()->CopyFrom(resources);
 
   if (operationId.isSome()) {
-    operation.mutable_id()->CopyFrom(operationId.get());
+    *operation.mutable_id() = operationId.get();
   }
 
   return operation;
 }
 
 
-template <typename TResources, typename TOffer>
-inline typename TOffer::Operation UNRESERVE(const TResources& resources)
+template <typename TResources, typename TOperationID, typename TOffer>
+inline typename TOffer::Operation UNRESERVE(
+    const TResources& resources,
+    const Option<TOperationID>& operationId = None())
 {
   typename TOffer::Operation operation;
   operation.set_type(TOffer::Operation::UNRESERVE);
   operation.mutable_unreserve()->mutable_resources()->CopyFrom(resources);
+
+  if (operationId.isSome()) {
+    *operation.mutable_id() = operationId.get();
+  }
+
   return operation;
 }
 
 
-template <typename TResources, typename TOffer>
-inline typename TOffer::Operation CREATE(const TResources& volumes)
+template <typename TResources, typename TOperationID, typename TOffer>
+inline typename TOffer::Operation CREATE(
+    const TResources& volumes,
+    const Option<TOperationID>& operationId = None())
 {
   typename TOffer::Operation operation;
   operation.set_type(TOffer::Operation::CREATE);
   operation.mutable_create()->mutable_volumes()->CopyFrom(volumes);
+
+  if (operationId.isSome()) {
+    *operation.mutable_id() = operationId.get();
+  }
+
   return operation;
 }
 
 
-template <typename TResources, typename TOffer>
-inline typename TOffer::Operation DESTROY(const TResources& volumes)
+template <typename TResources, typename TOperationID, typename TOffer>
+inline typename TOffer::Operation DESTROY(
+    const TResources& volumes,
+    const Option<TOperationID>& operationId = None())
 {
   typename TOffer::Operation operation;
   operation.set_type(TOffer::Operation::DESTROY);
   operation.mutable_destroy()->mutable_volumes()->CopyFrom(volumes);
+
+  if (operationId.isSome()) {
+    *operation.mutable_id() = operationId.get();
+  }
+
   return operation;
 }
 
 
-template <typename TResource, typename TOffer>
+template <typename TResource, typename TOperationID, typename TOffer>
 inline typename TOffer::Operation GROW_VOLUME(
     const TResource& volume,
-    const TResource& addition)
+    const TResource& addition,
+    const Option<TOperationID>& operationId = None())
 {
   typename TOffer::Operation operation;
   operation.set_type(TOffer::Operation::GROW_VOLUME);
   operation.mutable_grow_volume()->mutable_volume()->CopyFrom(volume);
   operation.mutable_grow_volume()->mutable_addition()->CopyFrom(addition);
+
+  if (operationId.isSome()) {
+    *operation.mutable_id() = operationId.get();
+  }
+
   return operation;
 }
 
 
-template <typename TResource, typename TOffer, typename TValueScalar>
+template <
+    typename TResource,
+    typename TValueScalar,
+    typename TOperationID,
+    typename TOffer>
 inline typename TOffer::Operation SHRINK_VOLUME(
     const TResource& volume,
-    const TValueScalar& subtract)
+    const TValueScalar& subtract,
+    const Option<TOperationID>& operationId = None())
 {
   typename TOffer::Operation operation;
   operation.set_type(TOffer::Operation::SHRINK_VOLUME);
   operation.mutable_shrink_volume()->mutable_volume()->CopyFrom(volume);
   operation.mutable_shrink_volume()->mutable_subtract()->CopyFrom(subtract);
+
+  if (operationId.isSome()) {
+    *operation.mutable_id() = operationId.get();
+  }
+
   return operation;
 }
 
@@ -1765,35 +1802,40 @@ inline Offer::Operation RESERVE(Args&&... args)
 template <typename... Args>
 inline Offer::Operation UNRESERVE(Args&&... args)
 {
-  return common::UNRESERVE<Resources, Offer>(std::forward<Args>(args)...);
+  return common::UNRESERVE<Resources, OperationID, Offer>(
+      std::forward<Args>(args)...);
 }
 
 
 template <typename... Args>
 inline Offer::Operation CREATE(Args&&... args)
 {
-  return common::CREATE<Resources, Offer>(std::forward<Args>(args)...);
+  return common::CREATE<Resources, OperationID, Offer>(
+      std::forward<Args>(args)...);
 }
 
 
 template <typename... Args>
 inline Offer::Operation DESTROY(Args&&... args)
 {
-  return common::DESTROY<Resources, Offer>(std::forward<Args>(args)...);
+  return common::DESTROY<Resources, OperationID, Offer>(
+      std::forward<Args>(args)...);
 }
 
 
 template <typename... Args>
 inline Offer::Operation GROW_VOLUME(Args&&... args)
 {
-  return common::GROW_VOLUME<Resource, Offer>(std::forward<Args>(args)...);
+  return common::GROW_VOLUME<Resource, OperationID, Offer>(
+      std::forward<Args>(args)...);
 }
 
 
 template <typename... Args>
 inline Offer::Operation SHRINK_VOLUME(Args&&... args)
 {
-  return common::SHRINK_VOLUME<Resource, Offer>(std::forward<Args>(args)...);
+  return common::SHRINK_VOLUME<Resource, Value::Scalar, OperationID, Offer>(
+      std::forward<Args>(args)...);
 }
 
 
@@ -2063,40 +2105,51 @@ inline mesos::v1::Offer::Operation RESERVE(Args&&... args)
 template <typename... Args>
 inline mesos::v1::Offer::Operation UNRESERVE(Args&&... args)
 {
-  return common::UNRESERVE<mesos::v1::Resources, mesos::v1::Offer>(
-      std::forward<Args>(args)...);
+  return common::UNRESERVE<
+      mesos::v1::Resources,
+      mesos::v1::OperationID,
+      mesos::v1::Offer>(std::forward<Args>(args)...);
 }
 
 
 template <typename... Args>
 inline mesos::v1::Offer::Operation CREATE(Args&&... args)
 {
-  return common::CREATE<mesos::v1::Resources, mesos::v1::Offer>(
-      std::forward<Args>(args)...);
+  return common::CREATE<
+      mesos::v1::Resources,
+      mesos::v1::OperationID,
+      mesos::v1::Offer>(std::forward<Args>(args)...);
 }
 
 
 template <typename... Args>
 inline mesos::v1::Offer::Operation DESTROY(Args&&... args)
 {
-  return common::DESTROY<mesos::v1::Resources, mesos::v1::Offer>(
-      std::forward<Args>(args)...);
+  return common::DESTROY<
+      mesos::v1::Resources,
+      mesos::v1::OperationID,
+      mesos::v1::Offer>(std::forward<Args>(args)...);
 }
 
 
 template <typename... Args>
 inline mesos::v1::Offer::Operation GROW_VOLUME(Args&&... args)
 {
-  return common::GROW_VOLUME<mesos::v1::Resource, mesos::v1::Offer>(
-      std::forward<Args>(args)...);
+  return common::GROW_VOLUME<
+      mesos::v1::Resource,
+      mesos::v1::OperationID,
+      mesos::v1::Offer>(std::forward<Args>(args)...);
 }
 
 
 template <typename... Args>
 inline mesos::v1::Offer::Operation SHRINK_VOLUME(Args&&... args)
 {
-  return common::SHRINK_VOLUME<mesos::v1::Resource, mesos::v1::Offer>(
-      std::forward<Args>(args)...);
+  return common::SHRINK_VOLUME<
+      mesos::v1::Resource,
+      mesos::v1::Value::Scalar,
+      mesos::v1::OperationID,
+      mesos::v1::Offer>(std::forward<Args>(args)...);
 }
 
 
@@ -2745,6 +2798,24 @@ ACTION_P2(SendAcknowledge, frameworkId, agentId)
 }
 
 
+ACTION_P2(
+    SendAcknowledgeOperationStatus, frameworkId, agentId)
+{
+  Call call;
+  call.set_type(Call::ACKNOWLEDGE_OPERATION_STATUS);
+  call.mutable_framework_id()->CopyFrom(frameworkId);
+
+  Call::AcknowledgeOperationStatus* acknowledge =
+    call.mutable_acknowledge_operation_status();
+
+  acknowledge->mutable_agent_id()->CopyFrom(agentId);
+  acknowledge->set_uuid(arg1.status().uuid().value());
+  acknowledge->mutable_operation_id()->CopyFrom(arg1.status().operation_id());
+
+  arg0->send(call);
+}
+
+
 ACTION_P3(
     SendAcknowledgeOperationStatus, frameworkId, agentId, resourceProviderId)
 {
diff --git a/src/tests/mock_slave.cpp b/src/tests/mock_slave.cpp
index 1122c2a..dd458e3 100644
--- a/src/tests/mock_slave.cpp
+++ b/src/tests/mock_slave.cpp
@@ -153,6 +153,8 @@ MockSlave::MockSlave(
     .WillRepeatedly(Invoke(this, &MockSlave::unmocked_shutdownExecutor));
   EXPECT_CALL(*this, _shutdownExecutor(_, _))
     .WillRepeatedly(Invoke(this, &MockSlave::unmocked__shutdownExecutor));
+  EXPECT_CALL(*this, applyOperation(_))
+    .WillRepeatedly(Invoke(this, &MockSlave::unmocked_applyOperation));
 }
 
 
@@ -314,6 +316,11 @@ void MockSlave::unmocked__shutdownExecutor(
 }
 
 
+void MockSlave::unmocked_applyOperation(const ApplyOperationMessage& message)
+{
+  slave::Slave::applyOperation(message);
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {
diff --git a/src/tests/mock_slave.hpp b/src/tests/mock_slave.hpp
index 326a450..c057b40 100644
--- a/src/tests/mock_slave.hpp
+++ b/src/tests/mock_slave.hpp
@@ -253,6 +253,12 @@ public:
   void unmocked__shutdownExecutor(
       slave::Framework* framework,
       slave::Executor* executor);
+
+  MOCK_METHOD1(applyOperation, void(
+      const ApplyOperationMessage& message));
+
+  void unmocked_applyOperation(
+      const ApplyOperationMessage& message);
 };
 
 } // namespace tests {

Reply via email to