Repository: mesos
Updated Branches:
  refs/heads/master b002f0e5f -> 38ab7cb0c


Added metrics for CSI plugin terminations.

Review: https://reviews.apache.org/r/65491


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/38ab7cb0
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/38ab7cb0
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/38ab7cb0

Branch: refs/heads/master
Commit: 38ab7cb0c11087bb042fb81b67923d7994581139
Parents: b002f0e
Author: Jie Yu <yujie....@gmail.com>
Authored: Fri Feb 2 14:38:00 2018 -0800
Committer: Jie Yu <yujie....@gmail.com>
Committed: Fri Feb 2 15:48:49 2018 -0800

----------------------------------------------------------------------
 src/resource_provider/storage/provider.cpp      |  43 ++++++-
 .../storage_local_resource_provider_tests.cpp   | 119 ++++++++++++++++++-
 2 files changed, 159 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/38ab7cb0/src/resource_provider/storage/provider.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index 163ce7f..604cadf 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -33,6 +33,9 @@
 #include <process/sequence.hpp>
 #include <process/timeout.hpp>
 
+#include <process/metrics/counter.hpp>
+#include <process/metrics/metrics.hpp>
+
 #include <mesos/resources.hpp>
 #include <mesos/type_utils.hpp>
 
@@ -100,6 +103,8 @@ using process::undiscardable;
 
 using process::http::authentication::Principal;
 
+using process::metrics::Counter;
+
 using mesos::internal::protobuf::convertLabelsToStringMap;
 using mesos::internal::protobuf::convertStringMapToLabels;
 
@@ -300,7 +305,8 @@ public:
       strict(_strict),
       reconciling(false),
       resourceVersion(id::UUID::random()),
-      operationSequence("operation-sequence")
+      operationSequence("operation-sequence"),
+      metrics("resource_providers/" + info.type() + "." + info.name() + "/")
   {
     diskProfileAdaptor = DiskProfileAdaptor::getAdaptor();
     CHECK_NOTNULL(diskProfileAdaptor.get());
@@ -472,6 +478,15 @@ private:
   // creation or destroy. These operations will not be sequentialized
   // through the sequence. It is simply used to wait for them to finish.
   Sequence operationSequence;
+
+  struct Metrics
+  {
+    explicit Metrics(const string& prefix);  // Registers both counters.
+    ~Metrics();  // Unregisters both counters.
+
+    Counter csi_controller_plugin_terminations;
+    Counter csi_node_plugin_terminations;
+  } metrics;
 };
 
 
