This is an automated email from the ASF dual-hosted git repository.
gehafearless pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 858433dd7 feat(new_metrics): show replica-level qps and capacity units
by shell `nodes` command based on new metrics (#1899)
858433dd7 is described below
commit 858433dd7f6fa38f4b6a61634dbc0f820b05f960
Author: Dan Wang <[email protected]>
AuthorDate: Wed Feb 7 10:44:08 2024 +0800
feat(new_metrics): show replica-level qps and capacity units by shell
`nodes` command based on new metrics (#1899)
---
src/shell/command_helper.h | 58 ++++++++++++++++++
src/shell/commands/node_management.cpp | 104 ++++++++++++++------------------
src/shell/commands/table_management.cpp | 2 -
src/utils/metrics.cpp | 12 +++-
src/utils/metrics.h | 6 +-
src/utils/test/metrics_test.cpp | 1 +
6 files changed, 119 insertions(+), 64 deletions(-)
diff --git a/src/shell/command_helper.h b/src/shell/command_helper.h
index e5ac4395b..43a2e05b0 100644
--- a/src/shell/command_helper.h
+++ b/src/shell/command_helper.h
@@ -704,6 +704,64 @@ inline std::vector<dsn::http_result> get_metrics(const
std::vector<node_desc> &n
}
\
} while (0)
+using stat_var_map = std::unordered_map<std::string, double *>;
+inline dsn::error_s calc_metric_deltas(const std::string &json_string_1,
+ const std::string &json_string_2,
+ const std::string &entity_type,
+ stat_var_map &incs,
+ stat_var_map &rates)
+{
+ // Currently only Gauge and Counter are considered to have "increase" and "rate", thus brief
+ // `value` field is enough.
+ DESERIALIZE_METRIC_QUERY_BRIEF_SNAPSHOT(value, json_string_1,
query_snapshot_1);
+ DESERIALIZE_METRIC_QUERY_BRIEF_SNAPSHOT(value, json_string_2,
query_snapshot_2);
+
+ if (query_snapshot_2.timestamp_ns <= query_snapshot_1.timestamp_ns) {
+ return FMT_ERR(dsn::ERR_INVALID_DATA,
+ "duration for metric samples should be > 0:
timestamp_ns_1={}, "
+ "timestamp_ns_2={}",
+ query_snapshot_1.timestamp_ns,
+ query_snapshot_2.timestamp_ns);
+ }
+
+ const std::vector<stat_var_map *> stat_vars = {&incs, &rates};
+
+#define CALC_STAT_VAR(op)
\
+ do {
\
+ if (entity.type != entity_type) {
\
+ continue;
\
+ }
\
+
\
+ for (const auto &m : entity.metrics) {
\
+ for (auto &stat : stat_vars) {
\
+ auto iter = stat->find(m.name);
\
+ if (iter != stat->end()) {
\
+ *iter->second op m.value;
\
+ }
\
+ }
\
+ }
\
+ } while (0)
+
+ for (const auto &entity : query_snapshot_2.entities) {
+ CALC_STAT_VAR(+=);
+ }
+
+ for (const auto &entity : query_snapshot_1.entities) {
+ CALC_STAT_VAR(-=);
+ }
+
+#undef CALC_STAT_VAR
+
+ const std::chrono::duration<double, std::nano> duration_ns(
+ static_cast<double>(query_snapshot_2.timestamp_ns -
query_snapshot_1.timestamp_ns));
+ const std::chrono::duration<double> duration_s = duration_ns;
+ for (auto &rate : rates) {
+ *rate.second /= duration_s.count();
+ }
+
+ return dsn::error_s::ok();
+}
+
inline std::vector<std::pair<bool, std::string>>
call_remote_command(shell_context *sc,
const std::vector<node_desc> &nodes,
diff --git a/src/shell/commands/node_management.cpp
b/src/shell/commands/node_management.cpp
index e9d2d9f97..abb06ca29 100644
--- a/src/shell/commands/node_management.cpp
+++ b/src/shell/commands/node_management.cpp
@@ -17,36 +17,36 @@
* under the License.
*/
-// IWYU pragma: no_include <bits/getopt_core.h>
#include <getopt.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <algorithm>
+// IWYU pragma: no_include <bits/getopt_core.h>
+#include <chrono>
#include <fstream>
#include <iostream>
#include <map>
#include <memory>
#include <string>
+#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>
#include "client/replication_ddl_client.h"
-#include "common/json_helper.h"
#include "common/replication_enums.h"
#include "dsn.layer2_types.h"
#include "meta_admin_types.h"
-#include "perf_counter/perf_counter_utils.h"
#include "runtime/rpc/rpc_address.h"
#include "shell/command_executor.h"
#include "shell/command_helper.h"
#include "shell/command_utils.h"
#include "shell/commands.h"
#include "shell/sds/sds.h"
-#include "utils/blob.h"
#include "utils/error_code.h"
#include "utils/errors.h"
+#include "utils/flags.h"
#include "utils/math.h"
#include "utils/metrics.h"
#include "utils/output_utils.h"
@@ -54,6 +54,8 @@
#include "utils/strings.h"
#include "utils/utils.h"
+DSN_DEFINE_uint32(shell, nodes_sample_interval_ms, 1000, "The interval between
sampling metrics.");
+
bool query_cluster_info(command_executor *e, shell_context *sc, arguments args)
{
static struct option long_options[] = {{"resolve_ip", no_argument, 0, 'r'},
@@ -111,8 +113,6 @@ dsn::metric_filters resource_usage_filters()
dsn::error_s parse_resource_usage(const std::string &json_string,
list_nodes_helper &stat)
{
- dsn::error_s err;
-
DESERIALIZE_METRIC_QUERY_BRIEF_SNAPSHOT(value, json_string,
query_snapshot);
int64_t total_capacity_mb = 0;
@@ -172,8 +172,6 @@ dsn::metric_filters profiler_latency_filters()
dsn::error_s parse_profiler_latency(const std::string &json_string,
list_nodes_helper &stat)
{
- dsn::error_s err;
-
DESERIALIZE_METRIC_QUERY_BRIEF_SNAPSHOT(p99, json_string, query_snapshot);
for (const auto &entity : query_snapshot.entities) {
@@ -214,6 +212,21 @@ dsn::error_s parse_profiler_latency(const std::string
&json_string, list_nodes_h
return dsn::error_s::ok();
}
+dsn::metric_filters rw_requests_filters()
+{
+ dsn::metric_filters filters;
+ filters.with_metric_fields = {dsn::kMetricNameField,
dsn::kMetricSingleValueField};
+ filters.entity_types = {"replica"};
+ filters.entity_metrics = {"get_requests",
+ "multi_get_requests",
+ "batch_get_requests",
+ "put_requests",
+ "multi_put_requests",
+ "read_capacity_units",
+ "write_capacity_units"};
+ return filters;
+}
+
} // anonymous namespace
bool ls_nodes(command_executor *e, shell_context *sc, arguments args)
@@ -377,57 +390,32 @@ bool ls_nodes(command_executor *e, shell_context *sc,
arguments args)
return true;
}
- std::vector<std::pair<bool, std::string>> results =
- call_remote_command(sc,
- nodes,
- "perf-counters-by-prefix",
- {"replica*app.pegasus*get_qps",
- "replica*app.pegasus*multi_get_qps",
- "replica*app.pegasus*batch_get_qps",
- "replica*app.pegasus*put_qps",
- "replica*app.pegasus*multi_put_qps",
- "replica*app.pegasus*recent.read.cu",
- "replica*app.pegasus*recent.write.cu"});
-
- for (int i = 0; i < nodes.size(); ++i) {
- dsn::rpc_address node_addr = nodes[i].address;
- auto tmp_it = tmp_map.find(node_addr);
- if (tmp_it == tmp_map.end())
+ const auto &results_1 = get_metrics(nodes,
rw_requests_filters().to_query_string());
+
std::this_thread::sleep_for(std::chrono::milliseconds(FLAGS_nodes_sample_interval_ms));
+ const auto &results_2 = get_metrics(nodes,
rw_requests_filters().to_query_string());
+
+ for (size_t i = 0; i < nodes.size(); ++i) {
+ auto tmp_it = tmp_map.find(nodes[i].address);
+ if (tmp_it == tmp_map.end()) {
continue;
- if (!results[i].first) {
- std::cout << "query perf counter info from node " << node_addr
<< " failed"
- << std::endl;
- return true;
- }
- dsn::perf_counter_info info;
- dsn::blob bb(results[i].second.data(), 0,
results[i].second.size());
- if (!dsn::json::json_forwarder<dsn::perf_counter_info>::decode(bb,
info)) {
- std::cout << "decode perf counter info from node " << node_addr
- << " failed, result = " << results[i].second <<
std::endl;
- return true;
- }
- if (info.result != "OK") {
- std::cout << "query perf counter info from node " << node_addr
- << " returns error, error = " << info.result <<
std::endl;
- return true;
- }
- list_nodes_helper &h = tmp_it->second;
- for (dsn::perf_counter_metric &m : info.counters) {
- if (m.name.find("replica*app.pegasus*get_qps") !=
std::string::npos)
- h.get_qps += m.value;
- else if (m.name.find("replica*app.pegasus*multi_get_qps") !=
std::string::npos)
- h.multi_get_qps += m.value;
- else if (m.name.find("replica*app.pegasus*batch_get_qps") !=
std::string::npos)
- h.batch_get_qps += m.value;
- else if (m.name.find("replica*app.pegasus*put_qps") !=
std::string::npos)
- h.put_qps += m.value;
- else if (m.name.find("replica*app.pegasus*multi_put_qps") !=
std::string::npos)
- h.multi_put_qps += m.value;
- else if (m.name.find("replica*app.pegasus*recent.read.cu") !=
std::string::npos)
- h.read_cu += m.value;
- else if (m.name.find("replica*app.pegasus*recent.write.cu") !=
std::string::npos)
- h.write_cu += m.value;
}
+
+ RETURN_SHELL_IF_GET_METRICS_FAILED(results_1[i], nodes[i], "1st rw
requests");
+ RETURN_SHELL_IF_GET_METRICS_FAILED(results_2[i], nodes[i], "2nd rw
requests");
+
+ list_nodes_helper &stat = tmp_it->second;
+ stat_var_map incs = {{"read_capacity_units", &stat.read_cu},
+ {"write_capacity_units", &stat.write_cu}};
+ stat_var_map rates = {{"get_requests", &stat.get_qps},
+ {"multi_get_requests", &stat.multi_get_qps},
+ {"batch_get_requests", &stat.batch_get_qps},
+ {"put_requests", &stat.put_qps},
+ {"multi_put_requests", &stat.multi_put_qps}};
+ RETURN_SHELL_IF_PARSE_METRICS_FAILED(
+ calc_metric_deltas(
+ results_1[i].body(), results_2[i].body(), "replica", incs,
rates),
+ nodes[i],
+ "rw requests");
}
}
@@ -440,7 +428,7 @@ bool ls_nodes(command_executor *e, shell_context *sc,
arguments args)
const auto &results = get_metrics(nodes,
profiler_latency_filters().to_query_string());
- for (int i = 0; i < nodes.size(); ++i) {
+ for (size_t i = 0; i < nodes.size(); ++i) {
auto tmp_it = tmp_map.find(nodes[i].address);
if (tmp_it == tmp_map.end()) {
continue;
diff --git a/src/shell/commands/table_management.cpp
b/src/shell/commands/table_management.cpp
index e6167da3f..b5a5e34a1 100644
--- a/src/shell/commands/table_management.cpp
+++ b/src/shell/commands/table_management.cpp
@@ -187,8 +187,6 @@ dsn::error_s parse_sst_stat(const std::string &json_string,
std::map<int32_t, double> &count_map,
std::map<int32_t, double> &disk_map)
{
- dsn::error_s err;
-
DESERIALIZE_METRIC_QUERY_BRIEF_SNAPSHOT(value, json_string,
query_snapshot);
for (const auto &entity : query_snapshot.entities) {
diff --git a/src/utils/metrics.cpp b/src/utils/metrics.cpp
index 681b6555c..09d08c4c9 100644
--- a/src/utils/metrics.cpp
+++ b/src/utils/metrics.cpp
@@ -520,6 +520,13 @@ void encode_port(dsn::metric_json_writer &writer)
ENCODE_OBJ_VAL(rpc != nullptr, rpc->primary_address().port());
}
+void encode_timestamp_ns(dsn::metric_json_writer &writer)
+{
+ writer.Key(dsn::kMetricTimestampNsField.c_str());
+
+ ENCODE_OBJ_VAL(true, dsn_now_ns());
+}
+
#undef ENCODE_OBJ_VAL
} // anonymous namespace
@@ -549,6 +556,7 @@ void metric_registry::take_snapshot(metric_json_writer
&writer, const metric_fil
encode_role(writer);
encode_host(writer);
encode_port(writer);
+ encode_timestamp_ns(writer);
encode_entities(writer, filters);
writer.EndObject();
}
@@ -557,7 +565,7 @@ metric_registry::collected_entities_info
metric_registry::collect_stale_entities
{
collected_entities_info collected_info;
- auto now = dsn_now_ms();
+ const auto now = dsn_now_ms();
utils::auto_read_lock l(_lock);
@@ -596,7 +604,7 @@ metric_registry::retire_stale_entities(const
collected_entity_list &collected_en
retired_entities_stat retired_stat;
- auto now = dsn_now_ms();
+ const auto now = dsn_now_ms();
utils::auto_write_lock l(_lock);
diff --git a/src/utils/metrics.h b/src/utils/metrics.h
index fb88aaeb0..449bbf1a2 100644
--- a/src/utils/metrics.h
+++ b/src/utils/metrics.h
@@ -353,6 +353,7 @@ const std::string kMetricClusterField = "cluster";
const std::string kMetricRoleField = "role";
const std::string kMetricHostField = "host";
const std::string kMetricPortField = "port";
+const std::string kMetricTimestampNsField = "timestamp_ns";
const std::string kMetricEntitiesField = "entities";
class metric_entity : public ref_counter
@@ -1679,9 +1680,10 @@ private:
std::string role;
\
std::string host;
\
uint16_t port;
\
+ uint64_t timestamp_ns;
\
std::vector<metric_entity_brief_##field##_snapshot> entities;
\
\
- DEFINE_JSON_SERIALIZATION(cluster, role, host, port, entities)
\
+ DEFINE_JSON_SERIALIZATION(cluster, role, host, port, timestamp_ns,
entities) \
}
#define DEF_ALL_METRIC_BRIEF_SNAPSHOTS(field)
\
@@ -1700,7 +1702,7 @@ DEF_ALL_METRIC_BRIEF_SNAPSHOTS(p99);
if (dsn_unlikely(
\
!dsn::json::json_forwarder<dsn::metric_query_brief_##field##_snapshot>::decode(
\
bb, query_snapshot))) {
\
- return FMT_ERR(dsn::ERR_INVALID_DATA, "invalid json string");
\
+ return FMT_ERR(dsn::ERR_INVALID_DATA, "invalid json string: {}",
json_string); \
}
\
} while (0)
diff --git a/src/utils/test/metrics_test.cpp b/src/utils/test/metrics_test.cpp
index cbacc6a7b..640f8267e 100644
--- a/src/utils/test/metrics_test.cpp
+++ b/src/utils/test/metrics_test.cpp
@@ -2271,6 +2271,7 @@ const std::unordered_set<std::string>
kAllMetricQueryFields = {kMetricClusterFie
kMetricRoleField,
kMetricHostField,
kMetricPortField,
+
kMetricTimestampNsField,
kMetricEntitiesField};
void check_entity_ids_from_json_string(const std::string &json_string,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]