Added a unit test for storage local resource provider (SLRP) operation state metrics. This patch adds the `ROOT_OperationStateMetrics` test, which issues a `CREATE_VOLUME` followed by two `DESTROY_VOLUME`s. The first `DESTROY_VOLUME` fails because the actual volume is deleted out of band, and the second one is dropped because the resource version UUID in its `ApplyOperationMessage` is modified before the message reaches the agent.
Review: https://reviews.apache.org/r/65666/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/5bdea195 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/5bdea195 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/5bdea195 Branch: refs/heads/master Commit: 5bdea1951a63b92dacc4b97ca5dd8b2e86467f98 Parents: 70b407d Author: Chun-Hung Hsiao <[email protected]> Authored: Thu May 17 17:45:06 2018 -0700 Committer: Chun-Hung Hsiao <[email protected]> Committed: Thu May 31 18:29:56 2018 -0700 ---------------------------------------------------------------------- .../storage_local_resource_provider_tests.cpp | 301 +++++++++++++++++-- 1 file changed, 277 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/5bdea195/src/tests/storage_local_resource_provider_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp index 04a75fc..3a2eec3 100644 --- a/src/tests/storage_local_resource_provider_tests.cpp +++ b/src/tests/storage_local_resource_provider_tests.cpp @@ -56,9 +56,9 @@ using mesos::master::detector::StandaloneMasterDetector; using process::Clock; using process::Future; using process::Owned; +using process::post; using testing::AtMost; -using testing::DoAll; using testing::Not; using testing::Sequence; @@ -265,6 +265,12 @@ public: ASSERT_SOME(write); } + string metricName(const string& basename) + { + return "resource_providers/" + stringify(TEST_SLRP_TYPE) + "." + + stringify(TEST_SLRP_NAME) + "/" + basename; + } + protected: Modules modules; vector<string> slaveWorkDirs; @@ -2783,9 +2789,9 @@ TEST_F( } -// This test verifies that storage local resource provider metrics are -// properly reported. 
-TEST_F(StorageLocalResourceProviderTest, ROOT_Metrics) +// This test verifies that storage local resource provider properly +// reports metrics related to CSI plugin terminations. +TEST_F(StorageLocalResourceProviderTest, ROOT_PluginTerminationMetrics) { setupResourceProviderConfig(Gigabytes(4)); @@ -2820,20 +2826,16 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_Metrics) AWAIT_READY(pluginConnected); - const string prefix = - "resource_providers/" + stringify(TEST_SLRP_TYPE) + - "." + stringify(TEST_SLRP_NAME) + "/"; - JSON::Object snapshot = Metrics(); - ASSERT_NE(0u, snapshot.values.count( - prefix + "csi_controller_plugin_terminations")); - EXPECT_EQ(0, snapshot.values.at( - prefix + "csi_controller_plugin_terminations")); - ASSERT_NE(0u, snapshot.values.count( - prefix + "csi_node_plugin_terminations")); - EXPECT_EQ(0, snapshot.values.at( - prefix + "csi_node_plugin_terminations")); + ASSERT_NE(0u, snapshot.values.count(metricName( + "csi_controller_plugin_terminations"))); + EXPECT_EQ(0, snapshot.values.at(metricName( + "csi_controller_plugin_terminations"))); + ASSERT_NE(0u, snapshot.values.count(metricName( + "csi_node_plugin_terminations"))); + EXPECT_EQ(0, snapshot.values.at(metricName( + "csi_node_plugin_terminations"))); // Get the ID of the CSI plugin container. 
Future<hashset<ContainerID>> pluginContainers = containerizer->containers(); @@ -2860,14 +2862,265 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_Metrics) snapshot = Metrics(); - ASSERT_NE(0u, snapshot.values.count( - prefix + "csi_controller_plugin_terminations")); - EXPECT_EQ(1, snapshot.values.at( - prefix + "csi_controller_plugin_terminations")); - ASSERT_NE(0u, snapshot.values.count( - prefix + "csi_node_plugin_terminations")); - EXPECT_EQ(1, snapshot.values.at( - prefix + "csi_node_plugin_terminations")); + ASSERT_NE(0u, snapshot.values.count(metricName( + "csi_controller_plugin_terminations"))); + EXPECT_EQ(1, snapshot.values.at(metricName( + "csi_controller_plugin_terminations"))); + ASSERT_NE(0u, snapshot.values.count(metricName( + "csi_node_plugin_terminations"))); + EXPECT_EQ(1, snapshot.values.at(metricName( + "csi_node_plugin_terminations"))); +} + + +// This test verifies that storage local resource provider properly +// reports metrics related to operation states. +// TODO(chhsiao): Currently there is no way to test the `pending` metric for +// operations since we have no control over the completion of an operation. Once +// we support out-of-band CSI plugins through domain sockets, we could test this +// metric against a mock CSI plugin. 
+TEST_F(StorageLocalResourceProviderTest, ROOT_OperationStateMetrics) +{ + loadUriDiskProfileAdaptorModule(); + + setupResourceProviderConfig(Gigabytes(4)); + setupDiskProfileMapping(); + + master::Flags masterFlags = CreateMasterFlags(); + masterFlags.allocation_interval = Milliseconds(50); + + Try<Owned<cluster::Master>> master = StartMaster(masterFlags); + ASSERT_SOME(master); + + Owned<MasterDetector> detector = master.get()->createDetector(); + + slave::Flags slaveFlags = CreateSlaveFlags(); + slaveFlags.isolation = "filesystem/linux"; + + slaveFlags.resource_provider_config_dir = resourceProviderConfigDir; + slaveFlags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME; + + Future<SlaveRegisteredMessage> slaveRegisteredMessage = + FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _); + + Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags); + ASSERT_SOME(slave); + + AWAIT_READY(slaveRegisteredMessage); + + // Register a framework to exercise operations. + FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO; + framework.set_roles(0, "storage"); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, framework, master.get()->pid, DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched, registered(&driver, _, _)); + + // The framework is expected to see the following offers in sequence: + // 1. One containing a RAW disk resource before `CREATE_VOLUME`. + // 2. One containing a MOUNT disk resource after `CREATE_VOLUME`. + // 3. One containing the same MOUNT disk resource after a failed + // `DESTROY_VOLUME`. + // + // We set up the expectations for these offers as the test progresses. + Future<vector<Offer>> rawDiskOffers; + Future<vector<Offer>> volumeCreatedOffers; + Future<vector<Offer>> operationFailedOffers; + + Sequence offers; + + // We use the following filter to filter offers that do not have + // wanted resources for 365 days (the maximum). 
+ Filters declineFilters; + declineFilters.set_refuse_seconds(Days(365).secs()); + + // Decline offers that contain only the agent's default resources. + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillRepeatedly(DeclineOffers(declineFilters)); + + // We are only interested in any storage pool or created volume which + // has a "volume-default" profile. + auto hasSourceType = []( + const Resource& r, + const Resource::DiskInfo::Source::Type& type) { + return r.has_disk() && + r.disk().has_source() && + r.disk().source().has_profile() && + r.disk().source().profile() == "volume-default" && + r.disk().source().type() == type; + }; + + EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource( + std::bind(hasSourceType, lambda::_1, Resource::DiskInfo::Source::RAW)))) + .InSequence(offers) + .WillOnce(FutureArg<1>(&rawDiskOffers)); + + driver.start(); + + AWAIT_READY(rawDiskOffers); + ASSERT_FALSE(rawDiskOffers->empty()); + + Option<Resource> source; + + foreach (const Resource& resource, rawDiskOffers->at(0).resources()) { + if (hasSourceType(resource, Resource::DiskInfo::Source::RAW)) { + source = resource; + break; + } + } + + ASSERT_SOME(source); + + JSON::Object snapshot = Metrics(); + + ASSERT_NE(0u, snapshot.values.count(metricName( + "operations/create_volume/finished"))); + EXPECT_EQ(0, snapshot.values.at(metricName( + "operations/create_volume/finished"))); + ASSERT_NE(0u, snapshot.values.count(metricName( + "operations/destroy_volume/failed"))); + EXPECT_EQ(0, snapshot.values.at(metricName( + "operations/destroy_volume/failed"))); + ASSERT_NE(0u, snapshot.values.count(metricName( + "operations/destroy_volume/dropped"))); + EXPECT_EQ(0, snapshot.values.at(metricName( + "operations/destroy_volume/dropped"))); + + // Create a volume. 
+ EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource( + std::bind(hasSourceType, lambda::_1, Resource::DiskInfo::Source::MOUNT)))) + .InSequence(offers) + .WillOnce(FutureArg<1>(&volumeCreatedOffers)); + + // We use the following filter so that the resources will not be + // filtered for 5 seconds (the default). + Filters acceptFilters; + acceptFilters.set_refuse_seconds(0); + + driver.acceptOffers( + {rawDiskOffers->at(0).id()}, + {CREATE_VOLUME(source.get(), Resource::DiskInfo::Source::MOUNT)}, + acceptFilters); + + AWAIT_READY(volumeCreatedOffers); + ASSERT_FALSE(volumeCreatedOffers->empty()); + + Option<Resource> volume; + + foreach (const Resource& resource, volumeCreatedOffers->at(0).resources()) { + if (hasSourceType(resource, Resource::DiskInfo::Source::MOUNT)) { + volume = resource; + break; + } + } + + ASSERT_SOME(volume); + ASSERT_TRUE(volume->disk().source().has_id()); + ASSERT_TRUE(volume->disk().source().has_metadata()); + ASSERT_TRUE(volume->disk().source().has_mount()); + ASSERT_TRUE(volume->disk().source().mount().has_root()); + EXPECT_FALSE(path::absolute(volume->disk().source().mount().root())); + + snapshot = Metrics(); + + ASSERT_NE(0u, snapshot.values.count(metricName( + "operations/create_volume/finished"))); + EXPECT_EQ(1, snapshot.values.at(metricName( + "operations/create_volume/finished"))); + ASSERT_NE(0u, snapshot.values.count(metricName( + "operations/destroy_volume/failed"))); + EXPECT_EQ(0, snapshot.values.at(metricName( + "operations/destroy_volume/failed"))); + ASSERT_NE(0u, snapshot.values.count(metricName( + "operations/destroy_volume/dropped"))); + EXPECT_EQ(0, snapshot.values.at(metricName( + "operations/destroy_volume/dropped"))); + + // Remove the volume out of band to fail `DESTROY_VOLUME`. 
+ Option<string> volumePath; + + foreach (const Label& label, volume->disk().source().metadata().labels()) { + if (label.key() == "path") { + volumePath = label.value(); + break; + } + } + + ASSERT_SOME(volumePath); + ASSERT_SOME(os::rmdir(volumePath.get())); + + // Destroy the created volume, which will fail. + EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveResource(volume.get()))) + .InSequence(offers) + .WillOnce(FutureArg<1>(&operationFailedOffers)) + .WillRepeatedly(DeclineOffers(declineFilters)); // Decline further offers. + + driver.acceptOffers( + {volumeCreatedOffers->at(0).id()}, + {DESTROY_VOLUME(volume.get())}, + acceptFilters); + + AWAIT_READY(operationFailedOffers); + ASSERT_FALSE(operationFailedOffers->empty()); + + snapshot = Metrics(); + + ASSERT_NE(0u, snapshot.values.count(metricName( + "operations/create_volume/finished"))); + EXPECT_EQ(1, snapshot.values.at(metricName( + "operations/create_volume/finished"))); + ASSERT_NE(0u, snapshot.values.count(metricName( + "operations/destroy_volume/failed"))); + EXPECT_EQ(1, snapshot.values.at(metricName( + "operations/destroy_volume/failed"))); + ASSERT_NE(0u, snapshot.values.count(metricName( + "operations/destroy_volume/dropped"))); + EXPECT_EQ(0, snapshot.values.at(metricName( + "operations/destroy_volume/dropped"))); + + // Destroy the volume again, which will be dropped this time. + Future<ApplyOperationMessage> applyOperationMessage = + DROP_PROTOBUF(ApplyOperationMessage(), _, _); + + driver.acceptOffers( + {operationFailedOffers->at(0).id()}, + {DESTROY_VOLUME(volume.get())}, + acceptFilters); + + AWAIT_READY(applyOperationMessage); + ASSERT_TRUE(applyOperationMessage + ->resource_version_uuid().has_resource_provider_id()); + + // Modify the resource version UUID to drop `DESTROY_VOLUME`. 
+ Future<UpdateOperationStatusMessage> operationDroppedStatus = + FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _); + + ApplyOperationMessage spoofedApplyOperationMessage = + applyOperationMessage.get(); + spoofedApplyOperationMessage.mutable_resource_version_uuid()->mutable_uuid() + ->set_value(id::UUID::random().toBytes()); + + post(master.get()->pid, slave.get()->pid, spoofedApplyOperationMessage); + + AWAIT_READY(operationDroppedStatus); + EXPECT_EQ(OPERATION_DROPPED, operationDroppedStatus->status().state()); + + snapshot = Metrics(); + + ASSERT_NE(0u, snapshot.values.count(metricName( + "operations/create_volume/finished"))); + EXPECT_EQ(1, snapshot.values.at(metricName( + "operations/create_volume/finished"))); + ASSERT_NE(0u, snapshot.values.count(metricName( + "operations/destroy_volume/failed"))); + EXPECT_EQ(1, snapshot.values.at(metricName( + "operations/destroy_volume/failed"))); + ASSERT_NE(0u, snapshot.values.count(metricName( + "operations/destroy_volume/dropped"))); + EXPECT_EQ(1, snapshot.values.at(metricName( + "operations/destroy_volume/dropped"))); }
