This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 224cb818e feat(cpp-shell): make mlog_dump dump plog (#1760)
224cb818e is described below

commit 224cb818ea9797b20cf0db25a3c2352e0a0bb160
Author: Yingchun Lai <[email protected]>
AuthorDate: Tue Dec 12 18:38:40 2023 +0800

    feat(cpp-shell): make mlog_dump dump plog (#1760)
    
    Since Pegasus 2.5.0, the slog has been deprecated entirely and only the
    plog is left. The 'mlog_dump' tool in cpp-shell used to dump the slog;
    this patch changes it to dump the plog instead.
---
 src/replica/replica.h           |  2 ++
 src/shell/commands/debugger.cpp | 65 ++++++++++++++++++++++++-----------------
 src/shell/config.ini            |  2 +-
 src/shell/main.cpp              |  3 +-
 src/tools/mutation_log_tool.cpp | 62 ++++++++++++++++++++++++++++++---------
 src/tools/mutation_log_tool.h   |  6 ++--
 src/utils/time_utils.cpp        |  3 --
 7 files changed, 96 insertions(+), 47 deletions(-)

diff --git a/src/replica/replica.h b/src/replica/replica.h
index 2057fca6c..c4f3d2ff4 100644
--- a/src/replica/replica.h
+++ b/src/replica/replica.h
@@ -92,6 +92,7 @@ class learn_notify_response;
 class learn_request;
 class learn_response;
 class learn_state;
+class mutation_log_tool;
 class replica;
 class replica_backup_manager;
 class replica_bulk_loader;
@@ -529,6 +530,7 @@ private:
 
 private:
     friend class ::dsn::replication::test::test_checker;
+    friend class ::dsn::replication::mutation_log_tool;
     friend class ::dsn::replication::mutation_queue;
     friend class ::dsn::replication::replica_stub;
     friend class mock_replica;
diff --git a/src/shell/commands/debugger.cpp b/src/shell/commands/debugger.cpp
index 4febb4640..8bfe9b295 100644
--- a/src/shell/commands/debugger.cpp
+++ b/src/shell/commands/debugger.cpp
@@ -20,11 +20,6 @@
 // IWYU pragma: no_include <bits/getopt_core.h>
 // TODO(yingchun): refactor this after libfmt upgraded
 #include <fmt/chrono.h> // IWYU pragma: keep
-// IWYU pragma: no_include <fmt/core.h>
-// IWYU pragma: no_include <fmt/format.h>
-#if FMT_VERSION < 60000
-#include <fmt/time.h> // IWYU pragma: keep
-#endif
 #include <fmt/printf.h> // IWYU pragma: keep
 // IWYU pragma: no_include <algorithm>
 // IWYU pragma: no_include <iterator>
@@ -38,6 +33,9 @@
 #include <stdint.h>
 #include <stdio.h>
 #include <ctime>
+// IWYU pragma: no_include <fmt/core.h>
+// IWYU pragma: no_include <fmt/format.h>
+#include <filesystem>
 #include <functional>
 #include <iostream>
 #include <string>
@@ -45,6 +43,7 @@
 #include <vector>
 
 #include "base/idl_utils.h"
+#include "common/gpid.h"
 #include "common/replication.codes.h"
 #include "pegasus_key_schema.h"
 #include "pegasus_utils.h"
@@ -78,7 +77,7 @@ bool mlog_dump(command_executor *e, shell_context *sc, 
arguments args)
                                            {0, 0, 0, 0}};
 
     bool detailed = false;
-    std::string input;
+    std::string plog_dir;
     std::string output;
     optind = 0;
     while (true) {
@@ -92,7 +91,7 @@ bool mlog_dump(command_executor *e, shell_context *sc, 
arguments args)
             detailed = true;
             break;
         case 'i':
-            input = optarg;
+            plog_dir = optarg;
             break;
         case 'o':
             output = optarg;
@@ -101,12 +100,26 @@ bool mlog_dump(command_executor *e, shell_context *sc, 
arguments args)
             return false;
         }
     }
