This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 9a6b3cb943fd1f8c9732cd5fb7d58a5b55c1460c
Author: Chun-Hung Hsiao <[email protected]>
AuthorDate: Wed Mar 27 21:40:33 2019 -0700

    Updated test `AgentRegisteredWithNewId` for better code coverage.
    
    This patch makes the `AgentRegisteredWithNewId` SLRP test cover the
    following scenarios:
    
      * Launch a task to access a volume imported through `CREATE_DISK`.
    
      * `CREATE_DISK` with a mismatched profile.
    
      * `DESTROY_DISK` with a `RAW` disk that has been published.
    
    Since volumes now need to be published, this becomes a ROOT test.
    
    Together with the `CreateDestroyPreprovisionedVolume` SLRP test, most
    scenarios of preprovisioned volumes are now covered.
    
    Review: https://reviews.apache.org/r/70331
---
 .../storage_local_resource_provider_tests.cpp      | 378 ++++++++++++++-------
 1 file changed, 251 insertions(+), 127 deletions(-)

diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index ff60f11..0fbd602 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -35,6 +35,7 @@
 
 #include <stout/foreach.hpp>
 #include <stout/hashmap.hpp>
+#include <stout/strings.hpp>
 #include <stout/uri.hpp>
 
 #include <stout/os/realpath.hpp>
@@ -1465,20 +1466,25 @@ TEST_F(StorageLocalResourceProviderTest, AgentFailoverPluginKilled)
 }
 
 
-// This test verifies that if an agent is registered with a new ID,
-// the ID of the resource provider would be changed as well, and any
-// created volume becomes a preprovisioned volume.
-TEST_F(StorageLocalResourceProviderTest, AgentRegisteredWithNewId)
+// This test verifies that if an agent is registered with a new ID, the resource
+// provider ID would change as well, and any created volume becomes a
+// preprovisioned volume. A framework should be able to either create a MOUNT
+// disk from a preprovisioned volume and use it, or destroy it directly.
+TEST_F(StorageLocalResourceProviderTest, ROOT_AgentRegisteredWithNewId)
 {
   const string profilesPath = path::join(sandbox.get(), "profiles.json");
 
+  // We set up a 'good' and a 'bad' profile to test that `CREATE_DISK` only
+  // succeeds with the right profile. Ideally this should be tested in
+  // `CreateDestroyPreprovisionedVolume`, but since CSI v0 doesn't support
+  // validating a volume against parameters, we have to test it here.
   ASSERT_SOME(os::write(profilesPath, createDiskProfileMapping(
-      {{"test1", JSON::Object{{"label", "foo"}}},
-       {"test2", None()}})));
+      {{"good", JSON::Object{{"label", "foo"}}},
+       {"bad", None()}})));
 
   loadUriDiskProfileAdaptorModule(profilesPath);
 
-  setupResourceProviderConfig(Gigabytes(2), None(), "label=foo");
+  setupResourceProviderConfig(Gigabytes(5), None(), "label=foo");
 
   Try<Owned<cluster::Master>> master = StartMaster();
   ASSERT_SOME(master);
@@ -1506,92 +1512,124 @@ TEST_F(StorageLocalResourceProviderTest, AgentRegisteredWithNewId)
 
   EXPECT_CALL(sched, registered(&driver, _, _));
 
-  // The framework is expected to see the following offers in sequence:
-  //   1. One containing a RAW disk resource before the 1st `CREATE_DISK`.
-  //   2. One containing a MOUNT disk resource after the 1st `CREATE_DISK`.
-  //   3. One containing a RAW preprovisioned volume after the agent is
-  //      registered with a new ID.
-  //   4. One containing the same preprovisioned volume after the 2nd
-  //      `CREATE_DISK` that specifies a wrong profile.
-  //   5. One containing a MOUNT disk resource after the 3rd `CREATE_DISK` that
-  //      specifies the correct profile.
-  //
-  // We set up the expectations for these offers as the test progresses.
-  Future<vector<Offer>> rawDiskOffers;
-  Future<vector<Offer>> diskCreatedOffers;
-  Future<vector<Offer>> slaveRecoveredOffers;
-  Future<vector<Offer>> operationFailedOffers;
-  Future<vector<Offer>> diskRecoveredOffers;
-
-  // We use the following filter to filter offers that do not have
-  // wanted resources for 365 days (the maximum).
+  // We use the following filter to filter offers that do not have wanted
+  // resources for 365 days (the maximum).
   Filters declineFilters;
   declineFilters.set_refuse_seconds(Days(365).secs());
 
-  // Decline offers that contain only the agent's default resources.
+  // Decline unwanted offers, e.g., offers containing only agent-default
+  // resources.
   EXPECT_CALL(sched, resourceOffers(&driver, _))
     .WillRepeatedly(DeclineOffers(declineFilters));
 
+  // We first wait for an offer containing a storage pool to exercise two
+  // `CREATE_DISK` operations. Since they are not done atomically, we might get
+  // subsequent offers containing a smaller storage pool after one is exercised,
+  // so we decline them.
+  Future<vector<Offer>> offers;
   EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
-      std::bind(isStoragePool<Resource>, lambda::_1, "test1"))))
-    .WillOnce(FutureArg<1>(&rawDiskOffers));
+      std::bind(isStoragePool<Resource>, lambda::_1, "good"))))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(DeclineOffers(declineFilters));
 
   driver.start();
 
-  AWAIT_READY(rawDiskOffers);
-  ASSERT_EQ(1u, rawDiskOffers->size());
+  AWAIT_READY(offers);
+  ASSERT_EQ(1u, offers->size());
 
-  Resource raw = *Resources(rawDiskOffers->at(0).resources())
-    .filter(std::bind(isStoragePool<Resource>, lambda::_1, "test1"))
-    .begin();
+  Offer offer = offers->at(0);
 
-  // Create a MOUNT disk of profile 'test1'.
-  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
-      std::bind(isMountDisk<Resource>, lambda::_1, "test1"))))
-    .WillOnce(FutureArg<1>(&diskCreatedOffers));
+  Resources raw = Resources(offer.resources())
+    .filter(std::bind(isStoragePool<Resource>, lambda::_1, "good"));
 
-  driver.acceptOffers(
-      {rawDiskOffers->at(0).id()},
-      {CREATE_DISK(raw, Resource::DiskInfo::Source::MOUNT)});
+  // Create a 2GB and a 3GB MOUNT disk.
+  ASSERT_SOME_EQ(Gigabytes(5), raw.disk());
+  Resource source1 = *raw.begin();
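+  // NOTE: Disk resources are expressed in megabytes, hence the conversion
+  // from gigabytes below.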
+  source1.mutable_scalar()->set_value(
+      static_cast<double>(Gigabytes(2).bytes()) / Bytes::MEGABYTES);
 
-  AWAIT_READY(diskCreatedOffers);
-  ASSERT_EQ(1u, diskCreatedOffers->size());
+  raw -= source1;
+  ASSERT_SOME_EQ(Gigabytes(3), raw.disk());
+  Resource source2 = *raw.begin();
 
-  Resource created = *Resources(diskCreatedOffers->at(0).resources())
-    .filter(std::bind(isMountDisk<Resource>, lambda::_1, "test1"))
-    .begin();
+  // After the following operations are completed, we would get an offer
+  // containing __two__ MOUNT disks: one 2GB and one 3GB.
+  EXPECT_CALL(sched, resourceOffers(&driver, AllOf(
+      OffersHaveAnyResource([](const Resource& r) {
+        return isMountDisk(r, "good") &&
+          Megabytes(r.scalar().value()) == Gigabytes(2);
+      }),
+      OffersHaveAnyResource([](const Resource& r) {
+        return isMountDisk(r, "good") &&
+          Megabytes(r.scalar().value()) == Gigabytes(3);
+      }))))
+    .WillOnce(FutureArg<1>(&offers));
 
-  ASSERT_TRUE(created.has_provider_id());
-  ASSERT_TRUE(created.disk().source().has_vendor());
-  EXPECT_EQ(TEST_CSI_VENDOR, created.disk().source().vendor());
-  ASSERT_TRUE(created.disk().source().has_id());
-  ASSERT_TRUE(created.disk().source().has_metadata());
-  ASSERT_TRUE(created.disk().source().has_mount());
-  ASSERT_TRUE(created.disk().source().mount().has_root());
-  EXPECT_FALSE(path::absolute(created.disk().source().mount().root()));
+  driver.acceptOffers(
+      {offer.id()},
+      {CREATE_DISK(source1, Resource::DiskInfo::Source::MOUNT),
+       CREATE_DISK(source2, Resource::DiskInfo::Source::MOUNT)});
 
-  // Check if the volume is actually created by the test CSI plugin.
-  Option<string> volumePath;
+  AWAIT_READY(offers);
+  ASSERT_EQ(1u, offers->size());
 
-  foreach (const Label& label, created.disk().source().metadata().labels()) {
-    if (label.key() == "path") {
-      volumePath = label.value();
-      break;
-    }
+  offer = offers->at(0);
+
+  std::vector<Resource> created = google::protobuf::convert<Resource>(
+      Resources(offer.resources())
+        .filter(std::bind(isMountDisk<Resource>, lambda::_1, "good")));
+
+  ASSERT_EQ(2u, created.size());
+
+  // Create persistent MOUNT volumes, then launch a task to write files.
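+  // We also record the resource provider ID of each volume so that we can
+  // later verify that the provider IDs change after the agent re-registers.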
+  Resources persistentVolumes;
+  hashmap<string, ResourceProviderID> sourceIdToProviderId;
+  const vector<string> containerPaths = {"volume1", "volume2"};
+  for (size_t i = 0; i < created.size(); i++) {
+    const Resource& volume = created[i];
+    ASSERT_TRUE(volume.has_provider_id());
+    sourceIdToProviderId.put(volume.disk().source().id(), volume.provider_id());
+
+    Resource persistentVolume = volume;
+    persistentVolume.mutable_disk()->mutable_persistence()
+      ->set_id(id::UUID::random().toString());
+    persistentVolume.mutable_disk()->mutable_persistence()
+      ->set_principal(framework.principal());
+    persistentVolume.mutable_disk()->mutable_volume()
+      ->set_container_path(containerPaths[i]);
+    persistentVolume.mutable_disk()->mutable_volume()->set_mode(Volume::RW);
+
+    persistentVolumes += persistentVolume;
   }
 
-  ASSERT_SOME(volumePath);
-  EXPECT_TRUE(os::exists(volumePath.get()));
+  Future<Nothing> taskFinished;
+  EXPECT_CALL(sched, statusUpdate(&driver, TaskStatusStateEq(TASK_STARTING)));
+  EXPECT_CALL(sched, statusUpdate(&driver, TaskStatusStateEq(TASK_RUNNING)));
+  EXPECT_CALL(sched, statusUpdate(&driver, TaskStatusStateEq(TASK_FINISHED)))
+    .WillOnce(FutureSatisfy(&taskFinished));
+
+  // After the task is finished, we would get an offer containing the two
+  // persistent MOUNT volumes. We just match any one here for simplicity.
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+      Resources::isPersistentVolume)))
+    .WillOnce(FutureArg<1>(&offers));
+
+  driver.acceptOffers(
+      {offer.id()},
+      {CREATE(persistentVolumes),
+       LAUNCH({createTask(
+           offer.slave_id(),
+           persistentVolumes,
+           createCommandInfo(
+               "touch " + path::join(containerPaths[0], "file") + " " +
+               path::join(containerPaths[1], "file")))})});
+
+  AWAIT_READY(taskFinished);
+
+  AWAIT_READY(offers);
 
   // Shut down the agent.
-  //
-  // NOTE: In addition to the last offer being rescinded, the master may send
-  // an offer after receiving an `UpdateSlaveMessage` containing only the
-  // preprovisioned volume, and then receive another `UpdateSlaveMessage`
-  // containing both the volume and a 'test1' storage pool of before the offer
-  // gets declined. In this case, the offer will be rescinded as well.
-  EXPECT_CALL(sched, offerRescinded(&driver, _))
-    .Times(Between(1, 2));
+  EXPECT_CALL(sched, offerRescinded(&driver, _));
 
   slave.get()->terminate();
 
@@ -1599,90 +1637,176 @@ TEST_F(StorageLocalResourceProviderTest, AgentRegisteredWithNewId)
   const string metaDir = slave::paths::getMetaRootDir(slaveFlags.work_dir);
   ASSERT_SOME(os::rm(slave::paths::getLatestSlavePath(metaDir)));
 
-  // NOTE: We setup up the resource provider with an extra storage pool, so that
-  // when the storage pool is offered, we know that the corresponding profile is
-  // known to the resource provider.
-  setupResourceProviderConfig(Gigabytes(4), None(), "label=foo");
+  // NOTE: We set up the resource provider with an extra storage pool, so that
+  // when the storage pool shows up, we know that the resource provider has
+  // finished reconciling storage pools and thus operations won't be dropped.
+  //
+  // To achieve this, we drop `SlaveRegisteredMessage`s other than the first one
+  // to avoid unexpected `UpdateSlaveMessage`s. Then, we also drop the first two
+  // `UpdateSlaveMessage`s (one sent after agent registration and one after
+  // resource provider registration) and wait for the third one, which should
+  // contain the extra storage pool. We let it fall through to trigger an offer
+  // allocation for the persistent volume.
+  //
+  // TODO(chhsiao): Remove this workaround once MESOS-9553 is done.
+  setupResourceProviderConfig(Gigabytes(6), None(), "label=foo");
 
-  // A new registration would trigger another `SlaveRegisteredMessage`.
-  slaveRegisteredMessage =
+  // NOTE: The order of these expectations is reversed because Google Mock will
+  // search the expectations in reverse order.
+  Future<UpdateSlaveMessage> updateSlaveMessage =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  DROP_PROTOBUF(UpdateSlaveMessage(), _, _);
+  DROP_PROTOBUF(UpdateSlaveMessage(), _, _);
+  DROP_PROTOBUFS(SlaveRegisteredMessage(), _, _);
+
+  Future<SlaveRegisteredMessage> newSlaveRegisteredMessage =
     FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, Not(slave.get()->pid));
 
-  // NOTE: Instead of expecting a preprovisioned volume, we expect an offer with
-  // a 'test1' storage pool as an indication that the profile is known to the
-  // resource provider. The offer should also have the preprovisioned volume.
-  // But, an extra offer with the storage pool may be received as a side effect
-  // of this workaround, so we decline it if this happens.
+  // After the new agent starts, we wait for an offer containing the two
+  // preprovisioned volumes to exercise two `CREATE_DISK` operations. Since one
+  // of them would fail, we might get subsequent offers containing a
+  // preprovisioned volume afterwards, so we decline them.
   EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
-      std::bind(isStoragePool<Resource>, lambda::_1, "test1"))))
-    .WillOnce(FutureArg<1>(&slaveRecoveredOffers))
+      isPreprovisionedVolume<Resource>)))
+    .WillOnce(FutureArg<1>(&offers))
     .WillRepeatedly(DeclineOffers(declineFilters));
 
   slave = StartSlave(detector.get(), slaveFlags);
   ASSERT_SOME(slave);
 
-  AWAIT_READY(slaveRegisteredMessage);
+  AWAIT_READY(newSlaveRegisteredMessage);
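+  // Since the agent's metadata has been wiped, it should have registered with
+  // a new agent ID.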
+  ASSERT_NE(slaveRegisteredMessage->slave_id(),
+            newSlaveRegisteredMessage->slave_id());
 
-  AWAIT_READY(slaveRecoveredOffers);
-  ASSERT_EQ(1u, slaveRecoveredOffers->size());
+  AWAIT_READY(updateSlaveMessage);
+  ASSERT_EQ(1, updateSlaveMessage->resource_providers().providers_size());
+  ASSERT_FALSE(Resources(
+      updateSlaveMessage->resource_providers().providers(0).total_resources())
+    .filter(std::bind(isStoragePool<Resource>, lambda::_1, "good"))
+    .empty());
 
-  Resources _preprovisioned = Resources(slaveRecoveredOffers->at(0).resources())
-    .filter(isPreprovisionedVolume<Resource>);
+  AWAIT_READY(offers);
+  ASSERT_EQ(1u, offers->size());
 
-  ASSERT_SOME_EQ(Gigabytes(2), _preprovisioned.disk());
+  offer = offers->at(0);
 
-  Resource preprovisioned = *_preprovisioned.begin();
-  ASSERT_TRUE(preprovisioned.has_provider_id());
-  ASSERT_NE(created.provider_id(), preprovisioned.provider_id());
-  ASSERT_EQ(created.disk().source().id(), preprovisioned.disk().source().id());
-  ASSERT_EQ(
-      created.disk().source().metadata(),
-      preprovisioned.disk().source().metadata());
+  vector<Resource> preprovisioned = google::protobuf::convert<Resource>(
+      Resources(offer.resources()).filter(isPreprovisionedVolume<Resource>));
 
-  // Apply profile 'test2' to the preprovisioned volume, which will fail.
-  EXPECT_CALL(
-      sched, resourceOffers(&driver, OffersHaveResource(preprovisioned)))
-    .WillOnce(FutureArg<1>(&operationFailedOffers));
+  ASSERT_EQ(2u, preprovisioned.size());
 
-  Future<UpdateOperationStatusMessage> operationFailedStatus =
-    FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _);
+  // Check that the resource provider IDs of the volumes have changed.
+  vector<string> volumePaths;
+  foreach (const Resource& volume, preprovisioned) {
+    ASSERT_TRUE(volume.has_provider_id());
+    ASSERT_TRUE(sourceIdToProviderId.contains(volume.disk().source().id()));
+    ASSERT_NE(sourceIdToProviderId.at(volume.disk().source().id()),
+              volume.provider_id());
 
-  driver.acceptOffers(
-      {slaveRecoveredOffers->at(0).id()},
-      {CREATE_DISK(
-          preprovisioned, Resource::DiskInfo::Source::MOUNT, "test2")});
+    Option<string> volumePath;
+    foreach (const Label& label, volume.disk().source().metadata().labels()) {
+      if (label.key() == "path") {
+        volumePath = label.value();
+        break;
+      }
+    }
 
-  AWAIT_READY(operationFailedStatus);
-  EXPECT_EQ(OPERATION_FAILED, operationFailedStatus->status().state());
+    ASSERT_SOME(volumePath);
+    ASSERT_TRUE(os::exists(volumePath.get()));
+    volumePaths.push_back(volumePath.get());
+  }
 
-  AWAIT_READY(operationFailedOffers);
-  ASSERT_EQ(1u, operationFailedOffers->size());
+  // Apply profiles 'good' and 'bad' to the preprovisioned volumes. The first
+  // operation would succeed but the second would fail.
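+  // The two operation status updates may arrive in either order, so we collect
+  // the operation states into a multiset for comparison.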
+  Future<multiset<OperationState>> createDiskOperationStates =
+    process::collect(
+        FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _),
+        FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _))
+      .then([](const std::tuple<
+                UpdateOperationStatusMessage,
+                UpdateOperationStatusMessage>& operationStatuses) {
+        return multiset<OperationState>{
+          std::get<0>(operationStatuses).status().state(),
+          std::get<1>(operationStatuses).status().state()};
+      });
 
-  // Apply profile 'test1' to the preprovisioned volume, which will succeed.
-  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
-      std::bind(isMountDisk<Resource>, lambda::_1, "test1"))))
-    .WillOnce(FutureArg<1>(&diskRecoveredOffers));
+  // After both operations are completed, we would get an offer containing both
+  // a MOUNT disk and a preprovisioned volume.
+  EXPECT_CALL(sched, resourceOffers(&driver, AllOf(
+      OffersHaveAnyResource(
+          std::bind(isMountDisk<Resource>, lambda::_1, "good")),
+      OffersHaveResource(preprovisioned[1]))))
+    .WillOnce(FutureArg<1>(&offers));
 
   driver.acceptOffers(
-      {operationFailedOffers->at(0).id()},
+      {offer.id()},
       {CREATE_DISK(
-          preprovisioned, Resource::DiskInfo::Source::MOUNT, "test1")});
+           preprovisioned[0], Resource::DiskInfo::Source::MOUNT, "good"),
+       CREATE_DISK(
+           preprovisioned[1], Resource::DiskInfo::Source::MOUNT, "bad")});
 
-  AWAIT_READY(diskRecoveredOffers);
-  ASSERT_EQ(1u, diskRecoveredOffers->size());
+  multiset<OperationState> expectedOperationStates = {
+    OperationState::OPERATION_FINISHED,
+    OperationState::OPERATION_FAILED};
 
-  Resource recovered = *Resources(diskRecoveredOffers->at(0).resources())
-    .filter(std::bind(isMountDisk<Resource>, lambda::_1, "test1"))
+  AWAIT_EXPECT_EQ(expectedOperationStates, createDiskOperationStates);
+
+  AWAIT_READY(offers);
+  ASSERT_EQ(1u, offers->size());
+
+  offer = offers->at(0);
+
+  Resource imported = *Resources(offer.resources())
+    .filter(std::bind(isMountDisk<Resource>, lambda::_1, "good"))
     .begin();
 
-  ASSERT_EQ(preprovisioned.provider_id(), recovered.provider_id());
+  // Create a persistent volume on the imported MOUNT disk, then launch a task
+  // to check the file written earlier, and destroy the other unimported
+  // volume.
+  imported.mutable_disk()->mutable_persistence()
+    ->set_id(id::UUID::random().toString());
+  imported.mutable_disk()->mutable_persistence()
+    ->set_principal(framework.principal());
+  imported.mutable_disk()->mutable_volume()->set_container_path("volume");
+  imported.mutable_disk()->mutable_volume()->set_mode(Volume::RW);
 
-  ASSERT_EQ(
-      preprovisioned.disk().source().id(), recovered.disk().source().id());
+  EXPECT_CALL(sched, statusUpdate(&driver, TaskStatusStateEq(TASK_STARTING)));
+  EXPECT_CALL(sched, statusUpdate(&driver, TaskStatusStateEq(TASK_RUNNING)));
+  EXPECT_CALL(sched, statusUpdate(&driver, TaskStatusStateEq(TASK_FINISHED)))
+    .WillOnce(FutureSatisfy(&taskFinished));
 
-  ASSERT_EQ(
-      preprovisioned.disk().source().metadata(),
-      recovered.disk().source().metadata());
+  Future<UpdateOperationStatusMessage> destroyDiskOperationStatus =
+    FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _);
+
+  // After the task is finished and the operation is completed, we would get an
+  // offer containing the persistent volume and a storage pool that has the
+  // freed space.
+  EXPECT_CALL(sched, resourceOffers(&driver, AllOf(
+      OffersHaveResource(imported),
+      OffersHaveAnyResource([](const Resource& r) {
+        return isStoragePool(r, "good") &&
+          Megabytes(r.scalar().value()) >= Gigabytes(2);
+      }))))
+    .WillOnce(FutureArg<1>(&offers));
+
+  driver.acceptOffers(
+      {offer.id()},
+      {CREATE(imported),
+       LAUNCH({createTask(
+           offer.slave_id(),
+           imported,
+           createCommandInfo("test -f " + path::join("volume", "file")))}),
+       DESTROY_DISK(preprovisioned[1])});
+
+  AWAIT_READY(taskFinished);
+
+  AWAIT_READY(destroyDiskOperationStatus);
+  EXPECT_EQ(OperationState::OPERATION_FINISHED,
+            destroyDiskOperationStatus->status().state());
+
+  AWAIT_READY(offers);
+
+  // Check that the unimported volume has been deleted by the test CSI plugin.
+  EXPECT_FALSE(os::exists(volumePaths[1]));
 }
 
 
