This is an automated email from the ASF dual-hosted git repository.

bennoe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 18c401563c33022240fede63fbe3ec9b7bf4c385
Author: Benno Evers <[email protected]>
AuthorDate: Thu Feb 28 18:03:27 2019 +0100

    Added metrics for offer operation feedback.
    
    This commit adds additional metrics counting the
    number of operations in each state.
    
    Unit tests are added in the subsequent commit.
    
    Review: https://reviews.apache.org/r/70116
---
 docs/monitoring.md     |  61 ++++++++++++++++++++
 src/master/master.cpp  |  55 +++++++++++++++++-
 src/master/metrics.cpp | 152 +++++++++++++++++++++++++++++++++++++++++++++++++
 src/master/metrics.hpp |  48 ++++++++++++++++
 4 files changed, 313 insertions(+), 3 deletions(-)

diff --git a/docs/monitoring.md b/docs/monitoring.md
index 54b872f..239077b 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -657,6 +657,67 @@ The task states listed here match those of the task state machine.
 </tr>
 </table>
 
+#### Operations
+
+The following metrics provide information about offer operations on the master.
+
+Below, `OPERATION_TYPE` refers to any one of `reserve`, `unreserve`, `create`,
+`destroy`, `grow_volume`, `shrink_volume`, `create_disk` or `destroy_disk`.
+
+NOTE: The counters for terminal operation states can over-count over time. In
+particular, if an agent contained unacknowledged terminal status updates when
+it was marked gone or marked unreachable, these operations will be double-counted
+as both their original state and `OPERATION_GONE_BY_OPERATOR`/`OPERATION_UNREACHABLE`.
+
+<table class="table table-striped">
+<thead>
+<tr><th>Metric</th><th>Description</th><th>Type</th>
+</thead>
+<tr>
+  <td>
+  <code>master/operations/total</code>
+  </td>
+  <td>Total number of operations known to this master</td>
+  <td>Gauge</td>
+</tr>
+<tr>
+  <td>
+  <code>master/operations/&lt;OPERATION_STATE&gt;</code>
+  </td>
+  <td>Number of operations in the given non-terminal state (`pending`, `recovering` or `unreachable`)</td>
+  <td>Gauge</td>
+</tr>
+<tr>
+  <td>
+  <code>master/operations/&lt;OPERATION_STATE&gt;</code>
+  </td>
+  <td>Number of operations in the given terminal state (`finished`, `failed`, `error`, `dropped` or `gone_by_operator`)</td>
+  <td>Counter</td>
+</tr>
+
+<tr>
+  <td>
+  <code>master/operations/&lt;OPERATION_TYPE&gt;/total</code>
+  </td>
+  <td>Total number of operations with the given type known to this master</td>
+  <td>Gauge</td>
+</tr>
+<tr>
+  <td>
+  <code>master/operations/&lt;OPERATION_TYPE&gt;/&lt;OPERATION_STATE&gt;</code>
+  </td>
+  <td>Number of operations with the given type in the given non-terminal state (`pending`, `recovering` or `unreachable`)</td>
+  <td>Gauge</td>
+</tr>
+<tr>
+  <td>
+  <code>master/operations/&lt;OPERATION_TYPE&gt;/&lt;OPERATION_STATE&gt;</code>
+  </td>
+  <td>Number of operations with the given type in the given terminal state (`finished`, `failed`, `error`, `dropped` or `gone_by_operator`)</td>
+  <td>Counter</td>
+</tr>
+</table>
+
 #### Messages
 
 The following metrics provide information about messages between the master and
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 277aa7a..37f989a 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -2289,12 +2289,15 @@ void Master::drop(
 {
   CHECK_NOTNULL(framework);
 
-  // TODO(jieyu): Increment a metric.
-
   LOG(WARNING) << "Dropping " << Offer::Operation::Type_Name(operation.type())
                << " operation from framework " << *framework
                << ": " << message;
 
+  // NOTE: Despite the suggestive name of this method, it is called
+  // as part of validation so the correct updated state is `OPERATION_ERROR`,
+  // not `OPERATION_DROPPED`.
+  metrics->incrementOperationState(operation.type(), OPERATION_ERROR);
+
   // NOTE: The operation validation code should be refactored. Due to the order
   // of validation, it's possible that this function will be called before the
   // master validates that operations from v0 frameworks should not have their
@@ -8737,6 +8740,8 @@ void Master::updateOperationStatus(UpdateOperationStatusMessage&& update)
   CHECK(update.has_slave_id())
     << "External resource provider is not supported yet";
 
+  ++metrics->messages_operation_status_update;
+
   const SlaveID& slaveId = update.slave_id();
 
   // The status update for the operation might be for an
@@ -8789,6 +8794,7 @@ void Master::updateOperationStatus(UpdateOperationStatusMessage&& update)
                        : "an operator API call")
                  << ": Agent " << slaveId << " is not registered";
 
+    ++metrics->invalid_operation_status_updates;
     return;
   }
 
