This is an automated email from the ASF dual-hosted git repository. wangdan pushed a commit to branch migrate-metrics-dev in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
commit 2503d2ba41124eb67a7289b44fe4f8da095bcf47 Author: Dan Wang <[email protected]> AuthorDate: Wed Apr 19 22:57:26 2023 +0800 feat(new_metrics): migrate metrics for replica_stub (part 1) (#1455) https://github.com/apache/incubator-pegasus/issues/1454 This is the 1st part of migrating the metrics of `replica_stub` to the new framework. After migrating to the new framework, 3 metrics (the total number of replicas, and the numbers of opening and closing replicas) are still kept at server level, while another metric, the number of committed requests, is changed to replica level. The names of metric variables could collide with existing class members (such as `_opening_replicas` in the `replica_stub` class). Therefore, a macro `METRIC_VAR_NAME` is introduced to manage the new naming: every metric variable name is prefixed with `_metric_` to avoid such collisions. The names of generated metric functions are likewise managed by related macros. --- src/meta/table_metrics.h | 10 +++--- src/replica/replica.cpp | 10 ------ src/replica/replica.h | 5 --- src/replica/replica_stub.cpp | 60 +++++++++++++++++++----------------- src/replica/replica_stub.h | 8 ++--- src/replica/replication_app_base.cpp | 10 ++++-- src/replica/replication_app_base.h | 4 +++ src/utils/metrics.h | 40 ++++++++++++++---------- src/utils/test/metrics_test.cpp | 14 ++++----- 9 files changed, 84 insertions(+), 77 deletions(-) diff --git a/src/meta/table_metrics.h b/src/meta/table_metrics.h index 7dc5e83a3..cec35e64b 100644 --- a/src/meta/table_metrics.h +++ b/src/meta/table_metrics.h @@ -93,7 +93,7 @@ public: METRIC_DEFINE_SET(healthy_partitions, int64_t) #define __METRIC_DEFINE_INCREMENT_BY(name) \ - void increment_##name##_by(int32_t partition_id, int64_t x) \ + void METRIC_FUNC_NAME_INCREMENT_BY(name)(int32_t partition_id, int64_t x) \ { \ CHECK_LT(partition_id, _partition_metrics.size()); \ METRIC_INCREMENT_BY(*(_partition_metrics[partition_id]), name, x); \ @@ -106,7 +106,7 @@ public: #undef 
__METRIC_DEFINE_INCREMENT_BY #define
__METRIC_DEFINE_INCREMENT(name) \ - void increment_##name(int32_t partition_id) \ + void METRIC_FUNC_NAME_INCREMENT(name)(int32_t partition_id) \ { \ CHECK_LT(partition_id, _partition_metrics.size()); \ METRIC_INCREMENT(*(_partition_metrics[partition_id]), name); \ @@ -120,7 +120,7 @@ public: #undef __METRIC_DEFINE_INCREMENT #define __METRIC_DEFINE_SET(name, value_type) \ - void set_##name(int32_t partition_id, value_type value) \ + void METRIC_FUNC_NAME_SET(name)(int32_t partition_id, value_type value) \ { \ CHECK_LT(partition_id, _partition_metrics.size()); \ METRIC_SET(*(_partition_metrics[partition_id]), name, value); \ @@ -167,7 +167,7 @@ public: using partition_map = std::unordered_map<gpid, partition_stats>; #define __METRIC_DEFINE_INCREMENT(name) \ - void increment_##name(const gpid &id, bool balance_checker) \ + void METRIC_FUNC_NAME_INCREMENT(name)(const gpid &id, bool balance_checker) \ { \ auto &partition = _partition_map[id]; \ ++(partition.greedy_recent_balance_operations); \ @@ -210,7 +210,7 @@ public: void clear_entities(); #define __METRIC_DEFINE_INCREMENT(name) \ - void increment_##name(const gpid &id) \ + void METRIC_FUNC_NAME_INCREMENT(name)(const gpid &id) \ { \ utils::auto_read_lock l(_lock); \ \ diff --git a/src/replica/replica.cpp b/src/replica/replica.cpp index ebbdf05e5..fd8b73d3c 100644 --- a/src/replica/replica.cpp +++ b/src/replica/replica.cpp @@ -48,7 +48,6 @@ #include "mutation.h" #include "mutation_log.h" #include "perf_counter/perf_counter.h" -#include "perf_counter/perf_counter_wrapper.h" #include "perf_counter/perf_counters.h" #include "replica/prepare_list.h" #include "replica/replica_context.h" @@ -229,15 +228,6 @@ void replica::update_last_checkpoint_generate_time() _last_checkpoint_generate_time_ms + rand::next_u64(max_interval_ms / 2, max_interval_ms); } -// // -// Statistics // -// // - -void replica::update_commit_qps(int count) -{ - _stub->_counter_replicas_commit_qps->add((uint64_t)count); -} - void replica::init_state() 
{ _inactive_is_transient = false; diff --git a/src/replica/replica.h b/src/replica/replica.h index 62845469c..91e7896dd 100644 --- a/src/replica/replica.h +++ b/src/replica/replica.h @@ -288,11 +288,6 @@ public: replica_follower *get_replica_follower() const { return _replica_follower.get(); }; - // - // Statistics - // - void update_commit_qps(int count); - // routine for get extra envs from replica const std::map<std::string, std::string> &get_replica_extra_envs() const { return _extra_envs; } const dir_node *get_dir_node() const { return _dir_node; } diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp index e9bbab7a4..bf60d6a8c 100644 --- a/src/replica/replica_stub.cpp +++ b/src/replica/replica_stub.cpp @@ -97,6 +97,21 @@ #include "remote_cmd/remote_command.h" #include "utils/fail_point.h" +METRIC_DEFINE_gauge_int64(server, + total_replicas, + dsn::metric_unit::kReplicas, + "The total number of replicas"); + +METRIC_DEFINE_gauge_int64(server, + opening_replicas, + dsn::metric_unit::kReplicas, + "The number of opening replicas"); + +METRIC_DEFINE_gauge_int64(server, + closing_replicas, + dsn::metric_unit::kReplicas, + "The number of closing replicas"); + namespace dsn { namespace replication { DSN_DEFINE_bool(replication, @@ -202,7 +217,10 @@ replica_stub::replica_stub(replica_state_subscriber subscriber /*= nullptr*/, _learn_app_concurrent_count(0), _bulk_load_downloading_count(0), _manual_emergency_checkpointing_count(0), - _is_running(false) + _is_running(false), + METRIC_VAR_INIT_server(total_replicas), + METRIC_VAR_INIT_server(opening_replicas), + METRIC_VAR_INIT_server(closing_replicas) { #ifdef DSN_ENABLE_GPERF _is_releasing_memory = false; @@ -220,20 +238,6 @@ replica_stub::~replica_stub(void) { close(); } void replica_stub::install_perf_counters() { - _counter_replicas_count.init_app_counter( - "eon.replica_stub", "replica(Count)", COUNTER_TYPE_NUMBER, "# in replica_stub._replicas"); - 
_counter_replicas_opening_count.init_app_counter("eon.replica_stub", - "opening.replica(Count)", - COUNTER_TYPE_NUMBER, - "# in replica_stub._opening_replicas"); - _counter_replicas_closing_count.init_app_counter("eon.replica_stub", - "closing.replica(Count)", - COUNTER_TYPE_NUMBER, - "# in replica_stub._closing_replicas"); - _counter_replicas_commit_qps.init_app_counter("eon.replica_stub", - "replicas.commit.qps", - COUNTER_TYPE_RATE, - "server-level commit throughput"); _counter_replicas_learning_count.init_app_counter("eon.replica_stub", "replicas.learning.count", COUNTER_TYPE_NUMBER, @@ -815,7 +819,7 @@ void replica_stub::initialize(const replication_options &opts, bool clear /* = f // attach rps _replicas = std::move(rps); - _counter_replicas_count->add((uint64_t)_replicas.size()); + METRIC_VAR_INCREMENT_BY(total_replicas, _replicas.size()); for (const auto &kv : _replicas) { _fs_manager.add_replica(kv.first, kv.second->dir()); } @@ -2016,10 +2020,10 @@ task_ptr replica_stub::begin_open_replica( if (rep->status() == partition_status::PS_INACTIVE && tsk->cancel(false)) { // reopen it _closing_replicas.erase(it); - _counter_replicas_closing_count->decrement(); + METRIC_VAR_DECREMENT(closing_replicas); _replicas.emplace(id, rep); - _counter_replicas_count->increment(); + METRIC_VAR_INCREMENT(total_replicas); _closed_replicas.erase(id); @@ -2045,7 +2049,7 @@ task_ptr replica_stub::begin_open_replica( std::bind(&replica_stub::open_replica, this, app, id, group_check, configuration_update)); _opening_replicas[id] = task; - _counter_replicas_opening_count->increment(); + METRIC_VAR_INCREMENT(opening_replicas); _closed_replicas.erase(id); _replicas_lock.unlock_write(); @@ -2158,7 +2162,7 @@ void replica_stub::open_replica( 0, "replica {} is not in _opening_replicas", id.to_string()); - _counter_replicas_opening_count->decrement(); + METRIC_VAR_DECREMENT(opening_replicas); return; } @@ -2168,13 +2172,13 @@ void replica_stub::open_replica( 0, "replica {} is not in 
_opening_replicas", id.to_string()); - _counter_replicas_opening_count->decrement(); + METRIC_VAR_DECREMENT(opening_replicas); CHECK(_replicas.find(id) == _replicas.end(), "replica {} is already in _replicas", id.to_string()); _replicas.insert(replicas::value_type(rep->get_gpid(), rep)); - _counter_replicas_count->increment(); + METRIC_VAR_INCREMENT(total_replicas); _closed_replicas.erase(id); } @@ -2341,7 +2345,7 @@ task_ptr replica_stub::begin_close_replica(replica_ptr r) return nullptr; } - _counter_replicas_count->decrement(); + METRIC_VAR_DECREMENT(total_replicas); int delay_ms = 0; if (r->status() == partition_status::PS_INACTIVE) { @@ -2360,7 +2364,7 @@ task_ptr replica_stub::begin_close_replica(replica_ptr r) 0, std::chrono::milliseconds(delay_ms)); _closing_replicas[id] = std::make_tuple(task, r, std::move(a_info), std::move(r_info)); - _counter_replicas_closing_count->increment(); + METRIC_VAR_INCREMENT(closing_replicas); return task; } @@ -2380,7 +2384,7 @@ void replica_stub::close_replica(replica_ptr r) _closed_replicas.emplace( id, std::make_pair(std::get<2>(find->second), std::get<3>(find->second))); _closing_replicas.erase(find); - _counter_replicas_closing_count->decrement(); + METRIC_VAR_DECREMENT(closing_replicas); } _fs_manager.remove_replica(id); @@ -2844,7 +2848,7 @@ void replica_stub::close() task->cancel(true); - _counter_replicas_opening_count->decrement(); + METRIC_VAR_DECREMENT(opening_replicas); _replicas_lock.lock_write(); _opening_replicas.erase(_opening_replicas.begin()); } @@ -2852,7 +2856,7 @@ void replica_stub::close() while (!_replicas.empty()) { _replicas.begin()->second->close(); - _counter_replicas_count->decrement(); + METRIC_VAR_DECREMENT(total_replicas); _replicas.erase(_replicas.begin()); } } @@ -2976,7 +2980,7 @@ replica_ptr replica_stub::create_child_replica_if_not_found(gpid child_pid, if (rep != nullptr) { auto pr = _replicas.insert(replicas::value_type(child_pid, rep)); CHECK(pr.second, "child replica {} has been 
existed", rep->name()); - _counter_replicas_count->increment(); + METRIC_VAR_INCREMENT(total_replicas); _closed_replicas.erase(child_pid); } return rep; diff --git a/src/replica/replica_stub.h b/src/replica/replica_stub.h index b6f54c485..7e4d75c60 100644 --- a/src/replica/replica_stub.h +++ b/src/replica/replica_stub.h @@ -71,6 +71,7 @@ #include "utils/autoref_ptr.h" #include "utils/error_code.h" #include "utils/flags.h" +#include "utils/metrics.h" #include "utils/zlocks.h" namespace dsn { @@ -494,10 +495,9 @@ private: #endif // performance counters - perf_counter_wrapper _counter_replicas_count; - perf_counter_wrapper _counter_replicas_opening_count; - perf_counter_wrapper _counter_replicas_closing_count; - perf_counter_wrapper _counter_replicas_commit_qps; + METRIC_VAR_DECLARE_gauge_int64(total_replicas); + METRIC_VAR_DECLARE_gauge_int64(opening_replicas); + METRIC_VAR_DECLARE_gauge_int64(closing_replicas); perf_counter_wrapper _counter_replicas_learning_count; perf_counter_wrapper _counter_replicas_learning_max_duration_time_ms; diff --git a/src/replica/replication_app_base.cpp b/src/replica/replication_app_base.cpp index db92bdf17..a88559b35 100644 --- a/src/replica/replication_app_base.cpp +++ b/src/replica/replication_app_base.cpp @@ -66,6 +66,11 @@ #include "utils/threadpool_code.h" #include "utils/utils.h" +METRIC_DEFINE_counter(replica, + committed_requests, + dsn::metric_unit::kRequests, + "The number of committed requests"); + namespace dsn { class disk_file; @@ -254,7 +259,8 @@ replication_app_base *replication_app_base::new_storage_instance(const std::stri return utils::factory_store<replication_app_base>::create(name.c_str(), PROVIDER_TYPE_MAIN, r); } -replication_app_base::replication_app_base(replica *replica) : replica_base(replica) +replication_app_base::replication_app_base(replica *replica) + : replica_base(replica), METRIC_VAR_INIT_replica(committed_requests) { _dir_data = utils::filesystem::path_combine(replica->dir(), "data"); _dir_learn = 
utils::filesystem::path_combine(replica->dir(), "learn"); @@ -495,7 +501,7 @@ error_code replication_app_base::apply_mutation(const mutation *mu) "mutation {} committed on {}, batched_count = {}", mu->name(), str, batched_count); } - _replica->update_commit_qps(batched_count); + METRIC_VAR_INCREMENT_BY(committed_requests, batched_count); return ERR_OK; } diff --git a/src/replica/replication_app_base.h b/src/replica/replication_app_base.h index e48ef8a5d..e7ae8eff4 100644 --- a/src/replica/replication_app_base.h +++ b/src/replica/replication_app_base.h @@ -39,6 +39,7 @@ #include "replica/replica_base.h" #include "replica_admin_types.h" #include "utils/error_code.h" +#include "utils/metrics.h" #include "utils/ports.h" namespace dsn { @@ -312,6 +313,9 @@ protected: replica_init_info _info; explicit replication_app_base(replication::replica *replica); + +private: + METRIC_VAR_DECLARE_counter(committed_requests); }; } // namespace replication diff --git a/src/utils/metrics.h b/src/utils/metrics.h index 8b9d396a9..f9ab9c2dc 100644 --- a/src/utils/metrics.h +++ b/src/utils/metrics.h @@ -153,7 +153,8 @@ class error_code; // // Since a type tends to be a class template where there might be commas, use variadic arguments // instead of a single fixed argument to represent a type. -#define METRIC_VAR_DECLARE(name, ...) __VA_ARGS__ _##name +#define METRIC_VAR_NAME(name) _metric_##name +#define METRIC_VAR_DECLARE(name, ...) __VA_ARGS__ METRIC_VAR_NAME(name) #define METRIC_VAR_DECLARE_gauge_int64(name) METRIC_VAR_DECLARE(name, dsn::gauge_ptr<int64_t>) #define METRIC_VAR_DECLARE_counter(name) \ METRIC_VAR_DECLARE(name, dsn::counter_ptr<dsn::striped_long_adder, false>) @@ -162,7 +163,7 @@ class error_code; // Initialize a metric variable in user class. #define METRIC_VAR_INIT(name, entity, ...) 
\ - _##name(METRIC_##name.instantiate(entity##_metric_entity(), ##__VA_ARGS__)) + METRIC_VAR_NAME(name)(METRIC_##name.instantiate(entity##_metric_entity(), ##__VA_ARGS__)) #define METRIC_VAR_INIT_replica(name, ...) METRIC_VAR_INIT(name, replica, ##__VA_ARGS__) #define METRIC_VAR_INIT_server(name, ...) METRIC_VAR_INIT(name, server, ##__VA_ARGS__) #define METRIC_VAR_INIT_disk(name, ...) METRIC_VAR_INIT(name, disk, ##__VA_ARGS__) @@ -175,15 +176,15 @@ class error_code; do { \ const auto v = (x); \ if (v != 0) { \ - _##name->increment_by(v); \ + METRIC_VAR_NAME(name)->increment_by(v); \ } \ } while (0) // Perform increment() operations on gauges and counters. -#define METRIC_VAR_INCREMENT(name) _##name->increment() +#define METRIC_VAR_INCREMENT(name) METRIC_VAR_NAME(name)->increment() // Perform decrement() operations on gauges. -#define METRIC_VAR_DECREMENT(name) _##name->decrement() +#define METRIC_VAR_DECREMENT(name) METRIC_VAR_NAME(name)->decrement() // Perform set() operations on gauges and percentiles. // @@ -191,38 +192,44 @@ class error_code; // * set(val): set a single value for a metric, such as gauge, percentile; // * set(n, val): set multiple repeated values (the number of duplicates is n) for a metric, // such as percentile. -#define METRIC_VAR_SET(name, ...) _##name->set(__VA_ARGS__) +#define METRIC_VAR_SET(name, ...) METRIC_VAR_NAME(name)->set(__VA_ARGS__) // Read the current measurement of gauges and counters. -#define METRIC_VAR_VALUE(name) _##name->value() +#define METRIC_VAR_VALUE(name) METRIC_VAR_NAME(name)->value() // Convenient macro that is used to compute latency automatically, which is dedicated to percentile. #define METRIC_VAR_AUTO_LATENCY(name, ...) 
\ - dsn::auto_latency __##name##_auto_latency(_##name, ##__VA_ARGS__) + dsn::auto_latency __##name##_auto_latency(METRIC_VAR_NAME(name), ##__VA_ARGS__) #define METRIC_VAR_AUTO_LATENCY_DURATION_NS(name) __##name##_auto_latency.duration_ns() // Convenient macro that is used to increment/decrement gauge automatically in current scope. #define METRIC_VAR_AUTO_COUNT(name, ...) \ - dsn::auto_count __##name##_auto_count(_##name, ##__VA_ARGS__) + dsn::auto_count __##name##_auto_count(METRIC_VAR_NAME(name), ##__VA_ARGS__) + +#define METRIC_FUNC_NAME_INCREMENT_BY(name) increment_##name##_by #define METRIC_DEFINE_INCREMENT_BY(name) \ - void increment_##name##_by(int64_t x) { METRIC_VAR_INCREMENT_BY(name, x); } + void METRIC_FUNC_NAME_INCREMENT_BY(name)(int64_t x) { METRIC_VAR_INCREMENT_BY(name, x); } // To be adaptive to self-defined `increment_by` methods, arguments are declared as variadic. -#define METRIC_INCREMENT_BY(obj, name, ...) (obj).increment_##name##_by(__VA_ARGS__) +#define METRIC_INCREMENT_BY(obj, name, ...) (obj).METRIC_FUNC_NAME_INCREMENT_BY(name)(__VA_ARGS__) + +#define METRIC_FUNC_NAME_INCREMENT(name) increment_##name #define METRIC_DEFINE_INCREMENT(name) \ - void increment_##name() { METRIC_VAR_INCREMENT(name); } + void METRIC_FUNC_NAME_INCREMENT(name)() { METRIC_VAR_INCREMENT(name); } // To be adaptive to self-defined `increment` methods, arguments are declared as variadic. -#define METRIC_INCREMENT(obj, name, ...) (obj).increment_##name(__VA_ARGS__) +#define METRIC_INCREMENT(obj, name, ...) (obj).METRIC_FUNC_NAME_INCREMENT(name)(__VA_ARGS__) + +#define METRIC_FUNC_NAME_SET(name) set_##name #define METRIC_DEFINE_SET(name, value_type) \ - void set_##name(value_type value) { METRIC_VAR_SET(name, value); } + void METRIC_FUNC_NAME_SET(name)(value_type value) { METRIC_VAR_SET(name, value); } // To be adaptive to self-defined `set` methods, arguments are declared as variadic. -#define METRIC_SET(obj, name, ...) 
(obj).set_##name(__VA_ARGS__) +#define METRIC_SET(obj, name, ...) (obj).METRIC_FUNC_NAME_SET(name)(__VA_ARGS__) namespace dsn { class metric; // IWYU pragma: keep @@ -646,6 +653,8 @@ enum class metric_unit : size_t kCapacityUnits, kPercent, kPartitions, + kReplicas, + kServers, kRequests, kSeeks, kPointLookups, @@ -660,7 +669,6 @@ enum class metric_unit : size_t kOperations, kTasks, kDisconnections, - kServers, kInvalidUnit, }; diff --git a/src/utils/test/metrics_test.cpp b/src/utils/test/metrics_test.cpp index 5188ff611..fc5d80ef0 100644 --- a/src/utils/test/metrics_test.cpp +++ b/src/utils/test/metrics_test.cpp @@ -3083,12 +3083,12 @@ protected: void SetUp() override { - _test_replica_gauge_int64->set(0); - _test_replica_counter->reset(); - _test_replica_percentile_int64_ns->reset_tail_for_test(); - _test_replica_percentile_int64_us->reset_tail_for_test(); - _test_replica_percentile_int64_ms->reset_tail_for_test(); - _test_replica_percentile_int64_s->reset_tail_for_test(); + METRIC_VAR_SET(test_replica_gauge_int64, 0); + METRIC_VAR_NAME(test_replica_counter)->reset(); + METRIC_VAR_NAME(test_replica_percentile_int64_ns)->reset_tail_for_test(); + METRIC_VAR_NAME(test_replica_percentile_int64_us)->reset_tail_for_test(); + METRIC_VAR_NAME(test_replica_percentile_int64_ms)->reset_tail_for_test(); + METRIC_VAR_NAME(test_replica_percentile_int64_s)->reset_tail_for_test(); } const metric_entity_ptr &my_replica_metric_entity() const { return _my_replica_metric_entity; } @@ -3120,7 +3120,7 @@ MetricVarTest::MetricVarTest() { } -#define METRIC_VAR_SAMPLES(name) _##name->samples_for_test() +#define METRIC_VAR_SAMPLES(name) METRIC_VAR_NAME(name)->samples_for_test() void MetricVarTest::test_set_percentile(const std::vector<int64_t> &expected_samples) { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
