This is an automated email from the ASF dual-hosted git repository. wangdan pushed a commit to branch migrate-metrics-dev in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
commit 272382a642d47f41440488fd21c8d8ba52ae51a8 Author: Dan Wang <[email protected]> AuthorDate: Tue Mar 7 14:32:56 2023 +0800 feat(new_metrics): migrate replica-level metrics for pegasus_server_impl (part 1) (#1374) This PR is to migrate replica-level metrics of `pegasus_server_impl` to new framework (https://github.com/apache/incubator-pegasus/issues/1333). Since there are many replica-level metrics in `pegasus_server_impl`, this PR is part 1 for this migration and other metrics (all of which are gauges) will be migrated in the later PRs. --- src/server/pegasus_server_impl.cpp | 250 ++++++++++++++------------- src/server/pegasus_server_impl.h | 32 ++-- src/server/pegasus_server_impl_init.cpp | 126 +++++++------- src/server/pegasus_write_service_impl.h | 5 +- src/server/rocksdb_wrapper.cpp | 6 +- src/server/rocksdb_wrapper.h | 2 +- src/server/test/pegasus_server_impl_test.cpp | 4 +- src/utils/metrics.h | 16 +- src/utils/test/metrics_test.cpp | 2 - src/utils/time_utils.h | 2 +- 10 files changed, 242 insertions(+), 203 deletions(-) diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp index e6e9d48ed..ab6c4dc1c 100644 --- a/src/server/pegasus_server_impl.cpp +++ b/src/server/pegasus_server_impl.cpp @@ -322,13 +322,50 @@ int pegasus_server_impl::on_batched_write_requests(int64_t decree, return _server_write->on_batched_write_requests(requests, count, decree, timestamp); } +// Since LOG_ERROR_PREFIX depends on log_prefix(), this method could not be declared as static or +// with anonymous namespace. +void pegasus_server_impl::log_expired_data(const char *op, + const dsn::rpc_address &addr, + const dsn::blob &hash_key, + const dsn::blob &sort_key) const +{ + LOG_ERROR_PREFIX("rocksdb data expired for {} from {}: hash_key = \"{}\", sort_key = \"{}\"", + op, + addr, + pegasus::utils::c_escape_string(hash_key), + pegasus::utils::c_escape_string(sort_key)); +} + +void pegasus_server_impl::log_expired_data(const char *op, + const dsn::rpc_address &addr, + const dsn::blob &key) const +{ + dsn::blob hash_key, sort_key; + pegasus_restore_key(key, hash_key, sort_key); + log_expired_data(op, addr, hash_key, sort_key); +} + +void pegasus_server_impl::log_expired_data(const char *op, + const dsn::rpc_address &addr, + const rocksdb::Slice &key) const +{ + dsn::blob raw_key(key.data(), 0, key.size()); + log_expired_data(op, addr, raw_key); +} + +#define LOG_EXPIRED_DATA_IF_VERBOSE(...) \ + do { \ + if (dsn_unlikely(FLAGS_rocksdb_verbose_log)) { \ + log_expired_data(__FUNCTION__, rpc.remote_address(), ##__VA_ARGS__); \ + } \ + } while (0) + void pegasus_server_impl::on_get(get_rpc rpc) { - CHECK(_is_open, ""); - _pfc_get_qps->increment(); - uint64_t start_time = dsn_now_ns(); + CHECK_TRUE(_is_open); + + METRIC_VAR_INCREMENT(get_requests); - const auto &key = rpc.request(); auto &resp = rpc.response(); resp.app_id = _gpid.get_app_id(); resp.partition_index = _gpid.get_partition_index(); @@ -345,21 +382,20 @@ void pegasus_server_impl::on_get(get_rpc rpc) return; } + METRIC_VAR_AUTO_LATENCY(get_latency_ns); + + const auto &key = rpc.request(); rocksdb::Slice skey(key.data(), key.length()); std::string value; rocksdb::Status status = _db->Get(_data_cf_rd_opts, _data_cf, skey, &value); if (status.ok()) { if (check_if_record_expired(utils::epoch_now(), value)) { - _pfc_recent_expire_count->increment(); - if (FLAGS_rocksdb_verbose_log) { - LOG_ERROR_PREFIX("rocksdb data expired for get from {}", rpc.remote_address()); - } + METRIC_VAR_INCREMENT(read_expired_values); + LOG_EXPIRED_DATA_IF_VERBOSE(key); status = rocksdb::Status::NotFound(); } - } - - if (!status.ok()) { + } else { if (FLAGS_rocksdb_verbose_log) { ::dsn::blob hash_key, sort_key; pegasus_restore_key(key, hash_key, sort_key); @@ -382,7 +418,7 @@ void pegasus_server_impl::on_get(get_rpc rpc) usleep(10 * 1000); #endif - uint64_t time_used = dsn_now_ns() - start_time; + auto time_used = METRIC_VAR_AUTO_LATENCY_DURATION_NS(get_latency_ns); if (is_get_abnormal(time_used, value.size())) { ::dsn::blob hash_key, sort_key; pegasus_restore_key(key, hash_key, sort_key); @@ -395,7 +431,7 @@ void pegasus_server_impl::on_get(get_rpc rpc) status.ToString(), value.size(), time_used); - _pfc_recent_abnormal_count->increment(); + METRIC_VAR_INCREMENT(abnormal_read_requests); } resp.error = status.code(); @@ -404,17 +440,14 @@ void pegasus_server_impl::on_get(get_rpc rpc) } _cu_calculator->add_get_cu(rpc.dsn_request(), resp.error, key, resp.value); - _pfc_get_latency->set(dsn_now_ns() - start_time); } void pegasus_server_impl::on_multi_get(multi_get_rpc rpc) { - CHECK(_is_open, ""); - _pfc_multi_get_qps->increment(); - uint64_t start_time = dsn_now_ns(); + CHECK_TRUE(_is_open); + + METRIC_VAR_INCREMENT(multi_get_requests); - const auto &request = rpc.request(); - dsn::message_ex *req = rpc.dsn_request(); auto &resp = rpc.response(); resp.app_id = _gpid.get_app_id(); resp.partition_index = _gpid.get_partition_index(); @@ -426,6 +459,10 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc) return; } + METRIC_VAR_AUTO_LATENCY(multi_get_latency_ns); + + const auto &request = rpc.request(); + dsn::message_ex *req = rpc.dsn_request(); if (!is_filter_type_supported(request.sort_key_filter_type)) { LOG_ERROR_PREFIX( "invalid argument for multi_get from {}: sort key filter type {} not supported", @@ -433,7 +470,6 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc) request.sort_key_filter_type); resp.error = rocksdb::Status::kInvalidArgument; _cu_calculator->add_multi_get_cu(req, resp.error, request.hash_key, resp.kvs); - _pfc_multi_get_latency->set(dsn_now_ns() - start_time); return; } @@ -520,8 +556,6 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc) } resp.error = rocksdb::Status::kOk; _cu_calculator->add_multi_get_cu(req, resp.error, request.hash_key, resp.kvs); - _pfc_multi_get_latency->set(dsn_now_ns() - start_time); - return; } @@ -724,7 +758,6 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc) for (int i = 0; i < keys.size(); i++) { rocksdb::Status &status = statuses[i]; std::string &value = values[i]; - // print log if (!status.ok()) { if (FLAGS_rocksdb_verbose_log) { LOG_ERROR_PREFIX("rocksdb get failed for multi_get from {}: hash_key = \"{}\", " @@ -738,41 +771,38 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc) rpc.remote_address(), status.ToString()); } - } - // check ttl - if (status.ok()) { - uint32_t expire_ts = pegasus_extract_expire_ts(_pegasus_data_version, value); - if (expire_ts > 0 && expire_ts <= epoch_now) { - expire_count++; - if (FLAGS_rocksdb_verbose_log) { - LOG_ERROR_PREFIX("rocksdb data expired for multi_get from {}", - rpc.remote_address()); - } - status = rocksdb::Status::NotFound(); - } - } - // extract value - if (status.ok()) { - // check if exceed limit - if (count >= max_kv_count || size >= max_kv_size) { - exceed_limit = true; - break; - } - ::dsn::apps::key_value kv; - kv.key = request.sort_keys[i]; - if (!request.no_value) { - pegasus_extract_user_data(_pegasus_data_version, std::move(value), kv.value); + + if (status.IsNotFound()) { + continue; } - count++; - size += kv.key.length() + kv.value.length(); - resp.kvs.emplace_back(std::move(kv)); - } - // if error occurred - if (!status.ok() && !status.IsNotFound()) { + error_occurred = true; final_status = status; break; } + + // check ttl + if (check_if_record_expired(epoch_now, value)) { + expire_count++; + LOG_EXPIRED_DATA_IF_VERBOSE(request.hash_key, request.sort_keys[i]); + status = rocksdb::Status::NotFound(); + continue; + } + + // check if exceed limit + if (dsn_unlikely(count >= max_kv_count || size >= max_kv_size)) { + exceed_limit = true; + break; + } + + ::dsn::apps::key_value kv; + kv.key = request.sort_keys[i]; + if (!request.no_value) { + pegasus_extract_user_data(_pegasus_data_version, std::move(value), kv.value); + } + count++; + size += kv.key.length() + kv.value.length(); + resp.kvs.emplace_back(std::move(kv)); } if (error_occurred) { @@ -790,7 +820,7 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc) usleep(10 * 1000); #endif - uint64_t time_used = dsn_now_ns() - start_time; + auto time_used = METRIC_VAR_AUTO_LATENCY_DURATION_NS(multi_get_latency_ns); if (is_multi_get_abnormal(time_used, size, iteration_count)) { LOG_WARNING_PREFIX( "rocksdb abnormal multi_get from {}: hash_key = {}, " @@ -816,25 +846,20 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc) expire_count, filter_count, time_used); - _pfc_recent_abnormal_count->increment(); + METRIC_VAR_INCREMENT(abnormal_read_requests); } - if (expire_count > 0) { - _pfc_recent_expire_count->add(expire_count); - } - if (filter_count > 0) { - _pfc_recent_filter_count->add(filter_count); - } + METRIC_VAR_INCREMENT_BY(read_expired_values, expire_count); + METRIC_VAR_INCREMENT_BY(read_filtered_values, filter_count); _cu_calculator->add_multi_get_cu(req, resp.error, request.hash_key, resp.kvs); - _pfc_multi_get_latency->set(dsn_now_ns() - start_time); } void pegasus_server_impl::on_batch_get(batch_get_rpc rpc) { - CHECK(_is_open, ""); - _pfc_batch_get_qps->increment(); - int64_t start_time = dsn_now_ns(); + CHECK_TRUE(_is_open); + + METRIC_VAR_INCREMENT(batch_get_requests); auto &response = rpc.response(); response.app_id = _gpid.get_app_id(); @@ -847,13 +872,14 @@ void pegasus_server_impl::on_batch_get(batch_get_rpc rpc) return; } + METRIC_VAR_AUTO_LATENCY(batch_get_latency_ns); + const auto &request = rpc.request(); if (request.keys.empty()) { response.error = rocksdb::Status::kInvalidArgument; LOG_ERROR_PREFIX("Invalid argument for batch_get from {}: 'keys' field in request is empty", rpc.remote_address().to_string()); _cu_calculator->add_batch_get_cu(rpc.dsn_request(), response.error, response.data); - _pfc_batch_get_latency->set(dsn_now_ns() - start_time); return; } @@ -872,6 +898,8 @@ void pegasus_server_impl::on_batch_get(batch_get_rpc rpc) bool error_occurred = false; int64_t total_data_size = 0; uint32_t epoch_now = pegasus::utils::epoch_now(); + uint64_t expire_count = 0; + std::vector<std::string> values; std::vector<rocksdb::Status> statuses = _db->MultiGet(_data_cf_rd_opts, keys, &values); response.data.reserve(request.keys.size()); @@ -887,13 +915,8 @@ void pegasus_server_impl::on_batch_get(batch_get_rpc rpc) if (dsn_likely(status.ok())) { if (check_if_record_expired(epoch_now, value)) { - if (FLAGS_rocksdb_verbose_log) { - LOG_ERROR_PREFIX( - "rocksdb data expired for batch_get from {}, hash_key = {}, sort_key = {}", - rpc.remote_address().to_string(), - pegasus::utils::c_escape_string(hash_key), - pegasus::utils::c_escape_string(sort_key)); - } + ++expire_count; + LOG_EXPIRED_DATA_IF_VERBOSE(hash_key, sort_key); continue; } @@ -931,7 +954,7 @@ void pegasus_server_impl::on_batch_get(batch_get_rpc rpc) response.error = rocksdb::Status::kOk; } - int64_t time_used = dsn_now_ns() - start_time; + auto time_used = METRIC_VAR_AUTO_LATENCY_DURATION_NS(batch_get_latency_ns); if (is_batch_get_abnormal(time_used, total_data_size, request.keys.size())) { LOG_WARNING_PREFIX( "rocksdb abnormal batch_get from {}: total data size = {}, row count = {}, " @@ -940,33 +963,36 @@ void pegasus_server_impl::on_batch_get(batch_get_rpc rpc) total_data_size, request.keys.size(), time_used / 1000); - _pfc_recent_abnormal_count->increment(); + METRIC_VAR_INCREMENT(abnormal_read_requests); } + METRIC_VAR_INCREMENT_BY(read_expired_values, expire_count); + _cu_calculator->add_batch_get_cu(rpc.dsn_request(), response.error, response.data); - _pfc_batch_get_latency->set(time_used); } void pegasus_server_impl::on_sortkey_count(sortkey_count_rpc rpc) { - CHECK(_is_open, ""); + CHECK_TRUE(_is_open); - _pfc_scan_qps->increment(); - uint64_t start_time = dsn_now_ns(); + METRIC_VAR_INCREMENT(scan_requests); - const auto &hash_key = rpc.request(); auto &resp = rpc.response(); resp.app_id = _gpid.get_app_id(); resp.partition_index = _gpid.get_partition_index(); resp.server = _primary_address; + if (!_read_size_throttling_controller->available()) { rpc.error() = dsn::ERR_BUSY; _counter_recent_read_throttling_reject_count->increment(); return; } + METRIC_VAR_AUTO_LATENCY(scan_latency_ns); + // scan ::dsn::blob start_key, stop_key; + const auto &hash_key = rpc.request(); pegasus_generate_key(start_key, hash_key, ::dsn::blob()); pegasus_generate_next_blob(stop_key, hash_key); rocksdb::Slice start(start_key.data(), start_key.length()); @@ -988,18 +1014,14 @@ void pegasus_server_impl::on_sortkey_count(sortkey_count_rpc rpc) if (check_if_record_expired(epoch_now, it->value())) { expire_count++; - if (FLAGS_rocksdb_verbose_log) { - LOG_ERROR_PREFIX("rocksdb data expired for sortkey_count from {}", - rpc.remote_address()); - } + LOG_EXPIRED_DATA_IF_VERBOSE(it->key()); } else { resp.count++; } it->Next(); } - if (expire_count > 0) { - _pfc_recent_expire_count->add(expire_count); - } + + METRIC_VAR_INCREMENT_BY(read_expired_values, expire_count); resp.error = it->status().code(); if (!it->status().ok()) { @@ -1025,7 +1047,6 @@ void pegasus_server_impl::on_sortkey_count(sortkey_count_rpc rpc) } _cu_calculator->add_sortkey_count_cu(rpc.dsn_request(), resp.error, hash_key); - _pfc_scan_latency->set(dsn_now_ns() - start_time); } void pegasus_server_impl::on_ttl(ttl_rpc rpc) @@ -1053,7 +1074,7 @@ void pegasus_server_impl::on_ttl(ttl_rpc rpc) if (status.ok()) { expire_ts = pegasus_extract_expire_ts(_pegasus_data_version, value); if (check_if_ts_expired(now_ts, expire_ts)) { - _pfc_recent_expire_count->increment(); + METRIC_VAR_INCREMENT(read_expired_values); if (FLAGS_rocksdb_verbose_log) { LOG_ERROR_PREFIX("rocksdb data expired for ttl from {}", rpc.remote_address()); } @@ -1093,12 +1114,10 @@ void pegasus_server_impl::on_ttl(ttl_rpc rpc) void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc) { - CHECK(_is_open, ""); - _pfc_scan_qps->increment(); - uint64_t start_time = dsn_now_ns(); + CHECK_TRUE(_is_open); + + METRIC_VAR_INCREMENT(scan_requests); - const auto &request = rpc.request(); - dsn::message_ex *req = rpc.dsn_request(); auto &resp = rpc.response(); resp.app_id = _gpid.get_app_id(); resp.partition_index = _gpid.get_partition_index(); @@ -1110,6 +1129,10 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc) return; } + METRIC_VAR_AUTO_LATENCY(scan_latency_ns); + + const auto &request = rpc.request(); + dsn::message_ex *req = rpc.dsn_request(); if (!is_filter_type_supported(request.hash_key_filter_type)) { LOG_ERROR_PREFIX( "invalid argument for get_scanner from {}: hash key filter type {} not supported", @@ -1117,10 +1140,9 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc) request.hash_key_filter_type); resp.error = rocksdb::Status::kInvalidArgument; _cu_calculator->add_scan_cu(req, resp.error, resp.kvs); - _pfc_scan_latency->set(dsn_now_ns() - start_time); - return; } + if (!is_filter_type_supported(request.sort_key_filter_type)) { LOG_ERROR_PREFIX( "invalid argument for get_scanner from {}: sort key filter type {} not supported", @@ -1128,8 +1150,6 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc) request.sort_key_filter_type); resp.error = rocksdb::Status::kInvalidArgument; _cu_calculator->add_scan_cu(req, resp.error, resp.kvs); - _pfc_scan_latency->set(dsn_now_ns() - start_time); - return; } @@ -1185,8 +1205,6 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc) } resp.error = rocksdb::Status::kOk; _cu_calculator->add_scan_cu(req, resp.error, resp.kvs); - _pfc_scan_latency->set(dsn_now_ns() - start_time); - return; } @@ -1339,24 +1357,18 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc) resp.context_id = pegasus::SCAN_CONTEXT_ID_COMPLETED; } - if (expire_count > 0) { - _pfc_recent_expire_count->add(expire_count); - } - if (filter_count > 0) { - _pfc_recent_filter_count->add(filter_count); - } + METRIC_VAR_INCREMENT_BY(read_expired_values, expire_count); + METRIC_VAR_INCREMENT_BY(read_filtered_values, filter_count); _cu_calculator->add_scan_cu(req, resp.error, resp.kvs); - _pfc_scan_latency->set(dsn_now_ns() - start_time); } void pegasus_server_impl::on_scan(scan_rpc rpc) { - CHECK(_is_open, ""); - _pfc_scan_qps->increment(); - uint64_t start_time = dsn_now_ns(); - const auto &request = rpc.request(); - dsn::message_ex *req = rpc.dsn_request(); + CHECK_TRUE(_is_open); + + METRIC_VAR_INCREMENT(scan_requests); + auto &resp = rpc.response(); resp.app_id = _gpid.get_app_id(); resp.partition_index = _gpid.get_partition_index(); @@ -1368,6 +1380,10 @@ void pegasus_server_impl::on_scan(scan_rpc rpc) return; } + METRIC_VAR_AUTO_LATENCY(scan_latency_ns); + + const auto &request = rpc.request(); + dsn::message_ex *req = rpc.dsn_request(); std::unique_ptr<pegasus_scan_context> context = _context_cache.fetch(request.context_id); if (context) { rocksdb::Iterator *it = context->iterator.get(); @@ -1490,18 +1506,14 @@ void pegasus_server_impl::on_scan(scan_rpc rpc) resp.context_id = pegasus::SCAN_CONTEXT_ID_COMPLETED; } - if (expire_count > 0) { - _pfc_recent_expire_count->add(expire_count); - } - if (filter_count > 0) { - _pfc_recent_filter_count->add(filter_count); - } + METRIC_VAR_INCREMENT_BY(read_expired_values, expire_count); + METRIC_VAR_INCREMENT_BY(read_filtered_values, filter_count); + } else { resp.error = rocksdb::Status::Code::kNotFound; } _cu_calculator->add_scan_cu(req, resp.error, resp.kvs); - _pfc_scan_latency->set(dsn_now_ns() - start_time); } void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache.fetch(args); } diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h index d156acdce..7203d0ce2 100644 --- a/src/server/pegasus_server_impl.h +++ b/src/server/pegasus_server_impl.h @@ -446,6 +446,15 @@ private: dsn::replication::manual_compaction_status::type query_compact_status() const override; + // Log expired keys for verbose mode. + void log_expired_data(const char *op, + const dsn::rpc_address &addr, + const dsn::blob &hash_key, + const dsn::blob &sort_key) const; + void log_expired_data(const char *op, const dsn::rpc_address &addr, const dsn::blob &key) const; + void + log_expired_data(const char *op, const dsn::rpc_address &addr, const rocksdb::Slice &key) const; + private: static const std::chrono::seconds kServerStatUpdateTimeSec; static const std::string COMPRESSION_HEADER; @@ -517,20 +526,19 @@ private: std::shared_ptr<throttling_controller> _read_size_throttling_controller; - // perf counters - ::dsn::perf_counter_wrapper _pfc_get_qps; - ::dsn::perf_counter_wrapper _pfc_multi_get_qps; - ::dsn::perf_counter_wrapper _pfc_batch_get_qps; - ::dsn::perf_counter_wrapper _pfc_scan_qps; + METRIC_VAR_DECLARE_counter(get_requests); + METRIC_VAR_DECLARE_counter(multi_get_requests); + METRIC_VAR_DECLARE_counter(batch_get_requests); + METRIC_VAR_DECLARE_counter(scan_requests); - ::dsn::perf_counter_wrapper _pfc_get_latency; - ::dsn::perf_counter_wrapper _pfc_multi_get_latency; - ::dsn::perf_counter_wrapper _pfc_batch_get_latency; - ::dsn::perf_counter_wrapper _pfc_scan_latency; + METRIC_VAR_DECLARE_percentile_int64(get_latency_ns); + METRIC_VAR_DECLARE_percentile_int64(multi_get_latency_ns); + METRIC_VAR_DECLARE_percentile_int64(batch_get_latency_ns); + METRIC_VAR_DECLARE_percentile_int64(scan_latency_ns); - ::dsn::perf_counter_wrapper _pfc_recent_expire_count; - ::dsn::perf_counter_wrapper _pfc_recent_filter_count; - ::dsn::perf_counter_wrapper _pfc_recent_abnormal_count; + METRIC_VAR_DECLARE_counter(read_expired_values); + METRIC_VAR_DECLARE_counter(read_filtered_values); + METRIC_VAR_DECLARE_counter(abnormal_read_requests); // rocksdb internal statistics // server level diff --git a/src/server/pegasus_server_impl_init.cpp b/src/server/pegasus_server_impl_init.cpp index 896f8ab94..27aebdfab 100644 --- a/src/server/pegasus_server_impl_init.cpp +++ b/src/server/pegasus_server_impl_init.cpp @@ -62,6 +62,61 @@ class replica; } // namespace replication } // namespace dsn +METRIC_DEFINE_counter(replica, + get_requests, + dsn::metric_unit::kRequests, + "The number of GET requests for each replica"); + +METRIC_DEFINE_counter(replica, + multi_get_requests, + dsn::metric_unit::kRequests, + "The number of MULTI_GET requests for each replica"); + +METRIC_DEFINE_counter(replica, + batch_get_requests, + dsn::metric_unit::kRequests, + "The number of BATCH_GET requests for each replica"); + +METRIC_DEFINE_counter(replica, + scan_requests, + dsn::metric_unit::kRequests, + "The number of SCAN requests for each replica"); + +METRIC_DEFINE_percentile_int64(replica, + get_latency_ns, + dsn::metric_unit::kNanoSeconds, + "The latency of GET requests for each replica"); + +METRIC_DEFINE_percentile_int64(replica, + multi_get_latency_ns, + dsn::metric_unit::kNanoSeconds, + "The latency of MULTI_GET requests for each replica"); + +METRIC_DEFINE_percentile_int64(replica, + batch_get_latency_ns, + dsn::metric_unit::kNanoSeconds, + "The latency of BATCH_GET requests for each replica"); + +METRIC_DEFINE_percentile_int64(replica, + scan_latency_ns, + dsn::metric_unit::kNanoSeconds, + "The latency of SCAN requests for each replica"); + +METRIC_DEFINE_counter(replica, + read_expired_values, + dsn::metric_unit::kValues, + "The number of expired values read for each replica"); + +METRIC_DEFINE_counter(replica, + read_filtered_values, + dsn::metric_unit::kValues, + "The number of filtered values read for each replica"); + +METRIC_DEFINE_counter(replica, + abnormal_read_requests, + dsn::metric_unit::kRequests, + "The number of abnormal read requests for each replica"); + namespace pegasus { namespace server { @@ -403,7 +458,18 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r) _last_durable_decree(0), _is_checkpointing(false), _manual_compact_svc(this), - _partition_version(0) + _partition_version(0), + METRIC_VAR_INIT_replica(get_requests), + METRIC_VAR_INIT_replica(multi_get_requests), + METRIC_VAR_INIT_replica(batch_get_requests), + METRIC_VAR_INIT_replica(scan_requests), + METRIC_VAR_INIT_replica(get_latency_ns), + METRIC_VAR_INIT_replica(multi_get_latency_ns), + METRIC_VAR_INIT_replica(batch_get_latency_ns), + METRIC_VAR_INIT_replica(scan_latency_ns), + METRIC_VAR_INIT_replica(read_expired_values), + METRIC_VAR_INIT_replica(read_filtered_values), + METRIC_VAR_INIT_replica(abnormal_read_requests) { _primary_address = dsn::rpc_address(dsn_primary_address()).to_string(); _gpid = get_gpid(); @@ -610,64 +676,6 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r) char name[256]; // register the perf counters - snprintf(name, 255, "get_qps@%s", str_gpid.c_str()); - _pfc_get_qps.init_app_counter( - "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the qps of GET request"); - - snprintf(name, 255, "multi_get_qps@%s", str_gpid.c_str()); - _pfc_multi_get_qps.init_app_counter( - "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the qps of MULTI_GET request"); - - snprintf(name, 255, "batch_get_qps@%s", str_gpid.c_str()); - _pfc_batch_get_qps.init_app_counter( - "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the qps of BATCH_GET request"); - - snprintf(name, 255, "scan_qps@%s", str_gpid.c_str()); - _pfc_scan_qps.init_app_counter( - "app.pegasus", name, COUNTER_TYPE_RATE, "statistic the qps of SCAN request"); - - snprintf(name, 255, "get_latency@%s", str_gpid.c_str()); - _pfc_get_latency.init_app_counter("app.pegasus", - name, - COUNTER_TYPE_NUMBER_PERCENTILES, - "statistic the latency of GET request"); - - snprintf(name, 255, "multi_get_latency@%s", str_gpid.c_str()); - _pfc_multi_get_latency.init_app_counter("app.pegasus", - name, - COUNTER_TYPE_NUMBER_PERCENTILES, - "statistic the latency of MULTI_GET request"); - - snprintf(name, 255, "batch_get_latency@%s", str_gpid.c_str()); - _pfc_batch_get_latency.init_app_counter("app.pegasus", - name, - COUNTER_TYPE_NUMBER_PERCENTILES, - "statistic the latency of BATCH_GET request"); - - snprintf(name, 255, "scan_latency@%s", str_gpid.c_str()); - _pfc_scan_latency.init_app_counter("app.pegasus", - name, - COUNTER_TYPE_NUMBER_PERCENTILES, - "statistic the latency of SCAN request"); - - snprintf(name, 255, "recent.expire.count@%s", str_gpid.c_str()); - _pfc_recent_expire_count.init_app_counter("app.pegasus", - name, - COUNTER_TYPE_VOLATILE_NUMBER, - "statistic the recent expired value read count"); - - snprintf(name, 255, "recent.filter.count@%s", str_gpid.c_str()); - _pfc_recent_filter_count.init_app_counter("app.pegasus", - name, - COUNTER_TYPE_VOLATILE_NUMBER, - "statistic the recent filtered value read count"); - - snprintf(name, 255, "recent.abnormal.count@%s", str_gpid.c_str()); - _pfc_recent_abnormal_count.init_app_counter("app.pegasus", - name, - COUNTER_TYPE_VOLATILE_NUMBER, - "statistic the recent abnormal read count"); - snprintf(name, 255, "disk.storage.sst.count@%s", str_gpid.c_str()); _pfc_rdb_sst_count.init_app_counter( "app.pegasus", name, COUNTER_TYPE_NUMBER, "statistic the count of sstable files"); diff --git a/src/server/pegasus_write_service_impl.h b/src/server/pegasus_write_service_impl.h index b75b379a3..344c89560 100644 --- a/src/server/pegasus_write_service_impl.h +++ b/src/server/pegasus_write_service_impl.h @@ -89,8 +89,7 @@ public: explicit impl(pegasus_server_impl *server) : replica_base(server), _primary_address(server->_primary_address), - _pegasus_data_version(server->_pegasus_data_version), - _pfc_recent_expire_count(server->_pfc_recent_expire_count) + _pegasus_data_version(server->_pegasus_data_version) { _rocksdb_wrapper = std::make_unique<rocksdb_wrapper>(server); } @@ -689,8 +688,6 @@ private: const std::string _primary_address; const uint32_t _pegasus_data_version; - ::dsn::perf_counter_wrapper &_pfc_recent_expire_count; - std::unique_ptr<rocksdb_wrapper> _rocksdb_wrapper; // for setting update_response.error after committed. diff --git a/src/server/rocksdb_wrapper.cpp b/src/server/rocksdb_wrapper.cpp index 59cd199d2..88cca9cd4 100644 --- a/src/server/rocksdb_wrapper.cpp +++ b/src/server/rocksdb_wrapper.cpp @@ -39,6 +39,8 @@ #include "utils/fmt_logging.h" #include "utils/ports.h" +METRIC_DECLARE_counter(read_expired_values); + namespace pegasus { namespace server { @@ -60,7 +62,7 @@ rocksdb_wrapper::rocksdb_wrapper(pegasus_server_impl *server) _rd_opts(server->_data_cf_rd_opts), _meta_cf(server->_meta_cf), _pegasus_data_version(server->_pegasus_data_version), - _pfc_recent_expire_count(server->_pfc_recent_expire_count), + METRIC_VAR_INIT_replica(read_expired_values), _default_ttl(0) { _write_batch = std::make_unique<rocksdb::WriteBatch>(); @@ -82,7 +84,7 @@ int rocksdb_wrapper::get(dsn::string_view raw_key, /*out*/ db_get_context *ctx) ctx->expire_ts = pegasus_extract_expire_ts(_pegasus_data_version, ctx->raw_value); if (check_if_ts_expired(utils::epoch_now(), ctx->expire_ts)) { ctx->expired = true; - _pfc_recent_expire_count->increment(); + METRIC_VAR_INCREMENT(read_expired_values); } return rocksdb::Status::kOk; } else if (s.IsNotFound()) { diff --git a/src/server/rocksdb_wrapper.h b/src/server/rocksdb_wrapper.h index e3c713512..fef30a629 100644 --- a/src/server/rocksdb_wrapper.h +++ b/src/server/rocksdb_wrapper.h @@ -87,7 +87,7 @@ private: rocksdb::ColumnFamilyHandle *_meta_cf; const uint32_t _pegasus_data_version; - dsn::perf_counter_wrapper &_pfc_recent_expire_count; + METRIC_VAR_DECLARE_counter(read_expired_values); volatile uint32_t _default_ttl; friend class rocksdb_wrapper_test; diff --git a/src/server/test/pegasus_server_impl_test.cpp b/src/server/test/pegasus_server_impl_test.cpp index 1363d8054..075193fef 100644 --- a/src/server/test/pegasus_server_impl_test.cpp +++ b/src/server/test/pegasus_server_impl_test.cpp @@ -77,7 +77,7 @@ public: _server->update_app_envs(envs); // do on_get/on_multi_get operation, - long before_count = _server->_pfc_recent_abnormal_count->get_integer_value(); + auto before_count = _server->METRIC_VAR_VALUE(abnormal_read_requests); if (!test.is_multi_get) { get_rpc rpc(std::make_unique<dsn::blob>(test_key), dsn::apps::RPC_RRDB_RRDB_GET); _server->on_get(rpc); @@ -90,7 +90,7 @@ public: dsn::apps::RPC_RRDB_RRDB_MULTI_GET); _server->on_multi_get(rpc); } - long after_count = _server->_pfc_recent_abnormal_count->get_integer_value(); + auto after_count = _server->METRIC_VAR_VALUE(abnormal_read_requests); ASSERT_EQ(before_count + test.expect_perf_counter_incr, after_count); } diff --git a/src/utils/metrics.h b/src/utils/metrics.h index 2d6da6c0f..da1d056d8 100644 --- a/src/utils/metrics.h +++ b/src/utils/metrics.h @@ -165,7 +165,13 @@ class error_code; #define METRIC_VAR_INIT_replica(name) METRIC_VAR_INIT(name, replica) // Perform increment-related operations on metrics including gauge and counter. -#define METRIC_VAR_INCREMENT_BY(name, x) _##name->increment_by(x) +#define METRIC_VAR_INCREMENT_BY(name, x) \ + do { \ + if (x != 0) { \ + _##name->increment_by(x); \ + } \ + } while (0) + #define METRIC_VAR_INCREMENT(name) _##name->increment() // Perform set() operations on metrics including gauge and percentile. @@ -176,10 +182,15 @@ class error_code; // such as percentile. #define METRIC_VAR_SET(name, ...) _##name->set(__VA_ARGS__) +// Read the current measurement of the metric. +#define METRIC_VAR_VALUE(name) _##name->value() + // Convenient macro that is used to compute latency automatically, which is dedicated to percentile. #define METRIC_VAR_AUTO_LATENCY(name, ...) \ dsn::auto_latency __##name##_auto_latency(_##name, ##__VA_ARGS__) +#define METRIC_VAR_AUTO_LATENCY_DURATION_NS(name) __##name##_auto_latency.duration_ns() + namespace dsn { class metric; // IWYU pragma: keep class metric_entity_prototype; // IWYU pragma: keep @@ -598,6 +609,7 @@ enum class metric_unit : size_t kMilliSeconds, kSeconds, kRequests, + kValues, kInvalidUnit, }; @@ -1409,6 +1421,8 @@ public: } } + inline uint64_t duration_ns() const { return _chrono.duration_ns(); } + private: percentile_ptr<int64_t> _percentile; utils::chronograph _chrono; diff --git a/src/utils/test/metrics_test.cpp b/src/utils/test/metrics_test.cpp index 13eb9647f..c430b5dfb 100644 --- a/src/utils/test/metrics_test.cpp +++ b/src/utils/test/metrics_test.cpp @@ -3134,8 +3134,6 @@ void MetricVarTest::test_set_percentile(size_t n, int64_t val) EXPECT_EQ(std::vector<int64_t>(n, val), METRIC_VAR_SAMPLES(test_replica_percentile_int64_ns)); } -#define METRIC_VAR_VALUE(name) _##name->value() - #define TEST_METRIC_VAR_INCREMENT(name) \ do { \ ASSERT_EQ(0, METRIC_VAR_VALUE(name)); \ diff --git a/src/utils/time_utils.h b/src/utils/time_utils.h index dc8d6e1ee..c3bf30e6c 100644 --- a/src/utils/time_utils.h +++ b/src/utils/time_utils.h @@ -143,7 +143,7 @@ public: inline void reset_start_time() { _start_time_ns = dsn_now_ns(); } - inline uint64_t duration_ns() + inline uint64_t duration_ns() const { auto now = dsn_now_ns(); CHECK_GE(now, _start_time_ns); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
