This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new c99cfd642 feat(new_metrics): support `server_stat` command showing 
some important server-level metrics (part 1) (#2085)
c99cfd642 is described below

commit c99cfd64284dcaa24f3be730f478fdb70ec2703a
Author: Dan Wang <[email protected]>
AuthorDate: Fri Aug 23 15:54:56 2024 +0800

    feat(new_metrics): support `server_stat` command showing some important 
server-level metrics (part 1) (#2085)
    
    As the 1st part that support `server_stat` command, both built-in metrics, 
the usage
    of virtual and physical memory would be shown.
---
 .clang-tidy                               |   2 +-
 .github/workflows/module_labeler_conf.yml |   3 +-
 build_tools/clang_tidy.py                 |   2 +-
 src/shell/command_helper.h                | 102 ++++++++++++---
 src/shell/commands/data_operations.cpp    |   3 +-
 src/shell/commands/node_management.cpp    | 207 +++++++++++++++++++++++++++---
 src/shell/commands/table_management.cpp   |  22 ++--
 src/shell/main.cpp                        |  11 +-
 src/utils/error_code.h                    |   2 +
 src/utils/metrics.h                       |  90 ++++++++++---
 10 files changed, 364 insertions(+), 80 deletions(-)

diff --git a/.clang-tidy b/.clang-tidy
index 2e072d7d8..1cdfca281 100644
--- a/.clang-tidy
+++ b/.clang-tidy
@@ -18,7 +18,7 @@
 # 
https://releases.llvm.org/14.0.0/tools/clang/tools/extra/docs/clang-tidy/index.html
 
 CheckOptions: []
-Checks: 
'abseil-*,boost-*,bugprone-*,cert-*,clang-analyzer-*,concurrency-*,cppcoreguidelines-*,darwin-*,fuchsia-*,google-*,hicpp-*,linuxkernel-*,llvm-*,misc-*,modernize-*,performance-*,portability-*,readability-*,-cppcoreguidelines-pro-type-union-access,-llvm-include-order,-modernize-use-trailing-return-type,-cppcoreguidelines-avoid-non-const-global-variables,-fuchsia-statically-constructed-objects,-fuchsia-overloaded-operator,-bugprone-easily-swappable-parameters,-cppcoreguidelines-non-
 [...]
+Checks: 
'abseil-*,boost-*,bugprone-*,cert-*,clang-analyzer-*,concurrency-*,cppcoreguidelines-*,darwin-*,fuchsia-*,google-*,hicpp-*,linuxkernel-*,llvm-*,misc-*,modernize-*,performance-*,portability-*,readability-*,-cppcoreguidelines-pro-type-union-access,-llvm-include-order,-misc-definitions-in-headers,-modernize-use-trailing-return-type,-cppcoreguidelines-macro-usage,-modernize-replace-disallow-copy-and-assign-macro,-bugprone-macro-parentheses,-cppcoreguidelines-avoid-non-const-global-va
 [...]
 ExtraArgs:
 ExtraArgsBefore: []
 FormatStyle: none
diff --git a/.github/workflows/module_labeler_conf.yml 
b/.github/workflows/module_labeler_conf.yml
index da0ecf390..5994abb56 100644
--- a/.github/workflows/module_labeler_conf.yml
+++ b/.github/workflows/module_labeler_conf.yml
@@ -16,7 +16,8 @@
 # under the License.
 ---
 permissions:
-  contents: write
+  contents: 'write'
+  pull-requests: 'write'
 
 github:
   - .github/**/*
diff --git a/build_tools/clang_tidy.py b/build_tools/clang_tidy.py
index d4c8964a0..ed4f4d52a 100755
--- a/build_tools/clang_tidy.py
+++ b/build_tools/clang_tidy.py
@@ -60,7 +60,7 @@ def run_tidy(sha="HEAD", is_rev_range=False):
                    "clang-tidy",
                    "-p0",
                    "-path", BUILD_PATH,
-                   
"-checks=-cppcoreguidelines-pro-type-union-access,-llvm-include-order,-modernize-use-trailing-return-type,-cppcoreguidelines-avoid-non-const-global-variables,-fuchsia-statically-constructed-objects,-fuchsia-overloaded-operator,-bugprone-easily-swappable-parameters,-cppcoreguidelines-non-private-member-variables-in-classes,-misc-non-private-member-variables-in-classes,-cppcoreguidelines-pro-bounds-array-to-pointer-decay,-hicpp-no-array-decay,-hicpp-named-parameter,-read
 [...]
+                   
"-checks=-cppcoreguidelines-pro-type-union-access,-llvm-include-order,-misc-definitions-in-headers,-modernize-use-trailing-return-type,-cppcoreguidelines-macro-usage,-modernize-replace-disallow-copy-and-assign-macro,-bugprone-macro-parentheses,-cppcoreguidelines-avoid-non-const-global-variables,-fuchsia-statically-constructed-objects,-fuchsia-overloaded-operator,-bugprone-easily-swappable-parameters,-cppcoreguidelines-non-private-member-variables-in-classes,-misc-non-p
 [...]
                    "-extra-arg=-language=c++",
                    "-extra-arg=-std=c++17",
                    "-extra-arg=-Ithirdparty/output/include"]
diff --git a/src/shell/command_helper.h b/src/shell/command_helper.h
index 912fae917..3e3b11ca4 100644
--- a/src/shell/command_helper.h
+++ b/src/shell/command_helper.h
@@ -709,30 +709,67 @@ inline std::vector<dsn::http_result> get_metrics(const 
std::vector<node_desc> &n
     return results;
 }
 
+// Adapt the result returned by `get_metrics` into the structure that could be 
processed by
+// `remote_command`.
+template <typename... Args>
+inline dsn::error_s process_get_metrics_result(const dsn::http_result &result,
+                                               const node_desc &node,
+                                               const char *what,
+                                               Args &&...args)
+{
+    if (dsn_unlikely(!result.error())) {
+        return FMT_ERR(result.error().code(),
+                       "ERROR: query {} metrics from node {} failed, msg={}",
+                       fmt::format(what, std::forward<Args>(args)...),
+                       node.hp,
+                       result.error());
+    }
+
+    if (dsn_unlikely(result.status() != dsn::http_status_code::kOk)) {
+        return FMT_ERR(dsn::ERR_HTTP_ERROR,
+                       "ERROR: query {} metrics from node {} failed, 
http_status={}, msg={}",
+                       fmt::format(what, std::forward<Args>(args)...),
+                       node.hp,
+                       dsn::get_http_status_message(result.status()),
+                       result.body());
+    }
+
+    return dsn::error_s::ok();
+}
+
 #define RETURN_SHELL_IF_GET_METRICS_FAILED(result, node, what, ...)            
                    \
     do {                                                                       
                    \
-        if (dsn_unlikely(!result.error())) {                                   
                    \
-            std::cout << "ERROR: send http request to query " << 
fmt::format(what, ##__VA_ARGS__)  \
-                      << " metrics from node " << node.hp << " failed: " << 
result.error()         \
-                      << std::endl;                                            
                    \
-            return true;                                                       
                    \
-        }                                                                      
                    \
-        if (dsn_unlikely(result.status() != dsn::http_status_code::kOk)) {     
                    \
-            std::cout << "ERROR: send http request to query " << what << " 
metrics from node "     \
-                      << node.hp << " failed: " << 
dsn::get_http_status_message(result.status())   \
-                      << std::endl                                             
                    \
-                      << result.body() << std::endl;                           
                    \
+        const auto &res = process_get_metrics_result(result, node, what, 
##__VA_ARGS__);           \
+        if (dsn_unlikely(!res)) {                                              
                    \
+            fmt::println(res.description());                                   
                    \
             return true;                                                       
                    \
         }                                                                      
                    \
     } while (0)
 
+// Adapt the result of some parsing operations on the metrics returned by 
`get_metrics` into the
+// structure that could be processed by `remote_command`.
+template <typename... Args>
+inline dsn::error_s process_parse_metrics_result(const dsn::error_s &result,
+                                                 const node_desc &node,
+                                                 const char *what,
+                                                 Args &&...args)
+{
+    if (dsn_unlikely(!result)) {
+        return FMT_ERR(result.code(),
+                       "ERROR: {} metrics response from node {} failed, 
msg={}",
+                       fmt::format(what, std::forward<Args>(args)...),
+                       node.hp,
+                       result);
+    }
+
+    return dsn::error_s::ok();
+}
+
 #define RETURN_SHELL_IF_PARSE_METRICS_FAILED(expr, node, what, ...)            
                    \
     do {                                                                       
                    \
-        const auto &res = (expr);                                              
                    \
+        const auto &res = process_parse_metrics_result(expr, node, what, 
##__VA_ARGS__);           \
         if (dsn_unlikely(!res)) {                                              
                    \
-            std::cout << "ERROR: parse " << fmt::format(what, ##__VA_ARGS__)   
                    \
-                      << " metrics response from node " << node.hp << " 
failed: " << res           \
-                      << std::endl;                                            
                    \
+            fmt::println(res.description());                                   
                    \
             return true;                                                       
                    \
         }                                                                      
                    \
     } while (0)
@@ -832,12 +869,20 @@ public:
     }
 
     // Create the aggregations as needed.
+    DEF_CALC_CREATOR(assignments)
     DEF_CALC_CREATOR(sums)
     DEF_CALC_CREATOR(increases)
     DEF_CALC_CREATOR(rates)
 
 #undef DEF_CALC_CREATOR
 
+#define CALC_ASSIGNMENT_STATS(entities)                                        
                    \
+    do {                                                                       
                    \
+        if (_assignments) {                                                    
                    \
+            RETURN_NOT_OK(_assignments->assign(entities));                     
                    \
+        }                                                                      
                    \
+    } while (0)
+
 #define CALC_ACCUM_STATS(entities)                                             
                    \
     do {                                                                       
                    \
         if (_sums) {                                                           
                    \
@@ -845,24 +890,38 @@ public:
         }                                                                      
                    \
     } while (0)
 
-    // Perform the chosen accum aggregations on the fetched metrics.
+    // Perform the chosen aggregations (both assignment and accum) on the 
fetched metrics.
     dsn::error_s aggregate_metrics(const std::string &json_string)
     {
         DESERIALIZE_METRIC_QUERY_BRIEF_SNAPSHOT(value, json_string, 
query_snapshot);
 
+        return aggregate_metrics(query_snapshot);
+    }
+
+    dsn::error_s aggregate_metrics(const 
dsn::metric_query_brief_value_snapshot &query_snapshot)
+    {
+        CALC_ASSIGNMENT_STATS(query_snapshot.entities);
         CALC_ACCUM_STATS(query_snapshot.entities);
 
         return dsn::error_s::ok();
     }
 
-    // Perform all of the chosen aggregations (both accum and delta) on the 
fetched metrics.
+    // Perform the chosen aggregations (assignement, accum, delta and rate) on 
the fetched metrics.
     dsn::error_s aggregate_metrics(const std::string &json_string_start,
                                    const std::string &json_string_end)
     {
         DESERIALIZE_METRIC_QUERY_BRIEF_2_SAMPLES(
             json_string_start, json_string_end, query_snapshot_start, 
query_snapshot_end);
 
-        // Apply ending sample to the accum aggregations.
+        return aggregate_metrics(query_snapshot_start, query_snapshot_end);
+    }
+
+    dsn::error_s
+    aggregate_metrics(const dsn::metric_query_brief_value_snapshot 
&query_snapshot_start,
+                      const dsn::metric_query_brief_value_snapshot 
&query_snapshot_end)
+    {
+        // Apply ending sample to the assignment and accum aggregations.
+        CALC_ASSIGNMENT_STATS(query_snapshot_end.entities);
         CALC_ACCUM_STATS(query_snapshot_end.entities);
 
         const std::array deltas_list = {&_increases, &_rates};
@@ -884,9 +943,12 @@ public:
 
 #undef CALC_ACCUM_STATS
 
+#undef CALC_ASSIGNMENT_STATS
+
 private:
     DISALLOW_COPY_AND_ASSIGN(aggregate_stats_calcs);
 
+    std::unique_ptr<aggregate_stats> _assignments;
     std::unique_ptr<aggregate_stats> _sums;
     std::unique_ptr<aggregate_stats> _increases;
     std::unique_ptr<aggregate_stats> _rates;
@@ -1940,7 +2002,7 @@ get_table_stats(shell_context *sc, uint32_t 
sample_interval_ms, std::vector<row_
         RETURN_SHELL_IF_PARSE_METRICS_FAILED(
             calcs->aggregate_metrics(results_start[i].body(), 
results_end[i].body()),
             nodes[i],
-            "row data requests");
+            "aggregate row data requests");
     }
 
     return true;
@@ -1990,7 +2052,7 @@ inline bool get_partition_stats(shell_context *sc,
         RETURN_SHELL_IF_PARSE_METRICS_FAILED(
             calcs->aggregate_metrics(results_start[i].body(), 
results_end[i].body()),
             nodes[i],
-            "row data requests for table(id={})",
+            "aggregate row data requests for table(id={})",
             table_id);
     }
 
diff --git a/src/shell/commands/data_operations.cpp 
b/src/shell/commands/data_operations.cpp
index 150f33bed..2c36739c2 100644
--- a/src/shell/commands/data_operations.cpp
+++ b/src/shell/commands/data_operations.cpp
@@ -62,7 +62,6 @@
 #include "utils/blob.h"
 #include "utils/defer.h"
 #include "utils/error_code.h"
-#include "utils/errors.h"
 #include "utils/flags.h"
 #include "utils/fmt_logging.h"
 #include "utils/metrics.h"
@@ -2294,7 +2293,7 @@ bool get_rdb_estimated_keys_stats(shell_context *sc,
             create_rdb_estimated_keys_stats_calcs(table_id, pcs, nodes[i].hp, 
"replica", rows);
         
RETURN_SHELL_IF_PARSE_METRICS_FAILED(calcs->aggregate_metrics(results[i].body()),
                                              nodes[i],
-                                             "rdb_estimated_keys for 
table(id={})",
+                                             "aggregate rdb_estimated_keys for 
table(id={})",
                                              table_id);
     }
 
diff --git a/src/shell/commands/node_management.cpp 
b/src/shell/commands/node_management.cpp
index 32cdc091b..57b649f88 100644
--- a/src/shell/commands/node_management.cpp
+++ b/src/shell/commands/node_management.cpp
@@ -40,6 +40,7 @@
 #include <vector>
 
 #include "client/replication_ddl_client.h"
+#include "common/json_helper.h"
 #include "common/replication_enums.h"
 #include "dsn.layer2_types.h"
 #include "meta_admin_types.h"
@@ -49,6 +50,7 @@
 #include "shell/command_helper.h"
 #include "shell/command_utils.h"
 #include "shell/commands.h"
+#include "utils/blob.h"
 #include "utils/error_code.h"
 #include "utils/errors.h"
 #include "utils/flags.h"
@@ -232,19 +234,173 @@ dsn::metric_filters rw_requests_filters()
     return filters;
 }
 
+dsn::metric_filters server_stat_filters()
+{
+    dsn::metric_filters filters;
+    filters.with_metric_fields = {dsn::kMetricNameField, 
dsn::kMetricSingleValueField};
+    filters.entity_types = {"server"};
+    filters.entity_metrics = {"virtual_mem_usage_mb", "resident_mem_usage_mb"};
+    return filters;
+}
+
+struct meta_server_stats
+{
+    meta_server_stats() = default;
+
+    double virt_mem_mb{0.0};
+    double res_mem_mb{0.0};
+
+    DEFINE_JSON_SERIALIZATION(virt_mem_mb, res_mem_mb)
+};
+
+std::pair<bool, std::string>
+aggregate_meta_server_stats(const node_desc &node,
+                            const dsn::metric_query_brief_value_snapshot 
&query_snapshot)
+{
+    aggregate_stats_calcs calcs;
+    meta_server_stats stats;
+    calcs.create_assignments<total_aggregate_stats>(
+        "server",
+        stat_var_map({{"virtual_mem_usage_mb", &stats.virt_mem_mb},
+                      {"resident_mem_usage_mb", &stats.res_mem_mb}}));
+
+    auto command_result = process_parse_metrics_result(
+        calcs.aggregate_metrics(query_snapshot), node, "aggregate meta server 
stats");
+    if (!command_result) {
+        // Metrics failed to be aggregated.
+        return std::make_pair(false, command_result.description());
+    }
+
+    return std::make_pair(true,
+                          
dsn::json::json_forwarder<meta_server_stats>::encode(stats).to_string());
+}
+
+struct replica_server_stats
+{
+    replica_server_stats() = default;
+
+    double virt_mem_mb{0.0};
+    double res_mem_mb{0.0};
+
+    DEFINE_JSON_SERIALIZATION(virt_mem_mb, res_mem_mb)
+};
+
+std::pair<bool, std::string>
+aggregate_replica_server_stats(const node_desc &node,
+                               const dsn::metric_query_brief_value_snapshot 
&query_snapshot_start,
+                               const dsn::metric_query_brief_value_snapshot 
&query_snapshot_end)
+{
+    aggregate_stats_calcs calcs;
+    meta_server_stats stats;
+    calcs.create_assignments<total_aggregate_stats>(
+        "server",
+        stat_var_map({{"virtual_mem_usage_mb", &stats.virt_mem_mb},
+                      {"resident_mem_usage_mb", &stats.res_mem_mb}}));
+
+    auto command_result = process_parse_metrics_result(
+        calcs.aggregate_metrics(query_snapshot_start, query_snapshot_end),
+        node,
+        "aggregate replica server stats");
+    if (!command_result) {
+        // Metrics failed to be aggregated.
+        return std::make_pair(false, command_result.description());
+    }
+
+    return std::make_pair(true,
+                          
dsn::json::json_forwarder<meta_server_stats>::encode(stats).to_string());
+}
+
+std::vector<std::pair<bool, std::string>> get_server_stats(const 
std::vector<node_desc> &nodes,
+                                                           uint32_t 
sample_interval_ms)
+{
+    // Ask target node (meta or replica server) for the metrics of server 
stats.
+    const auto &query_string = server_stat_filters().to_query_string();
+    const auto &results_start = get_metrics(nodes, query_string);
+    std::this_thread::sleep_for(std::chrono::milliseconds(sample_interval_ms));
+    const auto &results_end = get_metrics(nodes, query_string);
+
+    std::vector<std::pair<bool, std::string>> command_results;
+    command_results.reserve(nodes.size());
+    for (size_t i = 0; i < nodes.size(); ++i) {
+
+#define SKIP_IF_PROCESS_RESULT_FALSE()                                         
                    \
+    if (!command_result) {                                                     
                    \
+        command_results.emplace_back(command_result, 
command_result.description());                \
+        continue;                                                              
                    \
+    }
+
+#define PROCESS_GET_METRICS_RESULT(result, what, ...)                          
                    \
+    {                                                                          
                    \
+        auto command_result = process_get_metrics_result(result, nodes[i], 
what, ##__VA_ARGS__);   \
+        SKIP_IF_PROCESS_RESULT_FALSE()                                         
                    \
+    }
+
+        // Skip the metrics that failed to be fetched.
+        PROCESS_GET_METRICS_RESULT(results_start[i], "starting server stats")
+        PROCESS_GET_METRICS_RESULT(results_end[i], "ending server stats")
+
+#undef PROCESS_GET_METRICS_RESULT
+
+        dsn::metric_query_brief_value_snapshot query_snapshot_start;
+        dsn::metric_query_brief_value_snapshot query_snapshot_end;
+        {
+            // Skip the metrics that failed to be deserialized.
+            auto command_result = process_parse_metrics_result(
+                deserialize_metric_query_2_samples(results_start[i].body(),
+                                                   results_end[i].body(),
+                                                   query_snapshot_start,
+                                                   query_snapshot_end),
+                nodes[i],
+                "deserialize server stats");
+            SKIP_IF_PROCESS_RESULT_FALSE()
+        }
+
+#undef SKIP_IF_PROCESS_RESULT_FALSE
+
+        if (query_snapshot_end.role == "meta") {
+            command_results.push_back(aggregate_meta_server_stats(nodes[i], 
query_snapshot_end));
+            continue;
+        }
+
+        if (query_snapshot_end.role == "replica") {
+            command_results.push_back(
+                aggregate_replica_server_stats(nodes[i], query_snapshot_start, 
query_snapshot_end));
+            continue;
+        }
+
+        command_results.emplace_back(
+            false, fmt::format("role {} is unsupported", 
query_snapshot_end.role));
+    }
+
+    return command_results;
+}
+
+std::vector<std::pair<bool, std::string>> call_nodes(shell_context *sc,
+                                                     const 
std::vector<node_desc> &nodes,
+                                                     const std::string 
&command,
+                                                     const 
std::vector<std::string> &arguments,
+                                                     uint32_t 
sample_interval_ms)
+{
+    if (command == "server_stat") {
+        return get_server_stats(nodes, sample_interval_ms);
+    }
+
+    return call_remote_command(sc, nodes, command, arguments);
+}
+
 } // anonymous namespace
 
-bool ls_nodes(command_executor *e, shell_context *sc, arguments args)
+bool ls_nodes(command_executor *, shell_context *sc, arguments args)
 {
-    static struct option long_options[] = {{"detailed", no_argument, 0, 'd'},
-                                           {"resolve_ip", no_argument, 0, 'r'},
-                                           {"resource_usage", no_argument, 0, 
'u'},
-                                           {"qps", no_argument, 0, 'q'},
-                                           {"json", no_argument, 0, 'j'},
-                                           {"status", required_argument, 0, 
's'},
-                                           {"output", required_argument, 0, 
'o'},
-                                           {"sample_interval_ms", 
required_argument, 0, 't'},
-                                           {0, 0, 0, 0}};
+    static struct option long_options[] = {{"detailed", no_argument, nullptr, 
'd'},
+                                           {"resolve_ip", no_argument, 
nullptr, 'r'},
+                                           {"resource_usage", no_argument, 
nullptr, 'u'},
+                                           {"qps", no_argument, nullptr, 'q'},
+                                           {"json", no_argument, nullptr, 'j'},
+                                           {"status", required_argument, 
nullptr, 's'},
+                                           {"output", required_argument, 
nullptr, 'o'},
+                                           {"sample_interval_ms", 
required_argument, nullptr, 'i'},
+                                           {nullptr, 0, nullptr, 0}};
 
     std::string status;
     std::string output_file;
@@ -259,7 +415,9 @@ bool ls_nodes(command_executor *e, shell_context *sc, 
arguments args)
     optind = 0;
     while (true) {
         int option_index = 0;
-        int c = getopt_long(args.argc, args.argv, "druqjs:o:t:", long_options, 
&option_index);
+        // TODO(wangdan): getopt_long() is not thread-safe 
(clang-tidy[concurrency-mt-unsafe]),
+        // could use https://github.com/p-ranav/argparse instead.
+        int c = getopt_long(args.argc, args.argv, "druqjs:o:i:", long_options, 
&option_index);
         if (c == -1) {
             // -1 means all command-line options have been parsed.
             break;
@@ -288,7 +446,7 @@ bool ls_nodes(command_executor *e, shell_context *sc, 
arguments args)
         case 'o':
             output_file = optarg;
             break;
-        case 't':
+        case 'i':
             RETURN_FALSE_IF_SAMPLE_INTERVAL_MS_INVALID();
             break;
         default:
@@ -388,7 +546,7 @@ bool ls_nodes(command_executor *e, shell_context *sc, 
arguments args)
 
             auto &stat = tmp_it->second;
             RETURN_SHELL_IF_PARSE_METRICS_FAILED(
-                parse_resource_usage(results[i].body(), stat), nodes[i], 
"resource");
+                parse_resource_usage(results[i].body(), stat), nodes[i], 
"parse resource usage");
         }
     }
 
@@ -430,7 +588,7 @@ bool ls_nodes(command_executor *e, shell_context *sc, 
arguments args)
             RETURN_SHELL_IF_PARSE_METRICS_FAILED(
                 calcs.aggregate_metrics(results_start[i].body(), 
results_end[i].body()),
                 nodes[i],
-                "rw requests");
+                "aggregate rw requests");
         }
     }
 
@@ -452,8 +610,9 @@ bool ls_nodes(command_executor *e, shell_context *sc, 
arguments args)
             RETURN_SHELL_IF_GET_METRICS_FAILED(results[i], nodes[i], "profiler 
latency");
 
             auto &stat = tmp_it->second;
-            RETURN_SHELL_IF_PARSE_METRICS_FAILED(
-                parse_profiler_latency(results[i].body(), stat), nodes[i], 
"profiler latency");
+            
RETURN_SHELL_IF_PARSE_METRICS_FAILED(parse_profiler_latency(results[i].body(), 
stat),
+                                                 nodes[i],
+                                                 "parse profiler latency");
         }
     }
 
@@ -568,6 +727,7 @@ bool remote_command(command_executor *e, shell_context *sc, 
arguments args)
     //                                            [-t 
all|meta-server|replica-server]
     //                                            [-r|--resolve_ip]
     //                                            [-l host:port,host:port...]
