Repository: mesos Updated Branches: refs/heads/master ab484fc1b -> 9c8eee35d
Added operation state metrics in SLRP. This patch adds `operations_pending`, `operations_finished`, `operations_failed`, and `operations_dropped` metrics to count the occurrences of these operation states. Additionally, an error log in `_applyOperation()` is removed because the error is already logged at the call site. Review: https://reviews.apache.org/r/65665 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/70b407df Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/70b407df Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/70b407df Branch: refs/heads/master Commit: 70b407df0aaeef171014d3453c3efb762b2c21f6 Parents: ab484fc Author: Chun-Hung Hsiao <[email protected]> Authored: Thu May 17 17:44:29 2018 -0700 Committer: Chun-Hung Hsiao <[email protected]> Committed: Thu May 31 18:29:55 2018 -0700 ---------------------------------------------------------------------- src/resource_provider/storage/provider.cpp | 154 ++++++++++++++++++++++-- 1 file changed, 141 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/70b407df/src/resource_provider/storage/provider.cpp ---------------------------------------------------------------------- diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp index 2c7dd8d..542d3ae 100644 --- a/src/resource_provider/storage/provider.cpp +++ b/src/resource_provider/storage/provider.cpp @@ -35,6 +35,7 @@ #include <process/metrics/counter.hpp> #include <process/metrics/metrics.hpp> +#include <process/metrics/push_gauge.hpp> #include <mesos/resources.hpp> #include <mesos/type_utils.hpp> @@ -103,6 +104,7 @@ using process::spawn; using process::http::authentication::Principal; using process::metrics::Counter; +using process::metrics::PushGauge; using mesos::csi::state::VolumeState; @@ -402,7 +404,7 @@ private: void 
dropOperation( const id::UUID& operationUuid, const Option<FrameworkID>& frameworkId, - const Option<OperationID>& operationId, + const Option<Offer::Operation>& info, const string& message); Future<vector<ResourceConversion>> applyCreateVolumeOrBlock( @@ -494,8 +496,15 @@ private: explicit Metrics(const string& prefix); ~Metrics(); + // CSI plugin metrics. Counter csi_controller_plugin_terminations; Counter csi_node_plugin_terminations; + + // Operation state metrics. + hashmap<Offer::Operation::Type, PushGauge> operations_pending; + hashmap<Offer::Operation::Type, Counter> operations_finished; + hashmap<Offer::Operation::Type, Counter> operations_failed; + hashmap<Offer::Operation::Type, Counter> operations_dropped; } metrics; }; @@ -1210,6 +1219,26 @@ StorageLocalResourceProviderProcess::reconcileOperationStatuses() foreachpair (const id::UUID& uuid, const Operation& operation, operations) { + switch (operation.latest_status().state()) { + case OPERATION_PENDING: + ++metrics.operations_pending.at(operation.info().type()); + break; + case OPERATION_FINISHED: + ++metrics.operations_finished.at(operation.info().type()); + break; + case OPERATION_FAILED: + ++metrics.operations_failed.at(operation.info().type()); + break; + case OPERATION_UNSUPPORTED: + case OPERATION_ERROR: + case OPERATION_DROPPED: + case OPERATION_UNREACHABLE: + case OPERATION_GONE_BY_OPERATOR: + case OPERATION_RECOVERING: + case OPERATION_UNKNOWN: + UNREACHABLE(); + } + if (protobuf::isTerminalState(operation.latest_status().state())) { continue; } @@ -1419,14 +1448,12 @@ void StorageLocalResourceProviderProcess::applyOperation( Option<FrameworkID> frameworkId = operation.has_framework_id() ? operation.framework_id() : Option<FrameworkID>::none(); - Option<OperationID> operationId = operation.info().has_id() - ? 
operation.info().id() : Option<OperationID>::none(); if (state == SUBSCRIBED) { return dropOperation( uuid.get(), frameworkId, - operationId, + operation.info(), "Cannot apply operation in SUBSCRIBED state"); } @@ -1434,7 +1461,7 @@ void StorageLocalResourceProviderProcess::applyOperation( return dropOperation( uuid.get(), frameworkId, - operationId, + operation.info(), "Cannot apply operation when reconciling storage pools"); } @@ -1446,7 +1473,7 @@ void StorageLocalResourceProviderProcess::applyOperation( return dropOperation( uuid.get(), frameworkId, - operationId, + operation.info(), "Mismatched resource version " + stringify(operationVersion.get()) + " (expected: " + stringify(resourceVersion) + ")"); } @@ -1454,13 +1481,18 @@ void StorageLocalResourceProviderProcess::applyOperation( CHECK(!operations.contains(uuid.get())); operations[uuid.get()] = protobuf::createOperation( operation.info(), - protobuf::createOperationStatus(OPERATION_PENDING, operationId), + protobuf::createOperationStatus( + OPERATION_PENDING, + operation.info().has_id() + ? 
operation.info().id() : Option<OperationID>::none()), frameworkId, slaveId, protobuf::createUUID(uuid.get())); checkpointResourceProviderState(); + ++metrics.operations_pending.at(operation.info().type()); + auto err = [](const id::UUID& uuid, const string& message) { LOG(ERROR) << "Failed to apply operation (uuid: " << uuid << "): " << message; @@ -2872,10 +2904,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::_applyOperation( << "Applying conversion from '" << conversions->at(0).consumed << "' to '" << conversions->at(0).converted << "' for operation (uuid: " << operationUuid << ")"; - } else { - LOG(ERROR) - << "Failed to apply operation (uuid: " << operationUuid << "): " - << conversions.error(); } promise->associate( @@ -2901,7 +2929,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::_applyOperation( void StorageLocalResourceProviderProcess::dropOperation( const id::UUID& operationUuid, const Option<FrameworkID>& frameworkId, - const Option<OperationID>& operationId, + const Option<Offer::Operation>& info, const string& message) { LOG(WARNING) @@ -2912,7 +2940,8 @@ void StorageLocalResourceProviderProcess::dropOperation( protobuf::createUUID(operationUuid), protobuf::createOperationStatus( OPERATION_DROPPED, - operationId, + info.isSome() && info->has_id() + ? info->id() : Option<OperationID>::none(), message, None(), id::UUID::random()), @@ -2930,6 +2959,9 @@ void StorageLocalResourceProviderProcess::dropOperation( statusUpdateManager.update(std::move(update), false) .onFailed(defer(self(), std::bind(die, lambda::_1))) .onDiscarded(defer(self(), std::bind(die, "future discarded"))); + + ++metrics.operations_dropped.at( + info.isSome() ? 
info->type() : Offer::Operation::UNKNOWN); } @@ -3231,6 +3263,26 @@ Try<Nothing> StorageLocalResourceProviderProcess::updateOperationStatus( .onFailed(defer(self(), std::bind(die, lambda::_1))) .onDiscarded(defer(self(), std::bind(die, "future discarded"))); + --metrics.operations_pending.at(operation.info().type()); + + switch (operation.latest_status().state()) { + case OPERATION_FINISHED: + ++metrics.operations_finished.at(operation.info().type()); + break; + case OPERATION_FAILED: + ++metrics.operations_failed.at(operation.info().type()); + break; + case OPERATION_UNSUPPORTED: + case OPERATION_PENDING: + case OPERATION_ERROR: + case OPERATION_DROPPED: + case OPERATION_UNREACHABLE: + case OPERATION_GONE_BY_OPERATOR: + case OPERATION_RECOVERING: + case OPERATION_UNKNOWN: + UNREACHABLE(); + } + if (error.isSome()) { // We only send `UPDATE_STATE` for failed speculative operations. if (operation.info().type() == Offer::Operation::RESERVE || @@ -3390,6 +3442,66 @@ StorageLocalResourceProviderProcess::Metrics::Metrics(const string& prefix) { process::metrics::add(csi_controller_plugin_terminations); process::metrics::add(csi_node_plugin_terminations); + + vector<Offer::Operation::Type> operationTypes; + + // NOTE: We use a switch statement here as a compile-time sanity check so we + // won't forget to add metrics for new operations in the future. 
+ Offer::Operation::Type firstOperationType = Offer::Operation::RESERVE; + switch (firstOperationType) { + case Offer::Operation::RESERVE: + operationTypes.push_back(Offer::Operation::RESERVE); + case Offer::Operation::UNRESERVE: + operationTypes.push_back(Offer::Operation::UNRESERVE); + case Offer::Operation::CREATE: + operationTypes.push_back(Offer::Operation::CREATE); + case Offer::Operation::DESTROY: + operationTypes.push_back(Offer::Operation::DESTROY); + case Offer::Operation::CREATE_VOLUME: + operationTypes.push_back(Offer::Operation::CREATE_VOLUME); + case Offer::Operation::DESTROY_VOLUME: + operationTypes.push_back(Offer::Operation::DESTROY_VOLUME); + case Offer::Operation::CREATE_BLOCK: + operationTypes.push_back(Offer::Operation::CREATE_BLOCK); + case Offer::Operation::DESTROY_BLOCK: + operationTypes.push_back(Offer::Operation::DESTROY_BLOCK); + break; + case Offer::Operation::GROW_VOLUME: + case Offer::Operation::SHRINK_VOLUME: + // TODO(chhsiao): These operations are currently not supported for + // resource providers, and should have been validated by the master. 
+ UNREACHABLE(); + case Offer::Operation::UNKNOWN: + case Offer::Operation::LAUNCH: + case Offer::Operation::LAUNCH_GROUP: + UNREACHABLE(); + }; + + foreach (const Offer::Operation::Type& type, operationTypes) { + const string name = strings::lower(Offer::Operation::Type_Name(type)); + + operations_pending.put(type, PushGauge( + prefix + "operations/" + name + "/pending")); + operations_finished.put(type, Counter( + prefix + "operations/" + name + "/finished")); + operations_failed.put(type, Counter( + prefix + "operations/" + name + "/failed")); + operations_dropped.put(type, Counter( + prefix + "operations/" + name + "/dropped")); + + process::metrics::add(operations_pending.at(type)); + process::metrics::add(operations_finished.at(type)); + process::metrics::add(operations_failed.at(type)); + process::metrics::add(operations_dropped.at(type)); + } + + // Special metric for counting the number of `OPERATION_DROPPED` statuses when + // receiving explicit reconciliation for unknown operation UUIDs. + operations_dropped.put( + Offer::Operation::UNKNOWN, + Counter(prefix + "operations/unknown/dropped")); + + process::metrics::add(operations_dropped.at(Offer::Operation::UNKNOWN)); } @@ -3397,6 +3509,22 @@ StorageLocalResourceProviderProcess::Metrics::~Metrics() { process::metrics::remove(csi_controller_plugin_terminations); process::metrics::remove(csi_node_plugin_terminations); + + foreachvalue (const PushGauge& gauge, operations_pending) { + process::metrics::remove(gauge); + } + + foreachvalue (const Counter& counter, operations_finished) { + process::metrics::remove(counter); + } + + foreachvalue (const Counter& counter, operations_failed) { + process::metrics::remove(counter); + } + + foreachvalue (const Counter& counter, operations_dropped) { + process::metrics::remove(counter); + } }
