This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 858d7f2ae chore(log): replace logging calls in `cluster/*` (#2894)
858d7f2ae is described below

commit 858d7f2aebaf8015c035be0ea2d1d5261599ddfa
Author: Anirudh Lakhanpal <[email protected]>
AuthorDate: Sat Apr 26 08:55:12 2025 +0530

    chore(log): replace logging calls in `cluster/*` (#2894)
    
    Co-authored-by: Twice <[email protected]>
    Co-authored-by: Twice <[email protected]>
---
 src/cluster/cluster.cc              |  27 +++----
 src/cluster/replication.cc          | 154 +++++++++++++++++-------------------
 src/cluster/slot_migrate.cc         | 154 ++++++++++++++++--------------------
 src/cluster/sync_migrate_context.cc |   3 +-
 src/cluster/sync_migrate_context.h  |   2 +-
 5 files changed, 157 insertions(+), 183 deletions(-)

diff --git a/src/cluster/cluster.cc b/src/cluster/cluster.cc
index 10dbf0e83..62c82c01a 100644
--- a/src/cluster/cluster.cc
+++ b/src/cluster/cluster.cc
@@ -130,7 +130,7 @@ Status Cluster::SetSlotRanges(const std::vector<SlotRange> 
&slot_ranges, const s
         if (migrated_slots_.count(slot) > 0) {
           auto s = srv_->slot_migrator->ClearKeysOfSlotRange(ctx, 
kDefaultNamespace, SlotRange::GetPoint(slot));
           if (!s.ok()) {
-            LOG(ERROR) << "failed to clear data of migrated slot: " << 
s.ToString();
+            error("failed to clear data of migrated slot: {}", s.ToString());
           }
           migrated_slots_.erase(slot);
         }
@@ -213,7 +213,7 @@ Status Cluster::SetClusterNodes(const std::string 
&nodes_str, int64_t version, b
       if (slots_nodes_[slot] != myself_) {
         auto s = srv_->slot_migrator->ClearKeysOfSlotRange(ctx, 
kDefaultNamespace, SlotRange::GetPoint(slot));
         if (!s.ok()) {
-          LOG(ERROR) << "failed to clear data of migrated slots: " << 
s.ToString();
+          error("failed to clear data of migrated slots: {}", s.ToString());
         }
       }
     }
@@ -246,11 +246,11 @@ Status Cluster::SetMasterSlaveRepl() {
     if (!s.IsOK()) {
       return s.Prefixed("failed to remove master");
     }
-    LOG(INFO) << "MASTER MODE enabled by cluster topology setting";
+    info("MASTER MODE enabled by cluster topology setting");
     if (srv_->slot_migrator && is_cluster_enabled && is_slave) {
       // Slave -> Master
       srv_->slot_migrator->SetStopMigrationFlag(false);
-      LOG(INFO) << "Change server role to master, restart migration task";
+      info("Change server role to master, restart migration task");
     }
     return Status::OK();
   }
@@ -261,16 +261,16 @@ Status Cluster::SetMasterSlaveRepl() {
     std::shared_ptr<ClusterNode> master = it->second;
     auto s = srv_->AddMaster(master->host, master->port, false);
     if (!s.IsOK()) {
-      LOG(WARNING) << "SLAVE OF " << master->host << ":" << master->port
-                   << " wasn't enabled by cluster topology setting, encounter 
error: " << s.Msg();
+      warn("SLAVE OF {}:{} wasn't enabled by cluster topology setting, 
encounter error: {}", master->host, master->port,
+           s.Msg());
       return s.Prefixed("failed to add master");
     }
     if (srv_->slot_migrator && is_cluster_enabled && !is_slave) {
       // Master -> Slave
       srv_->slot_migrator->SetStopMigrationFlag(true);
-      LOG(INFO) << "Change server role to slave, stop migration task";
+      info("Change server role to slave, stop migration task");
     }
-    LOG(INFO) << fmt::format("SLAVE OF {}:{} enabled by cluster topology 
setting", master->host, master->port);
+    info("SLAVE OF {}:{} enabled by cluster topology setting", master->host, 
master->port);
   }
 
   return Status::OK();
@@ -374,7 +374,7 @@ Status Cluster::ImportSlotRange(redis::Connection *conn, 
const SlotRange &slot_r
       conn->close_cb = [object_ptr = srv_->slot_import.get(), 
slot_range]([[maybe_unused]] int fd) {
         auto s = object_ptr->StopForLinkError();
         if (!s.IsOK()) {
-          LOG(ERROR) << fmt::format("[import] Failed to stop importing slot(s) 
{}: {}", slot_range.String(), s.Msg());
+          error("[import] Failed to stop importing slot(s) {}: {}", 
slot_range.String(), s.Msg());
         }
       };  // Stop forbidding writing slot to accept write commands
       if (slot_range.HasOverlap(srv_->slot_migrator->GetForbiddenSlotRange())) 
{
@@ -384,17 +384,17 @@ Status Cluster::ImportSlotRange(redis::Connection *conn, 
const SlotRange &slot_r
         // supported in the future.
         srv_->slot_migrator->ReleaseForbiddenSlotRange();
       }
-      LOG(INFO) << fmt::format("[import] Start importing slot(s) {}", 
slot_range.String());
+      info("[import] Start importing slot(s) {}", slot_range.String());
       break;
     case kImportSuccess:
       s = srv_->slot_import->Success(slot_range);
       if (!s.IsOK()) return s;
-      LOG(INFO) << fmt::format("[import] Mark the importing slot(s) {} as 
succeed", slot_range.String());
+      info("[import] Mark the importing slot(s) {} as succeed", 
slot_range.String());
       break;
     case kImportFailed:
       s = srv_->slot_import->Fail(slot_range);
       if (!s.IsOK()) return s;
-      LOG(INFO) << fmt::format("[import] Mark the importing slot(s) {} as 
failed", slot_range.String());
+      info("[import] Mark the importing slot(s) {} as failed", 
slot_range.String());
       break;
     default:
       return {Status::NotOK, errInvalidImportState};
@@ -691,8 +691,7 @@ Status Cluster::DumpClusterNodes(const std::string &file) {
 
 Status Cluster::LoadClusterNodes(const std::string &file_path) {
   if (rocksdb::Env::Default()->FileExists(file_path).IsNotFound()) {
-    LOG(INFO) << fmt::format("The cluster nodes file {} is not found. Use 
CLUSTERX subcommands to specify it.",
-                             file_path);
+    info("The cluster nodes file {} is not found. Use CLUSTERX subcommands to 
specify it.", file_path);
     return Status::OK();
   }
 
diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc
index 3541cebb6..e032b87be 100644
--- a/src/cluster/replication.cc
+++ b/src/cluster/replication.cc
@@ -67,7 +67,7 @@ Status FeedSlaveThread::Start() {
     pthread_sigmask(SIG_BLOCK, &mask, &omask);
     auto s = util::SockSend(conn_->GetFD(), redis::RESP_OK, 
conn_->GetBufferEvent());
     if (!s.IsOK()) {
-      LOG(ERROR) << "failed to send OK response to the replica: " << s.Msg();
+      error("failed to send OK response to the replica: {}", s.Msg());
       return;
     }
     this->loop();
@@ -84,12 +84,12 @@ Status FeedSlaveThread::Start() {
 
 void FeedSlaveThread::Stop() {
   stop_ = true;
-  LOG(WARNING) << "Slave thread was terminated, would stop feeding the slave: 
" << conn_->GetAddr();
+  warn("Slave thread was terminated, would stop feeding the slave: {}", 
conn_->GetAddr());
 }
 
 void FeedSlaveThread::Join() {
   if (auto s = util::ThreadJoin(t_); !s) {
-    LOG(WARNING) << "Slave thread operation failed: " << s.Msg();
+    warn("Slave thread operation failed: {}", s.Msg());
   }
 }
 
@@ -98,7 +98,7 @@ void FeedSlaveThread::checkLivenessIfNeed() {
   const auto ping_command = redis::BulkString("ping");
   auto s = util::SockSend(conn_->GetFD(), ping_command, 
conn_->GetBufferEvent());
   if (!s.IsOK()) {
-    LOG(ERROR) << "Ping slave[" << conn_->GetAddr() << "] err: " << s.Msg() << 
", would stop the thread";
+    error("Ping slave [{}] err: {}, would stop the thread", conn_->GetAddr(), 
s.Msg());
     Stop();
   }
 }
@@ -115,7 +115,7 @@ void FeedSlaveThread::loop() {
     auto curr_seq = next_repl_seq_.load();
 
     if (!iter_ || !iter_->Valid()) {
-      if (iter_) LOG(INFO) << "WAL was rotated, would reopen again";
+      if (iter_) info("WAL was rotated, would reopen again");
       if (!srv_->storage->WALHasNewData(curr_seq) || 
!srv_->storage->GetWALIter(curr_seq, &iter_).IsOK()) {
         iter_ = nullptr;
         usleep(yield_microseconds);
@@ -126,8 +126,9 @@ void FeedSlaveThread::loop() {
     // iter_ would be always valid here
     auto batch = iter_->GetBatch();
     if (batch.sequence != curr_seq) {
-      LOG(ERROR) << "Fatal error encountered, WAL iterator is discrete, some 
seq might be lost"
-                 << ", sequence " << curr_seq << " expected, but got " << 
batch.sequence;
+      error(
+          "Fatal error encountered, WAL iterator is discrete, some seq might 
be lost, sequence {} expected, but got {}",
+          curr_seq, batch.sequence);
       Stop();
       return;
     }
@@ -147,8 +148,7 @@ void FeedSlaveThread::loop() {
       // Send entire bulk which contain multiple batches
       auto s = util::SockSend(conn_->GetFD(), batches_bulk, 
conn_->GetBufferEvent());
       if (!s.IsOK()) {
-        LOG(ERROR) << "Write error while sending batch to slave: " << s.Msg() 
<< ". batches: 0x"
-                   << util::StringToHex(batches_bulk);
+        error("Write error while sending batch to slave: {}. batches: 0x{}", 
s.Msg(), util::StringToHex(batches_bulk));
         Stop();
         return;
       }
@@ -181,7 +181,7 @@ void 
ReplicationThread::CallbacksStateMachine::ConnEventCB(bufferevent *bev, int
     return;
   }
   if (events & (BEV_EVENT_ERROR | BEV_EVENT_EOF)) {
-    LOG(ERROR) << "[replication] connection error/eof, reconnect the master";
+    error("[replication] connection error/eof, reconnect the master");
     // Wait a bit and reconnect
     repl_->repl_state_.store(kReplConnecting, std::memory_order_relaxed);
     std::this_thread::sleep_for(std::chrono::seconds(1));
@@ -203,7 +203,7 @@ void 
ReplicationThread::CallbacksStateMachine::SetWriteCB(bufferevent *bev, buff
 void ReplicationThread::CallbacksStateMachine::ReadWriteCB(bufferevent *bev) {
 LOOP_LABEL:
   assert(handler_idx_ <= handlers_.size());
-  DLOG(INFO) << "[replication] Execute handler[" << 
getHandlerName(handler_idx_) << "]";
+  debug("[replication] Execute handler[{}]", getHandlerName(handler_idx_));
   auto st = getHandlerFunc(handler_idx_)(repl_, bev);
   repl_->last_io_time_secs_.store(util::GetTimeStamp(), 
std::memory_order_relaxed);
   switch (st) {
@@ -230,11 +230,11 @@ LOOP_LABEL:
     case CBState::RESTART:  // state that can be retried some time later
       Stop();
       if (repl_->stop_flag_) {
-        LOG(INFO) << "[replication] Wouldn't restart while the replication 
thread was stopped";
+        info("[replication] Wouldn't restart while the replication thread was 
stopped");
         break;
       }
       repl_->repl_state_.store(kReplConnecting, std::memory_order_relaxed);
-      LOG(INFO) << "[replication] Retry in 10 seconds";
+      info("[replication] Retry in 10 seconds");
       std::this_thread::sleep_for(std::chrono::seconds(10));
       Start();
   }
@@ -264,7 +264,7 @@ void ReplicationThread::CallbacksStateMachine::Start() {
     last_connect_timestamp = util::GetTimeStampMS();
     auto cfd = util::SockConnect(repl_->host_, repl_->port_, 
repl_->srv_->GetConfig()->replication_connect_timeout_ms);
     if (!cfd) {
-      LOG(ERROR) << "[replication] Failed to connect the master, err: " << 
cfd.Msg();
+      error("[replication] Failed to connect the master, err: {}", cfd.Msg());
       continue;
     }
 #ifdef ENABLE_OPENSSL
@@ -272,7 +272,7 @@ void ReplicationThread::CallbacksStateMachine::Start() {
     if (repl_->srv_->GetConfig()->tls_replication) {
       ssl = SSL_new(repl_->srv_->ssl_ctx.get());
       if (!ssl) {
-        LOG(ERROR) << "Failed to construct SSL structure for new connection: " 
<< SSLErrors{};
+        error("Failed to construct SSL structure for new connection: {}", 
fmt::streamed(SSLErrors{}));
         evutil_closesocket(*cfd);
         return;
       }
@@ -288,7 +288,7 @@ void ReplicationThread::CallbacksStateMachine::Start() {
       if (ssl) SSL_free(ssl);
 #endif
       close(*cfd);
-      LOG(ERROR) << "[replication] Failed to create the event socket";
+      error("[replication] Failed to create the event socket");
       continue;
     }
 #ifdef ENABLE_OPENSSL
@@ -346,9 +346,9 @@ Status ReplicationThread::Start(std::function<bool()> 
&&pre_fullsync_cb, std::fu
   // Clean synced checkpoint from old master because replica starts to follow 
new master
   auto s = rocksdb::DestroyDB(srv_->GetConfig()->sync_checkpoint_dir, 
rocksdb::Options());
   if (!s.ok()) {
-    LOG(WARNING) << "Can't clean synced checkpoint from master, error: " << 
s.ToString();
+    warn("Can't clean synced checkpoint from master, error: {}", s.ToString());
   } else {
-    LOG(WARNING) << "Clean old synced checkpoint successfully";
+    warn("Clean old synced checkpoint successfully");
   }
 
   // cleanup the old backups, so we can start replication in a clean state
@@ -368,9 +368,9 @@ void ReplicationThread::Stop() {
   stop_flag_ = true;  // Stopping procedure is asynchronous,
                       // handled by timer
   if (auto s = util::ThreadJoin(t_); !s) {
-    LOG(WARNING) << "Replication thread operation failed: " << s.Msg();
+    warn("Replication thread operation failed: {}", s.Msg());
   }
-  LOG(INFO) << "[replication] Stopped";
+  info("[replication] Stopped");
 }
 
 /*
@@ -384,7 +384,7 @@ void ReplicationThread::Stop() {
 void ReplicationThread::run() {
   base_ = event_base_new();
   if (base_ == nullptr) {
-    LOG(ERROR) << "[replication] Failed to create new ev base";
+    error("[replication] Failed to create new ev base");
     return;
   }
   psync_steps_.Start();
@@ -400,7 +400,7 @@ void ReplicationThread::run() {
 
 ReplicationThread::CBState ReplicationThread::authWriteCB(bufferevent *bev) {
   SendString(bev, redis::ArrayOfBulkStrings({"AUTH", 
srv_->GetConfig()->masterauth}));
-  LOG(INFO) << "[replication] Auth request was sent, waiting for response";
+  info("[replication] Auth request was sent, waiting for response");
   repl_state_.store(kReplSendAuth, std::memory_order_relaxed);
   return CBState::NEXT;
 }
@@ -413,17 +413,17 @@ ReplicationThread::CBState 
ReplicationThread::authReadCB(bufferevent *bev) {  //
   if (!line) return CBState::AGAIN;
   if (!ResponseLineIsOK(line.View())) {
     // Auth failed
-    LOG(ERROR) << "[replication] Auth failed: " << line.get();
+    error("[replication] Auth failed: {}", line.get());
     return CBState::RESTART;
   }
-  LOG(INFO) << "[replication] Auth response was received, continue...";
+  info("[replication] Auth response was received, continue...");
   return CBState::NEXT;
 }
 
 ReplicationThread::CBState ReplicationThread::checkDBNameWriteCB(bufferevent 
*bev) {
   SendString(bev, redis::ArrayOfBulkStrings({"_db_name"}));
   repl_state_.store(kReplCheckDBName, std::memory_order_relaxed);
-  LOG(INFO) << "[replication] Check db name request was sent, waiting for 
response";
+  info("[replication] Check db name request was sent, waiting for response");
   return CBState::NEXT;
 }
 
@@ -434,19 +434,19 @@ ReplicationThread::CBState 
ReplicationThread::checkDBNameReadCB(bufferevent *bev
 
   if (line[0] == '-') {
     if (isRestoringError(line.View())) {
-      LOG(WARNING) << "The master was restoring the db, retry later";
+      warn("The master was restoring the db, retry later");
     } else {
-      LOG(ERROR) << "Failed to get the db name, " << line.get();
+      error("Failed to get the db name, {}", line.get());
     }
     return CBState::RESTART;
   }
   std::string db_name = storage_->GetName();
   if (line.length == db_name.size() && !strncmp(line.get(), db_name.data(), 
line.length)) {
     // DB name match, we should continue to next step: TryPsync
-    LOG(INFO) << "[replication] DB name is valid, continue...";
+    info("[replication] DB name is valid, continue...");
     return CBState::NEXT;
   }
-  LOG(ERROR) << "[replication] Mismatched the db name, local: " << db_name << 
", remote: " << line.get();
+  error("[replication] Mismatched the db name, local: {}, remote: {}", 
db_name, line.get());
   return CBState::RESTART;
 }
 
@@ -461,7 +461,7 @@ ReplicationThread::CBState 
ReplicationThread::replConfWriteCB(bufferevent *bev)
   }
   SendString(bev, redis::ArrayOfBulkStrings(data_to_send));
   repl_state_.store(kReplReplConf, std::memory_order_relaxed);
-  LOG(INFO) << "[replication] replconf request was sent, waiting for response";
+  info("[replication] replconf request was sent, waiting for response");
   return CBState::NEXT;
 }
 
@@ -473,21 +473,20 @@ ReplicationThread::CBState 
ReplicationThread::replConfReadCB(bufferevent *bev) {
   // on unknown option: first try without announce ip, if it fails again - do 
nothing (to prevent infinite loop)
   if (isUnknownOption(line.View()) && !next_try_without_announce_ip_address_) {
     next_try_without_announce_ip_address_ = true;
-    LOG(WARNING) << "The old version master, can't handle ip-address, "
-                 << "try without it again";
+    warn("The old version master, can't handle ip-address, try without it 
again");
     // Retry previous state, i.e. send replconf again
     return CBState::PREV;
   }
   if (line[0] == '-' && isRestoringError(line.View())) {
-    LOG(WARNING) << "The master was restoring the db, retry later";
+    warn("The master was restoring the db, retry later");
     return CBState::RESTART;
   }
   if (!ResponseLineIsOK(line.View())) {
-    LOG(WARNING) << "[replication] Failed to replconf: " << line.get() + 1;
+    warn("[replication] Failed to replconf: {}", line.get() + 1);
     //  backward compatible with old version that doesn't support replconf cmd
     return CBState::NEXT;
   } else {
-    LOG(INFO) << "[replication] replconf is ok, start psync";
+    info("[replication] replconf is ok, start psync");
     return CBState::NEXT;
   }
 }
@@ -517,12 +516,11 @@ ReplicationThread::CBState 
ReplicationThread::tryPSyncWriteCB(bufferevent *bev)
   if (!srv_->GetConfig()->use_rsid_psync || next_try_old_psync_ || 
replid.length() != kReplIdLength) {
     next_try_old_psync_ = false;  // Reset next_try_old_psync_
     SendString(bev, redis::ArrayOfBulkStrings({"PSYNC", 
std::to_string(next_seq)}));
-    LOG(INFO) << "[replication] Try to use psync, next seq: " << next_seq;
+    info("[replication] Try to use psync, next seq: {}", next_seq);
   } else {
     // NEW PSYNC "Unique Replication Sequence ID": replication id and sequence 
id
     SendString(bev, redis::ArrayOfBulkStrings({"PSYNC", replid, 
std::to_string(next_seq)}));
-    LOG(INFO) << "[replication] Try to use new psync, current unique 
replication sequence id: " << replid << ":"
-              << cur_seq;
+    info("[replication] Try to use new psync, current unique replication 
sequence id: {}:{}", replid, cur_seq);
   }
   repl_state_.store(kReplSendPSync, std::memory_order_relaxed);
   return CBState::NEXT;
@@ -534,14 +532,13 @@ ReplicationThread::CBState 
ReplicationThread::tryPSyncReadCB(bufferevent *bev) {
   if (!line) return CBState::AGAIN;
 
   if (line[0] == '-' && isRestoringError(line.View())) {
-    LOG(WARNING) << "The master was restoring the db, retry later";
+    warn("The master was restoring the db, retry later");
     return CBState::RESTART;
   }
 
   if (line[0] == '-' && isWrongPsyncNum(line.View())) {
     next_try_old_psync_ = true;
-    LOG(WARNING) << "The old version master, can't handle new PSYNC, "
-                 << "try old PSYNC again";
+    warn("The old version master, can't handle new PSYNC, try old PSYNC 
again");
     // Retry previous state, i.e. send PSYNC again
     return CBState::PREV;
   }
@@ -550,11 +547,11 @@ ReplicationThread::CBState 
ReplicationThread::tryPSyncReadCB(bufferevent *bev) {
     // PSYNC isn't OK, we should use FullSync
     // Switch to fullsync state machine
     fullsync_steps_.Start();
-    LOG(INFO) << "[replication] Failed to psync, error: " << line.get() << ", 
switch to fullsync";
+    info("[replication] Failed to psync, error: {}, switch to fullsync", 
line.get());
     return CBState::QUIT;
   } else {
     // PSYNC is OK, use IncrementBatchLoop
-    LOG(INFO) << "[replication] PSync is ok, start increment batch loop";
+    info("[replication] PSync is ok, start increment batch loop");
     return CBState::NEXT;
   }
 }
@@ -570,7 +567,7 @@ ReplicationThread::CBState 
ReplicationThread::incrementBatchLoopCB(bufferevent *
         if (!line) return CBState::AGAIN;
         incr_bulk_len_ = line.length > 0 ? std::strtoull(line.get() + 1, 
nullptr, 10) : 0;
         if (incr_bulk_len_ == 0) {
-          LOG(ERROR) << "[replication] Invalid increment data size";
+          error("[replication] Invalid increment data size");
           return CBState::RESTART;
         }
         incr_state_ = Incr_batch_data;
@@ -598,15 +595,15 @@ ReplicationThread::CBState 
ReplicationThread::incrementBatchLoopCB(bufferevent *
 
         auto s = storage_->ReplicaApplyWriteBatch(&batch);
         if (!s.IsOK()) {
-          LOG(ERROR) << "[replication] CRITICAL - Failed to write batch to 
local, " << s.Msg() << ". batch: 0x"
-                     << util::StringToHex(batch.Data());
+          error("[replication] CRITICAL - Failed to write batch to local, {}. 
batch: 0x{}", s.Msg(),
+                util::StringToHex(batch.Data()));
           return CBState::RESTART;
         }
 
         s = parseWriteBatch(batch);
         if (!s.IsOK()) {
-          LOG(ERROR) << "[replication] CRITICAL - failed to parse write batch 
0x" << util::StringToHex(batch.Data())
-                     << ": " << s.Msg();
+          error("[replication] CRITICAL - failed to parse write batch 0x{}: 
{}", util::StringToHex(batch.Data()),
+                s.Msg());
           return CBState::RESTART;
         }
 
@@ -618,7 +615,7 @@ ReplicationThread::CBState 
ReplicationThread::incrementBatchLoopCB(bufferevent *
 ReplicationThread::CBState ReplicationThread::fullSyncWriteCB(bufferevent 
*bev) {
   SendString(bev, redis::ArrayOfBulkStrings({"_fetch_meta"}));
   repl_state_.store(kReplFetchMeta, std::memory_order_relaxed);
-  LOG(INFO) << "[replication] Start syncing data with fullsync";
+  info("[replication] Start syncing data with fullsync");
   return CBState::NEXT;
 }
 
@@ -634,31 +631,31 @@ ReplicationThread::CBState 
ReplicationThread::fullSyncReadCB(bufferevent *bev) {
       UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
       if (!line) return CBState::AGAIN;
       if (line[0] == '-') {
-        LOG(ERROR) << "[replication] Failed to fetch meta id: " << line.get();
+        error("[replication] Failed to fetch meta id: {}", line.get());
         return CBState::RESTART;
       }
       fullsync_meta_id_ = static_cast<rocksdb::BackupID>(line.length > 0 ? 
std::strtoul(line.get(), nullptr, 10) : 0);
       if (fullsync_meta_id_ == 0) {
-        LOG(ERROR) << "[replication] Invalid meta id received";
+        error("[replication] Invalid meta id received");
         return CBState::RESTART;
       }
       fullsync_state_ = kFetchMetaSize;
-      LOG(INFO) << "[replication] Succeed fetching meta id: " << 
fullsync_meta_id_;
+      info("[replication] Succeed fetching meta id: {}", fullsync_meta_id_);
     }
     case kFetchMetaSize: {
       UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
       if (!line) return CBState::AGAIN;
       if (line[0] == '-') {
-        LOG(ERROR) << "[replication] Failed to fetch meta size: " << 
line.get();
+        error("[replication] Failed to fetch meta size: {}", line.get());
         return CBState::RESTART;
       }
       fullsync_filesize_ = line.length > 0 ? std::strtoull(line.get(), 
nullptr, 10) : 0;
       if (fullsync_filesize_ == 0) {
-        LOG(ERROR) << "[replication] Invalid meta file size received";
+        error("[replication] Invalid meta file size received");
         return CBState::RESTART;
       }
       fullsync_state_ = kFetchMetaContent;
-      LOG(INFO) << "[replication] Succeed fetching meta size: " << 
fullsync_filesize_;
+      info("[replication] Succeed fetching meta size: {}", fullsync_filesize_);
     }
     case kFetchMetaContent: {
       std::string target_dir;
@@ -670,7 +667,7 @@ ReplicationThread::CBState 
ReplicationThread::fullSyncReadCB(bufferevent *bev) {
         }
         auto s = engine::Storage::ReplDataManager::ParseMetaAndSave(storage_, 
fullsync_meta_id_, input, &meta);
         if (!s.IsOK()) {
-          LOG(ERROR) << "[replication] Failed to parse meta and save: " << 
s.Msg();
+          error("[replication] Failed to parse meta and save: {}", s.Msg());
           return CBState::AGAIN;
         }
         target_dir = srv_->GetConfig()->backup_sync_dir;
@@ -679,7 +676,7 @@ ReplicationThread::CBState 
ReplicationThread::fullSyncReadCB(bufferevent *bev) {
         UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
         if (!line) return CBState::AGAIN;
         if (line[0] == '-') {
-          LOG(ERROR) << "[replication] Failed to fetch meta info: " << 
line.get();
+          error("[replication] Failed to fetch meta info: {}", line.get());
           return CBState::RESTART;
         }
         std::vector<std::string> need_files = 
util::Split(std::string(line.get()), ",");
@@ -695,18 +692,17 @@ ReplicationThread::CBState 
ReplicationThread::fullSyncReadCB(bufferevent *bev) {
         if (iter != need_files.end()) need_files.erase(iter);
         auto s = engine::Storage::ReplDataManager::CleanInvalidFiles(storage_, 
target_dir, need_files);
         if (!s.IsOK()) {
-          LOG(WARNING) << "[replication] Failed to clean up invalid files of 
the old checkpoint,"
-                       << " error: " << s.Msg();
-          LOG(WARNING) << "[replication] Try to clean all checkpoint files";
+          warn("[replication] Failed to clean up invalid files of the old 
checkpoint, error: {}", s.Msg());
+          warn("[replication] Try to clean all checkpoint files");
           auto s = rocksdb::DestroyDB(target_dir, rocksdb::Options());
           if (!s.ok()) {
-            LOG(WARNING) << "[replication] Failed to clean all checkpoint 
files, error: " << s.ToString();
+            warn("[replication] Failed to clean all checkpoint files, error: 
{}", s.ToString());
           }
         }
       }
       assert(evbuffer_get_length(input) == 0);
       fullsync_state_ = kFetchMetaID;
-      LOG(INFO) << "[replication] Succeeded fetching full data files info, 
fetching files in parallel";
+      info("[replication] Succeeded fetching full data files info, fetching 
files in parallel");
 
       bool pre_fullsync_done = false;
       // If 'slave-empty-db-before-fullsync' is yes, we call 'pre_fullsync_cb_'
@@ -722,10 +718,10 @@ ReplicationThread::CBState 
ReplicationThread::fullSyncReadCB(bufferevent *bev) {
       auto s = parallelFetchFile(target_dir, meta.files);
       if (!s.IsOK()) {
         if (pre_fullsync_done) post_fullsync_cb_();
-        LOG(ERROR) << "[replication] Failed to parallel fetch files while " + 
s.Msg();
+        error("[replication] Failed to parallel fetch files while {}", 
s.Msg());
         return CBState::RESTART;
       }
-      LOG(INFO) << "[replication] Succeeded fetching files in parallel, 
restoring the backup";
+      info("[replication] Succeeded fetching files in parallel, restoring the 
backup");
 
       // Don't need to call 'pre_fullsync_cb_' again if it was called before
       if (!pre_fullsync_done && !pre_fullsync_cb_()) return CBState::RESTART;
@@ -737,18 +733,18 @@ ReplicationThread::CBState 
ReplicationThread::fullSyncReadCB(bufferevent *bev) {
         s = storage_->RestoreFromCheckpoint();
       }
       if (!s.IsOK()) {
-        LOG(ERROR) << "[replication] Failed to restore backup while " + 
s.Msg() + ", restart fullsync";
+        error("[replication] Failed to restore backup while {}, restart 
fullsync", s.Msg());
         post_fullsync_cb_();
         return CBState::RESTART;
       }
-      LOG(INFO) << "[replication] Succeeded restoring the backup, fullsync was 
finish";
+      info("[replication] Succeeded restoring the backup, fullsync was 
finish");
       post_fullsync_cb_();
 
       // It needs to reload namespaces from DB after the full sync is done,
       // or namespaces are not visible in the replica.
       s = srv_->GetNamespace()->LoadAndRewrite();
       if (!s.IsOK()) {
-        LOG(ERROR) << "[replication] Failed to load and rewrite namespace: " 
<< s.Msg();
+        error("[replication] Failed to load and rewrite namespace: {}", 
s.Msg());
       }
 
       // Switch to psync state machine again
@@ -756,10 +752,7 @@ ReplicationThread::CBState 
ReplicationThread::fullSyncReadCB(bufferevent *bev) {
       return CBState::QUIT;
     }
   }
-
-  LOG(ERROR) << "Should not arrive here";
-  assert(false);
-  return CBState::QUIT;
+  unreachable();
 }
 
 Status ReplicationThread::parallelFetchFile(const std::string &dir,
@@ -810,9 +803,8 @@ Status ReplicationThread::parallelFetchFile(const 
std::string &dir,
               skip_cnt.fetch_add(1);
               uint32_t cur_skip_cnt = skip_cnt.load();
               uint32_t cur_fetch_cnt = fetch_cnt.load();
-              LOG(INFO) << "[skip] " << f_name << " " << f_crc << ", skip 
count: " << cur_skip_cnt
-                        << ", fetch count: " << cur_fetch_cnt << ", progress: 
" << cur_skip_cnt + cur_fetch_cnt << "/"
-                        << files.size();
+              info("[skip] {} {}, skip count: {}, fetch count: {}, progress: 
{} / {}", f_name, f_crc, cur_skip_cnt,
+                   cur_fetch_cnt, (cur_skip_cnt + cur_fetch_cnt), 
files.size());
               continue;
             }
             fetch_files.push_back(f_name);
@@ -824,10 +816,8 @@ Status ReplicationThread::parallelFetchFile(const 
std::string &dir,
             fetch_cnt.fetch_add(1);
             uint32_t cur_skip_cnt = skip_cnt.load();
             uint32_t cur_fetch_cnt = fetch_cnt.load();
-            LOG(INFO) << "[fetch] "
-                      << "Fetched " << fetch_file << ", crc32: " << fetch_crc 
<< ", skip count: " << cur_skip_cnt
-                      << ", fetch count: " << cur_fetch_cnt << ", progress: " 
<< cur_skip_cnt + cur_fetch_cnt << "/"
-                      << files_count;
+            info("[fetch] Fetched {}, crc32 {}, skip count: {}, fetch count: 
{}, progress: {} / {}", fetch_file,
+                 fetch_crc, cur_skip_cnt, cur_fetch_cnt, cur_skip_cnt + 
cur_fetch_cnt, files_count);
           };
           // For master using old version, it only supports to fetch a single 
file by one
           // command, so we need to fetch all files by multiple command 
interactions.
@@ -962,14 +952,14 @@ Status ReplicationThread::fetchFiles(int sock_fd, const 
std::string &dir, const
 
   UniqueEvbuf evbuf;
   for (unsigned i = 0; i < files.size(); i++) {
-    DLOG(INFO) << "[fetch] Start to fetch file " << files[i];
+    debug("[fetch] Start to fetch file {}", files[i]);
     s = fetchFile(sock_fd, evbuf.get(), dir, files[i], crcs[i], fn, ssl);
     if (!s.IsOK()) {
       s = Status(Status::NotOK, "fetch file err: " + s.Msg());
-      LOG(WARNING) << "[fetch] Fail to fetch file " << files[i] << ", err: " 
<< s.Msg();
+      warn("[fetch] Fail to fetch file {}, err: {}", files[i], s.Msg());
       break;
     }
-    DLOG(INFO) << "[fetch] Succeed fetching file " << files[i];
+    debug("[fetch] Succeed fetching file {}", files[i]);
 
     // Just for tests
     if (srv_->GetConfig()->fullsync_recv_file_delay) {
@@ -983,7 +973,7 @@ Status ReplicationThread::fetchFiles(int sock_fd, const 
std::string &dir, const
 void ReplicationThread::TimerCB(int, int16_t) {
   // DLOG(INFO) << "[replication] timer";
   if (stop_flag_) {
-    LOG(INFO) << "[replication] Stop ev loop";
+    info("[replication] Stop ev loop");
     event_base_loopbreak(base_);
     psync_steps_.Stop();
     fullsync_steps_.Stop();
diff --git a/src/cluster/slot_migrate.cc b/src/cluster/slot_migrate.cc
index dd6e2efa1..0440a4b50 100644
--- a/src/cluster/slot_migrate.cc
+++ b/src/cluster/slot_migrate.cc
@@ -122,8 +122,7 @@ Status SlotMigrator::PerformSlotRangeMigration(const 
std::string &node_id, std::
     migration_job_ = std::move(job);
     job_cv_.notify_one();
   }
-
-  LOG(INFO) << "[migrate] Start migrating slot(s) " << slot_range.String() << 
" to " << dst_ip << ":" << dst_port;
+  info("[migrate] Start migrating slot(s) {} to {}:{}", slot_range.String(), 
dst_ip, dst_port);
 
   return Status::OK();
 }
@@ -134,7 +133,7 @@ SlotMigrator::~SlotMigrator() {
     thread_state_ = ThreadState::Terminated;
     job_cv_.notify_all();
     if (auto s = util::ThreadJoin(t_); !s) {
-      LOG(WARNING) << "Slot migrating thread operation failed: " << s.Msg();
+      warn("Slot migrating thread operation failed: {}", s.Msg());
     }
   }
 }
@@ -159,11 +158,9 @@ void SlotMigrator::loop() {
       clean();
       return;
     }
-
-    LOG(INFO) << "[migrate] Migrating slot(s): " << 
migration_job_->slot_range.String()
-              << ", dst_ip: " << migration_job_->dst_ip << ", dst_port: " << 
migration_job_->dst_port
-              << ", max_speed: " << migration_job_->max_speed
-              << ", max_pipeline_size: " << migration_job_->max_pipeline_size;
+    info("[migrate] Migrating slot(s): {}, dst_ip: {}, dst_port: {}, 
max_speed: {}, max_pipeline_size: {}",
+         migration_job_->slot_range.String(), migration_job_->dst_ip, 
migration_job_->dst_port,
+         migration_job_->max_speed, migration_job_->max_pipeline_size);
 
     dst_ip_ = migration_job_->dst_ip;
     dst_port_ = migration_job_->dst_port;
@@ -180,7 +177,7 @@ void SlotMigrator::runMigrationProcess() {
 
   while (true) {
     if (isTerminated()) {
-      LOG(WARNING) << "[migrate] Will stop state machine, because the thread 
was terminated";
+      warn("[migrate] Will stop state machine, because the thread was 
terminated");
       clean();
       return;
     }
@@ -189,11 +186,10 @@ void SlotMigrator::runMigrationProcess() {
       case SlotMigrationStage::kStart: {
         auto s = startMigration();
         if (s.IsOK()) {
-          LOG(INFO) << "[migrate] Succeed to start migrating slot(s) " << 
slot_range_.load().String();
+          info("[migrate] Succeed to start migrating slot(s) {}", 
slot_range_.load().String());
           current_stage_ = SlotMigrationStage::kSnapshot;
         } else {
-          LOG(ERROR) << "[migrate] Failed to start migrating slot(s) " << 
slot_range_.load().String()
-                     << ". Error: " << s.Msg();
+          error("[migrate] Failed to start migrating slot(s) {}. Error: {}", 
slot_range_.load().String(), s.Msg());
           current_stage_ = SlotMigrationStage::kFailed;
           resumeSyncCtx(s);
         }
@@ -204,8 +200,7 @@ void SlotMigrator::runMigrationProcess() {
         if (s.IsOK()) {
           current_stage_ = SlotMigrationStage::kWAL;
         } else {
-          LOG(ERROR) << "[migrate] Failed to send snapshot of slot(s) " << 
slot_range_.load().String()
-                     << ". Error: " << s.Msg();
+          error("[migrate] Failed to send snapshot of slot(s) {}. Error: {}", 
slot_range_.load().String(), s.Msg());
           current_stage_ = SlotMigrationStage::kFailed;
           resumeSyncCtx(s);
         }
@@ -214,11 +209,10 @@ void SlotMigrator::runMigrationProcess() {
       case SlotMigrationStage::kWAL: {
         auto s = syncWAL();
         if (s.IsOK()) {
-          LOG(INFO) << "[migrate] Succeed to sync from WAL for slot(s) " << 
slot_range_.load().String();
+          info("[migrate] Succeed to sync from WAL for slot(s) {}", 
slot_range_.load().String());
           current_stage_ = SlotMigrationStage::kSuccess;
         } else {
-          LOG(ERROR) << "[migrate] Failed to sync from WAL for slot(s) " << 
slot_range_.load().String()
-                     << ". Error: " << s.Msg();
+          error("[migrate] Failed to sync from WAL for slot(s) {}. Error: {}", 
slot_range_.load().String(), s.Msg());
           current_stage_ = SlotMigrationStage::kFailed;
           resumeSyncCtx(s);
         }
@@ -227,13 +221,13 @@ void SlotMigrator::runMigrationProcess() {
       case SlotMigrationStage::kSuccess: {
         auto s = finishSuccessfulMigration();
         if (s.IsOK()) {
-          LOG(INFO) << "[migrate] Succeed to migrate slot(s) " << 
slot_range_.load().String();
+          info("[migrate] Succeed to migrate slot(s) {}", 
slot_range_.load().String());
           current_stage_ = SlotMigrationStage::kClean;
           migration_state_ = MigrationState::kSuccess;
           resumeSyncCtx(s);
         } else {
-          LOG(ERROR) << "[migrate] Failed to finish a successful migration of 
slot(s) " << slot_range_.load().String()
-                     << ". Error: " << s.Msg();
+          error("[migrate] Failed to finish a successful migration of slot(s) 
{}. Error: {}",
+                slot_range_.load().String(), s.Msg());
           current_stage_ = SlotMigrationStage::kFailed;
           resumeSyncCtx(s);
         }
@@ -242,10 +236,10 @@ void SlotMigrator::runMigrationProcess() {
       case SlotMigrationStage::kFailed: {
         auto s = finishFailedMigration();
         if (!s.IsOK()) {
-          LOG(ERROR) << "[migrate] Failed to finish a failed migration of 
slot(s) " << slot_range_.load().String()
-                     << ". Error: " << s.Msg();
+          error("[migrate] Failed to finish a failed migration of slot(s) {}. 
Error: {}", slot_range_.load().String(),
+                s.Msg());
         }
-        LOG(INFO) << "[migrate] Failed to migrate a slot(s) " << 
slot_range_.load().String();
+        info("[migrate] Failed to migrate a slot(s) {}", 
slot_range_.load().String());
         migration_state_ = MigrationState::kFailed;
         current_stage_ = SlotMigrationStage::kClean;
         break;
@@ -255,7 +249,7 @@ void SlotMigrator::runMigrationProcess() {
         return;
       }
       default:
-        LOG(ERROR) << "[migrate] Unexpected state for the state machine: " << 
static_cast<int>(current_stage_);
+        error("[migrate] Unexpected state for the state machine: {}", 
static_cast<int>(current_stage_));
         clean();
         return;
     }
@@ -302,13 +296,11 @@ Status SlotMigrator::startMigration() {
   if (migration_type_ == MigrationType::kRawKeyValue) {
     bool supported = GET_OR_RET(supportedApplyBatchCommandOnDstNode(*dst_fd_));
     if (!supported) {
-      LOG(INFO) << "APPLYBATCH command is not supported, use redis command for 
migration";
+      info("APPLYBATCH command is not supported, use redis command for 
migration");
       migration_type_ = MigrationType::kRedisCommand;
     }
   }
-
-  LOG(INFO) << "[migrate] Start migrating slot(s) " << 
slot_range_.load().String() << ", connect destination fd "
-            << *dst_fd_;
+  info("[migrate] Start migrating slot(s) {}, connect destination fd {}", 
slot_range_.load().String(), *dst_fd_);
 
   return Status::OK();
 }
@@ -337,12 +329,11 @@ Status SlotMigrator::sendSnapshotByCmd() {
   uint64_t empty_key_cnt = 0;
   std::string restore_cmds;
   SlotRange slot_range = slot_range_;
-
-  LOG(INFO) << "[migrate] Start migrating snapshot of slot(s): " << 
slot_range.String();
+  info("[migrate] Start migrating snapshot of slot(s): {}", 
slot_range.String());
 
   // Construct key prefix to iterate the keys belong to the target slot
   std::string prefix = ComposeSlotKeyPrefix(namespace_, slot_range.start);
-  LOG(INFO) << "[migrate] Iterate keys of slot(s), key's prefix: " << prefix;
+  info("[migrate] Iterate keys of slot(s), key's prefix: {}", prefix);
 
   std::string upper_bound = ComposeSlotKeyUpperBound(namespace_, 
slot_range.end);
   rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
@@ -379,23 +370,23 @@ Status SlotMigrator::sendSnapshotByCmd() {
     }
 
     if (*result == KeyMigrationResult::kMigrated) {
-      LOG(INFO) << "[migrate] The key " << user_key << " successfully 
migrated";
+      info("[migrate] The key {} successfully migrated", user_key);
       migrated_key_cnt++;
     } else if (*result == KeyMigrationResult::kExpired) {
-      LOG(INFO) << "[migrate] The key " << user_key << " is expired";
+      info("[migrate] The key {} is expired", user_key);
       expired_key_cnt++;
     } else if (*result == KeyMigrationResult::kUnderlyingStructEmpty) {
-      LOG(INFO) << "[migrate] The key " << user_key << " has no elements";
+      info("[migrate] The key {} has no elements", user_key);
       empty_key_cnt++;
     } else {
-      LOG(ERROR) << "[migrate] Migrated a key " << user_key << " with 
unexpected result: " << static_cast<int>(*result);
+      error("[migrate] Migrated a key {} with unexpected result: {}", 
user_key, static_cast<int>(*result));
       return {Status::NotOK};
     }
   }
 
   if (auto s = iter->status(); !s.ok()) {
     auto err_str = s.ToString();
-    LOG(ERROR) << "[migrate] Failed to iterate keys of slot " << current_slot 
<< ": " << err_str;
+    error("[migrate] Failed to iterate keys of slot {}: {}", current_slot, 
err_str);
     return {Status::NotOK, fmt::format("failed to iterate keys of slot {}: 
{}", current_slot, err_str)};
   }
 
@@ -405,10 +396,9 @@ Status SlotMigrator::sendSnapshotByCmd() {
   if (!s.IsOK()) {
     return s.Prefixed(errFailedToSendCommands);
   }
-
-  LOG(INFO) << "[migrate] Succeed to migrate slot(s) snapshot, slot(s): " << 
slot_range.String()
-            << ", Migrated keys: " << migrated_key_cnt << ", Expired keys: " 
<< expired_key_cnt
-            << ", Empty keys: " << empty_key_cnt;
+  info(
+      "[migrate] Succeed to migrate slot(s) snapshot, slot(s): {}, Migrated 
keys: {}, Expired keys: {}, Empty keys: {}",
+      slot_range.String(), migrated_key_cnt, expired_key_cnt, empty_key_cnt);
 
   return Status::OK();
 }
@@ -469,7 +459,7 @@ Status SlotMigrator::finishFailedMigration() {
 }
 
 void SlotMigrator::clean() {
-  LOG(INFO) << "[migrate] Clean resources of migrating slot(s) " << 
slot_range_.load().String();
+  info("[migrate] Clean resources of migrating slot(s) {}", 
slot_range_.load().String());
   if (slot_snapshot_) {
     storage_->GetDB()->ReleaseSnapshot(slot_snapshot_);
     slot_snapshot_ = nullptr;
@@ -603,7 +593,7 @@ Status SlotMigrator::checkMultipleResponses(int sock_fd, 
int total) {
         case ParserState::ArrayLen: {
           UniqueEvbufReadln line(evbuf.get(), EVBUFFER_EOL_CRLF_STRICT);
           if (!line) {
-            LOG(INFO) << "[migrate] Event buffer is empty, read socket again";
+            info("[migrate] Event buffer is empty, read socket again");
             run = false;
             break;
           }
@@ -635,7 +625,7 @@ Status SlotMigrator::checkMultipleResponses(int sock_fd, 
int total) {
         // Handle bulk string response
         case ParserState::BulkData: {
           if (evbuffer_get_length(evbuf.get()) < bulk_or_array_len + 2) {
-            LOG(INFO) << "[migrate] Bulk data in event buffer is not complete, 
read socket again";
+            info("[migrate] Bulk data in event buffer is not complete, read 
socket again");
             run = false;
             break;
           }
@@ -649,7 +639,7 @@ Status SlotMigrator::checkMultipleResponses(int sock_fd, 
int total) {
           while (run && bulk_or_array_len > 0) {
             evbuffer_ptr ptr = evbuffer_search_eol(evbuf.get(), nullptr, 
nullptr, EVBUFFER_EOL_CRLF_STRICT);
             if (ptr.pos < 0) {
-              LOG(INFO) << "[migrate] Array data in event buffer is not 
complete, read socket again";
+              info("[migrate] Array data in event buffer is not complete, read 
socket again");
               run = false;
               break;
             }
@@ -1016,7 +1006,7 @@ Status SlotMigrator::sendCmdsPipelineIfNeed(std::string 
*commands, bool need) {
   }
 
   if (current_pipeline_size_ == 0) {
-    LOG(INFO) << "[migrate] No commands to send";
+    info("[migrate] No commands to send");
     return Status::OK();
   }
 
@@ -1042,7 +1032,7 @@ Status SlotMigrator::sendCmdsPipelineIfNeed(std::string 
*commands, bool need) {
 }
 
 void SlotMigrator::setForbiddenSlotRange(const SlotRange &slot_range) {
-  LOG(INFO) << "[migrate] Setting forbidden slot(s) " << slot_range.String();
+  info("[migrate] Setting forbidden slot(s) {}", slot_range.String());
   // Block server to set forbidden slot
   uint64_t during = util::GetTimeStampUS();
   {
@@ -1050,11 +1040,11 @@ void SlotMigrator::setForbiddenSlotRange(const 
SlotRange &slot_range) {
     forbidden_slot_range_ = slot_range;
   }
   during = util::GetTimeStampUS() - during;
-  LOG(INFO) << "[migrate] To set forbidden slot, server was blocked for " << 
during << "us";
+  info("[migrate] To set forbidden slot, server was blocked for {} us", 
during);
 }
 
 void SlotMigrator::ReleaseForbiddenSlotRange() {
-  LOG(INFO) << "[migrate] Release forbidden slot(s) " << 
forbidden_slot_range_.load().String();
+  info("[migrate] Release forbidden slot(s) {}", 
forbidden_slot_range_.load().String());
   forbidden_slot_range_ = {-1, -1};
 }
 
@@ -1067,7 +1057,7 @@ void SlotMigrator::applyMigrationSpeedLimit() const {
     }
     if (last_send_time_ + per_request_time > current_time) {
       uint64_t during = last_send_time_ + per_request_time - current_time;
-      LOG(INFO) << "[migrate] Sleep to limit migration speed for: " << during;
+      info("[migrate] Sleep to limit migration speed for: {}", during);
       std::this_thread::sleep_for(std::chrono::microseconds(during));
     }
   }
@@ -1078,7 +1068,7 @@ Status 
SlotMigrator::generateCmdsFromBatch(rocksdb::BatchResult *batch, std::str
   WriteBatchExtractor write_batch_extractor(storage_->IsSlotIdEncoded(), 
slot_range_, false);
   rocksdb::Status status = 
batch->writeBatchPtr->Iterate(&write_batch_extractor);
   if (!status.ok()) {
-    LOG(ERROR) << "[migrate] Failed to parse write batch, Err: " << 
status.ToString();
+    error("[migrate] Failed to parse write batch, Err: {}", status.ToString());
     return {Status::NotOK};
   }
 
@@ -1096,7 +1086,7 @@ Status 
SlotMigrator::generateCmdsFromBatch(rocksdb::BatchResult *batch, std::str
 
 Status 
SlotMigrator::migrateIncrementData(std::unique_ptr<rocksdb::TransactionLogIterator>
 *iter, uint64_t end_seq) {
   if (!(*iter) || !(*iter)->Valid()) {
-    LOG(ERROR) << "[migrate] WAL iterator is invalid";
+    error("[migrate] WAL iterator is invalid");
     return {Status::NotOK};
   }
 
@@ -1105,40 +1095,40 @@ Status 
SlotMigrator::migrateIncrementData(std::unique_ptr<rocksdb::TransactionLo
 
   while (true) {
     if (stop_migration_) {
-      LOG(ERROR) << "[migrate] Migration task end during migrating WAL data";
+      error("[migrate] Migration task end during migrating WAL data");
       return {Status::NotOK};
     }
 
     auto batch = (*iter)->GetBatch();
     if (batch.sequence != next_seq) {
-      LOG(ERROR) << "[migrate] WAL iterator is discrete, some seq might be 
lost"
-                 << ", expected sequence: " << next_seq << ", but got 
sequence: " << batch.sequence;
+      error("[migrate] WAL iterator is discrete, some seq might be lost, 
expected sequence: {}, but got sequence: {}",
+            next_seq, batch.sequence);
       return {Status::NotOK};
     }
 
     // Generate commands by iterating write batch
     auto s = generateCmdsFromBatch(&batch, &commands);
     if (!s.IsOK()) {
-      LOG(ERROR) << "[migrate] Failed to generate commands from write batch";
+      error("[migrate] Failed to generate commands from write batch");
       return {Status::NotOK};
     }
 
     // Check whether command pipeline should be sent
     s = sendCmdsPipelineIfNeed(&commands, false);
     if (!s.IsOK()) {
-      LOG(ERROR) << "[migrate] Failed to send WAL commands pipeline";
+      error("[migrate] Failed to send WAL commands pipeline");
       return {Status::NotOK};
     }
 
     next_seq = batch.sequence + batch.writeBatchPtr->Count();
     if (next_seq > end_seq) {
-      LOG(INFO) << "[migrate] Migrate incremental data an epoch OK, seq from " 
<< wal_begin_seq_ << ", to " << end_seq;
+      info("[migrate] Migrate incremental data an epoch OK, seq from {}, to 
{}", wal_begin_seq_, end_seq);
       break;
     }
 
     (*iter)->Next();
     if (!(*iter)->Valid()) {
-      LOG(ERROR) << "[migrate] WAL iterator is invalid, expected end seq: " << 
end_seq << ", next seq: " << next_seq;
+      error("[migrate] WAL iterator is invalid, expected end seq: {}, next 
seq: {}", end_seq, next_seq);
       return {Status::NotOK};
     }
   }
@@ -1146,7 +1136,7 @@ Status 
SlotMigrator::migrateIncrementData(std::unique_ptr<rocksdb::TransactionLo
   // Send the left data of this epoch
   auto s = sendCmdsPipelineIfNeed(&commands, true);
   if (!s.IsOK()) {
-    LOG(ERROR) << "[migrate] Failed to send WAL last commands in pipeline";
+    error("[migrate] Failed to send WAL last commands in pipeline");
     return {Status::NotOK};
   }
 
@@ -1160,31 +1150,29 @@ Status SlotMigrator::syncWalBeforeForbiddingSlot() {
     uint64_t latest_seq = storage_->GetDB()->GetLatestSequenceNumber();
     uint64_t gap = latest_seq - wal_begin_seq_;
     if (gap <= static_cast<uint64_t>(seq_gap_limit_)) {
-      LOG(INFO) << "[migrate] Incremental data sequence: " << gap << ", less 
than limit: " << seq_gap_limit_
-                << ", go to set forbidden slot";
+      info("[migrate] Incremental data sequence: {}, less than limit: {}, go 
to set forbidden slot", gap,
+           seq_gap_limit_);
       break;
     }
 
     std::unique_ptr<rocksdb::TransactionLogIterator> iter = nullptr;
     auto s = storage_->GetWALIter(wal_begin_seq_ + 1, &iter);
     if (!s.IsOK()) {
-      LOG(ERROR) << "[migrate] Failed to generate WAL iterator before setting 
forbidden slot"
-                 << ", Err: " << s.Msg();
+      error("[migrate] Failed to generate WAL iterator before setting 
forbidden slot, Err: {}", s.Msg());
       return {Status::NotOK};
     }
 
     // Iterate wal and migrate data
     s = migrateIncrementData(&iter, latest_seq);
     if (!s.IsOK()) {
-      LOG(ERROR) << "[migrate] Failed to migrate WAL data before setting 
forbidden slot";
+      error("[migrate] Failed to migrate WAL data before setting forbidden 
slot");
       return {Status::NotOK};
     }
 
     wal_begin_seq_ = latest_seq;
     count++;
   }
-
-  LOG(INFO) << "[migrate] Succeed to migrate incremental data before setting 
forbidden slot, end epoch: " << count;
+  info("[migrate] Succeed to migrate incremental data before setting forbidden 
slot, end epoch: {}", count);
   return Status::OK();
 }
 
@@ -1198,15 +1186,14 @@ Status SlotMigrator::syncWalAfterForbiddingSlot() {
   std::unique_ptr<rocksdb::TransactionLogIterator> iter = nullptr;
   auto s = storage_->GetWALIter(wal_begin_seq_ + 1, &iter);
   if (!s.IsOK()) {
-    LOG(ERROR) << "[migrate] Failed to generate WAL iterator after setting 
forbidden slot"
-               << ", Err: " << s.Msg();
+    error("[migrate] Failed to generate WAL iterator after setting forbidden 
slot, Err: {}", s.Msg());
     return {Status::NotOK};
   }
 
   // Send incremental data
   s = migrateIncrementData(&iter, latest_seq);
   if (!s.IsOK()) {
-    LOG(ERROR) << "[migrate] Failed to migrate WAL data after setting 
forbidden slot";
+    error("[migrate] Failed to migrate WAL data after setting forbidden slot");
     return {Status::NotOK};
   }
 
@@ -1271,7 +1258,7 @@ Status SlotMigrator::sendMigrationBatch(BatchSender 
*batch) {
 Status SlotMigrator::sendSnapshotByRawKV() {
   uint64_t start_ts = util::GetTimeStampMS();
   auto slot_range = slot_range_.load();
-  LOG(INFO) << fmt::format("[migrate] Migrating snapshot of slot(s) {} by raw 
key value", slot_range.String());
+  info("[migrate] Migrating snapshot of slot(s) {} by raw key value", 
slot_range.String());
 
   auto prefix = ComposeSlotKeyPrefix(namespace_, slot_range.start);
   auto upper_bound = ComposeSlotKeyUpperBound(namespace_, slot_range.end);
@@ -1337,9 +1324,9 @@ Status SlotMigrator::sendSnapshotByRawKV() {
   GET_OR_RET(sendMigrationBatch(&batch_sender));
 
   auto elapsed = util::GetTimeStampMS() - start_ts;
-  LOG(INFO) << fmt::format(
-      "[migrate] Succeed to migrate snapshot range, slot(s): {}, elapsed: {} 
ms, "
-      "sent: {} bytes, rate: {:.2f} kb/s, batches: {}, entries: {}",
+  info(
+      "[migrate] Succeed to migrate snapshot range, slot(s): {}, elapsed: {} 
ms, sent: {} bytes, rate: {:.2f} kb/s, "
+      "batches: {}, entries: {}",
       slot_range.String(), elapsed, batch_sender.GetSentBytes(), 
batch_sender.GetRate(start_ts),
       batch_sender.GetSentBatchesNum(), batch_sender.GetEntriesNum());
 
@@ -1348,7 +1335,7 @@ Status SlotMigrator::sendSnapshotByRawKV() {
 
 Status SlotMigrator::syncWALByRawKV() {
   uint64_t start_ts = util::GetTimeStampMS();
-  LOG(INFO) << "[migrate] Syncing WAL of slot(s) " << 
slot_range_.load().String() << " by raw key value";
+  info("[migrate] Syncing WAL of slot(s) {} by raw key value", 
slot_range_.load().String());
   BatchSender batch_sender(*dst_fd_, migrate_batch_size_bytes_, 
migrate_batch_bytes_per_sec_);
 
   int epoch = 1;
@@ -1363,8 +1350,8 @@ Status SlotMigrator::syncWALByRawKV() {
     if (!s.IsOK()) {
       return {Status::NotOK, fmt::format("migrate incremental data failed, 
{}", s.Msg())};
     }
-    LOG(INFO) << fmt::format("[migrate] Migrated incremental data, epoch: {}, 
seq from {} to {}", epoch, wal_begin_seq_,
-                             wal_incremental_seq);
+    info("[migrate] Migrated incremental data, epoch: {}, seq from {} to {}", 
epoch, wal_begin_seq_,
+         wal_incremental_seq);
     wal_begin_seq_ = wal_incremental_seq;
     epoch++;
   }
@@ -1377,12 +1364,12 @@ Status SlotMigrator::syncWALByRawKV() {
     if (!s.IsOK()) {
       return {Status::NotOK, fmt::format("migrate last incremental data 
failed, {}", s.Msg())};
     }
-    LOG(INFO) << fmt::format("[migrate] Migrated last incremental data after 
set forbidden slot, seq from {} to {}",
-                             wal_begin_seq_, wal_incremental_seq);
+    info("[migrate] Migrated last incremental data after set forbidden slot, 
seq from {} to {}", wal_begin_seq_,
+         wal_incremental_seq);
   }
 
   auto elapsed = util::GetTimeStampMS() - start_ts;
-  LOG(INFO) << fmt::format(
+  info(
       "[migrate] Succeed to migrate incremental data, slot(s): {}, elapsed: {} 
ms, "
       "sent: {} bytes, rate: {:.2f} kb/s, batches: {}, entries: {}",
       slot_range_.load().String(), elapsed, batch_sender.GetSentBytes(), 
batch_sender.GetRate(start_ts),
@@ -1394,9 +1381,8 @@ Status SlotMigrator::syncWALByRawKV() {
 bool SlotMigrator::catchUpIncrementalWAL() {
   uint64_t gap = storage_->GetDB()->GetLatestSequenceNumber() - wal_begin_seq_;
   if (gap <= seq_gap_limit_) {
-    LOG(INFO) << fmt::format(
-        "[migrate] Incremental data sequence gap: {}, less than limit: {}, set 
forbidden slot(s): {}", gap,
-        seq_gap_limit_, slot_range_.load().String());
+    info("[migrate] Incremental data sequence gap: {}, less than limit: {}, 
set forbidden slot(s): {}", gap,
+         seq_gap_limit_, slot_range_.load().String());
     return true;
   }
   return false;
@@ -1417,7 +1403,7 @@ Status 
SlotMigrator::migrateIncrementalDataByRawKV(uint64_t end_seq, BatchSender
       }
       case engine::WALItem::Type::kTypePut: {
         if (item.column_family_id > kMaxColumnFamilyID) {
-          LOG(INFO) << fmt::format("[migrate] Invalid put column family id: 
{}", item.column_family_id);
+          info("[migrate] Invalid put column family id: {}", 
item.column_family_id);
           continue;
         }
         
GET_OR_RET(batch_sender->Put(storage_->GetCFHandle(static_cast<ColumnFamilyID>(item.column_family_id)),
@@ -1426,7 +1412,7 @@ Status 
SlotMigrator::migrateIncrementalDataByRawKV(uint64_t end_seq, BatchSender
       }
       case engine::WALItem::Type::kTypeDelete: {
         if (item.column_family_id > kMaxColumnFamilyID) {
-          LOG(INFO) << fmt::format("[migrate] Invalid delete column family id: 
{}", item.column_family_id);
+          info("[migrate] Invalid delete column family id: {}", 
item.column_family_id);
           continue;
         }
         GET_OR_RET(
diff --git a/src/cluster/sync_migrate_context.cc 
b/src/cluster/sync_migrate_context.cc
index 7b51c9827..6a4e532f6 100644
--- a/src/cluster/sync_migrate_context.cc
+++ b/src/cluster/sync_migrate_context.cc
@@ -35,8 +35,7 @@ void SyncMigrateContext::Resume(const Status &migrate_result) 
{
   migrate_result_ = migrate_result;
   auto s = conn_->Owner()->EnableWriteEvent(conn_->GetFD());
   if (!s.IsOK()) {
-    LOG(ERROR) << "[server] Failed to enable write event on the sync migrate 
connection " << conn_->GetFD() << ": "
-               << s.Msg();
+    error("[server] Failed to enable write event on the sync migrate 
connection {}: {}", conn_->GetFD(), s.Msg());
   }
 }
 
diff --git a/src/cluster/sync_migrate_context.h 
b/src/cluster/sync_migrate_context.h
index 818736cb4..e3593c1dc 100644
--- a/src/cluster/sync_migrate_context.h
+++ b/src/cluster/sync_migrate_context.h
@@ -25,7 +25,7 @@
 class SyncMigrateContext : private EvbufCallbackBase<SyncMigrateContext, 
false>,
                            private EventCallbackBase<SyncMigrateContext> {
  public:
-  SyncMigrateContext(Server *srv, redis::Connection *conn, int timeout) : 
srv_(srv), conn_(conn), timeout_(timeout){};
+  SyncMigrateContext(Server *srv, redis::Connection *conn, int timeout) : 
srv_(srv), conn_(conn), timeout_(timeout) {}
 
   void Suspend();
   void Resume(const Status &migrate_result);

Reply via email to