This is an automated email from the ASF dual-hosted git repository. wangdan pushed a commit to branch migrate-metrics-dev in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
commit 227263a9c72b4a631de2ba0c39df6e68aea0ebad Author: Dan Wang <[email protected]> AuthorDate: Tue Mar 14 17:18:35 2023 +0800 feat(new_metrics): migrate replica-level metrics for capacity_unit_calculator (#1387) https://github.com/apache/incubator-pegasus/issues/1334 Migrate the metrics in capacity_unit_calculator to new framework, including read/write capacity units and the number of bytes consumed by get, multi_get, batch_get, scan, put, multi_put, check_and_set, check_and_mutate and backup requests. --- src/server/capacity_unit_calculator.cpp | 142 ++++++++++++++++++-------------- src/server/capacity_unit_calculator.h | 26 +++--- src/server/pegasus_write_service.cpp | 47 +++++------ src/utils/metrics.h | 6 +- 4 files changed, 118 insertions(+), 103 deletions(-) diff --git a/src/server/capacity_unit_calculator.cpp b/src/server/capacity_unit_calculator.cpp index 157577b98..7a62b3d56 100644 --- a/src/server/capacity_unit_calculator.cpp +++ b/src/server/capacity_unit_calculator.cpp @@ -35,6 +35,61 @@ #include "utils/fmt_logging.h" #include "utils/token_bucket_throttling_controller.h" +METRIC_DEFINE_counter(replica, + read_capacity_units, + dsn::metric_unit::kCapacityUnits, + "The number of capacity units for read requests"); + +METRIC_DEFINE_counter(replica, + write_capacity_units, + dsn::metric_unit::kCapacityUnits, + "The number of capacity units for write requests"); + +METRIC_DEFINE_counter(replica, + get_bytes, + dsn::metric_unit::kBytes, + "The number of bytes for GET requests"); + +METRIC_DEFINE_counter(replica, + multi_get_bytes, + dsn::metric_unit::kBytes, + "The number of bytes for MULTI_GET requests"); + +METRIC_DEFINE_counter(replica, + batch_get_bytes, + dsn::metric_unit::kBytes, + "The number of bytes for BATCH_GET requests"); + +METRIC_DEFINE_counter(replica, + scan_bytes, + dsn::metric_unit::kBytes, + "The number of bytes for SCAN requests"); + +METRIC_DEFINE_counter(replica, + put_bytes, + dsn::metric_unit::kBytes, + "The number of bytes for PUT requests"); + +METRIC_DEFINE_counter(replica, + multi_put_bytes, + dsn::metric_unit::kBytes, + "The number of bytes for MULTI_PUT requests"); + +METRIC_DEFINE_counter(replica, + check_and_set_bytes, + dsn::metric_unit::kBytes, + "The number of bytes for CHECK_AND_SET requests"); + +METRIC_DEFINE_counter(replica, + check_and_mutate_bytes, + dsn::metric_unit::kBytes, + "The number of bytes for CHECK_AND_MUTATE requests"); + +METRIC_DEFINE_counter(replica, + backup_request_bytes, + dsn::metric_unit::kBytes, + "The number of bytes for backup requests"); + namespace pegasus { namespace server { @@ -58,6 +113,17 @@ capacity_unit_calculator::capacity_unit_calculator( std::shared_ptr<hotkey_collector> write_hotkey_collector, std::shared_ptr<throttling_controller> read_size_throttling_controller) : replica_base(r), + METRIC_VAR_INIT_replica(read_capacity_units), + METRIC_VAR_INIT_replica(write_capacity_units), + METRIC_VAR_INIT_replica(get_bytes), + METRIC_VAR_INIT_replica(multi_get_bytes), + METRIC_VAR_INIT_replica(batch_get_bytes), + METRIC_VAR_INIT_replica(scan_bytes), + METRIC_VAR_INIT_replica(put_bytes), + METRIC_VAR_INIT_replica(multi_put_bytes), + METRIC_VAR_INIT_replica(check_and_set_bytes), + METRIC_VAR_INIT_replica(check_and_mutate_bytes), + METRIC_VAR_INIT_replica(backup_request_bytes), _read_hotkey_collector(read_hotkey_collector), _write_hotkey_collector(write_hotkey_collector), _read_size_throttling_controller(read_size_throttling_controller) @@ -68,55 +134,6 @@ capacity_unit_calculator::capacity_unit_calculator( _log_read_cu_size = log(FLAGS_perf_counter_read_capacity_unit_size) / log(2); _log_write_cu_size = log(FLAGS_perf_counter_write_capacity_unit_size) / log(2); - - std::string str_gpid = r->get_gpid().to_string(); - char name[256]; - snprintf(name, 255, "recent.read.cu@%s", str_gpid.c_str()); - _pfc_recent_read_cu.init_app_counter("app.pegasus", - name, - COUNTER_TYPE_VOLATILE_NUMBER, - "statistic the recent read capacity units"); - snprintf(name, 255, "recent.write.cu@%s", str_gpid.c_str()); - _pfc_recent_write_cu.init_app_counter("app.pegasus", - name, - COUNTER_TYPE_VOLATILE_NUMBER, - "statistic the recent write capacity units"); - - snprintf(name, 255, "get_bytes@%s", str_gpid.c_str()); - _pfc_get_bytes.init_app_counter( - "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the get bytes"); - - snprintf(name, 255, "multi_get_bytes@%s", str_gpid.c_str()); - _pfc_multi_get_bytes.init_app_counter( - "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the multi get bytes"); - - snprintf(name, 255, "batch_get_bytes@%s", str_gpid.c_str()); - _pfc_batch_get_bytes.init_app_counter( - "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the batch get bytes"); - - snprintf(name, 255, "scan_bytes@%s", str_gpid.c_str()); - _pfc_scan_bytes.init_app_counter( - "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the scan bytes"); - - snprintf(name, 255, "put_bytes@%s", str_gpid.c_str()); - _pfc_put_bytes.init_app_counter( - "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the put bytes"); - - snprintf(name, 255, "multi_put_bytes@%s", str_gpid.c_str()); - _pfc_multi_put_bytes.init_app_counter( - "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the multi put bytes"); - - snprintf(name, 255, "check_and_set_bytes@%s", str_gpid.c_str()); - _pfc_check_and_set_bytes.init_app_counter( - "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the check and set bytes"); - - snprintf(name, 255, "check_and_mutate_bytes@%s", str_gpid.c_str()); - _pfc_check_and_mutate_bytes.init_app_counter( - "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the check and mutate bytes"); - - snprintf(name, 255, "backup_request_bytes@%s", str_gpid.c_str()); - _pfc_backup_request_bytes.init_app_counter( - "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the backup request bytes"); } int64_t capacity_unit_calculator::add_read_cu(int64_t read_data_size) @@ -125,7 +142,7 @@ int64_t capacity_unit_calculator::add_read_cu(int64_t read_data_size) read_data_size > 0 ? (read_data_size + FLAGS_perf_counter_read_capacity_unit_size - 1) >> _log_read_cu_size : 1; - _pfc_recent_read_cu->add(read_cu); + METRIC_VAR_INCREMENT_BY(read_capacity_units, read_cu); _read_size_throttling_controller->consume_token(read_data_size); return read_cu; } @@ -136,7 +153,7 @@ int64_t capacity_unit_calculator::add_write_cu(int64_t write_data_size) ? (write_data_size + FLAGS_perf_counter_write_capacity_unit_size - 1) >> _log_write_cu_size : 1; - _pfc_recent_write_cu->add(write_cu); + METRIC_VAR_INCREMENT_BY(write_capacity_units, write_cu); return write_cu; } @@ -146,7 +163,7 @@ void capacity_unit_calculator::add_get_cu(dsn::message_ex *req, const dsn::blob &value) { auto total_size = key.size() + value.size(); - _pfc_get_bytes->add(total_size); + METRIC_VAR_INCREMENT_BY(get_bytes, total_size); add_backup_request_bytes(req, total_size); if (status != rocksdb::Status::kOk && status != rocksdb::Status::kNotFound) { return; @@ -173,7 +190,7 @@ void capacity_unit_calculator::add_multi_get_cu(dsn::message_ex *req, data_size += hash_key.size() + kv.key.size() + kv.value.size(); } auto total_size = hash_key.size() + multi_get_bytes; - _pfc_multi_get_bytes->add(total_size); + METRIC_VAR_INCREMENT_BY(multi_get_bytes, total_size); add_backup_request_bytes(req, total_size); if (status != rocksdb::Status::kOk && status != rocksdb::Status::kNotFound && @@ -201,7 +218,7 @@ void capacity_unit_calculator::add_batch_get_cu(dsn::message_ex *req, _read_hotkey_collector->capture_hash_key(data.hash_key, 1); } - _pfc_batch_get_bytes->add(data_size); + METRIC_VAR_INCREMENT_BY(batch_get_bytes, data_size); add_backup_request_bytes(req, data_size); if (status != rocksdb::Status::kOk && status != rocksdb::Status::kNotFound && @@ -237,7 +254,7 @@ void capacity_unit_calculator::add_scan_cu(dsn::message_ex *req, data_size += kv.key.size() + kv.value.size(); } add_read_cu(data_size); - _pfc_scan_bytes->add(data_size); + METRIC_VAR_INCREMENT_BY(scan_bytes, data_size); add_backup_request_bytes(req, data_size); } @@ -269,7 +286,7 @@ void capacity_unit_calculator::add_put_cu(int32_t status, const dsn::blob &key, const dsn::blob &value) { - _pfc_put_bytes->add(key.size() + value.size()); + METRIC_VAR_INCREMENT_BY(put_bytes, key.size() + value.size()); if (status != rocksdb::Status::kOk) { return; } @@ -296,7 +313,7 @@ void capacity_unit_calculator::add_multi_put_cu(int32_t status, multi_put_bytes += kv.key.size() + kv.value.size(); data_size += hash_key.size() + kv.key.size() + kv.value.size(); } - _pfc_multi_put_bytes->add(hash_key.size() + multi_put_bytes); + METRIC_VAR_INCREMENT_BY(multi_put_bytes, hash_key.size() + multi_put_bytes); uint64_t key_count = kvs.size(); _write_hotkey_collector->capture_hash_key(hash_key, key_count); @@ -343,8 +360,9 @@ void capacity_unit_calculator::add_check_and_set_cu(int32_t status, const dsn::blob &value) { - _pfc_check_and_set_bytes->add(hash_key.size() + check_sort_key.size() + set_sort_key.size() + - value.size()); + METRIC_VAR_INCREMENT_BY(check_and_set_bytes, + hash_key.size() + check_sort_key.size() + set_sort_key.size() + + value.size()); if (status != rocksdb::Status::kOk && status != rocksdb::Status::kInvalidArgument && status != rocksdb::Status::kTryAgain) { return; @@ -370,8 +388,8 @@ void capacity_unit_calculator::add_check_and_mutate_cu( check_and_mutate_bytes += m.sort_key.size() + m.value.size(); data_size += hash_key.size() + m.sort_key.size() + m.value.size(); } - _pfc_check_and_mutate_bytes->add(hash_key.size() + check_sort_key.size() + - check_and_mutate_bytes); + METRIC_VAR_INCREMENT_BY(check_and_mutate_bytes, + hash_key.size() + check_sort_key.size() + check_and_mutate_bytes); if (status != rocksdb::Status::kOk && status != rocksdb::Status::kInvalidArgument && status != rocksdb::Status::kTryAgain) { @@ -389,7 +407,7 @@ void capacity_unit_calculator::add_check_and_mutate_cu( void capacity_unit_calculator::add_backup_request_bytes(dsn::message_ex *req, int64_t bytes) { if (req->is_backup_request()) { - _pfc_backup_request_bytes->add(bytes); + METRIC_VAR_INCREMENT_BY(backup_request_bytes, bytes); } } diff --git a/src/server/capacity_unit_calculator.h b/src/server/capacity_unit_calculator.h index 0b91ea4d2..6d30a07ef 100644 --- a/src/server/capacity_unit_calculator.h +++ b/src/server/capacity_unit_calculator.h @@ -108,18 +108,20 @@ private: uint32_t _log_read_cu_size; uint32_t _log_write_cu_size; - ::dsn::perf_counter_wrapper _pfc_recent_read_cu; - ::dsn::perf_counter_wrapper _pfc_recent_write_cu; - - ::dsn::perf_counter_wrapper _pfc_get_bytes; - ::dsn::perf_counter_wrapper _pfc_multi_get_bytes; - ::dsn::perf_counter_wrapper _pfc_batch_get_bytes; - ::dsn::perf_counter_wrapper _pfc_scan_bytes; - ::dsn::perf_counter_wrapper _pfc_put_bytes; - ::dsn::perf_counter_wrapper _pfc_multi_put_bytes; - ::dsn::perf_counter_wrapper _pfc_check_and_set_bytes; - ::dsn::perf_counter_wrapper _pfc_check_and_mutate_bytes; - ::dsn::perf_counter_wrapper _pfc_backup_request_bytes; + METRIC_VAR_DECLARE_counter(read_capacity_units); + METRIC_VAR_DECLARE_counter(write_capacity_units); + + METRIC_VAR_DECLARE_counter(get_bytes); + METRIC_VAR_DECLARE_counter(multi_get_bytes); + METRIC_VAR_DECLARE_counter(batch_get_bytes); + METRIC_VAR_DECLARE_counter(scan_bytes); + + METRIC_VAR_DECLARE_counter(put_bytes); + METRIC_VAR_DECLARE_counter(multi_put_bytes); + METRIC_VAR_DECLARE_counter(check_and_set_bytes); + METRIC_VAR_DECLARE_counter(check_and_mutate_bytes); + + METRIC_VAR_DECLARE_counter(backup_request_bytes); /* hotkey capturing weight rules: diff --git a/src/server/pegasus_write_service.cpp b/src/server/pegasus_write_service.cpp index 4889329d9..969e7d141 100644 --- a/src/server/pegasus_write_service.cpp +++ b/src/server/pegasus_write_service.cpp @@ -55,83 +55,82 @@ class message_ex; METRIC_DEFINE_counter(replica, put_requests, dsn::metric_unit::kRequests, - "The number of PUT requests for each replica"); + "The number of PUT requests"); METRIC_DEFINE_counter(replica, multi_put_requests, dsn::metric_unit::kRequests, - "The number of MULTI_PUT requests for each replica"); + "The number of MULTI_PUT requests"); METRIC_DEFINE_counter(replica, remove_requests, dsn::metric_unit::kRequests, - "The number of REMOVE requests for each replica"); + "The number of REMOVE requests"); METRIC_DEFINE_counter(replica, multi_remove_requests, dsn::metric_unit::kRequests, - "The number of MULTI_REMOVE requests for each replica"); + "The number of MULTI_REMOVE requests"); METRIC_DEFINE_counter(replica, incr_requests, dsn::metric_unit::kRequests, - "The number of INCR requests for each replica"); + "The number of INCR requests"); METRIC_DEFINE_counter(replica, check_and_set_requests, dsn::metric_unit::kRequests, - "The number of CHECK_AND_SET requests for each replica"); + "The number of CHECK_AND_SET requests"); METRIC_DEFINE_counter(replica, check_and_mutate_requests, dsn::metric_unit::kRequests, - "The number of CHECK_AND_MUTATE requests for each replica"); + "The number of CHECK_AND_MUTATE requests"); METRIC_DEFINE_percentile_int64(replica, put_latency_ns, dsn::metric_unit::kNanoSeconds, - "The latency of PUT requests for each replica"); + "The latency of PUT requests"); METRIC_DEFINE_percentile_int64(replica, multi_put_latency_ns, dsn::metric_unit::kNanoSeconds, - "The latency of MULTI_PUT requests for each replica"); + "The latency of MULTI_PUT requests"); METRIC_DEFINE_percentile_int64(replica, remove_latency_ns, dsn::metric_unit::kNanoSeconds, - "The latency of REMOVE requests for each replica"); + "The latency of REMOVE requests"); METRIC_DEFINE_percentile_int64(replica, multi_remove_latency_ns, dsn::metric_unit::kNanoSeconds, - "The latency of MULTI_REMOVE requests for each replica"); + "The latency of MULTI_REMOVE requests"); METRIC_DEFINE_percentile_int64(replica, incr_latency_ns, dsn::metric_unit::kNanoSeconds, - "The latency of INCR requests for each replica"); + "The latency of INCR requests"); METRIC_DEFINE_percentile_int64(replica, check_and_set_latency_ns, dsn::metric_unit::kNanoSeconds, - "The latency of CHECK_AND_SET requests for each replica"); + "The latency of CHECK_AND_SET requests"); METRIC_DEFINE_percentile_int64(replica, check_and_mutate_latency_ns, dsn::metric_unit::kNanoSeconds, - "The latency of CHECK_AND_MUTATE requests for each replica"); + "The latency of CHECK_AND_MUTATE requests"); METRIC_DEFINE_counter(replica, dup_requests, dsn::metric_unit::kRequests, - "The number of DUPLICATE requests for each replica"); + "The number of DUPLICATE requests"); -METRIC_DEFINE_percentile_int64( - replica, - dup_time_lag_ms, - dsn::metric_unit::kMilliSeconds, - "the time lag (in ms) between master and slave in the duplication for each replica"); +METRIC_DEFINE_percentile_int64(replica, + dup_time_lag_ms, + dsn::metric_unit::kMilliSeconds, + "the time lag (in ms) between master and slave in the duplication"); METRIC_DEFINE_counter( replica, @@ -176,12 +175,6 @@ pegasus_write_service::pegasus_write_service(pegasus_server_impl *server) _put_batch_size(0), _remove_batch_size(0) { - _dup_lagging_write_threshold_ms = dsn_config_get_value_int64( - "pegasus.server", - "dup_lagging_write_threshold_ms", - 10 * 1000, - "If the duration that a write flows from master to slave is larger than this threshold, " - "the write is defined a lagging write."); } pegasus_write_service::~pegasus_write_service() {} @@ -372,7 +365,7 @@ int pegasus_write_service::duplicate(int64_t decree, METRIC_VAR_INCREMENT(dup_requests); METRIC_VAR_AUTO_LATENCY( dup_time_lag_ms, request.timestamp * 1000, [this](uint64_t latency) { - if (latency > _dup_lagging_write_threshold_ms) { + if (latency > FLAGS_dup_lagging_write_threshold_ms) { METRIC_VAR_INCREMENT(dup_lagging_writes); } }); diff --git a/src/utils/metrics.h b/src/utils/metrics.h index facc83f26..333534703 100644 --- a/src/utils/metrics.h +++ b/src/utils/metrics.h @@ -168,8 +168,9 @@ class error_code; // Perform increment-related operations on metrics including gauge and counter. #define METRIC_VAR_INCREMENT_BY(name, x) \ do { \ - if (x != 0) { \ - _##name->increment_by(x); \ + const auto v = (x); \ + if (v != 0) { \ + _##name->increment_by(v); \ } \ } while (0) @@ -611,6 +612,7 @@ enum class metric_unit : size_t kSeconds, kBytes, kMegaBytes, + kCapacityUnits, kRequests, kSeeks, kPointLookups, --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
