This is an automated email from the ASF dual-hosted git repository. chhsiao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 6b05bc8c05c1951999ea787ffb9c1815bb5ef8b4 Author: Chun-Hung Hsiao <[email protected]> AuthorDate: Mon Feb 4 16:38:50 2019 -0800 Updated `CreateDestroyDisk*` SLRP tests to test pipelined operations. This patch extends the code coverage of the `CreateDestroyDisk` and `CreateDestroyDiskRecovery` tests by testing pipelined `RESERVE`, `CREATE`, `DESTROY` and `UNRESERVE` operations along with `CREATE_DISK` and `DESTROY_DISK`. It also renames `CreateDestroyDiskRecovery` to `CreateDestroyDiskWithRecovery` for consistency. Review: https://reviews.apache.org/r/69898 --- .../storage_local_resource_provider_tests.cpp | 412 ++++++++++----------- 1 file changed, 200 insertions(+), 212 deletions(-) diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp index 575795b..e8b3f01 100644 --- a/src/tests/storage_local_resource_provider_tests.cpp +++ b/src/tests/storage_local_resource_provider_tests.cpp @@ -717,8 +717,8 @@ TEST_F(StorageLocalResourceProviderTest, ProfileAppeared) } -// This test verifies that the storage local resource provider can -// create then destroy a new volume from a storage pool. +// This test verifies that the storage local resource provider can create then +// destroy a MOUNT disk from a storage pool with other pipelined operations. TEST_F(StorageLocalResourceProviderTest, CreateDestroyDisk) { const string profilesPath = path::join(sandbox.get(), "profiles.json"); @@ -748,7 +748,7 @@ TEST_F(StorageLocalResourceProviderTest, CreateDestroyDisk) // Register a framework to exercise operations. FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO; - framework.set_roles(0, "storage"); + framework.set_roles(0, "storage/role"); MockScheduler sched; MesosSchedulerDriver driver( @@ -756,100 +756,86 @@ TEST_F(StorageLocalResourceProviderTest, CreateDestroyDisk) EXPECT_CALL(sched, registered(&driver, _, _)); - // The framework is expected to see the following offers in sequence: - // 1. 
One containing a RAW disk resource before `CREATE_DISK`. - // 2. One containing a MOUNT disk resource after `CREATE_DISK`. - // 3. One containing a RAW disk resource after `DESTROY_DISK`. - // - // We set up the expectations for these offers as the test progresses. - Future<vector<Offer>> rawDiskOffers; - Future<vector<Offer>> volumeCreatedOffers; - Future<vector<Offer>> volumeDestroyedOffers; - - Sequence offers; - - // We use the following filter to filter offers that do not have - // wanted resources for 365 days (the maximum). + // We use the following filter to filter offers that do not have wanted + // resources for 365 days (the maximum). Filters declineFilters; declineFilters.set_refuse_seconds(Days(365).secs()); - // Decline offers that contain only the agent's default resources. + // Decline unwanted offers. The master can send such offers before the + // resource provider receives profile updates. EXPECT_CALL(sched, resourceOffers(&driver, _)) .WillRepeatedly(DeclineOffers(declineFilters)); - // We are only interested in any storage pool or volume with a "test" profile. 
- auto hasSourceType = []( - const Resource& r, - const Resource::DiskInfo::Source::Type& type) { + auto isStoragePool = [](const Resource& r, const string& profile) { return r.has_disk() && r.disk().has_source() && + r.disk().source().type() == Resource::DiskInfo::Source::RAW && + r.disk().source().has_vendor() && + r.disk().source().vendor() == TEST_CSI_VENDOR && + !r.disk().source().has_id() && r.disk().source().has_profile() && - r.disk().source().profile() == "test" && - r.disk().source().type() == type; + r.disk().source().profile() == profile; }; + Future<vector<Offer>> offers; EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource( - std::bind(hasSourceType, lambda::_1, Resource::DiskInfo::Source::RAW)))) - .InSequence(offers) - .WillOnce(FutureArg<1>(&rawDiskOffers)); + std::bind(isStoragePool, lambda::_1, "test")))) + .WillOnce(FutureArg<1>(&offers)); driver.start(); - AWAIT_READY(rawDiskOffers); - ASSERT_FALSE(rawDiskOffers->empty()); + AWAIT_READY(offers); + ASSERT_EQ(1u, offers->size()); - Option<Resource> source; + Offer offer = offers->at(0); - foreach (const Resource& resource, rawDiskOffers->at(0).resources()) { - if (hasSourceType(resource, Resource::DiskInfo::Source::RAW)) { - source = resource; - break; - } - } + // Create a MOUNT disk with a pipelined `RESERVE` operation. + Resource raw = *Resources(offer.resources()) + .filter(std::bind(isStoragePool, lambda::_1, "test")) + .begin(); - ASSERT_SOME(source); + Resource reserved = *Resources(raw) + .pushReservation(createDynamicReservationInfo( + framework.roles(0), framework.principal())) + .begin(); - // Create a volume. 
- EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource( - std::bind(hasSourceType, lambda::_1, Resource::DiskInfo::Source::MOUNT)))) - .InSequence(offers) - .WillOnce(FutureArg<1>(&volumeCreatedOffers)); + auto isMountDisk = [](const Resource& r, const string& profile) { + return r.has_disk() && + r.disk().has_source() && + r.disk().source().type() == Resource::DiskInfo::Source::MOUNT && + r.disk().source().has_vendor() && + r.disk().source().vendor() == TEST_CSI_VENDOR && + r.disk().source().has_id() && + r.disk().source().has_profile() && + r.disk().source().profile() == profile; + }; - // We use the following filter so that the resources will not be - // filtered for 5 seconds (the default). - Filters acceptFilters; - acceptFilters.set_refuse_seconds(0); + EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource( + std::bind(isMountDisk, lambda::_1, "test")))) + .WillOnce(FutureArg<1>(&offers)); driver.acceptOffers( - {rawDiskOffers->at(0).id()}, - {CREATE_DISK(source.get(), Resource::DiskInfo::Source::MOUNT)}, - acceptFilters); + {offer.id()}, + {RESERVE(reserved), + CREATE_DISK(reserved, Resource::DiskInfo::Source::MOUNT)}); - AWAIT_READY(volumeCreatedOffers); - ASSERT_FALSE(volumeCreatedOffers->empty()); + AWAIT_READY(offers); + ASSERT_EQ(1u, offers->size()); - Option<Resource> volume; + offer = offers->at(0); - foreach (const Resource& resource, volumeCreatedOffers->at(0).resources()) { - if (hasSourceType(resource, Resource::DiskInfo::Source::MOUNT)) { - volume = resource; - break; - } - } + Resource created = *Resources(offer.resources()) + .filter(std::bind(isMountDisk, lambda::_1, "test")) + .begin(); - ASSERT_SOME(volume); - ASSERT_TRUE(volume->disk().source().has_vendor()); - EXPECT_EQ(TEST_CSI_VENDOR, volume->disk().source().vendor()); - ASSERT_TRUE(volume->disk().source().has_id()); - ASSERT_TRUE(volume->disk().source().has_metadata()); - ASSERT_TRUE(volume->disk().source().has_mount()); - 
ASSERT_TRUE(volume->disk().source().mount().has_root()); - EXPECT_FALSE(path::absolute(volume->disk().source().mount().root())); + ASSERT_TRUE(created.disk().source().has_metadata()); + ASSERT_TRUE(created.disk().source().has_mount()); + ASSERT_TRUE(created.disk().source().mount().has_root()); + EXPECT_FALSE(path::absolute(created.disk().source().mount().root())); // Check if the volume is actually created by the test CSI plugin. Option<string> volumePath; - - foreach (const Label& label, volume->disk().source().metadata().labels()) { + foreach (const Label& label, created.disk().source().metadata().labels()) { if (label.key() == "path") { volumePath = label.value(); break; @@ -859,55 +845,49 @@ TEST_F(StorageLocalResourceProviderTest, CreateDestroyDisk) ASSERT_SOME(volumePath); EXPECT_TRUE(os::exists(volumePath.get())); - // Destroy the created volume. - EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource( - std::bind(hasSourceType, lambda::_1, Resource::DiskInfo::Source::RAW)))) - .InSequence(offers) - .WillOnce(FutureArg<1>(&volumeDestroyedOffers)); - - driver.acceptOffers( - {volumeCreatedOffers->at(0).id()}, - {DESTROY_DISK(volume.get())}, - acceptFilters); + // Destroy the MOUNT disk with pipelined `CREATE`, `DESTROY` and `UNRESERVE` + // operations. Note that `UNRESERVE` can come before `DESTROY_DISK`. 
+ Resource persistentVolume = created; + persistentVolume.mutable_disk()->mutable_persistence() + ->set_id(id::UUID::random().toString()); + persistentVolume.mutable_disk()->mutable_persistence() + ->set_principal(framework.principal()); + persistentVolume.mutable_disk()->mutable_volume() + ->set_container_path("volume"); + persistentVolume.mutable_disk()->mutable_volume()->set_mode(Volume::RW); - AWAIT_READY(volumeDestroyedOffers); - ASSERT_FALSE(volumeDestroyedOffers->empty()); + Resource unreserved = *Resources(created) + .popReservation() + .begin(); - Option<Resource> destroyed; + EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveResource(raw))) + .WillOnce(FutureArg<1>(&offers)); - foreach (const Resource& resource, volumeDestroyedOffers->at(0).resources()) { - if (hasSourceType(resource, Resource::DiskInfo::Source::RAW)) { - destroyed = resource; - break; - } - } + driver.acceptOffers( + {offer.id()}, + {CREATE(persistentVolume), + DESTROY(persistentVolume), + UNRESERVE(created), + DESTROY_DISK(unreserved)}); - ASSERT_SOME(destroyed); - ASSERT_TRUE(destroyed->disk().source().has_vendor()); - EXPECT_EQ(TEST_CSI_VENDOR, destroyed->disk().source().vendor()); - ASSERT_FALSE(destroyed->disk().source().has_id()); - ASSERT_FALSE(destroyed->disk().source().has_metadata()); - ASSERT_FALSE(destroyed->disk().source().has_mount()); + AWAIT_READY(offers); // Check if the volume is actually deleted by the test CSI plugin. EXPECT_FALSE(os::exists(volumePath.get())); } -// This test verifies that the storage local resource provider can -// destroy a volume created from a storage pool after recovery. -TEST_F(StorageLocalResourceProviderTest, CreateDestroyDiskRecovery) +// This test verifies that the storage local resource provider can destroy a +// MOUNT disk created from a storage pool with other pipelined operations after +// recovery. 
+TEST_F(StorageLocalResourceProviderTest, CreateDestroyDiskWithRecovery) { - Future<shared_ptr<TestDiskProfileServer>> server = - TestDiskProfileServer::create(); - AWAIT_READY(server); + const string profilesPath = path::join(sandbox.get(), "profiles.json"); - Promise<http::Response> recoveredProfileMapping; - EXPECT_CALL(*server.get()->process, profiles(_)) - .WillOnce(Return(http::OK(createDiskProfileMapping({{"test", None()}})))) - .WillOnce(Return(recoveredProfileMapping.future())); + ASSERT_SOME( + os::write(profilesPath, createDiskProfileMapping({{"test", None()}}))); - loadUriDiskProfileAdaptorModule(stringify(server.get()->process->url())); + loadUriDiskProfileAdaptorModule(profilesPath); setupResourceProviderConfig(Gigabytes(4)); @@ -929,7 +909,7 @@ TEST_F(StorageLocalResourceProviderTest, CreateDestroyDiskRecovery) // Register a framework to exercise operations. FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO; - framework.set_roles(0, "storage"); + framework.set_roles(0, "storage/role"); MockScheduler sched; MesosSchedulerDriver driver( @@ -937,103 +917,86 @@ TEST_F(StorageLocalResourceProviderTest, CreateDestroyDiskRecovery) EXPECT_CALL(sched, registered(&driver, _, _)); - // The framework is expected to see the following offers in sequence: - // 1. One containing a RAW disk resource before `CREATE_DISK`. - // 2. One containing a MOUNT disk resource after `CREATE_DISK`. - // 3. One containing a MOUNT disk resource after the agent recovers - // from a failover. - // 4. One containing a RAW disk resource after `DESTROY_DISK`. - // - // We set up the expectations for these offers as the test progresses. - Future<vector<Offer>> rawDiskOffers; - Future<vector<Offer>> volumeCreatedOffers; - Future<vector<Offer>> slaveRecoveredOffers; - Future<vector<Offer>> volumeDestroyedOffers; - - Sequence offers; - - // We use the following filter to filter offers that do not have - // wanted resources for 365 days (the maximum). 
+ // We use the following filter to filter offers that do not have wanted + // resources for 365 days (the maximum). Filters declineFilters; declineFilters.set_refuse_seconds(Days(365).secs()); - // Decline offers that contain only the agent's default resources. + // Decline unwanted offers. The master can send such offers before the + // resource provider receives profile updates. EXPECT_CALL(sched, resourceOffers(&driver, _)) .WillRepeatedly(DeclineOffers(declineFilters)); - // We are only interested in any storage pool or volume with a "test" profile. - auto hasSourceType = []( - const Resource& r, - const Resource::DiskInfo::Source::Type& type) { + auto isStoragePool = [](const Resource& r, const string& profile) { return r.has_disk() && r.disk().has_source() && + r.disk().source().type() == Resource::DiskInfo::Source::RAW && + r.disk().source().has_vendor() && + r.disk().source().vendor() == TEST_CSI_VENDOR && + !r.disk().source().has_id() && r.disk().source().has_profile() && - r.disk().source().profile() == "test" && - r.disk().source().type() == type; + r.disk().source().profile() == profile; }; + Future<vector<Offer>> offers; EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource( - std::bind(hasSourceType, lambda::_1, Resource::DiskInfo::Source::RAW)))) - .InSequence(offers) - .WillOnce(FutureArg<1>(&rawDiskOffers)); + std::bind(isStoragePool, lambda::_1, "test")))) + .WillOnce(FutureArg<1>(&offers)); driver.start(); - AWAIT_READY(rawDiskOffers); - ASSERT_FALSE(rawDiskOffers->empty()); + AWAIT_READY(offers); + ASSERT_EQ(1u, offers->size()); - Option<Resource> source; + Offer offer = offers->at(0); - foreach (const Resource& resource, rawDiskOffers->at(0).resources()) { - if (hasSourceType(resource, Resource::DiskInfo::Source::RAW)) { - source = resource; - break; - } - } + // Create a MOUNT disk with a pipelined `RESERVE` operation. 
+ Resource raw = *Resources(offer.resources()) + .filter(std::bind(isStoragePool, lambda::_1, "test")) + .begin(); - ASSERT_SOME(source); + Resource reserved = *Resources(raw) + .pushReservation(createDynamicReservationInfo( + framework.roles(0), framework.principal())) + .begin(); - // Create a volume. - EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource( - std::bind(hasSourceType, lambda::_1, Resource::DiskInfo::Source::MOUNT)))) - .InSequence(offers) - .WillOnce(FutureArg<1>(&volumeCreatedOffers)); + auto isMountDisk = [](const Resource& r, const string& profile) { + return r.has_disk() && + r.disk().has_source() && + r.disk().source().type() == Resource::DiskInfo::Source::MOUNT && + r.disk().source().has_vendor() && + r.disk().source().vendor() == TEST_CSI_VENDOR && + r.disk().source().has_id() && + r.disk().source().has_profile() && + r.disk().source().profile() == profile; + }; - // We use the following filter so that the resources will not be - // filtered for 5 seconds (the default). 
- Filters acceptFilters; - acceptFilters.set_refuse_seconds(0); + EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource( + std::bind(isMountDisk, lambda::_1, "test")))) + .WillOnce(FutureArg<1>(&offers)); driver.acceptOffers( - {rawDiskOffers->at(0).id()}, - {CREATE_DISK(source.get(), Resource::DiskInfo::Source::MOUNT)}, - acceptFilters); + {offer.id()}, + {RESERVE(reserved), + CREATE_DISK(reserved, Resource::DiskInfo::Source::MOUNT)}); - AWAIT_READY(volumeCreatedOffers); - ASSERT_FALSE(volumeCreatedOffers->empty()); + AWAIT_READY(offers); + ASSERT_EQ(1u, offers->size()); - Option<Resource> volume; + offer = offers->at(0); - foreach (const Resource& resource, volumeCreatedOffers->at(0).resources()) { - if (hasSourceType(resource, Resource::DiskInfo::Source::MOUNT)) { - volume = resource; - break; - } - } + Resource created = *Resources(offer.resources()) + .filter(std::bind(isMountDisk, lambda::_1, "test")) + .begin(); - ASSERT_SOME(volume); - ASSERT_TRUE(volume->disk().source().has_vendor()); - EXPECT_EQ(TEST_CSI_VENDOR, volume->disk().source().vendor()); - ASSERT_TRUE(volume->disk().source().has_id()); - ASSERT_TRUE(volume->disk().source().has_metadata()); - ASSERT_TRUE(volume->disk().source().has_mount()); - ASSERT_TRUE(volume->disk().source().mount().has_root()); - EXPECT_FALSE(path::absolute(volume->disk().source().mount().root())); + ASSERT_TRUE(created.disk().source().has_metadata()); + ASSERT_TRUE(created.disk().source().has_mount()); + ASSERT_TRUE(created.disk().source().mount().has_root()); + EXPECT_FALSE(path::absolute(created.disk().source().mount().root())); // Check if the volume is actually created by the test CSI plugin. 
 Option<string> volumePath; - - foreach (const Label& label, volume->disk().source().metadata().labels()) { + foreach (const Label& label, created.disk().source().metadata().labels()) { if (label.key() == "path") { volumePath = label.value(); break; @@ -1048,57 +1011,82 @@ TEST_F(StorageLocalResourceProviderTest, CreateDestroyDiskRecovery) slave.get()->terminate(); + // NOTE: We set up the resource provider with an extra storage pool, so that + // when the storage pool shows up, we know that the resource provider has + // finished reconciling storage pools and thus operations won't be dropped. + // + // To achieve this, we drop `SlaveReregisteredMessage`s other than the first one + // to avoid unexpected `UpdateSlaveMessage`s. Then, we also drop the first two + // `UpdateSlaveMessage`s (one sent after agent reregistration and one after + // resource provider reregistration) and wait for the third one, which should + // contain the extra storage pool. We let it fall through to trigger an offer + // allocation for the persistent volume. + // + // Since the extra storage pool is never used, we reject the offers if only + // the storage pool is presented. + // + // TODO(chhsiao): Remove this workaround once MESOS-9553 is done. + setupResourceProviderConfig(Gigabytes(5)); + EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource( - std::bind(hasSourceType, lambda::_1, Resource::DiskInfo::Source::MOUNT)))) - .InSequence(offers) - .WillOnce(FutureArg<1>(&slaveRecoveredOffers)); + std::bind(isStoragePool, lambda::_1, "test")))) + .WillRepeatedly(DeclineOffers(declineFilters)); - slave = StartSlave(detector.get(), slaveFlags); - ASSERT_SOME(slave); + // NOTE: The order of these expectations is reversed because Google Mock will + // search the expectations in reverse order.
+ Future<UpdateSlaveMessage> updateSlaveMessage = + FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _); - AWAIT_READY(slaveRecoveredOffers); - ASSERT_FALSE(slaveRecoveredOffers->empty()); + DROP_PROTOBUF(UpdateSlaveMessage(), _, _); + DROP_PROTOBUF(UpdateSlaveMessage(), _, _); + DROP_PROTOBUFS(SlaveReregisteredMessage(), _, _); + FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _); - // Destroy the created volume. - EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource( - std::bind(hasSourceType, lambda::_1, Resource::DiskInfo::Source::RAW)))) - .InSequence(offers) - .WillOnce(FutureArg<1>(&volumeDestroyedOffers)); + EXPECT_CALL( + sched, resourceOffers(&driver, OffersHaveResource(created))) + .WillOnce(FutureArg<1>(&offers)); - Future<UpdateOperationStatusMessage> destroyDiskStatus = - FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _); + slave = StartSlave(detector.get(), slaveFlags); + ASSERT_SOME(slave); - driver.acceptOffers( - {slaveRecoveredOffers->at(0).id()}, - {DESTROY_DISK(volume.get())}, - acceptFilters); + AWAIT_READY(updateSlaveMessage); + ASSERT_EQ(1, updateSlaveMessage->resource_providers().providers_size()); + ASSERT_FALSE(Resources( + updateSlaveMessage->resource_providers().providers(0).total_resources()) + .filter(std::bind(isStoragePool, lambda::_1, "test")) + .empty()); - AWAIT_READY(destroyDiskStatus); - EXPECT_EQ(OPERATION_FINISHED, destroyDiskStatus->status().state()); + AWAIT_READY(offers); + ASSERT_EQ(1u, offers->size()); - // NOTE: We update the disk profile mapping after the `DESTROY_DISK` operation - // is applied, otherwise it could be dropped due to reconciling storage pools. - recoveredProfileMapping.set( - http::OK(createDiskProfileMapping({{"test", None()}}))); + offer = offers->at(0); - AWAIT_READY(volumeDestroyedOffers); - ASSERT_FALSE(volumeDestroyedOffers->empty()); + // Destroy the MOUNT disk with pipelined `CREATE`, `DESTROY` and `UNRESERVE` + // operations. Note that `UNRESERVE` can come before `DESTROY_DISK`. 
+ Resource persistentVolume = created; + persistentVolume.mutable_disk()->mutable_persistence() + ->set_id(id::UUID::random().toString()); + persistentVolume.mutable_disk()->mutable_persistence() + ->set_principal(framework.principal()); + persistentVolume.mutable_disk()->mutable_volume() + ->set_container_path("volume"); + persistentVolume.mutable_disk()->mutable_volume()->set_mode(Volume::RW); - Option<Resource> destroyed; + Resource unreserved = *Resources(created) + .popReservation() + .begin(); - foreach (const Resource& resource, volumeDestroyedOffers->at(0).resources()) { - if (hasSourceType(resource, Resource::DiskInfo::Source::RAW)) { - destroyed = resource; - break; - } - } + EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveResource(raw))) + .WillOnce(FutureArg<1>(&offers)); - ASSERT_SOME(destroyed); - ASSERT_TRUE(destroyed->disk().source().has_vendor()); - EXPECT_EQ(TEST_CSI_VENDOR, destroyed->disk().source().vendor()); - ASSERT_FALSE(destroyed->disk().source().has_id()); - ASSERT_FALSE(destroyed->disk().source().has_metadata()); - ASSERT_FALSE(destroyed->disk().source().has_mount()); + driver.acceptOffers( + {offer.id()}, + {CREATE(persistentVolume), + DESTROY(persistentVolume), + UNRESERVE(created), + DESTROY_DISK(unreserved)}); + + AWAIT_READY(offers); // Check if the volume is actually deleted by the test CSI plugin. EXPECT_FALSE(os::exists(volumePath.get()));