+    //                                            [-i|--sample_interval_ms num]
     argh::parser cmd(args.argc, args.argv, 
argh::parser::PREFER_PARAM_FOR_UNREG_OPTION);
 
     std::string command;
@@ -593,9 +753,8 @@ bool remote_command(command_executor *e, shell_context *sc, 
arguments args)
         }
 
         // Initialize the command.
-        const std::map<std::string, std::string> kCmdsMapping({{"server_info", 
"server-info"},
-                                                               {"server_stat", 
"server-stat"},
-                                                               {"flush_log", 
"flush-log"}});
+        const std::map<std::string, std::string> kCmdsMapping(
+            {{"server_info", "server-info"}, {"flush_log", "flush-log"}});
         const auto &it = kCmdsMapping.find(pos_arg.str());
         if (it != kCmdsMapping.end()) {
             // Use the mapped command.
@@ -652,10 +811,16 @@ bool remote_command(command_executor *e, shell_context 
*sc, arguments args)
 
     nlohmann::json info;
     info["command"] = fmt::format("{} {}", command, fmt::join(pos_args, " "));
-    const auto results = call_remote_command(sc, nodes, command, pos_args);
+
+    uint32_t sample_interval_ms = 0;
+    PARSE_OPT_UINT(
+        sample_interval_ms, FLAGS_nodes_sample_interval_ms, {"-i", 
"--sample_interval_ms"});
+
+    const auto &results = call_nodes(sc, nodes, command, pos_args, 
sample_interval_ms);
+    CHECK_EQ(results.size(), nodes.size());
+
     int succeed = 0;
     int failed = 0;
-    CHECK_EQ(results.size(), nodes.size());
     for (int i = 0; i < nodes.size(); ++i) {
         nlohmann::json node_info;
         node_info["role"] = nodes[i].desc;
diff --git a/src/shell/commands/table_management.cpp 
b/src/shell/commands/table_management.cpp
index b75e27f56..f93939ff6 100644
--- a/src/shell/commands/table_management.cpp
+++ b/src/shell/commands/table_management.cpp
@@ -312,7 +312,7 @@ bool app_disk(command_executor *e, shell_context *sc, 
arguments args)
         RETURN_SHELL_IF_PARSE_METRICS_FAILED(
             parse_sst_stat(results[i].body(), count_map[nodes[i].hp], 
disk_map[nodes[i].hp]),
             nodes[i],
-            "sst");
+            "parse sst stats");
     }
 
     ::dsn::utils::table_printer tp_general("result");
