Repository: mesos
Updated Branches:
  refs/heads/master ab484fc1b -> 9c8eee35d


Added operation state metrics in SLRP.

This patch adds `operations_pending`, `operations_finished`,
`operations_failed`, and `operations_dropped` metrics to count the
occurrences of these operation states.

Additionally, an error log in `_applyOperation()` is removed because the
error is already logged at the call site.

Review: https://reviews.apache.org/r/65665


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/70b407df
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/70b407df
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/70b407df

Branch: refs/heads/master
Commit: 70b407df0aaeef171014d3453c3efb762b2c21f6
Parents: ab484fc
Author: Chun-Hung Hsiao <[email protected]>
Authored: Thu May 17 17:44:29 2018 -0700
Committer: Chun-Hung Hsiao <[email protected]>
Committed: Thu May 31 18:29:55 2018 -0700

----------------------------------------------------------------------
 src/resource_provider/storage/provider.cpp | 154 ++++++++++++++++++++++--
 1 file changed, 141 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/70b407df/src/resource_provider/storage/provider.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index 2c7dd8d..542d3ae 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -35,6 +35,7 @@
 
 #include <process/metrics/counter.hpp>
 #include <process/metrics/metrics.hpp>
+#include <process/metrics/push_gauge.hpp>
 
 #include <mesos/resources.hpp>
 #include <mesos/type_utils.hpp>
@@ -103,6 +104,7 @@ using process::spawn;
 using process::http::authentication::Principal;
 
 using process::metrics::Counter;
+using process::metrics::PushGauge;
 
 using mesos::csi::state::VolumeState;
 
@@ -402,7 +404,7 @@ private:
   void dropOperation(
       const id::UUID& operationUuid,
       const Option<FrameworkID>& frameworkId,
-      const Option<OperationID>& operationId,
+      const Option<Offer::Operation>& info,
       const string& message);
 
   Future<vector<ResourceConversion>> applyCreateVolumeOrBlock(
@@ -494,8 +496,15 @@ private:
     explicit Metrics(const string& prefix);
     ~Metrics();
 
+    // CSI plugin metrics.
     Counter csi_controller_plugin_terminations;
     Counter csi_node_plugin_terminations;
+
+    // Operation state metrics.
+    hashmap<Offer::Operation::Type, PushGauge> operations_pending;
+    hashmap<Offer::Operation::Type, Counter> operations_finished;
+    hashmap<Offer::Operation::Type, Counter> operations_failed;
+    hashmap<Offer::Operation::Type, Counter> operations_dropped;
   } metrics;
 };
 
