This is an automated email from the ASF dual-hosted git repository. wangdan pushed a commit to branch migrate-metrics-dev in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
commit 0203427964bb9266d7105a5e99829ec7871b61cd Author: Dan Wang <[email protected]> AuthorDate: Thu Mar 23 18:06:01 2023 +0800 feat(new_metrics): migrate replica-level metrics for pegasus_event_listener (#1407) https://github.com/apache/incubator-pegasus/issues/1343 Migrate replica-level metrics in pegasus_event_listener class to new framework, all of which are rocksdb-related, including the number of completed flushes/compactions, the size of flush output in bytes, the size of compaction input/output in bytes. Note that in old perf counters there are just 2 replica-level metrics for pegasus_event_listener while all of others are server-level. Migrated to new framework all of the metrics have become replica-level; once server-level metrics are needed, just aggregate on replica-level ones. --- src/server/pegasus_event_listener.cpp | 123 ++++++++++++++++------------------ src/server/pegasus_event_listener.h | 26 ++++--- src/utils/metrics.h | 3 + 3 files changed, 72 insertions(+), 80 deletions(-) diff --git a/src/server/pegasus_event_listener.cpp b/src/server/pegasus_event_listener.cpp index e5414d8b7..330f93af9 100644 --- a/src/server/pegasus_event_listener.cpp +++ b/src/server/pegasus_event_listener.cpp @@ -19,100 +19,91 @@ #include "pegasus_event_listener.h" -#include <fmt/core.h> -#include <fmt/ostream.h> #include <rocksdb/compaction_job_stats.h> #include <rocksdb/table_properties.h> -#include <iosfwd> -#include <string> -#include "common/gpid.h" -#include "perf_counter/perf_counter.h" +#include "utils/autoref_ptr.h" #include "utils/fmt_logging.h" +#include "utils/string_view.h" namespace rocksdb { class DB; } // namespace rocksdb +METRIC_DEFINE_counter(replica, + rdb_flush_completed_count, + dsn::metric_unit::kFlushes, + "The number of completed rocksdb flushes"); + +METRIC_DEFINE_counter(replica, + rdb_flush_output_bytes, + dsn::metric_unit::kBytes, + "The size of rocksdb flush output in bytes"); + +METRIC_DEFINE_counter(replica, + rdb_compaction_completed_count, + dsn::metric_unit::kCompactions, + "The number of completed rocksdb compactions"); + +METRIC_DEFINE_counter(replica, + rdb_compaction_input_bytes, + dsn::metric_unit::kBytes, + "The size of rocksdb compaction input in bytes"); + +METRIC_DEFINE_counter(replica, + rdb_compaction_output_bytes, + dsn::metric_unit::kBytes, + "The size of rocksdb compaction output in bytes"); + +METRIC_DEFINE_counter( + replica, + rdb_changed_delayed_writes, + dsn::metric_unit::kWrites, + "The number of rocksdb delayed writes changed from another write stall condition"); + +METRIC_DEFINE_counter( + replica, + rdb_changed_stopped_writes, + dsn::metric_unit::kWrites, + "The number of rocksdb stopped writes changed from another write stall condition"); + namespace pegasus { namespace server { -pegasus_event_listener::pegasus_event_listener(replica_base *r) : replica_base(r) +pegasus_event_listener::pegasus_event_listener(replica_base *r) + : replica_base(r), + METRIC_VAR_INIT_replica(rdb_flush_completed_count), + METRIC_VAR_INIT_replica(rdb_flush_output_bytes), + METRIC_VAR_INIT_replica(rdb_compaction_completed_count), + METRIC_VAR_INIT_replica(rdb_compaction_input_bytes), + METRIC_VAR_INIT_replica(rdb_compaction_output_bytes), + METRIC_VAR_INIT_replica(rdb_changed_delayed_writes), + METRIC_VAR_INIT_replica(rdb_changed_stopped_writes) { - _pfc_recent_flush_completed_count.init_app_counter("app.pegasus", - "recent.flush.completed.count", - COUNTER_TYPE_VOLATILE_NUMBER, - "rocksdb recent flush completed count"); - _pfc_recent_flush_output_bytes.init_app_counter("app.pegasus", - "recent.flush.output.bytes", - COUNTER_TYPE_VOLATILE_NUMBER, - "rocksdb recent flush output bytes"); - _pfc_recent_compaction_completed_count.init_app_counter( - "app.pegasus", - "recent.compaction.completed.count", - COUNTER_TYPE_VOLATILE_NUMBER, - "rocksdb recent compaction completed count"); - _pfc_recent_compaction_input_bytes.init_app_counter("app.pegasus", - "recent.compaction.input.bytes", - COUNTER_TYPE_VOLATILE_NUMBER, - "rocksdb recent compaction input bytes"); - _pfc_recent_compaction_output_bytes.init_app_counter("app.pegasus", - "recent.compaction.output.bytes", - COUNTER_TYPE_VOLATILE_NUMBER, - "rocksdb recent compaction output bytes"); - _pfc_recent_write_change_delayed_count.init_app_counter( - "app.pegasus", - "recent.write.change.delayed.count", - COUNTER_TYPE_VOLATILE_NUMBER, - "rocksdb recent write change delayed count"); - _pfc_recent_write_change_stopped_count.init_app_counter( - "app.pegasus", - "recent.write.change.stopped.count", - COUNTER_TYPE_VOLATILE_NUMBER, - "rocksdb recent write change stopped count"); - - // replica-level perfcounter - std::string counter_str = fmt::format("recent_rdb_compaction_input_bytes@{}", r->get_gpid()); - _pfc_recent_rdb_compaction_input_bytes.init_app_counter( - "app.pegasus", - counter_str.c_str(), - COUNTER_TYPE_VOLATILE_NUMBER, - "rocksdb recent compaction input bytes"); - - counter_str = fmt::format("recent_rdb_compaction_output_bytes@{}", r->get_gpid()); - _pfc_recent_rdb_compaction_output_bytes.init_app_counter( - "app.pegasus", - counter_str.c_str(), - COUNTER_TYPE_VOLATILE_NUMBER, - "rocksdb recent compaction output bytes"); } -void pegasus_event_listener::OnFlushCompleted(rocksdb::DB *db, - const rocksdb::FlushJobInfo &flush_job_info) +void pegasus_event_listener::OnFlushCompleted(rocksdb::DB *db, const rocksdb::FlushJobInfo &info) { - _pfc_recent_flush_completed_count->increment(); - _pfc_recent_flush_output_bytes->add(flush_job_info.table_properties.data_size); + METRIC_VAR_INCREMENT(rdb_flush_completed_count); + METRIC_VAR_INCREMENT_BY(rdb_flush_output_bytes, info.table_properties.data_size); } void pegasus_event_listener::OnCompactionCompleted(rocksdb::DB *db, - const rocksdb::CompactionJobInfo &ci) + const rocksdb::CompactionJobInfo &info) { - _pfc_recent_compaction_completed_count->increment(); - _pfc_recent_compaction_input_bytes->add(ci.stats.total_input_bytes); - _pfc_recent_compaction_output_bytes->add(ci.stats.total_output_bytes); - - _pfc_recent_rdb_compaction_input_bytes->add(ci.stats.total_input_bytes); - _pfc_recent_rdb_compaction_output_bytes->add(ci.stats.total_output_bytes); + METRIC_VAR_INCREMENT(rdb_compaction_completed_count); + METRIC_VAR_INCREMENT_BY(rdb_compaction_input_bytes, info.stats.total_input_bytes); + METRIC_VAR_INCREMENT_BY(rdb_compaction_output_bytes, info.stats.total_output_bytes); } void pegasus_event_listener::OnStallConditionsChanged(const rocksdb::WriteStallInfo &info) { if (info.condition.cur == rocksdb::WriteStallCondition::kDelayed) { LOG_ERROR_PREFIX("rocksdb write delayed"); - _pfc_recent_write_change_delayed_count->increment(); + METRIC_VAR_INCREMENT(rdb_changed_delayed_writes); } else if (info.condition.cur == rocksdb::WriteStallCondition::kStopped) { LOG_ERROR_PREFIX("rocksdb write stopped"); - _pfc_recent_write_change_stopped_count->increment(); + METRIC_VAR_INCREMENT(rdb_changed_stopped_writes); } } diff --git a/src/server/pegasus_event_listener.h b/src/server/pegasus_event_listener.h index 5e6ab7e1a..900fabdc8 100644 --- a/src/server/pegasus_event_listener.h +++ b/src/server/pegasus_event_listener.h @@ -21,8 +21,8 @@ #include <rocksdb/listener.h> -#include "perf_counter/perf_counter_wrapper.h" #include "replica/replica_base.h" +#include "utils/metrics.h" namespace rocksdb { class DB; @@ -37,24 +37,22 @@ public: explicit pegasus_event_listener(replica_base *r); ~pegasus_event_listener() override = default; - void OnFlushCompleted(rocksdb::DB *db, const rocksdb::FlushJobInfo &flush_job_info) override; + void OnFlushCompleted(rocksdb::DB *db, const rocksdb::FlushJobInfo &info) override; - void OnCompactionCompleted(rocksdb::DB *db, const rocksdb::CompactionJobInfo &ci) override; + void OnCompactionCompleted(rocksdb::DB *db, const rocksdb::CompactionJobInfo &info) override; void OnStallConditionsChanged(const rocksdb::WriteStallInfo &info) override; private: - ::dsn::perf_counter_wrapper _pfc_recent_flush_completed_count; - ::dsn::perf_counter_wrapper _pfc_recent_flush_output_bytes; - ::dsn::perf_counter_wrapper _pfc_recent_compaction_completed_count; - ::dsn::perf_counter_wrapper _pfc_recent_compaction_input_bytes; - ::dsn::perf_counter_wrapper _pfc_recent_compaction_output_bytes; - ::dsn::perf_counter_wrapper _pfc_recent_write_change_delayed_count; - ::dsn::perf_counter_wrapper _pfc_recent_write_change_stopped_count; - - // replica-level perfcounter - ::dsn::perf_counter_wrapper _pfc_recent_rdb_compaction_input_bytes; - ::dsn::perf_counter_wrapper _pfc_recent_rdb_compaction_output_bytes; + METRIC_VAR_DECLARE_counter(rdb_flush_completed_count); + METRIC_VAR_DECLARE_counter(rdb_flush_output_bytes); + + METRIC_VAR_DECLARE_counter(rdb_compaction_completed_count); + METRIC_VAR_DECLARE_counter(rdb_compaction_input_bytes); + METRIC_VAR_DECLARE_counter(rdb_compaction_output_bytes); + + METRIC_VAR_DECLARE_counter(rdb_changed_delayed_writes); + METRIC_VAR_DECLARE_counter(rdb_changed_stopped_writes); }; } // namespace server diff --git a/src/utils/metrics.h b/src/utils/metrics.h index 333534703..9087b78e5 100644 --- a/src/utils/metrics.h +++ b/src/utils/metrics.h @@ -620,6 +620,9 @@ enum class metric_unit : size_t kKeys, kFiles, kAmplification, + kFlushes, + kCompactions, + kWrites, kInvalidUnit, }; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