@@ -452,15 +452,15 @@ bool app_disk(command_executor *e, shell_context *sc, 
arguments args)
     return true;
 }
 
-bool app_stat(command_executor *e, shell_context *sc, arguments args)
+bool app_stat(command_executor *, shell_context *sc, arguments args)
 {
-    static struct option long_options[] = {{"app_name", required_argument, 0, 
'a'},
-                                           {"only_qps", no_argument, 0, 'q'},
-                                           {"only_usage", no_argument, 0, 'u'},
-                                           {"json", no_argument, 0, 'j'},
-                                           {"output", required_argument, 0, 
'o'},
-                                           {"sample_interval_ms", 
required_argument, 0, 't'},
-                                           {0, 0, 0, 0}};
+    static struct option long_options[] = {{"app_name", required_argument, 
nullptr, 'a'},
+                                           {"only_qps", no_argument, nullptr, 
'q'},
+                                           {"only_usage", no_argument, 
nullptr, 'u'},
+                                           {"json", no_argument, nullptr, 'j'},
+                                           {"output", required_argument, 
nullptr, 'o'},
+                                           {"sample_interval_ms", 
required_argument, nullptr, 'i'},
+                                           {nullptr, 0, nullptr, 0}};
 
     std::string app_name;
     std::string out_file;
@@ -472,7 +472,7 @@ bool app_stat(command_executor *e, shell_context *sc, 
arguments args)
     optind = 0;
     while (true) {
         int option_index = 0;
-        int c = getopt_long(args.argc, args.argv, "a:qujo:t:", long_options, 
&option_index);
+        int c = getopt_long(args.argc, args.argv, "a:qujo:i:", long_options, 
&option_index);
         if (c == -1) {
             // -1 means all command-line options have been parsed.
             break;
@@ -494,7 +494,7 @@ bool app_stat(command_executor *e, shell_context *sc, 
arguments args)
         case 'o':
             out_file = optarg;
             break;
-        case 't':
+        case 'i':
             RETURN_FALSE_IF_SAMPLE_INTERVAL_MS_INVALID();
             break;
         default:
diff --git a/src/shell/main.cpp b/src/shell/main.cpp
index a6df779fe..41dd95d14 100644
--- a/src/shell/main.cpp
+++ b/src/shell/main.cpp
@@ -101,7 +101,7 @@ static command_executor commands[] = {
         "get the node status for this cluster",
         "[-d|--detailed] [-j|--json] [-r|--resolve_ip] [-u|--resource_usage] "
         "[-o|--output file_name] [-s|--status all|alive|unalive] [-q|--qps] "
-        "[-t|--sample_interval_ms num]",
+        "[-i|--sample_interval_ms num]",
         ls_nodes,
     },
     {
@@ -372,8 +372,8 @@ static command_executor commands[] = {
     {
         "remote_command",
         "send remote command to servers",
-        "[-t all|meta-server|replica-server] [-r|--resolve_ip] [-l 
host:port,host:port...]"
-        " <command> [arguments...]",
+        "[-t all|meta-server|replica-server] [-r|--resolve_ip] [-l 
host:port,host:port...] "
+        "[-i|--sample_interval_ms num] <command> [arguments...]",
         remote_command,
     },
     {
@@ -385,14 +385,15 @@ static command_executor commands[] = {
     {
         "server_stat",
         "get stat of servers",
-        "[-t all|meta-server|replica-server] [-l host:port,host:port...] 
[-r|--resolve_ip]",
+        "[-t all|meta-server|replica-server] [-l host:port,host:port...] 
[-r|--resolve_ip] "
+        "[-i|--sample_interval_ms num]",
         server_stat,
     },
     {
         "app_stat",
         "get stat of apps",
         "[-a|--app_name str] [-q|--only_qps] [-u|--only_usage] [-j|--json] "
-        "[-o|--output file_name] [-t|--sample_interval_ms num]",
+        "[-o|--output file_name] [-i|--sample_interval_ms num]",
         app_stat,
     },
     {
diff --git a/src/utils/error_code.h b/src/utils/error_code.h
index 023ec2b25..dfdc68030 100644
--- a/src/utils/error_code.h
+++ b/src/utils/error_code.h
@@ -185,6 +185,8 @@ DEFINE_ERR_CODE(ERR_CURL_FAILED)
 
 DEFINE_ERR_CODE(ERR_DUP_EXIST)
 
+DEFINE_ERR_CODE(ERR_HTTP_ERROR)
+
 } // namespace dsn
 
 USER_DEFINED_STRUCTURE_FORMATTER(::dsn::error_code);
diff --git a/src/utils/metrics.h b/src/utils/metrics.h
index 9d9b0671d..b399d9313 100644
--- a/src/utils/metrics.h
+++ b/src/utils/metrics.h
@@ -42,9 +42,11 @@
 
 #include <string_view>
 #include "common/json_helper.h"
+#include "gutil/map_util.h"
 #include "http/http_server.h"
 #include "utils/alloc.h"
 #include "utils/autoref_ptr.h"
+#include "utils/blob.h"
 #include "utils/casts.h"
 #include "utils/enum_helper.h"
 #include "utils/error_code.h"
@@ -52,7 +54,6 @@
 #include "utils/fmt_logging.h"
 #include "utils/long_adder.h"
 #include "utils/macros.h"
-#include "gutil/map_util.h"
 #include "utils/nth_element.h"
 #include "utils/ports.h"
 #include "utils/singleton.h"
@@ -1662,7 +1663,7 @@ private:
     struct metric_brief_##field##_snapshot                                     
                    \
     {                                                                          
                    \
         std::string name;                                                      
                    \
-        double field;                                                          
                    \
+        double field = 0.0;                                                    
                    \
                                                                                
                    \
         DEFINE_JSON_SERIALIZATION(name, field)                                 
                    \
     }
@@ -1700,31 +1701,84 @@ DEF_ALL_METRIC_BRIEF_SNAPSHOTS(value);
 
 DEF_ALL_METRIC_BRIEF_SNAPSHOTS(p99);
 
-#define DESERIALIZE_METRIC_QUERY_BRIEF_SNAPSHOT(field, json_string, 
query_snapshot)                \
-    dsn::metric_query_brief_##field##_snapshot query_snapshot;                 
                    \
+// Deserialize the json string into the snapshot.
+template <typename TMetricSnapshot>
+inline error_s deserialize_metric_snapshot(const std::string &json_string,
+                                           TMetricSnapshot &snapshot)
+{
+    dsn::blob bb(json_string.data(), 0, json_string.size());
+    if (dsn_unlikely(!dsn::json::json_forwarder<TMetricSnapshot>::decode(bb, 
snapshot))) {
+        return FMT_ERR(dsn::ERR_INVALID_DATA, "invalid json string: {}", 
json_string);
+    }
+
+    return error_s::ok();
+}
+
+#define DESERIALIZE_METRIC_SNAPSHOT(json_string, query_snapshot)               
                    \
     do {                                                                       
                    \
-        dsn::blob bb(json_string.data(), 0, json_string.size());               
                    \
-        if (dsn_unlikely(                                                      
                    \
-                
!dsn::json::json_forwarder<dsn::metric_query_brief_##field##_snapshot>::decode( 
   \
-                    bb, query_snapshot))) {                                    
                    \
-            return FMT_ERR(dsn::ERR_INVALID_DATA, "invalid json string: {}", 
json_string);         \
+        const auto &res = deserialize_metric_snapshot(json_string, 
query_snapshot);                \
+        if (dsn_unlikely(!res)) {                                              
                    \
+            return res;                                                        
                    \
         }                                                                      
                    \
     } while (0)
 
+// Deserialize the json string into the snapshot specially for metric query 
which is declared
+// internally.
+#define DESERIALIZE_METRIC_QUERY_BRIEF_SNAPSHOT(field, json_string, 
query_snapshot)                \
+    dsn::metric_query_brief_##field##_snapshot query_snapshot;                 
                    \
+    DESERIALIZE_METRIC_SNAPSHOT(json_string, query_snapshot)
+
+// Deserialize both json string samples into respective snapshots.
+template <typename TMetricSnapshot>
+inline error_s deserialize_metric_2_samples(const std::string 
&json_string_start,
+                                            const std::string &json_string_end,
+                                            TMetricSnapshot &snapshot_start,
+                                            TMetricSnapshot &snapshot_end)
+{
+    DESERIALIZE_METRIC_SNAPSHOT(json_string_start, snapshot_start);
+    DESERIALIZE_METRIC_SNAPSHOT(json_string_end, snapshot_end);
+    return error_s::ok();
+}
+
+// Deserialize both json string samples into respective snapshots specially 
for metric queries.
+template <typename TMetricQuerySnapshot>
+inline error_s deserialize_metric_query_2_samples(const std::string 
&json_string_start,
+                                                  const std::string 
&json_string_end,
+                                                  TMetricQuerySnapshot 
&snapshot_start,
+                                                  TMetricQuerySnapshot 
&snapshot_end)
+{
+    const auto &res = deserialize_metric_2_samples(
+        json_string_start, json_string_end, snapshot_start, snapshot_end);
+    if (!res) {
+        return res;
+    }
+
+    if (snapshot_end.timestamp_ns <= snapshot_start.timestamp_ns) {
+        return FMT_ERR(dsn::ERR_INVALID_DATA,
+                       "duration for metric samples should be > 0: 
timestamp_ns_start={}, "
+                       "timestamp_ns_end={}",
+                       snapshot_start.timestamp_ns,
+                       snapshot_end.timestamp_ns);
+    }
+
+    return error_s::ok();
+}
+
+// Deserialize both json string samples into respective snapshots specially 
for metric queries
+// which are declared internally.
+//
 // Currently only Gauge and Counter are considered to have "increase" and 
"rate", which means
 // samples are needed. Thus brief `value` field is enough.
 #define DESERIALIZE_METRIC_QUERY_BRIEF_2_SAMPLES(                              
                    \
     json_string_start, json_string_end, query_snapshot_start, 
query_snapshot_end)                  \
-    DESERIALIZE_METRIC_QUERY_BRIEF_SNAPSHOT(value, json_string_start, 
query_snapshot_start);       \
-    DESERIALIZE_METRIC_QUERY_BRIEF_SNAPSHOT(value, json_string_end, 
query_snapshot_end);           \
+    dsn::metric_query_brief_value_snapshot query_snapshot_start;               
                    \
+    dsn::metric_query_brief_value_snapshot query_snapshot_end;                 
                    \
                                                                                
                    \
     do {                                                                       
                    \
-        if (query_snapshot_end.timestamp_ns <= 
query_snapshot_start.timestamp_ns) {                \
-            return FMT_ERR(dsn::ERR_INVALID_DATA,                              
                    \
-                           "duration for metric samples should be > 0: 
timestamp_ns_start={}, "    \
-                           "timestamp_ns_end={}",                              
                    \
-                           query_snapshot_start.timestamp_ns,                  
                    \
-                           query_snapshot_end.timestamp_ns);                   
                    \
+        const auto &res = deserialize_metric_query_2_samples(                  
                    \
+            json_string_start, json_string_end, query_snapshot_start, 
query_snapshot_end);         \
+        if (dsn_unlikely(!res)) {                                              
                    \
+            return res;                                                        
                    \
         }                                                                      
                    \
     } while (0)
 
@@ -1756,7 +1810,7 @@ inline error_s parse_metric_attribute(const 
metric_entity::attr_map &attrs,
         return FMT_ERR(dsn::ERR_INVALID_DATA, "invalid {}: {}", name, 
*value_ptr);
     }
 
-    return dsn::error_s::ok();
+    return error_s::ok();
 }
 
 inline error_s parse_metric_table_id(const metric_entity::attr_map &attrs, 
int32_t &table_id)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to