This is an automated email from the ASF dual-hosted git repository.
laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 2d6874826 refactor(remote_commands): Simplify the command register
process (#1912)
2d6874826 is described below
commit 2d68748264500a3fb3850a0dcfc8af8e8595a19a
Author: Yingchun Lai <[email protected]>
AuthorDate: Tue Apr 9 22:01:02 2024 +0800
refactor(remote_commands): Simplify the command register process (#1912)
This patch refactors the command_manager class and its related callers.
- Introduces register_single_command() and register_multiple_commands() to
distinguish between registering a single command and registering multiple
commands. The duplicated command strings required by the former
register_command() are removed to avoid inconsistency.
- Changes the output of some commands to JSON format to make it easier for
programs (e.g., Python scripts) to parse, and to simplify the code. (NOTE: Some
output JSON strings are compacted and some are not; the compacted ones are
typically simple and fit in one line.) The affected commands include:
- fd.allow_list
- meta.lb.get_balance_operation_count
- meta.lb.ignored_app_list
- meta.lb.assign_secondary_black_list
- engine
- system.queue
- server-info
- replica.query-compact
- Improves the help information of some commands
---
src/failure_detector/failure_detector.cpp | 31 +++---
src/meta/greedy_load_balancer.cpp | 54 +++++-----
src/meta/load_balance_policy.cpp | 89 +++++++++-------
src/meta/partition_guardian.cpp | 68 +++++++-----
src/meta/partition_guardian.h | 4 +-
src/meta/server_state.cpp | 27 ++---
src/perf_counter/perf_counters.cpp | 38 +++----
src/replica/replica_stub.cpp | 58 +++++------
src/runtime/service_api_c.cpp | 37 ++++---
src/runtime/service_engine.cpp | 102 +++++++++---------
src/runtime/service_engine.h | 8 +-
src/runtime/task/task_code.cpp | 24 ++---
src/runtime/task/task_engine.cpp | 143 +++++++++++++-------------
src/runtime/task/task_engine.h | 14 +--
src/runtime/test/task_engine.cpp | 8 +-
src/runtime/tracer.cpp | 12 +--
src/server/main.cpp | 23 +++--
src/server/pegasus_manual_compact_service.cpp | 38 +++----
src/shell/commands/node_management.cpp | 4 +-
src/shell/main.cpp | 2 +-
src/utils/command_manager.cpp | 133 +++++++++++++++---------
src/utils/command_manager.h | 38 +++++--
src/utils/simple_logger.cpp | 14 +--
src/utils/test/command_manager_test.cpp | 8 +-
24 files changed, 516 insertions(+), 461 deletions(-)
diff --git a/src/failure_detector/failure_detector.cpp
b/src/failure_detector/failure_detector.cpp
index bd528b5d8..70a5c2e9e 100644
--- a/src/failure_detector/failure_detector.cpp
+++ b/src/failure_detector/failure_detector.cpp
@@ -26,16 +26,20 @@
#include "failure_detector/failure_detector.h"
+#include <nlohmann/json.hpp>
#include <chrono>
#include <ctime>
+#include <map>
#include <mutex>
-#include <ostream>
#include <type_traits>
#include <utility>
#include "absl/strings/string_view.h"
#include "failure_detector/fd.code.definition.h"
#include "fd_types.h"
+#include "fmt/core.h"
+#include "fmt/format.h"
+#include "nlohmann/json_fwd.hpp"
#include "runtime/api_layer1.h"
#include "runtime/rpc/dns_resolver.h"
#include "runtime/rpc/rpc_address.h"
@@ -70,10 +74,10 @@ void failure_detector::register_ctrl_commands()
{
static std::once_flag flag;
std::call_once(flag, [&]() {
- _get_allow_list = dsn::command_manager::instance().register_command(
- {"fd.allow_list"},
+ _get_allow_list =
dsn::command_manager::instance().register_single_command(
"fd.allow_list",
- "show allow list of failure detector",
+ "Show the allow list of failure detector",
+ "",
[this](const std::vector<std::string> &args) { return
get_allow_list(args); });
});
}
@@ -337,18 +341,17 @@ void failure_detector::set_allow_list(const
std::vector<std::string> &replica_hp
std::string failure_detector::get_allow_list(const std::vector<std::string>
&args) const
{
- if (!_is_started)
- return "error: FD is not started";
+ if (!_is_started) {
+ nlohmann::json err_msg;
+ err_msg["error"] = fmt::format("FD is not started");
+ return err_msg.dump(2);
+ }
- std::stringstream oss;
+ nlohmann::json info;
dsn::zauto_lock l(_lock);
- oss << "get ok: allow list " << (_use_allow_list ? "enabled. list: " :
"disabled.");
- for (auto iter = _allow_list.begin(); iter != _allow_list.end(); ++iter) {
- if (iter != _allow_list.begin())
- oss << ",";
- oss << *iter;
- }
- return oss.str();
+ info["enabled"] = _use_allow_list;
+ info["allow_list"] = fmt::format("{}", fmt::join(_allow_list, ","));
+ return info.dump(2);
}
void failure_detector::on_ping_internal(const beacon_msg &beacon, /*out*/
beacon_ack &ack)
diff --git a/src/meta/greedy_load_balancer.cpp
b/src/meta/greedy_load_balancer.cpp
index 0905be033..81d7482bd 100644
--- a/src/meta/greedy_load_balancer.cpp
+++ b/src/meta/greedy_load_balancer.cpp
@@ -24,8 +24,12 @@
* THE SOFTWARE.
*/
+#include <fmt/core.h>
+#include <nlohmann/json.hpp>
+#include <nlohmann/json_fwd.hpp>
// IWYU pragma: no_include <ext/alloc_traits.h>
#include <string.h>
+#include <algorithm>
#include <cstdint>
#include <map>
#include <type_traits>
@@ -72,39 +76,35 @@ greedy_load_balancer::~greedy_load_balancer() {}
void greedy_load_balancer::register_ctrl_commands()
{
- _get_balance_operation_count =
dsn::command_manager::instance().register_command(
- {"meta.lb.get_balance_operation_count"},
- "meta.lb.get_balance_operation_count [total | move_pri | copy_pri |
copy_sec | detail]",
- "get balance operation count",
+ _get_balance_operation_count =
dsn::command_manager::instance().register_single_command(
+ "meta.lb.get_balance_operation_count",
+ "Get balance operation count",
+ "[total | move_pri | copy_pri | copy_sec | detail]",
[this](const std::vector<std::string> &args) { return
get_balance_operation_count(args); });
}
std::string greedy_load_balancer::get_balance_operation_count(const
std::vector<std::string> &args)
{
- if (args.empty()) {
- return std::string("total=" +
std::to_string(t_operation_counters[ALL_COUNT]));
+ nlohmann::json info;
+ if (args.size() > 1) {
+ info["error"] = fmt::format("invalid arguments");
+ } else if (args.empty() || args[0] == "total") {
+ info["total"] = t_operation_counters[ALL_COUNT];
+ } else if (args[0] == "move_pri") {
+ info["move_pri"] = t_operation_counters[MOVE_PRI_COUNT];
+ } else if (args[0] == "copy_pri") {
+ info["copy_pri"] = t_operation_counters[COPY_PRI_COUNT];
+ } else if (args[0] == "copy_sec") {
+ info["copy_sec"] = t_operation_counters[COPY_SEC_COUNT];
+ } else if (args[0] == "detail") {
+ info["move_pri"] = t_operation_counters[MOVE_PRI_COUNT];
+ info["copy_pri"] = t_operation_counters[COPY_PRI_COUNT];
+ info["copy_sec"] = t_operation_counters[COPY_SEC_COUNT];
+ info["total"] = t_operation_counters[ALL_COUNT];
+ } else {
+ info["error"] = fmt::format("invalid arguments");
}
-
- if (args[0] == "total") {
- return std::string("total=" +
std::to_string(t_operation_counters[ALL_COUNT]));
- }
-
- std::string result("unknown");
- if (args[0] == "move_pri")
- result = std::string("move_pri=" +
std::to_string(t_operation_counters[MOVE_PRI_COUNT]));
- else if (args[0] == "copy_pri")
- result = std::string("copy_pri=" +
std::to_string(t_operation_counters[COPY_PRI_COUNT]));
- else if (args[0] == "copy_sec")
- result = std::string("copy_sec=" +
std::to_string(t_operation_counters[COPY_SEC_COUNT]));
- else if (args[0] == "detail")
- result = std::string("move_pri=" +
std::to_string(t_operation_counters[MOVE_PRI_COUNT]) +
- ",copy_pri=" +
std::to_string(t_operation_counters[COPY_PRI_COUNT]) +
- ",copy_sec=" +
std::to_string(t_operation_counters[COPY_SEC_COUNT]) +
- ",total=" +
std::to_string(t_operation_counters[ALL_COUNT]));
- else
- result = std::string("ERR: invalid arguments");
-
- return result;
+ return info.dump(2);
}
void greedy_load_balancer::score(meta_view view, double &primary_stddev,
double &total_stddev)
diff --git a/src/meta/load_balance_policy.cpp b/src/meta/load_balance_policy.cpp
index f1c586462..3b44fbb9b 100644
--- a/src/meta/load_balance_policy.cpp
+++ b/src/meta/load_balance_policy.cpp
@@ -17,8 +17,12 @@
#include "meta/load_balance_policy.h"
+#include <fmt/core.h>
+#include <fmt/format.h>
// IWYU pragma: no_include <ext/alloc_traits.h>
#include <limits.h>
+#include <nlohmann/json.hpp>
+#include <nlohmann/json_fwd.hpp>
#include <algorithm>
#include <iterator>
#include <limits>
@@ -189,10 +193,10 @@ load_balance_policy::load_balance_policy(meta_service
*svc)
{
static std::once_flag flag;
std::call_once(flag, [&]() {
- _ctrl_balancer_ignored_apps =
dsn::command_manager::instance().register_command(
- {"meta.lb.ignored_app_list"},
- "meta.lb.ignored_app_list <get|set|clear> [app_id1,app_id2..]",
- "get, set and clear balancer ignored_app_list",
+ _ctrl_balancer_ignored_apps =
dsn::command_manager::instance().register_single_command(
+ "meta.lb.ignored_app_list",
+ "Get, set or clear balancer ignored_app_list",
+ "<get|set|clear> [set_app_id1,set_app_id2,...]",
[this](const std::vector<std::string> &args) {
return remote_command_balancer_ignored_app_ids(args);
});
@@ -406,70 +410,75 @@ bool load_balance_policy::execute_balance(
std::string
load_balance_policy::remote_command_balancer_ignored_app_ids(const
std::vector<std::string> &args)
{
- static const std::string invalid_arguments("invalid arguments");
- if (args.empty()) {
- return invalid_arguments;
- }
- if (args[0] == "set") {
- return set_balancer_ignored_app_ids(args);
- }
- if (args[0] == "get") {
- return get_balancer_ignored_app_ids();
- }
- if (args[0] == "clear") {
- return clear_balancer_ignored_app_ids();
- }
- return invalid_arguments;
+ static const std::string invalid_arguments_message("invalid arguments");
+ nlohmann::json info;
+ do {
+ if (args.empty()) {
+ break;
+ }
+ if (args[0] == "set") {
+ return set_balancer_ignored_app_ids(args);
+ } else if (args[0] == "get") {
+ return get_balancer_ignored_app_ids();
+ } else if (args[0] == "clear") {
+ return clear_balancer_ignored_app_ids();
+ }
+ } while (false);
+
+ info["error"] = invalid_arguments_message;
+ return info.dump(2);
}
std::string load_balance_policy::set_balancer_ignored_app_ids(const
std::vector<std::string> &args)
{
- static const std::string invalid_arguments("invalid arguments");
+ nlohmann::json info;
+ info["error"] = "invalid argument";
if (args.size() != 2) {
- return invalid_arguments;
+ return info.dump(2);
}
std::vector<std::string> app_ids;
dsn::utils::split_args(args[1].c_str(), app_ids, ',');
if (app_ids.empty()) {
- return invalid_arguments;
+ return info.dump(2);
}
std::set<app_id> app_list;
- for (const std::string &app_id_str : app_ids) {
+ for (const auto &app_id_str : app_ids) {
app_id app;
if (!dsn::buf2int32(app_id_str, app)) {
- return invalid_arguments;
+ return info.dump(2);
}
app_list.insert(app);
}
- dsn::zauto_write_lock l(_balancer_ignored_apps_lock);
- _balancer_ignored_apps = std::move(app_list);
- return "set ok";
+ {
+ dsn::zauto_write_lock l(_balancer_ignored_apps_lock);
+ _balancer_ignored_apps = std::move(app_list);
+ }
+ info["error"] = "ok";
+ return info.dump(2);
}
std::string load_balance_policy::get_balancer_ignored_app_ids()
{
- std::stringstream oss;
- dsn::zauto_read_lock l(_balancer_ignored_apps_lock);
- if (_balancer_ignored_apps.empty()) {
- return "no ignored apps";
+ nlohmann::json data;
+ {
+ dsn::zauto_read_lock l(_balancer_ignored_apps_lock);
+ data["ignored_app_id_list"] = fmt::format("{}",
fmt::join(_balancer_ignored_apps, ","));
}
- oss << "ignored_app_id_list: ";
- std::copy(_balancer_ignored_apps.begin(),
- _balancer_ignored_apps.end(),
- std::ostream_iterator<app_id>(oss, ","));
- std::string app_ids = oss.str();
- app_ids[app_ids.size() - 1] = '\0';
- return app_ids;
+ return data.dump(2);
}
std::string load_balance_policy::clear_balancer_ignored_app_ids()
{
- dsn::zauto_write_lock l(_balancer_ignored_apps_lock);
- _balancer_ignored_apps.clear();
- return "clear ok";
+ {
+ dsn::zauto_write_lock l(_balancer_ignored_apps_lock);
+ _balancer_ignored_apps.clear();
+ }
+ nlohmann::json info;
+ info["error"] = "ok";
+ return info.dump(2);
}
bool load_balance_policy::is_ignored_app(app_id app_id)
diff --git a/src/meta/partition_guardian.cpp b/src/meta/partition_guardian.cpp
index 7ca8210c5..6d19aaedd 100644
--- a/src/meta/partition_guardian.cpp
+++ b/src/meta/partition_guardian.cpp
@@ -18,8 +18,11 @@
#include "meta/partition_guardian.h"
#include <fmt/core.h>
+#include <fmt/format.h>
// IWYU pragma: no_include <ext/alloc_traits.h>
#include <inttypes.h>
+#include <nlohmann/json.hpp>
+#include <nlohmann/json_fwd.hpp>
#include <stdio.h>
#include <algorithm>
#include <cstdint>
@@ -720,10 +723,10 @@ void partition_guardian::register_ctrl_commands()
"meta.lb.assign_delay_ms",
"control the replica_assign_delay_ms_for_dropouts config"));
- _cmds.emplace_back(dsn::command_manager::instance().register_command(
- {"meta.lb.assign_secondary_black_list"},
- "lb.assign_secondary_black_list [<ip:port,ip:port,ip:port>|clear]",
- "control the assign secondary black list",
+
_cmds.emplace_back(dsn::command_manager::instance().register_single_command(
+ "meta.lb.assign_secondary_black_list",
+ "Control the assign secondary black list",
+ "[host1:port,host2:port,...|clear]",
[this](const std::vector<std::string> &args) {
return ctrl_assign_secondary_black_list(args);
}));
@@ -732,47 +735,56 @@ void partition_guardian::register_ctrl_commands()
std::string
partition_guardian::ctrl_assign_secondary_black_list(const
std::vector<std::string> &args)
{
- std::string invalid_arguments("invalid arguments");
- std::stringstream oss;
+ nlohmann::json msg;
+ msg["error"] = "ok";
+ // Query.
if (args.empty()) {
- dsn::zauto_read_lock l(_black_list_lock);
- oss << "get ok: ";
- for (auto iter = _assign_secondary_black_list.begin();
- iter != _assign_secondary_black_list.end();
- ++iter) {
- if (iter != _assign_secondary_black_list.begin())
- oss << ",";
- oss << *iter;
+ {
+ dsn::zauto_read_lock l(_black_list_lock);
+ msg["assign_secondary_black_list"] =
+ fmt::format("{}", fmt::join(_assign_secondary_black_list,
","));
}
- return oss.str();
+ return msg.dump(2);
}
+ // Invalid argument.
if (args.size() != 1) {
- return invalid_arguments;
+ msg["error"] = "invalid argument, 0 or 1 argument is acceptable";
+ return msg.dump(2);
}
- dsn::zauto_write_lock l(_black_list_lock);
+ // Clear.
if (args[0] == "clear") {
- _assign_secondary_black_list.clear();
- return "clear ok";
+ {
+ dsn::zauto_write_lock l(_black_list_lock);
+ _assign_secondary_black_list.clear();
+ }
+ return msg.dump(2);
}
- std::vector<std::string> ip_ports;
- dsn::utils::split_args(args[0].c_str(), ip_ports, ',');
- if (args.size() == 0) {
- return invalid_arguments;
+ // Set to new value.
+ std::vector<std::string> host_ports;
+ dsn::utils::split_args(args[0].c_str(), host_ports, ',');
+ if (host_ports.empty()) {
+ msg["error"] =
+ "invalid argument, the argument should be in form of
'<host:port,host:port,host:port>'";
+ return msg.dump(2);
}
std::set<dsn::host_port> hp_list;
- for (const std::string &s : ip_ports) {
- const auto hp = host_port::from_string(s);
+ for (const auto &host_port : host_ports) {
+ const auto hp = host_port::from_string(host_port);
if (!hp) {
- return invalid_arguments;
+ msg["error"] = fmt::format("invalid argument, bad host:port '{}'",
host_port);
+ return msg.dump(2);
}
hp_list.insert(hp);
}
- _assign_secondary_black_list = std::move(hp_list);
- return "set ok";
+ {
+ dsn::zauto_write_lock l(_black_list_lock);
+ _assign_secondary_black_list = std::move(hp_list);
+ }
+ return msg.dump(2);
}
void partition_guardian::get_ddd_partitions(const gpid &pid,
diff --git a/src/meta/partition_guardian.h b/src/meta/partition_guardian.h
index ea387b6cb..7a7953b59 100644
--- a/src/meta/partition_guardian.h
+++ b/src/meta/partition_guardian.h
@@ -82,10 +82,10 @@ private:
_ddd_partitions[partition.config.pid] = std::move(partition);
}
- bool in_black_list(dsn::host_port addr)
+ bool in_black_list(dsn::host_port hp)
{
dsn::zauto_read_lock l(_black_list_lock);
- return _assign_secondary_black_list.count(addr) != 0;
+ return _assign_secondary_black_list.count(hp) != 0;
}
meta_service *_svc;
diff --git a/src/meta/server_state.cpp b/src/meta/server_state.cpp
index c9fec739e..17a7e3e3f 100644
--- a/src/meta/server_state.cpp
+++ b/src/meta/server_state.cpp
@@ -144,27 +144,16 @@ server_state::~server_state() {
_tracker.cancel_outstanding_tasks(); }
void server_state::register_cli_commands()
{
- _cmds.emplace_back(dsn::command_manager::instance().register_command(
- {"meta.dump"},
- "meta.dump - dump app_states of meta server to local file",
- "meta.dump -t|--target target_file",
+
_cmds.emplace_back(dsn::command_manager::instance().register_single_command(
+ "meta.dump",
+ "Dump app_states of meta server to a local file",
+ "<target_file>",
[this](const std::vector<std::string> &args) {
- dsn::error_code err;
- if (args.size() != 2) {
- err = ERR_INVALID_PARAMETERS;
- } else {
- const char *target_file = nullptr;
- for (int i = 0; i < args.size(); i += 2) {
- if (args[i] == "-t" || args[i] == "--target")
- target_file = args[i + 1].c_str();
- }
- if (target_file == nullptr) {
- err = ERR_INVALID_PARAMETERS;
- } else {
- err = this->dump_from_remote_storage(target_file, false);
- }
+ if (args.size() != 1) {
+ return ERR_INVALID_PARAMETERS.to_string();
}
- return std::string(err.to_string());
+
+ return dump_from_remote_storage(args[0].c_str(),
false).to_string();
}));
_cmds.emplace_back(dsn::command_manager::instance().register_bool_command(
diff --git a/src/perf_counter/perf_counters.cpp
b/src/perf_counter/perf_counters.cpp
index 8d805db5b..24e26c699 100644
--- a/src/perf_counter/perf_counters.cpp
+++ b/src/perf_counter/perf_counters.cpp
@@ -117,27 +117,27 @@ perf_counters::perf_counters()
// perf_counters
tools::shared_io_service::instance();
- _cmds.emplace_back(command_manager::instance().register_command(
- {"perf-counters"},
- "perf-counters - query perf counters, filtered by OR of POSIX basic
regular expressions",
- "perf-counters [regexp]...",
+ _cmds.emplace_back(command_manager::instance().register_single_command(
+ "perf-counters",
+ "Query perf counters, filtered by OR of POSIX basic regular
expressions",
+ "[regexp]...",
[](const std::vector<std::string> &args) {
return perf_counters::instance().list_snapshot_by_regexp(args);
}));
- _cmds.emplace_back(command_manager::instance().register_command(
- {"perf-counters-by-substr"},
- "perf-counters-by-substr - query perf counters, filtered by OR of
substrs",
- "perf-counters-by-substr [substr]...",
+ _cmds.emplace_back(command_manager::instance().register_single_command(
+ "perf-counters-by-substr",
+ "Query perf counters, filtered by OR of substrs",
+ "[substr]...",
[](const std::vector<std::string> &args) {
return perf_counters::instance().list_snapshot_by_literal(
args, [](const std::string &arg, const counter_snapshot &cs) {
return cs.name.find(arg) != std::string::npos;
});
}));
- _cmds.emplace_back(command_manager::instance().register_command(
- {"perf-counters-by-prefix"},
- "perf-counters-by-prefix - query perf counters, filtered by OR of
prefix strings",
- "perf-counters-by-prefix [prefix]...",
+ _cmds.emplace_back(command_manager::instance().register_single_command(
+ "perf-counters-by-prefix",
+ "Query perf counters, filtered by OR of prefix strings",
+ "[prefix]...",
[](const std::vector<std::string> &args) {
return perf_counters::instance().list_snapshot_by_literal(
args, [](const std::string &arg, const counter_snapshot &cs) {
@@ -145,10 +145,10 @@ perf_counters::perf_counters()
utils::mequals(cs.name.c_str(), arg.c_str(),
arg.size());
});
}));
- _cmds.emplace_back(command_manager::instance().register_command(
- {"perf-counters-by-postfix"},
- "perf-counters-by-postfix - query perf counters, filtered by OR of
postfix strings",
- "perf-counters-by-postfix [postfix]...",
+ _cmds.emplace_back(command_manager::instance().register_single_command(
+ "perf-counters-by-postfix",
+ "Query perf counters, filtered by OR of postfix strings",
+ "[postfix]...",
[](const std::vector<std::string> &args) {
return perf_counters::instance().list_snapshot_by_literal(
args, [](const std::string &arg, const counter_snapshot &cs) {
@@ -159,10 +159,10 @@ perf_counters::perf_counters()
});
}));
- _cmds.emplace_back(command_manager::instance().register_command(
- {"server-stat"},
- "server-stat - query selected perf counters",
+ _cmds.emplace_back(command_manager::instance().register_single_command(
"server-stat",
+ "Query selected perf counters",
+ "",
[](const std::vector<std::string> &args) { return get_brief_stat();
}));
}
diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp
index 43bdad322..6d581edfd 100644
--- a/src/replica/replica_stub.cpp
+++ b/src/replica/replica_stub.cpp
@@ -2193,10 +2193,10 @@ void replica_stub::open_service()
#if !defined(DSN_ENABLE_GPERF) && defined(DSN_USE_JEMALLOC)
void replica_stub::register_jemalloc_ctrl_command()
{
- _cmds.emplace_back(::dsn::command_manager::instance().register_command(
- {"replica.dump-jemalloc-stats"},
- fmt::format("replica.dump-jemalloc-stats <{}> [buffer size]",
kAllJeStatsTypesStr),
- "dump stats of jemalloc",
+
_cmds.emplace_back(::dsn::command_manager::instance().register_single_command(
+ "replica.dump-jemalloc-stats",
+ "Dump stats of jemalloc",
+ fmt::format("<{}> [buffer size]", kAllJeStatsTypesStr),
[](const std::vector<std::string> &args) {
if (args.empty()) {
return std::string("invalid arguments");
@@ -2234,10 +2234,10 @@ void replica_stub::register_ctrl_command()
/// failure_detector::register_ctrl_commands and
nfs_client_impl::register_cli_commands
static std::once_flag flag;
std::call_once(flag, [&]() {
- _cmds.emplace_back(::dsn::command_manager::instance().register_command(
- {"replica.kill_partition"},
- "replica.kill_partition [app_id [partition_index]]",
- "replica.kill_partition: kill partitions by (all, one app, one
partition)",
+
_cmds.emplace_back(::dsn::command_manager::instance().register_single_command(
+ "replica.kill_partition",
+ "Kill partitions by (all, one app, one partition)",
+ "[app_id [partition_index]]",
[this](const std::vector<std::string> &args) {
dsn::gpid pid;
if (args.size() == 0) {
@@ -2269,11 +2269,10 @@ void replica_stub::register_ctrl_command()
"replica.verbose-commit-log",
"control if print verbose log when commit mutation"));
- _cmds.emplace_back(::dsn::command_manager::instance().register_command(
- {"replica.trigger-checkpoint"},
- "replica.trigger-checkpoint [id1,id2,...] (where id is 'app_id' or
"
- "'app_id.partition_id')",
- "replica.trigger-checkpoint - trigger replicas to do checkpoint",
+
_cmds.emplace_back(::dsn::command_manager::instance().register_single_command(
+ "replica.trigger-checkpoint",
+ "Trigger replicas to do checkpoint by app_id or
app_id.partition_id",
+ "[id1,id2,...]",
[this](const std::vector<std::string> &args) {
return exec_command_on_replica(args, true, [this](const
replica_ptr &rep) {
tasking::enqueue(LPC_PER_REPLICA_CHECKPOINT_TIMER,
@@ -2284,20 +2283,21 @@ void replica_stub::register_ctrl_command()
});
}));
- _cmds.emplace_back(::dsn::command_manager::instance().register_command(
- {"replica.query-compact"},
- "replica.query-compact [id1,id2,...] (where id is 'app_id' or
'app_id.partition_id')",
- "replica.query-compact - query full compact status on the
underlying storage engine",
+
_cmds.emplace_back(::dsn::command_manager::instance().register_single_command(
+ "replica.query-compact",
+ "Query full compact status on the underlying storage engine by
app_id or "
+ "app_id.partition_id",
+ "[id1,id2,...]",
[this](const std::vector<std::string> &args) {
return exec_command_on_replica(args, true, [](const
replica_ptr &rep) {
return rep->query_manual_compact_state();
});
}));
- _cmds.emplace_back(::dsn::command_manager::instance().register_command(
- {"replica.query-app-envs"},
- "replica.query-app-envs [id1,id2,...] (where id is 'app_id' or
'app_id.partition_id')",
- "replica.query-app-envs - query app envs on the underlying storage
engine",
+
_cmds.emplace_back(::dsn::command_manager::instance().register_single_command(
+ "replica.query-app-envs",
+ "Query app envs on the underlying storage engine by app_id or
app_id.partition_id",
+ "[id1,id2,...]",
[this](const std::vector<std::string> &args) {
return exec_command_on_replica(args, true, [](const
replica_ptr &rep) {
std::map<std::string, std::string> kv_map;
@@ -2312,10 +2312,10 @@ void replica_stub::register_ctrl_command()
"replica.release-tcmalloc-memory",
"control if try to release tcmalloc memory"));
- _cmds.emplace_back(::dsn::command_manager::instance().register_command(
- {"replica.get-tcmalloc-status"},
- "replica.get-tcmalloc-status - get status of tcmalloc",
- "get status of tcmalloc",
+
_cmds.emplace_back(::dsn::command_manager::instance().register_single_command(
+ "replica.get-tcmalloc-status",
+ "Get the status of tcmalloc",
+ "",
[](const std::vector<std::string> &args) {
char buf[4096];
MallocExtension::instance()->GetStats(buf, 4096);
@@ -2329,10 +2329,10 @@ void replica_stub::register_ctrl_command()
"control tcmalloc max reserved but not-used memory percentage",
&check_mem_release_max_reserved_mem_percentage));
- _cmds.emplace_back(::dsn::command_manager::instance().register_command(
- {"replica.release-all-reserved-memory"},
- "replica.release-all-reserved-memory - release tcmalloc all
reserved-not-used memory",
- "release tcmalloc all reserverd not-used memory back to operating
system",
+
_cmds.emplace_back(::dsn::command_manager::instance().register_single_command(
+ "replica.release-all-reserved-memory",
+ "Release tcmalloc all reserved-not-used memory back to operating
system",
+ "",
[this](const std::vector<std::string> &args) {
auto release_bytes = gc_tcmalloc_memory(true);
return "OK, release_bytes=" + std::to_string(release_bytes);
diff --git a/src/runtime/service_api_c.cpp b/src/runtime/service_api_c.cpp
index 6d45be07a..44fbd7096 100644
--- a/src/runtime/service_api_c.cpp
+++ b/src/runtime/service_api_c.cpp
@@ -547,25 +547,24 @@ bool run(const char *config_file,
exit(1);
}
- dump_log_cmd =
- dsn::command_manager::instance().register_command({"config-dump"},
- "config-dump - dump
configuration",
- "config-dump
[to-this-config-file]",
- [](const
std::vector<std::string> &args) {
-
std::ostringstream oss;
- std::ofstream
off;
- std::ostream *os
= &oss;
- if (args.size()
> 0) {
-
off.open(args[0]);
- os = &off;
-
- oss <<
"config dump to file "
- <<
args[0] << std::endl;
- }
-
-
dsn_config_dump(*os);
- return oss.str();
- });
+ dump_log_cmd = dsn::command_manager::instance().register_single_command(
+ "config-dump",
+ "Dump all configurations to a server local path or to stdout",
+ "[target_file]",
+ [](const std::vector<std::string> &args) {
+ std::ostringstream oss;
+ std::ofstream off;
+ std::ostream *os = &oss;
+ if (args.size() > 0) {
+ off.open(args[0]);
+ os = &off;
+
+ oss << "config dump to file " << args[0] << std::endl;
+ }
+
+ dsn_config_dump(*os);
+ return oss.str();
+ });
// invoke customized init after apps are created
dsn::tools::sys_init_after_app_created.execute();
diff --git a/src/runtime/service_engine.cpp b/src/runtime/service_engine.cpp
index acfca18ab..dee8a5334 100644
--- a/src/runtime/service_engine.cpp
+++ b/src/runtime/service_engine.cpp
@@ -26,13 +26,15 @@
#include "service_engine.h"
-#include <stdlib.h>
+// IWYU pragma: no_include <ext/alloc_traits.h>
#include <functional>
#include <list>
#include <unordered_map>
#include <utility>
#include "common/gpid.h"
+#include "fmt/core.h"
+#include "nlohmann/json.hpp"
#include "runtime/node_scoper.h"
#include "runtime/rpc/rpc_engine.h"
#include "runtime/rpc/rpc_message.h"
@@ -44,6 +46,7 @@
#include "utils/filesystem.h"
#include "utils/fmt_logging.h"
#include "utils/join_point.h"
+#include "utils/string_conv.h"
#include "utils/strings.h"
using namespace dsn::utils;
@@ -144,22 +147,19 @@ error_code service_node::start()
return err;
}
-void service_node::get_runtime_info(const std::string &indent,
- const std::vector<std::string> &args,
- /*out*/ std::stringstream &ss)
+std::string service_node::get_runtime_info(const std::vector<std::string>
&args) const
{
- ss << indent << full_name() << ":" << std::endl;
-
- std::string indent2 = indent + "\t";
- _computation->get_runtime_info(indent2, args, ss);
+ nlohmann::json info;
+ info[full_name()] = _computation->get_runtime_info(args);
+ return info.dump(2);
}
-void service_node::get_queue_info(
- /*out*/ std::stringstream &ss)
+nlohmann::json service_node::get_queue_info() const
{
- ss << "{\"app_name\":\"" << full_name() << "\",\n\"thread_pool\":[\n";
- _computation->get_queue_info(ss);
- ss << "]}";
+ nlohmann::json info;
+ info["app_name"] = full_name();
+ info["thread_pool"] = _computation->get_queue_info();
+ return info;
}
rpc_request_task *service_node::generate_intercepted_request_task(message_ex
*req)
@@ -189,16 +189,18 @@ service_engine::service_engine()
{
_env = nullptr;
- _cmds.emplace_back(dsn::command_manager::instance().register_command(
- {"engine"},
- "engine - get engine internal information",
- "engine [app-id]",
+
_cmds.emplace_back(dsn::command_manager::instance().register_single_command(
+ "engine",
+ "Get engine internal information, including threadpools and threads
and queues in each "
+ "threadpool",
+ "[app-id]",
&service_engine::get_runtime_info));
- _cmds.emplace_back(dsn::command_manager::instance().register_command(
- {"system.queue"},
- "system.queue - get queue internal information",
+
_cmds.emplace_back(dsn::command_manager::instance().register_single_command(
"system.queue",
+ "Get queue internal information, including the threadpool each queue
belongs to, and the "
+ "queue name and size",
+ "",
&service_engine::get_queue_info));
}
@@ -243,39 +245,47 @@ void service_engine::start_node(service_app_spec
&app_spec)
std::string service_engine::get_runtime_info(const std::vector<std::string>
&args)
{
- std::stringstream ss;
- if (args.size() == 0) {
- ss << "" << service_engine::instance()._nodes_by_app_id.size()
- << " nodes available:" << std::endl;
- for (auto &kv : service_engine::instance()._nodes_by_app_id) {
- ss << "\t" << kv.second->id() << "." << kv.second->full_name() <<
std::endl;
- }
- } else {
- std::string indent = "";
- int id = atoi(args[0].c_str());
- auto it = service_engine::instance()._nodes_by_app_id.find(id);
- if (it != service_engine::instance()._nodes_by_app_id.end()) {
- auto args2 = args;
- args2.erase(args2.begin());
- it->second->get_runtime_info(indent, args2, ss);
- } else {
- ss << "cannot find node with given app id";
+ // Overview.
+ if (args.empty()) {
+ nlohmann::json overview;
+ nlohmann::json nodes;
+ for (const auto &nodes_by_app_id :
service_engine::instance()._nodes_by_app_id) {
+ nodes.emplace_back(fmt::format(
+ "{}.{}", nodes_by_app_id.second->id(),
nodes_by_app_id.second->full_name()));
}
+ overview["available_nodes"] = nodes;
+ return overview.dump(2);
+ }
+
+ // Invalid argument.
+ int id;
+ if (!dsn::buf2int32(args[0], id)) {
+ nlohmann::json err_msg;
+ err_msg["error"] = "invalid argument, only one integer argument is
acceptable";
+ return err_msg.dump(2);
}
- return ss.str();
+
+ // The query id is not found.
+ const auto &it = service_engine::instance()._nodes_by_app_id.find(id);
+ if (it == service_engine::instance()._nodes_by_app_id.end()) {
+ nlohmann::json err_msg;
+ err_msg["error"] = fmt::format("cannot find node with given app
id({})", id);
+ return err_msg.dump(2);
+ }
+
+ // Query a special id.
+ auto tmp_args = args;
+ tmp_args.erase(tmp_args.begin());
+ return it->second->get_runtime_info(tmp_args);
}
std::string service_engine::get_queue_info(const std::vector<std::string>
&args)
{
- std::stringstream ss;
- ss << "[";
- for (auto &it : service_engine::instance()._nodes_by_app_id) {
- if (it.first !=
service_engine::instance()._nodes_by_app_id.begin()->first)
- ss << ",";
- it.second->get_queue_info(ss);
+ nlohmann::json info;
+ for (const auto &nodes_by_app_id :
service_engine::instance()._nodes_by_app_id) {
+ info.emplace_back(nodes_by_app_id.second->get_queue_info());
}
- ss << "]";
- return ss.str();
+ return info.dump(2);
}
bool service_engine::is_simulator() const { return _simulator; }
diff --git a/src/runtime/service_engine.h b/src/runtime/service_engine.h
index 1e6528f0f..f2c7aea76 100644
--- a/src/runtime/service_engine.h
+++ b/src/runtime/service_engine.h
@@ -28,10 +28,10 @@
#include <map>
#include <memory>
-#include <sstream>
#include <string>
#include <vector>
+#include "nlohmann/json_fwd.hpp"
#include "runtime/api_task.h"
#include "runtime/global_config.h"
#include "runtime/service_app.h"
@@ -60,10 +60,8 @@ public:
rpc_engine *rpc() const { return _rpc.get(); }
task_engine *computation() const { return _computation.get(); }
- void get_runtime_info(const std::string &indent,
- const std::vector<std::string> &args,
- /*out*/ std::stringstream &ss);
- void get_queue_info(/*out*/ std::stringstream &ss);
+ std::string get_runtime_info(const std::vector<std::string> &args) const;
+ nlohmann::json get_queue_info() const;
dsn::error_code start();
dsn::error_code start_app();
diff --git a/src/runtime/task/task_code.cpp b/src/runtime/task/task_code.cpp
index 9533210e5..9abf2193f 100644
--- a/src/runtime/task/task_code.cpp
+++ b/src/runtime/task/task_code.cpp
@@ -42,24 +42,24 @@ namespace utils {
template <>
void task_code_mgr::register_commands()
{
- _cmds.emplace_back(command_manager::instance().register_command(
- {"task-code"},
- "task-code - query task code containing any given keywords",
- "task-code keyword1 keyword2 ...",
+ _cmds.emplace_back(command_manager::instance().register_single_command(
+ "task-code",
+ "Query task code containing any given keywords",
+ "[keyword1] [keyword2] ...",
[](const std::vector<std::string> &args) {
std::stringstream ss;
-
for (int code = 0; code <= dsn::task_code::max(); code++) {
- if (code == TASK_CODE_INVALID)
+ if (code == TASK_CODE_INVALID) {
continue;
+ }
- std::string codes = dsn::task_code(code).to_string();
- if (args.size() == 0) {
- ss << " " << codes << std::endl;
+ const std::string code_str = dsn::task_code(code).to_string();
+ if (args.empty()) {
+ ss << " " << code_str << std::endl;
} else {
- for (auto &arg : args) {
- if (codes.find(arg.c_str()) != std::string::npos) {
- ss << " " << codes << std::endl;
+ for (const auto &arg : args) {
+ if (code_str.find(arg) != std::string::npos) {
+ ss << " " << code_str << std::endl;
}
}
}
diff --git a/src/runtime/task/task_engine.cpp b/src/runtime/task/task_engine.cpp
index 4fcc3f6e8..179b04562 100644
--- a/src/runtime/task/task_engine.cpp
+++ b/src/runtime/task/task_engine.cpp
@@ -29,9 +29,9 @@
// IWYU pragma: no_include <ext/alloc_traits.h>
#include <limits.h>
#include <mutex>
-#include <ostream>
#include "fmt/core.h"
+#include "nlohmann/json.hpp"
#include "runtime/global_config.h"
#include "runtime/service_engine.h"
#include "runtime/task/task.h"
@@ -184,42 +184,50 @@ bool
task_worker_pool::shared_same_worker_with_current_task(task *tsk) const
}
}
-void task_worker_pool::get_runtime_info(const std::string &indent,
- const std::vector<std::string> &args,
- /*out*/ std::stringstream &ss)
+nlohmann::json task_worker_pool::get_runtime_info(const
std::vector<std::string> &args) const
{
- std::string indent2 = indent + "\t";
- ss << indent << "contains " << _workers.size() << " threads with " <<
_queues.size()
- << " queues" << std::endl;
-
- for (auto &q : _queues) {
- if (q) {
- ss << indent2 << q->get_name() << " now has " << q->count() << "
pending tasks"
- << std::endl;
+ nlohmann::json info;
+
+ // Queues.
+ nlohmann::json queues;
+ for (const auto &queue : _queues) {
+ if (queue) {
+ nlohmann::json q;
+ q["name"] = queue->get_name();
+ q["pending_task_count"] = queue->count();
+ queues.emplace_back(q);
}
}
-
- for (auto &wk : _workers) {
- if (wk) {
- ss << indent2 << wk->index() << " (TID = " << wk->native_tid()
- << ") attached with queue " << wk->queue()->get_name() <<
std::endl;
+ info["queues"] = queues;
+
+ // Threads.
+ nlohmann::json workers;
+ for (const auto &worker : _workers) {
+ if (worker) {
+ nlohmann::json w;
+ w["index"] = worker->index();
+ w["TID"] = worker->native_tid();
+ w["queue_name"] = worker->queue()->get_name();
+ workers.emplace_back(w);
}
}
+ info["threads"] = workers;
+
+ return info;
}
-void task_worker_pool::get_queue_info(/*out*/ std::stringstream &ss)
+
+nlohmann::json task_worker_pool::get_queue_info() const
{
- ss << "[";
- bool first_flag = 0;
- for (auto &q : _queues) {
- if (q) {
- if (!first_flag)
- first_flag = 1;
- else
- ss << ",";
- ss << "\t\t{\"name\":\"" << q->get_name() << "\",\n\t\t\"num\":"
<< q->count() << "}\n";
+ nlohmann::json queues;
+ for (const auto &queue : _queues) {
+ if (queue) {
+ nlohmann::json q;
+ q["name"] = queue->get_name();
+ q["pending_task_count"] = queue->count();
+ queues.emplace_back(q);
}
}
- ss << "]\n";
+ return queues;
}
task_engine::task_engine(service_node *node)
@@ -279,74 +287,71 @@ volatile int
*task_engine::get_task_queue_virtual_length_ptr(dsn::task_code code
return pl->queues()[idx]->get_virtual_length_ptr();
}
-void task_engine::get_runtime_info(const std::string &indent,
- const std::vector<std::string> &args,
- /*out*/ std::stringstream &ss)
+nlohmann::json task_engine::get_runtime_info(const std::vector<std::string>
&args) const
{
- std::string indent2 = indent + "\t";
- for (auto &p : _pools) {
- if (p) {
- ss << indent << p->spec().pool_code.to_string() << std::endl;
- p->get_runtime_info(indent2, args, ss);
+ nlohmann::json pools;
+ for (const auto &pool : _pools) {
+ if (pool) {
+ pools[pool->spec().pool_code.to_string()] =
pool->get_runtime_info(args);
}
}
+ return pools;
}
-void task_engine::get_queue_info(/*out*/ std::stringstream &ss)
+nlohmann::json task_engine::get_queue_info() const
{
- bool first_flag = 0;
- for (auto &p : _pools) {
- if (p) {
- if (!first_flag)
- first_flag = 1;
- else
- ss << ",";
- ss << "\t{\"pool_name\":\"" << p->spec().pool_code <<
"\",\n\t\"pool_queue\":\n";
- p->get_queue_info(ss);
- ss << "}\n";
+ nlohmann::json pools;
+ for (const auto &pool : _pools) {
+ if (pool) {
+ nlohmann::json p;
+ p["name"] = pool->spec().pool_code.to_string();
+ p["queue"] = pool->get_queue_info();
+ pools.emplace_back(p);
}
}
+ return pools;
}
void task_engine::register_cli_commands()
{
static std::once_flag flag;
std::call_once(flag, [&]() {
- _task_queue_max_length_cmd =
dsn::command_manager::instance().register_command(
- {"task.queue_max_length"},
- "task.queue_max_length <pool_code> [queue_max_length]",
- "get/set the max task queue length of specific thread_pool, you
can set INT_MAX, to "
- "set a big enough value, but you can't cancel delay/reject
dynamically",
+ _task_queue_max_length_cmd =
dsn::command_manager::instance().register_single_command(
+ "task.queue_max_length",
+ "Get the current or set a new max task queue length of a specific
thread_pool. It can "
+ "be set it to INT_MAX which means a big enough value, but it can't
be cancelled the "
+ "delay/reject policy dynamically",
+ "<pool_code> [new_max_queue_length]",
[this](const std::vector<std::string> &args) {
- if (args.size() < 1) {
+ if (args.empty()) {
return std::string("ERR: invalid arguments,
task.queue_max_length <pool_code> "
"[queue_max_length]");
}
- for (auto &it : _pools) {
- if (!it) {
+ for (const auto &pool : _pools) {
+ if (!pool) {
continue;
}
- if (it->_spec.pool_code.to_string() == args[0]) {
- // when args length is 1, return current value
+ if (pool->_spec.pool_code.to_string() == args[0]) {
+ // Query.
if (args.size() == 1) {
- return fmt::format("task queue {}, length {}",
+ return fmt::format("The current task queue length
of {} is {}",
args[0],
-
it->_spec.queue_length_throttling_threshold);
+
pool->_spec.queue_length_throttling_threshold);
}
+
+ // Update.
if (args.size() == 2) {
- int queue_length = INT_MAX;
- if ((args[1] != "INT_MAX") &&
- (!dsn::buf2int32(args[1], queue_length))) {
- return fmt::format("queue_max_length must >=
0, or set `INT_MAX`");
- }
- if (queue_length < 0) {
- queue_length = INT_MAX;
+ int new_queue_length = INT_MAX;
+ if ((args[1] != "INT_MAX" &&
+ !dsn::buf2int32(args[1], new_queue_length)) ||
+ new_queue_length < 0) {
+ return fmt::format("queue_max_length must be
>= 0 or 'INT_MAX'");
}
- it->_spec.queue_length_throttling_threshold =
queue_length;
- return fmt::format("task queue {}, length {}",
+ pool->_spec.queue_length_throttling_threshold =
new_queue_length;
+ return fmt::format("Task queue {} is updated to
new max length {}",
args[0],
-
it->_spec.queue_length_throttling_threshold);
+
pool->_spec.queue_length_throttling_threshold);
}
}
}
diff --git a/src/runtime/task/task_engine.h b/src/runtime/task/task_engine.h
index cbff4e351..6a8fe2a0a 100644
--- a/src/runtime/task/task_engine.h
+++ b/src/runtime/task/task_engine.h
@@ -26,12 +26,12 @@
#pragma once
-#include <iosfwd>
#include <list>
#include <memory>
#include <string>
#include <vector>
+#include "nlohmann/json_fwd.hpp"
#include "runtime/task/task_code.h"
#include "utils/command_manager.h"
#include "utils/threadpool_spec.h"
@@ -72,10 +72,8 @@ public:
bool shared_same_worker_with_current_task(task *task) const;
task_engine *engine() const { return _owner; }
service_node *node() const { return _node; }
- void get_runtime_info(const std::string &indent,
- const std::vector<std::string> &args,
- /*out*/ std::stringstream &ss);
- void get_queue_info(/*out*/ std::stringstream &ss);
+ nlohmann::json get_runtime_info(const std::vector<std::string> &args)
const;
+ nlohmann::json get_queue_info() const;
std::vector<task_queue *> &queues() { return _queues; }
std::vector<task_worker *> &workers() { return _workers; }
@@ -118,10 +116,8 @@ public:
volatile int *get_task_queue_virtual_length_ptr(dsn::task_code code, int
hash);
service_node *node() const { return _node; }
- void get_runtime_info(const std::string &indent,
- const std::vector<std::string> &args,
- /*out*/ std::stringstream &ss);
- void get_queue_info(/*out*/ std::stringstream &ss);
+ nlohmann::json get_runtime_info(const std::vector<std::string> &args)
const;
+ nlohmann::json get_queue_info() const;
private:
void register_cli_commands();
diff --git a/src/runtime/test/task_engine.cpp b/src/runtime/test/task_engine.cpp
index 38374f455..be02295b9 100644
--- a/src/runtime/test/task_engine.cpp
+++ b/src/runtime/test/task_engine.cpp
@@ -26,6 +26,9 @@
#include "runtime/task/task_engine.h"
+#include <fmt/core.h>
+#include <nlohmann/json.hpp>
+#include <nlohmann/json_fwd.hpp>
#include <stdio.h>
#include "gtest/gtest.h"
@@ -57,10 +60,7 @@ TEST(core, task_engine)
ASSERT_NE(nullptr, engine);
ASSERT_TRUE(engine->is_started());
- std::vector<std::string> args;
- std::stringstream oss;
- engine->get_runtime_info(" ", args, oss);
- printf("%s\n", oss.str().c_str());
+ fmt::print(stdout, "{}\n", engine->get_runtime_info({}).dump());
std::vector<task_worker_pool *> &pools = engine->pools();
for (size_t i = 0; i < pools.size(); ++i) {
diff --git a/src/runtime/tracer.cpp b/src/runtime/tracer.cpp
index 4fcdfacb6..dd76006f0 100644
--- a/src/runtime/tracer.cpp
+++ b/src/runtime/tracer.cpp
@@ -260,8 +260,6 @@ static std::string tracer_log_flow_error(const char *msg)
static std::string tracer_log_flow(const std::vector<std::string> &args)
{
- // forward|f|backward|b rpc|r|task|t trace_id|task_id(e.g.,
002a003920302390)
- // log_file_name(log.xx.txt)
if (args.size() < 4) {
return tracer_log_flow_error("not enough arguments");
}
@@ -406,11 +404,11 @@ void tracer::install(service_spec &spec)
static std::once_flag flag;
std::call_once(flag, [&]() {
- _tracer_find_cmd = command_manager::instance().register_command(
- {"tracer.find"},
- "tracer.find - find related logs",
- "tracer.find forward|f|backward|b rpc|r|task|t
trace_id|task_id(e.g., "
- "a023003920302390) log_file_name(log.xx.txt)",
+ _tracer_find_cmd = command_manager::instance().register_single_command(
+ "tracer.find",
+ "Find related logs",
+ "[forward|f|backward|b] [rpc|r|task|t] [trace_id|task_id(e.g.,
a023003920302390)] "
+ "<log_file_name(e.g., log.xx.txt)>",
tracer_log_flow);
});
}
diff --git a/src/server/main.cpp b/src/server/main.cpp
index 7c4df51bb..3aefd1329 100644
--- a/src/server/main.cpp
+++ b/src/server/main.cpp
@@ -17,12 +17,14 @@
* under the License.
*/
+#include <nlohmann/json.hpp>
+#include <nlohmann/json_fwd.hpp>
#include <pegasus/git_commit.h>
#include <pegasus/version.h>
#include <unistd.h>
#include <cstdio>
+#include <map>
#include <memory>
-#include <sstream>
#include <string>
#include <vector>
@@ -93,17 +95,18 @@ int main(int argc, char **argv)
dsn_app_registration_pegasus();
std::unique_ptr<command_deregister> server_info_cmd =
- dsn::command_manager::instance().register_command(
- {"server-info"},
- "server-info - query server information",
+ dsn::command_manager::instance().register_single_command(
"server-info",
+ "Query server information",
+ "",
[](const std::vector<std::string> &args) {
- char str[100];
-
::dsn::utils::time_ms_to_date_time(dsn::utils::process_start_millis(), str,
100);
- std::ostringstream oss;
- oss << "Pegasus Server " << PEGASUS_VERSION << " (" <<
PEGASUS_GIT_COMMIT << ") "
- << PEGASUS_BUILD_TYPE << ", Started at " << str;
- return oss.str();
+ nlohmann::json info;
+ info["version"] = PEGASUS_VERSION;
+ info["build_type"] = PEGASUS_BUILD_TYPE;
+ info["git_SHA"] = PEGASUS_GIT_COMMIT;
+ info["start_time"] =
+
::dsn::utils::time_s_to_date_time(dsn::utils::process_start_millis() / 1000);
+ return info.dump();
});
dsn_run(argc, argv, true);
diff --git a/src/server/pegasus_manual_compact_service.cpp
b/src/server/pegasus_manual_compact_service.cpp
index a2cdf4812..617a7b452 100644
--- a/src/server/pegasus_manual_compact_service.cpp
+++ b/src/server/pegasus_manual_compact_service.cpp
@@ -21,14 +21,15 @@
#include <absl/strings/string_view.h>
#include <limits.h>
+#include <nlohmann/json.hpp>
+#include <nlohmann/json_fwd.hpp>
#include <rocksdb/options.h>
#include <list>
-#include <ostream>
#include <set>
#include <utility>
-#include "common/replication.codes.h"
#include "common/replica_envs.h"
+#include "common/replication.codes.h"
#include "pegasus_server_impl.h"
#include "runtime/api_layer1.h"
#include "runtime/task/async_calls.h"
@@ -339,31 +340,16 @@ std::string
pegasus_manual_compact_service::query_compact_state() const
uint64_t start_time_ms = _manual_compact_start_running_time_ms.load();
uint64_t last_finish_time_ms = _manual_compact_last_finish_time_ms.load();
uint64_t last_time_used_ms = _manual_compact_last_time_used_ms.load();
- std::stringstream state;
- if (last_finish_time_ms > 0) {
- char str[24] = {0};
- dsn::utils::time_ms_to_string(last_finish_time_ms, str);
- state << "last finish at [" << str << "]";
- } else {
- state << "last finish at [-]";
- }
-
- if (last_time_used_ms > 0) {
- state << ", last used " << last_time_used_ms << " ms";
- }
-
- if (enqueue_time_ms > 0) {
- char str[24] = {0};
- dsn::utils::time_ms_to_string(enqueue_time_ms, str);
- state << ", recent enqueue at [" << str << "]";
- }
- if (start_time_ms > 0) {
- char str[24] = {0};
- dsn::utils::time_ms_to_string(start_time_ms, str);
- state << ", recent start at [" << str << "]";
- }
- return state.str();
+ nlohmann::json info;
+ info["recent_enqueue_at"] =
+ enqueue_time_ms > 0 ? dsn::utils::time_s_to_date_time(enqueue_time_ms
/ 1000) : "-";
+ info["recent_start_at"] =
+ start_time_ms > 0 ? dsn::utils::time_s_to_date_time(start_time_ms /
1000) : "-";
+ info["last_finish"] =
+ last_finish_time_ms > 0 ?
dsn::utils::time_s_to_date_time(last_finish_time_ms / 1000) : "-";
+ info["last_used_ms"] = last_time_used_ms > 0 ?
std::to_string(last_time_used_ms) : "-";
+ return info.dump();
}
dsn::replication::manual_compaction_status::type
diff --git a/src/shell/commands/node_management.cpp
b/src/shell/commands/node_management.cpp
index ee813cc02..a613289af 100644
--- a/src/shell/commands/node_management.cpp
+++ b/src/shell/commands/node_management.cpp
@@ -672,10 +672,10 @@ bool remote_command(command_executor *e, shell_context
*sc, arguments args)
}
fprintf(stderr, "CALL [%s] [%s] ", n.desc.c_str(), hostname.c_str());
if (results[i].first) {
- fprintf(stderr, "succeed: %s\n", results[i].second.c_str());
+ fprintf(stderr, "succeed:\n%s\n", results[i].second.c_str());
succeed++;
} else {
- fprintf(stderr, "failed: %s\n", results[i].second.c_str());
+ fprintf(stderr, "failed:\n%s\n", results[i].second.c_str());
failed++;
}
}
diff --git a/src/shell/main.cpp b/src/shell/main.cpp
index 9f986e6d8..4932a58fe 100644
--- a/src/shell/main.cpp
+++ b/src/shell/main.cpp
@@ -334,7 +334,7 @@ static command_executor commands[] = {
"remote_command",
"send remote command to servers",
"[-t all|meta-server|replica-server] [-r|--resolve_ip] [-l
ip:port,ip:port...]"
- "<command> [arguments...]",
+ " <command> [arguments...]",
remote_command,
},
{
diff --git a/src/utils/command_manager.cpp b/src/utils/command_manager.cpp
index 0c5dd9b1f..bcf37bb87 100644
--- a/src/utils/command_manager.cpp
+++ b/src/utils/command_manager.cpp
@@ -26,26 +26,28 @@
#include "utils/command_manager.h"
+#include <fmt/format.h>
// IWYU pragma: no_include <ext/alloc_traits.h>
#include <stdlib.h>
#include <chrono>
#include <limits>
#include <sstream> // IWYU pragma: keep
#include <thread>
+#include <unordered_set>
#include <utility>
namespace dsn {
std::unique_ptr<command_deregister>
command_manager::register_command(const std::vector<std::string> &commands,
- const std::string &help_one_line,
- const std::string &help_long,
+ const std::string &help,
+ const std::string &args,
command_handler handler)
{
auto *c = new command_instance();
c->commands = commands;
- c->help_short = help_one_line;
- c->help_long = help_long;
+ c->help = help;
+ c->args = args;
c->handler = std::move(handler);
utils::auto_write_lock l(_lock);
@@ -59,13 +61,37 @@ command_manager::register_command(const
std::vector<std::string> &commands,
std::unique_ptr<command_deregister> command_manager::register_bool_command(
bool &value, const std::string &command, const std::string &help)
+{
+ return register_single_command(command,
+ help,
+ fmt::format("<true|false>"),
+ [&value, command](const
std::vector<std::string> &args) {
+ return set_bool(value, command, args);
+ });
+}
+
+std::unique_ptr<command_deregister>
+command_manager::register_single_command(const std::string &command,
+ const std::string &help,
+ const std::string &args,
+ command_handler handler)
{
return register_command({command},
- fmt::format("{} <true|false>", command),
- help,
- [&value, command](const std::vector<std::string>
&args) {
- return set_bool(value, command, args);
- });
+ fmt::format("{} - {}", command, help),
+ fmt::format("{} {}", command, args),
+ handler);
+}
+
+std::unique_ptr<command_deregister>
+command_manager::register_multiple_commands(const std::vector<std::string>
&commands,
+ const std::string &help,
+ const std::string &args,
+ command_handler handler)
+{
+ return register_command(commands,
+ fmt::format("{} - {}", fmt::join(commands, "|"),
help),
+ fmt::format("{} {}", fmt::join(commands, "|"),
args),
+ handler);
}
void command_manager::deregister_command(uintptr_t handle)
@@ -128,41 +154,44 @@ std::string command_manager::set_bool(bool &value,
command_manager::command_manager()
{
- _cmds.emplace_back(register_command({"help", "h", "H", "Help"},
- "help|Help|h|H [command] - display
help information",
- "",
- [this](const std::vector<std::string>
&args) {
- std::stringstream ss;
-
- if (args.size() == 0) {
- utils::auto_read_lock l(_lock);
- for (const auto &c :
this->_handlers) {
- ss << c.second->help_short
<< std::endl;
- }
- } else {
- utils::auto_read_lock l(_lock);
- auto it =
_handlers.find(args[0]);
- if (it == _handlers.end())
- ss << "cannot find command
'" << args[0] << "'";
- else {
- ss.width(6);
- ss << std::left <<
it->first << ": "
- <<
it->second->help_short << std::endl
- <<
it->second->help_long << std::endl;
- }
- }
-
- return ss.str();
- }));
-
- _cmds.emplace_back(register_command(
+ _cmds.emplace_back(
+ register_multiple_commands({"help", "h", "H", "Help"},
+ "Display help information",
+ "[command]",
+ [this](const std::vector<std::string>
&args) {
+ std::stringstream ss;
+ if (args.empty()) {
+ std::unordered_set<command_instance
*> cmds;
+ utils::auto_read_lock l(_lock);
+ for (const auto &c :
this->_handlers) {
+ // Multiple commands with the
same handler are printed
+ // only once.
+ if
(cmds.insert(c.second.get()).second) {
+ ss << c.second->help <<
std::endl;
+ }
+ }
+ } else {
+ utils::auto_read_lock l(_lock);
+ auto it = _handlers.find(args[0]);
+ if (it == _handlers.end()) {
+ ss << "cannot find command '"
<< args[0] << "'";
+ } else {
+ ss.width(6);
+ ss << std::left <<
it->second->help << std::endl
+ << it->second->args <<
std::endl;
+ }
+ }
+
+ return ss.str();
+ }));
+
+ _cmds.emplace_back(register_multiple_commands(
{"repeat", "r", "R", "Repeat"},
- "repeat|Repeat|r|R interval_seconds max_count command - execute
command periodically",
- "repeat|Repeat|r|R interval_seconds max_count command - execute
command every interval "
- "seconds, to the max count as max_count (0 for infinite)",
+ "Execute a command periodically in every interval seconds for the max
count time (0 for "
+ "infinite)",
+ "<interval_seconds> <max_count> <command>",
[this](const std::vector<std::string> &args) {
std::stringstream ss;
-
if (args.size() < 3) {
return "insufficient arguments";
}
@@ -172,25 +201,27 @@ command_manager::command_manager()
return "invalid interval argument";
}
- int max_count = atoi(args[1].c_str());
- if (max_count < 0) {
+ uint32_t max_count;
+ if (!dsn::buf2uint32(args[1], max_count)) {
return "invalid max count";
}
if (max_count == 0) {
- max_count = std::numeric_limits<int>::max();
+ max_count = std::numeric_limits<uint32_t>::max();
}
- std::string cmd = args[2];
- std::vector<std::string> largs;
- for (int i = 3; i < (int)args.size(); i++) {
- largs.push_back(args[i]);
+ const auto &command = args[2];
+ std::vector<std::string> command_args;
+ for (size_t i = 3; i < args.size(); i++) {
+ command_args.push_back(args[i]);
}
- for (int i = 0; i < max_count; i++) {
+ // TODO(yingchun): the 'repeat' command may run for a long time (or
even forever), so it
+ // can easily time out: the remote_command timeout is a fixed value
of 5 seconds (see
+ // call_remote_command()). It also consumes thread resources
on the server side.
+ for (uint32_t i = 0; i < max_count; i++) {
std::string output;
- auto r = this->run_command(cmd, largs, output);
-
+ auto r = this->run_command(command, command_args, output);
if (!r) {
break;
}
diff --git a/src/utils/command_manager.h b/src/utils/command_manager.h
index 26a6cb342..a73966845 100644
--- a/src/utils/command_manager.h
+++ b/src/utils/command_manager.h
@@ -52,12 +52,6 @@ class command_manager : public
::dsn::utils::singleton<command_manager>
public:
using command_handler = std::function<std::string(const
std::vector<std::string> &)>;
- std::unique_ptr<command_deregister>
- register_command(const std::vector<std::string> &commands,
- const std::string &help_one_line,
- const std::string &help_long,
- command_handler handler) WARN_UNUSED_RESULT;
-
// Register command which query or update a boolean configuration.
// The 'value' will be queried or updated by the command named 'command'
with the 'help'
// description.
@@ -78,15 +72,31 @@ public:
std::function<bool(int64_t new_value)> validator =
[](int64_t new_value) -> bool { return new_value
>= 0; })
{
- return register_command(
- {command},
- fmt::format("{} [num | DEFAULT]", command),
+ return register_single_command(
+ command,
help,
+ fmt::format("[num | DEFAULT]"),
[&value, default_value, command, validator](const
std::vector<std::string> &args) {
return set_int(value, default_value, command, args, validator);
});
}
+ // Register a single 'command' with the 'help' description; its arguments
are described in
+ // 'args'.
+ std::unique_ptr<command_deregister>
+ register_single_command(const std::string &command,
+ const std::string &help,
+ const std::string &args,
+ command_handler handler) WARN_UNUSED_RESULT;
+
+ // Register multiple 'commands' with the 'help' description; their
arguments are described in
+ // 'args'.
+ std::unique_ptr<command_deregister>
+ register_multiple_commands(const std::vector<std::string> &commands,
+ const std::string &help,
+ const std::string &args,
+ command_handler handler) WARN_UNUSED_RESULT;
+
bool run_command(const std::string &cmd,
const std::vector<std::string> &args,
/*out*/ std::string &output);
@@ -101,11 +111,17 @@ private:
struct command_instance : public ref_counter
{
std::vector<std::string> commands;
- std::string help_short;
- std::string help_long;
+ std::string help;
+ std::string args;
command_handler handler;
};
+ std::unique_ptr<command_deregister>
+ register_command(const std::vector<std::string> &commands,
+ const std::string &help,
+ const std::string &args,
+ command_handler handler) WARN_UNUSED_RESULT;
+
void deregister_command(uintptr_t handle);
static std::string
diff --git a/src/utils/simple_logger.cpp b/src/utils/simple_logger.cpp
index fa1ef779f..d54e03025 100644
--- a/src/utils/simple_logger.cpp
+++ b/src/utils/simple_logger.cpp
@@ -191,19 +191,19 @@ simple_logger::simple_logger(const char *log_dir)
// "assertion expression: [_handlers.empty()] All commands must be
deregistered before
// command_manager is destroyed, however 'flush-log' is still registered".
// We need to fix it.
- _cmds.emplace_back(::dsn::command_manager::instance().register_command(
- {"flush-log"},
- "flush-log - flush log to stderr or log file",
+
_cmds.emplace_back(::dsn::command_manager::instance().register_single_command(
"flush-log",
+ "Flush log to stderr or file",
+ "",
[this](const std::vector<std::string> &args) {
this->flush();
return "Flush done.";
}));
- _cmds.emplace_back(::dsn::command_manager::instance().register_command(
- {"reset-log-start-level"},
- "reset-log-start-level - reset the log start level",
- "reset-log-start-level [DEBUG | INFO | WARNING | ERROR | FATAL]",
+
_cmds.emplace_back(::dsn::command_manager::instance().register_single_command(
+ "reset-log-start-level",
+ "Reset the log start level",
+ "[DEBUG | INFO | WARNING | ERROR | FATAL]",
[](const std::vector<std::string> &args) {
log_level_t start_level;
if (args.size() == 0) {
diff --git a/src/utils/test/command_manager_test.cpp
b/src/utils/test/command_manager_test.cpp
index e6e2598ca..fd7265d8d 100644
--- a/src/utils/test/command_manager_test.cpp
+++ b/src/utils/test/command_manager_test.cpp
@@ -32,10 +32,10 @@ class command_manager_test : public ::testing::Test
public:
command_manager_test()
{
- _cmd = command_manager::instance().register_command(
- {"test-cmd"},
- "test-cmd - just for command_manager unit-test",
- "test-cmd arg1 arg2 ...",
+ _cmd = command_manager::instance().register_single_command(
+ "test-cmd",
+ "Just for command_manager unit-test",
+ "arg1 arg2 ...",
[](const vector<string> &args) {
return fmt::format("test-cmd response: [{}]",
boost::join(args, " "));
});
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]