This is an automated email from the ASF dual-hosted git repository. wangdan pushed a commit to branch migrate-metrics-dev in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
commit b8475724d625a42988bbf2712939bfdaa4631c94 Author: Dan Wang <[email protected]> AuthorDate: Thu Feb 23 23:18:20 2023 +0800 feat(new_metrics): migrate replica-level metrics for write service (#1351) --- .github/workflows/lint_and_test_cpp.yaml | 12 +- run.sh | 6 +- src/replica/replica_base.h | 9 +- src/server/pegasus_server_write.cpp | 21 +- src/server/pegasus_server_write.h | 3 +- src/server/pegasus_write_service.cpp | 284 ++++++++++++++------------ src/server/pegasus_write_service.h | 47 +++-- src/server/test/pegasus_server_write_test.cpp | 4 +- src/utils/metrics.h | 135 +++++++++++- src/utils/test/metrics_test.cpp | 147 ++++++++++++- src/utils/time_utils.h | 28 ++- 11 files changed, 513 insertions(+), 183 deletions(-) diff --git a/.github/workflows/lint_and_test_cpp.yaml b/.github/workflows/lint_and_test_cpp.yaml index 115f8b124..96c6eef6d 100644 --- a/.github/workflows/lint_and_test_cpp.yaml +++ b/.github/workflows/lint_and_test_cpp.yaml @@ -198,7 +198,11 @@ jobs: - base_api_test - base_test - bulk_load_test - - detect_hotspot_test + # TODO(wangdan): Since the hotspot detection depends on the perf-counters system which + # is being replaced with the new metrics system, its test will fail. Temporarily disable + # the test and re-enable it after the hotspot detection is migrated to the new metrics + # system. + # - detect_hotspot_test - dsn_aio_test - dsn_block_service_test - dsn_client_test @@ -335,7 +339,11 @@ jobs: - base_api_test - base_test - bulk_load_test - - detect_hotspot_test + # TODO(wangdan): Since the hotspot detection depends on the perf-counters system which + # is being replaced with the new metrics system, its test will fail. Temporarily disable + # the test and re-enable it after the hotspot detection is migrated to the new metrics + # system. + # - detect_hotspot_test - dsn_aio_test - dsn_block_service_test - dsn_client_test diff --git a/run.sh b/run.sh index 75b688a8a..b6204059e 100755 --- a/run.sh +++ b/run.sh @@ -356,7 +356,11 @@ function run_test() base_api_test base_test bulk_load_test - detect_hotspot_test + # TODO(wangdan): Since the hotspot detection depends on the perf-counters system which + # is being replaced with the new metrics system, its test will fail. Temporarily disable + # the test and re-enable it after the hotspot detection is migrated to the new metrics + # system. + # detect_hotspot_test dsn_aio_test dsn_block_service_test dsn_client_test diff --git a/src/replica/replica_base.h b/src/replica/replica_base.h index 88202d055..7c5b7747e 100644 --- a/src/replica/replica_base.h +++ b/src/replica/replica_base.h @@ -51,7 +51,14 @@ struct replica_base const char *log_prefix() const { return _name.c_str(); } - const metric_entity_ptr &replica_metric_entity() const { return _replica_metric_entity; } + const metric_entity_ptr &replica_metric_entity() const + { + CHECK_NOTNULL(_replica_metric_entity, + "replica metric entity should has been instantiated: " + "uninitialized entity cannot be used to instantiate " + "metric"); + return _replica_metric_entity; + } private: const gpid _gpid; diff --git a/src/server/pegasus_server_write.cpp b/src/server/pegasus_server_write.cpp index 0147050a6..e6cb5331d 100644 --- a/src/server/pegasus_server_write.cpp +++ b/src/server/pegasus_server_write.cpp @@ -31,7 +31,6 @@ #include "pegasus_server_impl.h" #include "pegasus_server_write.h" #include "pegasus_utils.h" -#include "perf_counter/perf_counter.h" #include "rrdb/rrdb.code.definition.h" #include "runtime/rpc/rpc_address.h" #include "runtime/rpc/rpc_holder.h" @@ -42,20 +41,20 @@ #include "utils/fmt_logging.h" #include "utils/ports.h" +METRIC_DEFINE_counter(replica, + corrupt_writes, + dsn::metric_unit::kRequests, + "The number of corrupt writes for each replica"); + namespace pegasus { namespace server { DSN_DECLARE_bool(rocksdb_verbose_log); pegasus_server_write::pegasus_server_write(pegasus_server_impl *server) - : replica_base(server), _write_svc(new pegasus_write_service(server)) + : replica_base(server), + _write_svc(new pegasus_write_service(server)), + METRIC_VAR_INIT_replica(corrupt_writes) { - char name[256]; - snprintf(name, 255, "recent_corrupt_write_count@%s", get_gpid().to_string()); - _pfc_recent_corrupt_write_count.init_app_counter("app.pegasus", - name, - COUNTER_TYPE_VOLATILE_NUMBER, - "statistic the recent corrupt write count"); - init_non_batch_write_handlers(); } @@ -81,7 +80,7 @@ int pegasus_server_write::on_batched_write_requests(dsn::message_ex **requests, return iter->second(requests[0]); } } catch (TTransportException &ex) { - _pfc_recent_corrupt_write_count->increment(); + METRIC_VAR_INCREMENT(corrupt_writes); LOG_ERROR_PREFIX("pegasus not batch write handler failed, from = {}, exception = {}", requests[0]->header->from_address.to_string(), ex.what()); @@ -125,7 +124,7 @@ int pegasus_server_write::on_batched_writes(dsn::message_ex **requests, int coun } } } catch (TTransportException &ex) { - _pfc_recent_corrupt_write_count->increment(); + METRIC_VAR_INCREMENT(corrupt_writes); LOG_ERROR_PREFIX("pegasus batch writes handler failed, from = {}, exception = {}", requests[i]->header->from_address.to_string(), ex.what()); diff --git a/src/server/pegasus_server_write.h b/src/server/pegasus_server_write.h index 6a002c299..d8a358164 100644 --- a/src/server/pegasus_server_write.h +++ b/src/server/pegasus_server_write.h @@ -27,7 +27,6 @@ #include "base/pegasus_rpc_types.h" #include "pegasus_write_service.h" -#include "perf_counter/perf_counter_wrapper.h" #include "replica/replica_base.h" #include "rrdb/rrdb_types.h" #include "runtime/task/task_code.h" @@ -102,7 +101,7 @@ private: typedef std::map<dsn::task_code, std::function<int(dsn::message_ex *)>> non_batch_writes_map; non_batch_writes_map _non_batch_write_handlers; - ::dsn::perf_counter_wrapper _pfc_recent_corrupt_write_count; + METRIC_VAR_DECLARE_counter(corrupt_writes); }; } // namespace server diff --git a/src/server/pegasus_write_service.cpp b/src/server/pegasus_write_service.cpp index 73f6cc8d6..4889329d9 100644 --- a/src/server/pegasus_write_service.cpp +++ b/src/server/pegasus_write_service.cpp @@ -52,6 +52,93 @@ class blob; class message_ex; } // namespace dsn +METRIC_DEFINE_counter(replica, + put_requests, + dsn::metric_unit::kRequests, + "The number of PUT requests for each replica"); + +METRIC_DEFINE_counter(replica, + multi_put_requests, + dsn::metric_unit::kRequests, + "The number of MULTI_PUT requests for each replica"); + +METRIC_DEFINE_counter(replica, + remove_requests, + dsn::metric_unit::kRequests, + "The number of REMOVE requests for each replica"); + +METRIC_DEFINE_counter(replica, + multi_remove_requests, + dsn::metric_unit::kRequests, + "The number of MULTI_REMOVE requests for each replica"); + +METRIC_DEFINE_counter(replica, + incr_requests, + dsn::metric_unit::kRequests, + "The number of INCR requests for each replica"); + +METRIC_DEFINE_counter(replica, + check_and_set_requests, + dsn::metric_unit::kRequests, + "The number of CHECK_AND_SET requests for each replica"); + +METRIC_DEFINE_counter(replica, + check_and_mutate_requests, + dsn::metric_unit::kRequests, + "The number of CHECK_AND_MUTATE requests for each replica"); + +METRIC_DEFINE_percentile_int64(replica, + put_latency_ns, + dsn::metric_unit::kNanoSeconds, + "The latency of PUT requests for each replica"); + +METRIC_DEFINE_percentile_int64(replica, + multi_put_latency_ns, + dsn::metric_unit::kNanoSeconds, + "The latency of MULTI_PUT requests for each replica"); + +METRIC_DEFINE_percentile_int64(replica, + remove_latency_ns, + dsn::metric_unit::kNanoSeconds, + "The latency of REMOVE requests for each replica"); + +METRIC_DEFINE_percentile_int64(replica, + multi_remove_latency_ns, + dsn::metric_unit::kNanoSeconds, + "The latency of MULTI_REMOVE requests for each replica"); + +METRIC_DEFINE_percentile_int64(replica, + incr_latency_ns, + dsn::metric_unit::kNanoSeconds, + "The latency of INCR requests for each replica"); + +METRIC_DEFINE_percentile_int64(replica, + check_and_set_latency_ns, + dsn::metric_unit::kNanoSeconds, + "The latency of CHECK_AND_SET requests for each replica"); + +METRIC_DEFINE_percentile_int64(replica, + check_and_mutate_latency_ns, + dsn::metric_unit::kNanoSeconds, + "The latency of CHECK_AND_MUTATE requests for each replica"); + +METRIC_DEFINE_counter(replica, + dup_requests, + dsn::metric_unit::kRequests, + "The number of DUPLICATE requests for each replica"); + +METRIC_DEFINE_percentile_int64( + replica, + dup_time_lag_ms, + dsn::metric_unit::kMilliSeconds, + "the time lag (in ms) between master and slave in the duplication for each replica"); + +METRIC_DEFINE_counter( + replica, + dup_lagging_writes, + dsn::metric_unit::kRequests, + "the number of lagging writes (time lag larger than `dup_lagging_write_threshold_ms`)"); + namespace pegasus { namespace server { @@ -68,105 +155,33 @@ pegasus_write_service::pegasus_write_service(pegasus_server_impl *server) _server(server), _impl(new impl(server)), _batch_start_time(0), - _cu_calculator(server->_cu_calculator.get()) + _cu_calculator(server->_cu_calculator.get()), + METRIC_VAR_INIT_replica(put_requests), + METRIC_VAR_INIT_replica(multi_put_requests), + METRIC_VAR_INIT_replica(remove_requests), + METRIC_VAR_INIT_replica(multi_remove_requests), + METRIC_VAR_INIT_replica(incr_requests), + METRIC_VAR_INIT_replica(check_and_set_requests), + METRIC_VAR_INIT_replica(check_and_mutate_requests), + METRIC_VAR_INIT_replica(put_latency_ns), + METRIC_VAR_INIT_replica(multi_put_latency_ns), + METRIC_VAR_INIT_replica(remove_latency_ns), + METRIC_VAR_INIT_replica(multi_remove_latency_ns), + METRIC_VAR_INIT_replica(incr_latency_ns), + METRIC_VAR_INIT_replica(check_and_set_latency_ns), + METRIC_VAR_INIT_replica(check_and_mutate_latency_ns), + METRIC_VAR_INIT_replica(dup_requests), + METRIC_VAR_INIT_replica(dup_time_lag_ms), + METRIC_VAR_INIT_replica(dup_lagging_writes), + _put_batch_size(0), + _remove_batch_size(0) { - std::string str_gpid = fmt::format("{}", server->get_gpid()); - - std::string name; - - name = fmt::format("put_qps@{}", str_gpid); - _pfc_put_qps.init_app_counter( - "app.pegasus", name.c_str(), COUNTER_TYPE_RATE, "statistic the qps of PUT request"); - - name = fmt::format("multi_put_qps@{}", str_gpid); - _pfc_multi_put_qps.init_app_counter( - "app.pegasus", name.c_str(), COUNTER_TYPE_RATE, "statistic the qps of MULTI_PUT request"); - - name = fmt::format("remove_qps@{}", str_gpid); - _pfc_remove_qps.init_app_counter( - "app.pegasus", name.c_str(), COUNTER_TYPE_RATE, "statistic the qps of REMOVE request"); - - name = fmt::format("multi_remove_qps@{}", str_gpid); - _pfc_multi_remove_qps.init_app_counter("app.pegasus", - name.c_str(), - COUNTER_TYPE_RATE, - "statistic the qps of MULTI_REMOVE request"); - - name = fmt::format("incr_qps@{}", str_gpid); - _pfc_incr_qps.init_app_counter( - "app.pegasus", name.c_str(), COUNTER_TYPE_RATE, "statistic the qps of INCR request"); - - name = fmt::format("check_and_set_qps@{}", str_gpid); - _pfc_check_and_set_qps.init_app_counter("app.pegasus", - name.c_str(), - COUNTER_TYPE_RATE, - "statistic the qps of CHECK_AND_SET request"); - - name = fmt::format("check_and_mutate_qps@{}", str_gpid); - _pfc_check_and_mutate_qps.init_app_counter("app.pegasus", - name.c_str(), - COUNTER_TYPE_RATE, - "statistic the qps of CHECK_AND_MUTATE request"); - - name = fmt::format("put_latency@{}", str_gpid); - _pfc_put_latency.init_app_counter("app.pegasus", - name.c_str(), - COUNTER_TYPE_NUMBER_PERCENTILES, - "statistic the latency of PUT request"); - - name = fmt::format("multi_put_latency@{}", str_gpid); - _pfc_multi_put_latency.init_app_counter("app.pegasus", - name.c_str(), - COUNTER_TYPE_NUMBER_PERCENTILES, - "statistic the latency of MULTI_PUT request"); - - name = fmt::format("remove_latency@{}", str_gpid); - _pfc_remove_latency.init_app_counter("app.pegasus", - name.c_str(), - COUNTER_TYPE_NUMBER_PERCENTILES, - "statistic the latency of REMOVE request"); - - name = fmt::format("multi_remove_latency@{}", str_gpid); - _pfc_multi_remove_latency.init_app_counter("app.pegasus", - name.c_str(), - COUNTER_TYPE_NUMBER_PERCENTILES, - "statistic the latency of MULTI_REMOVE request"); - - name = fmt::format("incr_latency@{}", str_gpid); - _pfc_incr_latency.init_app_counter("app.pegasus", - name.c_str(), - COUNTER_TYPE_NUMBER_PERCENTILES, - "statistic the latency of INCR request"); - - name = fmt::format("check_and_set_latency@{}", str_gpid); - _pfc_check_and_set_latency.init_app_counter("app.pegasus", - name.c_str(), - COUNTER_TYPE_NUMBER_PERCENTILES, - "statistic the latency of CHECK_AND_SET request"); - - name = fmt::format("check_and_mutate_latency@{}", str_gpid); - _pfc_check_and_mutate_latency.init_app_counter( - "app.pegasus", - name.c_str(), - COUNTER_TYPE_NUMBER_PERCENTILES, - "statistic the latency of CHECK_AND_MUTATE request"); - - _pfc_duplicate_qps.init_app_counter("app.pegasus", - fmt::format("duplicate_qps@{}", str_gpid).c_str(), - COUNTER_TYPE_RATE, - "statistic the qps of DUPLICATE requests"); - - _pfc_dup_time_lag.init_app_counter( - "app.pegasus", - fmt::format("dup.time_lag_ms@{}", app_name()).c_str(), - COUNTER_TYPE_NUMBER_PERCENTILES, - "the time (in ms) lag between master and slave in the duplication"); - - _pfc_dup_lagging_writes.init_app_counter( - "app.pegasus", - fmt::format("dup.lagging_writes@{}", app_name()).c_str(), - COUNTER_TYPE_VOLATILE_NUMBER, - "the number of lagging writes (time lag larger than `dup_lagging_write_threshold_ms`)"); + _dup_lagging_write_threshold_ms = dsn_config_get_value_int64( + "pegasus.server", + "dup_lagging_write_threshold_ms", + 10 * 1000, + "If the duration that a write flows from master to slave is larger than this threshold, " + "the write is defined a lagging write."); } pegasus_write_service::~pegasus_write_service() {} @@ -177,15 +192,15 @@ int pegasus_write_service::multi_put(const db_write_context &ctx, const dsn::apps::multi_put_request &update, dsn::apps::update_response &resp) { - uint64_t start_time = dsn_now_ns(); - _pfc_multi_put_qps->increment(); + METRIC_VAR_AUTO_LATENCY(multi_put_latency_ns); + METRIC_VAR_INCREMENT(multi_put_requests); + int err = _impl->multi_put(ctx, update, resp); if (_server->is_primary()) { _cu_calculator->add_multi_put_cu(resp.error, update.hash_key, update.kvs); } - _pfc_multi_put_latency->set(dsn_now_ns() - start_time); return err; } @@ -193,15 +208,15 @@ int pegasus_write_service::multi_remove(int64_t decree, const dsn::apps::multi_remove_request &update, dsn::apps::multi_remove_response &resp) { - uint64_t start_time = dsn_now_ns(); - _pfc_multi_remove_qps->increment(); + METRIC_VAR_AUTO_LATENCY(multi_remove_latency_ns); + METRIC_VAR_INCREMENT(multi_remove_requests); + int err = _impl->multi_remove(decree, update, resp); if (_server->is_primary()) { _cu_calculator->add_multi_remove_cu(resp.error, update.hash_key, update.sort_keys); } - _pfc_multi_remove_latency->set(dsn_now_ns() - start_time); return err; } @@ -209,15 +224,15 @@ int pegasus_write_service::incr(int64_t decree, const dsn::apps::incr_request &update, dsn::apps::incr_response &resp) { - uint64_t start_time = dsn_now_ns(); - _pfc_incr_qps->increment(); + METRIC_VAR_AUTO_LATENCY(incr_latency_ns); + METRIC_VAR_INCREMENT(incr_requests); + int err = _impl->incr(decree, update, resp); if (_server->is_primary()) { _cu_calculator->add_incr_cu(resp.error, update.key); } - _pfc_incr_latency->set(dsn_now_ns() - start_time); return err; } @@ -225,8 +240,9 @@ int pegasus_write_service::check_and_set(int64_t decree, const dsn::apps::check_and_set_request &update, dsn::apps::check_and_set_response &resp) { - uint64_t start_time = dsn_now_ns(); - _pfc_check_and_set_qps->increment(); + METRIC_VAR_AUTO_LATENCY(check_and_set_latency_ns); + METRIC_VAR_INCREMENT(check_and_set_requests); + int err = _impl->check_and_set(decree, update, resp); if (_server->is_primary()) { @@ -237,7 +253,6 @@ int pegasus_write_service::check_and_set(int64_t decree, update.set_value); } - _pfc_check_and_set_latency->set(dsn_now_ns() - start_time); return err; } @@ -245,8 +260,9 @@ int pegasus_write_service::check_and_mutate(int64_t decree, const dsn::apps::check_and_mutate_request &update, dsn::apps::check_and_mutate_response &resp) { - uint64_t start_time = dsn_now_ns(); - _pfc_check_and_mutate_qps->increment(); + METRIC_VAR_AUTO_LATENCY(check_and_mutate_latency_ns); + METRIC_VAR_INCREMENT(check_and_mutate_requests); + int err = _impl->check_and_mutate(decree, update, resp); if (_server->is_primary()) { @@ -254,7 +270,6 @@ int pegasus_write_service::check_and_mutate(int64_t decree, resp.error, update.hash_key, update.check_sort_key, update.mutate_list); } - _pfc_check_and_mutate_latency->set(dsn_now_ns() - start_time); return err; } @@ -272,8 +287,7 @@ int pegasus_write_service::batch_put(const db_write_context &ctx, { CHECK_GT_MSG(_batch_start_time, 0, "batch_put must be called after batch_prepare"); - _batch_qps_perfcounters.push_back(_pfc_put_qps.get()); - _batch_latency_perfcounters.push_back(_pfc_put_latency.get()); + ++_put_batch_size; int err = _impl->batch_put(ctx, update, resp); if (_server->is_primary()) { @@ -289,8 +303,7 @@ int pegasus_write_service::batch_remove(int64_t decree, { CHECK_GT_MSG(_batch_start_time, 0, "batch_remove must be called after batch_prepare"); - _batch_qps_perfcounters.push_back(_pfc_remove_qps.get()); - _batch_latency_perfcounters.push_back(_pfc_remove_latency.get()); + ++_remove_batch_size; int err = _impl->batch_remove(decree, key, resp); if (_server->is_primary()) { @@ -322,15 +335,21 @@ void pegasus_write_service::set_default_ttl(uint32_t ttl) { _impl->set_default_t void pegasus_write_service::clear_up_batch_states() { - uint64_t latency = dsn_now_ns() - _batch_start_time; - for (dsn::perf_counter *pfc : _batch_qps_perfcounters) - pfc->increment(); - for (dsn::perf_counter *pfc : _batch_latency_perfcounters) - pfc->set(latency); - - _batch_qps_perfcounters.clear(); - _batch_latency_perfcounters.clear(); +#define PROCESS_WRITE_BATCH(op) \ + do { \ + METRIC_VAR_INCREMENT_BY(op##_requests, static_cast<int64_t>(_##op##_batch_size)); \ + METRIC_VAR_SET(op##_latency_ns, static_cast<size_t>(_##op##_batch_size), latency_ns); \ + _##op##_batch_size = 0; \ + } while (0) + + auto latency_ns = static_cast<int64_t>(dsn_now_ns() - _batch_start_time); + + PROCESS_WRITE_BATCH(put); + PROCESS_WRITE_BATCH(remove); + _batch_start_time = 0; + +#undef PROCESS_WRITE_BATCH } int pegasus_write_service::duplicate(int64_t decree, @@ -350,14 +369,13 @@ int pegasus_write_service::duplicate(int64_t decree, return empty_put(decree); } - _pfc_duplicate_qps->increment(); - auto cleanup = dsn::defer([this, &request]() { - uint64_t latency_ms = (dsn_now_us() - request.timestamp) / 1000; - if (latency_ms > FLAGS_dup_lagging_write_threshold_ms) { - _pfc_dup_lagging_writes->increment(); - } - _pfc_dup_time_lag->set(latency_ms); - }); + METRIC_VAR_INCREMENT(dup_requests); + METRIC_VAR_AUTO_LATENCY( + dup_time_lag_ms, request.timestamp * 1000, [this](uint64_t latency) { + if (latency > _dup_lagging_write_threshold_ms) { + METRIC_VAR_INCREMENT(dup_lagging_writes); + } + }); dsn::message_ex *write = dsn::from_blob_to_received_msg(request.task_code, request.raw_message); bool is_delete = request.task_code == dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE || diff --git a/src/server/pegasus_write_service.h b/src/server/pegasus_write_service.h index 9fb854ffd..9e79f9122 100644 --- a/src/server/pegasus_write_service.h +++ b/src/server/pegasus_write_service.h @@ -25,13 +25,11 @@ #include "common//duplication_common.h" #include "common/common.h" -#include "perf_counter/perf_counter_wrapper.h" #include "replica/replica_base.h" #include "utils/errors.h" namespace dsn { class blob; -class perf_counter; namespace apps { class check_and_mutate_request; class check_and_mutate_response; @@ -216,28 +214,29 @@ private: capacity_unit_calculator *_cu_calculator; - ::dsn::perf_counter_wrapper _pfc_put_qps; - ::dsn::perf_counter_wrapper _pfc_multi_put_qps; - ::dsn::perf_counter_wrapper _pfc_remove_qps; - ::dsn::perf_counter_wrapper _pfc_multi_remove_qps; - ::dsn::perf_counter_wrapper _pfc_incr_qps; - ::dsn::perf_counter_wrapper _pfc_check_and_set_qps; - ::dsn::perf_counter_wrapper _pfc_check_and_mutate_qps; - ::dsn::perf_counter_wrapper _pfc_duplicate_qps; - ::dsn::perf_counter_wrapper _pfc_dup_time_lag; - ::dsn::perf_counter_wrapper _pfc_dup_lagging_writes; - - ::dsn::perf_counter_wrapper _pfc_put_latency; - ::dsn::perf_counter_wrapper _pfc_multi_put_latency; - ::dsn::perf_counter_wrapper _pfc_remove_latency; - ::dsn::perf_counter_wrapper _pfc_multi_remove_latency; - ::dsn::perf_counter_wrapper _pfc_incr_latency; - ::dsn::perf_counter_wrapper _pfc_check_and_set_latency; - ::dsn::perf_counter_wrapper _pfc_check_and_mutate_latency; - - // Records all requests. - std::vector<::dsn::perf_counter *> _batch_qps_perfcounters; - std::vector<::dsn::perf_counter *> _batch_latency_perfcounters; + METRIC_VAR_DECLARE_counter(put_requests); + METRIC_VAR_DECLARE_counter(multi_put_requests); + METRIC_VAR_DECLARE_counter(remove_requests); + METRIC_VAR_DECLARE_counter(multi_remove_requests); + METRIC_VAR_DECLARE_counter(incr_requests); + METRIC_VAR_DECLARE_counter(check_and_set_requests); + METRIC_VAR_DECLARE_counter(check_and_mutate_requests); + + METRIC_VAR_DECLARE_percentile_int64(put_latency_ns); + METRIC_VAR_DECLARE_percentile_int64(multi_put_latency_ns); + METRIC_VAR_DECLARE_percentile_int64(remove_latency_ns); + METRIC_VAR_DECLARE_percentile_int64(multi_remove_latency_ns); + METRIC_VAR_DECLARE_percentile_int64(incr_latency_ns); + METRIC_VAR_DECLARE_percentile_int64(check_and_set_latency_ns); + METRIC_VAR_DECLARE_percentile_int64(check_and_mutate_latency_ns); + + METRIC_VAR_DECLARE_counter(dup_requests); + METRIC_VAR_DECLARE_percentile_int64(dup_time_lag_ms); + METRIC_VAR_DECLARE_counter(dup_lagging_writes); + + // Record batch size for put and remove requests. + uint32_t _put_batch_size; + uint32_t _remove_batch_size; // TODO(wutao1): add perf counters for failed rpc. }; diff --git a/src/server/test/pegasus_server_write_test.cpp b/src/server/test/pegasus_server_write_test.cpp index 0e8c57114..b771e67bf 100644 --- a/src/server/test/pegasus_server_write_test.cpp +++ b/src/server/test/pegasus_server_write_test.cpp @@ -108,8 +108,8 @@ public: // make sure everything is cleanup after batch write. ASSERT_TRUE(_server_write->_put_rpc_batch.empty()); ASSERT_TRUE(_server_write->_remove_rpc_batch.empty()); - ASSERT_TRUE(_server_write->_write_svc->_batch_qps_perfcounters.empty()); - ASSERT_TRUE(_server_write->_write_svc->_batch_latency_perfcounters.empty()); + ASSERT_EQ(_server_write->_write_svc->_put_batch_size, 0); + ASSERT_EQ(_server_write->_write_svc->_remove_batch_size, 0); ASSERT_EQ(_server_write->_write_svc->_batch_start_time, 0); ASSERT_EQ(_server_write->_write_svc->_impl->_rocksdb_wrapper->_write_batch->Count(), 0); diff --git a/src/utils/metrics.h b/src/utils/metrics.h index 27c6355f3..2d6da6c0f 100644 --- a/src/utils/metrics.h +++ b/src/utils/metrics.h @@ -51,6 +51,7 @@ #include "utils/singleton.h" #include "utils/string_view.h" #include "utils/synchronize.h" +#include "utils/time_utils.h" namespace boost { namespace system { @@ -89,7 +90,8 @@ class error_code; // Instantiating the metric in whatever class represents it with some initial arguments, if any: // metric_instance = METRIC_my_gauge_name.instantiate(entity_instance, ...); -// Convenient macros are provided to define entity types and metric prototypes. +// The following are convenient macros provided to define entity types and metric prototypes. + #define METRIC_DEFINE_entity(name) ::dsn::metric_entity_prototype METRIC_ENTITY_##name(#name) #define METRIC_DEFINE_gauge_int64(entity_type, name, unit, desc, ...) \ ::dsn::gauge_prototype<int64_t> METRIC_##name( \ @@ -97,6 +99,7 @@ class error_code; #define METRIC_DEFINE_gauge_double(entity_type, name, unit, desc, ...) \ ::dsn::gauge_prototype<double> METRIC_##name( \ {#entity_type, dsn::metric_type::kGauge, #name, unit, desc, ##__VA_ARGS__}) + // There are 2 kinds of counters: // - `counter` is the general type of counter that is implemented by striped_long_adder, which can // achieve high performance while consuming less memory if it's not updated very frequently. @@ -141,6 +144,42 @@ class error_code; #define METRIC_DECLARE_percentile_double(name) \ extern dsn::floating_percentile_prototype<double> METRIC_##name +// Following METRIC_*VAR* macros are introduced so that: +// * only need to use prototype name to operate each metric variable; +// * uniformly name each variable in user class; +// * differentiate operations on metrics significantly from main logic, improving code readability. + +// Declare a metric variable in user class. +// +// Since a type tends to be a class template where there might be commas, use variadic arguments +// instead of a single fixed argument to represent a type. +#define METRIC_VAR_DECLARE(name, ...) __VA_ARGS__ _##name +#define METRIC_VAR_DECLARE_gauge_int64(name) METRIC_VAR_DECLARE(name, dsn::gauge_ptr<int64_t>) +#define METRIC_VAR_DECLARE_counter(name) \ + METRIC_VAR_DECLARE(name, dsn::counter_ptr<dsn::striped_long_adder, false>) +#define METRIC_VAR_DECLARE_percentile_int64(name) \ + METRIC_VAR_DECLARE(name, dsn::percentile_ptr<int64_t>) + +// Initialize a metric variable in user class. +#define METRIC_VAR_INIT(name, entity) _##name(METRIC_##name.instantiate(entity##_metric_entity())) +#define METRIC_VAR_INIT_replica(name) METRIC_VAR_INIT(name, replica) + +// Perform increment-related operations on metrics including gauge and counter. +#define METRIC_VAR_INCREMENT_BY(name, x) _##name->increment_by(x) +#define METRIC_VAR_INCREMENT(name) _##name->increment() + +// Perform set() operations on metrics including gauge and percentile. +// +// There are 2 kinds of invocations of set() for a metric: +// * set(val): set a single value for a metric, such as gauge, percentile; +// * set(n, val): set multiple repeated values (the number of duplicates is n) for a metric, +// such as percentile. +#define METRIC_VAR_SET(name, ...) _##name->set(__VA_ARGS__) + +// Convenient macro that is used to compute latency automatically, which is dedicated to percentile. +#define METRIC_VAR_AUTO_LATENCY(name, ...) \ + dsn::auto_latency __##name##_auto_latency(_##name, ##__VA_ARGS__) + namespace dsn { class metric; // IWYU pragma: keep class metric_entity_prototype; // IWYU pragma: keep @@ -552,7 +591,7 @@ ENUM_REG_WITH_CUSTOM_NAME(metric_type::kVolatileCounter, volatile_counter) ENUM_REG_WITH_CUSTOM_NAME(metric_type::kPercentile, percentile) ENUM_END(metric_type) -enum class metric_unit +enum class metric_unit : size_t { kNanoSeconds, kMicroSeconds, @@ -562,6 +601,31 @@ enum class metric_unit kInvalidUnit, }; +#define METRIC_ASSERT_UNIT_LATENCY(unit, index) \ + static_assert(static_cast<size_t>(metric_unit::unit) == index, \ + #unit " should be at index " #index) + +METRIC_ASSERT_UNIT_LATENCY(kNanoSeconds, 0); +METRIC_ASSERT_UNIT_LATENCY(kMicroSeconds, 1); +METRIC_ASSERT_UNIT_LATENCY(kMilliSeconds, 2); +METRIC_ASSERT_UNIT_LATENCY(kSeconds, 3); + +const std::vector<uint64_t> kMetricLatencyConverterFromNS = { + 1, 1000, 1000 * 1000, 1000 * 1000 * 1000}; + +inline uint64_t convert_metric_latency_from_ns(uint64_t latency_ns, metric_unit target_unit) +{ + if (dsn_likely(target_unit == metric_unit::kNanoSeconds)) { + // Since nanoseconds are used as the latency unit more frequently, eliminate unnecessary + // conversion by branch prediction. + return latency_ns; + } + + auto index = static_cast<size_t>(target_unit); + CHECK_LT(index, kMetricLatencyConverterFromNS.size()); + return latency_ns / kMetricLatencyConverterFromNS[index]; +} + ENUM_BEGIN(metric_unit, metric_unit::kInvalidUnit) ENUM_REG_WITH_CUSTOM_NAME(metric_unit::kNanoSeconds, nanoseconds) ENUM_REG_WITH_CUSTOM_NAME(metric_unit::kMicroSeconds, microseconds) @@ -1066,6 +1130,13 @@ public: _samples.get()[index & (_sample_size - 1)] = val; } + void set(size_t n, const value_type &val) + { + for (size_t i = 0; i < n; ++i) { + set(val); + } + } + // If `type` is not configured, it will return false with zero value stored in `val`; // otherwise, it will always return true with the value corresponding to `type`. bool get(kth_percentile_type type, value_type &val) const @@ -1177,6 +1248,7 @@ private: friend class metric_entity; friend class ref_ptr<percentile<value_type, NthElementFinder>>; + friend class MetricVarTest; virtual void close() override { @@ -1199,6 +1271,20 @@ private: release_ref(); } + std::vector<value_type> samples_for_test() + { + size_type real_sample_size = std::min(static_cast<size_type>(_tail.load()), _sample_size); + if (real_sample_size == 0) { + return std::vector<value_type>(); + } + + std::vector<value_type> real_samples(real_sample_size); + std::copy(_samples.get(), _samples.get() + real_sample_size, real_samples.begin()); + return real_samples; + } + + void reset_tail_for_test() { _tail.store(0); } + value_type value(size_t index) const { return _full_nth_elements[index].load(std::memory_order_relaxed); @@ -1219,7 +1305,7 @@ private: } // Find nth elements. - std::vector<T> array(real_sample_size); + std::vector<value_type> array(real_sample_size); std::copy(_samples.get(), _samples.get() + real_sample_size, array.begin()); _nth_element_finder(array.begin(), array.begin(), array.end()); @@ -1288,4 +1374,47 @@ template <typename T, using floating_percentile_prototype = metric_prototype_with<floating_percentile<T, NthElementFinder>>; +// Compute latency automatically at the end of the scope, which is set to percentile which it has +// bound to. +class auto_latency +{ +public: + auto_latency(const percentile_ptr<int64_t> &percentile) : _percentile(percentile) {} + + auto_latency(const percentile_ptr<int64_t> &percentile, std::function<void(uint64_t)> callback) + : _percentile(percentile), _callback(std::move(callback)) + { + } + + auto_latency(const percentile_ptr<int64_t> &percentile, uint64_t start_time_ns) + : _percentile(percentile), _chrono(start_time_ns) + { + } + + auto_latency(const percentile_ptr<int64_t> &percentile, + uint64_t start_time_ns, + std::function<void(uint64_t)> callback) + : _percentile(percentile), _chrono(start_time_ns), _callback(std::move(callback)) + { + } + + ~auto_latency() + { + auto latency = + convert_metric_latency_from_ns(_chrono.duration_ns(), _percentile->prototype()->unit()); + _percentile->set(static_cast<int64_t>(latency)); + + if (_callback) { + _callback(latency); + } + } + +private: + percentile_ptr<int64_t> _percentile; + utils::chronograph _chrono; + std::function<void(uint64_t)> _callback; + + DISALLOW_COPY_AND_ASSIGN(auto_latency); +}; + } // namespace dsn diff --git a/src/utils/test/metrics_test.cpp b/src/utils/test/metrics_test.cpp index b2118f03e..13eb9647f 100644 --- a/src/utils/test/metrics_test.cpp +++ b/src/utils/test/metrics_test.cpp @@ -83,6 +83,8 @@ METRIC_DEFINE_entity(my_server); METRIC_DEFINE_entity(my_table); METRIC_DEFINE_entity(my_replica); +#define METRIC_VAR_INIT_my_replica(name) METRIC_VAR_INIT(name, my_replica) + // Dedicated entity for getting metrics by http service. METRIC_DEFINE_entity(my_app); @@ -174,6 +176,26 @@ METRIC_DEFINE_percentile_double(my_server, dsn::metric_unit::kNanoSeconds, "a server-level percentile of double type for test"); +METRIC_DEFINE_percentile_int64(my_replica, + test_replica_percentile_int64_ns, + dsn::metric_unit::kNanoSeconds, + "a replica-level percentile of int64 type in nanoseconds for test"); + +METRIC_DEFINE_percentile_int64(my_replica, + test_replica_percentile_int64_us, + dsn::metric_unit::kMicroSeconds, + "a replica-level percentile of int64 type in microseconds for test"); + +METRIC_DEFINE_percentile_int64(my_replica, + test_replica_percentile_int64_ms, + dsn::metric_unit::kMilliSeconds, + "a replica-level percentile of int64 type in milliseconds for test"); + +METRIC_DEFINE_percentile_int64(my_replica, + test_replica_percentile_int64_s, + dsn::metric_unit::kSeconds, + "a replica-level percentile of int64 type in seconds for test"); + namespace dsn { TEST(metrics_test, create_entity) @@ -737,9 +759,7 @@ void run_percentile(const metric_entity_ptr &my_entity, auto my_metric = prototype.instantiate(my_entity, interval_ms, kth_percentiles, sample_size); // Preload zero in current thread. - for (size_t i = 0; i < num_preload; ++i) { - my_metric->set(0); - } + my_metric->set(num_preload, 0); // Load other data in each spawned thread evenly. const size_t num_operations = data.size() / num_threads; @@ -3056,4 +3076,125 @@ INSTANTIATE_TEST_CASE_P(MetricsTest, MetricsRetirementTest, testing::ValuesIn(metrics_retirement_tests)); +class MetricVarTest : public testing::Test +{ +protected: + MetricVarTest(); + + void SetUp() override + { + _test_replica_gauge_int64->set(0); + _test_replica_counter->reset(); + _test_replica_percentile_int64_ns->reset_tail_for_test(); + _test_replica_percentile_int64_us->reset_tail_for_test(); + _test_replica_percentile_int64_ms->reset_tail_for_test(); + _test_replica_percentile_int64_s->reset_tail_for_test(); + } + + const metric_entity_ptr &my_replica_metric_entity() const { return _my_replica_metric_entity; } + + void test_set_percentile(const std::vector<int64_t> &expected_samples); + void test_set_percentile(size_t n, int64_t val); + + const metric_entity_ptr _my_replica_metric_entity; + METRIC_VAR_DECLARE_gauge_int64(test_replica_gauge_int64); + METRIC_VAR_DECLARE_counter(test_replica_counter); + METRIC_VAR_DECLARE_percentile_int64(test_replica_percentile_int64_ns); + METRIC_VAR_DECLARE_percentile_int64(test_replica_percentile_int64_us); + METRIC_VAR_DECLARE_percentile_int64(test_replica_percentile_int64_ms); + METRIC_VAR_DECLARE_percentile_int64(test_replica_percentile_int64_s); + + DISALLOW_COPY_AND_ASSIGN(MetricVarTest); +}; + +MetricVarTest::MetricVarTest() + : _my_replica_metric_entity(METRIC_ENTITY_my_replica.instantiate("replica_var_test")), + METRIC_VAR_INIT_my_replica(test_replica_gauge_int64), + METRIC_VAR_INIT_my_replica(test_replica_counter), + METRIC_VAR_INIT_my_replica(test_replica_percentile_int64_ns), + METRIC_VAR_INIT_my_replica(test_replica_percentile_int64_us), + METRIC_VAR_INIT_my_replica(test_replica_percentile_int64_ms), + METRIC_VAR_INIT_my_replica(test_replica_percentile_int64_s) +{ +} + +#define METRIC_VAR_SAMPLES(name) _##name->samples_for_test() + +void MetricVarTest::test_set_percentile(const std::vector<int64_t> &expected_samples) +{ + for (const auto &val : expected_samples) { + METRIC_VAR_SET(test_replica_percentile_int64_ns, val); + } + EXPECT_EQ(expected_samples, METRIC_VAR_SAMPLES(test_replica_percentile_int64_ns)); +} + +void MetricVarTest::test_set_percentile(size_t n, int64_t val) +{ + METRIC_VAR_SET(test_replica_percentile_int64_ns, n, val); + EXPECT_EQ(std::vector<int64_t>(n, val), METRIC_VAR_SAMPLES(test_replica_percentile_int64_ns)); +} + +#define METRIC_VAR_VALUE(name) _##name->value() + +#define TEST_METRIC_VAR_INCREMENT(name) \ + do { \ + ASSERT_EQ(0, METRIC_VAR_VALUE(name)); \ + \ + METRIC_VAR_INCREMENT(name); \ + ASSERT_EQ(1, METRIC_VAR_VALUE(name)); \ + \ + METRIC_VAR_INCREMENT(name); \ + ASSERT_EQ(2, METRIC_VAR_VALUE(name)); \ + \ + METRIC_VAR_INCREMENT_BY(name, 5); \ + ASSERT_EQ(7, METRIC_VAR_VALUE(name)); \ + \ + METRIC_VAR_INCREMENT_BY(name, 18); \ + ASSERT_EQ(25, METRIC_VAR_VALUE(name)); \ + } while (0); + +TEST_F(MetricVarTest, IncrementGauge) { TEST_METRIC_VAR_INCREMENT(test_replica_gauge_int64); } + +TEST_F(MetricVarTest, IncrementCounter) { TEST_METRIC_VAR_INCREMENT(test_replica_counter); } + +TEST_F(MetricVarTest, SetGauge) +{ + ASSERT_EQ(0, METRIC_VAR_VALUE(test_replica_gauge_int64)); + + METRIC_VAR_SET(test_replica_gauge_int64, 5); + ASSERT_EQ(5, METRIC_VAR_VALUE(test_replica_gauge_int64)); + + METRIC_VAR_SET(test_replica_gauge_int64, 18); + ASSERT_EQ(18, METRIC_VAR_VALUE(test_replica_gauge_int64)); +} + +TEST_F(MetricVarTest, SetPercentileIndividually) { test_set_percentile({20, 50, 10, 25, 16}); } + +TEST_F(MetricVarTest, SetPercentileRepeatedly) { test_set_percentile(5, 100); } + +#define TEST_METRIC_VAR_AUTO_LATENCY(unit_abbr, factor) \ + do { \ + auto start_time_ns = dsn_now_ns(); \ + uint64_t actual_latency_ns = 0; \ + { \ + METRIC_VAR_AUTO_LATENCY(test_replica_percentile_int64_##unit_abbr, \ + start_time_ns, \ + [&actual_latency_ns](uint64_t latency) mutable { \ + actual_latency_ns = latency * factor; \ + }); \ + } \ + \ + uint64_t expected_latency_ns = dsn_now_ns() - start_time_ns; \ + ASSERT_GE(expected_latency_ns, actual_latency_ns); \ + EXPECT_LT(expected_latency_ns - actual_latency_ns, 1000 * 1000); \ + } while (0) + +TEST_F(MetricVarTest, AutoLatencyNanoSeconds) { TEST_METRIC_VAR_AUTO_LATENCY(ns, 1); } + +TEST_F(MetricVarTest, AutoLatencyMicroSeconds) { TEST_METRIC_VAR_AUTO_LATENCY(us, 1000); } + +TEST_F(MetricVarTest, AutoLatencyMilliSeconds) { TEST_METRIC_VAR_AUTO_LATENCY(ms, 1000 * 1000); } + +TEST_F(MetricVarTest, AutoLatencySeconds) { TEST_METRIC_VAR_AUTO_LATENCY(s, 1000 * 1000 * 1000); } + } // namespace dsn diff --git a/src/utils/time_utils.h b/src/utils/time_utils.h index a47ba594c..dc8d6e1ee 100644 --- a/src/utils/time_utils.h +++ b/src/utils/time_utils.h @@ -32,7 +32,10 @@ #include <cstdio> #include <string> -#include "string_view.h" +#include "runtime/api_layer1.h" +#include "utils/fmt_logging.h" +#include "utils/ports.h" +#include "utils/string_view.h" namespace dsn { namespace utils { @@ -131,5 +134,28 @@ inline int64_t hh_mm_today_to_unix_sec(string_view hhmm_of_day) return get_unix_sec_today_midnight() + sec_of_day; } +class chronograph +{ +public: + chronograph() : chronograph(dsn_now_ns()) {} + chronograph(uint64_t start_time_ns) : _start_time_ns(start_time_ns) {} + ~chronograph() = default; + + inline void reset_start_time() { _start_time_ns = dsn_now_ns(); } + + inline uint64_t duration_ns() + { + auto now = dsn_now_ns(); + CHECK_GE(now, _start_time_ns); + + return now - _start_time_ns; + } + +private: + uint64_t _start_time_ns; + + DISALLOW_COPY_AND_ASSIGN(chronograph); +}; + } // namespace utils } // namespace dsn --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
