This is an automated email from the ASF dual-hosted git repository. chhsiao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit a6effff757467d6443ccc1328a65cd3e36aaa871 Author: Chun-Hung Hsiao <[email protected]> AuthorDate: Fri Feb 1 17:01:06 2019 -0800 Made SLRP `PublishResourcesRecovery` test to check volume cleanup. This patch renames the `ROOT_PublishResourcesRecovery` test to `ROOT_CreateDestroyPersistentMountVolumeWithRecovery` and makes it verify that the persistent volume is cleaned up with `DESTROY` after recovery. NOTE: The `filesystem/linux` isolator has been removed from the test because it is not necessary for the test. However, the root privilege is still required by the test CSI plugin for bind-mounting. Review: https://reviews.apache.org/r/69897 --- .../storage_local_resource_provider_tests.cpp | 388 +++++++++++---------- 1 file changed, 209 insertions(+), 179 deletions(-) diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp index f446a46..575795b 100644 --- a/src/tests/storage_local_resource_provider_tests.cpp +++ b/src/tests/storage_local_resource_provider_tests.cpp @@ -1993,9 +1993,22 @@ TEST_F( } -// This test verifies that the storage local resource provider can -// destroy a published volume after recovery. -TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResourcesRecovery) +// This test verifies that the storage local resource provider can republish and +// clean up persistent volumes after recovery. +// +// To accomplish this: +// 1. Create a MOUNT disk from a RAW disk resource. +// 2. Create a persistent volume on the MOUNT disk then launch a task to +// write a file into it. +// 3. Restart the agent. +// 4. Launch another task to read the file. +// 5. Restart the agent again. +// 6. Destroy the persistent volume but keep the MOUNT disk. The file should +// be deleted. +// 7. Destroy the MOUNT disk. 
+TEST_F( + StorageLocalResourceProviderTest, + ROOT_CreateDestroyPersistentMountVolumeWithRecovery) { const string profilesPath = path::join(sandbox.get(), "profiles.json"); @@ -2012,8 +2025,6 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResourcesRecovery) Owned<MasterDetector> detector = master.get()->createDetector(); slave::Flags slaveFlags = CreateSlaveFlags(); - slaveFlags.isolation = "filesystem/linux"; - slaveFlags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME; Future<SlaveRegisteredMessage> slaveRegisteredMessage = @@ -2034,120 +2045,74 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResourcesRecovery) EXPECT_CALL(sched, registered(&driver, _, _)); - // The framework is expected to see the following offers in sequence: - // 1. One containing a RAW disk resource before `CREATE_DISK`. - // 2. One containing a MOUNT disk resource after `CREATE_DISK`. - // 3. One containing a persistent volume after `CREATE` and `LAUNCH`. - // 4. One containing the same persistent volume after the agent - // recovers from a failover. - // 5. One containing the same persistent volume after another `LAUNCH`. - // 6. One containing the original RAW disk resource after `DESTROY` - // and `DESTROY_DISK`. - // - // We set up the expectations for these offers as the test progresses. - Future<vector<Offer>> rawDiskOffers; - Future<vector<Offer>> volumeCreatedOffers; - Future<vector<Offer>> task1FinishedOffers; - Future<vector<Offer>> slaveRecoveredOffers; - Future<vector<Offer>> task2FinishedOffers; - Future<vector<Offer>> volumeDestroyedOffers; - - Sequence offers; - - // We use the following filter to filter offers that do not have - // wanted resources for 365 days (the maximum). + // We use the following filter to filter offers that do not have wanted + // resources for 365 days (the maximum). Filters declineFilters; declineFilters.set_refuse_seconds(Days(365).secs()); - // Decline offers that contain only the agent's default resources. 
+ // Decline unwanted offers. The master can send such offers before the + // resource provider receives profile updates. EXPECT_CALL(sched, resourceOffers(&driver, _)) .WillRepeatedly(DeclineOffers(declineFilters)); - // We are only interested in any storage pool or volume with a "test" profile. - auto hasSourceType = []( - const Resource& r, - const Resource::DiskInfo::Source::Type& type) { + auto isStoragePool = [](const Resource& r, const string& profile) { return r.has_disk() && r.disk().has_source() && + r.disk().source().type() == Resource::DiskInfo::Source::RAW && + r.disk().source().has_vendor() && + r.disk().source().vendor() == TEST_CSI_VENDOR && + !r.disk().source().has_id() && r.disk().source().has_profile() && - r.disk().source().profile() == "test" && - r.disk().source().type() == type; + r.disk().source().profile() == profile; }; + Future<vector<Offer>> offers; EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource( - std::bind(hasSourceType, lambda::_1, Resource::DiskInfo::Source::RAW)))) - .InSequence(offers) - .WillOnce(FutureArg<1>(&rawDiskOffers)); + std::bind(isStoragePool, lambda::_1, "test")))) + .WillOnce(FutureArg<1>(&offers)); driver.start(); - AWAIT_READY(rawDiskOffers); - ASSERT_FALSE(rawDiskOffers->empty()); + AWAIT_READY(offers); + ASSERT_EQ(1u, offers->size()); - Option<Resource> source; + Offer offer = offers->at(0); - foreach (const Resource& resource, rawDiskOffers->at(0).resources()) { - if (hasSourceType(resource, Resource::DiskInfo::Source::RAW)) { - source = resource; - break; - } - } + // Create a MOUNT disk. 
+ Resource raw = *Resources(offer.resources()) + .filter(std::bind(isStoragePool, lambda::_1, "test")) + .begin(); - ASSERT_SOME(source); + auto isMountDisk = [](const Resource& r, const string& profile) { + return r.has_disk() && + r.disk().has_source() && + r.disk().source().type() == Resource::DiskInfo::Source::MOUNT && + r.disk().source().has_vendor() && + r.disk().source().vendor() == TEST_CSI_VENDOR && + r.disk().source().has_id() && + r.disk().source().has_profile() && + r.disk().source().profile() == profile && + !r.disk().has_persistence(); + }; - // Create a volume. EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource( - std::bind(hasSourceType, lambda::_1, Resource::DiskInfo::Source::MOUNT)))) - .InSequence(offers) - .WillOnce(FutureArg<1>(&volumeCreatedOffers)); - - // We use the following filter so that the resources will not be - // filtered for 5 seconds (the default). - Filters acceptFilters; - acceptFilters.set_refuse_seconds(0); + std::bind(isMountDisk, lambda::_1, "test")))) + .WillOnce(FutureArg<1>(&offers)); driver.acceptOffers( - {rawDiskOffers->at(0).id()}, - {CREATE_DISK(source.get(), Resource::DiskInfo::Source::MOUNT)}, - acceptFilters); - - AWAIT_READY(volumeCreatedOffers); - ASSERT_FALSE(volumeCreatedOffers->empty()); - - Option<Resource> volume; - - foreach (const Resource& resource, volumeCreatedOffers->at(0).resources()) { - if (hasSourceType(resource, Resource::DiskInfo::Source::MOUNT)) { - volume = resource; - break; - } - } - - ASSERT_SOME(volume); - ASSERT_TRUE(volume->disk().source().has_vendor()); - EXPECT_EQ(TEST_CSI_VENDOR, volume->disk().source().vendor()); - ASSERT_TRUE(volume->disk().source().has_id()); - ASSERT_TRUE(volume->disk().source().has_metadata()); - ASSERT_TRUE(volume->disk().source().has_mount()); - ASSERT_TRUE(volume->disk().source().mount().has_root()); - EXPECT_FALSE(path::absolute(volume->disk().source().mount().root())); + {offer.id()}, {CREATE_DISK(raw, Resource::DiskInfo::Source::MOUNT)}); - // 
Check if the volume is actually created by the test CSI plugin. - Option<string> volumePath; + AWAIT_READY(offers); + ASSERT_EQ(1u, offers->size()); - foreach (const Label& label, volume->disk().source().metadata().labels()) { - if (label.key() == "path") { - volumePath = label.value(); - break; - } - } + offer = offers->at(0); - ASSERT_SOME(volumePath); - EXPECT_TRUE(os::exists(volumePath.get())); + Resource created = *Resources(offer.resources()) + .filter(std::bind(isMountDisk, lambda::_1, "test")) + .begin(); - // Create a persistent volume on the CSI volume, then launch a task to - // use the persistent volume. - Resource persistentVolume = volume.get(); + // Create a persistent MOUNT volume then launch a task to write a file. + Resource persistentVolume = created; persistentVolume.mutable_disk()->mutable_persistence() ->set_id(id::UUID::random().toString()); persistentVolume.mutable_disk()->mutable_persistence() @@ -2156,111 +2121,176 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResourcesRecovery) ->set_container_path("volume"); persistentVolume.mutable_disk()->mutable_volume()->set_mode(Volume::RW); - { - Future<TaskStatus> taskStarting; - Future<TaskStatus> taskRunning; - Future<TaskStatus> taskFinished; - - EXPECT_CALL(sched, statusUpdate(&driver, _)) - .WillOnce(FutureArg<1>(&taskStarting)) - .WillOnce(FutureArg<1>(&taskRunning)) - .WillOnce(FutureArg<1>(&taskFinished)); - - EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveResource( - persistentVolume))) - .InSequence(offers) - .WillOnce(FutureArg<1>(&task1FinishedOffers)); - - driver.acceptOffers( - {volumeCreatedOffers->at(0).id()}, - {CREATE(persistentVolume), - LAUNCH({createTask( - volumeCreatedOffers->at(0).slave_id(), - persistentVolume, - createCommandInfo("touch " + path::join("volume", "file")))})}, - acceptFilters); - - AWAIT_READY(taskStarting); - EXPECT_EQ(TASK_STARTING, taskStarting->state()); - - AWAIT_READY(taskRunning); - EXPECT_EQ(TASK_RUNNING, taskRunning->state()); - - 
AWAIT_READY(taskFinished); - EXPECT_EQ(TASK_FINISHED, taskFinished->state()); - } + Future<Nothing> taskFinished; + EXPECT_CALL(sched, statusUpdate(&driver, TaskStatusStateEq(TASK_STARTING))); + EXPECT_CALL(sched, statusUpdate(&driver, TaskStatusStateEq(TASK_RUNNING))); + EXPECT_CALL(sched, statusUpdate(&driver, TaskStatusStateEq(TASK_FINISHED))) + .WillOnce(FutureSatisfy(&taskFinished)); + + EXPECT_CALL( + sched, resourceOffers(&driver, OffersHaveResource(persistentVolume))) + .WillOnce(FutureArg<1>(&offers)); + + driver.acceptOffers( + {offer.id()}, + {CREATE(persistentVolume), + LAUNCH({createTask( + offer.slave_id(), + persistentVolume, + createCommandInfo("touch " + path::join("volume", "file")))})}); - AWAIT_READY(task1FinishedOffers); + AWAIT_READY(taskFinished); + + AWAIT_READY(offers); // Restart the agent. EXPECT_CALL(sched, offerRescinded(_, _)); slave.get()->terminate(); - EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveResource( - persistentVolume))) - .InSequence(offers) - .WillOnce(FutureArg<1>(&slaveRecoveredOffers)); + EXPECT_CALL( + sched, resourceOffers(&driver, OffersHaveResource(persistentVolume))) + .WillOnce(FutureArg<1>(&offers)); slave = StartSlave(detector.get(), slaveFlags); ASSERT_SOME(slave); - AWAIT_READY(slaveRecoveredOffers); - ASSERT_FALSE(slaveRecoveredOffers->empty()); + AWAIT_READY(offers); + ASSERT_EQ(1u, offers->size()); - // Launch another task to read the file that is created by the - // previous task on the persistent volume. 
- { - Future<TaskStatus> taskStarting; - Future<TaskStatus> taskRunning; - Future<TaskStatus> taskFinished; - - EXPECT_CALL(sched, statusUpdate(&driver, _)) - .WillOnce(FutureArg<1>(&taskStarting)) - .WillOnce(FutureArg<1>(&taskRunning)) - .WillOnce(FutureArg<1>(&taskFinished)); - - EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveResource( - persistentVolume))) - .InSequence(offers) - .WillOnce(FutureArg<1>(&task2FinishedOffers)); - - driver.acceptOffers( - {slaveRecoveredOffers->at(0).id()}, - {LAUNCH({createTask( - slaveRecoveredOffers->at(0).slave_id(), - persistentVolume, - createCommandInfo("test -f " + path::join("volume", "file")))})}, - acceptFilters); - - AWAIT_READY(taskStarting); - EXPECT_EQ(TASK_STARTING, taskStarting->state()); - - AWAIT_READY(taskRunning); - EXPECT_EQ(TASK_RUNNING, taskRunning->state()); - - AWAIT_READY(taskFinished); - EXPECT_EQ(TASK_FINISHED, taskFinished->state()); - } + offer = offers->at(0); - AWAIT_READY(task2FinishedOffers); + // Launch another task to read the file created by the previous task. + EXPECT_CALL(sched, statusUpdate(&driver, TaskStatusStateEq(TASK_STARTING))); + EXPECT_CALL(sched, statusUpdate(&driver, TaskStatusStateEq(TASK_RUNNING))); + EXPECT_CALL(sched, statusUpdate(&driver, TaskStatusStateEq(TASK_FINISHED))) + .WillOnce(FutureSatisfy(&taskFinished)); - // Destroy the persistent volume and the CSI volume. 
- EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveResource(source.get()))) - .InSequence(offers) - .WillOnce(FutureArg<1>(&volumeDestroyedOffers)); + EXPECT_CALL( + sched, resourceOffers(&driver, OffersHaveResource(persistentVolume))) + .WillOnce(FutureArg<1>(&offers)); driver.acceptOffers( - {task2FinishedOffers->at(0).id()}, - {DESTROY(persistentVolume), - DESTROY_DISK(volume.get())}, - acceptFilters); + {offer.id()}, + {LAUNCH({createTask( + offer.slave_id(), + persistentVolume, + createCommandInfo("test -f " + path::join("volume", "file")))})}); - AWAIT_READY(volumeDestroyedOffers); - ASSERT_FALSE(volumeDestroyedOffers->empty()); + AWAIT_READY(taskFinished); - // Check if the volume is actually deleted by the test CSI plugin. + AWAIT_READY(offers); + + // Restart the agent. + EXPECT_CALL(sched, offerRescinded(_, _)); + + slave.get()->terminate(); + + // NOTE: We set up the resource provider with an extra storage pool, so that + // when the storage pool shows up, we know that the resource provider has + // finished reconciling storage pools and thus operations won't be dropped. + // + // To achieve this, we drop `SlaveRegisteredMessage`s other than the first one + // to avoid unexpected `UpdateSlaveMessage`s. Then, we also drop the first two + // `UpdateSlaveMessage`s (one sent after agent reregistration and one after + // resource provider reregistration) and wait for the third one, which should + // contain the extra storage pool. We let it fall through to trigger an offer + // allocation for the persistent volume. + // + // Since the extra storage pool is never used, we reject the offers if only + // the storage pool is presented. + // + // TODO(chhsiao): Remove this workaround once MESOS-9553 is done. 
+ setupResourceProviderConfig(Gigabytes(5)); + + EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource( + std::bind(isStoragePool, lambda::_1, "test")))) + .WillRepeatedly(DeclineOffers(declineFilters)); + + // NOTE: The order of these expectations is reversed because Google Mock will + // search the expectations in reverse order. + Future<UpdateSlaveMessage> updateSlaveMessage = + FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _); + + DROP_PROTOBUF(UpdateSlaveMessage(), _, _); + DROP_PROTOBUF(UpdateSlaveMessage(), _, _); + DROP_PROTOBUFS(SlaveReregisteredMessage(), _, _); + FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _); + + EXPECT_CALL( + sched, resourceOffers(&driver, OffersHaveResource(persistentVolume))) + .WillOnce(FutureArg<1>(&offers)); + + slave = StartSlave(detector.get(), slaveFlags); + ASSERT_SOME(slave); + + AWAIT_READY(updateSlaveMessage); + ASSERT_EQ(1, updateSlaveMessage->resource_providers().providers_size()); + ASSERT_FALSE(Resources( + updateSlaveMessage->resource_providers().providers(0).total_resources()) + .filter(std::bind(isStoragePool, lambda::_1, "test")) + .empty()); + + AWAIT_READY(offers); + ASSERT_EQ(1u, offers->size()); + + offer = offers->at(0); + + // Destroy the persistent volume. + Future<UpdateOperationStatusMessage> updateOperationStatusMessage = + FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _); + + EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveResource(created))) + .WillOnce(FutureArg<1>(&offers)); + + // TODO(chhsiao): We use the following filter so that the resources will not + // be filtered for 5 seconds (the default) because of MESOS-9616. Remove the + // filter once it is resolved. 
+ Filters acceptFilters; + acceptFilters.set_refuse_seconds(0); + driver.acceptOffers({offer.id()}, {DESTROY(persistentVolume)}, acceptFilters); + + // NOTE: Since `DESTROY` would be applied by the master synchronously, we + // might get an offer before the persistent volume is cleaned up on the agent, + // so we wait for an `UpdateOperationStatusMessage` before the check below. + AWAIT_READY(updateOperationStatusMessage); + EXPECT_EQ(OPERATION_FINISHED, updateOperationStatusMessage->status().state()); + + // Check if the CSI volume still exists but has been cleaned up. + Option<string> volumePath; + + foreach (const Label& label, created.disk().source().metadata().labels()) { + if (label.key() == "path") { + volumePath = label.value(); + break; + } + } + + ASSERT_SOME(volumePath); + EXPECT_TRUE(os::exists(volumePath.get())); + EXPECT_FALSE(os::exists(path::join(volumePath.get(), "file"))); + + AWAIT_READY(offers); + ASSERT_EQ(1u, offers->size()); + + offer = offers->at(0); + + // Destroy the MOUNT disk. + updateOperationStatusMessage = + FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _); + + EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveResource(raw))) + .WillOnce(FutureArg<1>(&offers)); + + driver.acceptOffers({offer.id()}, {DESTROY_DISK(created)}); + + AWAIT_READY(updateOperationStatusMessage); + EXPECT_EQ(OPERATION_FINISHED, updateOperationStatusMessage->status().state()); + + // Check if the CSI volume is actually deleted. EXPECT_FALSE(os::exists(volumePath.get())); + + AWAIT_READY(offers); }