@@ -1806,6 +1821,14 @@ Future<csi::Client> StorageLocalResourceProviderProcess::getService(
           }));
       })),
       std::function<Future<Nothing>()>(defer(self(), [=]() -> Future<Nothing> {
+        if (containerId == controllerContainerId) {
+          metrics.csi_controller_plugin_terminations++;
+        }
+
+        if (containerId == nodeContainerId) {
+          metrics.csi_node_plugin_terminations++;
+        }
+
         services.at(containerId)->discard();
         services.at(containerId).reset(new Promise<csi::Client>());
 
@@ -3133,6 +3156,24 @@ void StorageLocalResourceProviderProcess::sendOperationStatusUpdate(
 }
 
 
+StorageLocalResourceProviderProcess::Metrics::Metrics(const string& prefix)
+  : csi_controller_plugin_terminations(
+        prefix + "csi_controller_plugin_terminations"),
+    csi_node_plugin_terminations(
+        prefix + "csi_node_plugin_terminations")
+{  // Register so the counters appear in metrics snapshots.
+  process::metrics::add(csi_controller_plugin_terminations);
+  process::metrics::add(csi_node_plugin_terminations);
+}
+
+
+StorageLocalResourceProviderProcess::Metrics::~Metrics()
+{  // Unregister the counters the constructor added.
+  process::metrics::remove(csi_controller_plugin_terminations);
+  process::metrics::remove(csi_node_plugin_terminations);
+}
+
+
 Try<Owned<LocalResourceProvider>> StorageLocalResourceProvider::create(
     const http::URL& url,
     const string& workDir,

http://git-wip-us.apache.org/repos/asf/mesos/blob/38ab7cb0/src/tests/storage_local_resource_provider_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index 85eaef8..2761701 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -59,6 +59,9 @@ namespace tests {
 constexpr char URI_DISK_PROFILE_ADAPTOR_NAME[] =
   "org_apache_mesos_UriDiskProfileAdaptor";
 
+constexpr char TEST_SLRP_TYPE[] = "org.apache.mesos.rp.local.storage";
+constexpr char TEST_SLRP_NAME[] = "test";  // Type and name prefix RP metrics.
+
 
 class StorageLocalResourceProviderTest : public MesosTest
 {
@@ -131,8 +134,8 @@ public:
     Try<string> resourceProviderConfig = strings::format(
         R"~(
         {
-          "type": "org.apache.mesos.rp.local.storage",
-          "name": "test",
+          "type": "%s",
+          "name": "%s",
           "default_reservations": [
             {
               "type": "DYNAMIC",
@@ -165,6 +168,8 @@ public:
           }
         }
         )~",
+        TEST_SLRP_TYPE,
+        TEST_SLRP_NAME,
         testCsiPluginName,
         testCsiPluginPath,
         testCsiPluginPath,
@@ -2534,6 +2539,116 @@ TEST_F(
   driver.join();
 }
 
+
+// This test verifies that the CSI plugin termination metrics of a storage
+// local resource provider start at zero and increase on plugin termination.
+TEST_F(StorageLocalResourceProviderTest, ROOT_Metrics)
+{
+  loadUriDiskProfileModule();
+
+  setupResourceProviderConfig(Gigabytes(4));
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.isolation = "filesystem/linux";
+
+  // Disable HTTP authentication to simplify resource provider interactions.
+  slaveFlags.authenticate_http_readwrite = false;
+
+  // Set the resource provider capability.
+  vector<SlaveInfo::Capability> capabilities = slave::AGENT_CAPABILITIES();
+  SlaveInfo::Capability capability;
+  capability.set_type(SlaveInfo::Capability::RESOURCE_PROVIDER);
+  capabilities.push_back(capability);
+
+  slaveFlags.agent_features = SlaveCapabilities();
+  slaveFlags.agent_features->mutable_capabilities()->CopyFrom(
+      {capabilities.begin(), capabilities.end()});
+
+  slaveFlags.resource_provider_config_dir = resourceProviderConfigDir;
+  slaveFlags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
+
+  slave::Fetcher fetcher(slaveFlags);
+
+  Try<slave::MesosContainerizer*> _containerizer =
+    slave::MesosContainerizer::create(slaveFlags, false, &fetcher);
+
+  ASSERT_SOME(_containerizer);
+
+  Owned<slave::MesosContainerizer> containerizer(_containerizer.get());
+
+  // Since the local resource provider daemon is started after the agent
+  // is registered, it is guaranteed that the slave will send two
+  // `UpdateSlaveMessage`s, where the latter one contains resources from
+  // the storage local resource provider.
+  // NOTE: The order of the two `FUTURE_PROTOBUF`s is reversed because
+  // Google Mock searches the expectations in reverse order.
+  Future<UpdateSlaveMessage> updateSlave2 =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+  Future<UpdateSlaveMessage> updateSlave1 =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(
+      detector.get(),
+      containerizer.get(),
+      slaveFlags);
+
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(updateSlave1);
+  AWAIT_READY(updateSlave2);
+
+  const string prefix =
+    "resource_providers/" + stringify(TEST_SLRP_TYPE) +
+    "." + stringify(TEST_SLRP_NAME) + "/";
+
+  JSON::Object snapshot = Metrics();
+
+  ASSERT_NE(0, snapshot.values.count(
+      prefix + "csi_controller_plugin_terminations"));
+  EXPECT_EQ(0, snapshot.values.at(
+      prefix + "csi_controller_plugin_terminations"));
+  ASSERT_NE(0, snapshot.values.count(
+      prefix + "csi_node_plugin_terminations"));
+  EXPECT_EQ(0, snapshot.values.at(
+      prefix + "csi_node_plugin_terminations"));
+
+  // Get the ID of the only CSI plugin container (controller and node).
+  Future<hashset<ContainerID>> pluginContainers = containerizer->containers();
+
+  AWAIT_READY(pluginContainers);
+  ASSERT_EQ(1u, pluginContainers->size());
+
+  const ContainerID& pluginContainerId = *pluginContainers->begin();
+
+  Future<Nothing> pluginRestarted =
+    FUTURE_DISPATCH(_, &ContainerDaemonProcess::launchContainer);
+
+  // Kill the plugin container and wait for the daemon to relaunch it.
+  Future<int> pluginKilled = containerizer->status(pluginContainerId)
+    .then([](const ContainerStatus& status) {
+      return os::kill(status.executor_pid(), SIGKILL);
+    });
+
+  AWAIT_ASSERT_EQ(0, pluginKilled);
+  AWAIT_READY(pluginRestarted);
+
+  snapshot = Metrics();
+
+  ASSERT_NE(0, snapshot.values.count(
+      prefix + "csi_controller_plugin_terminations"));
+  EXPECT_EQ(1, snapshot.values.at(
+      prefix + "csi_controller_plugin_terminations"));
+  ASSERT_NE(0, snapshot.values.count(
+      prefix + "csi_node_plugin_terminations"));
+  EXPECT_EQ(1, snapshot.values.at(
+      prefix + "csi_node_plugin_terminations"));
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {

Reply via email to