This is an automated email from the ASF dual-hosted git repository. wangdan pushed a commit to branch migrate-metrics-dev in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
commit 62fb0f8cc7ae871a434e7b1a10822e1be22a0af8 Author: Dan Wang <[email protected]> AuthorDate: Fri May 5 10:22:39 2023 +0800 feat(new_metrics): migrate metrics for replica_stub (part 4) (#1463) https://github.com/apache/incubator-pegasus/issues/1454 This is the 4th part of migrating metrics of replica_stub to new framework. During this migration, there are 3 metrics which are changed from server-level to replica-level, including the number of confirmed/pending mutations for dup and the number of write requests whose size exceeds threshold. Another 5 metrics are still kept server-level, including the memory bytes that are released by tcmalloc recently, the number of failed read/write requests and the number of busy read/write requests. --- src/replica/duplication/duplication_sync_timer.cpp | 7 +- src/replica/duplication/replica_duplicator.cpp | 15 ++-- src/replica/duplication/replica_duplicator.h | 6 ++ .../duplication/replica_duplicator_manager.cpp | 16 +++- .../duplication/replica_duplicator_manager.h | 13 ++- src/replica/replica.cpp | 13 ++- src/replica/replica.h | 5 ++ src/replica/replica_2pc.cpp | 4 +- src/replica/replica_stub.cpp | 100 ++++++++++----------- src/replica/replica_stub.h | 24 ++--- src/replica/test/replica_test.cpp | 10 +-- src/utils/metrics.h | 9 ++ 12 files changed, 125 insertions(+), 97 deletions(-) diff --git a/src/replica/duplication/duplication_sync_timer.cpp b/src/replica/duplication/duplication_sync_timer.cpp index d04d014fe..da1cd933f 100644 --- a/src/replica/duplication/duplication_sync_timer.cpp +++ b/src/replica/duplication/duplication_sync_timer.cpp @@ -26,8 +26,6 @@ #include "common/replication.codes.h" #include "duplication_sync_timer.h" #include "metadata_types.h" -#include "perf_counter/perf_counter.h" -#include "perf_counter/perf_counter_wrapper.h" #include "replica/replica.h" #include "replica/replica_stub.h" #include "replica_duplicator_manager.h" @@ -37,6 +35,7 @@ #include "utils/autoref_ptr.h" #include "utils/error_code.h" #include "utils/fmt_logging.h" +#include "utils/metrics.h" #include "utils/threadpool_code.h" namespace dsn { @@ -65,15 +64,13 @@ void duplication_sync_timer::run() req->node = _stub->primary_address(); // collects confirm points from all primaries on this server - uint64_t pending_muts_cnt = 0; for (const replica_ptr &r : get_all_primaries()) { auto confirmed = r->get_duplication_manager()->get_duplication_confirms_to_update(); if (!confirmed.empty()) { req->confirm_list[r->get_gpid()] = std::move(confirmed); } - pending_muts_cnt += r->get_duplication_manager()->get_pending_mutations_count(); + METRIC_SET(*r, dup_pending_mutations); } - _stub->_counter_dup_pending_mutations_count->set(pending_muts_cnt); duplication_sync_rpc rpc(std::move(req), RPC_CM_DUPLICATION_SYNC, 3_s); rpc_address meta_server_address(_stub->get_meta_server_address()); diff --git a/src/replica/duplication/replica_duplicator.cpp b/src/replica/duplication/replica_duplicator.cpp index a19947fe2..e8018db34 100644 --- a/src/replica/duplication/replica_duplicator.cpp +++ b/src/replica/duplication/replica_duplicator.cpp @@ -32,15 +32,18 @@ #include "dsn.layer2_types.h" #include "duplication_pipeline.h" #include "load_from_private_log.h" -#include "perf_counter/perf_counter.h" -#include "perf_counter/perf_counter_wrapper.h" #include "replica/mutation_log.h" #include "replica/replica.h" -#include "replica/replica_stub.h" #include "runtime/task/async_calls.h" #include "utils/autoref_ptr.h" #include "utils/error_code.h" #include "utils/fmt_logging.h" +#include "utils/string_view.h" + +METRIC_DEFINE_counter(replica, + dup_confirmed_mutations, + dsn::metric_unit::kMutations, + "The number of confirmed mutations for dup"); namespace dsn { namespace replication { @@ -50,7 +53,8 @@ replica_duplicator::replica_duplicator(const duplication_entry &ent, replica *r) _id(ent.dupid), _remote_cluster_name(ent.remote), _replica(r), - _stub(r->get_replica_stub()) + _stub(r->get_replica_stub()), + METRIC_VAR_INIT_replica(dup_confirmed_mutations) { _status = ent.status; @@ -222,7 +226,8 @@ error_s replica_duplicator::update_progress(const duplication_progress &p) } if (_progress.confirmed_decree > last_confirmed_decree) { // has confirmed_decree updated. - _stub->_counter_dup_confirmed_rate->add(_progress.confirmed_decree - last_confirmed_decree); + METRIC_VAR_INCREMENT_BY(dup_confirmed_mutations, + _progress.confirmed_decree - last_confirmed_decree); } return error_s::ok(); diff --git a/src/replica/duplication/replica_duplicator.h b/src/replica/duplication/replica_duplicator.h index 1b2526d29..fa185b8c7 100644 --- a/src/replica/duplication/replica_duplicator.h +++ b/src/replica/duplication/replica_duplicator.h @@ -29,6 +29,7 @@ #include "runtime/pipeline.h" #include "runtime/task/task_tracker.h" #include "utils/errors.h" +#include "utils/metrics.h" #include "utils/zlocks.h" namespace dsn { @@ -169,6 +170,11 @@ private: std::unique_ptr<load_mutation> _load; std::unique_ptr<ship_mutation> _ship; std::unique_ptr<load_from_private_log> _load_private; + + // <- Duplication Metrics -> + // TODO(wutao1): calculate the counters independently for each remote cluster + // if we need to duplicate to multiple clusters someday. + METRIC_VAR_DECLARE_counter(dup_confirmed_mutations); }; typedef std::unique_ptr<replica_duplicator> replica_duplicator_u_ptr; diff --git a/src/replica/duplication/replica_duplicator_manager.cpp b/src/replica/duplication/replica_duplicator_manager.cpp index 56ec34a39..ef53a9b43 100644 --- a/src/replica/duplication/replica_duplicator_manager.cpp +++ b/src/replica/duplication/replica_duplicator_manager.cpp @@ -23,12 +23,24 @@ #include "common/gpid.h" #include "replica/duplication/replica_duplicator.h" #include "replica_duplicator_manager.h" +#include "utils/autoref_ptr.h" #include "utils/errors.h" #include "utils/fmt_logging.h" +#include "utils/string_view.h" + +METRIC_DEFINE_gauge_int64(replica, + dup_pending_mutations, + dsn::metric_unit::kMutations, + "The number of pending mutations for dup"); namespace dsn { namespace replication { +replica_duplicator_manager::replica_duplicator_manager(replica *r) + : replica_base(r), _replica(r), METRIC_VAR_INIT_replica(dup_pending_mutations) +{ +} + std::vector<duplication_confirm_entry> replica_duplicator_manager::get_duplication_confirms_to_update() const { @@ -149,13 +161,13 @@ void replica_duplicator_manager::update_confirmed_decree_if_secondary(decree con } } -int64_t replica_duplicator_manager::get_pending_mutations_count() const +void replica_duplicator_manager::METRIC_FUNC_NAME_SET(dup_pending_mutations)() { int64_t total = 0; for (const auto &dup : _duplications) { total += dup.second->get_pending_mutations_count(); } - return total; + METRIC_VAR_SET(dup_pending_mutations, total); } std::vector<replica_duplicator_manager::dup_state> diff --git a/src/replica/duplication/replica_duplicator_manager.h b/src/replica/duplication/replica_duplicator_manager.h index 20bdcd0ac..f2b1593e4 100644 --- a/src/replica/duplication/replica_duplicator_manager.h +++ b/src/replica/duplication/replica_duplicator_manager.h @@ -31,6 +31,7 @@ #include "replica/replica_base.h" #include "replica_duplicator.h" #include "utils/fmt_logging.h" +#include "utils/metrics.h" #include "utils/zlocks.h" namespace dsn { @@ -43,7 +44,7 @@ namespace replication { class replica_duplicator_manager : public replica_base { public: - explicit replica_duplicator_manager(replica *r) : replica_base(r), _replica(r) {} + explicit replica_duplicator_manager(replica *r); // Immediately stop duplication in the following conditions: // - replica is not primary on replica-server perspective (status != PRIMARY) @@ -77,9 +78,8 @@ public: /// \see replica_check.cpp void update_confirmed_decree_if_secondary(decree confirmed); - /// Sums up the number of pending mutations for all duplications - /// on this replica, for metric "dup.pending_mutations_count". - int64_t get_pending_mutations_count() const; + /// Sums up the number of pending mutations for all duplications on this replica. + void METRIC_FUNC_NAME_SET(dup_pending_mutations)(); struct dup_state { @@ -121,6 +121,11 @@ private: // avoid thread conflict between replica::on_checkpoint_timer and // duplication_sync_timer. mutable zlock _lock; + + // <- Duplication Metrics -> + // TODO(wutao1): calculate the counters independently for each remote cluster + // if we need to duplicate to multiple clusters someday. + METRIC_VAR_DECLARE_gauge_int64(dup_pending_mutations); }; } // namespace replication diff --git a/src/replica/replica.cpp b/src/replica/replica.cpp index 91835c445..9774b913b 100644 --- a/src/replica/replica.cpp +++ b/src/replica/replica.cpp @@ -204,6 +204,11 @@ METRIC_DEFINE_counter(replica, dsn::metric_unit::kCheckpoints, "The number of triggered emergency checkpoints"); +METRIC_DEFINE_counter(replica, + write_size_exceed_threshold_requests, + dsn::metric_unit::kRequests, + "The number of write requests whose size exceeds threshold"); + namespace dsn { namespace replication { @@ -282,7 +287,8 @@ replica::replica(replica_stub *stub, METRIC_VAR_INIT_replica(learn_successful_count), METRIC_VAR_INIT_replica(prepare_failed_requests), METRIC_VAR_INIT_replica(group_check_failed_requests), - METRIC_VAR_INIT_replica(emergency_checkpoints) + METRIC_VAR_INIT_replica(emergency_checkpoints), + METRIC_VAR_INIT_replica(write_size_exceed_threshold_requests) { CHECK(!_app_info.app_type.empty(), ""); CHECK_NOTNULL(stub, ""); @@ -714,5 +720,10 @@ bool replica::access_controller_allowed(message_ex *msg, const ranger::access_ty int64_t replica::get_backup_request_count() const { return METRIC_VAR_VALUE(backup_requests); } +void replica::METRIC_FUNC_NAME_SET(dup_pending_mutations)() +{ + METRIC_SET(*_duplication_mgr, dup_pending_mutations); +} + } // namespace replication } // namespace dsn diff --git a/src/replica/replica.h b/src/replica/replica.h index 05695dae4..56e993cd8 100644 --- a/src/replica/replica.h +++ b/src/replica/replica.h @@ -292,6 +292,9 @@ public: const std::map<std::string, std::string> &get_replica_extra_envs() const { return _extra_envs; } const dir_node *get_dir_node() const { return _dir_node; } + METRIC_DEFINE_VALUE(write_size_exceed_threshold_requests, int64_t) + void METRIC_FUNC_NAME_SET(dup_pending_mutations)(); + static const std::string kAppInfo; protected: @@ -686,6 +689,8 @@ private: METRIC_VAR_DECLARE_counter(emergency_checkpoints); + METRIC_VAR_DECLARE_counter(write_size_exceed_threshold_requests); + dsn::task_tracker _tracker; // the thread access checker dsn::thread_access_checker _checker; diff --git a/src/replica/replica_2pc.cpp b/src/replica/replica_2pc.cpp index 944a6a3f5..ec9c16bb1 100644 --- a/src/replica/replica_2pc.cpp +++ b/src/replica/replica_2pc.cpp @@ -49,8 +49,6 @@ #include "metadata_types.h" #include "mutation.h" #include "mutation_log.h" -#include "perf_counter/perf_counter.h" -#include "perf_counter/perf_counter_wrapper.h" #include "replica.h" #include "replica/prepare_list.h" #include "replica/replica_context.h" @@ -150,7 +148,7 @@ void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling) request_info, request->body_size(), FLAGS_max_allowed_write_size); - _stub->_counter_recent_write_size_exceed_threshold_count->increment(); + METRIC_VAR_INCREMENT(write_size_exceed_threshold_requests); response_client_write(request, ERR_INVALID_DATA); return; } diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp index 95ba2502b..dd10102f6 100644 --- a/src/replica/replica_stub.cpp +++ b/src/replica/replica_stub.cpp @@ -163,6 +163,33 @@ METRIC_DEFINE_gauge_int64(server, dsn::metric_unit::kDirs, "The number of origin replica dirs (*.ori) for disk migration"); +#ifdef DSN_ENABLE_GPERF +METRIC_DEFINE_gauge_int64(server, + tcmalloc_released_bytes, + dsn::metric_unit::kBytes, + "The memory bytes that are released by tcmalloc recently"); +#endif + +METRIC_DEFINE_counter(server, + read_failed_requests, + dsn::metric_unit::kRequests, + "The number of failed read requests"); + +METRIC_DEFINE_counter(server, + write_failed_requests, + dsn::metric_unit::kRequests, + "The number of failed write requests"); + +METRIC_DEFINE_counter(server, + read_busy_requests, + dsn::metric_unit::kRequests, + "The number of busy read requests"); + +METRIC_DEFINE_counter(server, + write_busy_requests, + dsn::metric_unit::kRequests, + "The number of busy write requests"); + namespace dsn { namespace replication { DSN_DEFINE_bool(replication, @@ -281,7 +308,14 @@ replica_stub::replica_stub(replica_state_subscriber subscriber /*= nullptr*/, METRIC_VAR_INIT_server(replica_error_dirs), METRIC_VAR_INIT_server(replica_garbage_dirs), METRIC_VAR_INIT_server(replica_tmp_dirs), - METRIC_VAR_INIT_server(replica_origin_dirs) + METRIC_VAR_INIT_server(replica_origin_dirs), +#ifdef DSN_ENABLE_GPERF + METRIC_VAR_INIT_server(tcmalloc_released_bytes), +#endif + METRIC_VAR_INIT_server(read_failed_requests), + METRIC_VAR_INIT_server(write_failed_requests), + METRIC_VAR_INIT_server(read_busy_requests), + METRIC_VAR_INIT_server(write_busy_requests) { #ifdef DSN_ENABLE_GPERF _is_releasing_memory = false; @@ -299,18 +333,6 @@ replica_stub::~replica_stub(void) { close(); } void replica_stub::install_perf_counters() { - // <- Duplication Metrics -> - - _counter_dup_confirmed_rate.init_app_counter("eon.replica_stub", - "dup.confirmed_rate", - COUNTER_TYPE_RATE, - "increasing rate of confirmed mutations"); - _counter_dup_pending_mutations_count.init_app_counter( - "eon.replica_stub", - "dup.pending_mutations_count", - COUNTER_TYPE_VOLATILE_NUMBER, - "number of mutations pending for duplication"); - // <- Cold Backup Metrics -> _counter_cold_backup_running_count.init_app_counter("eon.replica_stub", @@ -368,29 +390,6 @@ void replica_stub::install_perf_counters() COUNTER_TYPE_NUMBER, "current cold backup max upload file size"); - _counter_recent_read_fail_count.init_app_counter("eon.replica_stub", - "recent.read.fail.count", - COUNTER_TYPE_VOLATILE_NUMBER, - "read fail count in the recent period"); - _counter_recent_write_fail_count.init_app_counter("eon.replica_stub", - "recent.write.fail.count", - COUNTER_TYPE_VOLATILE_NUMBER, - "write fail count in the recent period"); - _counter_recent_read_busy_count.init_app_counter("eon.replica_stub", - "recent.read.busy.count", - COUNTER_TYPE_VOLATILE_NUMBER, - "read busy count in the recent period"); - _counter_recent_write_busy_count.init_app_counter("eon.replica_stub", - "recent.write.busy.count", - COUNTER_TYPE_VOLATILE_NUMBER, - "write busy count in the recent period"); - - _counter_recent_write_size_exceed_threshold_count.init_app_counter( - "eon.replica_stub", - "recent_write_size_exceed_threshold_count", - COUNTER_TYPE_VOLATILE_NUMBER, - "write size exceed threshold count in the recent period"); - // <- Bulk Load Metrics -> _counter_bulk_load_running_count.init_app_counter("eon.replica_stub", @@ -437,13 +436,6 @@ void replica_stub::install_perf_counters() COUNTER_TYPE_NUMBER, "bulk load max duration time(ms)"); -#ifdef DSN_ENABLE_GPERF - _counter_tcmalloc_release_memory_size.init_app_counter("eon.replica_stub", - "tcmalloc.release.memory.size", - COUNTER_TYPE_NUMBER, - "current tcmalloc release memory size"); -#endif - // <- Partition split Metrics -> _counter_replicas_splitting_count.init_app_counter("eon.replica_stub", @@ -1620,15 +1612,17 @@ void replica_stub::response_client(gpid id, error_code error) { if (error == ERR_BUSY) { - if (is_read) - _counter_recent_read_busy_count->increment(); - else - _counter_recent_write_busy_count->increment(); + if (is_read) { + METRIC_VAR_INCREMENT(read_busy_requests); + } else { + METRIC_VAR_INCREMENT(write_busy_requests); + } } else if (error != ERR_OK) { - if (is_read) - _counter_recent_read_fail_count->increment(); - else - _counter_recent_write_fail_count->increment(); + if (is_read) { + METRIC_VAR_INCREMENT(read_failed_requests); + } else { + METRIC_VAR_INCREMENT(write_failed_requests); + } LOG_ERROR("{}@{}: {} fail: client = {}, code = {}, timeout = {}, status = {}, error = {}", id, _primary_address_str, @@ -2806,7 +2800,7 @@ uint64_t replica_stub::gc_tcmalloc_memory(bool release_all) auto tcmalloc_released_bytes = 0; if (!_release_tcmalloc_memory) { _is_releasing_memory.store(false); - _counter_tcmalloc_release_memory_size->set(tcmalloc_released_bytes); + METRIC_VAR_SET(tcmalloc_released_bytes, tcmalloc_released_bytes); return tcmalloc_released_bytes; } @@ -2837,7 +2831,7 @@ uint64_t replica_stub::gc_tcmalloc_memory(bool release_all) release_bytes -= 1024 * 1024; } } - _counter_tcmalloc_release_memory_size->set(tcmalloc_released_bytes); + METRIC_VAR_SET(tcmalloc_released_bytes, tcmalloc_released_bytes); _is_releasing_memory.store(false); return tcmalloc_released_bytes; } diff --git a/src/replica/replica_stub.h b/src/replica/replica_stub.h index b73287f86..4cc961e38 100644 --- a/src/replica/replica_stub.h +++ b/src/replica/replica_stub.h @@ -511,11 +511,14 @@ private: METRIC_VAR_DECLARE_gauge_int64(replica_tmp_dirs); METRIC_VAR_DECLARE_gauge_int64(replica_origin_dirs); - // <- Duplication Metrics -> - // TODO(wutao1): calculate the counters independently for each remote cluster - // if we need to duplicate to multiple clusters someday. - perf_counter_wrapper _counter_dup_confirmed_rate; - perf_counter_wrapper _counter_dup_pending_mutations_count; +#ifdef DSN_ENABLE_GPERF + METRIC_VAR_DECLARE_gauge_int64(tcmalloc_released_bytes); +#endif + + METRIC_VAR_DECLARE_counter(read_failed_requests); + METRIC_VAR_DECLARE_counter(write_failed_requests); + METRIC_VAR_DECLARE_counter(read_busy_requests); + METRIC_VAR_DECLARE_counter(write_busy_requests); perf_counter_wrapper _counter_cold_backup_running_count; perf_counter_wrapper _counter_cold_backup_recent_start_count; @@ -529,17 +532,6 @@ private: perf_counter_wrapper _counter_cold_backup_max_duration_time_ms; perf_counter_wrapper _counter_cold_backup_max_upload_file_size; - perf_counter_wrapper _counter_recent_read_fail_count; - perf_counter_wrapper _counter_recent_write_fail_count; - perf_counter_wrapper _counter_recent_read_busy_count; - perf_counter_wrapper _counter_recent_write_busy_count; - - perf_counter_wrapper _counter_recent_write_size_exceed_threshold_count; - -#ifdef DSN_ENABLE_GPERF - perf_counter_wrapper _counter_tcmalloc_release_memory_size; -#endif - // <- Bulk load Metrics -> perf_counter_wrapper _counter_bulk_load_running_count; perf_counter_wrapper _counter_bulk_load_downloading_count; diff --git a/src/replica/test/replica_test.cpp b/src/replica/test/replica_test.cpp index 5c21c2153..89fd4dcaa 100644 --- a/src/replica/test/replica_test.cpp +++ b/src/replica/test/replica_test.cpp @@ -45,8 +45,6 @@ #include "dsn.layer2_types.h" #include "http/http_server.h" #include "metadata_types.h" -#include "perf_counter/perf_counter.h" -#include "perf_counter/perf_counter_wrapper.h" #include "replica/disk_cleaner.h" #include "replica/replica.h" #include "replica/replica_http_service.h" @@ -67,6 +65,7 @@ #include "utils/filesystem.h" #include "utils/flags.h" #include "utils/fmt_logging.h" +#include "utils/metrics.h" #include "utils/string_conv.h" #include "utils/test_macros.h" @@ -100,11 +99,6 @@ public: FLAGS_cold_backup_root = "test_cluster"; } - int get_write_size_exceed_threshold_count() - { - return stub->_counter_recent_write_size_exceed_threshold_count->get_value(); - } - int64_t get_backup_request_count() const { return _mock_replica->get_backup_request_count(); } bool get_validate_partition_hash() const { return _mock_replica->_validate_partition_hash; } @@ -283,7 +277,7 @@ TEST_F(replica_test, write_size_limited) stub->on_client_write(_pid, write_request); } - ASSERT_EQ(get_write_size_exceed_threshold_count(), count); + ASSERT_EQ(count, METRIC_VALUE(*_mock_replica, write_size_exceed_threshold_requests)); } TEST_F(replica_test, backup_request_count) diff --git a/src/utils/metrics.h b/src/utils/metrics.h index ec2fb6977..e90d5d9af 100644 --- a/src/utils/metrics.h +++ b/src/utils/metrics.h @@ -231,6 +231,14 @@ class error_code; // To be adaptive to self-defined `set` methods, arguments are declared as variadic. #define METRIC_SET(obj, name, ...) (obj).METRIC_FUNC_NAME_SET(name)(__VA_ARGS__) +#define METRIC_FUNC_NAME_VALUE(name) get_##name + +#define METRIC_DEFINE_VALUE(name, value_type) \ + value_type METRIC_FUNC_NAME_VALUE(name)() { return METRIC_VAR_VALUE(name); } + +// To be adaptive to self-defined `value` methods, arguments are declared as variadic. +#define METRIC_VALUE(obj, name, ...) (obj).METRIC_FUNC_NAME_VALUE(name)(__VA_ARGS__) + namespace dsn { class metric; // IWYU pragma: keep class metric_entity_prototype; // IWYU pragma: keep @@ -667,6 +675,7 @@ enum class metric_unit : size_t kCheckpoints, kFlushes, kCompactions, + kMutations, kWrites, kChanges, kOperations, --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
