This is an automated email from the ASF dual-hosted git repository. wangdan pushed a commit to branch migrate-metrics-dev in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
commit 146561dd1808d4ae8bc3d1510ddb9165925e5cfb Author: Dan Wang <[email protected]> AuthorDate: Mon Apr 17 21:38:23 2023 +0800 feat(new_metrics): migrate replica-level metrics for pegasus_manual_compact_service (#1443) https://github.com/apache/incubator-pegasus/issues/1441 In perf counters, both metrics of `pegasus_manual_compact_service` count the tasks of rocksdb manual compaction: one is the number of currently queued tasks, and the other is the number of currently running tasks. Both metrics are server-level. They become replica-level after migrating to the new metrics, from which server-level ones can also be derived. A convenience class `auto_count` is also provided, which increments a gauge and decrements it automatically at the end of the scope. --- src/server/pegasus_manual_compact_service.cpp | 37 ++++++++--------- src/server/pegasus_manual_compact_service.h | 6 +-- src/utils/metrics.h | 57 ++++++++++++++++++++++----- src/utils/test/metrics_test.cpp | 39 ++++++++++++++++++ 4 files changed, 108 insertions(+), 31 deletions(-) diff --git a/src/server/pegasus_manual_compact_service.cpp b/src/server/pegasus_manual_compact_service.cpp index 643e3e997..22f4b690d 100644 --- a/src/server/pegasus_manual_compact_service.cpp +++ b/src/server/pegasus_manual_compact_service.cpp @@ -29,16 +29,27 @@ #include "base/pegasus_const.h" #include "common/replication.codes.h" #include "pegasus_server_impl.h" -#include "perf_counter/perf_counter.h" #include "runtime/api_layer1.h" #include "runtime/task/async_calls.h" #include "runtime/task/task_code.h" +#include "utils/autoref_ptr.h" #include "utils/flags.h" #include "utils/fmt_logging.h" #include "utils/string_conv.h" +#include "utils/string_view.h" #include "utils/strings.h" #include "utils/time_utils.h" +METRIC_DEFINE_gauge_int64(replica, + rdb_manual_compact_queued_tasks, + dsn::metric_unit::kTasks, + "The number of current queued tasks of rocksdb manual compaction"); +
+METRIC_DEFINE_gauge_int64(replica, + rdb_manual_compact_running_tasks, + dsn::metric_unit::kTasks, + "The number of current running tasks of rocksdb manual compaction"); + namespace pegasus { namespace server { @@ -58,17 +69,10 @@ pegasus_manual_compact_service::pegasus_manual_compact_service(pegasus_server_im _manual_compact_enqueue_time_ms(0), _manual_compact_start_running_time_ms(0), _manual_compact_last_finish_time_ms(0), - _manual_compact_last_time_used_ms(0) + _manual_compact_last_time_used_ms(0), + METRIC_VAR_INIT_replica(rdb_manual_compact_queued_tasks), + METRIC_VAR_INIT_replica(rdb_manual_compact_running_tasks) { - _pfc_manual_compact_enqueue_count.init_app_counter("app.pegasus", - "manual.compact.enqueue.count", - COUNTER_TYPE_NUMBER, - "current manual compact in queue count"); - - _pfc_manual_compact_running_count.init_app_counter("app.pegasus", - "manual.compact.running.count", - COUNTER_TYPE_NUMBER, - "current manual compact running count"); } void pegasus_manual_compact_service::init_last_finish_time_ms(uint64_t last_finish_time_ms) @@ -106,9 +110,9 @@ void pegasus_manual_compact_service::start_manual_compact_if_needed( rocksdb::CompactRangeOptions options; extract_manual_compact_opts(envs, compact_rule, options); - _pfc_manual_compact_enqueue_count->increment(); + METRIC_VAR_INCREMENT(rdb_manual_compact_queued_tasks); dsn::tasking::enqueue(LPC_MANUAL_COMPACT, &_app->_tracker, [this, options]() { - _pfc_manual_compact_enqueue_count->decrement(); + METRIC_VAR_DECREMENT(rdb_manual_compact_queued_tasks); manual_compact(options); }); } else { @@ -295,9 +299,8 @@ void pegasus_manual_compact_service::manual_compact(const rocksdb::CompactRangeO } // if current running count exceeds the limit, it would not to be started. 
- _pfc_manual_compact_running_count->increment(); - if (_pfc_manual_compact_running_count->get_integer_value() > _max_concurrent_running_count) { - _pfc_manual_compact_running_count->decrement(); + METRIC_VAR_AUTO_COUNT(rdb_manual_compact_running_tasks); + if (METRIC_VAR_VALUE(rdb_manual_compact_running_tasks) > _max_concurrent_running_count) { LOG_INFO_PREFIX("ignored compact because exceed max_concurrent_running_count({})", _max_concurrent_running_count.load()); _manual_compact_enqueue_time_ms.store(0); @@ -307,8 +310,6 @@ void pegasus_manual_compact_service::manual_compact(const rocksdb::CompactRangeO uint64_t start = begin_manual_compact(); uint64_t finish = _app->do_manual_compact(options); end_manual_compact(start, finish); - - _pfc_manual_compact_running_count->decrement(); } uint64_t pegasus_manual_compact_service::begin_manual_compact() diff --git a/src/server/pegasus_manual_compact_service.h b/src/server/pegasus_manual_compact_service.h index c47bbb5ec..d57343de2 100644 --- a/src/server/pegasus_manual_compact_service.h +++ b/src/server/pegasus_manual_compact_service.h @@ -25,8 +25,8 @@ #include <string> #include "metadata_types.h" -#include "perf_counter/perf_counter_wrapper.h" #include "replica/replica_base.h" +#include "utils/metrics.h" namespace rocksdb { struct CompactRangeOptions; @@ -97,8 +97,8 @@ private: std::atomic<uint64_t> _manual_compact_last_finish_time_ms; std::atomic<uint64_t> _manual_compact_last_time_used_ms; - ::dsn::perf_counter_wrapper _pfc_manual_compact_enqueue_count; - ::dsn::perf_counter_wrapper _pfc_manual_compact_running_count; + METRIC_VAR_DECLARE_gauge_int64(rdb_manual_compact_queued_tasks); + METRIC_VAR_DECLARE_gauge_int64(rdb_manual_compact_running_tasks); }; } // namespace server diff --git a/src/utils/metrics.h b/src/utils/metrics.h index bdc0ccd50..8b9d396a9 100644 --- a/src/utils/metrics.h +++ b/src/utils/metrics.h @@ -170,7 +170,7 @@ class error_code; #define METRIC_VAR_INIT_partition(name, ...) 
METRIC_VAR_INIT(name, partition, ##__VA_ARGS__) #define METRIC_VAR_INIT_backup_policy(name, ...) METRIC_VAR_INIT(name, backup_policy, ##__VA_ARGS__) -// Perform increment-related operations on metrics including gauge and counter. +// Perform increment-related operations on gauges and counters. #define METRIC_VAR_INCREMENT_BY(name, x) \ do { \ const auto v = (x); \ @@ -179,9 +179,13 @@ class error_code; } \ } while (0) +// Perform increment() operations on gauges and counters. #define METRIC_VAR_INCREMENT(name) _##name->increment() -// Perform set() operations on metrics including gauge and percentile. +// Perform decrement() operations on gauges. +#define METRIC_VAR_DECREMENT(name) _##name->decrement() + +// Perform set() operations on gauges and percentiles. // // There are 2 kinds of invocations of set() for a metric: // * set(val): set a single value for a metric, such as gauge, percentile; @@ -189,7 +193,7 @@ class error_code; // such as percentile. #define METRIC_VAR_SET(name, ...) _##name->set(__VA_ARGS__) -// Read the current measurement of the metric. +// Read the current measurement of gauges and counters. #define METRIC_VAR_VALUE(name) _##name->value() // Convenient macro that is used to compute latency automatically, which is dedicated to percentile. @@ -198,6 +202,10 @@ class error_code; #define METRIC_VAR_AUTO_LATENCY_DURATION_NS(name) __##name##_auto_latency.duration_ns() +// Convenient macro that is used to increment/decrement gauge automatically in current scope. +#define METRIC_VAR_AUTO_COUNT(name, ...) 
\ + dsn::auto_count __##name##_auto_count(_##name, ##__VA_ARGS__) + #define METRIC_DEFINE_INCREMENT_BY(name) \ void increment_##name##_by(int64_t x) { METRIC_VAR_INCREMENT_BY(name, x); } @@ -650,6 +658,7 @@ enum class metric_unit : size_t kWrites, kChanges, kOperations, + kTasks, kDisconnections, kServers, kInvalidUnit, @@ -1433,22 +1442,22 @@ using floating_percentile_prototype = class auto_latency { public: - auto_latency(const percentile_ptr<int64_t> &percentile) : _percentile(percentile) {} + auto_latency(const percentile_ptr<int64_t> &p) : _percentile(p) {} - auto_latency(const percentile_ptr<int64_t> &percentile, std::function<void(uint64_t)> callback) - : _percentile(percentile), _callback(std::move(callback)) + auto_latency(const percentile_ptr<int64_t> &p, std::function<void(uint64_t)> callback) + : _percentile(p), _callback(std::move(callback)) { } - auto_latency(const percentile_ptr<int64_t> &percentile, uint64_t start_time_ns) - : _percentile(percentile), _chrono(start_time_ns) + auto_latency(const percentile_ptr<int64_t> &p, uint64_t start_time_ns) + : _percentile(p), _chrono(start_time_ns) { } - auto_latency(const percentile_ptr<int64_t> &percentile, + auto_latency(const percentile_ptr<int64_t> &p, uint64_t start_time_ns, std::function<void(uint64_t)> callback) - : _percentile(percentile), _chrono(start_time_ns), _callback(std::move(callback)) + : _percentile(p), _chrono(start_time_ns), _callback(std::move(callback)) { } @@ -1473,6 +1482,34 @@ private: DISALLOW_COPY_AND_ASSIGN(auto_latency); }; +// Increment gauge and decrement it automatically at the end of the scope. 
+class auto_count +{ +public: + auto_count(const gauge_ptr<int64_t> &g) : _gauge(g) { _gauge->increment(); } + + auto_count(const gauge_ptr<int64_t> &g, std::function<void()> callback) + : _gauge(g), _callback(std::move(callback)) + { + _gauge->increment(); + } + + ~auto_count() + { + if (_callback) { + _callback(); + } + + _gauge->decrement(); + } + +private: + gauge_ptr<int64_t> _gauge; + std::function<void()> _callback; + + DISALLOW_COPY_AND_ASSIGN(auto_count); +}; + } // namespace dsn // Since server_metric_entity() will be called in macros such as METRIC_VAR_INIT_server(), its diff --git a/src/utils/test/metrics_test.cpp b/src/utils/test/metrics_test.cpp index c430b5dfb..5188ff611 100644 --- a/src/utils/test/metrics_test.cpp +++ b/src/utils/test/metrics_test.cpp @@ -3096,6 +3096,8 @@ protected: void test_set_percentile(const std::vector<int64_t> &expected_samples); void test_set_percentile(size_t n, int64_t val); + void test_auto_count(); + const metric_entity_ptr _my_replica_metric_entity; METRIC_VAR_DECLARE_gauge_int64(test_replica_gauge_int64); METRIC_VAR_DECLARE_counter(test_replica_counter); @@ -3134,6 +3136,19 @@ void MetricVarTest::test_set_percentile(size_t n, int64_t val) EXPECT_EQ(std::vector<int64_t>(n, val), METRIC_VAR_SAMPLES(test_replica_percentile_int64_ns)); } +void MetricVarTest::test_auto_count() +{ + ASSERT_EQ(0, METRIC_VAR_VALUE(test_replica_gauge_int64)); + + { + METRIC_VAR_AUTO_COUNT(test_replica_gauge_int64, [this]() { + ASSERT_EQ(1, METRIC_VAR_VALUE(test_replica_gauge_int64)); + }); + } + + ASSERT_EQ(0, METRIC_VAR_VALUE(test_replica_gauge_int64)); +} + #define TEST_METRIC_VAR_INCREMENT(name) \ do { \ ASSERT_EQ(0, METRIC_VAR_VALUE(name)); \ @@ -3155,6 +3170,28 @@ TEST_F(MetricVarTest, IncrementGauge) { TEST_METRIC_VAR_INCREMENT(test_replica_g TEST_F(MetricVarTest, IncrementCounter) { TEST_METRIC_VAR_INCREMENT(test_replica_counter); } +#define TEST_METRIC_VAR_DECREMENT(name) \ + do { \ + ASSERT_EQ(0, METRIC_VAR_VALUE(name)); \ + \ + 
METRIC_VAR_INCREMENT_BY(name, 11); \ + ASSERT_EQ(11, METRIC_VAR_VALUE(name)); \ + \ + METRIC_VAR_DECREMENT(name); \ + ASSERT_EQ(10, METRIC_VAR_VALUE(name)); \ + \ + METRIC_VAR_DECREMENT(name); \ + ASSERT_EQ(9, METRIC_VAR_VALUE(name)); \ + \ + METRIC_VAR_INCREMENT(name); \ + ASSERT_EQ(10, METRIC_VAR_VALUE(name)); \ + \ + METRIC_VAR_DECREMENT(name); \ + ASSERT_EQ(9, METRIC_VAR_VALUE(name)); \ + } while (0); + +TEST_F(MetricVarTest, DecrementGauge) { TEST_METRIC_VAR_DECREMENT(test_replica_gauge_int64); } + TEST_F(MetricVarTest, SetGauge) { ASSERT_EQ(0, METRIC_VAR_VALUE(test_replica_gauge_int64)); @@ -3195,4 +3232,6 @@ TEST_F(MetricVarTest, AutoLatencyMilliSeconds) { TEST_METRIC_VAR_AUTO_LATENCY(ms TEST_F(MetricVarTest, AutoLatencySeconds) { TEST_METRIC_VAR_AUTO_LATENCY(s, 1000 * 1000 * 1000); } +TEST_F(MetricVarTest, AutoCount) { ASSERT_NO_FATAL_FAILURE(test_auto_count()); } + } // namespace dsn --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
