This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 858d7f2ae chore(log): replace logging calls in `cluster/*` (#2894)
858d7f2ae is described below
commit 858d7f2aebaf8015c035be0ea2d1d5261599ddfa
Author: Anirudh Lakhanpal <[email protected]>
AuthorDate: Sat Apr 26 08:55:12 2025 +0530
chore(log): replace logging calls in `cluster/*` (#2894)
Co-authored-by: Twice <[email protected]>
Co-authored-by: Twice <[email protected]>
---
src/cluster/cluster.cc | 27 +++----
src/cluster/replication.cc | 154 +++++++++++++++++-------------------
src/cluster/slot_migrate.cc | 154 ++++++++++++++++--------------------
src/cluster/sync_migrate_context.cc | 3 +-
src/cluster/sync_migrate_context.h | 2 +-
5 files changed, 157 insertions(+), 183 deletions(-)
diff --git a/src/cluster/cluster.cc b/src/cluster/cluster.cc
index 10dbf0e83..62c82c01a 100644
--- a/src/cluster/cluster.cc
+++ b/src/cluster/cluster.cc
@@ -130,7 +130,7 @@ Status Cluster::SetSlotRanges(const std::vector<SlotRange> &slot_ranges, const s
if (migrated_slots_.count(slot) > 0) {
auto s = srv_->slot_migrator->ClearKeysOfSlotRange(ctx,
kDefaultNamespace, SlotRange::GetPoint(slot));
if (!s.ok()) {
- LOG(ERROR) << "failed to clear data of migrated slot: " << s.ToString();
+ error("failed to clear data of migrated slot: {}", s.ToString());
}
migrated_slots_.erase(slot);
}
@@ -213,7 +213,7 @@ Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, b
if (slots_nodes_[slot] != myself_) {
auto s = srv_->slot_migrator->ClearKeysOfSlotRange(ctx,
kDefaultNamespace, SlotRange::GetPoint(slot));
if (!s.ok()) {
- LOG(ERROR) << "failed to clear data of migrated slots: " << s.ToString();
+ error("failed to clear data of migrated slots: {}", s.ToString());
}
}
}
@@ -246,11 +246,11 @@ Status Cluster::SetMasterSlaveRepl() {
if (!s.IsOK()) {
return s.Prefixed("failed to remove master");
}
- LOG(INFO) << "MASTER MODE enabled by cluster topology setting";
+ info("MASTER MODE enabled by cluster topology setting");
if (srv_->slot_migrator && is_cluster_enabled && is_slave) {
// Slave -> Master
srv_->slot_migrator->SetStopMigrationFlag(false);
- LOG(INFO) << "Change server role to master, restart migration task";
+ info("Change server role to master, restart migration task");
}
return Status::OK();
}
@@ -261,16 +261,16 @@ Status Cluster::SetMasterSlaveRepl() {
std::shared_ptr<ClusterNode> master = it->second;
auto s = srv_->AddMaster(master->host, master->port, false);
if (!s.IsOK()) {
- LOG(WARNING) << "SLAVE OF " << master->host << ":" << master->port
- << " wasn't enabled by cluster topology setting, encounter error: " << s.Msg();
+ warn("SLAVE OF {}:{} wasn't enabled by cluster topology setting, encounter error: {}", master->host, master->port,
+ s.Msg());
return s.Prefixed("failed to add master");
}
if (srv_->slot_migrator && is_cluster_enabled && !is_slave) {
// Master -> Slave
srv_->slot_migrator->SetStopMigrationFlag(true);
- LOG(INFO) << "Change server role to slave, stop migration task";
+ info("Change server role to slave, stop migration task");
}
- LOG(INFO) << fmt::format("SLAVE OF {}:{} enabled by cluster topology setting", master->host, master->port);
+ info("SLAVE OF {}:{} enabled by cluster topology setting", master->host, master->port);
}
return Status::OK();
@@ -374,7 +374,7 @@ Status Cluster::ImportSlotRange(redis::Connection *conn, const SlotRange &slot_r
conn->close_cb = [object_ptr = srv_->slot_import.get(),
slot_range]([[maybe_unused]] int fd) {
auto s = object_ptr->StopForLinkError();
if (!s.IsOK()) {
- LOG(ERROR) << fmt::format("[import] Failed to stop importing slot(s) {}: {}", slot_range.String(), s.Msg());
+ error("[import] Failed to stop importing slot(s) {}: {}", slot_range.String(), s.Msg());
}
}; // Stop forbidding writing slot to accept write commands
if (slot_range.HasOverlap(srv_->slot_migrator->GetForbiddenSlotRange()))
{
@@ -384,17 +384,17 @@ Status Cluster::ImportSlotRange(redis::Connection *conn,
const SlotRange &slot_r
// supported in the future.
srv_->slot_migrator->ReleaseForbiddenSlotRange();
}
- LOG(INFO) << fmt::format("[import] Start importing slot(s) {}",
slot_range.String());
+ info("[import] Start importing slot(s) {}", slot_range.String());
break;
case kImportSuccess:
s = srv_->slot_import->Success(slot_range);
if (!s.IsOK()) return s;
- LOG(INFO) << fmt::format("[import] Mark the importing slot(s) {} as
succeed", slot_range.String());
+ info("[import] Mark the importing slot(s) {} as succeed",
slot_range.String());
break;
case kImportFailed:
s = srv_->slot_import->Fail(slot_range);
if (!s.IsOK()) return s;
- LOG(INFO) << fmt::format("[import] Mark the importing slot(s) {} as
failed", slot_range.String());
+ info("[import] Mark the importing slot(s) {} as failed",
slot_range.String());
break;
default:
return {Status::NotOK, errInvalidImportState};
@@ -691,8 +691,7 @@ Status Cluster::DumpClusterNodes(const std::string &file) {
Status Cluster::LoadClusterNodes(const std::string &file_path) {
if (rocksdb::Env::Default()->FileExists(file_path).IsNotFound()) {
- LOG(INFO) << fmt::format("The cluster nodes file {} is not found. Use CLUSTERX subcommands to specify it.",
- file_path);
+ info("The cluster nodes file {} is not found. Use CLUSTERX subcommands to specify it.", file_path);
return Status::OK();
}
diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc
index 3541cebb6..e032b87be 100644
--- a/src/cluster/replication.cc
+++ b/src/cluster/replication.cc
@@ -67,7 +67,7 @@ Status FeedSlaveThread::Start() {
pthread_sigmask(SIG_BLOCK, &mask, &omask);
auto s = util::SockSend(conn_->GetFD(), redis::RESP_OK,
conn_->GetBufferEvent());
if (!s.IsOK()) {
- LOG(ERROR) << "failed to send OK response to the replica: " << s.Msg();
+ error("failed to send OK response to the replica: {}", s.Msg());
return;
}
this->loop();
@@ -84,12 +84,12 @@ Status FeedSlaveThread::Start() {
void FeedSlaveThread::Stop() {
stop_ = true;
- LOG(WARNING) << "Slave thread was terminated, would stop feeding the slave:
" << conn_->GetAddr();
+ warn("Slave thread was terminated, would stop feeding the slave: {}",
conn_->GetAddr());
}
void FeedSlaveThread::Join() {
if (auto s = util::ThreadJoin(t_); !s) {
- LOG(WARNING) << "Slave thread operation failed: " << s.Msg();
+ warn("Slave thread operation failed: {}", s.Msg());
}
}
@@ -98,7 +98,7 @@ void FeedSlaveThread::checkLivenessIfNeed() {
const auto ping_command = redis::BulkString("ping");
auto s = util::SockSend(conn_->GetFD(), ping_command,
conn_->GetBufferEvent());
if (!s.IsOK()) {
- LOG(ERROR) << "Ping slave[" << conn_->GetAddr() << "] err: " << s.Msg() <<
", would stop the thread";
+ error("Ping slave [{}] err: {}, would stop the thread", conn_->GetAddr(),
s.Msg());
Stop();
}
}
@@ -115,7 +115,7 @@ void FeedSlaveThread::loop() {
auto curr_seq = next_repl_seq_.load();
if (!iter_ || !iter_->Valid()) {
- if (iter_) LOG(INFO) << "WAL was rotated, would reopen again";
+ if (iter_) info("WAL was rotated, would reopen again");
if (!srv_->storage->WALHasNewData(curr_seq) ||
!srv_->storage->GetWALIter(curr_seq, &iter_).IsOK()) {
iter_ = nullptr;
usleep(yield_microseconds);
@@ -126,8 +126,9 @@ void FeedSlaveThread::loop() {
// iter_ would be always valid here
auto batch = iter_->GetBatch();
if (batch.sequence != curr_seq) {
- LOG(ERROR) << "Fatal error encountered, WAL iterator is discrete, some seq might be lost"
- << ", sequence " << curr_seq << " expected, but got " << batch.sequence;
+ error(
+ "Fatal error encountered, WAL iterator is discrete, some seq might be lost, sequence {} expected, but got {}",
+ curr_seq, batch.sequence);
Stop();
return;
}
@@ -147,8 +148,7 @@ void FeedSlaveThread::loop() {
// Send entire bulk which contain multiple batches
auto s = util::SockSend(conn_->GetFD(), batches_bulk,
conn_->GetBufferEvent());
if (!s.IsOK()) {
- LOG(ERROR) << "Write error while sending batch to slave: " << s.Msg() << ". batches: 0x"
- << util::StringToHex(batches_bulk);
+ error("Write error while sending batch to slave: {}. batches: 0x{}", s.Msg(), util::StringToHex(batches_bulk));
Stop();
return;
}
@@ -181,7 +181,7 @@ void
ReplicationThread::CallbacksStateMachine::ConnEventCB(bufferevent *bev, int
return;
}
if (events & (BEV_EVENT_ERROR | BEV_EVENT_EOF)) {
- LOG(ERROR) << "[replication] connection error/eof, reconnect the master";
+ error("[replication] connection error/eof, reconnect the master");
// Wait a bit and reconnect
repl_->repl_state_.store(kReplConnecting, std::memory_order_relaxed);
std::this_thread::sleep_for(std::chrono::seconds(1));
@@ -203,7 +203,7 @@ void
ReplicationThread::CallbacksStateMachine::SetWriteCB(bufferevent *bev, buff
void ReplicationThread::CallbacksStateMachine::ReadWriteCB(bufferevent *bev) {
LOOP_LABEL:
assert(handler_idx_ <= handlers_.size());
- DLOG(INFO) << "[replication] Execute handler[" <<
getHandlerName(handler_idx_) << "]";
+ debug("[replication] Execute handler[{}]", getHandlerName(handler_idx_));
auto st = getHandlerFunc(handler_idx_)(repl_, bev);
repl_->last_io_time_secs_.store(util::GetTimeStamp(),
std::memory_order_relaxed);
switch (st) {
@@ -230,11 +230,11 @@ LOOP_LABEL:
case CBState::RESTART: // state that can be retried some time later
Stop();
if (repl_->stop_flag_) {
- LOG(INFO) << "[replication] Wouldn't restart while the replication thread was stopped";
+ info("[replication] Wouldn't restart while the replication thread was stopped");
break;
}
repl_->repl_state_.store(kReplConnecting, std::memory_order_relaxed);
- LOG(INFO) << "[replication] Retry in 10 seconds";
+ info("[replication] Retry in 10 seconds");
std::this_thread::sleep_for(std::chrono::seconds(10));
Start();
}
@@ -264,7 +264,7 @@ void ReplicationThread::CallbacksStateMachine::Start() {
last_connect_timestamp = util::GetTimeStampMS();
auto cfd = util::SockConnect(repl_->host_, repl_->port_,
repl_->srv_->GetConfig()->replication_connect_timeout_ms);
if (!cfd) {
- LOG(ERROR) << "[replication] Failed to connect the master, err: " <<
cfd.Msg();
+ error("[replication] Failed to connect the master, err: {}", cfd.Msg());
continue;
}
#ifdef ENABLE_OPENSSL
@@ -272,7 +272,7 @@ void ReplicationThread::CallbacksStateMachine::Start() {
if (repl_->srv_->GetConfig()->tls_replication) {
ssl = SSL_new(repl_->srv_->ssl_ctx.get());
if (!ssl) {
- LOG(ERROR) << "Failed to construct SSL structure for new connection: "
<< SSLErrors{};
+ error("Failed to construct SSL structure for new connection: {}",
fmt::streamed(SSLErrors{}));
evutil_closesocket(*cfd);
return;
}
@@ -288,7 +288,7 @@ void ReplicationThread::CallbacksStateMachine::Start() {
if (ssl) SSL_free(ssl);
#endif
close(*cfd);
- LOG(ERROR) << "[replication] Failed to create the event socket";
+ error("[replication] Failed to create the event socket");
continue;
}
#ifdef ENABLE_OPENSSL
@@ -346,9 +346,9 @@ Status ReplicationThread::Start(std::function<bool()>
&&pre_fullsync_cb, std::fu
// Clean synced checkpoint from old master because replica starts to follow
new master
auto s = rocksdb::DestroyDB(srv_->GetConfig()->sync_checkpoint_dir,
rocksdb::Options());
if (!s.ok()) {
- LOG(WARNING) << "Can't clean synced checkpoint from master, error: " <<
s.ToString();
+ warn("Can't clean synced checkpoint from master, error: {}", s.ToString());
} else {
- LOG(WARNING) << "Clean old synced checkpoint successfully";
+ warn("Clean old synced checkpoint successfully");
}
// cleanup the old backups, so we can start replication in a clean state
@@ -368,9 +368,9 @@ void ReplicationThread::Stop() {
stop_flag_ = true; // Stopping procedure is asynchronous,
// handled by timer
if (auto s = util::ThreadJoin(t_); !s) {
- LOG(WARNING) << "Replication thread operation failed: " << s.Msg();
+ warn("Replication thread operation failed: {}", s.Msg());
}
- LOG(INFO) << "[replication] Stopped";
+ info("[replication] Stopped");
}
/*
@@ -384,7 +384,7 @@ void ReplicationThread::Stop() {
void ReplicationThread::run() {
base_ = event_base_new();
if (base_ == nullptr) {
- LOG(ERROR) << "[replication] Failed to create new ev base";
+ error("[replication] Failed to create new ev base");
return;
}
psync_steps_.Start();
@@ -400,7 +400,7 @@ void ReplicationThread::run() {
ReplicationThread::CBState ReplicationThread::authWriteCB(bufferevent *bev) {
SendString(bev, redis::ArrayOfBulkStrings({"AUTH",
srv_->GetConfig()->masterauth}));
- LOG(INFO) << "[replication] Auth request was sent, waiting for response";
+ info("[replication] Auth request was sent, waiting for response");
repl_state_.store(kReplSendAuth, std::memory_order_relaxed);
return CBState::NEXT;
}
@@ -413,17 +413,17 @@ ReplicationThread::CBState
ReplicationThread::authReadCB(bufferevent *bev) { //
if (!line) return CBState::AGAIN;
if (!ResponseLineIsOK(line.View())) {
// Auth failed
- LOG(ERROR) << "[replication] Auth failed: " << line.get();
+ error("[replication] Auth failed: {}", line.get());
return CBState::RESTART;
}
- LOG(INFO) << "[replication] Auth response was received, continue...";
+ info("[replication] Auth response was received, continue...");
return CBState::NEXT;
}
ReplicationThread::CBState ReplicationThread::checkDBNameWriteCB(bufferevent
*bev) {
SendString(bev, redis::ArrayOfBulkStrings({"_db_name"}));
repl_state_.store(kReplCheckDBName, std::memory_order_relaxed);
- LOG(INFO) << "[replication] Check db name request was sent, waiting for
response";
+ info("[replication] Check db name request was sent, waiting for response");
return CBState::NEXT;
}
@@ -434,19 +434,19 @@ ReplicationThread::CBState
ReplicationThread::checkDBNameReadCB(bufferevent *bev
if (line[0] == '-') {
if (isRestoringError(line.View())) {
- LOG(WARNING) << "The master was restoring the db, retry later";
+ warn("The master was restoring the db, retry later");
} else {
- LOG(ERROR) << "Failed to get the db name, " << line.get();
+ error("Failed to get the db name, {}", line.get());
}
return CBState::RESTART;
}
std::string db_name = storage_->GetName();
if (line.length == db_name.size() && !strncmp(line.get(), db_name.data(),
line.length)) {
// DB name match, we should continue to next step: TryPsync
- LOG(INFO) << "[replication] DB name is valid, continue...";
+ info("[replication] DB name is valid, continue...");
return CBState::NEXT;
}
- LOG(ERROR) << "[replication] Mismatched the db name, local: " << db_name <<
", remote: " << line.get();
+ error("[replication] Mismatched the db name, local: {}, remote: {}",
db_name, line.get());
return CBState::RESTART;
}
@@ -461,7 +461,7 @@ ReplicationThread::CBState
ReplicationThread::replConfWriteCB(bufferevent *bev)
}
SendString(bev, redis::ArrayOfBulkStrings(data_to_send));
repl_state_.store(kReplReplConf, std::memory_order_relaxed);
- LOG(INFO) << "[replication] replconf request was sent, waiting for response";
+ info("[replication] replconf request was sent, waiting for response");
return CBState::NEXT;
}
@@ -473,21 +473,20 @@ ReplicationThread::CBState
ReplicationThread::replConfReadCB(bufferevent *bev) {
// on unknown option: first try without announce ip, if it fails again - do
nothing (to prevent infinite loop)
if (isUnknownOption(line.View()) && !next_try_without_announce_ip_address_) {
next_try_without_announce_ip_address_ = true;
- LOG(WARNING) << "The old version master, can't handle ip-address, "
- << "try without it again";
+ warn("The old version master, can't handle ip-address, try without it again");
// Retry previous state, i.e. send replconf again
return CBState::PREV;
}
if (line[0] == '-' && isRestoringError(line.View())) {
- LOG(WARNING) << "The master was restoring the db, retry later";
+ warn("The master was restoring the db, retry later");
return CBState::RESTART;
}
if (!ResponseLineIsOK(line.View())) {
- LOG(WARNING) << "[replication] Failed to replconf: " << line.get() + 1;
+ warn("[replication] Failed to replconf: {}", line.get() + 1);
// backward compatible with old version that doesn't support replconf cmd
return CBState::NEXT;
} else {
- LOG(INFO) << "[replication] replconf is ok, start psync";
+ info("[replication] replconf is ok, start psync");
return CBState::NEXT;
}
}
@@ -517,12 +516,11 @@ ReplicationThread::CBState
ReplicationThread::tryPSyncWriteCB(bufferevent *bev)
if (!srv_->GetConfig()->use_rsid_psync || next_try_old_psync_ ||
replid.length() != kReplIdLength) {
next_try_old_psync_ = false; // Reset next_try_old_psync_
SendString(bev, redis::ArrayOfBulkStrings({"PSYNC",
std::to_string(next_seq)}));
- LOG(INFO) << "[replication] Try to use psync, next seq: " << next_seq;
+ info("[replication] Try to use psync, next seq: {}", next_seq);
} else {
// NEW PSYNC "Unique Replication Sequence ID": replication id and sequence
id
SendString(bev, redis::ArrayOfBulkStrings({"PSYNC", replid,
std::to_string(next_seq)}));
- LOG(INFO) << "[replication] Try to use new psync, current unique
replication sequence id: " << replid << ":"
- << cur_seq;
+ info("[replication] Try to use new psync, current unique replication
sequence id: {}:{}", replid, cur_seq);
}
repl_state_.store(kReplSendPSync, std::memory_order_relaxed);
return CBState::NEXT;
@@ -534,14 +532,13 @@ ReplicationThread::CBState
ReplicationThread::tryPSyncReadCB(bufferevent *bev) {
if (!line) return CBState::AGAIN;
if (line[0] == '-' && isRestoringError(line.View())) {
- LOG(WARNING) << "The master was restoring the db, retry later";
+ warn("The master was restoring the db, retry later");
return CBState::RESTART;
}
if (line[0] == '-' && isWrongPsyncNum(line.View())) {
next_try_old_psync_ = true;
- LOG(WARNING) << "The old version master, can't handle new PSYNC, "
- << "try old PSYNC again";
+ warn("The old version master, can't handle new PSYNC, try old PSYNC again");
// Retry previous state, i.e. send PSYNC again
return CBState::PREV;
}
@@ -550,11 +547,11 @@ ReplicationThread::CBState
ReplicationThread::tryPSyncReadCB(bufferevent *bev) {
// PSYNC isn't OK, we should use FullSync
// Switch to fullsync state machine
fullsync_steps_.Start();
- LOG(INFO) << "[replication] Failed to psync, error: " << line.get() << ",
switch to fullsync";
+ info("[replication] Failed to psync, error: {}, switch to fullsync",
line.get());
return CBState::QUIT;
} else {
// PSYNC is OK, use IncrementBatchLoop
- LOG(INFO) << "[replication] PSync is ok, start increment batch loop";
+ info("[replication] PSync is ok, start increment batch loop");
return CBState::NEXT;
}
}
@@ -570,7 +567,7 @@ ReplicationThread::CBState
ReplicationThread::incrementBatchLoopCB(bufferevent *
if (!line) return CBState::AGAIN;
incr_bulk_len_ = line.length > 0 ? std::strtoull(line.get() + 1,
nullptr, 10) : 0;
if (incr_bulk_len_ == 0) {
- LOG(ERROR) << "[replication] Invalid increment data size";
+ error("[replication] Invalid increment data size");
return CBState::RESTART;
}
incr_state_ = Incr_batch_data;
@@ -598,15 +595,15 @@ ReplicationThread::CBState
ReplicationThread::incrementBatchLoopCB(bufferevent *
auto s = storage_->ReplicaApplyWriteBatch(&batch);
if (!s.IsOK()) {
- LOG(ERROR) << "[replication] CRITICAL - Failed to write batch to local, " << s.Msg() << ". batch: 0x"
- << util::StringToHex(batch.Data());
+ error("[replication] CRITICAL - Failed to write batch to local, {}. batch: 0x{}", s.Msg(),
+ util::StringToHex(batch.Data()));
return CBState::RESTART;
}
s = parseWriteBatch(batch);
if (!s.IsOK()) {
- LOG(ERROR) << "[replication] CRITICAL - failed to parse write batch 0x" << util::StringToHex(batch.Data())
- << ": " << s.Msg();
+ error("[replication] CRITICAL - failed to parse write batch 0x{}: {}", util::StringToHex(batch.Data()),
+ s.Msg());
return CBState::RESTART;
}
@@ -618,7 +615,7 @@ ReplicationThread::CBState
ReplicationThread::incrementBatchLoopCB(bufferevent *
ReplicationThread::CBState ReplicationThread::fullSyncWriteCB(bufferevent
*bev) {
SendString(bev, redis::ArrayOfBulkStrings({"_fetch_meta"}));
repl_state_.store(kReplFetchMeta, std::memory_order_relaxed);
- LOG(INFO) << "[replication] Start syncing data with fullsync";
+ info("[replication] Start syncing data with fullsync");
return CBState::NEXT;
}
@@ -634,31 +631,31 @@ ReplicationThread::CBState
ReplicationThread::fullSyncReadCB(bufferevent *bev) {
UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
if (!line) return CBState::AGAIN;
if (line[0] == '-') {
- LOG(ERROR) << "[replication] Failed to fetch meta id: " << line.get();
+ error("[replication] Failed to fetch meta id: {}", line.get());
return CBState::RESTART;
}
fullsync_meta_id_ = static_cast<rocksdb::BackupID>(line.length > 0 ?
std::strtoul(line.get(), nullptr, 10) : 0);
if (fullsync_meta_id_ == 0) {
- LOG(ERROR) << "[replication] Invalid meta id received";
+ error("[replication] Invalid meta id received");
return CBState::RESTART;
}
fullsync_state_ = kFetchMetaSize;
- LOG(INFO) << "[replication] Succeed fetching meta id: " <<
fullsync_meta_id_;
+ info("[replication] Succeed fetching meta id: {}", fullsync_meta_id_);
}
case kFetchMetaSize: {
UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
if (!line) return CBState::AGAIN;
if (line[0] == '-') {
- LOG(ERROR) << "[replication] Failed to fetch meta size: " <<
line.get();
+ error("[replication] Failed to fetch meta size: {}", line.get());
return CBState::RESTART;
}
fullsync_filesize_ = line.length > 0 ? std::strtoull(line.get(),
nullptr, 10) : 0;
if (fullsync_filesize_ == 0) {
- LOG(ERROR) << "[replication] Invalid meta file size received";
+ error("[replication] Invalid meta file size received");
return CBState::RESTART;
}
fullsync_state_ = kFetchMetaContent;
- LOG(INFO) << "[replication] Succeed fetching meta size: " <<
fullsync_filesize_;
+ info("[replication] Succeed fetching meta size: {}", fullsync_filesize_);
}
case kFetchMetaContent: {
std::string target_dir;
@@ -670,7 +667,7 @@ ReplicationThread::CBState
ReplicationThread::fullSyncReadCB(bufferevent *bev) {
}
auto s = engine::Storage::ReplDataManager::ParseMetaAndSave(storage_,
fullsync_meta_id_, input, &meta);
if (!s.IsOK()) {
- LOG(ERROR) << "[replication] Failed to parse meta and save: " <<
s.Msg();
+ error("[replication] Failed to parse meta and save: {}", s.Msg());
return CBState::AGAIN;
}
target_dir = srv_->GetConfig()->backup_sync_dir;
@@ -679,7 +676,7 @@ ReplicationThread::CBState
ReplicationThread::fullSyncReadCB(bufferevent *bev) {
UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
if (!line) return CBState::AGAIN;
if (line[0] == '-') {
- LOG(ERROR) << "[replication] Failed to fetch meta info: " <<
line.get();
+ error("[replication] Failed to fetch meta info: {}", line.get());
return CBState::RESTART;
}
std::vector<std::string> need_files =
util::Split(std::string(line.get()), ",");
@@ -695,18 +692,17 @@ ReplicationThread::CBState
ReplicationThread::fullSyncReadCB(bufferevent *bev) {
if (iter != need_files.end()) need_files.erase(iter);
auto s = engine::Storage::ReplDataManager::CleanInvalidFiles(storage_,
target_dir, need_files);
if (!s.IsOK()) {
- LOG(WARNING) << "[replication] Failed to clean up invalid files of the old checkpoint,"
- << " error: " << s.Msg();
- LOG(WARNING) << "[replication] Try to clean all checkpoint files";
+ warn("[replication] Failed to clean up invalid files of the old checkpoint, error: {}", s.Msg());
+ warn("[replication] Try to clean all checkpoint files");
auto s = rocksdb::DestroyDB(target_dir, rocksdb::Options());
if (!s.ok()) {
- LOG(WARNING) << "[replication] Failed to clean all checkpoint
files, error: " << s.ToString();
+ warn("[replication] Failed to clean all checkpoint files, error:
{}", s.ToString());
}
}
}
assert(evbuffer_get_length(input) == 0);
fullsync_state_ = kFetchMetaID;
- LOG(INFO) << "[replication] Succeeded fetching full data files info,
fetching files in parallel";
+ info("[replication] Succeeded fetching full data files info, fetching
files in parallel");
bool pre_fullsync_done = false;
// If 'slave-empty-db-before-fullsync' is yes, we call 'pre_fullsync_cb_'
@@ -722,10 +718,10 @@ ReplicationThread::CBState
ReplicationThread::fullSyncReadCB(bufferevent *bev) {
auto s = parallelFetchFile(target_dir, meta.files);
if (!s.IsOK()) {
if (pre_fullsync_done) post_fullsync_cb_();
- LOG(ERROR) << "[replication] Failed to parallel fetch files while " +
s.Msg();
+ error("[replication] Failed to parallel fetch files while {}",
s.Msg());
return CBState::RESTART;
}
- LOG(INFO) << "[replication] Succeeded fetching files in parallel,
restoring the backup";
+ info("[replication] Succeeded fetching files in parallel, restoring the
backup");
// Don't need to call 'pre_fullsync_cb_' again if it was called before
if (!pre_fullsync_done && !pre_fullsync_cb_()) return CBState::RESTART;
@@ -737,18 +733,18 @@ ReplicationThread::CBState
ReplicationThread::fullSyncReadCB(bufferevent *bev) {
s = storage_->RestoreFromCheckpoint();
}
if (!s.IsOK()) {
- LOG(ERROR) << "[replication] Failed to restore backup while " +
s.Msg() + ", restart fullsync";
+ error("[replication] Failed to restore backup while {}, restart
fullsync", s.Msg());
post_fullsync_cb_();
return CBState::RESTART;
}
- LOG(INFO) << "[replication] Succeeded restoring the backup, fullsync was
finish";
+ info("[replication] Succeeded restoring the backup, fullsync was
finish");
post_fullsync_cb_();
// It needs to reload namespaces from DB after the full sync is done,
// or namespaces are not visible in the replica.
s = srv_->GetNamespace()->LoadAndRewrite();
if (!s.IsOK()) {
- LOG(ERROR) << "[replication] Failed to load and rewrite namespace: "
<< s.Msg();
+ error("[replication] Failed to load and rewrite namespace: {}",
s.Msg());
}
// Switch to psync state machine again
@@ -756,10 +752,7 @@ ReplicationThread::CBState
ReplicationThread::fullSyncReadCB(bufferevent *bev) {
return CBState::QUIT;
}
}
-
- LOG(ERROR) << "Should not arrive here";
- assert(false);
- return CBState::QUIT;
+ unreachable();
}
Status ReplicationThread::parallelFetchFile(const std::string &dir,
@@ -810,9 +803,8 @@ Status ReplicationThread::parallelFetchFile(const
std::string &dir,
skip_cnt.fetch_add(1);
uint32_t cur_skip_cnt = skip_cnt.load();
uint32_t cur_fetch_cnt = fetch_cnt.load();
- LOG(INFO) << "[skip] " << f_name << " " << f_crc << ", skip count: " << cur_skip_cnt
- << ", fetch count: " << cur_fetch_cnt << ", progress: " << cur_skip_cnt + cur_fetch_cnt << "/"
- << files.size();
+ info("[skip] {} {}, skip count: {}, fetch count: {}, progress: {} / {}", f_name, f_crc, cur_skip_cnt,
+ cur_fetch_cnt, (cur_skip_cnt + cur_fetch_cnt), files.size());
continue;
}
fetch_files.push_back(f_name);
@@ -824,10 +816,8 @@ Status ReplicationThread::parallelFetchFile(const
std::string &dir,
fetch_cnt.fetch_add(1);
uint32_t cur_skip_cnt = skip_cnt.load();
uint32_t cur_fetch_cnt = fetch_cnt.load();
- LOG(INFO) << "[fetch] "
- << "Fetched " << fetch_file << ", crc32: " << fetch_crc << ", skip count: " << cur_skip_cnt
- << ", fetch count: " << cur_fetch_cnt << ", progress: " << cur_skip_cnt + cur_fetch_cnt << "/"
- << files_count;
+ info("[fetch] Fetched {}, crc32 {}, skip count: {}, fetch count: {}, progress: {} / {}", fetch_file,
+ fetch_crc, cur_skip_cnt, cur_fetch_cnt, cur_skip_cnt + cur_fetch_cnt, files_count);
};
// For master using old version, it only supports to fetch a single
file by one
// command, so we need to fetch all files by multiple command
interactions.
@@ -962,14 +952,14 @@ Status ReplicationThread::fetchFiles(int sock_fd, const
std::string &dir, const
UniqueEvbuf evbuf;
for (unsigned i = 0; i < files.size(); i++) {
- DLOG(INFO) << "[fetch] Start to fetch file " << files[i];
+ debug("[fetch] Start to fetch file {}", files[i]);
s = fetchFile(sock_fd, evbuf.get(), dir, files[i], crcs[i], fn, ssl);
if (!s.IsOK()) {
s = Status(Status::NotOK, "fetch file err: " + s.Msg());
- LOG(WARNING) << "[fetch] Fail to fetch file " << files[i] << ", err: "
<< s.Msg();
+ warn("[fetch] Fail to fetch file {}, err: {}", files[i], s.Msg());
break;
}
- DLOG(INFO) << "[fetch] Succeed fetching file " << files[i];
+ debug("[fetch] Succeed fetching file {}", files[i]);
// Just for tests
if (srv_->GetConfig()->fullsync_recv_file_delay) {
@@ -983,7 +973,7 @@ Status ReplicationThread::fetchFiles(int sock_fd, const
std::string &dir, const
void ReplicationThread::TimerCB(int, int16_t) {
// DLOG(INFO) << "[replication] timer";
if (stop_flag_) {
- LOG(INFO) << "[replication] Stop ev loop";
+ info("[replication] Stop ev loop");
event_base_loopbreak(base_);
psync_steps_.Stop();
fullsync_steps_.Stop();
diff --git a/src/cluster/slot_migrate.cc b/src/cluster/slot_migrate.cc
index dd6e2efa1..0440a4b50 100644
--- a/src/cluster/slot_migrate.cc
+++ b/src/cluster/slot_migrate.cc
@@ -122,8 +122,7 @@ Status SlotMigrator::PerformSlotRangeMigration(const
std::string &node_id, std::
migration_job_ = std::move(job);
job_cv_.notify_one();
}
-
- LOG(INFO) << "[migrate] Start migrating slot(s) " << slot_range.String() <<
" to " << dst_ip << ":" << dst_port;
+ info("[migrate] Start migrating slot(s) {} to {}:{}", slot_range.String(),
dst_ip, dst_port);
return Status::OK();
}
@@ -134,7 +133,7 @@ SlotMigrator::~SlotMigrator() {
thread_state_ = ThreadState::Terminated;
job_cv_.notify_all();
if (auto s = util::ThreadJoin(t_); !s) {
- LOG(WARNING) << "Slot migrating thread operation failed: " << s.Msg();
+ warn("Slot migrating thread operation failed: {}", s.Msg());
}
}
}
@@ -159,11 +158,9 @@ void SlotMigrator::loop() {
clean();
return;
}
-
- LOG(INFO) << "[migrate] Migrating slot(s): " << migration_job_->slot_range.String()
- << ", dst_ip: " << migration_job_->dst_ip << ", dst_port: " << migration_job_->dst_port
- << ", max_speed: " << migration_job_->max_speed
- << ", max_pipeline_size: " << migration_job_->max_pipeline_size;
+ info("[migrate] Migrating slot(s): {}, dst_ip: {}, dst_port: {}, max_speed: {}, max_pipeline_size: {}",
+ migration_job_->slot_range.String(), migration_job_->dst_ip, migration_job_->dst_port,
+ migration_job_->max_speed, migration_job_->max_pipeline_size);
dst_ip_ = migration_job_->dst_ip;
dst_port_ = migration_job_->dst_port;
@@ -180,7 +177,7 @@ void SlotMigrator::runMigrationProcess() {
while (true) {
if (isTerminated()) {
- LOG(WARNING) << "[migrate] Will stop state machine, because the thread
was terminated";
+ warn("[migrate] Will stop state machine, because the thread was
terminated");
clean();
return;
}
@@ -189,11 +186,10 @@ void SlotMigrator::runMigrationProcess() {
case SlotMigrationStage::kStart: {
auto s = startMigration();
if (s.IsOK()) {
- LOG(INFO) << "[migrate] Succeed to start migrating slot(s) " <<
slot_range_.load().String();
+ info("[migrate] Succeed to start migrating slot(s) {}",
slot_range_.load().String());
current_stage_ = SlotMigrationStage::kSnapshot;
} else {
- LOG(ERROR) << "[migrate] Failed to start migrating slot(s) " <<
slot_range_.load().String()
- << ". Error: " << s.Msg();
+ error("[migrate] Failed to start migrating slot(s) {}. Error: {}",
slot_range_.load().String(), s.Msg());
current_stage_ = SlotMigrationStage::kFailed;
resumeSyncCtx(s);
}
@@ -204,8 +200,7 @@ void SlotMigrator::runMigrationProcess() {
if (s.IsOK()) {
current_stage_ = SlotMigrationStage::kWAL;
} else {
- LOG(ERROR) << "[migrate] Failed to send snapshot of slot(s) " <<
slot_range_.load().String()
- << ". Error: " << s.Msg();
+ error("[migrate] Failed to send snapshot of slot(s) {}. Error: {}",
slot_range_.load().String(), s.Msg());
current_stage_ = SlotMigrationStage::kFailed;
resumeSyncCtx(s);
}
@@ -214,11 +209,10 @@ void SlotMigrator::runMigrationProcess() {
case SlotMigrationStage::kWAL: {
auto s = syncWAL();
if (s.IsOK()) {
- LOG(INFO) << "[migrate] Succeed to sync from WAL for slot(s) " <<
slot_range_.load().String();
+ info("[migrate] Succeed to sync from WAL for slot(s) {}",
slot_range_.load().String());
current_stage_ = SlotMigrationStage::kSuccess;
} else {
- LOG(ERROR) << "[migrate] Failed to sync from WAL for slot(s) " <<
slot_range_.load().String()
- << ". Error: " << s.Msg();
+ error("[migrate] Failed to sync from WAL for slot(s) {}. Error: {}",
slot_range_.load().String(), s.Msg());
current_stage_ = SlotMigrationStage::kFailed;
resumeSyncCtx(s);
}
@@ -227,13 +221,13 @@ void SlotMigrator::runMigrationProcess() {
case SlotMigrationStage::kSuccess: {
auto s = finishSuccessfulMigration();
if (s.IsOK()) {
- LOG(INFO) << "[migrate] Succeed to migrate slot(s) " <<
slot_range_.load().String();
+ info("[migrate] Succeed to migrate slot(s) {}",
slot_range_.load().String());
current_stage_ = SlotMigrationStage::kClean;
migration_state_ = MigrationState::kSuccess;
resumeSyncCtx(s);
} else {
- LOG(ERROR) << "[migrate] Failed to finish a successful migration of slot(s) " << slot_range_.load().String()
- << ". Error: " << s.Msg();
+ error("[migrate] Failed to finish a successful migration of slot(s) {}. Error: {}",
+ slot_range_.load().String(), s.Msg());
current_stage_ = SlotMigrationStage::kFailed;
resumeSyncCtx(s);
}
@@ -242,10 +236,10 @@ void SlotMigrator::runMigrationProcess() {
case SlotMigrationStage::kFailed: {
auto s = finishFailedMigration();
if (!s.IsOK()) {
- LOG(ERROR) << "[migrate] Failed to finish a failed migration of slot(s) " << slot_range_.load().String()
- << ". Error: " << s.Msg();
+ error("[migrate] Failed to finish a failed migration of slot(s) {}. Error: {}", slot_range_.load().String(),
+ s.Msg());
}
- LOG(INFO) << "[migrate] Failed to migrate a slot(s) " <<
slot_range_.load().String();
+ info("[migrate] Failed to migrate a slot(s) {}",
slot_range_.load().String());
migration_state_ = MigrationState::kFailed;
current_stage_ = SlotMigrationStage::kClean;
break;
@@ -255,7 +249,7 @@ void SlotMigrator::runMigrationProcess() {
return;
}
default:
- LOG(ERROR) << "[migrate] Unexpected state for the state machine: " <<
static_cast<int>(current_stage_);
+ error("[migrate] Unexpected state for the state machine: {}",
static_cast<int>(current_stage_));
clean();
return;
}
@@ -302,13 +296,11 @@ Status SlotMigrator::startMigration() {
if (migration_type_ == MigrationType::kRawKeyValue) {
bool supported = GET_OR_RET(supportedApplyBatchCommandOnDstNode(*dst_fd_));
if (!supported) {
- LOG(INFO) << "APPLYBATCH command is not supported, use redis command for migration";
+ info("APPLYBATCH command is not supported, use redis command for migration");
migration_type_ = MigrationType::kRedisCommand;
}
}
-
- LOG(INFO) << "[migrate] Start migrating slot(s) " <<
slot_range_.load().String() << ", connect destination fd "
- << *dst_fd_;
+ info("[migrate] Start migrating slot(s) {}, connect destination fd {}",
slot_range_.load().String(), *dst_fd_);
return Status::OK();
}
@@ -337,12 +329,11 @@ Status SlotMigrator::sendSnapshotByCmd() {
uint64_t empty_key_cnt = 0;
std::string restore_cmds;
SlotRange slot_range = slot_range_;
-
- LOG(INFO) << "[migrate] Start migrating snapshot of slot(s): " <<
slot_range.String();
+ info("[migrate] Start migrating snapshot of slot(s): {}",
slot_range.String());
// Construct key prefix to iterate the keys belong to the target slot
std::string prefix = ComposeSlotKeyPrefix(namespace_, slot_range.start);
- LOG(INFO) << "[migrate] Iterate keys of slot(s), key's prefix: " << prefix;
+ info("[migrate] Iterate keys of slot(s), key's prefix: {}", prefix);
std::string upper_bound = ComposeSlotKeyUpperBound(namespace_,
slot_range.end);
rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
@@ -379,23 +370,23 @@ Status SlotMigrator::sendSnapshotByCmd() {
}
if (*result == KeyMigrationResult::kMigrated) {
- LOG(INFO) << "[migrate] The key " << user_key << " successfully
migrated";
+ info("[migrate] The key {} successfully migrated", user_key);
migrated_key_cnt++;
} else if (*result == KeyMigrationResult::kExpired) {
- LOG(INFO) << "[migrate] The key " << user_key << " is expired";
+ info("[migrate] The key {} is expired", user_key);
expired_key_cnt++;
} else if (*result == KeyMigrationResult::kUnderlyingStructEmpty) {
- LOG(INFO) << "[migrate] The key " << user_key << " has no elements";
+ info("[migrate] The key {} has no elements", user_key);
empty_key_cnt++;
} else {
- LOG(ERROR) << "[migrate] Migrated a key " << user_key << " with
unexpected result: " << static_cast<int>(*result);
+ error("[migrate] Migrated a key {} with unexpected result: {}",
user_key, static_cast<int>(*result));
return {Status::NotOK};
}
}
if (auto s = iter->status(); !s.ok()) {
auto err_str = s.ToString();
- LOG(ERROR) << "[migrate] Failed to iterate keys of slot " << current_slot
<< ": " << err_str;
+ error("[migrate] Failed to iterate keys of slot {}: {}", current_slot,
err_str);
return {Status::NotOK, fmt::format("failed to iterate keys of slot {}: {}", current_slot, err_str)};
}
@@ -405,10 +396,9 @@ Status SlotMigrator::sendSnapshotByCmd() {
if (!s.IsOK()) {
return s.Prefixed(errFailedToSendCommands);
}
-
- LOG(INFO) << "[migrate] Succeed to migrate slot(s) snapshot, slot(s): " <<
slot_range.String()
- << ", Migrated keys: " << migrated_key_cnt << ", Expired keys: "
<< expired_key_cnt
- << ", Empty keys: " << empty_key_cnt;
+ info(
+ "[migrate] Succeed to migrate slot(s) snapshot, slot(s): {}, Migrated keys: {}, Expired keys: {}, Empty keys: {}",
+ slot_range.String(), migrated_key_cnt, expired_key_cnt, empty_key_cnt);
return Status::OK();
}
@@ -469,7 +459,7 @@ Status SlotMigrator::finishFailedMigration() {
}
void SlotMigrator::clean() {
- LOG(INFO) << "[migrate] Clean resources of migrating slot(s) " <<
slot_range_.load().String();
+ info("[migrate] Clean resources of migrating slot(s) {}",
slot_range_.load().String());
if (slot_snapshot_) {
storage_->GetDB()->ReleaseSnapshot(slot_snapshot_);
slot_snapshot_ = nullptr;
@@ -603,7 +593,7 @@ Status SlotMigrator::checkMultipleResponses(int sock_fd,
int total) {
case ParserState::ArrayLen: {
UniqueEvbufReadln line(evbuf.get(), EVBUFFER_EOL_CRLF_STRICT);
if (!line) {
- LOG(INFO) << "[migrate] Event buffer is empty, read socket again";
+ info("[migrate] Event buffer is empty, read socket again");
run = false;
break;
}
@@ -635,7 +625,7 @@ Status SlotMigrator::checkMultipleResponses(int sock_fd,
int total) {
// Handle bulk string response
case ParserState::BulkData: {
if (evbuffer_get_length(evbuf.get()) < bulk_or_array_len + 2) {
- LOG(INFO) << "[migrate] Bulk data in event buffer is not complete, read socket again";
+ info("[migrate] Bulk data in event buffer is not complete, read socket again");
run = false;
break;
}
@@ -649,7 +639,7 @@ Status SlotMigrator::checkMultipleResponses(int sock_fd,
int total) {
while (run && bulk_or_array_len > 0) {
evbuffer_ptr ptr = evbuffer_search_eol(evbuf.get(), nullptr,
nullptr, EVBUFFER_EOL_CRLF_STRICT);
if (ptr.pos < 0) {
- LOG(INFO) << "[migrate] Array data in event buffer is not complete, read socket again";
+ info("[migrate] Array data in event buffer is not complete, read socket again");
run = false;
break;
}
@@ -1016,7 +1006,7 @@ Status SlotMigrator::sendCmdsPipelineIfNeed(std::string
*commands, bool need) {
}
if (current_pipeline_size_ == 0) {
- LOG(INFO) << "[migrate] No commands to send";
+ info("[migrate] No commands to send");
return Status::OK();
}
@@ -1042,7 +1032,7 @@ Status SlotMigrator::sendCmdsPipelineIfNeed(std::string
*commands, bool need) {
}
void SlotMigrator::setForbiddenSlotRange(const SlotRange &slot_range) {
- LOG(INFO) << "[migrate] Setting forbidden slot(s) " << slot_range.String();
+ info("[migrate] Setting forbidden slot(s) {}", slot_range.String());
// Block server to set forbidden slot
uint64_t during = util::GetTimeStampUS();
{
@@ -1050,11 +1040,11 @@ void SlotMigrator::setForbiddenSlotRange(const
SlotRange &slot_range) {
forbidden_slot_range_ = slot_range;
}
during = util::GetTimeStampUS() - during;
- LOG(INFO) << "[migrate] To set forbidden slot, server was blocked for " <<
during << "us";
+ info("[migrate] To set forbidden slot, server was blocked for {} us",
during);
}
void SlotMigrator::ReleaseForbiddenSlotRange() {
- LOG(INFO) << "[migrate] Release forbidden slot(s) " <<
forbidden_slot_range_.load().String();
+ info("[migrate] Release forbidden slot(s) {}",
forbidden_slot_range_.load().String());
forbidden_slot_range_ = {-1, -1};
}
@@ -1067,7 +1057,7 @@ void SlotMigrator::applyMigrationSpeedLimit() const {
}
if (last_send_time_ + per_request_time > current_time) {
uint64_t during = last_send_time_ + per_request_time - current_time;
- LOG(INFO) << "[migrate] Sleep to limit migration speed for: " << during;
+ info("[migrate] Sleep to limit migration speed for: {}", during);
std::this_thread::sleep_for(std::chrono::microseconds(during));
}
}
@@ -1078,7 +1068,7 @@ Status
SlotMigrator::generateCmdsFromBatch(rocksdb::BatchResult *batch, std::str
WriteBatchExtractor write_batch_extractor(storage_->IsSlotIdEncoded(),
slot_range_, false);
rocksdb::Status status =
batch->writeBatchPtr->Iterate(&write_batch_extractor);
if (!status.ok()) {
- LOG(ERROR) << "[migrate] Failed to parse write batch, Err: " <<
status.ToString();
+ error("[migrate] Failed to parse write batch, Err: {}", status.ToString());
return {Status::NotOK};
}
@@ -1096,7 +1086,7 @@ Status
SlotMigrator::generateCmdsFromBatch(rocksdb::BatchResult *batch, std::str
Status
SlotMigrator::migrateIncrementData(std::unique_ptr<rocksdb::TransactionLogIterator>
*iter, uint64_t end_seq) {
if (!(*iter) || !(*iter)->Valid()) {
- LOG(ERROR) << "[migrate] WAL iterator is invalid";
+ error("[migrate] WAL iterator is invalid");
return {Status::NotOK};
}
@@ -1105,40 +1095,40 @@ Status
SlotMigrator::migrateIncrementData(std::unique_ptr<rocksdb::TransactionLo
while (true) {
if (stop_migration_) {
- LOG(ERROR) << "[migrate] Migration task end during migrating WAL data";
+ error("[migrate] Migration task end during migrating WAL data");
return {Status::NotOK};
}
auto batch = (*iter)->GetBatch();
if (batch.sequence != next_seq) {
- LOG(ERROR) << "[migrate] WAL iterator is discrete, some seq might be lost"
- << ", expected sequence: " << next_seq << ", but got sequence: " << batch.sequence;
+ error("[migrate] WAL iterator is discrete, some seq might be lost, expected sequence: {}, but got sequence: {}",
+ next_seq, batch.sequence);
return {Status::NotOK};
}
// Generate commands by iterating write batch
auto s = generateCmdsFromBatch(&batch, &commands);
if (!s.IsOK()) {
- LOG(ERROR) << "[migrate] Failed to generate commands from write batch";
+ error("[migrate] Failed to generate commands from write batch");
return {Status::NotOK};
}
// Check whether command pipeline should be sent
s = sendCmdsPipelineIfNeed(&commands, false);
if (!s.IsOK()) {
- LOG(ERROR) << "[migrate] Failed to send WAL commands pipeline";
+ error("[migrate] Failed to send WAL commands pipeline");
return {Status::NotOK};
}
next_seq = batch.sequence + batch.writeBatchPtr->Count();
if (next_seq > end_seq) {
- LOG(INFO) << "[migrate] Migrate incremental data an epoch OK, seq from "
<< wal_begin_seq_ << ", to " << end_seq;
+ info("[migrate] Migrate incremental data an epoch OK, seq from {}, to
{}", wal_begin_seq_, end_seq);
break;
}
(*iter)->Next();
if (!(*iter)->Valid()) {
- LOG(ERROR) << "[migrate] WAL iterator is invalid, expected end seq: " <<
end_seq << ", next seq: " << next_seq;
+ error("[migrate] WAL iterator is invalid, expected end seq: {}, next
seq: {}", end_seq, next_seq);
return {Status::NotOK};
}
}
@@ -1146,7 +1136,7 @@ Status
SlotMigrator::migrateIncrementData(std::unique_ptr<rocksdb::TransactionLo
// Send the left data of this epoch
auto s = sendCmdsPipelineIfNeed(&commands, true);
if (!s.IsOK()) {
- LOG(ERROR) << "[migrate] Failed to send WAL last commands in pipeline";
+ error("[migrate] Failed to send WAL last commands in pipeline");
return {Status::NotOK};
}
@@ -1160,31 +1150,29 @@ Status SlotMigrator::syncWalBeforeForbiddingSlot() {
uint64_t latest_seq = storage_->GetDB()->GetLatestSequenceNumber();
uint64_t gap = latest_seq - wal_begin_seq_;
if (gap <= static_cast<uint64_t>(seq_gap_limit_)) {
- LOG(INFO) << "[migrate] Incremental data sequence: " << gap << ", less than limit: " << seq_gap_limit_
- << ", go to set forbidden slot";
+ info("[migrate] Incremental data sequence: {}, less than limit: {}, go to set forbidden slot", gap,
+ seq_gap_limit_);
break;
}
std::unique_ptr<rocksdb::TransactionLogIterator> iter = nullptr;
auto s = storage_->GetWALIter(wal_begin_seq_ + 1, &iter);
if (!s.IsOK()) {
- LOG(ERROR) << "[migrate] Failed to generate WAL iterator before setting forbidden slot"
- << ", Err: " << s.Msg();
+ error("[migrate] Failed to generate WAL iterator before setting forbidden slot, Err: {}", s.Msg());
return {Status::NotOK};
}
// Iterate wal and migrate data
s = migrateIncrementData(&iter, latest_seq);
if (!s.IsOK()) {
- LOG(ERROR) << "[migrate] Failed to migrate WAL data before setting
forbidden slot";
+ error("[migrate] Failed to migrate WAL data before setting forbidden
slot");
return {Status::NotOK};
}
wal_begin_seq_ = latest_seq;
count++;
}
-
- LOG(INFO) << "[migrate] Succeed to migrate incremental data before setting
forbidden slot, end epoch: " << count;
+ info("[migrate] Succeed to migrate incremental data before setting forbidden
slot, end epoch: {}", count);
return Status::OK();
}
@@ -1198,15 +1186,14 @@ Status SlotMigrator::syncWalAfterForbiddingSlot() {
std::unique_ptr<rocksdb::TransactionLogIterator> iter = nullptr;
auto s = storage_->GetWALIter(wal_begin_seq_ + 1, &iter);
if (!s.IsOK()) {
- LOG(ERROR) << "[migrate] Failed to generate WAL iterator after setting
forbidden slot"
- << ", Err: " << s.Msg();
+ error("[migrate] Failed to generate WAL iterator after setting forbidden
slot, Err: {}", s.Msg());
return {Status::NotOK};
}
// Send incremental data
s = migrateIncrementData(&iter, latest_seq);
if (!s.IsOK()) {
- LOG(ERROR) << "[migrate] Failed to migrate WAL data after setting
forbidden slot";
+ error("[migrate] Failed to migrate WAL data after setting forbidden slot");
return {Status::NotOK};
}
@@ -1271,7 +1258,7 @@ Status SlotMigrator::sendMigrationBatch(BatchSender
*batch) {
Status SlotMigrator::sendSnapshotByRawKV() {
uint64_t start_ts = util::GetTimeStampMS();
auto slot_range = slot_range_.load();
- LOG(INFO) << fmt::format("[migrate] Migrating snapshot of slot(s) {} by raw
key value", slot_range.String());
+ info("[migrate] Migrating snapshot of slot(s) {} by raw key value",
slot_range.String());
auto prefix = ComposeSlotKeyPrefix(namespace_, slot_range.start);
auto upper_bound = ComposeSlotKeyUpperBound(namespace_, slot_range.end);
@@ -1337,9 +1324,9 @@ Status SlotMigrator::sendSnapshotByRawKV() {
GET_OR_RET(sendMigrationBatch(&batch_sender));
auto elapsed = util::GetTimeStampMS() - start_ts;
- LOG(INFO) << fmt::format(
- "[migrate] Succeed to migrate snapshot range, slot(s): {}, elapsed: {}
ms, "
- "sent: {} bytes, rate: {:.2f} kb/s, batches: {}, entries: {}",
+ info(
+ "[migrate] Succeed to migrate snapshot range, slot(s): {}, elapsed: {}
ms, sent: {} bytes, rate: {:.2f} kb/s, "
+ "batches: {}, entries: {}",
slot_range.String(), elapsed, batch_sender.GetSentBytes(),
batch_sender.GetRate(start_ts),
batch_sender.GetSentBatchesNum(), batch_sender.GetEntriesNum());
@@ -1348,7 +1335,7 @@ Status SlotMigrator::sendSnapshotByRawKV() {
Status SlotMigrator::syncWALByRawKV() {
uint64_t start_ts = util::GetTimeStampMS();
- LOG(INFO) << "[migrate] Syncing WAL of slot(s) " <<
slot_range_.load().String() << " by raw key value";
+ info("[migrate] Syncing WAL of slot(s) {} by raw key value",
slot_range_.load().String());
BatchSender batch_sender(*dst_fd_, migrate_batch_size_bytes_,
migrate_batch_bytes_per_sec_);
int epoch = 1;
@@ -1363,8 +1350,8 @@ Status SlotMigrator::syncWALByRawKV() {
if (!s.IsOK()) {
return {Status::NotOK, fmt::format("migrate incremental data failed,
{}", s.Msg())};
}
- LOG(INFO) << fmt::format("[migrate] Migrated incremental data, epoch: {},
seq from {} to {}", epoch, wal_begin_seq_,
- wal_incremental_seq);
+ info("[migrate] Migrated incremental data, epoch: {}, seq from {} to {}",
epoch, wal_begin_seq_,
+ wal_incremental_seq);
wal_begin_seq_ = wal_incremental_seq;
epoch++;
}
@@ -1377,12 +1364,12 @@ Status SlotMigrator::syncWALByRawKV() {
if (!s.IsOK()) {
return {Status::NotOK, fmt::format("migrate last incremental data
failed, {}", s.Msg())};
}
- LOG(INFO) << fmt::format("[migrate] Migrated last incremental data after
set forbidden slot, seq from {} to {}",
- wal_begin_seq_, wal_incremental_seq);
+ info("[migrate] Migrated last incremental data after set forbidden slot,
seq from {} to {}", wal_begin_seq_,
+ wal_incremental_seq);
}
auto elapsed = util::GetTimeStampMS() - start_ts;
- LOG(INFO) << fmt::format(
+ info(
"[migrate] Succeed to migrate incremental data, slot(s): {}, elapsed: {}
ms, "
"sent: {} bytes, rate: {:.2f} kb/s, batches: {}, entries: {}",
slot_range_.load().String(), elapsed, batch_sender.GetSentBytes(),
batch_sender.GetRate(start_ts),
@@ -1394,9 +1381,8 @@ Status SlotMigrator::syncWALByRawKV() {
bool SlotMigrator::catchUpIncrementalWAL() {
uint64_t gap = storage_->GetDB()->GetLatestSequenceNumber() - wal_begin_seq_;
if (gap <= seq_gap_limit_) {
- LOG(INFO) << fmt::format(
- "[migrate] Incremental data sequence gap: {}, less than limit: {}, set
forbidden slot(s): {}", gap,
- seq_gap_limit_, slot_range_.load().String());
+ info("[migrate] Incremental data sequence gap: {}, less than limit: {},
set forbidden slot(s): {}", gap,
+ seq_gap_limit_, slot_range_.load().String());
return true;
}
return false;
@@ -1417,7 +1403,7 @@ Status
SlotMigrator::migrateIncrementalDataByRawKV(uint64_t end_seq, BatchSender
}
case engine::WALItem::Type::kTypePut: {
if (item.column_family_id > kMaxColumnFamilyID) {
- LOG(INFO) << fmt::format("[migrate] Invalid put column family id:
{}", item.column_family_id);
+ info("[migrate] Invalid put column family id: {}",
item.column_family_id);
continue;
}
GET_OR_RET(batch_sender->Put(storage_->GetCFHandle(static_cast<ColumnFamilyID>(item.column_family_id)),
@@ -1426,7 +1412,7 @@ Status
SlotMigrator::migrateIncrementalDataByRawKV(uint64_t end_seq, BatchSender
}
case engine::WALItem::Type::kTypeDelete: {
if (item.column_family_id > kMaxColumnFamilyID) {
- LOG(INFO) << fmt::format("[migrate] Invalid delete column family id:
{}", item.column_family_id);
+ info("[migrate] Invalid delete column family id: {}",
item.column_family_id);
continue;
}
GET_OR_RET(
diff --git a/src/cluster/sync_migrate_context.cc b/src/cluster/sync_migrate_context.cc
index 7b51c9827..6a4e532f6 100644
--- a/src/cluster/sync_migrate_context.cc
+++ b/src/cluster/sync_migrate_context.cc
@@ -35,8 +35,7 @@ void SyncMigrateContext::Resume(const Status &migrate_result)
{
migrate_result_ = migrate_result;
auto s = conn_->Owner()->EnableWriteEvent(conn_->GetFD());
if (!s.IsOK()) {
- LOG(ERROR) << "[server] Failed to enable write event on the sync migrate connection " << conn_->GetFD() << ": "
- << s.Msg();
+ error("[server] Failed to enable write event on the sync migrate connection {}: {}", conn_->GetFD(), s.Msg());
}
}
diff --git a/src/cluster/sync_migrate_context.h b/src/cluster/sync_migrate_context.h
index 818736cb4..e3593c1dc 100644
--- a/src/cluster/sync_migrate_context.h
+++ b/src/cluster/sync_migrate_context.h
@@ -25,7 +25,7 @@
class SyncMigrateContext : private EvbufCallbackBase<SyncMigrateContext,
false>,
private EventCallbackBase<SyncMigrateContext> {
public:
- SyncMigrateContext(Server *srv, redis::Connection *conn, int timeout) :
srv_(srv), conn_(conn), timeout_(timeout){};
+ SyncMigrateContext(Server *srv, redis::Connection *conn, int timeout) :
srv_(srv), conn_(conn), timeout_(timeout) {}
void Suspend();
void Resume(const Status &migrate_result);