This is an automated email from the ASF dual-hosted git repository. chhsiao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 9a6b3cb943fd1f8c9732cd5fb7d58a5b55c1460c Author: Chun-Hung Hsiao <[email protected]> AuthorDate: Wed Mar 27 21:40:33 2019 -0700 Updated test `AgentRegisteredWithNewId` for better code coverage. This patch makes the `AgentRegisteredWithNewId` SLRP test cover the following scenarios: * Launch a task to access a volume imported through `CREATE_DISK`. * `CREATE_DISK` with a mismatched profile. * `DESTROY_DISK` with a `RAW` disk that has been published. Since now volumes need to be published, it becomes a ROOT test. Together with the `CreateDestroyPreprovisionedVolume` SLRP test, most scenarios of preprovisioned volumes are now covered. Review: https://reviews.apache.org/r/70331 --- .../storage_local_resource_provider_tests.cpp | 378 ++++++++++++++------- 1 file changed, 251 insertions(+), 127 deletions(-) diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp index ff60f11..0fbd602 100644 --- a/src/tests/storage_local_resource_provider_tests.cpp +++ b/src/tests/storage_local_resource_provider_tests.cpp @@ -35,6 +35,7 @@ #include <stout/foreach.hpp> #include <stout/hashmap.hpp> +#include <stout/strings.hpp> #include <stout/uri.hpp> #include <stout/os/realpath.hpp> @@ -1465,20 +1466,25 @@ TEST_F(StorageLocalResourceProviderTest, AgentFailoverPluginKilled) } -// This test verifies that if an agent is registered with a new ID, -// the ID of the resource provider would be changed as well, and any -// created volume becomes a preprovisioned volume. -TEST_F(StorageLocalResourceProviderTest, AgentRegisteredWithNewId) +// This test verifies that if an agent is registered with a new ID, the resource +// provider ID would change as well, and any created volume becomes a +// preprovisioned volume. A framework should be able to either create a MOUNT +// disk from a preprovisioned volume and use it, or destroy it directly. 
+TEST_F(StorageLocalResourceProviderTest, ROOT_AgentRegisteredWithNewId) { const string profilesPath = path::join(sandbox.get(), "profiles.json"); + // We set up a 'good' and a 'bad' profile to test that `CREATE_DISK` only + // succeeds with the right profile. Ideally this should be tested in + // `CreateDestroyPreprovisionedVolume`, but since CSI v0 doesn't support + // validating a volume against parameters, we have to test it here. ASSERT_SOME(os::write(profilesPath, createDiskProfileMapping( - {{"test1", JSON::Object{{"label", "foo"}}}, - {"test2", None()}}))); + {{"good", JSON::Object{{"label", "foo"}}}, + {"bad", None()}}))); loadUriDiskProfileAdaptorModule(profilesPath); - setupResourceProviderConfig(Gigabytes(2), None(), "label=foo"); + setupResourceProviderConfig(Gigabytes(5), None(), "label=foo"); Try<Owned<cluster::Master>> master = StartMaster(); ASSERT_SOME(master); @@ -1506,92 +1512,124 @@ TEST_F(StorageLocalResourceProviderTest, AgentRegisteredWithNewId) EXPECT_CALL(sched, registered(&driver, _, _)); - // The framework is expected to see the following offers in sequence: - // 1. One containing a RAW disk resource before the 1st `CREATE_DISK`. - // 2. One containing a MOUNT disk resource after the 1st `CREATE_DISK`. - // 3. One containing a RAW preprovisioned volume after the agent is - // registered with a new ID. - // 4. One containing the same preprovisioned volume after the 2nd - // `CREATE_DISK` that specifies a wrong profile. - // 5. One containing a MOUNT disk resource after the 3rd `CREATE_DISK` that - // specifies the correct profile. - // - // We set up the expectations for these offers as the test progresses. 
- Future<vector<Offer>> rawDiskOffers; - Future<vector<Offer>> diskCreatedOffers; - Future<vector<Offer>> slaveRecoveredOffers; - Future<vector<Offer>> operationFailedOffers; - Future<vector<Offer>> diskRecoveredOffers; - - // We use the following filter to filter offers that do not have - // wanted resources for 365 days (the maximum). + // We use the following filter to filter offers that do not have wanted + // resources for 365 days (the maximum). Filters declineFilters; declineFilters.set_refuse_seconds(Days(365).secs()); - // Decline offers that contain only the agent's default resources. + // Decline unwanted offers, e.g., offers containing only agent-default + // resources. EXPECT_CALL(sched, resourceOffers(&driver, _)) .WillRepeatedly(DeclineOffers(declineFilters)); + // We first wait for an offer containing a storage pool to exercise two + // `CREATE_DISK` operations. Since they are not done atomically, we might get + // subsequent offers containing a smaller storage pool after one is exercised, + // so we decline them. + Future<vector<Offer>> offers; EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource( - std::bind(isStoragePool<Resource>, lambda::_1, "test1")))) - .WillOnce(FutureArg<1>(&rawDiskOffers)); + std::bind(isStoragePool<Resource>, lambda::_1, "good")))) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(DeclineOffers(declineFilters)); driver.start(); - AWAIT_READY(rawDiskOffers); - ASSERT_EQ(1u, rawDiskOffers->size()); + AWAIT_READY(offers); + ASSERT_EQ(1u, offers->size()); - Resource raw = *Resources(rawDiskOffers->at(0).resources()) - .filter(std::bind(isStoragePool<Resource>, lambda::_1, "test1")) - .begin(); + Offer offer = offers->at(0); - // Create a MOUNT disk of profile 'test1'. 
- EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource( - std::bind(isMountDisk<Resource>, lambda::_1, "test1")))) - .WillOnce(FutureArg<1>(&diskCreatedOffers)); + Resources raw = Resources(offer.resources()) + .filter(std::bind(isStoragePool<Resource>, lambda::_1, "good")); - driver.acceptOffers( - {rawDiskOffers->at(0).id()}, - {CREATE_DISK(raw, Resource::DiskInfo::Source::MOUNT)}); + // Create a 2GB and a 3GB MOUNT disks. + ASSERT_SOME_EQ(Gigabytes(5), raw.disk()); + Resource source1 = *raw.begin(); + source1.mutable_scalar()->set_value( + static_cast<double>(Gigabytes(2).bytes()) / Bytes::MEGABYTES); - AWAIT_READY(diskCreatedOffers); - ASSERT_EQ(1u, diskCreatedOffers->size()); + raw -= source1; + ASSERT_SOME_EQ(Gigabytes(3), raw.disk()); + Resource source2 = *raw.begin(); - Resource created = *Resources(diskCreatedOffers->at(0).resources()) - .filter(std::bind(isMountDisk<Resource>, lambda::_1, "test1")) - .begin(); + // After the following operations are completed, we would get an offer + // containing __two__ MOUNT disks: one 2GB and one 3GB. 
+ EXPECT_CALL(sched, resourceOffers(&driver, AllOf( + OffersHaveAnyResource([](const Resource& r) { + return isMountDisk(r, "good") && + Megabytes(r.scalar().value()) == Gigabytes(2); + }), + OffersHaveAnyResource([](const Resource& r) { + return isMountDisk(r, "good") && + Megabytes(r.scalar().value()) == Gigabytes(3); + })))) + .WillOnce(FutureArg<1>(&offers)); - ASSERT_TRUE(created.has_provider_id()); - ASSERT_TRUE(created.disk().source().has_vendor()); - EXPECT_EQ(TEST_CSI_VENDOR, created.disk().source().vendor()); - ASSERT_TRUE(created.disk().source().has_id()); - ASSERT_TRUE(created.disk().source().has_metadata()); - ASSERT_TRUE(created.disk().source().has_mount()); - ASSERT_TRUE(created.disk().source().mount().has_root()); - EXPECT_FALSE(path::absolute(created.disk().source().mount().root())); + driver.acceptOffers( + {offer.id()}, + {CREATE_DISK(source1, Resource::DiskInfo::Source::MOUNT), + CREATE_DISK(source2, Resource::DiskInfo::Source::MOUNT)}); - // Check if the volume is actually created by the test CSI plugin. - Option<string> volumePath; + AWAIT_READY(offers); + ASSERT_EQ(1u, offers->size()); - foreach (const Label& label, created.disk().source().metadata().labels()) { - if (label.key() == "path") { - volumePath = label.value(); - break; - } + offer = offers->at(0); + + std::vector<Resource> created = google::protobuf::convert<Resource>( + Resources(offer.resources()) + .filter(std::bind(isMountDisk<Resource>, lambda::_1, "good"))); + + ASSERT_EQ(2u, created.size()); + + // Create persistent MOUNT volumes then launch a task to write files. 
+ Resources persistentVolumes; + hashmap<string, ResourceProviderID> sourceIdToProviderId; + const vector<string> containerPaths = {"volume1", "volume2"}; + for (size_t i = 0; i < created.size(); i++) { + const Resource& volume = created[i]; + ASSERT_TRUE(volume.has_provider_id()); + sourceIdToProviderId.put(volume.disk().source().id(), volume.provider_id()); + + Resource persistentVolume = volume; + persistentVolume.mutable_disk()->mutable_persistence() + ->set_id(id::UUID::random().toString()); + persistentVolume.mutable_disk()->mutable_persistence() + ->set_principal(framework.principal()); + persistentVolume.mutable_disk()->mutable_volume() + ->set_container_path(containerPaths[i]); + persistentVolume.mutable_disk()->mutable_volume()->set_mode(Volume::RW); + + persistentVolumes += persistentVolume; } - ASSERT_SOME(volumePath); - EXPECT_TRUE(os::exists(volumePath.get())); + Future<Nothing> taskFinished; + EXPECT_CALL(sched, statusUpdate(&driver, TaskStatusStateEq(TASK_STARTING))); + EXPECT_CALL(sched, statusUpdate(&driver, TaskStatusStateEq(TASK_RUNNING))); + EXPECT_CALL(sched, statusUpdate(&driver, TaskStatusStateEq(TASK_FINISHED))) + .WillOnce(FutureSatisfy(&taskFinished)); + + // After the task is finished, we would get an offer containing the two + // persistent MOUNT volumes. We just match any one here for simplicity. + EXPECT_CALL( sched, resourceOffers(&driver, OffersHaveAnyResource( + Resources::isPersistentVolume))) + .WillOnce(FutureArg<1>(&offers)); + + driver.acceptOffers( + {offer.id()}, + {CREATE(persistentVolumes), + LAUNCH({createTask( + offer.slave_id(), + persistentVolumes, + createCommandInfo( + "touch " + path::join(containerPaths[0], "file") + " " + + path::join(containerPaths[1], "file")))})}); + + AWAIT_READY(taskFinished); + + AWAIT_READY(offers); // Shut down the agent. 
- // - // NOTE: In addition to the last offer being rescinded, the master may send - // an offer after receiving an `UpdateSlaveMessage` containing only the - // preprovisioned volume, and then receive another `UpdateSlaveMessage` - // containing both the volume and a 'test1' storage pool of before the offer - // gets declined. In this case, the offer will be rescinded as well. - EXPECT_CALL(sched, offerRescinded(&driver, _)) - .Times(Between(1, 2)); + EXPECT_CALL(sched, offerRescinded(&driver, _)); slave.get()->terminate(); @@ -1599,90 +1637,176 @@ TEST_F(StorageLocalResourceProviderTest, AgentRegisteredWithNewId) const string metaDir = slave::paths::getMetaRootDir(slaveFlags.work_dir); ASSERT_SOME(os::rm(slave::paths::getLatestSlavePath(metaDir))); - // NOTE: We setup up the resource provider with an extra storage pool, so that - // when the storage pool is offered, we know that the corresponding profile is - // known to the resource provider. - setupResourceProviderConfig(Gigabytes(4), None(), "label=foo"); + // NOTE: We set up the resource provider with an extra storage pool, so that + // when the storage pool shows up, we know that the resource provider has + // finished reconciling storage pools and thus operations won't be dropped. + // + // To achieve this, we drop `SlaveRegisteredMessage`s other than the first one + // to avoid unexpected `UpdateSlaveMessage`s. Then, we also drop the first two + // `UpdateSlaveMessage`s (one sent after agent registration and one after + // resource provider registration) and wait for the third one, which should + // contain the extra storage pool. We let it fall through to trigger an offer + // allocation for the persistent volume. + // + // TODO(chhsiao): Remove this workaround once MESOS-9553 is done. + setupResourceProviderConfig(Gigabytes(6), None(), "label=foo"); - // A new registration would trigger another `SlaveRegisteredMessage`. 
- slaveRegisteredMessage = + // NOTE: The order of these expectations is reversed because Google Mock will + // search the expectations in reverse order. + Future<UpdateSlaveMessage> updateSlaveMessage = + FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _); + + DROP_PROTOBUF(UpdateSlaveMessage(), _, _); + DROP_PROTOBUF(UpdateSlaveMessage(), _, _); + DROP_PROTOBUFS(SlaveRegisteredMessage(), _, _); + + Future<SlaveRegisteredMessage> newSlaveRegisteredMessage = FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, Not(slave.get()->pid)); - // NOTE: Instead of expecting a preprovisioned volume, we expect an offer with - // a 'test1' storage pool as an indication that the profile is known to the - // resource provider. The offer should also have the preprovisioned volume. - // But, an extra offer with the storage pool may be received as a side effect - // of this workaround, so we decline it if this happens. + // After the new agent starts, we wait for an offer containing the two + // preprovisioned volumes to exercise two `CREATE_DISK` operations. Since one + // of them would fail, we might get subsequent offers containing a + // preprovisioned volume afterwards, so we decline them. 
EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource( - std::bind(isStoragePool<Resource>, lambda::_1, "test1")))) - .WillOnce(FutureArg<1>(&slaveRecoveredOffers)) + isPreprovisionedVolume<Resource>))) + .WillOnce(FutureArg<1>(&offers)) .WillRepeatedly(DeclineOffers(declineFilters)); slave = StartSlave(detector.get(), slaveFlags); ASSERT_SOME(slave); - AWAIT_READY(slaveRegisteredMessage); + AWAIT_READY(newSlaveRegisteredMessage); + ASSERT_NE(slaveRegisteredMessage->slave_id(), + newSlaveRegisteredMessage->slave_id()); - AWAIT_READY(slaveRecoveredOffers); - ASSERT_EQ(1u, slaveRecoveredOffers->size()); + AWAIT_READY(updateSlaveMessage); + ASSERT_EQ(1, updateSlaveMessage->resource_providers().providers_size()); + ASSERT_FALSE(Resources( + updateSlaveMessage->resource_providers().providers(0).total_resources()) + .filter(std::bind(isStoragePool<Resource>, lambda::_1, "good")) + .empty()); - Resources _preprovisioned = Resources(slaveRecoveredOffers->at(0).resources()) - .filter(isPreprovisionedVolume<Resource>); + AWAIT_READY(offers); + ASSERT_EQ(1u, offers->size()); - ASSERT_SOME_EQ(Gigabytes(2), _preprovisioned.disk()); + offer = offers->at(0); - Resource preprovisioned = *_preprovisioned.begin(); - ASSERT_TRUE(preprovisioned.has_provider_id()); - ASSERT_NE(created.provider_id(), preprovisioned.provider_id()); - ASSERT_EQ(created.disk().source().id(), preprovisioned.disk().source().id()); - ASSERT_EQ( - created.disk().source().metadata(), - preprovisioned.disk().source().metadata()); + vector<Resource> preprovisioned = google::protobuf::convert<Resource>( + Resources(offer.resources()).filter(isPreprovisionedVolume<Resource>)); - // Apply profile 'test2' to the preprovisioned volume, which will fail. 
- EXPECT_CALL( - sched, resourceOffers(&driver, OffersHaveResource(preprovisioned))) - .WillOnce(FutureArg<1>(&operationFailedOffers)); + ASSERT_EQ(2u, preprovisioned.size()); - Future<UpdateOperationStatusMessage> operationFailedStatus = - FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _); + // Check that the resource provider IDs of the volumes have changed. + vector<string> volumePaths; + foreach (const Resource& volume, preprovisioned) { + ASSERT_TRUE(volume.has_provider_id()); + ASSERT_TRUE(sourceIdToProviderId.contains(volume.disk().source().id())); + ASSERT_NE(sourceIdToProviderId.at(volume.disk().source().id()), + volume.provider_id()); - driver.acceptOffers( - {slaveRecoveredOffers->at(0).id()}, - {CREATE_DISK( - preprovisioned, Resource::DiskInfo::Source::MOUNT, "test2")}); + Option<string> volumePath; + foreach (const Label& label, volume.disk().source().metadata().labels()) { + if (label.key() == "path") { + volumePath = label.value(); + break; + } + } - AWAIT_READY(operationFailedStatus); - EXPECT_EQ(OPERATION_FAILED, operationFailedStatus->status().state()); + ASSERT_SOME(volumePath); + ASSERT_TRUE(os::exists(volumePath.get())); + volumePaths.push_back(volumePath.get()); + } - AWAIT_READY(operationFailedOffers); - ASSERT_EQ(1u, operationFailedOffers->size()); + // Apply profiles 'good' and 'bad' to the preprovisioned volumes. The first + // operation would succeed but the second would fail. + Future<multiset<OperationState>> createDiskOperationStates = + process::collect( + FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _), + FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _)) + .then([](const std::tuple< + UpdateOperationStatusMessage, + UpdateOperationStatusMessage>& operationStatuses) { + return multiset<OperationState>{ + std::get<0>(operationStatuses).status().state(), + std::get<1>(operationStatuses).status().state()}; + }); - // Apply profile 'test1' to the preprovisioned volume, which will succeed. 
- EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource( - std::bind(isMountDisk<Resource>, lambda::_1, "test1")))) - .WillOnce(FutureArg<1>(&diskRecoveredOffers)); + // After both operations are completed, we would get an offer containing both + // a MOUNT disk and a preprovisioned volume. + EXPECT_CALL(sched, resourceOffers(&driver, AllOf( + OffersHaveAnyResource( + std::bind(isMountDisk<Resource>, lambda::_1, "good")), + OffersHaveResource(preprovisioned[1])))) + .WillOnce(FutureArg<1>(&offers)); driver.acceptOffers( - {operationFailedOffers->at(0).id()}, + {offer.id()}, {CREATE_DISK( - preprovisioned, Resource::DiskInfo::Source::MOUNT, "test1")}); + preprovisioned[0], Resource::DiskInfo::Source::MOUNT, "good"), + CREATE_DISK( + preprovisioned[1], Resource::DiskInfo::Source::MOUNT, "bad")}); - AWAIT_READY(diskRecoveredOffers); - ASSERT_EQ(1u, diskRecoveredOffers->size()); + multiset<OperationState> expectedOperationStates = { + OperationState::OPERATION_FINISHED, + OperationState::OPERATION_FAILED}; - Resource recovered = *Resources(diskRecoveredOffers->at(0).resources()) - .filter(std::bind(isMountDisk<Resource>, lambda::_1, "test1")) + AWAIT_EXPECT_EQ(expectedOperationStates, createDiskOperationStates); + + AWAIT_READY(offers); + ASSERT_EQ(1u, offers->size()); + + offer = offers->at(0); + + Resource imported = *Resources(offer.resources()) + .filter(std::bind(isMountDisk<Resource>, lambda::_1, "good")) .begin(); - ASSERT_EQ(preprovisioned.provider_id(), recovered.provider_id()); + // Create a persistent MOUNT volume from the imported volume, then launch a + // task to verify the previously written file, and destroy the other + // unimported volume. 
+ imported.mutable_disk()->mutable_persistence() + ->set_id(id::UUID::random().toString()); + imported.mutable_disk()->mutable_persistence() + ->set_principal(framework.principal()); + imported.mutable_disk()->mutable_volume()->set_container_path("volume"); + imported.mutable_disk()->mutable_volume()->set_mode(Volume::RW); - ASSERT_EQ( - preprovisioned.disk().source().id(), recovered.disk().source().id()); + EXPECT_CALL(sched, statusUpdate(&driver, TaskStatusStateEq(TASK_STARTING))); + EXPECT_CALL(sched, statusUpdate(&driver, TaskStatusStateEq(TASK_RUNNING))); + EXPECT_CALL(sched, statusUpdate(&driver, TaskStatusStateEq(TASK_FINISHED))) + .WillOnce(FutureSatisfy(&taskFinished)); - ASSERT_EQ( - preprovisioned.disk().source().metadata(), - recovered.disk().source().metadata()); + Future<UpdateOperationStatusMessage> destroyDiskOperationStatus = + FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _); + + // After the task is finished and the operation is completed, we would get an + // offer containing the persistent volume and a storage pool that has the freed space. + EXPECT_CALL(sched, resourceOffers(&driver, AllOf( + OffersHaveResource(imported), + OffersHaveAnyResource([](const Resource& r) { + return isStoragePool(r, "good") && + Megabytes(r.scalar().value()) >= Gigabytes(2); + })))) + .WillOnce(FutureArg<1>(&offers)); + + driver.acceptOffers( + {offer.id()}, + {CREATE(imported), + LAUNCH({createTask( + offer.slave_id(), + imported, + createCommandInfo("test -f " + path::join("volume", "file")))}), + DESTROY_DISK(preprovisioned[1])}); + + AWAIT_READY(taskFinished); + + AWAIT_READY(destroyDiskOperationStatus); + EXPECT_EQ(OperationState::OPERATION_FINISHED, + destroyDiskOperationStatus->status().state()); + + AWAIT_READY(offers); + + // Check if the unimported volume is deleted by the test CSI plugin. + EXPECT_FALSE(os::exists(volumePaths[1])); }