-    if (input.empty()) {
-        fprintf(stderr, "ERROR: input is not specified\n");
+    if (plog_dir.empty()) {
+        fmt::print(stderr, "ERROR: 'input' is not specified\n");
+        return false;
+    }
+    if (!dsn::utils::filesystem::directory_exists(plog_dir)) {
+        fmt::print(stderr, "ERROR: '{}' is not a directory\n", plog_dir);
         return false;
     }
-    if (!dsn::utils::filesystem::directory_exists(input)) {
-        fprintf(stderr, "ERROR: input %s is not a directory\n", input.c_str());
+
+    const auto replica_path = std::filesystem::path(plog_dir).parent_path();
+    const auto name = replica_path.filename().string();
+    if (name.empty()) {
+        fmt::print(stderr, "ERROR: '{}' is not a valid plog directory\n", 
plog_dir);
+        return false;
+    }
+
+    char app_type[128];
+    int32_t app_id, pidx;
+    if (3 != sscanf(name.c_str(), "%d.%d.%s", &app_id, &pidx, app_type)) {
+        fmt::print(stderr, "ERROR: '{}' is not a valid plog directory\n", 
plog_dir);
         return false;
     }
 
@@ -116,7 +129,7 @@ bool mlog_dump(command_executor *e, shell_context *sc, 
arguments args)
     } else {
         os_ptr = new std::ofstream(output);
         if (!*os_ptr) {
-            fprintf(stderr, "ERROR: open output file %s failed\n", 
output.c_str());
+            fmt::print(stderr, "ERROR: open output file {} failed\n", output);
             delete os_ptr;
             return true;
         }
@@ -218,11 +231,11 @@ bool mlog_dump(command_executor *e, shell_context *sc, 
arguments args)
     }
 
     dsn::replication::mutation_log_tool tool;
-    bool ret = tool.dump(input, os, callback);
+    bool ret = tool.dump(plog_dir, dsn::gpid(app_id, pidx), os, callback);
     if (!ret) {
-        fprintf(stderr, "ERROR: dump failed\n");
+        fmt::print(stderr, "ERROR: dump failed\n");
     } else {
-        fprintf(stderr, "Done\n");
+        fmt::print(stderr, "Done\n");
     }
 
     if (os_ptr != &std::cout) {
@@ -246,7 +259,7 @@ bool local_get(command_executor *e, shell_context *sc, 
arguments args)
     rocksdb::DB *db;
     rocksdb::Status status = rocksdb::DB::OpenForReadOnly(db_opts, db_path, 
&db);
     if (!status.ok()) {
-        fprintf(stderr, "ERROR: open db failed: %s\n", 
status.ToString().c_str());
+        fmt::print(stderr, "ERROR: open db failed: {}\n", status.ToString());
         return true;
     }
 
@@ -257,15 +270,15 @@ bool local_get(command_executor *e, shell_context *sc, 
arguments args)
     rocksdb::ReadOptions rd_opts;
     status = db->Get(rd_opts, skey, &value);
     if (!status.ok()) {
-        fprintf(stderr, "ERROR: get failed: %s\n", status.ToString().c_str());
+        fmt::print(stderr, "ERROR: get failed: {}\n", status.ToString());
     } else {
         uint32_t expire_ts = pegasus::pegasus_extract_expire_ts(0, value);
         dsn::blob user_data;
         pegasus::pegasus_extract_user_data(0, std::move(value), user_data);
-        fprintf(stderr,
-                "%u : \"%s\"\n",
-                expire_ts,
-                pegasus::utils::c_escape_string(user_data, 
sc->escape_all).c_str());
+        fmt::print(stderr,
+                   "{} : \"{}\"\n",
+                   expire_ts,
+                   pegasus::utils::c_escape_string(user_data, sc->escape_all));
     }
 
     delete db;
@@ -282,7 +295,7 @@ bool rdb_key_str2hex(command_executor *e, shell_context 
*sc, arguments args)
     ::dsn::blob key;
     pegasus::pegasus_generate_key(key, hash_key, sort_key);
     rocksdb::Slice skey(key.data(), key.length());
-    fprintf(stderr, "\"%s\"\n", skey.ToString(true).c_str());
+    fmt::print(stderr, "\"{}\"\n", skey.ToString(true));
     return true;
 }
 
@@ -312,12 +325,12 @@ bool rdb_value_hex2str(command_executor *e, shell_context 
*sc, arguments args)
     auto expire_ts = 
static_cast<int64_t>(pegasus::pegasus_extract_expire_ts(0, pegasus_value)) +
                      pegasus::utils::epoch_begin; // TODO(wutao): pass user 
specified version
     std::time_t tm(expire_ts);
-    fmt::print(stderr, "\nWhen to expire:\n  {:%Y-%m-%d %H:%M:%S}\n", 
*std::localtime(&tm));
+    fmt::print(stderr, "\nWhen to expire:\n  {:%Y-%m-%d %H:%M:%S}\n", 
fmt::localtime(tm));
 
     dsn::blob user_data;
     pegasus::pegasus_extract_user_data(0, std::move(pegasus_value), user_data);
-    fprintf(stderr,
-            "user_data:\n  \"%s\"\n",
-            pegasus::utils::c_escape_string(user_data.to_string(), 
sc->escape_all).c_str());
+    fmt::print(stderr,
+               "user_data:\n  \"{}\"\n",
+               pegasus::utils::c_escape_string(user_data.to_string(), 
sc->escape_all));
     return true;
 }
