This is an automated email from the ASF dual-hosted git repository. chhsiao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit bf07bbd1cf103b6e8c23ff3831a11dacf9dfc398 Author: Chun-Hung Hsiao <[email protected]> AuthorDate: Thu Jan 17 18:48:59 2019 -0800 Added a unit test for master operation authorization. This test verifies that allowing or denying an action will only result in a success or failure on specific operations but not other operations in an accept call. This is a regression test for MESOS-9474 and MESOS-9480. Review: https://reviews.apache.org/r/70686 --- src/slave/slave.hpp | 3 +- src/tests/master_authorization_tests.cpp | 597 +++++++++++++++++++++++++++++++ src/tests/mesos.hpp | 123 +++++-- src/tests/mock_slave.cpp | 7 + src/tests/mock_slave.hpp | 6 + 5 files changed, 709 insertions(+), 27 deletions(-) diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp index c740bf7..6954f53 100644 --- a/src/slave/slave.hpp +++ b/src/slave/slave.hpp @@ -268,7 +268,8 @@ public: void checkpointResourcesMessage( const std::vector<Resource>& resources); - void applyOperation(const ApplyOperationMessage& message); + // Made 'virtual' for Slave mocking. + virtual void applyOperation(const ApplyOperationMessage& message); // Reconciles pending operations with the master. This is necessary to handle // cases in which operations were dropped in transit, or in which an agent's diff --git a/src/tests/master_authorization_tests.cpp b/src/tests/master_authorization_tests.cpp index f65b621..21e450c 100644 --- a/src/tests/master_authorization_tests.cpp +++ b/src/tests/master_authorization_tests.cpp @@ -14,7 +14,10 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+#include <atomic> +#include <set> #include <string> +#include <utility> #include <vector> #include <gmock/gmock.h> @@ -29,6 +32,7 @@ #include <mesos/module/authorizer.hpp> #include <process/clock.hpp> +#include <process/collect.hpp> #include <process/future.hpp> #include <process/http.hpp> #include <process/pid.hpp> @@ -36,9 +40,16 @@ #include <stout/gtest.hpp> #include <stout/try.hpp> +#include <stout/unreachable.hpp> #include "authorizer/local/authorizer.hpp" +#include "common/protobuf_utils.hpp" +#include "common/resources_utils.hpp" + +#include "internal/devolve.hpp" +#include "internal/evolve.hpp" + #include "master/master.hpp" #include "master/allocator/mesos/allocator.hpp" @@ -57,6 +68,8 @@ namespace http = process::http; +using google::protobuf::RepeatedPtrField; + using mesos::internal::master::Master; using mesos::internal::master::allocator::MesosAllocatorProcess; @@ -81,11 +94,14 @@ using std::string; using std::vector; using testing::_; +using testing::AllOf; using testing::An; using testing::AtMost; using testing::DoAll; using testing::Eq; +using testing::Invoke; using testing::Return; +using testing::Truly; namespace mesos { namespace internal { @@ -2748,6 +2764,587 @@ TEST_F(MasterAuthorizationTest, AuthorizedToRegisterNoStaticReservations) AWAIT_READY(slaveRegisteredMessage); } + +class MasterOperationAuthorizationTest + : public MesosTest, + public ::testing::WithParamInterface<authorization::Action> +{ +public: + static Resources createAgentResources(const Resources& resources) + { + Resources agentResources; + foreach ( + Resource resource, + resources - resources.filter(&Resources::hasResourceProvider)) { + if (Resources::isPersistentVolume(resource)) { + if (resource.disk().has_source()) { + resource.mutable_disk()->clear_persistence(); + resource.mutable_disk()->clear_volume(); + } else { + resource.clear_disk(); + } + } + + agentResources += resource; + } + + return agentResources; + } + + static vector<v1::Offer::Operation> 
createOperations( + const v1::FrameworkID& frameworkId, + const v1::AgentID& agentId, + const authorization::Action& action) + { + switch (action) { + case authorization::RUN_TASK: { + const v1::Resources taskResources = + v1::Resources::parse("cpus:1;mem:32").get(); + + v1::ExecutorInfo executorInfo = v1::createExecutorInfo( + v1::DEFAULT_EXECUTOR_ID, + None(), + v1::Resources::parse("cpus:1;mem:32;disk:32").get(), + v1::ExecutorInfo::DEFAULT, + frameworkId); + + return {v1::LAUNCH( + {v1::createTask(agentId, taskResources, ""), + v1::createTask(agentId, taskResources, "")}), + v1::LAUNCH_GROUP( + executorInfo, + v1::createTaskGroupInfo( + {v1::createTask(agentId, taskResources, ""), + v1::createTask(agentId, taskResources, "")}))}; + } + case authorization::RESERVE_RESOURCES: { + v1::OperationID operationId; + operationId.set_value(id::UUID::random().toString()); + + v1::Resources reserved; + reserved += v1::createReservedResource( + "cpus", "1", v1::createDynamicReservationInfo( + "role", v1::DEFAULT_CREDENTIAL.principal())); + reserved += v1::createReservedResource( + "mem", "32", v1::createDynamicReservationInfo( + "role", v1::DEFAULT_CREDENTIAL.principal())); + + return {v1::RESERVE(reserved, std::move(operationId))}; + } + case authorization::UNRESERVE_RESOURCES: { + v1::OperationID operationId; + operationId.set_value(id::UUID::random().toString()); + + v1::Resources reserved; + reserved += v1::createReservedResource( + "cpus", "1", v1::createDynamicReservationInfo("role")); + reserved += v1::createReservedResource( + "mem", "32", v1::createDynamicReservationInfo("role")); + + return {v1::UNRESERVE(reserved, std::move(operationId))}; + } + case authorization::CREATE_VOLUME: { + v1::OperationID operationId; + operationId.set_value(id::UUID::random().toString()); + + v1::Resources volumes; + volumes += v1::createPersistentVolume( + Megabytes(32), + "role", + id::UUID::random().toString(), + "path", + None(), + None(), + v1::DEFAULT_CREDENTIAL.principal()); + 
volumes += v1::createPersistentVolume( + Megabytes(32), + "role", + id::UUID::random().toString(), + "path", + None(), + None(), + v1::DEFAULT_CREDENTIAL.principal()); + + return {v1::CREATE(volumes, std::move(operationId))}; + } + case authorization::DESTROY_VOLUME: { + v1::OperationID operationId; + operationId.set_value(id::UUID::random().toString()); + + v1::Resources volumes; + volumes += v1::createPersistentVolume( + Megabytes(32), "role", id::UUID::random().toString(), "path"); + volumes += v1::createPersistentVolume( + Megabytes(32), "role", id::UUID::random().toString(), "path"); + + return {v1::DESTROY(volumes, std::move(operationId))}; + } + case authorization::RESIZE_VOLUME: { + v1::OperationID operationId1; + operationId1.set_value(id::UUID::random().toString()); + + v1::OperationID operationId2; + operationId2.set_value(id::UUID::random().toString()); + + return { + v1::GROW_VOLUME( + v1::createPersistentVolume( + Megabytes(32), "role", id::UUID::random().toString(), "path"), + v1::Resources::parse("disk", "32", "role").get(), + std::move(operationId1)), + v1::SHRINK_VOLUME( + v1::createPersistentVolume( + Megabytes(64), "role", id::UUID::random().toString(), "path"), + mesos::v1::internal::values::parse("32")->scalar(), + std::move(operationId2))}; + } + case authorization::CREATE_BLOCK_DISK: { + v1::OperationID operationId; + operationId.set_value(id::UUID::random().toString()); + + v1::Resource raw = v1::createDiskResource( + "32", "*", None(), None(), v1::createDiskSourceRaw( + None(), "profile")); + raw.mutable_provider_id()->set_value("provider"); + + return {v1::CREATE_DISK( + raw, + v1::Resource::DiskInfo::Source::BLOCK, + None(), + std::move(operationId))}; + } + case authorization::DESTROY_BLOCK_DISK: { + v1::OperationID operationId; + operationId.set_value(id::UUID::random().toString()); + + v1::Resource block = v1::createDiskResource( + "32", "*", None(), None(), v1::createDiskSourceBlock( + id::UUID::random().toString(), "profile")); + 
block.mutable_provider_id()->set_value("provider"); + + return {v1::DESTROY_DISK(block, std::move(operationId))}; + } + case authorization::CREATE_MOUNT_DISK: { + v1::OperationID operationId; + operationId.set_value(id::UUID::random().toString()); + + v1::Resource raw = v1::createDiskResource( + "32", "*", None(), None(), v1::createDiskSourceRaw( + None(), "profile")); + raw.mutable_provider_id()->set_value("provider"); + + return {v1::CREATE_DISK( + raw, + v1::Resource::DiskInfo::Source::MOUNT, + None(), + std::move(operationId))}; + } + case authorization::DESTROY_MOUNT_DISK: { + v1::OperationID operationId; + operationId.set_value(id::UUID::random().toString()); + + v1::Resource mount = v1::createDiskResource( + "32", "*", None(), None(), v1::createDiskSourceMount( + None(), id::UUID::random().toString(), "profile")); + mount.mutable_provider_id()->set_value("provider"); + + return {v1::DESTROY_DISK(mount, std::move(operationId))}; + } + case authorization::DESTROY_RAW_DISK: { + v1::OperationID operationId; + operationId.set_value(id::UUID::random().toString()); + + v1::Resource raw = v1::createDiskResource( + "32", "*", None(), None(), v1::createDiskSourceRaw( + id::UUID::random().toString(), "profile")); + raw.mutable_provider_id()->set_value("provider"); + + return {v1::DESTROY_DISK(raw, std::move(operationId))}; + } + case authorization::UNKNOWN: + case authorization::REGISTER_FRAMEWORK: + case authorization::TEARDOWN_FRAMEWORK: + case authorization::GET_ENDPOINT_WITH_PATH: + case authorization::VIEW_ROLE: + case authorization::UPDATE_WEIGHT: + case authorization::GET_QUOTA: + case authorization::UPDATE_QUOTA: + case authorization::VIEW_FRAMEWORK: + case authorization::VIEW_TASK: + case authorization::VIEW_EXECUTOR: + case authorization::ACCESS_SANDBOX: + case authorization::ACCESS_MESOS_LOG: + case authorization::VIEW_FLAGS: + case authorization::LAUNCH_NESTED_CONTAINER: + case authorization::KILL_NESTED_CONTAINER: + case 
authorization::WAIT_NESTED_CONTAINER: + case authorization::LAUNCH_NESTED_CONTAINER_SESSION: + case authorization::ATTACH_CONTAINER_INPUT: + case authorization::ATTACH_CONTAINER_OUTPUT: + case authorization::VIEW_CONTAINER: + case authorization::SET_LOG_LEVEL: + case authorization::REMOVE_NESTED_CONTAINER: + case authorization::REGISTER_AGENT: + case authorization::UPDATE_MAINTENANCE_SCHEDULE: + case authorization::GET_MAINTENANCE_SCHEDULE: + case authorization::START_MAINTENANCE: + case authorization::STOP_MAINTENANCE: + case authorization::GET_MAINTENANCE_STATUS: + case authorization::MARK_AGENT_GONE: + case authorization::LAUNCH_STANDALONE_CONTAINER: + case authorization::KILL_STANDALONE_CONTAINER: + case authorization::WAIT_STANDALONE_CONTAINER: + case authorization::REMOVE_STANDALONE_CONTAINER: + case authorization::VIEW_STANDALONE_CONTAINER: + case authorization::MODIFY_RESOURCE_PROVIDER_CONFIG: + case authorization::MARK_RESOURCE_PROVIDER_GONE: + case authorization::VIEW_RESOURCE_PROVIDER: + case authorization::PRUNE_IMAGES: + return {}; + } + + UNREACHABLE(); + } +}; + + +INSTANTIATE_TEST_CASE_P( + AllowedAction, + MasterOperationAuthorizationTest, + ::testing::Values( + authorization::RUN_TASK, + authorization::RESERVE_RESOURCES, + authorization::UNRESERVE_RESOURCES, + authorization::CREATE_VOLUME, + authorization::DESTROY_VOLUME, + authorization::RESIZE_VOLUME, + authorization::CREATE_BLOCK_DISK, + authorization::DESTROY_BLOCK_DISK, + authorization::CREATE_MOUNT_DISK, + authorization::DESTROY_MOUNT_DISK), + [](const testing::TestParamInfo<authorization::Action>& action) { + return authorization::Action_Name(action.param); + }); + + +// This test verifies that allowing or denying an action will only result in a +// success or failure on specific operations but not other operations in an +// accept call. This is a regression test for MESOS-9474 and MESOS-9480. 
+TEST_P(MasterOperationAuthorizationTest, Accept) +{ + Clock::pause(); + + // We use this flag to control when the mock authorizer starts to deny + // disallowed actions. + std::atomic_bool permissive(true); + + MockAuthorizer authorizer; + EXPECT_CALL(authorizer, authorized(_)) + .WillRepeatedly(Invoke([&](const authorization::Request& request) { + return permissive || request.action() == GetParam(); + })); + + Try<Owned<cluster::Master>> master = StartMaster(&authorizer); + ASSERT_SOME(master); + + // First, we create a list of operations to exercise all authorization + // actions, and compute the total resources needed by these operations and set + // up their expected terminal states. + // + // NOTE: We create some `RUN_TASK` operations in the beginning and some in the + // end for two reasons: 1. These operations don't have operation IDs so + // need to be handled differently. 2. By adding some operations for the + // `RUN_TASK` action in the end, we can verify that their results are not + // affected by preceding authorizations. + vector<v1::Offer::Operation> operations; + v1::Resources totalResources; + hashmap<v1::TaskID, v1::TaskState> expectedTaskStates; + hashmap<v1::OperationID, v1::OperationState> expectedOperationStates; + + // NOTE: Because we create operations before getting an offer, we synthesize + // the framework ID and agent ID here and check them later. 
+ v1::FrameworkID frameworkId; + frameworkId.set_value(master.get()->getMasterInfo().id() + "-0000"); + v1::AgentID agentId; + agentId.set_value(master.get()->getMasterInfo().id() + "-S0"); + + auto addRunTaskOperations = [&] { + foreach ( + v1::Offer::Operation& operation, + createOperations(frameworkId, agentId, authorization::RUN_TASK)) { + if (operation.type() == v1::Offer::Operation::LAUNCH) { + foreach (const v1::TaskInfo& task, operation.launch().task_infos()) { + totalResources += task.resources(); + totalResources += task.executor().resources(); + + expectedTaskStates.put( + task.task_id(), + authorization::RUN_TASK == GetParam() + ? v1::TASK_FINISHED : v1::TASK_ERROR); + } + } else if (operation.type() == v1::Offer::Operation::LAUNCH_GROUP) { + totalResources += operation.launch_group().executor().resources(); + + foreach ( + const v1::TaskInfo& task, + operation.launch_group().task_group().tasks()) { + totalResources += task.resources(); + + expectedTaskStates.put( + task.task_id(), + authorization::RUN_TASK == GetParam() + ? v1::TASK_FINISHED : v1::TASK_ERROR); + } + } + + operations.push_back(std::move(operation)); + } + }; + + addRunTaskOperations(); + + for (int i = 0; i < authorization::Action_descriptor()->value_count(); i++) { + const authorization::Action action = static_cast<authorization::Action>( + authorization::Action_descriptor()->value(i)->number()); + + // Skip `RUN_TASK` operations since they are handled separately. + if (action == authorization::RUN_TASK) { + continue; + } + + foreach ( + v1::Offer::Operation& operation, + createOperations(frameworkId, agentId, action)) { + Try<Resources> consumed = + protobuf::getConsumedResources(devolve(operation)); + ASSERT_SOME(consumed); + totalResources += evolve(consumed.get()); + + ASSERT_TRUE(operation.has_id()); + expectedOperationStates.put( + operation.id(), + action == GetParam() ? 
v1::OPERATION_FINISHED : v1::OPERATION_ERROR); + + operations.push_back(std::move(operation)); + } + } + + addRunTaskOperations(); + + // Then, we register a mock agent that has sufficient resources to exercise + // all operations and simply replies `TASK_FINISHED` or `OPERATION_FINISHED` + // for all tasks or operations it receives. + // + // NOTE: Since dynamic reservations, persistent volumes and resource + // provider resources cannot be specified through the `--resources` flag, we + // intercept the `RegisterSlaveMessage` and `UpdateSlaveMessage` to inject + // resources required by all operations then forward them to the master. + Future<RegisterSlaveMessage> registerSlaveMessage = + DROP_PROTOBUF(RegisterSlaveMessage(), _, _); + Future<SlaveRegisteredMessage> slaveRegisteredMessage = + FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _); + Future<UpdateSlaveMessage> updateSlaveMessage = + DROP_PROTOBUF(UpdateSlaveMessage(), _, _); + + Owned<MasterDetector> detector = master.get()->createDetector(); + slave::Flags slaveFlags = CreateSlaveFlags(); + Try<Owned<cluster::Slave>> slave = + StartSlave(detector.get(), slaveFlags, true); + ASSERT_SOME(slave); + ASSERT_NE(nullptr, slave.get()->mock()); + + EXPECT_CALL(*slave.get()->mock(), runTask(_, _, _, _, _, _, _)) + .WillRepeatedly(Invoke([&]( + const process::UPID& from, + const FrameworkInfo& frameworkInfo, + const FrameworkID& frameworkId, + const process::UPID& pid, + const TaskInfo& task, + const std::vector<ResourceVersionUUID>& resourceVersionUuids, + const Option<bool>& launchExecutor) { + ASSERT_TRUE(frameworkInfo.has_id()); + + StatusUpdateMessage message; + message.set_pid(slave.get()->pid); + *message.mutable_update() = protobuf::createStatusUpdate( + frameworkInfo.id(), + devolve(agentId), + task.task_id(), + TASK_FINISHED, + TaskStatus::SOURCE_SLAVE, + id::UUID::random()); + + process::post(slave.get()->pid, master.get()->pid, message); + })); + + EXPECT_CALL(*slave.get()->mock(), runTaskGroup(_, _, _, 
_, _, _)) + .WillRepeatedly(Invoke([&]( + const process::UPID& from, + const FrameworkInfo& frameworkInfo, + const ExecutorInfo& executorInfo, + const TaskGroupInfo& taskGroup, + const std::vector<ResourceVersionUUID>& resourceVersionUuids, + const Option<bool>& launchExecutor) { + ASSERT_TRUE(frameworkInfo.has_id()); + + foreach (const TaskInfo& task, taskGroup.tasks()) { + StatusUpdateMessage message; + message.set_pid(slave.get()->pid); + *message.mutable_update() = protobuf::createStatusUpdate( + frameworkInfo.id(), + devolve(agentId), + task.task_id(), + TASK_FINISHED, + TaskStatus::SOURCE_SLAVE, + id::UUID::random()); + + process::post(slave.get()->pid, master.get()->pid, message); + } + })); + + EXPECT_CALL(*slave.get()->mock(), applyOperation(_)) + .WillRepeatedly(Invoke([&](const ApplyOperationMessage& message) { + ASSERT_TRUE(message.has_framework_id()); + ASSERT_TRUE(message.operation_info().has_id()); + + process::post( + slave.get()->pid, + master.get()->pid, + protobuf::createUpdateOperationStatusMessage( + message.operation_uuid(), + protobuf::createOperationStatus( + OPERATION_FINISHED, + message.operation_info().id(), + None(), + None(), + None(), + devolve(agentId)), + None(), + message.framework_id(), + devolve(agentId))); + })); + + slave.get()->start(); + + // Settle the clock to ensure that the master has been detected by the agent, + // then advance the clock to trigger an authentication and a registration. 
+ Clock::settle(); + Clock::advance(slaveFlags.registration_backoff_factor); + + AWAIT_READY(registerSlaveMessage); + + { + RegisterSlaveMessage message = registerSlaveMessage.get(); + *message.mutable_slave()->mutable_resources() = + createAgentResources(devolve(totalResources)); + *message.mutable_checkpointed_resources() = + devolve(totalResources).filter(needCheckpointing); + + process::post(slave.get()->pid, master.get()->pid, message); + } + + AWAIT_READY(slaveRegisteredMessage); + EXPECT_EQ(devolve(agentId), slaveRegisteredMessage->slave_id()); + + AWAIT_READY(updateSlaveMessage); + + { + UpdateSlaveMessage message = updateSlaveMessage.get(); + UpdateSlaveMessage::ResourceProvider* resourceProvider = + message.mutable_resource_providers()->add_providers(); + resourceProvider->mutable_info()->mutable_id()->set_value("provider"); + resourceProvider->mutable_info()->set_type("resource_provider_type"); + resourceProvider->mutable_info()->set_name("resource_provider_name"); + *resourceProvider->mutable_total_resources() = + devolve(totalResources).filter(&Resources::hasResourceProvider); + resourceProvider->mutable_operations(); + resourceProvider->mutable_resource_version_uuid()->set_value( + id::UUID::random().toBytes()); + + process::post(slave.get()->pid, master.get()->pid, message); + } + + // Settle the clock to ensure that the resource providers have been updated. + Clock::settle(); + + // Finally, we register a framework to exercise all operations, and check that + // only authorized tasks and operations are finished. 
+ v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO; + frameworkInfo.set_roles(0, "role"); + + auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); + + EXPECT_CALL(*scheduler, connected(_)) + .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo)); + + Future<v1::scheduler::Event::Subscribed> subscribed; + EXPECT_CALL(*scheduler, subscribed(_, _)).WillOnce(FutureArg<1>(&subscribed)); + + EXPECT_CALL(*scheduler, heartbeat(_)) + .WillRepeatedly(Return()); // Ignore heartbeats. + + Future<v1::scheduler::Event::Offers> offers; + EXPECT_CALL(*scheduler, offers(_, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + v1::scheduler::TestMesos mesos( + master.get()->pid, ContentType::PROTOBUF, scheduler); + + AWAIT_READY(subscribed); + EXPECT_EQ(frameworkId, subscribed->framework_id()); + + // Start to deny disallowed actions. + permissive = false; + + AWAIT_READY(offers); + ASSERT_EQ(1u, offers->offers_size()); + + hashmap<v1::TaskID, Future<v1::scheduler::Event::Update>> + actualTaskStatusUpdates; + hashmap<v1::OperationID, Future<v1::scheduler::Event::UpdateOperationStatus>> + actualOperationStatusUpdates; + + foreachkey (const v1::TaskID& taskId, expectedTaskStates) { + EXPECT_CALL(*scheduler, update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskId), + Truly([](const v1::scheduler::Event::Update& update) { + return protobuf::isTerminalState(devolve(update.status()).state()); + })))) + .WillOnce(FutureArg<1>(&actualTaskStatusUpdates[taskId])); + } + + foreachkey (const v1::OperationID& operationId, expectedOperationStates) { + EXPECT_CALL(*scheduler, updateOperationStatus(_, AllOf( + v1::scheduler::OperationStatusUpdateOperationIdEq(operationId), + Truly([](const v1::scheduler::Event::UpdateOperationStatus& update) { + return protobuf::isTerminalState(devolve(update.status()).state()); + })))) + .WillOnce(FutureArg<1>(&actualOperationStatusUpdates[operationId])); + } + + mesos.send(v1::createCallAccept( + 
subscribed->framework_id(), + offers->offers(0), + operations)); + + foreachpair ( + const v1::TaskID& taskId, + const Future<v1::scheduler::Event::Update>& update, + actualTaskStatusUpdates) { + AWAIT_READY(update); + EXPECT_EQ(expectedTaskStates[taskId], update->status().state()); + } + + foreachpair ( + const v1::OperationID& operationId, + const Future<v1::scheduler::Event::UpdateOperationStatus>& update, + actualOperationStatusUpdates) { + AWAIT_READY(update); + EXPECT_EQ(expectedOperationStates[operationId], update->status().state()); + } +} + } // namespace tests { } // namespace internal { } // namespace mesos { diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp index c886789..d0c82fa 100644 --- a/src/tests/mesos.hpp +++ b/src/tests/mesos.hpp @@ -1321,65 +1321,102 @@ inline typename TOffer::Operation RESERVE( operation.mutable_reserve()->mutable_resources()->CopyFrom(resources); if (operationId.isSome()) { - operation.mutable_id()->CopyFrom(operationId.get()); + *operation.mutable_id() = operationId.get(); } return operation; } -template <typename TResources, typename TOffer> -inline typename TOffer::Operation UNRESERVE(const TResources& resources) +template <typename TResources, typename TOperationID, typename TOffer> +inline typename TOffer::Operation UNRESERVE( + const TResources& resources, + const Option<TOperationID>& operationId = None()) { typename TOffer::Operation operation; operation.set_type(TOffer::Operation::UNRESERVE); operation.mutable_unreserve()->mutable_resources()->CopyFrom(resources); + + if (operationId.isSome()) { + *operation.mutable_id() = operationId.get(); + } + return operation; } -template <typename TResources, typename TOffer> -inline typename TOffer::Operation CREATE(const TResources& volumes) +template <typename TResources, typename TOperationID, typename TOffer> +inline typename TOffer::Operation CREATE( + const TResources& volumes, + const Option<TOperationID>& operationId = None()) { typename TOffer::Operation 
operation; operation.set_type(TOffer::Operation::CREATE); operation.mutable_create()->mutable_volumes()->CopyFrom(volumes); + + if (operationId.isSome()) { + *operation.mutable_id() = operationId.get(); + } + return operation; } -template <typename TResources, typename TOffer> -inline typename TOffer::Operation DESTROY(const TResources& volumes) +template <typename TResources, typename TOperationID, typename TOffer> +inline typename TOffer::Operation DESTROY( + const TResources& volumes, + const Option<TOperationID>& operationId = None()) { typename TOffer::Operation operation; operation.set_type(TOffer::Operation::DESTROY); operation.mutable_destroy()->mutable_volumes()->CopyFrom(volumes); + + if (operationId.isSome()) { + *operation.mutable_id() = operationId.get(); + } + return operation; } -template <typename TResource, typename TOffer> +template <typename TResource, typename TOperationID, typename TOffer> inline typename TOffer::Operation GROW_VOLUME( const TResource& volume, - const TResource& addition) + const TResource& addition, + const Option<TOperationID>& operationId = None()) { typename TOffer::Operation operation; operation.set_type(TOffer::Operation::GROW_VOLUME); operation.mutable_grow_volume()->mutable_volume()->CopyFrom(volume); operation.mutable_grow_volume()->mutable_addition()->CopyFrom(addition); + + if (operationId.isSome()) { + *operation.mutable_id() = operationId.get(); + } + return operation; } -template <typename TResource, typename TOffer, typename TValueScalar> +template < + typename TResource, + typename TValueScalar, + typename TOperationID, + typename TOffer> inline typename TOffer::Operation SHRINK_VOLUME( const TResource& volume, - const TValueScalar& subtract) + const TValueScalar& subtract, + const Option<TOperationID>& operationId = None()) { typename TOffer::Operation operation; operation.set_type(TOffer::Operation::SHRINK_VOLUME); operation.mutable_shrink_volume()->mutable_volume()->CopyFrom(volume); 
operation.mutable_shrink_volume()->mutable_subtract()->CopyFrom(subtract); + + if (operationId.isSome()) { + *operation.mutable_id() = operationId.get(); + } + return operation; } @@ -1765,35 +1802,40 @@ inline Offer::Operation RESERVE(Args&&... args) template <typename... Args> inline Offer::Operation UNRESERVE(Args&&... args) { - return common::UNRESERVE<Resources, Offer>(std::forward<Args>(args)...); + return common::UNRESERVE<Resources, OperationID, Offer>( + std::forward<Args>(args)...); } template <typename... Args> inline Offer::Operation CREATE(Args&&... args) { - return common::CREATE<Resources, Offer>(std::forward<Args>(args)...); + return common::CREATE<Resources, OperationID, Offer>( + std::forward<Args>(args)...); } template <typename... Args> inline Offer::Operation DESTROY(Args&&... args) { - return common::DESTROY<Resources, Offer>(std::forward<Args>(args)...); + return common::DESTROY<Resources, OperationID, Offer>( + std::forward<Args>(args)...); } template <typename... Args> inline Offer::Operation GROW_VOLUME(Args&&... args) { - return common::GROW_VOLUME<Resource, Offer>(std::forward<Args>(args)...); + return common::GROW_VOLUME<Resource, OperationID, Offer>( + std::forward<Args>(args)...); } template <typename... Args> inline Offer::Operation SHRINK_VOLUME(Args&&... args) { - return common::SHRINK_VOLUME<Resource, Offer>(std::forward<Args>(args)...); + return common::SHRINK_VOLUME<Resource, Value::Scalar, OperationID, Offer>( + std::forward<Args>(args)...); } @@ -2063,40 +2105,51 @@ inline mesos::v1::Offer::Operation RESERVE(Args&&... args) template <typename... Args> inline mesos::v1::Offer::Operation UNRESERVE(Args&&... args) { - return common::UNRESERVE<mesos::v1::Resources, mesos::v1::Offer>( - std::forward<Args>(args)...); + return common::UNRESERVE< + mesos::v1::Resources, + mesos::v1::OperationID, + mesos::v1::Offer>(std::forward<Args>(args)...); } template <typename... Args> inline mesos::v1::Offer::Operation CREATE(Args&&... 
args) { - return common::CREATE<mesos::v1::Resources, mesos::v1::Offer>( - std::forward<Args>(args)...); + return common::CREATE< + mesos::v1::Resources, + mesos::v1::OperationID, + mesos::v1::Offer>(std::forward<Args>(args)...); } template <typename... Args> inline mesos::v1::Offer::Operation DESTROY(Args&&... args) { - return common::DESTROY<mesos::v1::Resources, mesos::v1::Offer>( - std::forward<Args>(args)...); + return common::DESTROY< + mesos::v1::Resources, + mesos::v1::OperationID, + mesos::v1::Offer>(std::forward<Args>(args)...); } template <typename... Args> inline mesos::v1::Offer::Operation GROW_VOLUME(Args&&... args) { - return common::GROW_VOLUME<mesos::v1::Resource, mesos::v1::Offer>( - std::forward<Args>(args)...); + return common::GROW_VOLUME< + mesos::v1::Resource, + mesos::v1::OperationID, + mesos::v1::Offer>(std::forward<Args>(args)...); } template <typename... Args> inline mesos::v1::Offer::Operation SHRINK_VOLUME(Args&&... args) { - return common::SHRINK_VOLUME<mesos::v1::Resource, mesos::v1::Offer>( - std::forward<Args>(args)...); + return common::SHRINK_VOLUME< + mesos::v1::Resource, + mesos::v1::Value::Scalar, + mesos::v1::OperationID, + mesos::v1::Offer>(std::forward<Args>(args)...); } @@ -2745,6 +2798,24 @@ ACTION_P2(SendAcknowledge, frameworkId, agentId) } +ACTION_P2( + SendAcknowledgeOperationStatus, frameworkId, agentId) +{ + Call call; + call.set_type(Call::ACKNOWLEDGE_OPERATION_STATUS); + call.mutable_framework_id()->CopyFrom(frameworkId); + + Call::AcknowledgeOperationStatus* acknowledge = + call.mutable_acknowledge_operation_status(); + + acknowledge->mutable_agent_id()->CopyFrom(agentId); + acknowledge->set_uuid(arg1.status().uuid().value()); + acknowledge->mutable_operation_id()->CopyFrom(arg1.status().operation_id()); + + arg0->send(call); +} + + ACTION_P3( SendAcknowledgeOperationStatus, frameworkId, agentId, resourceProviderId) { diff --git a/src/tests/mock_slave.cpp b/src/tests/mock_slave.cpp index 1122c2a..dd458e3 100644 --- 
a/src/tests/mock_slave.cpp +++ b/src/tests/mock_slave.cpp @@ -153,6 +153,8 @@ MockSlave::MockSlave( .WillRepeatedly(Invoke(this, &MockSlave::unmocked_shutdownExecutor)); EXPECT_CALL(*this, _shutdownExecutor(_, _)) .WillRepeatedly(Invoke(this, &MockSlave::unmocked__shutdownExecutor)); + EXPECT_CALL(*this, applyOperation(_)) + .WillRepeatedly(Invoke(this, &MockSlave::unmocked_applyOperation)); } @@ -314,6 +316,11 @@ void MockSlave::unmocked__shutdownExecutor( } +void MockSlave::unmocked_applyOperation(const ApplyOperationMessage& message) +{ + slave::Slave::applyOperation(message); +} + } // namespace tests { } // namespace internal { } // namespace mesos { diff --git a/src/tests/mock_slave.hpp b/src/tests/mock_slave.hpp index 326a450..c057b40 100644 --- a/src/tests/mock_slave.hpp +++ b/src/tests/mock_slave.hpp @@ -253,6 +253,12 @@ public: void unmocked__shutdownExecutor( slave::Framework* framework, slave::Executor* executor); + + MOCK_METHOD1(applyOperation, void( + const ApplyOperationMessage& message)); + + void unmocked_applyOperation( + const ApplyOperationMessage& message); }; } // namespace tests {
