This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 96ddf41de chore(log): replace logging calls in `utils/kvrocks2redis` (#2890)
96ddf41de is described below
commit 96ddf41de2b54d75d619ae3737ef41d8fb509114
Author: Twice <[email protected]>
AuthorDate: Mon Apr 21 10:17:29 2025 +0800
chore(log): replace logging calls in `utils/kvrocks2redis` (#2890)
Signed-off-by: PragmaTwice <[email protected]>
---
src/cli/daemon_util.h | 4 ++--
utils/kvrocks2redis/main.cc | 10 ++++++----
utils/kvrocks2redis/parser.cc | 2 +-
utils/kvrocks2redis/redis_writer.cc | 33 +++++++++++++++------------------
utils/kvrocks2redis/sync.cc | 27 +++++++++++++--------------
5 files changed, 37 insertions(+), 39 deletions(-)
diff --git a/src/cli/daemon_util.h b/src/cli/daemon_util.h
index da491bd93..ca406062b 100644
--- a/src/cli/daemon_util.h
+++ b/src/cli/daemon_util.h
@@ -106,7 +106,7 @@ inline bool IsSupervisedMode(SupervisedMode mode) {
inline void Daemonize() {
pid_t pid = fork();
if (pid < 0) {
- error("Failed to fork the process, error: ", strerror(errno));
+ error("Failed to fork the process, error: {}", strerror(errno));
exit(1);
}
@@ -114,7 +114,7 @@ inline void Daemonize() {
// change the file mode
umask(0);
if (setsid() < 0) {
- error("Failed to setsid, error: ", strerror(errno));
+ error("Failed to setsid, error: {}", strerror(errno));
exit(1);
}
diff --git a/utils/kvrocks2redis/main.cc b/utils/kvrocks2redis/main.cc
index a258d68bd..acadf08f6 100644
--- a/utils/kvrocks2redis/main.cc
+++ b/utils/kvrocks2redis/main.cc
@@ -35,6 +35,7 @@
#include "logging.h"
#include "parser.h"
#include "redis_writer.h"
+#include "spdlog/common.h"
#include "spdlog/logger.h"
#include "spdlog/sinks/daily_file_sink.h"
#include "spdlog/sinks/stdout_color_sinks.h"
@@ -89,6 +90,7 @@ static void InitSpdlog(const kvrocks2redis::Config &config) {
std::make_shared<spdlog::sinks::stdout_color_sink_mt>()};
auto logger = std::make_shared<spdlog::logger>("kvrocks2redis",
sinks.begin(), sinks.end());
logger->set_level(config.loglevel);
+ logger->flush_on(spdlog::level::info);
spdlog::set_default_logger(logger);
}
@@ -112,13 +114,13 @@ int main(int argc, char *argv[]) {
}
InitSpdlog(config);
- LOG(INFO) << "kvrocks2redis " << PrintVersion();
+ info("kvrocks2redis {}", PrintVersion());
if (config.daemonize) Daemonize();
s = CreatePidFile(config.pidfile);
if (!s.IsOK()) {
- LOG(ERROR) << "Failed to create pidfile '" << config.pidfile << "': " <<
s.Msg();
+ error("Failed to create pidfile '{}': {}", config.pidfile, s.Msg());
exit(1);
}
@@ -130,7 +132,7 @@ int main(int argc, char *argv[]) {
engine::Storage storage(&kvrocks_config);
s = storage.Open(kDBOpenModeAsSecondaryInstance);
if (!s.IsOK()) {
- LOG(ERROR) << "Failed to open Kvrocks storage: " << s.Msg();
+ error("Failed to open Kvrocks storage: {}", s.Msg());
exit(1);
}
@@ -140,7 +142,7 @@ int main(int argc, char *argv[]) {
Sync sync(&storage, &writer, &parser, &config);
hup_handler = [&sync] {
if (!sync.IsStopped()) {
- LOG(INFO) << "Bye Bye";
+ info("Stopping sync");
sync.Stop();
}
};
diff --git a/utils/kvrocks2redis/parser.cc b/utils/kvrocks2redis/parser.cc
index e8b186786..687e997bf 100644
--- a/utils/kvrocks2redis/parser.cc
+++ b/utils/kvrocks2redis/parser.cc
@@ -178,7 +178,7 @@ Status Parser::ParseWriteBatch(const std::string &batch_string) {
for (const auto &iter : *resp_commands) {
auto s = writer_->Write(iter.first, iter.second);
if (!s.IsOK()) {
- LOG(ERROR) << "[kvrocks2redis] Failed to write to AOF from the write
batch. Error: " << s.Msg();
+ error("Failed to write to AOF from the write batch. Error: {}", s.Msg());
}
}
diff --git a/utils/kvrocks2redis/redis_writer.cc b/utils/kvrocks2redis/redis_writer.cc
index 74410669e..932d96ca9 100644
--- a/utils/kvrocks2redis/redis_writer.cc
+++ b/utils/kvrocks2redis/redis_writer.cc
@@ -38,7 +38,7 @@ RedisWriter::RedisWriter(kvrocks2redis::Config *config) : Writer(config) {
assert(stop_flag_);
});
} catch (const std::system_error &e) {
- LOG(ERROR) << "[kvrocks2redis] Failed to create thread: " << e.what();
+ error("Failed to create thread: {}", e.what());
return;
}
}
@@ -85,53 +85,53 @@ void RedisWriter::Stop() {
if (t_.joinable()) t_.join();
// handled by sync func
- LOG(INFO) << "[kvrocks2redis] redis_writer Stopped";
+ info("RedisWriter Stopped");
}
void RedisWriter::sync() {
for (const auto &iter : config_->tokens) {
Status s = readNextOffsetFromFile(iter.first, &next_offsets_[iter.first]);
if (!s.IsOK()) {
- LOG(ERROR) << s.Msg();
+ error("{}", s.Msg());
return;
}
}
size_t chunk_size = 4 * 1024 * 1024;
- char *buffer = new char[chunk_size];
+ auto buffer = std::make_unique<char[]>(chunk_size);
while (!stop_flag_) {
for (const auto &iter : config_->tokens) {
Status s = GetAofFd(iter.first);
if (!s.IsOK()) {
- LOG(ERROR) << s.Msg();
+ error("{}", s.Msg());
continue;
}
s = getRedisConn(iter.first, iter.second.host, iter.second.port,
iter.second.auth, iter.second.db_number);
if (!s.IsOK()) {
- LOG(ERROR) << s.Msg();
+ error("{}", s.Msg());
continue;
}
while (true) {
- auto getted_line_leng = pread(aof_fds_[iter.first], buffer,
chunk_size, next_offsets_[iter.first]);
+ auto getted_line_leng = pread(aof_fds_[iter.first], buffer.get(),
chunk_size, next_offsets_[iter.first]);
if (getted_line_leng <= 0) {
if (getted_line_leng < 0) {
- LOG(ERROR) << "ERR read aof file : " << strerror(errno);
+ error("failed to read AOF file: {}", strerror(errno));
}
break;
}
- std::string con = std::string(buffer, getted_line_leng);
- s = util::SockSend(redis_fds_[iter.first], std::string(buffer,
getted_line_leng));
+ std::string con = std::string(buffer.get(), getted_line_leng);
+ s = util::SockSend(redis_fds_[iter.first], con);
if (!s.IsOK()) {
- LOG(ERROR) << "ERR send data to redis err: " << s.Msg();
+ error("Failed to send data to redis err: {}", s.Msg());
break;
}
auto line_state = util::SockReadLine(redis_fds_[iter.first]);
if (!line_state) {
- LOG(ERROR) << "read redis response err: " << s.Msg();
+ error("Failed to read redis response err: {}", s.Msg());
break;
}
@@ -139,23 +139,20 @@ void RedisWriter::sync() {
if (line.compare(0, 1, "-") == 0) {
// Ooops, something went wrong , sync process has been terminated,
administrator should be notified
// when full sync is needed, please remove last_next_seq config
file, and restart kvrocks2redis
- LOG(ERROR) << "[kvrocks2redis] CRITICAL - redis sync return error ,
administrator confirm needed : " << line;
- delete[] buffer;
+ error("CRITICAL - redis sync return error, administrator confirm
needed: {}", line);
Stop();
return;
}
s = updateNextOffset(iter.first, next_offsets_[iter.first] +
getted_line_leng);
if (!s.IsOK()) {
- LOG(ERROR) << "ERR updating next offset: " << s.Msg();
+ error("Failed to updating next offset: {}", s.Msg());
break;
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
-
- delete[] buffer;
}
Status RedisWriter::getRedisConn(const std::string &ns, const std::string
&host, uint32_t port, const std::string &auth,
@@ -210,7 +207,7 @@ Status RedisWriter::selectDB(const std::string &ns, int db_number) {
return s.Prefixed("failed to send SELECT command to socket");
}
- LOG(INFO) << "[kvrocks2redis] select db request was sent, waiting for
response";
+ info("select db request was sent, waiting for response");
std::string line =
GET_OR_RET(util::SockReadLine(redis_fds_[ns]).Prefixed("read select db response
err"));
if (line.compare(0, 3, "+OK") != 0) {
return {Status::NotOK, "[kvrocks2redis] redis select db failed: " + line};
diff --git a/utils/kvrocks2redis/sync.cc b/utils/kvrocks2redis/sync.cc
index 78c95e43d..8c0553a9b 100644
--- a/utils/kvrocks2redis/sync.cc
+++ b/utils/kvrocks2redis/sync.cc
@@ -55,10 +55,10 @@ Sync::~Sync() {
void Sync::Start() {
auto s = readNextSeqFromFile(&next_seq_);
if (!s.IsOK()) {
- LOG(ERROR) << s.Msg();
+ error(s.Msg());
return;
}
- LOG(INFO) << "[kvrocks2redis] Start sync the data from kvrocks to redis";
+ info("Start sync the data from kvrocks to redis");
while (!IsStopped()) {
s = checkWalBoundary();
if (!s.IsOK()) {
@@ -66,7 +66,7 @@ void Sync::Start() {
}
s = incrementBatchLoop();
if (!s.IsOK()) {
- LOG(ERROR) << s.Msg();
+ error(s.Msg());
}
}
}
@@ -75,7 +75,7 @@ void Sync::Stop() {
if (stop_flag_) return;
stop_flag_ = true; // Stopping procedure is asynchronous,
- LOG(INFO) << "[kvrocks2redis] Stopped";
+ info("Sync Stopped");
}
Status Sync::tryCatchUpWithPrimary() {
@@ -100,8 +100,8 @@ Status Sync::checkWalBoundary() {
auto batch = iter->GetBatch();
if (next_seq_ != batch.sequence) {
if (next_seq_ > batch.sequence) {
- LOG(ERROR) << "checkWALBoundary with sequence: " << next_seq_
- << ", but GetWALIter return older sequence: " <<
batch.sequence;
+ error("checkWALBoundary with sequence: {}, but GetWALIter return older
sequence: {}", next_seq_,
+ batch.sequence);
}
return {Status::NotOK};
}
@@ -111,7 +111,7 @@ Status Sync::checkWalBoundary() {
}
Status Sync::incrementBatchLoop() {
- LOG(INFO) << "[kvrocks2redis] Start parsing increment data";
+ info("Start parsing increment data");
std::unique_ptr<rocksdb::TransactionLogIterator> iter;
while (!IsStopped()) {
if (!tryCatchUpWithPrimary().IsOK()) {
@@ -123,8 +123,8 @@ Status Sync::incrementBatchLoop() {
auto batch = iter->GetBatch();
if (batch.sequence != next_seq_) {
if (next_seq_ > batch.sequence) {
- LOG(ERROR) << "checkWALBoundary with sequence: " << next_seq_
- << ", but GetWALIter return older sequence: " <<
batch.sequence;
+ error("checkWALBoundary with sequence: {}, but GetWALIter return
older sequence: {}", next_seq_,
+ batch.sequence);
}
return {Status::NotOK};
}
@@ -146,25 +146,24 @@ Status Sync::incrementBatchLoop() {
}
void Sync::parseKVFromLocalStorage() {
- LOG(INFO) << "[kvrocks2redis] Start parsing kv from the local storage";
+ info("Start parsing kv from the local storage");
for (const auto &iter : config_->tokens) {
auto s = writer_->FlushDB(iter.first);
if (!s.IsOK()) {
- LOG(ERROR) << "[kvrocks2redis] Failed to flush target redis db in
namespace: " << iter.first
- << ", encounter error: " << s.Msg();
+ error("Failed to flush target redis db in namespace: {}, encounter
error: {}", iter.first, s.Msg());
return;
}
}
Status s = parser_->ParseFullDB();
if (!s.IsOK()) {
- LOG(ERROR) << "[kvrocks2redis] Failed to parse full db, encounter error: "
<< s.Msg();
+ error("Failed to parse full db, encounter error: {}", s.Msg());
return;
}
auto last_seq = storage_->GetDB()->GetLatestSequenceNumber();
s = updateNextSeq(last_seq + 1);
if (!s.IsOK()) {
- LOG(ERROR) << "[kvrocks2redis] Failed to update next sequence: " <<
s.Msg();
+ error("Failed to update next sequence: {}", s.Msg());
}
}