This is an automated email from the ASF dual-hosted git repository. wangdan pushed a commit to branch migrate-metrics-dev in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
commit 5325c60ef555d4d2ceeafac87013f1ad72f03b7e Author: Dan Wang <[email protected]> AuthorDate: Mon May 22 11:18:36 2023 +0800 feat(new_metrics): migrate metrics for some duplication class (#1482) https://github.com/apache/incubator-pegasus/issues/1481 Some duplication-related classes, including `ship_mutation`, `mutation_buffer`, `load_from_private_log`, are migrated to the new framework. During this migration, there are 6 metrics which are changed from server-level to replica-level, all of which are duplication-related, including the shipped size of private log, the number of times private log files have failed to be loaded, the bytes of mutations that have been skipped due to failed loadings of private log files, the size read from private log, the number of mutations read from private log, the number of lost mutations recently. Another 2 metrics, the numbers of failed/successful DUPLICATE requests sent from client, are renamed according to the new style of naming for duplication. --- src/replica/duplication/duplication_pipeline.cpp | 18 +++--- src/replica/duplication/duplication_pipeline.h | 4 +- src/replica/duplication/load_from_private_log.cpp | 73 ++++++++++++---------- src/replica/duplication/load_from_private_log.h | 14 +++-- src/replica/duplication/mutation_batch.cpp | 21 +++---- src/replica/duplication/mutation_batch.h | 4 +- .../test/load_from_private_log_test.cpp | 23 +++---- src/server/pegasus_mutation_duplicator.cpp | 16 ++--- src/server/pegasus_mutation_duplicator.h | 4 +- src/utils/metrics.h | 1 + 10 files changed, 93 insertions(+), 85 deletions(-) diff --git a/src/replica/duplication/duplication_pipeline.cpp b/src/replica/duplication/duplication_pipeline.cpp index 02a818bd9..a89e6014a 100644 --- a/src/replica/duplication/duplication_pipeline.cpp +++ b/src/replica/duplication/duplication_pipeline.cpp @@ -23,7 +23,6 @@ #include "dsn.layer2_types.h" #include "load_from_private_log.h" -#include "perf_counter/perf_counter.h" #include "replica/duplication/replica_duplicator.h" #include "replica/mutation_log.h" #include "replica/replica.h" @@ -31,9 +30,14 @@ #include "utils/autoref_ptr.h" #include "utils/errors.h" #include "utils/fmt_logging.h" +#include "utils/string_view.h" + +METRIC_DEFINE_counter(replica, + dup_shipped_bytes, + dsn::metric_unit::kBytes, + "The shipped size of private log for dup"); namespace dsn { -class string_view; namespace replication { @@ -80,7 +84,7 @@ void ship_mutation::ship(mutation_tuple_set &&in) { _mutation_duplicator->duplicate(std::move(in), [this](size_t total_shipped_size) mutable { update_progress(); - _counter_dup_shipped_bytes_rate->add(total_shipped_size); + METRIC_VAR_INCREMENT_BY(dup_shipped_bytes, total_shipped_size); step_down_next_stage(); }); } @@ -113,16 +117,12 @@ ship_mutation::ship_mutation(replica_duplicator *duplicator) : replica_base(duplicator), _duplicator(duplicator), _replica(duplicator->_replica), - _stub(duplicator->_replica->get_replica_stub()) + _stub(duplicator->_replica->get_replica_stub()), + METRIC_VAR_INIT_replica(dup_shipped_bytes) { _mutation_duplicator = new_mutation_duplicator( duplicator, _duplicator->remote_cluster_name(), _replica->get_app_info()->app_name); _mutation_duplicator->set_task_environment(duplicator); - - _counter_dup_shipped_bytes_rate.init_app_counter("eon.replica_stub", - "dup.shipped_bytes_rate", - COUNTER_TYPE_RATE, - "shipping rate of private log in bytes"); } } // namespace replication diff --git a/src/replica/duplication/duplication_pipeline.h b/src/replica/duplication/duplication_pipeline.h index 23d2028b3..d380cc81c 100644 --- a/src/replica/duplication/duplication_pipeline.h +++ b/src/replica/duplication/duplication_pipeline.h @@ -20,11 +20,11 @@ #include <memory> #include "common/replication_other_types.h" -#include "perf_counter/perf_counter_wrapper.h" #include "replica/duplication/mutation_duplicator.h" #include "replica/replica_base.h" #include "runtime/pipeline.h" #include "utils/chrono_literals.h" +#include "utils/metrics.h" namespace dsn { namespace replication { @@ -90,7 +90,7 @@ private: decree _last_decree{invalid_decree}; - perf_counter_wrapper _counter_dup_shipped_bytes_rate; + METRIC_VAR_DECLARE_counter(dup_shipped_bytes); }; } // namespace replication diff --git a/src/replica/duplication/load_from_private_log.cpp b/src/replica/duplication/load_from_private_log.cpp index aa458b22a..e3742a29e 100644 --- a/src/replica/duplication/load_from_private_log.cpp +++ b/src/replica/duplication/load_from_private_log.cpp @@ -21,7 +21,6 @@ #include "common/duplication_common.h" #include "duplication_types.h" #include "load_from_private_log.h" -#include "perf_counter/perf_counter.h" #include "replica/duplication/mutation_batch.h" #include "replica/mutation.h" #include "replica/mutation_log_utils.h" @@ -35,6 +34,27 @@ #include "utils/ports.h" #include "utils/string_view.h" +METRIC_DEFINE_counter(replica, + dup_log_file_load_failed_count, + dsn::metric_unit::kFileLoads, + "The number of times private log files have failed to be loaded during dup"); + +METRIC_DEFINE_counter(replica, + dup_log_file_load_skipped_bytes, + dsn::metric_unit::kBytes, + "The bytes of mutations that have been skipped due to failed loadings of " + "private log files during dup"); + +METRIC_DEFINE_counter(replica, + dup_log_read_bytes, + dsn::metric_unit::kBytes, + "The size read from private log for dup"); + +METRIC_DEFINE_counter(replica, + dup_log_read_mutations, + dsn::metric_unit::kMutations, + "The number of mutations read from private log for dup"); + namespace dsn { namespace replication { @@ -170,17 +190,17 @@ void load_from_private_log::find_log_file_to_start(std::map<int, log_file_ptr> l void load_from_private_log::replay_log_block() { - error_s err = - mutation_log::replay_block(_current, - [this](int log_bytes_length, mutation_ptr &mu) -> bool { - auto es = _mutation_batch.add(std::move(mu)); - CHECK_PREFIX_MSG(es.is_ok(), es.description()); - _counter_dup_log_read_bytes_rate->add(log_bytes_length); - _counter_dup_log_read_mutations_rate->increment(); - return true; - }, - _start_offset, - _current_global_end_offset); + error_s err = mutation_log::replay_block( + _current, + [this](int log_bytes_length, mutation_ptr &mu) -> bool { + auto es = _mutation_batch.add(std::move(mu)); + CHECK_PREFIX_MSG(es.is_ok(), es.description()); + METRIC_VAR_INCREMENT_BY(dup_log_read_bytes, log_bytes_length); + METRIC_VAR_INCREMENT(dup_log_read_mutations); + return true; + }, + _start_offset, + _current_global_end_offset); if (!err.is_ok() && err.code() != ERR_HANDLE_EOF) { // Error handling on loading failure: // - If block loading failed for `MAX_ALLOWED_REPEATS` times, it restarts reading the file. @@ -197,7 +217,7 @@ void load_from_private_log::replay_log_block() err, _current->path(), _start_offset); - _counter_dup_load_file_failed_count->increment(); + METRIC_VAR_INCREMENT(dup_log_file_load_failed_count); _err_file_repeats_num++; if (dsn_unlikely(will_fail_skip())) { // skip this file @@ -210,7 +230,7 @@ void load_from_private_log::replay_log_block() if (switch_to_next_log_file()) { // successfully skip to next file auto skipped_bytes = _current_global_end_offset - prev_offset; - _counter_dup_load_skipped_bytes_count->add(skipped_bytes); + METRIC_VAR_INCREMENT_BY(dup_log_file_load_skipped_bytes, skipped_bytes); repeat(_repeat_delay); return; } @@ -252,27 +272,12 @@ load_from_private_log::load_from_private_log(replica *r, replica_duplicator *dup _private_log(r->private_log()), _duplicator(dup), _stub(r->get_replica_stub()), - _mutation_batch(dup) + _mutation_batch(dup), + METRIC_VAR_INIT_replica(dup_log_file_load_failed_count), + METRIC_VAR_INIT_replica(dup_log_file_load_skipped_bytes), + METRIC_VAR_INIT_replica(dup_log_read_bytes), + METRIC_VAR_INIT_replica(dup_log_read_mutations) { - _counter_dup_log_read_bytes_rate.init_app_counter("eon.replica_stub", - "dup.log_read_bytes_rate", - COUNTER_TYPE_RATE, - "reading rate of private log in bytes"); - _counter_dup_log_read_mutations_rate.init_app_counter( - "eon.replica_stub", - "dup.log_read_mutations_rate", - COUNTER_TYPE_RATE, - "reading rate of mutations from private log"); - _counter_dup_load_file_failed_count.init_app_counter( - "eon.replica_stub", - "dup.load_file_failed_count", - COUNTER_TYPE_VOLATILE_NUMBER, - "the number of failures loading a private log file during duplication"); - _counter_dup_load_skipped_bytes_count.init_app_counter( - "eon.replica_stub", - "dup.load_skipped_bytes_count", - COUNTER_TYPE_VOLATILE_NUMBER, - "bytes of mutations that were skipped because of failure during duplication"); } void load_from_private_log::set_start_decree(decree start_decree) diff --git a/src/replica/duplication/load_from_private_log.h b/src/replica/duplication/load_from_private_log.h index 523002b54..dea4bca98 100644 --- a/src/replica/duplication/load_from_private_log.h +++ b/src/replica/duplication/load_from_private_log.h @@ -25,13 +25,14 @@ #include "common/replication_other_types.h" #include "mutation_batch.h" -#include "perf_counter/perf_counter_wrapper.h" #include "replica/duplication/mutation_duplicator.h" #include "replica/log_file.h" #include "replica/mutation_log.h" #include "replica/replica_base.h" #include "runtime/pipeline.h" +#include "utils/autoref_ptr.h" #include "utils/chrono_literals.h" +#include "utils/metrics.h" namespace dsn { namespace replication { @@ -76,6 +77,9 @@ public: void TEST_set_repeat_delay(std::chrono::milliseconds delay) { _repeat_delay = delay; } + METRIC_DEFINE_VALUE(dup_log_file_load_failed_count, int64_t) + METRIC_DEFINE_VALUE(dup_log_file_load_skipped_bytes, int64_t) + static constexpr int MAX_ALLOWED_BLOCK_REPEATS{3}; static constexpr int MAX_ALLOWED_FILE_REPEATS{10}; @@ -103,10 +107,10 @@ private: decree _start_decree{0}; - perf_counter_wrapper _counter_dup_load_file_failed_count; - perf_counter_wrapper _counter_dup_load_skipped_bytes_count; - perf_counter_wrapper _counter_dup_log_read_bytes_rate; - perf_counter_wrapper _counter_dup_log_read_mutations_rate; + METRIC_VAR_DECLARE_counter(dup_log_file_load_failed_count); + METRIC_VAR_DECLARE_counter(dup_log_file_load_skipped_bytes); + METRIC_VAR_DECLARE_counter(dup_log_read_bytes); + METRIC_VAR_DECLARE_counter(dup_log_read_mutations); std::chrono::milliseconds _repeat_delay{10_s}; }; diff --git a/src/replica/duplication/mutation_batch.cpp b/src/replica/duplication/mutation_batch.cpp index 786c4d61e..36018b8f7 100644 --- a/src/replica/duplication/mutation_batch.cpp +++ b/src/replica/duplication/mutation_batch.cpp @@ -15,21 +15,16 @@ // specific language governing permissions and limitations // under the License. -#include <fmt/core.h> -#include <fmt/ostream.h> #include <functional> -#include <iosfwd> #include <string> #include <tuple> #include <utility> #include <vector> -#include "common/gpid.h" #include "common/replication.codes.h" #include "consensus_types.h" #include "metadata_types.h" #include "mutation_batch.h" -#include "perf_counter/perf_counter.h" #include "replica_duplicator.h" #include "runtime/task/task_code.h" #include "runtime/task/task_spec.h" @@ -40,6 +35,11 @@ #include "utils/smart_pointers.h" #include "utils/string_view.h" +METRIC_DEFINE_gauge_int64(replica, + dup_recent_lost_mutations, + dsn::metric_unit::kMutations, + "The number of lost mutations recently for dup"); + namespace dsn { namespace replication { @@ -49,11 +49,9 @@ mutation_buffer::mutation_buffer(replica_base *r, decree init_decree, int max_count, mutation_committer committer) - : prepare_list(r, init_decree, max_count, committer) + : prepare_list(r, init_decree, max_count, committer), + METRIC_VAR_INIT_replica(dup_recent_lost_mutations) { - auto counter_str = fmt::format("dup_recent_mutation_loss_count@{}", r->get_gpid()); - _counter_dulication_mutation_loss_count.init_app_counter( - "eon.replica", counter_str.c_str(), COUNTER_TYPE_VOLATILE_NUMBER, counter_str.c_str()); } void mutation_buffer::commit(decree d, commit_type ct) @@ -78,8 +76,7 @@ void mutation_buffer::commit(decree d, commit_type ct) // // just LOG_ERROR but not CHECK if mutation loss or other problem, it's different from // base class implement. And from the error and perf-counter, we can choose restart - // duplication - // or ignore the loss. + // duplication or ignore the loss. if (next_committed_mutation == nullptr || !next_committed_mutation->is_logged()) { LOG_ERROR_PREFIX("mutation[{}] is lost in prepare_list: " "prepare_last_committed_decree={}, prepare_min_decree={}, " @@ -88,7 +85,7 @@ void mutation_buffer::commit(decree d, commit_type ct) last_committed_decree(), min_decree(), max_decree()); - _counter_dulication_mutation_loss_count->set(min_decree() - last_committed_decree()); + METRIC_VAR_SET(dup_recent_lost_mutations, min_decree() - last_committed_decree()); // if next_commit_mutation loss, let last_commit_decree catch up with min_decree, and // the next loop will commit from min_decree _last_committed_decree = min_decree() - 1; diff --git a/src/replica/duplication/mutation_batch.h b/src/replica/duplication/mutation_batch.h index fb5efa123..97795cea2 100644 --- a/src/replica/duplication/mutation_batch.h +++ b/src/replica/duplication/mutation_batch.h @@ -22,12 +22,12 @@ #include <memory> #include "common/replication_other_types.h" -#include "perf_counter/perf_counter_wrapper.h" #include "replica/duplication/mutation_duplicator.h" #include "replica/mutation.h" #include "replica/prepare_list.h" #include "replica/replica_base.h" #include "utils/errors.h" +#include "utils/metrics.h" namespace dsn { namespace replication { @@ -45,7 +45,7 @@ public: void commit(decree d, commit_type ct) override; private: - perf_counter_wrapper _counter_dulication_mutation_loss_count; + METRIC_VAR_DECLARE_gauge_int64(dup_recent_lost_mutations); }; // A sorted array of committed mutations that are ready for duplication. diff --git a/src/replica/duplication/test/load_from_private_log_test.cpp b/src/replica/duplication/test/load_from_private_log_test.cpp index 342669f63..0a308f31a 100644 --- a/src/replica/duplication/test/load_from_private_log_test.cpp +++ b/src/replica/duplication/test/load_from_private_log_test.cpp @@ -30,8 +30,6 @@ #include "common/replication_other_types.h" #include "consensus_types.h" #include "duplication_types.h" -#include "perf_counter/perf_counter.h" -#include "perf_counter/perf_counter_wrapper.h" #include "replica/duplication/mutation_duplicator.h" #include "replica/duplication/replica_duplicator.h" #include "replica/log_file.h" @@ -50,6 +48,7 @@ #include "utils/filesystem.h" #include "utils/flags.h" #include "utils/fmt_logging.h" +#include "utils/metrics.h" #define BOOST_NO_CXX11_SCOPED_ENUMS #include <boost/filesystem/operations.hpp> @@ -393,6 +392,8 @@ public: load = std::make_unique<load_from_private_log>(_replica.get(), duplicator.get()); load->TEST_set_repeat_delay(0_ms); // no delay load->set_start_decree(duplicator->progress().last_decree + 1); + load->METRIC_VAR_NAME(dup_log_file_load_failed_count)->reset(); + load->METRIC_VAR_NAME(dup_log_file_load_skipped_bytes)->reset(); end_stage = std::make_unique<end_stage_t>( [this, num_entries](decree &&d, mutation_tuple_set &&mutations) { load->set_start_decree(d + 1); @@ -413,7 +414,7 @@ public: TEST_F(load_fail_mode_test, fail_skip) { duplicator->update_fail_mode(duplication_fail_mode::FAIL_SKIP); - ASSERT_EQ(load->_counter_dup_load_skipped_bytes_count->get_integer_value(), 0); + ASSERT_EQ(METRIC_VALUE(*load, dup_log_file_load_skipped_bytes), 0); // will trigger fail-skip and read the subsequent file, some mutations will be lost. auto repeats = load->MAX_ALLOWED_BLOCK_REPEATS * load->MAX_ALLOWED_FILE_REPEATS; @@ -423,16 +424,16 @@ TEST_F(load_fail_mode_test, fail_skip) duplicator->wait_all(); fail::teardown(); - ASSERT_EQ(load->_counter_dup_load_file_failed_count->get_integer_value(), + ASSERT_EQ(METRIC_VALUE(*load, dup_log_file_load_failed_count), load_from_private_log::MAX_ALLOWED_FILE_REPEATS); - ASSERT_GT(load->_counter_dup_load_skipped_bytes_count->get_integer_value(), 0); + ASSERT_GT(METRIC_VALUE(*load, dup_log_file_load_skipped_bytes), 0); } TEST_F(load_fail_mode_test, fail_slow) { duplicator->update_fail_mode(duplication_fail_mode::FAIL_SLOW); - ASSERT_EQ(load->_counter_dup_load_skipped_bytes_count->get_integer_value(), 0); - ASSERT_EQ(load->_counter_dup_load_file_failed_count->get_integer_value(), 0); + ASSERT_EQ(METRIC_VALUE(*load, dup_log_file_load_skipped_bytes), 0); + ASSERT_EQ(METRIC_VALUE(*load, dup_log_file_load_failed_count), 0); // will trigger fail-slow and retry infinitely auto repeats = load->MAX_ALLOWED_BLOCK_REPEATS * load->MAX_ALLOWED_FILE_REPEATS; @@ -442,9 +443,9 @@ TEST_F(load_fail_mode_test, fail_slow) duplicator->wait_all(); fail::teardown(); - ASSERT_EQ(load->_counter_dup_load_file_failed_count->get_integer_value(), + ASSERT_EQ(METRIC_VALUE(*load, dup_log_file_load_failed_count), load_from_private_log::MAX_ALLOWED_FILE_REPEATS); - ASSERT_EQ(load->_counter_dup_load_skipped_bytes_count->get_integer_value(), 0); + ASSERT_EQ(METRIC_VALUE(*load, dup_log_file_load_skipped_bytes), 0); } TEST_F(load_fail_mode_test, fail_skip_real_corrupted_file) @@ -464,9 +465,9 @@ TEST_F(load_fail_mode_test, fail_skip_real_corrupted_file) duplicator->wait_all(); // ensure the bad file will be skipped - ASSERT_EQ(load->_counter_dup_load_file_failed_count->get_integer_value(), + ASSERT_EQ(METRIC_VALUE(*load, dup_log_file_load_failed_count), load_from_private_log::MAX_ALLOWED_FILE_REPEATS); - ASSERT_GT(load->_counter_dup_load_skipped_bytes_count->get_integer_value(), 0); + ASSERT_GT(METRIC_VALUE(*load, dup_log_file_load_skipped_bytes), 0); } } // namespace replication diff --git a/src/server/pegasus_mutation_duplicator.cpp b/src/server/pegasus_mutation_duplicator.cpp index 74832d5e6..e940d2b6a 100644 --- a/src/server/pegasus_mutation_duplicator.cpp +++ b/src/server/pegasus_mutation_duplicator.cpp @@ -48,14 +48,14 @@ #include "utils/rand.h" METRIC_DEFINE_counter(replica, - mutation_dup_successful_requests, + dup_shipped_successful_requests, dsn::metric_unit::kRequests, - "The number of successful DUPLICATE requests sent from mutation duplicator"); + "The number of successful DUPLICATE requests sent from client"); METRIC_DEFINE_counter(replica, - mutation_dup_failed_requests, + dup_shipped_failed_requests, dsn::metric_unit::kRequests, - "The number of failed DUPLICATE requests sent from mutation duplicator"); + "The number of failed DUPLICATE requests sent from client"); namespace dsn { namespace replication { @@ -107,8 +107,8 @@ pegasus_mutation_duplicator::pegasus_mutation_duplicator(dsn::replication::repli dsn::string_view app) : mutation_duplicator(r), _remote_cluster(remote_cluster), - METRIC_VAR_INIT_replica(mutation_dup_successful_requests), - METRIC_VAR_INIT_replica(mutation_dup_failed_requests) + METRIC_VAR_INIT_replica(dup_shipped_successful_requests), + METRIC_VAR_INIT_replica(dup_shipped_failed_requests) { // initialize pegasus-client when this class is first time used. static __attribute__((unused)) bool _dummy = pegasus_client_factory::initialize(nullptr); @@ -162,7 +162,7 @@ void pegasus_mutation_duplicator::on_duplicate_reply(uint64_t hash, } if (perr != PERR_OK || err != dsn::ERR_OK) { - METRIC_VAR_INCREMENT(mutation_dup_failed_requests); + METRIC_VAR_INCREMENT(dup_shipped_failed_requests); // randomly log the 1% of the failed duplicate rpc, because minor number of // errors are acceptable. @@ -175,7 +175,7 @@ void pegasus_mutation_duplicator::on_duplicate_reply(uint64_t hash, // duplicating an illegal write to server is unacceptable, fail fast. CHECK_NE_PREFIX_MSG(perr, PERR_INVALID_ARGUMENT, rpc.response().error_hint); } else { - METRIC_VAR_INCREMENT(mutation_dup_successful_requests); + METRIC_VAR_INCREMENT(dup_shipped_successful_requests); _total_shipped_size += rpc.dsn_request()->header->body_length + rpc.dsn_request()->header->hdr_length; } diff --git a/src/server/pegasus_mutation_duplicator.h b/src/server/pegasus_mutation_duplicator.h index dfe126df7..73da5f883 100644 --- a/src/server/pegasus_mutation_duplicator.h +++ b/src/server/pegasus_mutation_duplicator.h @@ -89,8 +89,8 @@ private: size_t _total_shipped_size{0}; - METRIC_VAR_DECLARE_counter(mutation_dup_successful_requests); - METRIC_VAR_DECLARE_counter(mutation_dup_failed_requests); + METRIC_VAR_DECLARE_counter(dup_shipped_successful_requests); + METRIC_VAR_DECLARE_counter(dup_shipped_failed_requests); }; // Decodes the binary `request_data` into write request in thrift struct, and diff --git a/src/utils/metrics.h b/src/utils/metrics.h index 3b88ba5e9..7a7d64bd0 100644 --- a/src/utils/metrics.h +++ b/src/utils/metrics.h @@ -686,6 +686,7 @@ enum class metric_unit : size_t kRounds, kResets, kBackups, + kFileLoads, kFileUploads, kBulkLoads, kInvalidUnit, --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