@@ -8810,9 +8816,15 @@ void Master::updateOperationStatus(UpdateOperationStatusMessage&& update)
       framework->send(update);
     }
 
+    ++metrics->invalid_operation_status_updates;
     return;
   }
 
+  // TODO(bevers): Most of the `CHECK()`s below could probably be turned
+  // into validation steps that would just reject the message as opposed
+  // to crashing the master.
+  ++metrics->valid_operation_status_updates;
+
   if (operation->info().has_id()) {
     // Agents don't include the framework and operation IDs when sending
     // operation status updates for dropped operations in response to a
@@ -11301,6 +11313,10 @@ void Master::_removeSlave(
   // reconciliation requests. However, since the same thing happens during
   // master failover, the scheduler must be able to handle this scenario
   // anyway so we allow it to happen here.
+  //
+  // TODO(bevers): The operations removed here are implicitly transitioned
+  // to `OPERATION_UNKNOWN` state, but we don't have a corresponding metric
+  // for that, nor is it the correct state.
   foreachvalue (Operation* operation, utils::copy(slave->operations)) {
     removeOperation(operation);
   }
@@ -11452,8 +11468,15 @@ void Master::__removeSlave(
   // reconciliation requests. However, since the same thing happens during
   // master failover, the scheduler must be able to handle this scenario
   // anyway so we allow it to happen here.
+  OperationState transitionState = unreachableTime.isSome() ?
+    OPERATION_UNREACHABLE :
+    OPERATION_GONE_BY_OPERATOR;
+
   foreachvalue (Operation* operation, utils::copy(slave->operations)) {
     removeOperation(operation);
+    metrics->incrementOperationState(
+        operation->info().type(),
+        transitionState);
   }
 
   foreachvalue (
@@ -11463,6 +11486,9 @@ void Master::__removeSlave(
         Operation* operation,
         utils::copy(provider.operations)) {
       removeOperation(operation);
+      metrics->incrementOperationState(
+          operation->info().type(),
+          transitionState);
     }
   }
 
@@ -11742,6 +11768,9 @@ void Master::addOperation(
   CHECK_NOTNULL(operation);
   CHECK_NOTNULL(slave);
 
+  metrics->incrementOperationState(
+      operation->info().type(), operation->latest_status().state());
+
   slave->addOperation(operation);
 
   if (framework != nullptr) {
@@ -11784,6 +11813,11 @@ void Master::updateOperation(
             << " (latest state: " << operation->latest_status().state()
             << ", status update state: " << status.state() << ")";
 
+  metrics->transitionOperationState(
+      operation->info().type(),
+      operation->latest_status().state(),
+      status.state());
+
   // Whether the operation has just become terminated.
   const bool terminated =
     !protobuf::isTerminalState(operation->latest_status().state()) &&
@@ -11998,12 +12032,27 @@ void Master::removeOperation(Operation* operation)
 
   slave->removeOperation(operation);
 
+  OperationState state = operation->latest_status().state();
+
+  // The common case is that an operation is removed after a terminal status
+  // update has been acknowledged; in that case we have nothing to do here
+  // because the counters for terminal operations represent lifetime totals.
+  // However, it can happen that we need to remove non-terminal operations,
+  // e.g. when an agent is marked gone or a resource provider on an agent
+  // disappears. In this case we need to adjust the metrics to reflect the
+  // current numbers.
+  if (!protobuf::isTerminalState(state)) {
+    metrics->decrementOperationState(
+        operation->info().type(),
+        state);
+  }
+
   // If the operation was not speculated and is not terminal we
   // need to also recover its used resources in the allocator.
   // If the operation is an orphan, the resources have already been
   // recovered from the allocator.
   if (!protobuf::isSpeculativeOperation(operation->info()) &&
-      !protobuf::isTerminalState(operation->latest_status().state()) &&
+      !protobuf::isTerminalState(state) &&
       !slave->orphanedOperations.contains(operation->uuid())) {
     Try<Resources> consumed = 
protobuf::getConsumedResources(operation->info());
     CHECK_SOME(consumed);
diff --git a/src/master/metrics.cpp b/src/master/metrics.cpp
index 4dd73fb..e3c871c 100644
--- a/src/master/metrics.cpp
+++ b/src/master/metrics.cpp
@@ -80,6 +80,8 @@ Metrics::Metrics(const Master& master)
     outstanding_offers(
         "master/outstanding_offers",
         defer(master, &Master::_outstanding_offers)),
+    operation_states(
+        "master/operations/"),
     operator_event_stream_subscribers(
         "master/operator_event_stream_subscribers"),
     tasks_staging(
@@ -157,6 +159,8 @@ Metrics::Metrics(const Master& master)
         "master/messages_unregister_slave"),
     messages_status_update(
         "master/messages_status_update"),
+    messages_operation_status_update(
+        "master/messages_operation_status_update"),
     messages_exited_executor(
         "master/messages_exited_executor"),
     messages_update_slave(
@@ -179,6 +183,10 @@ Metrics::Metrics(const Master& master)
         "master/valid_status_update_acknowledgements"),
     invalid_status_update_acknowledgements(
         "master/invalid_status_update_acknowledgements"),
+    valid_operation_status_updates(
+        "master/valid_operation_status_updates"),
+    invalid_operation_status_updates(
+        "master/invalid_operation_status_updates"),
     valid_operation_status_update_acknowledgements(
         "master/valid_operation_status_update_acknowledgements"),
     invalid_operation_status_update_acknowledgements(
@@ -278,6 +286,7 @@ Metrics::Metrics(const Master& master)
   process::metrics::add(messages_reregister_slave);
   process::metrics::add(messages_unregister_slave);
   process::metrics::add(messages_status_update);
+  process::metrics::add(messages_operation_status_update);
   process::metrics::add(messages_exited_executor);
   process::metrics::add(messages_update_slave);
 
@@ -296,6 +305,9 @@ Metrics::Metrics(const Master& master)
   process::metrics::add(valid_status_update_acknowledgements);
   process::metrics::add(invalid_status_update_acknowledgements);
 
+  process::metrics::add(valid_operation_status_updates);
+  process::metrics::add(invalid_operation_status_updates);
+
   process::metrics::add(valid_operation_status_update_acknowledgements);
   process::metrics::add(invalid_operation_status_update_acknowledgements);
 
@@ -368,6 +380,30 @@ Metrics::Metrics(const Master& master)
     process::metrics::add(used);
     process::metrics::add(percent);
   }
+
+  for (int index = 0;
+       index < Offer::Operation::Type_descriptor()->value_count();
+       index++) {
+    const google::protobuf::EnumValueDescriptor* descriptor =
+      Offer::Operation::Type_descriptor()->value(index);
+
+    const Offer::Operation::Type type =
+      static_cast<Offer::Operation::Type>(descriptor->number());
+
+    // Skip `UNKNOWN`. Also skip `LAUNCH` and `LAUNCH_GROUP` because they are
+    // special-cased in the master and don't go through the usual offer
+    // operation feedback cycle.
+    if (type == Offer::Operation::UNKNOWN ||
+        type == Offer::Operation::LAUNCH ||
+        type == Offer::Operation::LAUNCH_GROUP) {
+      continue;
+    }
+
+    std::string prefix =
+      "master/operations/" + strings::lower(descriptor->name()) + "/";
+
+    operation_type_states.emplace(type, prefix);
+  }
 }
 
 
@@ -432,6 +468,7 @@ Metrics::~Metrics()
   process::metrics::remove(messages_reregister_slave);
   process::metrics::remove(messages_unregister_slave);
   process::metrics::remove(messages_status_update);
+  process::metrics::remove(messages_operation_status_update);
   process::metrics::remove(messages_exited_executor);
   process::metrics::remove(messages_update_slave);
 
@@ -450,6 +487,9 @@ Metrics::~Metrics()
   process::metrics::remove(valid_status_update_acknowledgements);
   process::metrics::remove(invalid_status_update_acknowledgements);
 
+  process::metrics::remove(valid_operation_status_updates);
+  process::metrics::remove(invalid_operation_status_updates);
+
   process::metrics::remove(valid_operation_status_update_acknowledgements);
   process::metrics::remove(invalid_operation_status_update_acknowledgements);
 
@@ -562,6 +602,118 @@ void Metrics::incrementTasksStates(
 }
 
 
+Metrics::OperationStates::OperationStates(const std::string& prefix)
+  : total(prefix + "total"),
+    pending(prefix + "pending"),
+    recovering(prefix + "recovering"),
+    unreachable(prefix + "unreachable"),
+    finished(prefix + "finished"),
+    failed(prefix + "failed"),
+    error(prefix + "error"),
+    dropped(prefix + "dropped"),
+    gone_by_operator(prefix + "gone_by_operator")
+{
+  process::metrics::add(total);
+  process::metrics::add(pending);
+  process::metrics::add(recovering);
+  process::metrics::add(finished);
+  process::metrics::add(failed);
+  process::metrics::add(error);
+  process::metrics::add(dropped);
+  process::metrics::add(unreachable);
+  process::metrics::add(gone_by_operator);
+}
+
+
+Metrics::OperationStates::~OperationStates()
+{
+  process::metrics::remove(total);
+  process::metrics::remove(pending);
+  process::metrics::remove(recovering);
+  process::metrics::remove(finished);
+  process::metrics::remove(failed);
+  process::metrics::remove(error);
+  process::metrics::remove(dropped);
+  process::metrics::remove(unreachable);
+  process::metrics::remove(gone_by_operator);
+}
+
+
+void Metrics::OperationStates::update(
+    const OperationState& operationState,
+    int delta)
+{
+  total += delta;
+
+  switch(operationState) {
+    case OPERATION_FINISHED:
+      finished += delta;
+      break;
+    case OPERATION_DROPPED:
+      dropped += delta;
+      break;
+    case OPERATION_ERROR:
+      error += delta;
+      break;
+    case OPERATION_FAILED:
+      failed += delta;
+      break;
+    case OPERATION_GONE_BY_OPERATOR:
+      gone_by_operator += delta;
+      break;
+    case OPERATION_PENDING:
+      pending += delta;
+      break;
+    case OPERATION_UNREACHABLE:
+      unreachable += delta;
+      break;
+    case OPERATION_RECOVERING:
+      recovering += delta;
+      break;
+    case OPERATION_UNSUPPORTED:
+    case OPERATION_UNKNOWN:
+      LOG(ERROR) << "Unexpected operation state: " << operationState;
+      break;
+  }
+}
+
+
+void Metrics::incrementOperationState(
+    Offer::Operation::Type type,
+    const OperationState& state)
+{
+  operation_states.update(state, 1);
+  if (operation_type_states.count(type)) {
+    operation_type_states.at(type).update(state, 1);
+  }
+}
+
+
+void Metrics::decrementOperationState(
+    Offer::Operation::Type type,
+    const OperationState& state)
+{
+  operation_states.update(state, -1);
+  if (operation_type_states.count(type)) {
+    operation_type_states.at(type).update(state, -1);
+  }
+}
+
+
+void Metrics::transitionOperationState(
+    Offer::Operation::Type type,
+    const OperationState& oldState,
+    const OperationState& newState)
+{
+  if (oldState == newState) {
+    return;  // Nothing to do.
+  }
+
+  decrementOperationState(type, oldState);
+  incrementOperationState(type, newState);
+}
+
+
 FrameworkMetrics::FrameworkMetrics(
     const FrameworkInfo& _frameworkInfo,
     bool _publishPerFrameworkMetrics)
diff --git a/src/master/metrics.hpp b/src/master/metrics.hpp
index 4495e65..04fecf2 100644
--- a/src/master/metrics.hpp
+++ b/src/master/metrics.hpp
@@ -60,6 +60,50 @@ struct Metrics
 
   process::metrics::PullGauge outstanding_offers;
 
+  // Contains counters 'prefix/pending', 'prefix/recovering', etc.
+  struct OperationStates {
+    OperationStates(const std::string& prefix);
+    ~OperationStates();
+
+    void update(const OperationState& state, int delta);
+
+    process::metrics::Counter total;
+
+    process::metrics::PushGauge pending;
+    process::metrics::PushGauge recovering;
+    process::metrics::PushGauge unreachable;
+    process::metrics::Counter finished;
+    process::metrics::Counter failed;
+    process::metrics::Counter error;
+    process::metrics::Counter dropped;
+    process::metrics::Counter gone_by_operator;
+  };
+
+  // Operation states are tracked in two granularities: master-wide and
+  // per operation type. Additionally, for every framework the types of
+  // operations are tracked but not their states.
+  //
+  // NOTE: These metrics are missing the implicit operation statuses that
+  // are generated on operation reconciliation. For example, when a framework
+  // queries the state of an unknown operation on an unreachable agent,
+  // the master will generate an `OPERATION_UNREACHABLE` update that is not
+  // counted by these metrics.
+  OperationStates operation_states;
+  hashmap<Offer::Operation::Type, OperationStates> operation_type_states;
+
+  void incrementOperationState(
+      Offer::Operation::Type type,
+      const OperationState& state);
+
+  void decrementOperationState(
+      Offer::Operation::Type type,
+      const OperationState& state);
+
+  void transitionOperationState(
+      Offer::Operation::Type type,
+      const OperationState& oldState,
+      const OperationState& newState);
+
   process::metrics::PushGauge operator_event_stream_subscribers;
 
   // Task state metrics.
@@ -158,6 +202,7 @@ struct Metrics
   process::metrics::Counter messages_reregister_slave;
   process::metrics::Counter messages_unregister_slave;
   process::metrics::Counter messages_status_update;
+  process::metrics::Counter messages_operation_status_update;
   process::metrics::Counter messages_exited_executor;
   process::metrics::Counter messages_update_slave;
 
@@ -175,6 +220,9 @@ struct Metrics
   process::metrics::Counter valid_status_update_acknowledgements;
   process::metrics::Counter invalid_status_update_acknowledgements;
 
+  process::metrics::Counter valid_operation_status_updates;
+  process::metrics::Counter invalid_operation_status_updates;
+
   process::metrics::Counter valid_operation_status_update_acknowledgements;
   process::metrics::Counter invalid_operation_status_update_acknowledgements;
 

Reply via email to