diff --git a/src/shell/config.ini b/src/shell/config.ini
index 6c6ab5adc..812668839 100644
--- a/src/shell/config.ini
+++ b/src/shell/config.ini
@@ -24,7 +24,7 @@ count = 1
 [apps.mimic]
 type = dsn.app.mimic
 arguments =
-pools = THREAD_POOL_DEFAULT,THREAD_POOL_META_SERVER
+pools = THREAD_POOL_DEFAULT,THREAD_POOL_META_SERVER,THREAD_POOL_REPLICATION
 run = true
 count = 1
 
diff --git a/src/shell/main.cpp b/src/shell/main.cpp
index 50c1614c1..75a52abc6 100644
--- a/src/shell/main.cpp
+++ b/src/shell/main.cpp
@@ -392,7 +392,8 @@ static command_executor commands[] = {
     {
         "mlog_dump",
         "dump mutation log dir",
-        "<-i|--input log_dir> [-o|--output file_name] [-d|--detailed]",
+        "<-i|--input log_dir(e.g. '/path/to/replica/reps/2.1.pegasus/plog/')> 
[-o|--output "
+        "file_name] [-d|--detailed]",
         mlog_dump,
     },
     {
diff --git a/src/tools/mutation_log_tool.cpp b/src/tools/mutation_log_tool.cpp
index 3c1177d4d..b68a4979b 100644
--- a/src/tools/mutation_log_tool.cpp
+++ b/src/tools/mutation_log_tool.cpp
@@ -27,29 +27,62 @@
 #include "mutation_log_tool.h"
 
 #include <alloca.h>
+#include <memory>
 #include <vector>
 
+#include "common/fs_manager.h"
 #include "common/gpid.h"
 #include "consensus_types.h"
+#include "dsn.layer2_types.h"
+#include "fmt/core.h"
 #include "replica/mutation.h"
 #include "replica/mutation_log.h"
+#include "replica/replica.h"
+#include "replica/replica_stub.h"
 #include "runtime/rpc/rpc_message.h"
 #include "runtime/task/task_spec.h"
 #include "utils/autoref_ptr.h"
 #include "utils/blob.h"
+#include "utils/defer.h"
 #include "utils/error_code.h"
+#include "utils/filesystem.h"
+#include "utils/flags.h"
 #include "utils/time_utils.h"
 
 namespace dsn {
 namespace replication {
 
+DSN_DECLARE_int32(log_private_file_size_mb);
+
 bool mutation_log_tool::dump(
     const std::string &log_dir,
+    gpid pid,
     std::ostream &output,
     std::function<void(int64_t decree, int64_t timestamp, dsn::message_ex 
**requests, int count)>
         callback)
 {
-    mutation_log_ptr mlog = new mutation_log_shared(log_dir, 32, false);
+    std::string absolute_path;
+    if (!utils::filesystem::get_absolute_path(log_dir, absolute_path)) {
+        output << fmt::format("ERROR: get absolute path failed\n");
+        return false;
+    }
+    std::string norm_path;
+    utils::filesystem::get_normalized_path(absolute_path, norm_path);
+    auto dn = std::make_unique<dir_node>(/* tag_ */ "", norm_path);
+    app_info ai;
+    ai.__set_app_type("pegasus");
+    auto stub = std::make_unique<replica_stub>();
+    // Constructor of replica is private which can not be accessed by 
std::make_unique, so use raw
+    // pointer here.
+    auto *rep = new replica(stub.get(),
+                            pid,
+                            ai,
+                            dn.get(),
+                            /* need_restore */ false,
+                            /* is_duplication_follower */ false);
+    auto cleanup = dsn::defer([rep]() { delete rep; });
+    auto mlog =
+        std::make_shared<mutation_log_private>(log_dir, 
FLAGS_log_private_file_size_mb, pid, rep);
     error_code err = mlog->open(
         [mlog, &output, callback](int log_length, mutation_ptr &mu) -> bool {
             if (mlog->max_decree(mu->data.header.pid) == 0) {
@@ -57,18 +90,19 @@ bool mutation_log_tool::dump(
             }
             char timestamp_buf[32] = {0};
             utils::time_ms_to_string(mu->data.header.timestamp / 1000, 
timestamp_buf);
-            output << "mutation [" << mu->name() << "]: "
-                   << "gpid=" << mu->data.header.pid.get_app_id() << "."
-                   << mu->data.header.pid.get_partition_index() << ", "
-                   << "ballot=" << mu->data.header.ballot << ", decree=" << 
mu->data.header.decree
-                   << ", "
-                   << "timestamp=" << timestamp_buf
-                   << ", last_committed_decree=" << 
mu->data.header.last_committed_decree << ", "
-                   << "log_offset=" << mu->data.header.log_offset << ", 
log_length=" << log_length
-                   << ", "
-                   << "update_count=" << mu->data.updates.size();
-            if (callback && mu->data.updates.size() > 0) {
-
+            output << fmt::format("mutation [{}]: gpid={}, ballot={}, 
decree={}, timestamp={}, "
+                                  "last_committed_decree={}, log_offset={}, 
log_length={}, "
+                                  "update_count={}\n",
+                                  mu->name(),
+                                  mu->data.header.pid,
+                                  mu->data.header.ballot,
+                                  mu->data.header.decree,
+                                  timestamp_buf,
+                                  mu->data.header.last_committed_decree,
+                                  mu->data.header.log_offset,
+                                  log_length,
+                                  mu->data.updates.size());
+            if (callback && !mu->data.updates.empty()) {
                 dsn::message_ex **batched_requests =
                     (dsn::message_ex **)alloca(sizeof(dsn::message_ex *) * 
mu->data.updates.size());
                 int batched_count = 0;
@@ -93,7 +127,7 @@ bool mutation_log_tool::dump(
         nullptr);
     mlog->close();
     if (err != dsn::ERR_OK) {
-        output << "ERROR: dump mutation log failed, err = " << err.to_string() 
<< std::endl;
+        output << fmt::format("ERROR: dump mutation log failed, err = {}\n", 
err);
         return false;
     } else {
         return true;
diff --git a/src/tools/mutation_log_tool.h b/src/tools/mutation_log_tool.h
index d313bdc4a..e380be2af 100644
--- a/src/tools/mutation_log_tool.h
+++ b/src/tools/mutation_log_tool.h
@@ -32,6 +32,7 @@
 #include <string>
 
 namespace dsn {
+class gpid;
 class message_ex;
 
 namespace replication {
@@ -41,9 +42,10 @@ class mutation_log_tool
 public:
     bool
     dump(const std::string &log_dir,
+         gpid pid,
          std::ostream &output,
          std::function<void(
              int64_t decree, int64_t timestamp, dsn::message_ex **requests, 
int count)> callback);
 };
-}
-}
+} // namespace replication
+} // namespace dsn
diff --git a/src/utils/time_utils.cpp b/src/utils/time_utils.cpp
index 711b4cd3d..34504fc99 100644
--- a/src/utils/time_utils.cpp
+++ b/src/utils/time_utils.cpp
@@ -23,9 +23,6 @@
 #include <fmt/chrono.h> // IWYU pragma: keep
 // IWYU pragma: no_include <fmt/core.h>
 // IWYU pragma: no_include <fmt/format.h>
-#if FMT_VERSION < 60000
-#include <fmt/time.h> // IWYU pragma: keep
-#endif
 #include <fmt/printf.h> // IWYU pragma: keep
 // IWYU pragma: no_include <algorithm>
 // IWYU pragma: no_include <iterator>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to