Repository: mesos Updated Branches: refs/heads/master b002f0e5f -> 38ab7cb0c
Added metrics for CSI plugin terminations. Review: https://reviews.apache.org/r/65491 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/38ab7cb0 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/38ab7cb0 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/38ab7cb0 Branch: refs/heads/master Commit: 38ab7cb0c11087bb042fb81b67923d7994581139 Parents: b002f0e Author: Jie Yu <[email protected]> Authored: Fri Feb 2 14:38:00 2018 -0800 Committer: Jie Yu <[email protected]> Committed: Fri Feb 2 15:48:49 2018 -0800 ---------------------------------------------------------------------- src/resource_provider/storage/provider.cpp | 43 ++++++- .../storage_local_resource_provider_tests.cpp | 119 ++++++++++++++++++- 2 files changed, 159 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/38ab7cb0/src/resource_provider/storage/provider.cpp ---------------------------------------------------------------------- diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp index 163ce7f..604cadf 100644 --- a/src/resource_provider/storage/provider.cpp +++ b/src/resource_provider/storage/provider.cpp @@ -33,6 +33,9 @@ #include <process/sequence.hpp> #include <process/timeout.hpp> +#include <process/metrics/counter.hpp> +#include <process/metrics/metrics.hpp> + #include <mesos/resources.hpp> #include <mesos/type_utils.hpp> @@ -100,6 +103,8 @@ using process::undiscardable; using process::http::authentication::Principal; +using process::metrics::Counter; + using mesos::internal::protobuf::convertLabelsToStringMap; using mesos::internal::protobuf::convertStringMapToLabels; @@ -300,7 +305,8 @@ public: strict(_strict), reconciling(false), resourceVersion(id::UUID::random()), - operationSequence("operation-sequence") + operationSequence("operation-sequence"), + 
metrics("resource_providers/" + info.type() + "." + info.name() + "/") { diskProfileAdaptor = DiskProfileAdaptor::getAdaptor(); CHECK_NOTNULL(diskProfileAdaptor.get()); @@ -472,6 +478,15 @@ private: // creation or destroy. These operations will not be sequentialized // through the sequence. It is simply used to wait for them to finish. Sequence operationSequence; + + struct Metrics + { + explicit Metrics(const string& prefix); + ~Metrics(); + + Counter csi_controller_plugin_terminations; + Counter csi_node_plugin_terminations; + } metrics; }; @@ -1806,6 +1821,14 @@ Future<csi::Client> StorageLocalResourceProviderProcess::getService( })); })), std::function<Future<Nothing>()>(defer(self(), [=]() -> Future<Nothing> { + if (containerId == controllerContainerId) { + metrics.csi_controller_plugin_terminations++; + } + + if (containerId == nodeContainerId) { + metrics.csi_node_plugin_terminations++; + } + services.at(containerId)->discard(); services.at(containerId).reset(new Promise<csi::Client>()); @@ -3133,6 +3156,24 @@ void StorageLocalResourceProviderProcess::sendOperationStatusUpdate( } +StorageLocalResourceProviderProcess::Metrics::Metrics(const string& prefix) + : csi_controller_plugin_terminations( + prefix + "csi_controller_plugin_terminations"), + csi_node_plugin_terminations( + prefix + "csi_node_plugin_terminations") +{ + process::metrics::add(csi_controller_plugin_terminations); + process::metrics::add(csi_node_plugin_terminations); +} + + +StorageLocalResourceProviderProcess::Metrics::~Metrics() +{ + process::metrics::remove(csi_controller_plugin_terminations); + process::metrics::remove(csi_node_plugin_terminations); +} + + Try<Owned<LocalResourceProvider>> StorageLocalResourceProvider::create( const http::URL& url, const string& workDir, http://git-wip-us.apache.org/repos/asf/mesos/blob/38ab7cb0/src/tests/storage_local_resource_provider_tests.cpp ---------------------------------------------------------------------- diff --git 
a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp index 85eaef8..2761701 100644 --- a/src/tests/storage_local_resource_provider_tests.cpp +++ b/src/tests/storage_local_resource_provider_tests.cpp @@ -59,6 +59,9 @@ namespace tests { constexpr char URI_DISK_PROFILE_ADAPTOR_NAME[] = "org_apache_mesos_UriDiskProfileAdaptor"; +constexpr char TEST_SLRP_TYPE[] = "org.apache.mesos.rp.local.storage"; +constexpr char TEST_SLRP_NAME[] = "test"; + class StorageLocalResourceProviderTest : public MesosTest { @@ -131,8 +134,8 @@ public: Try<string> resourceProviderConfig = strings::format( R"~( { - "type": "org.apache.mesos.rp.local.storage", - "name": "test", + "type": "%s", + "name": "%s", "default_reservations": [ { "type": "DYNAMIC", @@ -165,6 +168,8 @@ public: } } )~", + TEST_SLRP_TYPE, + TEST_SLRP_NAME, testCsiPluginName, testCsiPluginPath, testCsiPluginPath, @@ -2534,6 +2539,116 @@ TEST_F( driver.join(); } + +// This test verifies that storage local resource provider metrics are +// properly reported. +TEST_F(StorageLocalResourceProviderTest, ROOT_Metrics) +{ + loadUriDiskProfileModule(); + + setupResourceProviderConfig(Gigabytes(4)); + + Try<Owned<cluster::Master>> master = StartMaster(); + ASSERT_SOME(master); + + Owned<MasterDetector> detector = master.get()->createDetector(); + + slave::Flags slaveFlags = CreateSlaveFlags(); + slaveFlags.isolation = "filesystem/linux"; + + // Disable HTTP authentication to simplify resource provider interactions. + slaveFlags.authenticate_http_readwrite = false; + + // Set the resource provider capability. 
+ vector<SlaveInfo::Capability> capabilities = slave::AGENT_CAPABILITIES(); + SlaveInfo::Capability capability; + capability.set_type(SlaveInfo::Capability::RESOURCE_PROVIDER); + capabilities.push_back(capability); + + slaveFlags.agent_features = SlaveCapabilities(); + slaveFlags.agent_features->mutable_capabilities()->CopyFrom( + {capabilities.begin(), capabilities.end()}); + + slaveFlags.resource_provider_config_dir = resourceProviderConfigDir; + slaveFlags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME; + + slave::Fetcher fetcher(slaveFlags); + + Try<slave::MesosContainerizer*> _containerizer = + slave::MesosContainerizer::create(slaveFlags, false, &fetcher); + + ASSERT_SOME(_containerizer); + + Owned<slave::MesosContainerizer> containerizer(_containerizer.get()); + + // Since the local resource provider daemon is started after the agent + // is registered, it is guaranteed that the slave will send two + // `UpdateSlaveMessage`s, where the latter one contains resources from + // the storage local resource provider. + // NOTE: The order of the two `FUTURE_PROTOBUF`s is reversed because + // Google Mock will search the expectations in reverse order. + Future<UpdateSlaveMessage> updateSlave2 = + FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _); + Future<UpdateSlaveMessage> updateSlave1 = + FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _); + + Try<Owned<cluster::Slave>> slave = StartSlave( + detector.get(), + containerizer.get(), + slaveFlags); + + ASSERT_SOME(slave); + + AWAIT_READY(updateSlave1); + AWAIT_READY(updateSlave2); + + const string prefix = + "resource_providers/" + stringify(TEST_SLRP_TYPE) + + "." 
+ stringify(TEST_SLRP_NAME) + "/"; + + JSON::Object snapshot = Metrics(); + + ASSERT_NE(0, snapshot.values.count( + prefix + "csi_controller_plugin_terminations")); + EXPECT_EQ(0, snapshot.values.at( + prefix + "csi_controller_plugin_terminations")); + ASSERT_NE(0, snapshot.values.count( + prefix + "csi_node_plugin_terminations")); + EXPECT_EQ(0, snapshot.values.at( + prefix + "csi_node_plugin_terminations")); + + // Get the ID of the CSI plugin container. + Future<hashset<ContainerID>> pluginContainers = containerizer->containers(); + + AWAIT_READY(pluginContainers); + ASSERT_EQ(1u, pluginContainers->size()); + + const ContainerID& pluginContainerId = *pluginContainers->begin(); + + Future<Nothing> pluginRestarted = + FUTURE_DISPATCH(_, &ContainerDaemonProcess::launchContainer); + + // Kill the plugin container and wait for it to restart. + Future<int> pluginKilled = containerizer->status(pluginContainerId) + .then([](const ContainerStatus& status) { + return os::kill(status.executor_pid(), SIGKILL); + }); + + AWAIT_ASSERT_EQ(0, pluginKilled); + AWAIT_READY(pluginRestarted); + + snapshot = Metrics(); + + ASSERT_NE(0, snapshot.values.count( + prefix + "csi_controller_plugin_terminations")); + EXPECT_EQ(1, snapshot.values.at( + prefix + "csi_controller_plugin_terminations")); + ASSERT_NE(0, snapshot.values.count( + prefix + "csi_node_plugin_terminations")); + EXPECT_EQ(1, snapshot.values.at( + prefix + "csi_node_plugin_terminations")); +} + } // namespace tests { } // namespace internal { } // namespace mesos {
