This is an automated email from the ASF dual-hosted git repository.
laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new c05e3f65c refactor(shell): Refactor on 'remote_command' command (#2052)
c05e3f65c is described below
commit c05e3f65c790671ea09e9fa0908ce2cc7c59645e
Author: Yingchun Lai <[email protected]>
AuthorDate: Wed Jul 17 20:01:26 2024 +0800
refactor(shell): Refactor on 'remote_command' command (#2052)
This patch refactors the code for the `remote_command` shell CLI tool,
now uses `argh::parser` instead of `getopt_long`. And now the error
output is colored to notice the administors.
The effected commands are `server_info`, `server_stat`, `flush_log` and
`remote_command`.
To keep compatible, `remote_command server-info`, `remote_command
server_info`
and `server_info` are equal. The same to `server_stat` and `flush_log`.
Behavior changes:
- If there are some errors, the tool return false instead of true, then
the USAGE can be output for hint.
- The output is organized as JSON format, and the embeded structures are
also in JSON format. (This is very useful for thirdparty tools to parse
the output.)
Example:
```
>>> remote_command server-info -l b334667ddf87:34801
{
"command": "server-info ",
"details": {
"b334667ddf87:34801": {
"acked": true,
"message": {
"build_type": "Debug",
"git_SHA": "cbdca43302c36f5e03b0f96d0dbd6a59149f2ce6",
"start_time": "2024-07-15 07:25:50",
"version": "2.6.0-SNAPSHOT"
},
"role": "user-specified"
}
},
"failed_count": 0,
"succeed_count": 1
}
>>> server_stat -l b334667ddf87:34801
{
"command": "server-stat ",
"details": {
"b334667ddf87:34801": {
"acked": true,
"message":
"replica*app.pegasus*manual.compact.enqueue.count=not_found,
replica*app.pegasus*manual.compact.running.count=not_found,
replica*app.pegasus*rdb.block_cache.memory_usage=not_found,
replica*eon.replica_stub*closing.replica(Count)=not_found,
replica*eon.replica_stub*disk.available.max.ratio=not_found,
replica*eon.replica_stub*disk.available.min.ratio=not_found,
replica*eon.replica_stub*disk.available.total.ratio=not_found,
replica*eon.replica_stub*disk.capacity.total(MB [...]
"role": "user-specified"
}
},
"failed_count": 0,
"succeed_count": 1
}
>>> remote_command meta.lb.add_secondary_max_count_for_one_node -l
b334667ddf87:34601
{
"command": "meta.lb.add_secondary_max_count_for_one_node ",
"details": {
"b334667ddf87:34601": {
"acked": true,
"message": {
"error": "ok",
"meta.lb.add_secondary_max_count_for_one_node": "10"
},
"role": "user-specified"
}
},
"failed_count": 0,
"succeed_count": 1
}
```
---
src/shell/commands/node_management.cpp | 212 ++++++++++++++++-----------------
src/utils/command_manager.h | 8 +-
2 files changed, 109 insertions(+), 111 deletions(-)
diff --git a/src/shell/commands/node_management.cpp
b/src/shell/commands/node_management.cpp
index 803d2ee2a..32cdc091b 100644
--- a/src/shell/commands/node_management.cpp
+++ b/src/shell/commands/node_management.cpp
@@ -17,17 +17,22 @@
* under the License.
*/
+#include <fmt/core.h>
+#include <fmt/format.h>
#include <getopt.h>
+#include <nlohmann/json.hpp>
+#include <nlohmann/json_fwd.hpp>
#include <stdint.h>
#include <stdio.h>
-#include <string.h>
#include <algorithm>
// IWYU pragma: no_include <bits/getopt_core.h>
#include <chrono>
#include <fstream>
+#include <initializer_list>
#include <iostream>
#include <map>
#include <memory>
+#include <set>
#include <string>
#include <thread>
#include <unordered_map>
@@ -39,19 +44,19 @@
#include "dsn.layer2_types.h"
#include "meta_admin_types.h"
#include "runtime/rpc/rpc_host_port.h"
+#include "shell/argh.h"
#include "shell/command_executor.h"
#include "shell/command_helper.h"
#include "shell/command_utils.h"
#include "shell/commands.h"
-#include "shell/sds/sds.h"
#include "utils/error_code.h"
#include "utils/errors.h"
#include "utils/flags.h"
+#include "utils/fmt_logging.h"
#include "utils/math.h"
#include "utils/metrics.h"
#include "utils/output_utils.h"
#include "utils/ports.h"
-#include "utils/strings.h"
DSN_DEFINE_uint32(shell, nodes_sample_interval_ms, 1000, "The interval between
sampling metrics.");
DSN_DEFINE_validator(nodes_sample_interval_ms, [](uint32_t value) -> bool {
return value > 0; });
@@ -544,145 +549,134 @@ bool ls_nodes(command_executor *e, shell_context *sc,
arguments args)
bool server_info(command_executor *e, shell_context *sc, arguments args)
{
- char *argv[args.argc + 1];
- memcpy(argv, args.argv, sizeof(char *) * args.argc);
- argv[args.argc] = (char *)"server-info";
- arguments new_args;
- new_args.argc = args.argc + 1;
- new_args.argv = argv;
- return remote_command(e, sc, new_args);
+ return remote_command(e, sc, args);
}
bool server_stat(command_executor *e, shell_context *sc, arguments args)
{
- char *argv[args.argc + 1];
- memcpy(argv, args.argv, sizeof(char *) * args.argc);
- argv[args.argc] = (char *)"server-stat";
- arguments new_args;
- new_args.argc = args.argc + 1;
- new_args.argv = argv;
- return remote_command(e, sc, new_args);
+ return remote_command(e, sc, args);
}
-bool remote_command(command_executor *e, shell_context *sc, arguments args)
+bool flush_log(command_executor *e, shell_context *sc, arguments args)
{
- static struct option long_options[] = {{"node_type", required_argument, 0,
't'},
- {"node_list", required_argument, 0,
'l'},
- {"resolve_ip", no_argument, 0, 'r'},
- {0, 0, 0, 0}};
+ return remote_command(e, sc, args);
+}
- std::string type;
- std::string nodes;
- optind = 0;
- bool resolve_ip = false;
- while (true) {
- int option_index = 0;
- int c;
- c = getopt_long(args.argc, args.argv, "t:l:r", long_options,
&option_index);
- if (c == -1)
- break;
- switch (c) {
- case 't':
- type = optarg;
- break;
- case 'l':
- nodes = optarg;
- break;
- case 'r':
- resolve_ip = true;
+bool remote_command(command_executor *e, shell_context *sc, arguments args)
+{
+ // Command format: [remote_command] <command> [arguments...]
+ // [-t
all|meta-server|replica-server]
+ // [-r|--resolve_ip]
+ // [-l host:port,host:port...]
+ argh::parser cmd(args.argc, args.argv,
argh::parser::PREFER_PARAM_FOR_UNREG_OPTION);
+
+ std::string command;
+ std::vector<std::string> pos_args;
+ int pos = 0;
+ do {
+ // Try to parse the positional args.
+ const auto &pos_arg = cmd(pos++);
+ if (!pos_arg) {
break;
- default:
- return false;
}
- }
- if (!type.empty() && !nodes.empty()) {
- fprintf(stderr, "can not specify both node_type and node_list\n");
- return false;
- }
+ // Ignore the args that are useless to the command.
+ static const std::set<std::string> kIgnoreArgs({"remote_command"});
+ if (kIgnoreArgs.count(pos_arg.str()) == 1) {
+ continue;
+ }
- if (type.empty() && nodes.empty()) {
- type = "all";
- }
+ // Collect the positional args following by the command.
+ if (!command.empty()) {
+ pos_args.emplace_back(pos_arg.str());
+ continue;
+ }
- if (!type.empty() && type != "all" && type != "meta-server" && type !=
"replica-server") {
- fprintf(stderr, "invalid type, should be: all | meta-server |
replica-server\n");
+ // Initialize the command.
+ const std::map<std::string, std::string> kCmdsMapping({{"server_info",
"server-info"},
+ {"server_stat",
"server-stat"},
+ {"flush_log",
"flush-log"}});
+ const auto &it = kCmdsMapping.find(pos_arg.str());
+ if (it != kCmdsMapping.end()) {
+ // Use the mapped command.
+ command = it->second;
+ } else {
+ command = pos_arg.str();
+ }
+ } while (true);
+
+ if (command.empty()) {
+ SHELL_PRINTLN_ERROR("missing <command>");
return false;
}
+ const auto resolve_ip = cmd[{"-r", "--resolve_ip"}];
+ auto node_type = cmd({"-t"}).str();
+ std::vector<std::string> nodes_str;
+ PARSE_OPT_STRS(nodes_str, "", {"-l"});
- if (optind == args.argc) {
- fprintf(stderr, "command not specified\n");
+ if (!node_type.empty() && !nodes_str.empty()) {
+ SHELL_PRINTLN_ERROR("can not specify both node_type and nodes_str");
return false;
}
- std::string cmd = args.argv[optind];
- std::vector<std::string> arguments;
- for (int i = optind + 1; i < args.argc; i++) {
- arguments.push_back(args.argv[i]);
+ if (node_type.empty() && nodes_str.empty()) {
+ node_type = "all";
}
- std::vector<node_desc> node_list;
- if (!type.empty()) {
- if (!fill_nodes(sc, type, node_list)) {
- fprintf(stderr, "prepare nodes failed, type = %s\n", type.c_str());
- return true;
- }
- } else {
- std::vector<std::string> tokens;
- dsn::utils::split_args(nodes.c_str(), tokens, ',');
- if (tokens.empty()) {
- fprintf(stderr, "can't parse node from node_list\n");
- return true;
- }
+ static const std::set<std::string> kValidNodeTypes({"all", "meta-server",
"replica-server"});
+ if (!node_type.empty() && kValidNodeTypes.count(node_type) == 0) {
+ SHELL_PRINTLN_ERROR("invalid node_type, should be in [{}]",
+ fmt::join(kValidNodeTypes, ", "));
+ return false;
+ }
- for (std::string &token : tokens) {
- const auto node = dsn::host_port::from_string(token);
- if (!node) {
- fprintf(stderr, "parse %s as a host:port node failed\n",
token.c_str());
- return true;
+ std::vector<node_desc> nodes;
+ do {
+ if (node_type.empty()) {
+ for (const auto &node_str : nodes_str) {
+ const auto node = dsn::host_port::from_string(node_str);
+ if (!node) {
+ SHELL_PRINTLN_ERROR("parse '{}' as host:port failed",
node_str);
+ return false;
+ }
+ nodes.emplace_back("user-specified", node);
}
- node_list.emplace_back("user-specified", node);
+ break;
}
- }
-
- fprintf(stderr, "COMMAND: %s", cmd.c_str());
- for (auto &s : arguments) {
- fprintf(stderr, " %s", s.c_str());
- }
- fprintf(stderr, "\n\n");
- std::vector<std::pair<bool, std::string>> results =
- call_remote_command(sc, node_list, cmd, arguments);
+ if (!fill_nodes(sc, node_type, nodes)) {
+ SHELL_PRINTLN_ERROR("prepare nodes failed, node_type = {}",
node_type);
+ return false;
+ }
+ } while (false);
+ nlohmann::json info;
+ info["command"] = fmt::format("{} {}", command, fmt::join(pos_args, " "));
+ const auto results = call_remote_command(sc, nodes, command, pos_args);
int succeed = 0;
int failed = 0;
- // TODO (yingchun) output is hard to read, need do some refactor
- for (int i = 0; i < node_list.size(); ++i) {
- const auto &node = node_list[i];
- const auto hostname = replication_ddl_client::node_name(node.hp,
resolve_ip);
- fprintf(stderr, "CALL [%s] [%s] ", node.desc.c_str(),
hostname.c_str());
+ CHECK_EQ(results.size(), nodes.size());
+ for (int i = 0; i < nodes.size(); ++i) {
+ nlohmann::json node_info;
+ node_info["role"] = nodes[i].desc;
+ node_info["acked"] = results[i].first;
+ try {
+ // Treat the message as a JSON object by default.
+ node_info["message"] = nlohmann::json::parse(results[i].second);
+ } catch (nlohmann::json::exception &exp) {
+ // Treat it as a string if failed to parse as a JSON object.
+ node_info["message"] = results[i].second;
+ }
if (results[i].first) {
- fprintf(stderr, "succeed:\n%s\n", results[i].second.c_str());
succeed++;
} else {
- fprintf(stderr, "failed:\n%s\n", results[i].second.c_str());
failed++;
}
+ info["details"].emplace(replication_ddl_client::node_name(nodes[i].hp,
resolve_ip),
+ node_info);
}
-
- fprintf(stderr, "\nSucceed count: %d\n", succeed);
- fprintf(stderr, "Failed count: %d\n", failed);
-
+ info["succeed_count"] = succeed;
+ info["failed_count"] = failed;
+ fmt::println(stdout, "{}", info.dump(2));
return true;
}
-
-bool flush_log(command_executor *e, shell_context *sc, arguments args)
-{
- char *argv[args.argc + 1];
- memcpy(argv, args.argv, sizeof(char *) * args.argc);
- argv[args.argc] = (char *)"flush-log";
- arguments new_args;
- new_args.argc = args.argc + 1;
- new_args.argv = argv;
- return remote_command(e, sc, new_args);
-}
diff --git a/src/utils/command_manager.h b/src/utils/command_manager.h
index b124388fd..0a34dd4c1 100644
--- a/src/utils/command_manager.h
+++ b/src/utils/command_manager.h
@@ -27,6 +27,7 @@
#pragma once
#include <fmt/core.h>
+#include <fmt/format.h>
// IWYU pragma: no_include <ext/alloc_traits.h>
#include <stdint.h>
#include <functional>
@@ -147,7 +148,9 @@ private:
// Invalid arguments size.
if (args.size() > 1) {
- msg["error"] = "ERR: invalid arguments, only one integer argument
is acceptable";
+ msg["error"] =
+ fmt::format("ERR: invalid arguments '{}', only one argument is
acceptable",
+ fmt::join(args, " "));
return msg.dump(2);
}
@@ -162,7 +165,8 @@ private:
T new_value = 0;
if (!internal::buf2signed(args[0], new_value) ||
!validator(static_cast<int64_t>(new_value))) {
- msg["error"] = "ERR: invalid arguments";
+ msg["error"] =
+ fmt::format("ERR: invalid argument '{}', the value is not
acceptable", args[0]);
return msg.dump(2);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]