This is an automated email from the ASF dual-hosted git repository.
gehafearless pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 550da576c feat(new_metrics): show estimated number of keys by shell
`count_data` command based on new metrics (#1920)
550da576c is described below
commit 550da576c66cceb243382324c4f6aece2efb1cce
Author: Dan Wang <[email protected]>
AuthorDate: Tue Feb 27 15:31:31 2024 +0800
feat(new_metrics): show estimated number of keys by shell `count_data`
command based on new metrics (#1920)
---
src/client_lib/pegasus_client_factory_impl.cpp | 11 ++-
src/shell/command_helper.h | 36 ++++++--
src/shell/commands/data_operations.cpp | 114 ++++++++++++++++++++++---
src/shell/commands/node_management.cpp | 2 +-
src/shell/commands/table_management.cpp | 7 +-
5 files changed, 140 insertions(+), 30 deletions(-)
diff --git a/src/client_lib/pegasus_client_factory_impl.cpp
b/src/client_lib/pegasus_client_factory_impl.cpp
index a590e5255..b98cb139a 100644
--- a/src/client_lib/pegasus_client_factory_impl.cpp
+++ b/src/client_lib/pegasus_client_factory_impl.cpp
@@ -26,6 +26,7 @@
#include "runtime/app_model.h"
#include "runtime/tool_api.h"
#include "utils/fmt_logging.h"
+#include "utils/strings.h"
#include "utils/zlocks.h"
namespace pegasus {
@@ -62,11 +63,12 @@ bool pegasus_client_factory_impl::initialize(const char
*config_file)
pegasus_client *pegasus_client_factory_impl::get_client(const char
*cluster_name,
const char *app_name)
{
- if (cluster_name == nullptr || cluster_name[0] == '\0') {
+ if (dsn::utils::is_empty(cluster_name)) { // presumably true for nullptr and "" alike, like the removed check -- confirm
LOG_ERROR("invalid parameter 'cluster_name'");
return nullptr;
}
- if (app_name == nullptr || app_name[0] == '\0') {
+
+ if (dsn::utils::is_empty(app_name)) {
LOG_ERROR("invalid parameter 'app_name'");
return nullptr;
}
@@ -88,5 +90,6 @@ pegasus_client *pegasus_client_factory_impl::get_client(const
char *cluster_name
return it2->second;
}
-}
-} // namespace
+
+} // namespace client
+} // namespace pegasus
diff --git a/src/shell/command_helper.h b/src/shell/command_helper.h
index 54c8ed489..2979c1a06 100644
--- a/src/shell/command_helper.h
+++ b/src/shell/command_helper.h
@@ -811,16 +811,32 @@ public:
#undef DEF_CALC_CREATOR
- // Perform the chosen aggregations on the fetched metrics.
+#define CALC_ACCUM_STATS(entities)
\
+ do {
\
+ if (_sums) {
\
+ RETURN_NOT_OK(_sums->add_assign(entities));
\
+ }
\
+ } while (0)
+
+ // Perform the chosen accum aggregations (i.e. sums) on a single snapshot of fetched metrics.
+ dsn::error_s aggregate_metrics(const std::string &json_string)
+ {
+ DESERIALIZE_METRIC_QUERY_BRIEF_SNAPSHOT(value, json_string,
query_snapshot);
+
+ CALC_ACCUM_STATS(query_snapshot.entities);
+
+ return dsn::error_s::ok();
+ }
+
+ // Perform all of the chosen aggregations on the fetched metrics: accum ones on the
ending sample, delta ones (increases and rates) on the two samples.
dsn::error_s aggregate_metrics(const std::string &json_string_start,
const std::string &json_string_end)
{
DESERIALIZE_METRIC_QUERY_BRIEF_2_SAMPLES(
json_string_start, json_string_end, query_snapshot_start,
query_snapshot_end);
- if (_sums) {
- RETURN_NOT_OK(_sums->add_assign(query_snapshot_end.entities));
- }
+ // Accum aggregations are computed from the ending sample only.
+ CALC_ACCUM_STATS(query_snapshot_end.entities);
const std::array deltas_list = {&_increases, &_rates};
for (const auto stats : deltas_list) {
@@ -839,6 +855,8 @@ public:
return dsn::error_s::ok();
}
+#undef CALC_ACCUM_STATS
+
private:
DISALLOW_COPY_AND_ASSIGN(aggregate_stats_calcs);
@@ -1335,7 +1353,8 @@ inline stat_var_map create_rates(row_data &row)
#undef BIND_ROW
-// Create all aggregations for the table-level stats.
+// Given all tables, create all aggregations needed for the table-level stats.
All selected
+// partitions should have their primary replicas on this node.
inline std::unique_ptr<aggregate_stats_calcs>
create_table_aggregate_stats_calcs(
const std::map<int32_t, std::vector<dsn::partition_configuration>>
&table_partitions,
const dsn::rpc_address &node,
@@ -1379,7 +1398,8 @@ inline std::unique_ptr<aggregate_stats_calcs>
create_table_aggregate_stats_calcs
return calcs;
}
-// Create all aggregations for the partition-level stats.
+// Given a table and all of its partitions, create all aggregations needed for
the partition-level
+// stats. All selected partitions should have their primary replicas on this
node.
inline std::unique_ptr<aggregate_stats_calcs>
create_partition_aggregate_stats_calcs(const int32_t table_id,
const
std::vector<dsn::partition_configuration> &partitions,
@@ -1681,7 +1701,7 @@ get_table_stats(shell_context *sc, uint32_t
sample_interval_ms, std::vector<row_
return false;
}
- const auto query_string = row_data_filters().to_query_string();
+ const auto &query_string = row_data_filters().to_query_string(); // NOTE(review): safe only if to_query_string() returns by value (temporary lifetime extension) -- confirm
const auto &results_start = get_metrics(nodes, query_string);
std::this_thread::sleep_for(std::chrono::milliseconds(sample_interval_ms));
const auto &results_end = get_metrics(nodes, query_string);
@@ -1741,7 +1761,7 @@ inline bool get_partition_stats(shell_context *sc,
}
CHECK_EQ(partitions.size(), partition_count);
- const auto query_string = row_data_filters(table_id).to_query_string();
+ const auto &query_string = row_data_filters(table_id).to_query_string(); // NOTE(review): safe only if to_query_string() returns by value -- confirm
const auto &results_start = get_metrics(nodes, query_string);
std::this_thread::sleep_for(std::chrono::milliseconds(sample_interval_ms));
const auto &results_end = get_metrics(nodes, query_string);
diff --git a/src/shell/commands/data_operations.cpp
b/src/shell/commands/data_operations.cpp
index f1805b724..065d6a55d 100644
--- a/src/shell/commands/data_operations.cpp
+++ b/src/shell/commands/data_operations.cpp
@@ -42,6 +42,7 @@
#include <vector>
#include "client/replication_ddl_client.h"
+#include "common/gpid.h"
#include "dsn.layer2_types.h"
#include "geo/lib/geo_client.h"
#include "idl_utils.h"
@@ -60,8 +61,10 @@
#include "utils/blob.h"
#include "utils/defer.h"
#include "utils/error_code.h"
+#include "utils/errors.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"
+#include "utils/metrics.h"
#include "utils/output_utils.h"
#include "utils/string_conv.h"
@@ -2212,6 +2215,93 @@ bool clear_data(command_executor *e, shell_context *sc,
arguments args)
return true;
}
+namespace {
+
+inline dsn::metric_filters rdb_estimated_keys_filters(int32_t table_id) // Query only rdb_estimated_keys of this table's replica entities.
+{
+ dsn::metric_filters filters;
+ filters.with_metric_fields = {dsn::kMetricNameField,
dsn::kMetricSingleValueField};
+ filters.entity_types = {"replica"};
+ filters.entity_attrs = {"table_id", std::to_string(table_id)};
+ filters.entity_metrics = {"rdb_estimated_keys"};
+ return filters;
+}
+
+// Given a table and all of its partitions, aggregate partition-level stats
for rdb_estimated_keys.
+// All selected partitions should have their primary replicas on this node.
+std::unique_ptr<aggregate_stats_calcs>
+create_rdb_estimated_keys_stats_calcs(const int32_t table_id,
+ const
std::vector<dsn::partition_configuration> &partitions,
+ const dsn::rpc_address &node,
+ const std::string &entity_type,
+ std::vector<row_data> &rows)
+{
+ CHECK_EQ(rows.size(), partitions.size());
+
+ partition_stat_map sums;
+ for (size_t i = 0; i < rows.size(); ++i) {
+ if (partitions[i].primary != node) {
+ // Skip partitions whose primary replica is not on this node, so that no
partition is counted more than once across nodes.
+ continue;
+ }
+
+ // Add (table id, partition_id, metric_name) as dimensions.
+ sums.emplace(dsn::gpid(table_id, static_cast<int32_t>(i)),
+ stat_var_map({{"rdb_estimated_keys",
&rows[i].rdb_estimate_num_keys}}));
+ }
+
+ auto calcs = std::make_unique<aggregate_stats_calcs>();
+ calcs->create_sums<partition_aggregate_stats>(entity_type,
std::move(sums));
+ return calcs;
+}
+
+// Aggregate the partition-level rdb_estimated_keys for the specified table.
+bool get_rdb_estimated_keys_stats(shell_context *sc,
+ const std::string &table_name,
+ std::vector<row_data> &rows)
+{
+ std::vector<node_desc> nodes;
+ if (!fill_nodes(sc, "replica-server", nodes)) {
+ LOG_ERROR("get replica server node list failed");
+ return false;
+ }
+
+ int32_t table_id = 0;
+ int32_t partition_count = 0;
+ std::vector<dsn::partition_configuration> partitions;
+ const auto &err = sc->ddl_client->list_app(table_name, table_id,
partition_count, partitions);
+ if (err != ::dsn::ERR_OK) {
+ LOG_ERROR("list app {} failed, error = {}", table_name, err);
+ return false;
+ }
+ CHECK_EQ(partitions.size(), partition_count);
+
+ const auto &results =
+ get_metrics(nodes,
rdb_estimated_keys_filters(table_id).to_query_string());
+
+ rows.clear();
+ rows.reserve(partition_count);
+ for (int32_t i = 0; i < partition_count; ++i) {
+ rows.emplace_back(std::to_string(i));
+ }
+
+ for (size_t i = 0; i < nodes.size(); ++i) {
+ RETURN_SHELL_IF_GET_METRICS_FAILED(
+ results[i], nodes[i], "rdb_estimated_keys for table(id={})",
table_id);
+
+ auto calcs = create_rdb_estimated_keys_stats_calcs(
+ table_id, partitions, nodes[i].address, "replica", rows);
+
RETURN_SHELL_IF_PARSE_METRICS_FAILED(calcs->aggregate_metrics(results[i].body()),
+ nodes[i],
+ "rdb_estimated_keys for
table(id={})",
+ table_id);
+ }
+
+ return true;
+}
+
+} // anonymous namespace
+
bool count_data(command_executor *e, shell_context *sc, arguments args)
{
static struct option long_options[] = {{"precise", no_argument, 0, 'c'},
@@ -2352,20 +2442,18 @@ bool count_data(command_executor *e, shell_context *sc,
arguments args)
return false;
}
- // get estimate key number
std::vector<row_data> rows;
- std::string app_name = sc->pg_client->get_app_name();
- // TODO(wangdan): no need to use get_app_stat since only
rdb_estimate_num_keys is needed.
- // Would be refactored later.
- // if (!get_app_stat(sc, app_name, rows)) {
- // fprintf(stderr, "ERROR: query app stat from server failed");
- // return true;
- // }
-
- rows.resize(rows.size() + 1);
- row_data &sum = rows.back();
- sum.row_name = "(total:" + std::to_string(rows.size() - 1) + ")";
- for (int i = 0; i < rows.size() - 1; ++i) {
+ const std::string table_name(sc->pg_client->get_app_name());
+ CHECK(!table_name.empty(), "table_name must be non-empty, see
data_operations()");
+
+ if (!get_rdb_estimated_keys_stats(sc, table_name, rows)) {
+ fprintf(stderr, "ERROR: get rdb_estimated_keys stats failed\n");
+ return true;
+ }
+
+ rows.emplace_back(fmt::format("(total:{})", rows.size())); // size() is read before the total row is appended, so it is the partition count
+ auto &sum = rows.back();
+ for (size_t i = 0; i < rows.size() - 1; ++i) {
const row_data &row = rows[i];
sum.rdb_estimate_num_keys += row.rdb_estimate_num_keys;
}
diff --git a/src/shell/commands/node_management.cpp
b/src/shell/commands/node_management.cpp
index 63e990b29..52910671f 100644
--- a/src/shell/commands/node_management.cpp
+++ b/src/shell/commands/node_management.cpp
@@ -390,7 +390,7 @@ bool ls_nodes(command_executor *e, shell_context *sc,
arguments args)
return true;
}
- const auto query_string = rw_requests_filters().to_query_string();
+ const auto &query_string = rw_requests_filters().to_query_string(); // NOTE(review): reused for both samples below; safe only if to_query_string() returns by value -- confirm
const auto &results_start = get_metrics(nodes, query_string);
std::this_thread::sleep_for(std::chrono::milliseconds(FLAGS_nodes_sample_interval_ms));
const auto &results_end = get_metrics(nodes, query_string);
diff --git a/src/shell/commands/table_management.cpp
b/src/shell/commands/table_management.cpp
index 0458ed3cd..6cede043d 100644
--- a/src/shell/commands/table_management.cpp
+++ b/src/shell/commands/table_management.cpp
@@ -532,10 +532,9 @@ bool app_stat(command_executor *e, shell_context *sc,
arguments args)
return true;
}
- rows.resize(rows.size() + 1);
- row_data &sum = rows.back();
- sum.row_name = "(total:" + std::to_string(rows.size() - 1) + ")";
- for (int i = 0; i < rows.size() - 1; ++i) {
+ rows.emplace_back(fmt::format("(total:{})", rows.size())); // size() is read before the total row is appended (also avoids size_t wrap when rows is empty)
+ auto &sum = rows.back();
+ for (size_t i = 0; i < rows.size() - 1; ++i) {
row_data &row = rows[i];
sum.partition_count += row.partition_count;
sum.get_qps += row.get_qps;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]