@@ -1210,6 +1219,26 @@ StorageLocalResourceProviderProcess::reconcileOperationStatuses()
       foreachpair (const id::UUID& uuid,
                    const Operation& operation,
                    operations) {
+        switch (operation.latest_status().state()) {
+          case OPERATION_PENDING:
+            ++metrics.operations_pending.at(operation.info().type());
+            break;
+          case OPERATION_FINISHED:
+            ++metrics.operations_finished.at(operation.info().type());
+            break;
+          case OPERATION_FAILED:
+            ++metrics.operations_failed.at(operation.info().type());
+            break;
+          case OPERATION_UNSUPPORTED:
+          case OPERATION_ERROR:
+          case OPERATION_DROPPED:
+          case OPERATION_UNREACHABLE:
+          case OPERATION_GONE_BY_OPERATOR:
+          case OPERATION_RECOVERING:
+          case OPERATION_UNKNOWN:
+            UNREACHABLE();
+        }
+
         if (protobuf::isTerminalState(operation.latest_status().state())) {
           continue;
         }
@@ -1419,14 +1448,12 @@ void StorageLocalResourceProviderProcess::applyOperation(
 
   Option<FrameworkID> frameworkId = operation.has_framework_id()
     ? operation.framework_id() : Option<FrameworkID>::none();
-  Option<OperationID> operationId = operation.info().has_id()
-    ? operation.info().id() : Option<OperationID>::none();
 
   if (state == SUBSCRIBED) {
     return dropOperation(
         uuid.get(),
         frameworkId,
-        operationId,
+        operation.info(),
         "Cannot apply operation in SUBSCRIBED state");
   }
 
@@ -1434,7 +1461,7 @@ void StorageLocalResourceProviderProcess::applyOperation(
     return dropOperation(
         uuid.get(),
         frameworkId,
-        operationId,
+        operation.info(),
         "Cannot apply operation when reconciling storage pools");
   }
 
@@ -1446,7 +1473,7 @@ void StorageLocalResourceProviderProcess::applyOperation(
     return dropOperation(
         uuid.get(),
         frameworkId,
-        operationId,
+        operation.info(),
         "Mismatched resource version " + stringify(operationVersion.get()) +
         " (expected: " + stringify(resourceVersion) + ")");
   }
@@ -1454,13 +1481,18 @@ void StorageLocalResourceProviderProcess::applyOperation(
   CHECK(!operations.contains(uuid.get()));
   operations[uuid.get()] = protobuf::createOperation(
       operation.info(),
-      protobuf::createOperationStatus(OPERATION_PENDING, operationId),
+      protobuf::createOperationStatus(
+          OPERATION_PENDING,
+          operation.info().has_id()
+            ? operation.info().id() : Option<OperationID>::none()),
       frameworkId,
       slaveId,
       protobuf::createUUID(uuid.get()));
 
   checkpointResourceProviderState();
 
+  ++metrics.operations_pending.at(operation.info().type());
+
   auto err = [](const id::UUID& uuid, const string& message) {
     LOG(ERROR)
       << "Failed to apply operation (uuid: " << uuid << "): " << message;
@@ -2872,10 +2904,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::_applyOperation(
           << "Applying conversion from '" << conversions->at(0).consumed
           << "' to '" << conversions->at(0).converted
           << "' for operation (uuid: " << operationUuid << ")";
-      } else {
-        LOG(ERROR)
-          << "Failed to apply operation (uuid: " << operationUuid << "): "
-          << conversions.error();
       }
 
       promise->associate(
@@ -2901,7 +2929,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::_applyOperation(
 void StorageLocalResourceProviderProcess::dropOperation(
     const id::UUID& operationUuid,
     const Option<FrameworkID>& frameworkId,
-    const Option<OperationID>& operationId,
+    const Option<Offer::Operation>& info,
     const string& message)
 {
   LOG(WARNING)
@@ -2912,7 +2940,8 @@ void StorageLocalResourceProviderProcess::dropOperation(
        protobuf::createUUID(operationUuid),
        protobuf::createOperationStatus(
            OPERATION_DROPPED,
-           operationId,
+           info.isSome() && info->has_id()
+             ? info->id() : Option<OperationID>::none(),
            message,
            None(),
            id::UUID::random()),
@@ -2930,6 +2959,9 @@ void StorageLocalResourceProviderProcess::dropOperation(
   statusUpdateManager.update(std::move(update), false)
     .onFailed(defer(self(), std::bind(die, lambda::_1)))
     .onDiscarded(defer(self(), std::bind(die, "future discarded")));
+
+  ++metrics.operations_dropped.at(
+      info.isSome() ? info->type() : Offer::Operation::UNKNOWN);
 }
 
 
@@ -3231,6 +3263,26 @@ Try<Nothing> StorageLocalResourceProviderProcess::updateOperationStatus(
     .onFailed(defer(self(), std::bind(die, lambda::_1)))
     .onDiscarded(defer(self(), std::bind(die, "future discarded")));
 
+  --metrics.operations_pending.at(operation.info().type());
+
+  switch (operation.latest_status().state()) {
+    case OPERATION_FINISHED:
+      ++metrics.operations_finished.at(operation.info().type());
+      break;
+    case OPERATION_FAILED:
+      ++metrics.operations_failed.at(operation.info().type());
+      break;
+    case OPERATION_UNSUPPORTED:
+    case OPERATION_PENDING:
+    case OPERATION_ERROR:
+    case OPERATION_DROPPED:
+    case OPERATION_UNREACHABLE:
+    case OPERATION_GONE_BY_OPERATOR:
+    case OPERATION_RECOVERING:
+    case OPERATION_UNKNOWN:
+      UNREACHABLE();
+  }
+
   if (error.isSome()) {
     // We only send `UPDATE_STATE` for failed speculative operations.
     if (operation.info().type() == Offer::Operation::RESERVE ||
@@ -3390,6 +3442,66 @@ StorageLocalResourceProviderProcess::Metrics::Metrics(const string& prefix)
 {
   process::metrics::add(csi_controller_plugin_terminations);
   process::metrics::add(csi_node_plugin_terminations);
+
+  vector<Offer::Operation::Type> operationTypes;
+
+  // NOTE: We use a switch statement here as a compile-time sanity check so we
+  // won't forget to add metrics for new operations in the future.
+  Offer::Operation::Type firstOperationType = Offer::Operation::RESERVE;
+  switch (firstOperationType) {
+    case Offer::Operation::RESERVE:
+      operationTypes.push_back(Offer::Operation::RESERVE);
+    case Offer::Operation::UNRESERVE:
+      operationTypes.push_back(Offer::Operation::UNRESERVE);
+    case Offer::Operation::CREATE:
+      operationTypes.push_back(Offer::Operation::CREATE);
+    case Offer::Operation::DESTROY:
+      operationTypes.push_back(Offer::Operation::DESTROY);
+    case Offer::Operation::CREATE_VOLUME:
+      operationTypes.push_back(Offer::Operation::CREATE_VOLUME);
+    case Offer::Operation::DESTROY_VOLUME:
+      operationTypes.push_back(Offer::Operation::DESTROY_VOLUME);
+    case Offer::Operation::CREATE_BLOCK:
+      operationTypes.push_back(Offer::Operation::CREATE_BLOCK);
+    case Offer::Operation::DESTROY_BLOCK:
+      operationTypes.push_back(Offer::Operation::DESTROY_BLOCK);
+      break;
+    case Offer::Operation::GROW_VOLUME:
+    case Offer::Operation::SHRINK_VOLUME:
+      // TODO(chhsiao): These operations are currently not supported for
+      // resource providers, and should have been validated by the master.
+      UNREACHABLE();
+    case Offer::Operation::UNKNOWN:
+    case Offer::Operation::LAUNCH:
+    case Offer::Operation::LAUNCH_GROUP:
+      UNREACHABLE();
+  };
+
+  foreach (const Offer::Operation::Type& type, operationTypes) {
+    const string name = strings::lower(Offer::Operation::Type_Name(type));
+
+    operations_pending.put(type, PushGauge(
+        prefix + "operations/" + name + "/pending"));
+    operations_finished.put(type, Counter(
+        prefix + "operations/" + name + "/finished"));
+    operations_failed.put(type, Counter(
+        prefix + "operations/" + name + "/failed"));
+    operations_dropped.put(type, Counter(
+        prefix + "operations/" + name + "/dropped"));
+
+    process::metrics::add(operations_pending.at(type));
+    process::metrics::add(operations_finished.at(type));
+    process::metrics::add(operations_failed.at(type));
+    process::metrics::add(operations_dropped.at(type));
+  }
+
+  // Special metric for counting the number of `OPERATION_DROPPED` statuses when
+  // receiving explicit reconciliation for unknown operation UUIDs.
+  operations_dropped.put(
+      Offer::Operation::UNKNOWN,
+      Counter(prefix + "operations/unknown/dropped"));
+
+  process::metrics::add(operations_dropped.at(Offer::Operation::UNKNOWN));
 }
 
 
@@ -3397,6 +3509,22 @@ StorageLocalResourceProviderProcess::Metrics::~Metrics()
 {
   process::metrics::remove(csi_controller_plugin_terminations);
   process::metrics::remove(csi_node_plugin_terminations);
+
+  foreachvalue (const PushGauge& gauge, operations_pending) {
+    process::metrics::remove(gauge);
+  }
+
+  foreachvalue (const Counter& counter, operations_finished) {
+    process::metrics::remove(counter);
+  }
+
+  foreachvalue (const Counter& counter, operations_failed) {
+    process::metrics::remove(counter);
+  }
+
+  foreachvalue (const Counter& counter, operations_dropped) {
+    process::metrics::remove(counter);
+  }
 }
 
 

Reply via email to