This is an automated email from the ASF dual-hosted git repository.

gehafearless pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 4bd798ab1 feat(new_metrics): show table stats by shell `app_stat` command based on new metrics (part 2) (#1918)
4bd798ab1 is described below

commit 4bd798ab12e062716ec3313f76e1071d1ada6d07
Author: Dan Wang <[email protected]>
AuthorDate: Mon Feb 26 11:26:50 2024 +0800

    feat(new_metrics): show table stats by shell `app_stat` command based on new metrics (part 2) (#1918)
    
    This is the 2nd part of migrating the metrics for app_stat to the new metrics
    framework, implementing partition-level aggregations for a specific table.
---
 src/shell/command_helper.h             | 240 +++++++++++++++++++--------------
 src/shell/commands/node_management.cpp |   5 +-
 2 files changed, 140 insertions(+), 105 deletions(-)

diff --git a/src/shell/command_helper.h b/src/shell/command_helper.h
index 78d245827..54c8ed489 100644
--- a/src/shell/command_helper.h
+++ b/src/shell/command_helper.h
@@ -682,11 +682,12 @@ inline std::vector<dsn::http_result> get_metrics(const 
std::vector<node_desc> &n
     return results;
 }
 
-#define RETURN_SHELL_IF_GET_METRICS_FAILED(result, node, what)                 
                    \
+#define RETURN_SHELL_IF_GET_METRICS_FAILED(result, node, what, ...)            
                    \
     do {                                                                       
                    \
         if (dsn_unlikely(!result.error())) {                                   
                    \
-            std::cout << "ERROR: send http request to query " << what << " 
metrics from node "     \
-                      << node.address << " failed: " << result.error() << 
std::endl;               \
+            std::cout << "ERROR: send http request to query " << 
fmt::format(what, ##__VA_ARGS__)  \
+                      << " metrics from node " << node.address << " failed: " 
<< result.error()    \
+                      << std::endl;                                            
                    \
             return true;                                                       
                    \
         }                                                                      
                    \
         if (dsn_unlikely(result.status() != dsn::http_status_code::kOk)) {     
                    \
@@ -698,12 +699,13 @@ inline std::vector<dsn::http_result> get_metrics(const 
std::vector<node_desc> &n
         }                                                                      
                    \
     } while (0)
 
-#define RETURN_SHELL_IF_PARSE_METRICS_FAILED(expr, node, what)                 
                    \
+#define RETURN_SHELL_IF_PARSE_METRICS_FAILED(expr, node, what, ...)            
                    \
     do {                                                                       
                    \
         const auto &res = (expr);                                              
                    \
         if (dsn_unlikely(!res)) {                                              
                    \
-            std::cout << "ERROR: parse " << what << " metrics response from 
node " << node.address \
-                      << " failed: " << res << std::endl;                      
                    \
+            std::cout << "ERROR: parse " << fmt::format(what, ##__VA_ARGS__)   
                    \
+                      << " metrics response from node " << node.address << " 
failed: " << res      \
+                      << std::endl;                                            
                    \
             return true;                                                       
                    \
         }                                                                      
                    \
     } while (0)
@@ -1377,6 +1379,43 @@ inline std::unique_ptr<aggregate_stats_calcs> 
create_table_aggregate_stats_calcs
     return calcs;
 }
 
+// Create all aggregations for the partition-level stats.
+inline std::unique_ptr<aggregate_stats_calcs>
+create_partition_aggregate_stats_calcs(const int32_t table_id,
+                                       const 
std::vector<dsn::partition_configuration> &partitions,
+                                       const dsn::rpc_address &node,
+                                       const std::string &entity_type,
+                                       std::vector<row_data> &rows)
+{
+    CHECK_EQ(rows.size(), partitions.size());
+
+    partition_stat_map sums;
+    partition_stat_map increases;
+    partition_stat_map rates;
+    for (size_t i = 0; i < rows.size(); ++i) {
+        if (partitions[i].primary != node) {
+            // Skip this partition if the given node is not its primary replica.
+            continue;
+        }
+
+        const std::vector<std::pair<partition_stat_map *, 
std::function<stat_var_map(row_data &)>>>
+            processors = {
+                {&sums, create_sums}, {&increases, create_increases}, {&rates, 
create_rates},
+            };
+        for (auto &processor : processors) {
+            // Put all dimensions of table id, partition id and metric name into filters for
+            // each kind of aggregation.
+            processor.first->emplace(dsn::gpid(table_id, i), 
processor.second(rows[i]));
+        }
+    }
+
+    auto calcs = std::make_unique<aggregate_stats_calcs>();
+    calcs->create_sums<partition_aggregate_stats>(entity_type, 
std::move(sums));
+    calcs->create_increases<partition_aggregate_stats>(entity_type, 
std::move(increases));
+    calcs->create_rates<partition_aggregate_stats>(entity_type, 
std::move(rates));
+    return calcs;
+}
+
 inline bool
 update_app_pegasus_perf_counter(row_data &row, const std::string 
&counter_name, double value)
 {
@@ -1632,10 +1671,9 @@ inline bool get_app_partition_stat(shell_context *sc,
     return true;
 }
 
-inline bool get_app_stat(shell_context *sc,
-                         const std::string &app_name,
-                         uint32_t sample_interval_ms,
-                         std::vector<row_data> &rows)
+// Aggregate the table-level stats for all tables since table name is not 
specified.
+inline bool
+get_table_stats(shell_context *sc, uint32_t sample_interval_ms, 
std::vector<row_data> &rows)
 {
     std::vector<::dsn::app_info> apps;
     std::vector<node_desc> nodes;
@@ -1643,111 +1681,107 @@ inline bool get_app_stat(shell_context *sc,
         return false;
     }
 
-    ::dsn::app_info *app_info = nullptr;
-    if (!app_name.empty()) {
-        for (auto &app : apps) {
-            if (app.app_name == app_name) {
-                app_info = &app;
-                break;
-            }
-        }
-        if (app_info == nullptr) {
-            LOG_ERROR("app {} not found", app_name);
-            return false;
-        }
+    const auto query_string = row_data_filters().to_query_string();
+    const auto &results_start = get_metrics(nodes, query_string);
+    std::this_thread::sleep_for(std::chrono::milliseconds(sample_interval_ms));
+    const auto &results_end = get_metrics(nodes, query_string);
+
+    std::map<int32_t, std::vector<dsn::partition_configuration>> 
table_partitions;
+    if (!get_app_partitions(sc, apps, table_partitions)) {
+        return false;
     }
 
-    // TODO(wangdan): would be removed after migrating to new metrics 
completely.
-    std::vector<std::string> arguments;
-    char tmp[256];
-    if (app_name.empty()) {
-        sprintf(tmp, ".*@.*");
-    } else {
-        sprintf(tmp, ".*@%d\\..*", app_info->app_id);
+    rows.clear();
+    rows.reserve(apps.size());
+    std::transform(
+        apps.begin(), apps.end(), std::back_inserter(rows), [](const 
dsn::app_info &app) {
+            row_data row;
+            row.row_name = app.app_name;
+            row.app_id = app.app_id;
+            row.partition_count = app.partition_count;
+            return row;
+        });
+    CHECK_EQ(rows.size(), table_partitions.size());
+
+    for (size_t i = 0; i < nodes.size(); ++i) {
+        RETURN_SHELL_IF_GET_METRICS_FAILED(
+            results_start[i], nodes[i], "starting row data requests");
+        RETURN_SHELL_IF_GET_METRICS_FAILED(results_end[i], nodes[i], "ending 
row data requests");
+
+        auto calcs =
+            create_table_aggregate_stats_calcs(table_partitions, 
nodes[i].address, "replica", rows);
+        RETURN_SHELL_IF_PARSE_METRICS_FAILED(
+            calcs->aggregate_metrics(results_start[i].body(), 
results_end[i].body()),
+            nodes[i],
+            "row data requests");
     }
-    arguments.emplace_back(tmp);
-    std::vector<std::pair<bool, std::string>> results =
-        call_remote_command(sc, nodes, "perf-counters", arguments);
 
-    if (app_name.empty()) {
-        // Aggregate the table-level stats for all tables since table name is 
not specified.
+    return true;
+}
 
-        const auto &results_start = get_metrics(nodes, 
row_data_filters().to_query_string());
-        
std::this_thread::sleep_for(std::chrono::milliseconds(sample_interval_ms));
-        const auto &results_end = get_metrics(nodes, 
row_data_filters().to_query_string());
+// Aggregate the partition-level stats for the specified table.
+inline bool get_partition_stats(shell_context *sc,
+                                const std::string &table_name,
+                                uint32_t sample_interval_ms,
+                                std::vector<row_data> &rows)
+{
+    std::vector<node_desc> nodes;
+    if (!fill_nodes(sc, "replica-server", nodes)) {
+        LOG_ERROR("get replica server node list failed");
+        return false;
+    }
 
-        std::map<int32_t, std::vector<dsn::partition_configuration>> 
table_partitions;
-        if (!get_app_partitions(sc, apps, table_partitions)) {
-            return false;
-        }
+    int32_t table_id = 0;
+    int32_t partition_count = 0;
+    std::vector<dsn::partition_configuration> partitions;
+    const auto &err = sc->ddl_client->list_app(table_name, table_id, 
partition_count, partitions);
+    if (err != ::dsn::ERR_OK) {
+        LOG_ERROR("list app {} failed, error = {}", table_name, err);
+        return false;
+    }
+    CHECK_EQ(partitions.size(), partition_count);
 
-        rows.clear();
-        rows.reserve(apps.size());
-        std::transform(
-            apps.begin(), apps.end(), std::back_inserter(rows), [](const 
dsn::app_info &app) {
-                row_data row;
-                row.row_name = app.app_name;
-                row.app_id = app.app_id;
-                row.partition_count = app.partition_count;
-                return row;
-            });
-        CHECK_EQ(rows.size(), table_partitions.size());
-
-        for (size_t i = 0; i < nodes.size(); ++i) {
-            RETURN_SHELL_IF_GET_METRICS_FAILED(
-                results_start[i], nodes[i], "starting row data requests");
-            RETURN_SHELL_IF_GET_METRICS_FAILED(
-                results_end[i], nodes[i], "ending row data requests");
-
-            auto calcs = create_table_aggregate_stats_calcs(
-                table_partitions, nodes[i].address, "replica", rows);
-            RETURN_SHELL_IF_PARSE_METRICS_FAILED(
-                calcs->aggregate_metrics(results_start[i].body(), 
results_end[i].body()),
-                nodes[i],
-                "row data requests");
-        }
-    } else {
-        // Aggregate the partition-level stats for the specified table.
+    const auto query_string = row_data_filters(table_id).to_query_string();
+    const auto &results_start = get_metrics(nodes, query_string);
+    std::this_thread::sleep_for(std::chrono::milliseconds(sample_interval_ms));
+    const auto &results_end = get_metrics(nodes, query_string);
 
-        // TODO(wangdan): use partition_aggregate_stats to implement 
partition-level stats
-        // for a specific table.
-        rows.resize(app_info->partition_count);
-        for (int i = 0; i < app_info->partition_count; i++)
-            rows[i].row_name = std::to_string(i);
-        int32_t app_id = 0;
-        int32_t partition_count = 0;
-        std::vector<dsn::partition_configuration> partitions;
-        dsn::error_code err =
-            sc->ddl_client->list_app(app_name, app_id, partition_count, 
partitions);
-        if (err != ::dsn::ERR_OK) {
-            LOG_ERROR("list app {} failed, error = {}", app_name, err);
-            return false;
-        }
-        CHECK_EQ(app_id, app_info->app_id);
-        CHECK_EQ(partition_count, app_info->partition_count);
-
-        for (int i = 0; i < nodes.size(); ++i) {
-            dsn::rpc_address node_addr = nodes[i].address;
-            dsn::perf_counter_info info;
-            if (!decode_node_perf_counter_info(node_addr, results[i], info))
-                return false;
-            for (dsn::perf_counter_metric &m : info.counters) {
-                int32_t app_id_x, partition_index_x;
-                std::string counter_name;
-                bool parse_ret = parse_app_pegasus_perf_counter_name(
-                    m.name, app_id_x, partition_index_x, counter_name);
-                CHECK(parse_ret, "name = {}", m.name);
-                CHECK_EQ_MSG(app_id_x, app_id, "name = {}", m.name);
-                CHECK_LT_MSG(partition_index_x, partition_count, "name = {}", 
m.name);
-                if (partitions[partition_index_x].primary != node_addr)
-                    continue;
-                update_app_pegasus_perf_counter(rows[partition_index_x], 
counter_name, m.value);
-            }
-        }
+    rows.clear();
+    rows.reserve(partition_count);
+    for (int32_t i = 0; i < partition_count; ++i) {
+        rows.emplace_back(std::to_string(i));
     }
+
+    for (size_t i = 0; i < nodes.size(); ++i) {
+        RETURN_SHELL_IF_GET_METRICS_FAILED(
+            results_start[i], nodes[i], "starting row data requests for 
table(id={})", table_id);
+        RETURN_SHELL_IF_GET_METRICS_FAILED(
+            results_end[i], nodes[i], "ending row data requests for 
table(id={})", table_id);
+
+        auto calcs = create_partition_aggregate_stats_calcs(
+            table_id, partitions, nodes[i].address, "replica", rows);
+        RETURN_SHELL_IF_PARSE_METRICS_FAILED(
+            calcs->aggregate_metrics(results_start[i].body(), 
results_end[i].body()),
+            nodes[i],
+            "row data requests for table(id={})",
+            table_id);
+    }
+
     return true;
 }
 
+inline bool get_app_stat(shell_context *sc,
+                         const std::string &table_name,
+                         uint32_t sample_interval_ms,
+                         std::vector<row_data> &rows)
+{
+    if (table_name.empty()) {
+        return get_table_stats(sc, sample_interval_ms, rows);
+    }
+
+    return get_partition_stats(sc, table_name, sample_interval_ms, rows);
+}
+
 struct node_capacity_unit_stat
 {
     // timestamp when node perf_counter_info has updated.
diff --git a/src/shell/commands/node_management.cpp 
b/src/shell/commands/node_management.cpp
index 17a02be6e..63e990b29 100644
--- a/src/shell/commands/node_management.cpp
+++ b/src/shell/commands/node_management.cpp
@@ -390,9 +390,10 @@ bool ls_nodes(command_executor *e, shell_context *sc, 
arguments args)
             return true;
         }
 
-        const auto &results_start = get_metrics(nodes, 
rw_requests_filters().to_query_string());
+        const auto query_string = rw_requests_filters().to_query_string();
+        const auto &results_start = get_metrics(nodes, query_string);
         
std::this_thread::sleep_for(std::chrono::milliseconds(FLAGS_nodes_sample_interval_ms));
-        const auto &results_end = get_metrics(nodes, 
rw_requests_filters().to_query_string());
+        const auto &results_end = get_metrics(nodes, query_string);
 
         for (size_t i = 0; i < nodes.size(); ++i) {
             auto tmp_it = tmp_map.find(nodes[i].address);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to