This is an automated email from the ASF dual-hosted git repository. bbannier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit eb505c1a7d00fc1d591d9fad53a95995601e13ba Author: Jan Schlicht <[email protected]> AuthorDate: Wed Mar 27 09:40:25 2019 +0100 Fixed operator operation handling with resource provider resources. The resource provider manager didn't allow operations originating from operator API calls. For speculatively applied operations, this is allowed now. Review: https://reviews.apache.org/r/70165/ --- src/resource_provider/manager.cpp | 30 +- .../storage_local_resource_provider_tests.cpp | 336 +++++++++++++++++++++ 2 files changed, 358 insertions(+), 8 deletions(-) diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp index b9dc887..ceed122 100644 --- a/src/resource_provider/manager.cpp +++ b/src/resource_provider/manager.cpp @@ -465,7 +465,9 @@ void ResourceProviderManagerProcess::applyOperation( const ApplyOperationMessage& message) { const Offer::Operation& operation = message.operation_info(); - const FrameworkID& frameworkId = message.framework_id(); + const Option<FrameworkID> frameworkId = message.has_framework_id() + ? message.framework_id() + : Option<FrameworkID>::none(); const UUID& operationUUID = message.operation_uuid(); Result<ResourceProviderID> resourceProviderId = @@ -474,7 +476,11 @@ void ResourceProviderManagerProcess::applyOperation( if (!resourceProviderId.isSome()) { LOG(ERROR) << "Failed to get the resource provider ID of operation " << "'" << operation.id() << "' (uuid: " << operationUUID - << ") from framework " << frameworkId << ": " + << ") from " + << (frameworkId.isSome() + ? "framework " + stringify(frameworkId.get()) + : "an operator API call") + << ": " << (resourceProviderId.isError() ? resourceProviderId.error() : "Not found"); return; @@ -482,7 +488,10 @@ void ResourceProviderManagerProcess::applyOperation( if (!resourceProviders.subscribed.contains(resourceProviderId.get())) { LOG(WARNING) << "Dropping operation '" << operation.id() << "' (uuid: " - << operationUUID << ") from framework " << frameworkId + << operationUUID << ") from " + << (frameworkId.isSome() + ? "framework " + stringify(frameworkId.get()) + : "an operator API call") << " because resource provider " << resourceProviderId.get() << " is not subscribed"; return; @@ -502,8 +511,10 @@ void ResourceProviderManagerProcess::applyOperation( Event event; event.set_type(Event::APPLY_OPERATION); - event.mutable_apply_operation() - ->mutable_framework_id()->CopyFrom(frameworkId); + if (frameworkId.isSome()) { + event.mutable_apply_operation() + ->mutable_framework_id()->CopyFrom(frameworkId.get()); + } event.mutable_apply_operation()->mutable_info()->CopyFrom(operation); event.mutable_apply_operation() ->mutable_operation_uuid()->CopyFrom(message.operation_uuid()); @@ -513,9 +524,12 @@ void ResourceProviderManagerProcess::applyOperation( if (!resourceProvider->http.send(event)) { LOG(WARNING) << "Failed to send operation '" << operation.id() << "' " - << "(uuid: " << operationUUID << ") from framework " - << frameworkId << " to resource provider " - << resourceProviderId.get() << ": connection closed"; + << "(uuid: " << operationUUID << ") from " + << (frameworkId.isSome() + ? "framework " + stringify(frameworkId.get()) + : "an operator API call") + << " to resource provider " << resourceProviderId.get() + << ": connection closed"; } } diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp index 797f89e..55c9389 100644 --- a/src/tests/storage_local_resource_provider_tests.cpp +++ b/src/tests/storage_local_resource_provider_tests.cpp @@ -5381,6 +5381,342 @@ TEST_F( Clock::resume(); } + +// This test verifies that operators can reserve/unreserve and +// create/destroy persistent volumes with resource provider resources. +TEST_F( + StorageLocalResourceProviderTest, + OperatorOperationsWithResourceProviderResources) +{ + Clock::pause(); + + Future<shared_ptr<TestDiskProfileServer>> server = + TestDiskProfileServer::create(); + + AWAIT_READY(server); + + EXPECT_CALL(*server.get()->process, profiles(_)) + .WillOnce(Return(http::OK(createDiskProfileMapping({{"test", None()}})))) + .WillRepeatedly(Return(Future<http::Response>())); // Stop subsequent polls. + + const Duration pollInterval = Seconds(10); + loadUriDiskProfileAdaptorModule( + stringify(server.get()->process->url()), pollInterval); + + setupResourceProviderConfig(Gigabytes(2)); + + master::Flags masterFlags = CreateMasterFlags(); + Try<Owned<cluster::Master>> master = StartMaster(masterFlags); + ASSERT_SOME(master); + + Owned<MasterDetector> detector = master.get()->createDetector(); + + slave::Flags slaveFlags = CreateSlaveFlags(); + slaveFlags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME; + + // Since the local resource provider daemon is started after the agent + // is registered, it is guaranteed that the slave will send two + // `UpdateSlaveMessage`s, where the latter one contains resources from + // the storage local resource provider. + // + // NOTE: The order of the two `FUTURE_PROTOBUF`s is reversed because + // Google Mock will search the expectations in reverse order. + Future<UpdateSlaveMessage> updateSlave2 = + FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _); + Future<UpdateSlaveMessage> updateSlave1 = + FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _); + + Future<SlaveRegisteredMessage> slaveRegisteredMessage = + FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _); + + Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags); + ASSERT_SOME(slave); + + // Advance the clock to trigger agent registration and prevent retry. + Clock::advance(slaveFlags.registration_backoff_factor); + + AWAIT_READY(updateSlave1); + + // NOTE: We need to resume the clock so that the resource provider can + // periodically check if the CSI endpoint socket has been created by + // the plugin container, which runs in another Linux process. + Clock::resume(); + + AWAIT_READY(updateSlave2); + ASSERT_TRUE(updateSlave2->has_resource_providers()); + + Clock::pause(); + + AWAIT_READY(slaveRegisteredMessage); + + // By default, all resource provider resources are reserved for the 'storage' + // role. We'll refine this role to be able to add a reservation to the + // existing one. + const string role = string("storage/") + DEFAULT_TEST_ROLE; + + // Register a framework to create a 'MOUNT' volume and verify operator + // operation results by checking received offers. + FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO; + framework.set_roles(0, role); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, framework, master.get()->pid, DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched, registered(&driver, _, _)); + + // Expect some offers to be rescinded as consequence of the operator + // operations. + EXPECT_CALL(sched, offerRescinded(_, _)) + .WillRepeatedly(Return()); + + Future<vector<Offer>> offers; + + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillRepeatedly(DeclineOffers()); + + // Create a 'MOUNT' volume from 'RAW' resources, because persistent volumes + // are only supported for 'MOUNT' volumes. + { + EXPECT_CALL( + sched, + resourceOffers( + &driver, + OffersHaveAnyResource( + std::bind(isStoragePool<Resource>, lambda::_1, "test")))) + .WillOnce(FutureArg<1>(&offers)); + + driver.start(); + + Clock::advance(masterFlags.allocation_interval); + + AWAIT_READY(offers); + ASSERT_EQ(1u, offers->size()); + + const Offer offer = offers->at(0); + + Resource raw = *Resources(offer.resources()) + .filter(std::bind(isStoragePool<Resource>, lambda::_1, "test")) + .begin(); + + EXPECT_CALL( + sched, + resourceOffers( + &driver, + OffersHaveAnyResource( + std::bind(isMountDisk<Resource>, lambda::_1, "test")))) + .WillOnce(FutureArg<1>(&offers)); + + driver.acceptOffers( + {offer.id()}, {CREATE_DISK(raw, Resource::DiskInfo::Source::MOUNT)}); + + // NOTE: We need to resume the clock so that the resource provider can + // periodically check if the CSI endpoint socket has been created by + // the plugin container, which runs in another Linux process. + Clock::resume(); + + // Wait for an offer that contains the 'MOUNT' volume with default + // reservation. + AWAIT_READY(offers); + ASSERT_EQ(1u, offers->size()); + + Clock::pause(); + } + + Resource mountDisk = *Resources(offers->at(0).resources()) + .filter(std::bind(isMountDisk<Resource>, lambda::_1, "test")) + .begin(); + const SlaveID slaveId = offers->at(0).slave_id(); + + Resources reserved = + Resources(mountDisk).pushReservation(createDynamicReservationInfo( + role, + DEFAULT_CREDENTIAL.principal())); + reserved.unallocate(); + + auto isReservedMountDisk = [](const Resource& r) { + return + r.has_disk() && + r.disk().has_source() && + r.disk().source().type() == Resource::DiskInfo::Source::MOUNT && + r.disk().source().has_vendor() && + r.disk().source().vendor() == TEST_CSI_VENDOR && + r.disk().source().has_id() && + r.disk().source().has_profile() && + !r.disk().has_persistence() && + r.reservations().size() == 2; // Existing and refined reservation. + }; + + // Reserve resources as operator. + { + v1::master::Call v1ReserveResourcesCall; + v1ReserveResourcesCall.set_type(v1::master::Call::RESERVE_RESOURCES); + + v1::master::Call::ReserveResources* reserveResources = + v1ReserveResourcesCall.mutable_reserve_resources(); + + reserveResources->mutable_agent_id()->CopyFrom(evolve(slaveId)); + reserveResources->mutable_resources()->CopyFrom(evolve(reserved)); + + EXPECT_CALL( + sched, + resourceOffers(&driver, OffersHaveAnyResource(isReservedMountDisk))) + .WillOnce(FutureArg<1>(&offers)); + + Future<http::Response> v1ReserveResourcesResponse = http::post( + master.get()->pid, + "api/v1", + createBasicAuthHeaders(DEFAULT_CREDENTIAL), + serialize(ContentType::PROTOBUF, v1ReserveResourcesCall), + stringify(ContentType::PROTOBUF)); + + AWAIT_EXPECT_RESPONSE_STATUS_EQ( + http::Accepted().status, v1ReserveResourcesResponse); + + Clock::advance(masterFlags.allocation_interval); + + // Wait for an offer that contains the 'MOUNT' volume with refined + // reservation. + AWAIT_READY(offers); + ASSERT_EQ(1u, offers->size()); + } + + vector<Resource> converted; + foreach (Resource resource, reserved) { + if (Resources::isDisk(resource, Resource::DiskInfo::Source::MOUNT)) { + resource.mutable_disk()->mutable_persistence()->set_id( + id::UUID::random().toString()); + resource.mutable_disk()->mutable_persistence()->set_principal( + DEFAULT_CREDENTIAL.principal()); + resource.mutable_disk()->mutable_volume()->set_container_path("volume"); + resource.mutable_disk()->mutable_volume()->set_mode(Volume::RW); + converted.push_back(resource); + } + } + + Resources volumes(converted); + + auto isPersistedMountDisk = [](const Resource& r) { + return + r.has_disk() && + r.disk().has_source() && + r.disk().source().type() == Resource::DiskInfo::Source::MOUNT && + r.disk().source().has_vendor() && + r.disk().source().vendor() == TEST_CSI_VENDOR && + r.disk().source().has_id() && + r.disk().source().has_profile() && + r.disk().has_persistence(); + }; + + // Create persistent volumes as operator. + { + v1::master::Call v1CreateVolumesCall; + v1CreateVolumesCall.set_type(v1::master::Call::CREATE_VOLUMES); + + v1::master::Call::CreateVolumes* createVolumes = + v1CreateVolumesCall.mutable_create_volumes(); + + createVolumes->mutable_agent_id()->CopyFrom(evolve(slaveId)); + createVolumes->mutable_volumes()->CopyFrom(evolve(volumes)); + + EXPECT_CALL( + sched, + resourceOffers(&driver, OffersHaveAnyResource(isPersistedMountDisk))) + .WillOnce(FutureArg<1>(&offers)); + + Future<http::Response> v1CreateVolumesResponse = http::post( + master.get()->pid, + "api/v1", + createBasicAuthHeaders(DEFAULT_CREDENTIAL), + serialize(ContentType::PROTOBUF, v1CreateVolumesCall), + stringify(ContentType::PROTOBUF)); + + AWAIT_EXPECT_RESPONSE_STATUS_EQ( + http::Accepted().status, v1CreateVolumesResponse); + + // Remove all offer filters. Existing filters would stop us from + // getting offers otherwise. + driver.reviveOffers(); + + Clock::advance(masterFlags.allocation_interval); + + // Wait for an offer that contains the 'MOUNT' volume with refined + // reservation and persistence. + AWAIT_READY(offers); + ASSERT_EQ(1u, offers->size()); + } + + // Destroy persistent volumes as operator. + { + v1::master::Call v1DestroyVolumesCall; + v1DestroyVolumesCall.set_type(v1::master::Call::DESTROY_VOLUMES); + + v1::master::Call::DestroyVolumes* destroyVolumes = + v1DestroyVolumesCall.mutable_destroy_volumes(); + + destroyVolumes->mutable_agent_id()->CopyFrom(evolve(slaveId)); + destroyVolumes->mutable_volumes()->CopyFrom(evolve(volumes)); + + EXPECT_CALL( + sched, + resourceOffers(&driver, OffersHaveAnyResource(isReservedMountDisk))) + .WillOnce(FutureArg<1>(&offers)); + + Future<http::Response> v1DestroyVolumesResponse = http::post( + master.get()->pid, + "api/v1", + createBasicAuthHeaders(DEFAULT_CREDENTIAL), + serialize(ContentType::PROTOBUF, v1DestroyVolumesCall), + stringify(ContentType::PROTOBUF)); + + AWAIT_EXPECT_RESPONSE_STATUS_EQ( + http::Accepted().status, v1DestroyVolumesResponse); + + Clock::advance(masterFlags.allocation_interval); + + // Wait for an offer that contains the 'MOUNT' volume with refined + // reservation. + AWAIT_READY(offers); + ASSERT_EQ(1u, offers->size()); + } + + // Unreserve resources as operator. + { + v1::master::Call v1UnreserveResourcesCall; + v1UnreserveResourcesCall.set_type(v1::master::Call::UNRESERVE_RESOURCES); + v1::master::Call::UnreserveResources* unreserveResources = + v1UnreserveResourcesCall.mutable_unreserve_resources(); + + unreserveResources->mutable_agent_id()->CopyFrom(evolve(slaveId)); + unreserveResources->mutable_resources()->CopyFrom(evolve(reserved)); + + EXPECT_CALL( + sched, + resourceOffers( + &driver, + OffersHaveAnyResource( + std::bind(isMountDisk<Resource>, lambda::_1, "test")))) + .WillOnce(FutureArg<1>(&offers)); + + Future<http::Response> v1UnreserveResourcesResponse = http::post( + master.get()->pid, + "api/v1", + createBasicAuthHeaders(DEFAULT_CREDENTIAL), + serialize(ContentType::PROTOBUF, v1UnreserveResourcesCall), + stringify(ContentType::PROTOBUF)); + + AWAIT_EXPECT_RESPONSE_STATUS_EQ( + http::Accepted().status, v1UnreserveResourcesResponse); + + Clock::advance(masterFlags.allocation_interval); + + // Wait for an offer that contains the 'MOUNT' volume with default + // reservation. + AWAIT_READY(offers); + ASSERT_EQ(1u, offers->size()); + } +} + } // namespace tests { } // namespace internal { } // namespace mesos {
