This is an automated email from the ASF dual-hosted git repository. chhsiao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 8aaf50291f8cc601cf1742dfca610bd27bc897f4 Author: Chun-Hung Hsiao <[email protected]> AuthorDate: Thu Jan 31 19:05:48 2019 -0800 Made SLRP `PublishResourcesReboot` test to check persistent volume cleanup. This patch renames the `ROOT_PublishResourcesReboot` test to `ROOT_CreateDestroyPersistentMountVolumeWithReboot` and makes it verify that the persistent volume is cleaned up with `DESTROY` after a reboot. NOTE: The `filesystem/linux` isolator has been removed from the test because it is not necessary for the test. However, the root privilege is still required by the test CSI plugin for bind-mounting. Review: https://reviews.apache.org/r/69896 --- .../storage_local_resource_provider_tests.cpp | 438 +++++++++++---------- 1 file changed, 237 insertions(+), 201 deletions(-) diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp index 679d995..f446a46 100644 --- a/src/tests/storage_local_resource_provider_tests.cpp +++ b/src/tests/storage_local_resource_provider_tests.cpp @@ -2264,9 +2264,22 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResourcesRecovery) } -// This test verifies that the storage local resource provider can -// destroy a published volume after agent reboot. -TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResourcesReboot) +// This test verifies that the storage local resource provider can republish and +// clean up persistent volumes after agent reboot. +// +// To accomplish this: +// 1. Create a MOUNT disk from a RAW disk resource. +// 2. Create a persistent volume on the MOUNT disk then launch a task to +// write a file into it. +// 3. Simulate an agent reboot. +// 4. Launch another task to read the file. +// 5. Simulate another agent reboot. +// 6. Destroy the persistent volume but keep the MOUNT disk. The file should +// be deleted. +// 7. Destroy the MOUNT disk. 
+TEST_F( + StorageLocalResourceProviderTest, + ROOT_CreateDestroyPersistentMountVolumeWithReboot) { const string profilesPath = path::join(sandbox.get(), "profiles.json"); @@ -2283,8 +2296,6 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResourcesReboot) Owned<MasterDetector> detector = master.get()->createDetector(); slave::Flags slaveFlags = CreateSlaveFlags(); - slaveFlags.isolation = "filesystem/linux"; - slaveFlags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME; Future<SlaveRegisteredMessage> slaveRegisteredMessage = @@ -2305,120 +2316,74 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResourcesReboot) EXPECT_CALL(sched, registered(&driver, _, _)); - // The framework is expected to see the following offers in sequence: - // 1. One containing a RAW disk resource before `CREATE_DISK`. - // 2. One containing a MOUNT disk resource after `CREATE_DISK`. - // 3. One containing a persistent volume after `CREATE` and `LAUNCH`. - // 4. One containing the same persistent volume after the agent - // recovers from a failover. - // 5. One containing the same persistent volume after another `LAUNCH`. - // 6. One containing the original RAW disk resource after `DESTROY` - // and `DESTROY_DISK`. - // - // We set up the expectations for these offers as the test progresses. - Future<vector<Offer>> rawDiskOffers; - Future<vector<Offer>> volumeCreatedOffers; - Future<vector<Offer>> task1FinishedOffers; - Future<vector<Offer>> slaveRecoveredOffers; - Future<vector<Offer>> task2FinishedOffers; - Future<vector<Offer>> volumeDestroyedOffers; - - Sequence offers; - - // We use the following filter to filter offers that do not have - // wanted resources for 365 days (the maximum). + // We use the following filter to filter offers that do not have wanted + // resources for 365 days (the maximum). Filters declineFilters; declineFilters.set_refuse_seconds(Days(365).secs()); - // Decline offers that contain only the agent's default resources. 
+ // Decline unwanted offers. The master can send such offers before the + // resource provider receives profile updates. EXPECT_CALL(sched, resourceOffers(&driver, _)) .WillRepeatedly(DeclineOffers(declineFilters)); - // We are only interested in any storage pool or volume with a "test" profile. - auto hasSourceType = []( - const Resource& r, - const Resource::DiskInfo::Source::Type& type) { + auto isStoragePool = [](const Resource& r, const string& profile) { return r.has_disk() && r.disk().has_source() && + r.disk().source().type() == Resource::DiskInfo::Source::RAW && + r.disk().source().has_vendor() && + r.disk().source().vendor() == TEST_CSI_VENDOR && + !r.disk().source().has_id() && r.disk().source().has_profile() && - r.disk().source().profile() == "test" && - r.disk().source().type() == type; + r.disk().source().profile() == profile; }; + Future<vector<Offer>> offers; EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource( - std::bind(hasSourceType, lambda::_1, Resource::DiskInfo::Source::RAW)))) - .InSequence(offers) - .WillOnce(FutureArg<1>(&rawDiskOffers)); + std::bind(isStoragePool, lambda::_1, "test")))) + .WillOnce(FutureArg<1>(&offers)); driver.start(); - AWAIT_READY(rawDiskOffers); - ASSERT_FALSE(rawDiskOffers->empty()); + AWAIT_READY(offers); + ASSERT_EQ(1u, offers->size()); - Option<Resource> source; + Offer offer = offers->at(0); - foreach (const Resource& resource, rawDiskOffers->at(0).resources()) { - if (hasSourceType(resource, Resource::DiskInfo::Source::RAW)) { - source = resource; - break; - } - } + // Create a MOUNT disk. 
+ Resource raw = *Resources(offer.resources()) + .filter(std::bind(isStoragePool, lambda::_1, "test")) + .begin(); - ASSERT_SOME(source); + auto isMountDisk = [](const Resource& r, const string& profile) { + return r.has_disk() && + r.disk().has_source() && + r.disk().source().type() == Resource::DiskInfo::Source::MOUNT && + r.disk().source().has_vendor() && + r.disk().source().vendor() == TEST_CSI_VENDOR && + r.disk().source().has_id() && + r.disk().source().has_profile() && + r.disk().source().profile() == profile && + !r.disk().has_persistence(); + }; - // Create a volume. EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource( - std::bind(hasSourceType, lambda::_1, Resource::DiskInfo::Source::MOUNT)))) - .InSequence(offers) - .WillOnce(FutureArg<1>(&volumeCreatedOffers)); - - // We use the following filter so that the resources will not be - // filtered for 5 seconds (the default). - Filters acceptFilters; - acceptFilters.set_refuse_seconds(0); + std::bind(isMountDisk, lambda::_1, "test")))) + .WillOnce(FutureArg<1>(&offers)); driver.acceptOffers( - {rawDiskOffers->at(0).id()}, - {CREATE_DISK(source.get(), Resource::DiskInfo::Source::MOUNT)}, - acceptFilters); - - AWAIT_READY(volumeCreatedOffers); - ASSERT_FALSE(volumeCreatedOffers->empty()); - - Option<Resource> volume; - - foreach (const Resource& resource, volumeCreatedOffers->at(0).resources()) { - if (hasSourceType(resource, Resource::DiskInfo::Source::MOUNT)) { - volume = resource; - break; - } - } - - ASSERT_SOME(volume); - ASSERT_TRUE(volume->disk().source().has_vendor()); - EXPECT_EQ(TEST_CSI_VENDOR, volume->disk().source().vendor()); - ASSERT_TRUE(volume->disk().source().has_id()); - ASSERT_TRUE(volume->disk().source().has_metadata()); - ASSERT_TRUE(volume->disk().source().has_mount()); - ASSERT_TRUE(volume->disk().source().mount().has_root()); - EXPECT_FALSE(path::absolute(volume->disk().source().mount().root())); + {offer.id()}, {CREATE_DISK(raw, Resource::DiskInfo::Source::MOUNT)}); - // 
Check if the volume is actually created by the test CSI plugin. - Option<string> volumePath; + AWAIT_READY(offers); + ASSERT_EQ(1u, offers->size()); - foreach (const Label& label, volume->disk().source().metadata().labels()) { - if (label.key() == "path") { - volumePath = label.value(); - break; - } - } + offer = offers->at(0); - ASSERT_SOME(volumePath); - EXPECT_TRUE(os::exists(volumePath.get())); + Resource created = *Resources(offer.resources()) + .filter(std::bind(isMountDisk, lambda::_1, "test")) + .begin(); - // Create a persistent volume on the CSI volume, then launch a task to - // use the persistent volume. - Resource persistentVolume = volume.get(); + // Create a persistent MOUNT volume then launch a task to write a file. + Resource persistentVolume = created; persistentVolume.mutable_disk()->mutable_persistence() ->set_id(id::UUID::random().toString()); persistentVolume.mutable_disk()->mutable_persistence() @@ -2427,149 +2392,220 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResourcesReboot) ->set_container_path("volume"); persistentVolume.mutable_disk()->mutable_volume()->set_mode(Volume::RW); - { - Future<TaskStatus> taskStarting; - Future<TaskStatus> taskRunning; - Future<TaskStatus> taskFinished; + Future<Nothing> taskFinished; + EXPECT_CALL(sched, statusUpdate(&driver, TaskStatusStateEq(TASK_STARTING))); + EXPECT_CALL(sched, statusUpdate(&driver, TaskStatusStateEq(TASK_RUNNING))); + EXPECT_CALL(sched, statusUpdate(&driver, TaskStatusStateEq(TASK_FINISHED))) + .WillOnce(FutureSatisfy(&taskFinished)); - EXPECT_CALL(sched, statusUpdate(&driver, _)) - .WillOnce(FutureArg<1>(&taskStarting)) - .WillOnce(FutureArg<1>(&taskRunning)) - .WillOnce(FutureArg<1>(&taskFinished)); + EXPECT_CALL( + sched, resourceOffers(&driver, OffersHaveResource(persistentVolume))) + .WillOnce(FutureArg<1>(&offers)); - EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveResource( - persistentVolume))) - .InSequence(offers) - 
.WillOnce(FutureArg<1>(&task1FinishedOffers)); + driver.acceptOffers( + {offer.id()}, + {CREATE(persistentVolume), + LAUNCH({createTask( + offer.slave_id(), + persistentVolume, + createCommandInfo("touch " + path::join("volume", "file")))})}); - driver.acceptOffers( - {volumeCreatedOffers->at(0).id()}, - {CREATE(persistentVolume), - LAUNCH({createTask( - volumeCreatedOffers->at(0).slave_id(), - persistentVolume, - createCommandInfo("touch " + path::join("volume", "file")))})}, - acceptFilters); + AWAIT_READY(taskFinished); - AWAIT_READY(taskStarting); - EXPECT_EQ(TASK_STARTING, taskStarting->state()); + AWAIT_READY(offers); - AWAIT_READY(taskRunning); - EXPECT_EQ(TASK_RUNNING, taskRunning->state()); + // Shutdown the agent and unmount all CSI volumes to simulate the 1st reboot. + EXPECT_CALL(sched, offerRescinded(_, _)); - AWAIT_READY(taskFinished); - EXPECT_EQ(TASK_FINISHED, taskFinished->state()); - } + slave->reset(); - AWAIT_READY(task1FinishedOffers); + const string csiRootDir = slave::paths::getCsiRootDir(slaveFlags.work_dir); + ASSERT_SOME(fs::unmountAll(csiRootDir)); - // Destruct the agent to shut down all containers. - EXPECT_CALL(sched, offerRescinded(_, _)); + // Inject the boot IDs to simulate the 1st reboot. + auto injectBootIds = [&](const string& bootId) { + ASSERT_SOME(os::write( + slave::paths::getBootIdPath( + slave::paths::getMetaRootDir(slaveFlags.work_dir)), + bootId)); - slave->reset(); + Try<list<string>> volumePaths = + csi::paths::getVolumePaths(csiRootDir, "*", "*"); + ASSERT_SOME(volumePaths); + ASSERT_FALSE(volumePaths->empty()); - // Modify the boot ID to simulate a reboot. - ASSERT_SOME(os::write( - slave::paths::getBootIdPath( - slave::paths::getMetaRootDir(slaveFlags.work_dir)), - "rebooted! 
;)")); + foreach (const string& path, volumePaths.get()) { + Try<csi::paths::VolumePath> volumePath = + csi::paths::parseVolumePath(csiRootDir, path); + ASSERT_SOME(volumePath); - const string csiRootDir = slave::paths::getCsiRootDir(slaveFlags.work_dir); + const string volumeStatePath = csi::paths::getVolumeStatePath( + csiRootDir, volumePath->type, volumePath->name, volumePath->volumeId); + + Result<csi::state::VolumeState> volumeState = + slave::state::read<csi::state::VolumeState>(volumeStatePath); - Try<list<string>> volumePaths = - csi::paths::getVolumePaths(csiRootDir, "*", "*"); - ASSERT_SOME(volumePaths); - ASSERT_FALSE(volumePaths->empty()); - - foreach (const string& path, volumePaths.get()) { - Try<csi::paths::VolumePath> volumePath = - csi::paths::parseVolumePath(csiRootDir, path); - ASSERT_SOME(volumePath); - - const string volumeStatePath = csi::paths::getVolumeStatePath( - csiRootDir, - volumePath->type, - volumePath->name, - volumePath->volumeId); - - Result<csi::state::VolumeState> volumeState = - slave::state::read<csi::state::VolumeState>(volumeStatePath); - ASSERT_SOME(volumeState); - - if (volumeState->state() == csi::state::VolumeState::PUBLISHED) { - volumeState->set_boot_id("rebooted! ;)"); - ASSERT_SOME(slave::state::checkpoint(volumeStatePath, volumeState.get())); + ASSERT_SOME(volumeState); + + if (volumeState->state() == csi::state::VolumeState::PUBLISHED) { + volumeState->set_boot_id(bootId); + ASSERT_SOME( + slave::state::checkpoint(volumeStatePath, volumeState.get())); + } } - } + }; + + injectBootIds("1st reboot! ;)"); - // Unmount all CSI volumes to simulate a reboot. + // Restart the agent. 
+ EXPECT_CALL( + sched, resourceOffers(&driver, OffersHaveResource(persistentVolume))) + .WillOnce(FutureArg<1>(&offers)); + + slave = StartSlave(detector.get(), slaveFlags); + ASSERT_SOME(slave); + + AWAIT_READY(offers); + ASSERT_EQ(1u, offers->size()); + + offer = offers->at(0); + + // Launch another task to read the file created by the previous task. + EXPECT_CALL(sched, statusUpdate(&driver, TaskStatusStateEq(TASK_STARTING))); + EXPECT_CALL(sched, statusUpdate(&driver, TaskStatusStateEq(TASK_RUNNING))); + EXPECT_CALL(sched, statusUpdate(&driver, TaskStatusStateEq(TASK_FINISHED))) + .WillOnce(FutureSatisfy(&taskFinished)); + + EXPECT_CALL( + sched, resourceOffers(&driver, OffersHaveResource(persistentVolume))) + .WillOnce(FutureArg<1>(&offers)); + + driver.acceptOffers( + {offer.id()}, + {LAUNCH({createTask( + offer.slave_id(), + persistentVolume, + createCommandInfo("test -f " + path::join("volume", "file")))})}); + + AWAIT_READY(taskFinished); + + AWAIT_READY(offers); + + // Shutdown the agent and unmount all CSI volumes to simulate the 2nd reboot. + EXPECT_CALL(sched, offerRescinded(_, _)); + + slave->reset(); ASSERT_SOME(fs::unmountAll(csiRootDir)); + // Inject the boot IDs to simulate the 2nd reboot. + injectBootIds("2nd reboot! ;)"); + + // NOTE: We set up the resource provider with an extra storage pool, so that + // when the storage pool shows up, we know that the resource provider has + // finished reconciling storage pools and thus operations won't be dropped. + // + // To achieve this, we drop `SlaveRegisteredMessage`s other than the first one + // to avoid unexpected `UpdateSlaveMessage`s. Then, we also drop the first two + // `UpdateSlaveMessage`s (one sent after agent reregistration and one after + // resource provider reregistration) and wait for the third one, which should + // contain the extra storage pool. We let it fall through to trigger an offer + // allocation for the persistent volume. 
+ // + // Since the extra storage pool is never used, we reject the offers if only + // the storage pool is presented. + // + // TODO(chhsiao): Remove this workaround once MESOS-9553 is done. + setupResourceProviderConfig(Gigabytes(5)); + + EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource( + std::bind(isStoragePool, lambda::_1, "test")))) + .WillRepeatedly(DeclineOffers(declineFilters)); + + // NOTE: The order of these expectations is reversed because Google Mock will + // search the expectations in reverse order. + Future<UpdateSlaveMessage> updateSlaveMessage = + FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _); + + DROP_PROTOBUF(UpdateSlaveMessage(), _, _); + DROP_PROTOBUF(UpdateSlaveMessage(), _, _); + DROP_PROTOBUFS(SlaveReregisteredMessage(), _, _); + FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _); + // Restart the agent. - EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveResource( - persistentVolume))) - .InSequence(offers) - .WillOnce(FutureArg<1>(&slaveRecoveredOffers)); + EXPECT_CALL( + sched, resourceOffers(&driver, OffersHaveResource(persistentVolume))) + .WillOnce(FutureArg<1>(&offers)); slave = StartSlave(detector.get(), slaveFlags); ASSERT_SOME(slave); - AWAIT_READY(slaveRecoveredOffers); - ASSERT_FALSE(slaveRecoveredOffers->empty()); + AWAIT_READY(updateSlaveMessage); + ASSERT_EQ(1, updateSlaveMessage->resource_providers().providers_size()); + ASSERT_FALSE(Resources( + updateSlaveMessage->resource_providers().providers(0).total_resources()) + .filter(std::bind(isStoragePool, lambda::_1, "test")) + .empty()); - // Launch another task to read the file that is created by the - // previous task on the persistent volume. 
- { - Future<TaskStatus> taskStarting; - Future<TaskStatus> taskRunning; - Future<TaskStatus> taskFinished; + AWAIT_READY(offers); + ASSERT_EQ(1u, offers->size()); - EXPECT_CALL(sched, statusUpdate(&driver, _)) - .WillOnce(FutureArg<1>(&taskStarting)) - .WillOnce(FutureArg<1>(&taskRunning)) - .WillOnce(FutureArg<1>(&taskFinished)); + offer = offers->at(0); - EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveResource( - persistentVolume))) - .InSequence(offers) - .WillOnce(FutureArg<1>(&task2FinishedOffers)); + // Destroy the persistent volume. + Future<UpdateOperationStatusMessage> updateOperationStatusMessage = + FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _); - driver.acceptOffers( - {slaveRecoveredOffers->at(0).id()}, - {LAUNCH({createTask( - slaveRecoveredOffers->at(0).slave_id(), - persistentVolume, - createCommandInfo("test -f " + path::join("volume", "file")))})}, - acceptFilters); + EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveResource(created))) + .WillOnce(FutureArg<1>(&offers)); - AWAIT_READY(taskStarting); - EXPECT_EQ(TASK_STARTING, taskStarting->state()); + // TODO(chhsiao): We use the following filter so that the resources will not + // be filtered for 5 seconds (the default) because of MESOS-9616. Remove the + // filter once it is resolved. + Filters acceptFilters; + acceptFilters.set_refuse_seconds(0); + driver.acceptOffers({offer.id()}, {DESTROY(persistentVolume)}, acceptFilters); - AWAIT_READY(taskRunning); - EXPECT_EQ(TASK_RUNNING, taskRunning->state()); + // NOTE: Since `DESTROY` would be applied by the master synchronously, we + // might get an offer before the persistent volume is cleaned up on the agent, + // so we wait for an `UpdateOperationStatusMessage` before the check below. 
+ AWAIT_READY(updateOperationStatusMessage); + EXPECT_EQ(OPERATION_FINISHED, updateOperationStatusMessage->status().state()); - AWAIT_READY(taskFinished); - EXPECT_EQ(TASK_FINISHED, taskFinished->state()); + // Check if the CSI volume still exists but has been cleaned up. + Option<string> volumePath; + + foreach (const Label& label, created.disk().source().metadata().labels()) { + if (label.key() == "path") { + volumePath = label.value(); + break; + } } - AWAIT_READY(task2FinishedOffers); + ASSERT_SOME(volumePath); + EXPECT_TRUE(os::exists(volumePath.get())); + EXPECT_FALSE(os::exists(path::join(volumePath.get(), "file"))); - // Destroy the persistent volume and the CSI volume. - EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveResource(source.get()))) - .InSequence(offers) - .WillOnce(FutureArg<1>(&volumeDestroyedOffers)); + AWAIT_READY(offers); + ASSERT_EQ(1u, offers->size()); - driver.acceptOffers( - {task2FinishedOffers->at(0).id()}, - {DESTROY(persistentVolume), - DESTROY_DISK(volume.get())}, - acceptFilters); + offer = offers->at(0); - AWAIT_READY(volumeDestroyedOffers); - ASSERT_FALSE(volumeDestroyedOffers->empty()); + // Destroy the MOUNT disk. + updateOperationStatusMessage = + FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _); - // Check if the volume is actually deleted by the test CSI plugin. + EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveResource(raw))) + .WillOnce(FutureArg<1>(&offers)); + + driver.acceptOffers({offer.id()}, {DESTROY_DISK(created)}); + + AWAIT_READY(updateOperationStatusMessage); + EXPECT_EQ(OPERATION_FINISHED, updateOperationStatusMessage->status().state()); + + // Check if the CSI volume is actually deleted. + EXPECT_FALSE(os::exists(volumePath.get())); + + AWAIT_READY(offers); }
