This is an automated email from the ASF dual-hosted git repository.
laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 224cb818e feat(cpp-shell): make mlog_dump dump plog (#1760)
224cb818e is described below
commit 224cb818ea9797b20cf0db25a3c2352e0a0bb160
Author: Yingchun Lai <[email protected]>
AuthorDate: Tue Dec 12 18:38:40 2023 +0800
feat(cpp-shell): make mlog_dump dump plog (#1760)
Since Pegasus 2.5.0, the slog has been deprecated entirely, the plog is
left.
The 'mlog_dump' tool in cpp-shell is used to dump slog ever, this patch
transfer
it to dump plog.
---
src/replica/replica.h | 2 ++
src/shell/commands/debugger.cpp | 65 ++++++++++++++++++++++++-----------------
src/shell/config.ini | 2 +-
src/shell/main.cpp | 3 +-
src/tools/mutation_log_tool.cpp | 62 ++++++++++++++++++++++++++++++---------
src/tools/mutation_log_tool.h | 6 ++--
src/utils/time_utils.cpp | 3 --
7 files changed, 96 insertions(+), 47 deletions(-)
diff --git a/src/replica/replica.h b/src/replica/replica.h
index 2057fca6c..c4f3d2ff4 100644
--- a/src/replica/replica.h
+++ b/src/replica/replica.h
@@ -92,6 +92,7 @@ class learn_notify_response;
class learn_request;
class learn_response;
class learn_state;
+class mutation_log_tool;
class replica;
class replica_backup_manager;
class replica_bulk_loader;
@@ -529,6 +530,7 @@ private:
private:
friend class ::dsn::replication::test::test_checker;
+ friend class ::dsn::replication::mutation_log_tool;
friend class ::dsn::replication::mutation_queue;
friend class ::dsn::replication::replica_stub;
friend class mock_replica;
diff --git a/src/shell/commands/debugger.cpp b/src/shell/commands/debugger.cpp
index 4febb4640..8bfe9b295 100644
--- a/src/shell/commands/debugger.cpp
+++ b/src/shell/commands/debugger.cpp
@@ -20,11 +20,6 @@
// IWYU pragma: no_include <bits/getopt_core.h>
// TODO(yingchun): refactor this after libfmt upgraded
#include <fmt/chrono.h> // IWYU pragma: keep
-// IWYU pragma: no_include <fmt/core.h>
-// IWYU pragma: no_include <fmt/format.h>
-#if FMT_VERSION < 60000
-#include <fmt/time.h> // IWYU pragma: keep
-#endif
#include <fmt/printf.h> // IWYU pragma: keep
// IWYU pragma: no_include <algorithm>
// IWYU pragma: no_include <iterator>
@@ -38,6 +33,9 @@
#include <stdint.h>
#include <stdio.h>
#include <ctime>
+// IWYU pragma: no_include <fmt/core.h>
+// IWYU pragma: no_include <fmt/format.h>
+#include <filesystem>
#include <functional>
#include <iostream>
#include <string>
@@ -45,6 +43,7 @@
#include <vector>
#include "base/idl_utils.h"
+#include "common/gpid.h"
#include "common/replication.codes.h"
#include "pegasus_key_schema.h"
#include "pegasus_utils.h"
@@ -78,7 +77,7 @@ bool mlog_dump(command_executor *e, shell_context *sc,
arguments args)
{0, 0, 0, 0}};
bool detailed = false;
- std::string input;
+ std::string plog_dir;
std::string output;
optind = 0;
while (true) {
@@ -92,7 +91,7 @@ bool mlog_dump(command_executor *e, shell_context *sc,
arguments args)
detailed = true;
break;
case 'i':
- input = optarg;
+ plog_dir = optarg;
break;
case 'o':
output = optarg;
@@ -101,12 +100,26 @@ bool mlog_dump(command_executor *e, shell_context *sc,
arguments args)
return false;
}
}
- if (input.empty()) {
- fprintf(stderr, "ERROR: input is not specified\n");
+ if (plog_dir.empty()) {
+ fmt::print(stderr, "ERROR: 'input' is not specified\n");
+ return false;
+ }
+ if (!dsn::utils::filesystem::directory_exists(plog_dir)) {
+ fmt::print(stderr, "ERROR: '{}' is not a directory\n", plog_dir);
return false;
}
- if (!dsn::utils::filesystem::directory_exists(input)) {
- fprintf(stderr, "ERROR: input %s is not a directory\n", input.c_str());
+
+ const auto replica_path = std::filesystem::path(plog_dir).parent_path();
+ const auto name = replica_path.filename().string();
+ if (name.empty()) {
+ fmt::print(stderr, "ERROR: '{}' is not a valid plog directory\n",
plog_dir);
+ return false;
+ }
+
+ char app_type[128];
+ int32_t app_id, pidx;
+ if (3 != sscanf(name.c_str(), "%d.%d.%s", &app_id, &pidx, app_type)) {
+ fmt::print(stderr, "ERROR: '{}' is not a valid plog directory\n",
plog_dir);
return false;
}
@@ -116,7 +129,7 @@ bool mlog_dump(command_executor *e, shell_context *sc,
arguments args)
} else {
os_ptr = new std::ofstream(output);
if (!*os_ptr) {
- fprintf(stderr, "ERROR: open output file %s failed\n",
output.c_str());
+ fmt::print(stderr, "ERROR: open output file {} failed\n", output);
delete os_ptr;
return true;
}
@@ -218,11 +231,11 @@ bool mlog_dump(command_executor *e, shell_context *sc,
arguments args)
}
dsn::replication::mutation_log_tool tool;
- bool ret = tool.dump(input, os, callback);
+ bool ret = tool.dump(plog_dir, dsn::gpid(app_id, pidx), os, callback);
if (!ret) {
- fprintf(stderr, "ERROR: dump failed\n");
+ fmt::print(stderr, "ERROR: dump failed\n");
} else {
- fprintf(stderr, "Done\n");
+ fmt::print(stderr, "Done\n");
}
if (os_ptr != &std::cout) {
@@ -246,7 +259,7 @@ bool local_get(command_executor *e, shell_context *sc,
arguments args)
rocksdb::DB *db;
rocksdb::Status status = rocksdb::DB::OpenForReadOnly(db_opts, db_path,
&db);
if (!status.ok()) {
- fprintf(stderr, "ERROR: open db failed: %s\n",
status.ToString().c_str());
+ fmt::print(stderr, "ERROR: open db failed: {}\n", status.ToString());
return true;
}
@@ -257,15 +270,15 @@ bool local_get(command_executor *e, shell_context *sc,
arguments args)
rocksdb::ReadOptions rd_opts;
status = db->Get(rd_opts, skey, &value);
if (!status.ok()) {
- fprintf(stderr, "ERROR: get failed: %s\n", status.ToString().c_str());
+ fmt::print(stderr, "ERROR: get failed: {}\n", status.ToString());
} else {
uint32_t expire_ts = pegasus::pegasus_extract_expire_ts(0, value);
dsn::blob user_data;
pegasus::pegasus_extract_user_data(0, std::move(value), user_data);
- fprintf(stderr,
- "%u : \"%s\"\n",
- expire_ts,
- pegasus::utils::c_escape_string(user_data,
sc->escape_all).c_str());
+ fmt::print(stderr,
+ "{} : \"{}\"\n",
+ expire_ts,
+ pegasus::utils::c_escape_string(user_data, sc->escape_all));
}
delete db;
@@ -282,7 +295,7 @@ bool rdb_key_str2hex(command_executor *e, shell_context
*sc, arguments args)
::dsn::blob key;
pegasus::pegasus_generate_key(key, hash_key, sort_key);
rocksdb::Slice skey(key.data(), key.length());
- fprintf(stderr, "\"%s\"\n", skey.ToString(true).c_str());
+ fmt::print(stderr, "\"{}\"\n", skey.ToString(true));
return true;
}
@@ -312,12 +325,12 @@ bool rdb_value_hex2str(command_executor *e, shell_context
*sc, arguments args)
auto expire_ts =
static_cast<int64_t>(pegasus::pegasus_extract_expire_ts(0, pegasus_value)) +
pegasus::utils::epoch_begin; // TODO(wutao): pass user
specified version
std::time_t tm(expire_ts);
- fmt::print(stderr, "\nWhen to expire:\n {:%Y-%m-%d %H:%M:%S}\n",
*std::localtime(&tm));
+ fmt::print(stderr, "\nWhen to expire:\n {:%Y-%m-%d %H:%M:%S}\n",
fmt::localtime(tm));
dsn::blob user_data;
pegasus::pegasus_extract_user_data(0, std::move(pegasus_value), user_data);
- fprintf(stderr,
- "user_data:\n \"%s\"\n",
- pegasus::utils::c_escape_string(user_data.to_string(),
sc->escape_all).c_str());
+ fmt::print(stderr,
+ "user_data:\n \"{}\"\n",
+ pegasus::utils::c_escape_string(user_data.to_string(),
sc->escape_all));
return true;
}
diff --git a/src/shell/config.ini b/src/shell/config.ini
index 6c6ab5adc..812668839 100644
--- a/src/shell/config.ini
+++ b/src/shell/config.ini
@@ -24,7 +24,7 @@ count = 1
[apps.mimic]
type = dsn.app.mimic
arguments =
-pools = THREAD_POOL_DEFAULT,THREAD_POOL_META_SERVER
+pools = THREAD_POOL_DEFAULT,THREAD_POOL_META_SERVER,THREAD_POOL_REPLICATION
run = true
count = 1
diff --git a/src/shell/main.cpp b/src/shell/main.cpp
index 50c1614c1..75a52abc6 100644
--- a/src/shell/main.cpp
+++ b/src/shell/main.cpp
@@ -392,7 +392,8 @@ static command_executor commands[] = {
{
"mlog_dump",
"dump mutation log dir",
- "<-i|--input log_dir> [-o|--output file_name] [-d|--detailed]",
+ "<-i|--input log_dir(e.g. '/path/to/replica/reps/2.1.pegasus/plog/')>
[-o|--output "
+ "file_name] [-d|--detailed]",
mlog_dump,
},
{
diff --git a/src/tools/mutation_log_tool.cpp b/src/tools/mutation_log_tool.cpp
index 3c1177d4d..b68a4979b 100644
--- a/src/tools/mutation_log_tool.cpp
+++ b/src/tools/mutation_log_tool.cpp
@@ -27,29 +27,62 @@
#include "mutation_log_tool.h"
#include <alloca.h>
+#include <memory>
#include <vector>
+#include "common/fs_manager.h"
#include "common/gpid.h"
#include "consensus_types.h"
+#include "dsn.layer2_types.h"
+#include "fmt/core.h"
#include "replica/mutation.h"
#include "replica/mutation_log.h"
+#include "replica/replica.h"
+#include "replica/replica_stub.h"
#include "runtime/rpc/rpc_message.h"
#include "runtime/task/task_spec.h"
#include "utils/autoref_ptr.h"
#include "utils/blob.h"
+#include "utils/defer.h"
#include "utils/error_code.h"
+#include "utils/filesystem.h"
+#include "utils/flags.h"
#include "utils/time_utils.h"
namespace dsn {
namespace replication {
+DSN_DECLARE_int32(log_private_file_size_mb);
+
bool mutation_log_tool::dump(
const std::string &log_dir,
+ gpid pid,
std::ostream &output,
std::function<void(int64_t decree, int64_t timestamp, dsn::message_ex
**requests, int count)>
callback)
{
- mutation_log_ptr mlog = new mutation_log_shared(log_dir, 32, false);
+ std::string absolute_path;
+ if (!utils::filesystem::get_absolute_path(log_dir, absolute_path)) {
+ output << fmt::format("ERROR: get absolute path failed\n");
+ return false;
+ }
+ std::string norm_path;
+ utils::filesystem::get_normalized_path(absolute_path, norm_path);
+ auto dn = std::make_unique<dir_node>(/* tag_ */ "", norm_path);
+ app_info ai;
+ ai.__set_app_type("pegasus");
+ auto stub = std::make_unique<replica_stub>();
+ // Constructor of replica is private which can not be accessed by
std::make_unique, so use raw
+ // pointer here.
+ auto *rep = new replica(stub.get(),
+ pid,
+ ai,
+ dn.get(),
+ /* need_restore */ false,
+ /* is_duplication_follower */ false);
+ auto cleanup = dsn::defer([rep]() { delete rep; });
+ auto mlog =
+ std::make_shared<mutation_log_private>(log_dir,
FLAGS_log_private_file_size_mb, pid, rep);
error_code err = mlog->open(
[mlog, &output, callback](int log_length, mutation_ptr &mu) -> bool {
if (mlog->max_decree(mu->data.header.pid) == 0) {
@@ -57,18 +90,19 @@ bool mutation_log_tool::dump(
}
char timestamp_buf[32] = {0};
utils::time_ms_to_string(mu->data.header.timestamp / 1000,
timestamp_buf);
- output << "mutation [" << mu->name() << "]: "
- << "gpid=" << mu->data.header.pid.get_app_id() << "."
- << mu->data.header.pid.get_partition_index() << ", "
- << "ballot=" << mu->data.header.ballot << ", decree=" <<
mu->data.header.decree
- << ", "
- << "timestamp=" << timestamp_buf
- << ", last_committed_decree=" <<
mu->data.header.last_committed_decree << ", "
- << "log_offset=" << mu->data.header.log_offset << ",
log_length=" << log_length
- << ", "
- << "update_count=" << mu->data.updates.size();
- if (callback && mu->data.updates.size() > 0) {
-
+ output << fmt::format("mutation [{}]: gpid={}, ballot={},
decree={}, timestamp={}, "
+ "last_committed_decree={}, log_offset={},
log_length={}, "
+ "update_count={}\n",
+ mu->name(),
+ mu->data.header.pid,
+ mu->data.header.ballot,
+ mu->data.header.decree,
+ timestamp_buf,
+ mu->data.header.last_committed_decree,
+ mu->data.header.log_offset,
+ log_length,
+ mu->data.updates.size());
+ if (callback && !mu->data.updates.empty()) {
dsn::message_ex **batched_requests =
(dsn::message_ex **)alloca(sizeof(dsn::message_ex *) *
mu->data.updates.size());
int batched_count = 0;
@@ -93,7 +127,7 @@ bool mutation_log_tool::dump(
nullptr);
mlog->close();
if (err != dsn::ERR_OK) {
- output << "ERROR: dump mutation log failed, err = " << err.to_string()
<< std::endl;
+ output << fmt::format("ERROR: dump mutation log failed, err = {}\n",
err);
return false;
} else {
return true;
diff --git a/src/tools/mutation_log_tool.h b/src/tools/mutation_log_tool.h
index d313bdc4a..e380be2af 100644
--- a/src/tools/mutation_log_tool.h
+++ b/src/tools/mutation_log_tool.h
@@ -32,6 +32,7 @@
#include <string>
namespace dsn {
+class gpid;
class message_ex;
namespace replication {
@@ -41,9 +42,10 @@ class mutation_log_tool
public:
bool
dump(const std::string &log_dir,
+ gpid pid,
std::ostream &output,
std::function<void(
int64_t decree, int64_t timestamp, dsn::message_ex **requests,
int count)> callback);
};
-}
-}
+} // namespace replication
+} // namespace dsn
diff --git a/src/utils/time_utils.cpp b/src/utils/time_utils.cpp
index 711b4cd3d..34504fc99 100644
--- a/src/utils/time_utils.cpp
+++ b/src/utils/time_utils.cpp
@@ -23,9 +23,6 @@
#include <fmt/chrono.h> // IWYU pragma: keep
// IWYU pragma: no_include <fmt/core.h>
// IWYU pragma: no_include <fmt/format.h>
-#if FMT_VERSION < 60000
-#include <fmt/time.h> // IWYU pragma: keep
-#endif
#include <fmt/printf.h> // IWYU pragma: keep
// IWYU pragma: no_include <algorithm>
// IWYU pragma: no_include <iterator>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]