This is an automated email from the ASF dual-hosted git repository.

gehafearless pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 858433dd7 feat(new_metrics): show replica-level qps and capacity units by shell `nodes` command based on new metrics (#1899)
858433dd7 is described below

commit 858433dd7f6fa38f4b6a61634dbc0f820b05f960
Author: Dan Wang <[email protected]>
AuthorDate: Wed Feb 7 10:44:08 2024 +0800

    feat(new_metrics): show replica-level qps and capacity units by shell `nodes` command based on new metrics (#1899)
---
 src/shell/command_helper.h              |  58 ++++++++++++++++++
 src/shell/commands/node_management.cpp  | 104 ++++++++++++++------------------
 src/shell/commands/table_management.cpp |   2 -
 src/utils/metrics.cpp                   |  12 +++-
 src/utils/metrics.h                     |   6 +-
 src/utils/test/metrics_test.cpp         |   1 +
 6 files changed, 119 insertions(+), 64 deletions(-)

diff --git a/src/shell/command_helper.h b/src/shell/command_helper.h
index e5ac4395b..43a2e05b0 100644
--- a/src/shell/command_helper.h
+++ b/src/shell/command_helper.h
@@ -704,6 +704,64 @@ inline std::vector<dsn::http_result> get_metrics(const 
std::vector<node_desc> &n
         }                                                                      
                    \
     } while (0)
 
+using stat_var_map = std::unordered_map<std::string, double *>;
+inline dsn::error_s calc_metric_deltas(const std::string &json_string_1,
+                                       const std::string &json_string_2,
+                                       const std::string &entity_type,
+                                       stat_var_map &incs,
+                                       stat_var_map &rates)
+{
+    // Currently only Gauge and Counter are considered to have "increase" and 
"rate", thus brief
+    // `value` field is enough.
+    DESERIALIZE_METRIC_QUERY_BRIEF_SNAPSHOT(value, json_string_1, 
query_snapshot_1);
+    DESERIALIZE_METRIC_QUERY_BRIEF_SNAPSHOT(value, json_string_2, 
query_snapshot_2);
+
+    if (query_snapshot_2.timestamp_ns <= query_snapshot_1.timestamp_ns) {
+        return FMT_ERR(dsn::ERR_INVALID_DATA,
+                       "duration for metric samples should be > 0: 
timestamp_ns_1={}, "
+                       "timestamp_ns_2={}",
+                       query_snapshot_1.timestamp_ns,
+                       query_snapshot_2.timestamp_ns);
+    }
+
+    const std::vector<stat_var_map *> stat_vars = {&incs, &rates};
+
+#define CALC_STAT_VAR(op)                                                      
                    \
+    do {                                                                       
                    \
+        if (entity.type != entity_type) {                                      
                    \
+            continue;                                                          
                    \
+        }                                                                      
                    \
+                                                                               
                    \
+        for (const auto &m : entity.metrics) {                                 
                    \
+            for (auto &stat : stat_vars) {                                     
                    \
+                auto iter = stat->find(m.name);                                
                    \
+                if (iter != stat->end()) {                                     
                    \
+                    *iter->second op m.value;                                  
                    \
+                }                                                              
                    \
+            }                                                                  
                    \
+        }                                                                      
                    \
+    } while (0)
+
+    for (const auto &entity : query_snapshot_2.entities) {
+        CALC_STAT_VAR(+=);
+    }
+
+    for (const auto &entity : query_snapshot_1.entities) {
+        CALC_STAT_VAR(-=);
+    }
+
+#undef CALC_STAT_VAR
+
+    const std::chrono::duration<double, std::nano> duration_ns(
+        static_cast<double>(query_snapshot_2.timestamp_ns - 
query_snapshot_1.timestamp_ns));
+    const std::chrono::duration<double> duration_s = duration_ns;
+    for (auto &rate : rates) {
+        *rate.second /= duration_s.count();
+    }
+
+    return dsn::error_s::ok();
+}
+
 inline std::vector<std::pair<bool, std::string>>
 call_remote_command(shell_context *sc,
                     const std::vector<node_desc> &nodes,
diff --git a/src/shell/commands/node_management.cpp 
b/src/shell/commands/node_management.cpp
index e9d2d9f97..abb06ca29 100644
--- a/src/shell/commands/node_management.cpp
+++ b/src/shell/commands/node_management.cpp
@@ -17,36 +17,36 @@
  * under the License.
  */
 
-// IWYU pragma: no_include <bits/getopt_core.h>
 #include <getopt.h>
 #include <stdint.h>
 #include <stdio.h>
 #include <string.h>
 #include <algorithm>
+// IWYU pragma: no_include <bits/getopt_core.h>
+#include <chrono>
 #include <fstream>
 #include <iostream>
 #include <map>
 #include <memory>
 #include <string>
+#include <thread>
 #include <unordered_map>
 #include <utility>
 #include <vector>
 
 #include "client/replication_ddl_client.h"
-#include "common/json_helper.h"
 #include "common/replication_enums.h"
 #include "dsn.layer2_types.h"
 #include "meta_admin_types.h"
-#include "perf_counter/perf_counter_utils.h"
 #include "runtime/rpc/rpc_address.h"
 #include "shell/command_executor.h"
 #include "shell/command_helper.h"
 #include "shell/command_utils.h"
 #include "shell/commands.h"
 #include "shell/sds/sds.h"
-#include "utils/blob.h"
 #include "utils/error_code.h"
 #include "utils/errors.h"
+#include "utils/flags.h"
 #include "utils/math.h"
 #include "utils/metrics.h"
 #include "utils/output_utils.h"
@@ -54,6 +54,8 @@
 #include "utils/strings.h"
 #include "utils/utils.h"
 
+DSN_DEFINE_uint32(shell, nodes_sample_interval_ms, 1000, "The interval between 
sampling metrics.");
+
 bool query_cluster_info(command_executor *e, shell_context *sc, arguments args)
 {
     static struct option long_options[] = {{"resolve_ip", no_argument, 0, 'r'},
@@ -111,8 +113,6 @@ dsn::metric_filters resource_usage_filters()
 
 dsn::error_s parse_resource_usage(const std::string &json_string, 
list_nodes_helper &stat)
 {
-    dsn::error_s err;
-
     DESERIALIZE_METRIC_QUERY_BRIEF_SNAPSHOT(value, json_string, 
query_snapshot);
 
     int64_t total_capacity_mb = 0;
@@ -172,8 +172,6 @@ dsn::metric_filters profiler_latency_filters()
 
 dsn::error_s parse_profiler_latency(const std::string &json_string, 
list_nodes_helper &stat)
 {
-    dsn::error_s err;
-
     DESERIALIZE_METRIC_QUERY_BRIEF_SNAPSHOT(p99, json_string, query_snapshot);
 
     for (const auto &entity : query_snapshot.entities) {
@@ -214,6 +212,21 @@ dsn::error_s parse_profiler_latency(const std::string 
&json_string, list_nodes_h
     return dsn::error_s::ok();
 }
 
+dsn::metric_filters rw_requests_filters()
+{
+    dsn::metric_filters filters;
+    filters.with_metric_fields = {dsn::kMetricNameField, 
dsn::kMetricSingleValueField};
+    filters.entity_types = {"replica"};
+    filters.entity_metrics = {"get_requests",
+                              "multi_get_requests",
+                              "batch_get_requests",
+                              "put_requests",
+                              "multi_put_requests",
+                              "read_capacity_units",
+                              "write_capacity_units"};
+    return filters;
+}
+
 } // anonymous namespace
 
 bool ls_nodes(command_executor *e, shell_context *sc, arguments args)
@@ -377,57 +390,32 @@ bool ls_nodes(command_executor *e, shell_context *sc, 
arguments args)
             return true;
         }
 
-        std::vector<std::pair<bool, std::string>> results =
-            call_remote_command(sc,
-                                nodes,
-                                "perf-counters-by-prefix",
-                                {"replica*app.pegasus*get_qps",
-                                 "replica*app.pegasus*multi_get_qps",
-                                 "replica*app.pegasus*batch_get_qps",
-                                 "replica*app.pegasus*put_qps",
-                                 "replica*app.pegasus*multi_put_qps",
-                                 "replica*app.pegasus*recent.read.cu",
-                                 "replica*app.pegasus*recent.write.cu"});
-
-        for (int i = 0; i < nodes.size(); ++i) {
-            dsn::rpc_address node_addr = nodes[i].address;
-            auto tmp_it = tmp_map.find(node_addr);
-            if (tmp_it == tmp_map.end())
+        const auto &results_1 = get_metrics(nodes, 
rw_requests_filters().to_query_string());
+        
std::this_thread::sleep_for(std::chrono::milliseconds(FLAGS_nodes_sample_interval_ms));
+        const auto &results_2 = get_metrics(nodes, 
rw_requests_filters().to_query_string());
+
+        for (size_t i = 0; i < nodes.size(); ++i) {
+            auto tmp_it = tmp_map.find(nodes[i].address);
+            if (tmp_it == tmp_map.end()) {
                 continue;
-            if (!results[i].first) {
-                std::cout << "query perf counter info from node " << node_addr 
<< " failed"
-                          << std::endl;
-                return true;
-            }
-            dsn::perf_counter_info info;
-            dsn::blob bb(results[i].second.data(), 0, 
results[i].second.size());
-            if (!dsn::json::json_forwarder<dsn::perf_counter_info>::decode(bb, 
info)) {
-                std::cout << "decode perf counter info from node " << node_addr
-                          << " failed, result = " << results[i].second << 
std::endl;
-                return true;
-            }
-            if (info.result != "OK") {
-                std::cout << "query perf counter info from node " << node_addr
-                          << " returns error, error = " << info.result << 
std::endl;
-                return true;
-            }
-            list_nodes_helper &h = tmp_it->second;
-            for (dsn::perf_counter_metric &m : info.counters) {
-                if (m.name.find("replica*app.pegasus*get_qps") != 
std::string::npos)
-                    h.get_qps += m.value;
-                else if (m.name.find("replica*app.pegasus*multi_get_qps") != 
std::string::npos)
-                    h.multi_get_qps += m.value;
-                else if (m.name.find("replica*app.pegasus*batch_get_qps") != 
std::string::npos)
-                    h.batch_get_qps += m.value;
-                else if (m.name.find("replica*app.pegasus*put_qps") != 
std::string::npos)
-                    h.put_qps += m.value;
-                else if (m.name.find("replica*app.pegasus*multi_put_qps") != 
std::string::npos)
-                    h.multi_put_qps += m.value;
-                else if (m.name.find("replica*app.pegasus*recent.read.cu") != 
std::string::npos)
-                    h.read_cu += m.value;
-                else if (m.name.find("replica*app.pegasus*recent.write.cu") != 
std::string::npos)
-                    h.write_cu += m.value;
             }
+
+            RETURN_SHELL_IF_GET_METRICS_FAILED(results_1[i], nodes[i], "1st rw 
requests");
+            RETURN_SHELL_IF_GET_METRICS_FAILED(results_2[i], nodes[i], "2nd rw 
requests");
+
+            list_nodes_helper &stat = tmp_it->second;
+            stat_var_map incs = {{"read_capacity_units", &stat.read_cu},
+                                 {"write_capacity_units", &stat.write_cu}};
+            stat_var_map rates = {{"get_requests", &stat.get_qps},
+                                  {"multi_get_requests", &stat.multi_get_qps},
+                                  {"batch_get_requests", &stat.batch_get_qps},
+                                  {"put_requests", &stat.put_qps},
+                                  {"multi_put_requests", &stat.multi_put_qps}};
+            RETURN_SHELL_IF_PARSE_METRICS_FAILED(
+                calc_metric_deltas(
+                    results_1[i].body(), results_2[i].body(), "replica", incs, 
rates),
+                nodes[i],
+                "rw requests");
         }
     }
 
@@ -440,7 +428,7 @@ bool ls_nodes(command_executor *e, shell_context *sc, 
arguments args)
 
         const auto &results = get_metrics(nodes, 
profiler_latency_filters().to_query_string());
 
-        for (int i = 0; i < nodes.size(); ++i) {
+        for (size_t i = 0; i < nodes.size(); ++i) {
             auto tmp_it = tmp_map.find(nodes[i].address);
             if (tmp_it == tmp_map.end()) {
                 continue;
diff --git a/src/shell/commands/table_management.cpp 
b/src/shell/commands/table_management.cpp
index e6167da3f..b5a5e34a1 100644
--- a/src/shell/commands/table_management.cpp
+++ b/src/shell/commands/table_management.cpp
@@ -187,8 +187,6 @@ dsn::error_s parse_sst_stat(const std::string &json_string,
                             std::map<int32_t, double> &count_map,
                             std::map<int32_t, double> &disk_map)
 {
-    dsn::error_s err;
-
     DESERIALIZE_METRIC_QUERY_BRIEF_SNAPSHOT(value, json_string, 
query_snapshot);
 
     for (const auto &entity : query_snapshot.entities) {
diff --git a/src/utils/metrics.cpp b/src/utils/metrics.cpp
index 681b6555c..09d08c4c9 100644
--- a/src/utils/metrics.cpp
+++ b/src/utils/metrics.cpp
@@ -520,6 +520,13 @@ void encode_port(dsn::metric_json_writer &writer)
     ENCODE_OBJ_VAL(rpc != nullptr, rpc->primary_address().port());
 }
 
+void encode_timestamp_ns(dsn::metric_json_writer &writer)
+{
+    writer.Key(dsn::kMetricTimestampNsField.c_str());
+
+    ENCODE_OBJ_VAL(true, dsn_now_ns());
+}
+
 #undef ENCODE_OBJ_VAL
 
 } // anonymous namespace
@@ -549,6 +556,7 @@ void metric_registry::take_snapshot(metric_json_writer 
&writer, const metric_fil
     encode_role(writer);
     encode_host(writer);
     encode_port(writer);
+    encode_timestamp_ns(writer);
     encode_entities(writer, filters);
     writer.EndObject();
 }
@@ -557,7 +565,7 @@ metric_registry::collected_entities_info 
metric_registry::collect_stale_entities
 {
     collected_entities_info collected_info;
 
-    auto now = dsn_now_ms();
+    const auto now = dsn_now_ms();
 
     utils::auto_read_lock l(_lock);
 
@@ -596,7 +604,7 @@ metric_registry::retire_stale_entities(const 
collected_entity_list &collected_en
 
     retired_entities_stat retired_stat;
 
-    auto now = dsn_now_ms();
+    const auto now = dsn_now_ms();
 
     utils::auto_write_lock l(_lock);
 
diff --git a/src/utils/metrics.h b/src/utils/metrics.h
index fb88aaeb0..449bbf1a2 100644
--- a/src/utils/metrics.h
+++ b/src/utils/metrics.h
@@ -353,6 +353,7 @@ const std::string kMetricClusterField = "cluster";
 const std::string kMetricRoleField = "role";
 const std::string kMetricHostField = "host";
 const std::string kMetricPortField = "port";
+const std::string kMetricTimestampNsField = "timestamp_ns";
 const std::string kMetricEntitiesField = "entities";
 
 class metric_entity : public ref_counter
@@ -1679,9 +1680,10 @@ private:
         std::string role;                                                      
                    \
         std::string host;                                                      
                    \
         uint16_t port;                                                         
                    \
+        uint64_t timestamp_ns;                                                 
                    \
         std::vector<metric_entity_brief_##field##_snapshot> entities;          
                    \
                                                                                
                    \
-        DEFINE_JSON_SERIALIZATION(cluster, role, host, port, entities)         
                    \
+        DEFINE_JSON_SERIALIZATION(cluster, role, host, port, timestamp_ns, 
entities)               \
     }
 
 #define DEF_ALL_METRIC_BRIEF_SNAPSHOTS(field)                                  
                    \
@@ -1700,7 +1702,7 @@ DEF_ALL_METRIC_BRIEF_SNAPSHOTS(p99);
         if (dsn_unlikely(                                                      
                    \
                 
!dsn::json::json_forwarder<dsn::metric_query_brief_##field##_snapshot>::decode( 
   \
                     bb, query_snapshot))) {                                    
                    \
-            return FMT_ERR(dsn::ERR_INVALID_DATA, "invalid json string");      
                    \
+            return FMT_ERR(dsn::ERR_INVALID_DATA, "invalid json string: {}", 
json_string);         \
         }                                                                      
                    \
     } while (0)
 
diff --git a/src/utils/test/metrics_test.cpp b/src/utils/test/metrics_test.cpp
index cbacc6a7b..640f8267e 100644
--- a/src/utils/test/metrics_test.cpp
+++ b/src/utils/test/metrics_test.cpp
@@ -2271,6 +2271,7 @@ const std::unordered_set<std::string> 
kAllMetricQueryFields = {kMetricClusterFie
                                                                
kMetricRoleField,
                                                                
kMetricHostField,
                                                                
kMetricPortField,
+                                                               
kMetricTimestampNsField,
                                                                
kMetricEntitiesField};
 
 void check_entity_ids_from_json_string(const std::string &json_string,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to