This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 610bb40a Refactor replication callbacks via templates (#1646)
610bb40a is described below

commit 610bb40ade146c74303b2db98599b1959cb0fbb5
Author: Twice <[email protected]>
AuthorDate: Tue Aug 8 05:22:29 2023 +0800

    Refactor replication callbacks via templates (#1646)
    
    Co-authored-by: Binbin <[email protected]>
    Co-authored-by: hulk <[email protected]>
---
 src/cluster/replication.cc | 237 +++++++++++++++++++++------------------------
 src/cluster/replication.h  |  43 ++++----
 src/common/event_util.h    |  23 +++++
 3 files changed, 156 insertions(+), 147 deletions(-)

diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc
index 39a2ade7..261c49a5 100644
--- a/src/cluster/replication.cc
+++ b/src/cluster/replication.cc
@@ -161,54 +161,50 @@ void SendString(bufferevent *bev, const std::string 
&data) {
   evbuffer_add(output, data.c_str(), data.length());
 }
 
-void ReplicationThread::CallbacksStateMachine::ConnEventCB(bufferevent *bev, 
int16_t events, void *state_machine_ptr) {
+void ReplicationThread::CallbacksStateMachine::ConnEventCB(bufferevent *bev, 
int16_t events) {
   if (events & BEV_EVENT_CONNECTED) {
     // call write_cb when connected
     bufferevent_data_cb write_cb = nullptr;
     bufferevent_getcb(bev, nullptr, &write_cb, nullptr, nullptr);
-    if (write_cb) write_cb(bev, state_machine_ptr);
+    if (write_cb) write_cb(bev, this);
     return;
   }
   if (events & (BEV_EVENT_ERROR | BEV_EVENT_EOF)) {
     LOG(ERROR) << "[replication] connection error/eof, reconnect the master";
     // Wait a bit and reconnect
-    auto state_m = static_cast<CallbacksStateMachine *>(state_machine_ptr);
-    state_m->repl_->repl_state_.store(kReplConnecting, 
std::memory_order_relaxed);
+    repl_->repl_state_.store(kReplConnecting, std::memory_order_relaxed);
     std::this_thread::sleep_for(std::chrono::seconds(1));
-    state_m->Stop();
-    state_m->Start();
+    Stop();
+    Start();
   }
 }
 
-void ReplicationThread::CallbacksStateMachine::SetReadCB(bufferevent *bev, 
bufferevent_data_cb cb,
-                                                         void 
*state_machine_ptr) {
+void ReplicationThread::CallbacksStateMachine::SetReadCB(bufferevent *bev, 
bufferevent_data_cb cb) {
   bufferevent_enable(bev, EV_READ);
-  bufferevent_setcb(bev, cb, nullptr, ConnEventCB, state_machine_ptr);
+  bufferevent_setcb(bev, cb, nullptr, 
EventCallbackFunc<&CallbacksStateMachine::ConnEventCB>, this);
 }
 
-void ReplicationThread::CallbacksStateMachine::SetWriteCB(bufferevent *bev, 
bufferevent_data_cb cb,
-                                                          void 
*state_machine_ptr) {
+void ReplicationThread::CallbacksStateMachine::SetWriteCB(bufferevent *bev, 
bufferevent_data_cb cb) {
   bufferevent_enable(bev, EV_WRITE);
-  bufferevent_setcb(bev, nullptr, cb, ConnEventCB, state_machine_ptr);
+  bufferevent_setcb(bev, nullptr, cb, 
EventCallbackFunc<&CallbacksStateMachine::ConnEventCB>, this);
 }
 
-void ReplicationThread::CallbacksStateMachine::EvCallback(bufferevent *bev, 
void *ctx) {
-  auto self = static_cast<CallbacksStateMachine *>(ctx);
+void ReplicationThread::CallbacksStateMachine::ReadWriteCB(bufferevent *bev) {
 LOOP_LABEL:
-  assert(self->handler_idx_ <= self->handlers_.size());
-  DLOG(INFO) << "[replication] Execute handler[" << 
self->getHandlerName(self->handler_idx_) << "]";
-  auto st = self->getHandlerFunc(self->handler_idx_)(bev, self->repl_);
-  self->repl_->last_io_time_.store(util::GetTimeStamp(), 
std::memory_order_relaxed);
+  assert(handler_idx_ <= handlers_.size());
+  DLOG(INFO) << "[replication] Execute handler[" << 
getHandlerName(handler_idx_) << "]";
+  auto st = getHandlerFunc(handler_idx_)(repl_, bev);
+  repl_->last_io_time_.store(util::GetTimeStamp(), std::memory_order_relaxed);
   switch (st) {
     case CBState::NEXT:
-      ++self->handler_idx_;
+      ++handler_idx_;
       [[fallthrough]];
     case CBState::PREV:
-      if (st == CBState::PREV) --self->handler_idx_;
-      if (self->getHandlerEventType(self->handler_idx_) == WRITE) {
-        SetWriteCB(bev, EvCallback, ctx);
+      if (st == CBState::PREV) --handler_idx_;
+      if (getHandlerEventType(handler_idx_) == WRITE) {
+        SetWriteCB(bev, 
EventCallbackFunc<&CallbacksStateMachine::ReadWriteCB>);
       } else {
-        SetReadCB(bev, EvCallback, ctx);
+        SetReadCB(bev, EventCallbackFunc<&CallbacksStateMachine::ReadWriteCB>);
       }
       // invoke the read handler (of next step) directly, as the bev might
       // have the data already.
@@ -217,19 +213,19 @@ LOOP_LABEL:
       break;
     case CBState::QUIT:  // state that can not be retry, or all steps are 
executed.
       bufferevent_free(bev);
-      self->bev_ = nullptr;
-      self->repl_->repl_state_.store(kReplError, std::memory_order_relaxed);
+      bev_ = nullptr;
+      repl_->repl_state_.store(kReplError, std::memory_order_relaxed);
       break;
     case CBState::RESTART:  // state that can be retried some time later
-      self->Stop();
-      if (self->repl_->stop_flag_) {
+      Stop();
+      if (repl_->stop_flag_) {
         LOG(INFO) << "[replication] Wouldn't restart while the replication 
thread was stopped";
         break;
       }
-      self->repl_->repl_state_.store(kReplConnecting, 
std::memory_order_relaxed);
+      repl_->repl_state_.store(kReplConnecting, std::memory_order_relaxed);
       LOG(INFO) << "[replication] Retry in 10 seconds";
       std::this_thread::sleep_for(std::chrono::seconds(10));
-      self->Start();
+      Start();
   }
 }
 
@@ -243,8 +239,8 @@ void ReplicationThread::CallbacksStateMachine::Start() {
   // Note: It may cause data races to use 'masterauth' directly.
   // It is acceptable because password change is a low frequency operation.
   if (!repl_->srv_->GetConfig()->masterauth.empty()) {
-    handlers_.emplace_front(CallbacksStateMachine::READ, "auth read", 
authReadCB);
-    handlers_.emplace_front(CallbacksStateMachine::WRITE, "auth write", 
authWriteCB);
+    handlers_.emplace_front(CallbacksStateMachine::READ, "auth read", 
&ReplicationThread::authReadCB);
+    handlers_.emplace_front(CallbacksStateMachine::WRITE, "auth write", 
&ReplicationThread::authWriteCB);
   }
 
   uint64_t last_connect_timestamp = 0;
@@ -275,9 +271,9 @@ void ReplicationThread::CallbacksStateMachine::Start() {
   handler_idx_ = 0;
   repl_->incr_state_ = Incr_batch_size;
   if (getHandlerEventType(0) == WRITE) {
-    SetWriteCB(bev, EvCallback, this);
+    SetWriteCB(bev, EventCallbackFunc<&CallbacksStateMachine::ReadWriteCB>);
   } else {
-    SetReadCB(bev, EvCallback, this);
+    SetReadCB(bev, EventCallbackFunc<&CallbacksStateMachine::ReadWriteCB>);
   }
   bev_ = bev;
 }
@@ -298,18 +294,17 @@ ReplicationThread::ReplicationThread(std::string host, 
uint32_t port, Server *sr
       psync_steps_(
           this,
           CallbacksStateMachine::CallbackList{
-              
CallbacksStateMachine::CallbackType{CallbacksStateMachine::WRITE, "dbname 
write", checkDBNameWriteCB},
-              CallbacksStateMachine::CallbackType{CallbacksStateMachine::READ, 
"dbname read", checkDBNameReadCB},
-              
CallbacksStateMachine::CallbackType{CallbacksStateMachine::WRITE, "replconf 
write", replConfWriteCB},
-              CallbacksStateMachine::CallbackType{CallbacksStateMachine::READ, 
"replconf read", replConfReadCB},
-              
CallbacksStateMachine::CallbackType{CallbacksStateMachine::WRITE, "psync 
write", tryPSyncWriteCB},
-              CallbacksStateMachine::CallbackType{CallbacksStateMachine::READ, 
"psync read", tryPSyncReadCB},
-              CallbacksStateMachine::CallbackType{CallbacksStateMachine::READ, 
"batch loop", incrementBatchLoopCB}}),
+              CallbackType{CallbacksStateMachine::WRITE, "dbname write", 
&ReplicationThread::checkDBNameWriteCB},
+              CallbackType{CallbacksStateMachine::READ, "dbname read", 
&ReplicationThread::checkDBNameReadCB},
+              CallbackType{CallbacksStateMachine::WRITE, "replconf write", 
&ReplicationThread::replConfWriteCB},
+              CallbackType{CallbacksStateMachine::READ, "replconf read", 
&ReplicationThread::replConfReadCB},
+              CallbackType{CallbacksStateMachine::WRITE, "psync write", 
&ReplicationThread::tryPSyncWriteCB},
+              CallbackType{CallbacksStateMachine::READ, "psync read", 
&ReplicationThread::tryPSyncReadCB},
+              CallbackType{CallbacksStateMachine::READ, "batch loop", 
&ReplicationThread::incrementBatchLoopCB}}),
       fullsync_steps_(
-          this,
-          CallbacksStateMachine::CallbackList{
-              
CallbacksStateMachine::CallbackType{CallbacksStateMachine::WRITE, "fullsync 
write", fullSyncWriteCB},
-              CallbacksStateMachine::CallbackType{CallbacksStateMachine::READ, 
"fullsync read", fullSyncReadCB}}) {}
+          this, CallbacksStateMachine::CallbackList{
+                    CallbackType{CallbacksStateMachine::WRITE, "fullsync 
write", &ReplicationThread::fullSyncWriteCB},
+                    CallbackType{CallbacksStateMachine::READ, "fullsync read", 
&ReplicationThread::fullSyncReadCB}}) {}
 
 Status ReplicationThread::Start(std::function<void()> &&pre_fullsync_cb, 
std::function<void()> &&post_fullsync_cb) {
   pre_fullsync_cb_ = std::move(pre_fullsync_cb);
@@ -370,15 +365,14 @@ void ReplicationThread::run() {
   event_base_free(base_);
 }
 
-ReplicationThread::CBState ReplicationThread::authWriteCB(bufferevent *bev, 
void *ctx) {
-  auto self = static_cast<ReplicationThread *>(ctx);
-  SendString(bev, redis::MultiBulkString({"AUTH", 
self->srv_->GetConfig()->masterauth}));
+ReplicationThread::CBState ReplicationThread::authWriteCB(bufferevent *bev) {
+  SendString(bev, redis::MultiBulkString({"AUTH", 
srv_->GetConfig()->masterauth}));
   LOG(INFO) << "[replication] Auth request was sent, waiting for response";
-  self->repl_state_.store(kReplSendAuth, std::memory_order_relaxed);
+  repl_state_.store(kReplSendAuth, std::memory_order_relaxed);
   return CBState::NEXT;
 }
 
-ReplicationThread::CBState ReplicationThread::authReadCB(bufferevent *bev, 
void *ctx) {
+ReplicationThread::CBState ReplicationThread::authReadCB(bufferevent *bev) {  
// NOLINT
   auto input = bufferevent_get_input(bev);
   UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
   if (!line) return CBState::AGAIN;
@@ -391,15 +385,14 @@ ReplicationThread::CBState 
ReplicationThread::authReadCB(bufferevent *bev, void
   return CBState::NEXT;
 }
 
-ReplicationThread::CBState ReplicationThread::checkDBNameWriteCB(bufferevent 
*bev, void *ctx) {
+ReplicationThread::CBState ReplicationThread::checkDBNameWriteCB(bufferevent 
*bev) {
   SendString(bev, redis::MultiBulkString({"_db_name"}));
-  auto self = static_cast<ReplicationThread *>(ctx);
-  self->repl_state_.store(kReplCheckDBName, std::memory_order_relaxed);
+  repl_state_.store(kReplCheckDBName, std::memory_order_relaxed);
   LOG(INFO) << "[replication] Check db name request was sent, waiting for 
response";
   return CBState::NEXT;
 }
 
-ReplicationThread::CBState ReplicationThread::checkDBNameReadCB(bufferevent 
*bev, void *ctx) {
+ReplicationThread::CBState ReplicationThread::checkDBNameReadCB(bufferevent 
*bev) {
   auto input = bufferevent_get_input(bev);
   UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
   if (!line) return CBState::AGAIN;
@@ -412,8 +405,7 @@ ReplicationThread::CBState 
ReplicationThread::checkDBNameReadCB(bufferevent *bev
     }
     return CBState::RESTART;
   }
-  auto self = static_cast<ReplicationThread *>(ctx);
-  std::string db_name = self->storage_->GetName();
+  std::string db_name = storage_->GetName();
   if (line.length == db_name.size() && !strncmp(line.get(), db_name.data(), 
line.length)) {
     // DB name match, we should continue to next step: TryPsync
     LOG(INFO) << "[replication] DB name is valid, continue...";
@@ -423,31 +415,29 @@ ReplicationThread::CBState 
ReplicationThread::checkDBNameReadCB(bufferevent *bev
   return CBState::RESTART;
 }
 
-ReplicationThread::CBState ReplicationThread::replConfWriteCB(bufferevent 
*bev, void *ctx) {
-  auto self = static_cast<ReplicationThread *>(ctx);
-  auto config = self->srv_->GetConfig();
+ReplicationThread::CBState ReplicationThread::replConfWriteCB(bufferevent 
*bev) {
+  auto config = srv_->GetConfig();
 
   auto port = config->replica_announce_port > 0 ? 
config->replica_announce_port : config->port;
   std::vector<std::string> data_to_send{"replconf", "listening-port", 
std::to_string(port)};
-  if (!self->next_try_without_announce_ip_address_ && 
!config->replica_announce_ip.empty()) {
+  if (!next_try_without_announce_ip_address_ && 
!config->replica_announce_ip.empty()) {
     data_to_send.emplace_back("ip-address");
     data_to_send.emplace_back(config->replica_announce_ip);
   }
   SendString(bev, redis::MultiBulkString(data_to_send));
-  self->repl_state_.store(kReplReplConf, std::memory_order_relaxed);
+  repl_state_.store(kReplReplConf, std::memory_order_relaxed);
   LOG(INFO) << "[replication] replconf request was sent, waiting for response";
   return CBState::NEXT;
 }
 
-ReplicationThread::CBState ReplicationThread::replConfReadCB(bufferevent *bev, 
void *ctx) {
-  auto self = static_cast<ReplicationThread *>(ctx);
+ReplicationThread::CBState ReplicationThread::replConfReadCB(bufferevent *bev) 
{
   auto input = bufferevent_get_input(bev);
   UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
   if (!line) return CBState::AGAIN;
 
   // on unknown option: first try without announce ip, if it fails again - do 
nothing (to prevent infinite loop)
-  if (isUnknownOption(line.get()) && 
!self->next_try_without_announce_ip_address_) {
-    self->next_try_without_announce_ip_address_ = true;
+  if (isUnknownOption(line.get()) && !next_try_without_announce_ip_address_) {
+    next_try_without_announce_ip_address_ = true;
     LOG(WARNING) << "The old version master, can't handle ip-address, "
                  << "try without it again";
     // Retry previous state, i.e. send replconf again
@@ -467,14 +457,13 @@ ReplicationThread::CBState 
ReplicationThread::replConfReadCB(bufferevent *bev, v
   }
 }
 
-ReplicationThread::CBState ReplicationThread::tryPSyncWriteCB(bufferevent 
*bev, void *ctx) {
-  auto self = static_cast<ReplicationThread *>(ctx);
-  auto cur_seq = self->storage_->LatestSeqNumber();
+ReplicationThread::CBState ReplicationThread::tryPSyncWriteCB(bufferevent 
*bev) {
+  auto cur_seq = storage_->LatestSeqNumber();
   auto next_seq = cur_seq + 1;
   std::string replid;
 
   // Get replication id
-  std::string replid_in_wal = self->storage_->GetReplIdFromWalBySeq(cur_seq);
+  std::string replid_in_wal = storage_->GetReplIdFromWalBySeq(cur_seq);
   // Set if valid replication id
   if (replid_in_wal.length() == kReplIdLength) {
     replid = replid_in_wal;
@@ -482,7 +471,7 @@ ReplicationThread::CBState 
ReplicationThread::tryPSyncWriteCB(bufferevent *bev,
     // Maybe there is no WAL, we can get replication id from db since master
     // always write replication id into db before any operation when starting
     // new "replication history".
-    std::string replid_in_db = self->storage_->GetReplIdFromDbEngine();
+    std::string replid_in_db = storage_->GetReplIdFromDbEngine();
     if (replid_in_db.length() == kReplIdLength) {
       replid = replid_in_db;
     }
@@ -490,8 +479,8 @@ ReplicationThread::CBState 
ReplicationThread::tryPSyncWriteCB(bufferevent *bev,
 
   // To adapt to old master using old PSYNC, i.e. only use next sequence id.
   // Also use old PSYNC if replica can't find replication id from WAL and DB.
-  if (!self->srv_->GetConfig()->use_rsid_psync || self->next_try_old_psync_ || 
replid.length() != kReplIdLength) {
-    self->next_try_old_psync_ = false;  // Reset next_try_old_psync_
+  if (!srv_->GetConfig()->use_rsid_psync || next_try_old_psync_ || 
replid.length() != kReplIdLength) {
+    next_try_old_psync_ = false;  // Reset next_try_old_psync_
     SendString(bev, redis::MultiBulkString({"PSYNC", 
std::to_string(next_seq)}));
     LOG(INFO) << "[replication] Try to use psync, next seq: " << next_seq;
   } else {
@@ -500,12 +489,11 @@ ReplicationThread::CBState 
ReplicationThread::tryPSyncWriteCB(bufferevent *bev,
     LOG(INFO) << "[replication] Try to use new psync, current unique 
replication sequence id: " << replid << ":"
               << cur_seq;
   }
-  self->repl_state_.store(kReplSendPSync, std::memory_order_relaxed);
+  repl_state_.store(kReplSendPSync, std::memory_order_relaxed);
   return CBState::NEXT;
 }
 
-ReplicationThread::CBState ReplicationThread::tryPSyncReadCB(bufferevent *bev, 
void *ctx) {
-  auto self = static_cast<ReplicationThread *>(ctx);
+ReplicationThread::CBState ReplicationThread::tryPSyncReadCB(bufferevent *bev) 
{
   auto input = bufferevent_get_input(bev);
   UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
   if (!line) return CBState::AGAIN;
@@ -516,7 +504,7 @@ ReplicationThread::CBState 
ReplicationThread::tryPSyncReadCB(bufferevent *bev, v
   }
 
   if (line[0] == '-' && isWrongPsyncNum(line.get())) {
-    self->next_try_old_psync_ = true;
+    next_try_old_psync_ = true;
     LOG(WARNING) << "The old version master, can't handle new PSYNC, "
                  << "try old PSYNC again";
     // Retry previous state, i.e. send PSYNC again
@@ -526,7 +514,7 @@ ReplicationThread::CBState 
ReplicationThread::tryPSyncReadCB(bufferevent *bev, v
   if (strncmp(line.get(), "+OK", 3) != 0) {
     // PSYNC isn't OK, we should use FullSync
     // Switch to fullsync state machine
-    self->fullsync_steps_.Start();
+    fullsync_steps_.Start();
     LOG(INFO) << "[replication] Failed to psync, error: " << line.get() << ", 
switch to fullsync";
     return CBState::QUIT;
   } else {
@@ -536,49 +524,48 @@ ReplicationThread::CBState 
ReplicationThread::tryPSyncReadCB(bufferevent *bev, v
   }
 }
 
-ReplicationThread::CBState ReplicationThread::incrementBatchLoopCB(bufferevent 
*bev, void *ctx) {
+ReplicationThread::CBState ReplicationThread::incrementBatchLoopCB(bufferevent 
*bev) {
   char *bulk_data = nullptr;
-  auto self = static_cast<ReplicationThread *>(ctx);
-  self->repl_state_.store(kReplConnected, std::memory_order_relaxed);
+  repl_state_.store(kReplConnected, std::memory_order_relaxed);
   auto input = bufferevent_get_input(bev);
   while (true) {
-    switch (self->incr_state_) {
+    switch (incr_state_) {
       case Incr_batch_size: {
         // Read bulk length
         UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
         if (!line) return CBState::AGAIN;
-        self->incr_bulk_len_ = line.length > 0 ? std::strtoull(line.get() + 1, 
nullptr, 10) : 0;
-        if (self->incr_bulk_len_ == 0) {
+        incr_bulk_len_ = line.length > 0 ? std::strtoull(line.get() + 1, 
nullptr, 10) : 0;
+        if (incr_bulk_len_ == 0) {
           LOG(ERROR) << "[replication] Invalid increment data size";
           return CBState::RESTART;
         }
-        self->incr_state_ = Incr_batch_data;
+        incr_state_ = Incr_batch_data;
         break;
       }
       case Incr_batch_data:
         // Read bulk data (batch data)
-        if (self->incr_bulk_len_ + 2 <= evbuffer_get_length(input)) {  // We 
got enough data
-          bulk_data = reinterpret_cast<char *>(evbuffer_pullup(input, 
static_cast<ssize_t>(self->incr_bulk_len_ + 2)));
-          std::string bulk_string = std::string(bulk_data, 
self->incr_bulk_len_);
+        if (incr_bulk_len_ + 2 <= evbuffer_get_length(input)) {  // We got 
enough data
+          bulk_data = reinterpret_cast<char *>(evbuffer_pullup(input, 
static_cast<ssize_t>(incr_bulk_len_ + 2)));
+          std::string bulk_string = std::string(bulk_data, incr_bulk_len_);
           // master would send the ping heartbeat packet to check whether the 
slave was alive or not,
           // don't write ping to db here.
           if (bulk_string != "ping") {
-            auto s = 
self->storage_->ReplicaApplyWriteBatch(std::string(bulk_data, 
self->incr_bulk_len_));
+            auto s = storage_->ReplicaApplyWriteBatch(std::string(bulk_data, 
incr_bulk_len_));
             if (!s.IsOK()) {
               LOG(ERROR) << "[replication] CRITICAL - Failed to write batch to 
local, " << s.Msg() << ". batch: 0x"
                          << util::StringToHex(bulk_string);
               return CBState::RESTART;
             }
 
-            s = self->parseWriteBatch(bulk_string);
+            s = parseWriteBatch(bulk_string);
             if (!s.IsOK()) {
               LOG(ERROR) << "[replication] CRITICAL - failed to parse write 
batch 0x" << util::StringToHex(bulk_string)
                          << ": " << s.Msg();
               return CBState::RESTART;
             }
           }
-          evbuffer_drain(input, self->incr_bulk_len_ + 2);
-          self->incr_state_ = Incr_batch_size;
+          evbuffer_drain(input, incr_bulk_len_ + 2);
+          incr_state_ = Incr_batch_size;
         } else {
           return CBState::AGAIN;
         }
@@ -587,22 +574,20 @@ ReplicationThread::CBState 
ReplicationThread::incrementBatchLoopCB(bufferevent *
   }
 }
 
-ReplicationThread::CBState ReplicationThread::fullSyncWriteCB(bufferevent 
*bev, void *ctx) {
+ReplicationThread::CBState ReplicationThread::fullSyncWriteCB(bufferevent 
*bev) {
   SendString(bev, redis::MultiBulkString({"_fetch_meta"}));
-  auto self = static_cast<ReplicationThread *>(ctx);
-  self->repl_state_.store(kReplFetchMeta, std::memory_order_relaxed);
+  repl_state_.store(kReplFetchMeta, std::memory_order_relaxed);
   LOG(INFO) << "[replication] Start syncing data with fullsync";
   return CBState::NEXT;
 }
 
-ReplicationThread::CBState ReplicationThread::fullSyncReadCB(bufferevent *bev, 
void *ctx) {
-  auto self = static_cast<ReplicationThread *>(ctx);
+ReplicationThread::CBState ReplicationThread::fullSyncReadCB(bufferevent *bev) 
{
   auto input = bufferevent_get_input(bev);
-  switch (self->fullsync_state_) {
+  switch (fullsync_state_) {
     case kFetchMetaID: {
       // New version master only sends meta file content
-      if (!self->srv_->GetConfig()->master_use_repl_port) {
-        self->fullsync_state_ = kFetchMetaContent;
+      if (!srv_->GetConfig()->master_use_repl_port) {
+        fullsync_state_ = kFetchMetaContent;
         return CBState::AGAIN;
       }
       UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
@@ -611,14 +596,13 @@ ReplicationThread::CBState 
ReplicationThread::fullSyncReadCB(bufferevent *bev, v
         LOG(ERROR) << "[replication] Failed to fetch meta id: " << line.get();
         return CBState::RESTART;
       }
-      self->fullsync_meta_id_ =
-          static_cast<rocksdb::BackupID>(line.length > 0 ? 
std::strtoul(line.get(), nullptr, 10) : 0);
-      if (self->fullsync_meta_id_ == 0) {
+      fullsync_meta_id_ = static_cast<rocksdb::BackupID>(line.length > 0 ? 
std::strtoul(line.get(), nullptr, 10) : 0);
+      if (fullsync_meta_id_ == 0) {
         LOG(ERROR) << "[replication] Invalid meta id received";
         return CBState::RESTART;
       }
-      self->fullsync_state_ = kFetchMetaSize;
-      LOG(INFO) << "[replication] Succeed fetching meta id: " << 
self->fullsync_meta_id_;
+      fullsync_state_ = kFetchMetaSize;
+      LOG(INFO) << "[replication] Succeed fetching meta id: " << 
fullsync_meta_id_;
     }
     case kFetchMetaSize: {
       UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
@@ -627,29 +611,28 @@ ReplicationThread::CBState 
ReplicationThread::fullSyncReadCB(bufferevent *bev, v
         LOG(ERROR) << "[replication] Failed to fetch meta size: " << 
line.get();
         return CBState::RESTART;
       }
-      self->fullsync_filesize_ = line.length > 0 ? std::strtoull(line.get(), 
nullptr, 10) : 0;
-      if (self->fullsync_filesize_ == 0) {
+      fullsync_filesize_ = line.length > 0 ? std::strtoull(line.get(), 
nullptr, 10) : 0;
+      if (fullsync_filesize_ == 0) {
         LOG(ERROR) << "[replication] Invalid meta file size received";
         return CBState::RESTART;
       }
-      self->fullsync_state_ = kFetchMetaContent;
-      LOG(INFO) << "[replication] Succeed fetching meta size: " << 
self->fullsync_filesize_;
+      fullsync_state_ = kFetchMetaContent;
+      LOG(INFO) << "[replication] Succeed fetching meta size: " << 
fullsync_filesize_;
     }
     case kFetchMetaContent: {
       std::string target_dir;
       engine::Storage::ReplDataManager::MetaInfo meta;
       // Master using old version
-      if (self->srv_->GetConfig()->master_use_repl_port) {
-        if (evbuffer_get_length(input) < self->fullsync_filesize_) {
+      if (srv_->GetConfig()->master_use_repl_port) {
+        if (evbuffer_get_length(input) < fullsync_filesize_) {
           return CBState::AGAIN;
         }
-        auto s =
-            engine::Storage::ReplDataManager::ParseMetaAndSave(self->storage_, 
self->fullsync_meta_id_, input, &meta);
+        auto s = engine::Storage::ReplDataManager::ParseMetaAndSave(storage_, 
fullsync_meta_id_, input, &meta);
         if (!s.IsOK()) {
           LOG(ERROR) << "[replication] Failed to parse meta and save: " << 
s.Msg();
           return CBState::AGAIN;
         }
-        target_dir = self->srv_->GetConfig()->backup_sync_dir;
+        target_dir = srv_->GetConfig()->backup_sync_dir;
       } else {
         // Master using new version
         UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
@@ -663,13 +646,13 @@ ReplicationThread::CBState 
ReplicationThread::fullSyncReadCB(bufferevent *bev, v
           meta.files.emplace_back(f, 0);
         }
 
-        target_dir = self->srv_->GetConfig()->sync_checkpoint_dir;
+        target_dir = srv_->GetConfig()->sync_checkpoint_dir;
         // Clean invalid files of checkpoint, "CURRENT" file must be invalid
         // because we identify one file by its file number but only "CURRENT"
         // file doesn't have number.
         auto iter = std::find(need_files.begin(), need_files.end(), "CURRENT");
         if (iter != need_files.end()) need_files.erase(iter);
-        auto s = 
engine::Storage::ReplDataManager::CleanInvalidFiles(self->storage_, target_dir, 
need_files);
+        auto s = engine::Storage::ReplDataManager::CleanInvalidFiles(storage_, 
target_dir, need_files);
         if (!s.IsOK()) {
           LOG(WARNING) << "[replication] Failed to clean up invalid files of 
the old checkpoint,"
                        << " error: " << s.Msg();
@@ -681,19 +664,19 @@ ReplicationThread::CBState 
ReplicationThread::fullSyncReadCB(bufferevent *bev, v
         }
       }
       assert(evbuffer_get_length(input) == 0);
-      self->fullsync_state_ = kFetchMetaID;
+      fullsync_state_ = kFetchMetaID;
       LOG(INFO) << "[replication] Succeeded fetching full data files info, 
fetching files in parallel";
 
       // If 'slave-empty-db-before-fullsync' is yes, we call 'pre_fullsync_cb_'
       // just like reloading database. And we don't want slave to occupy too 
much
       // disk space, so we just empty entire database rudely.
-      if (self->srv_->GetConfig()->slave_empty_db_before_fullsync) {
-        self->pre_fullsync_cb_();
-        self->storage_->EmptyDB();
+      if (srv_->GetConfig()->slave_empty_db_before_fullsync) {
+        pre_fullsync_cb_();
+        storage_->EmptyDB();
       }
 
-      self->repl_state_.store(kReplFetchSST, std::memory_order_relaxed);
-      auto s = self->parallelFetchFile(target_dir, meta.files);
+      repl_state_.store(kReplFetchSST, std::memory_order_relaxed);
+      auto s = parallelFetchFile(target_dir, meta.files);
       if (!s.IsOK()) {
         LOG(ERROR) << "[replication] Failed to parallel fetch files while " + 
s.Msg();
         return CBState::RESTART;
@@ -702,22 +685,22 @@ ReplicationThread::CBState 
ReplicationThread::fullSyncReadCB(bufferevent *bev, v
 
       // Restore DB from backup
       // We already call 'pre_fullsync_cb_' if 
'slave-empty-db-before-fullsync' is yes
-      if (!self->srv_->GetConfig()->slave_empty_db_before_fullsync) 
self->pre_fullsync_cb_();
+      if (!srv_->GetConfig()->slave_empty_db_before_fullsync) 
pre_fullsync_cb_();
       // For old version, master uses rocksdb backup to implement data snapshot
-      if (self->srv_->GetConfig()->master_use_repl_port) {
-        s = self->storage_->RestoreFromBackup();
+      if (srv_->GetConfig()->master_use_repl_port) {
+        s = storage_->RestoreFromBackup();
       } else {
-        s = self->storage_->RestoreFromCheckpoint();
+        s = storage_->RestoreFromCheckpoint();
       }
       if (!s.IsOK()) {
         LOG(ERROR) << "[replication] Failed to restore backup while " + 
s.Msg() + ", restart fullsync";
         return CBState::RESTART;
       }
       LOG(INFO) << "[replication] Succeeded restoring the backup, fullsync was 
finish";
-      self->post_fullsync_cb_();
+      post_fullsync_cb_();
 
       // Switch to psync state machine again
-      self->psync_steps_.Start();
+      psync_steps_.Start();
       return CBState::QUIT;
     }
   }
@@ -772,7 +755,7 @@ Status ReplicationThread::parallelFetchFile(const 
std::string &dir,
           }
           unsigned files_count = files.size();
           FetchFileCallback fn = [&fetch_cnt, &skip_cnt, files_count](const 
std::string &fetch_file,
-                                                                      const 
uint32_t fetch_crc) {
+                                                                      uint32_t 
fetch_crc) {
             fetch_cnt.fetch_add(1);
             uint32_t cur_skip_cnt = skip_cnt.load();
             uint32_t cur_fetch_cnt = fetch_cnt.load();
diff --git a/src/cluster/replication.h b/src/cluster/replication.h
index 2b6f4c86..6bf5954b 100644
--- a/src/cluster/replication.h
+++ b/src/cluster/replication.h
@@ -57,7 +57,7 @@ enum WriteBatchType {
   kBatchTypeStream,
 };
 
-using FetchFileCallback = std::function<void(const std::string, const 
uint32_t)>;
+using FetchFileCallback = std::function<void(const std::string &, uint32_t)>;
 
 class FeedSlaveThread {
  public:
@@ -118,17 +118,20 @@ class ReplicationThread : private 
EventCallbackBase<ReplicationThread> {
       READ,
       WRITE,
     };
-    using CallbackType = std::tuple<EventType, std::string, 
std::function<State(bufferevent *, void *)>>;
+
+    using CallbackFunc = std::function<State(ReplicationThread *, bufferevent 
*)>;
+    using CallbackType = std::tuple<EventType, std::string, CallbackFunc>;
     using CallbackList = std::deque<CallbackType>;
+
     CallbacksStateMachine(ReplicationThread *repl, CallbackList &&handlers)
         : repl_(repl), handlers_(std::move(handlers)) {}
 
     void Start();
     void Stop();
-    static void EvCallback(bufferevent *bev, void *ctx);
-    static void ConnEventCB(bufferevent *bev, int16_t events, void 
*state_machine_ptr);
-    static void SetReadCB(bufferevent *bev, bufferevent_data_cb cb, void 
*state_machine_ptr);
-    static void SetWriteCB(bufferevent *bev, bufferevent_data_cb cb, void 
*state_machine_ptr);
+    void ReadWriteCB(bufferevent *bev);
+    void ConnEventCB(bufferevent *bev, int16_t events);
+    void SetReadCB(bufferevent *bev, bufferevent_data_cb cb);
+    void SetWriteCB(bufferevent *bev, bufferevent_data_cb cb);
 
    private:
     bufferevent *bev_ = nullptr;
@@ -138,11 +141,11 @@ class ReplicationThread : private 
EventCallbackBase<ReplicationThread> {
 
     EventType getHandlerEventType(CallbackList::size_type idx) { return 
std::get<0>(handlers_[idx]); }
     std::string getHandlerName(CallbackList::size_type idx) { return 
std::get<1>(handlers_[idx]); }
-    std::function<State(bufferevent *, void *)> 
getHandlerFunc(CallbackList::size_type idx) {
-      return std::get<2>(handlers_[idx]);
-    }
+    CallbackFunc getHandlerFunc(CallbackList::size_type idx) { return 
std::get<2>(handlers_[idx]); }
   };
 
+  using CallbackType = CallbacksStateMachine::CallbackType;
+
  private:
   std::thread t_;
   std::atomic<bool> stop_flag_ = false;
@@ -181,17 +184,17 @@ class ReplicationThread : private 
EventCallbackBase<ReplicationThread> {
 
   void run();
 
-  static CBState authWriteCB(bufferevent *bev, void *ctx);
-  static CBState authReadCB(bufferevent *bev, void *ctx);
-  static CBState checkDBNameWriteCB(bufferevent *bev, void *ctx);
-  static CBState checkDBNameReadCB(bufferevent *bev, void *ctx);
-  static CBState replConfWriteCB(bufferevent *bev, void *ctx);
-  static CBState replConfReadCB(bufferevent *bev, void *ctx);
-  static CBState tryPSyncWriteCB(bufferevent *bev, void *ctx);
-  static CBState tryPSyncReadCB(bufferevent *bev, void *ctx);
-  static CBState incrementBatchLoopCB(bufferevent *bev, void *ctx);
-  static CBState fullSyncWriteCB(bufferevent *bev, void *ctx);
-  static CBState fullSyncReadCB(bufferevent *bev, void *ctx);
+  CBState authWriteCB(bufferevent *bev);
+  CBState authReadCB(bufferevent *bev);
+  CBState checkDBNameWriteCB(bufferevent *bev);
+  CBState checkDBNameReadCB(bufferevent *bev);
+  CBState replConfWriteCB(bufferevent *bev);
+  CBState replConfReadCB(bufferevent *bev);
+  CBState tryPSyncWriteCB(bufferevent *bev);
+  CBState tryPSyncReadCB(bufferevent *bev);
+  CBState incrementBatchLoopCB(bufferevent *bev);
+  CBState fullSyncWriteCB(bufferevent *bev);
+  CBState fullSyncReadCB(bufferevent *bev);
 
   // Synchronized-Blocking ops
   Status sendAuth(int sock_fd);
diff --git a/src/common/event_util.h b/src/common/event_util.h
index 556f40a4..8df67be3 100644
--- a/src/common/event_util.h
+++ b/src/common/event_util.h
@@ -136,3 +136,26 @@ struct EvconnlistenerBase {
     return evconnlistener_new(base, callback<cb>, this, flags, backlog, fd);
   }
 };
+
+namespace details {
+
+template <auto F, typename>
+struct EventCallbackImpl;
+
+template <auto F, typename T, typename R, typename... Args>
+struct EventCallbackImpl<F, R (T::*)(Args...)> {
+  static R Func(Args... args, void *ctx) { return (reinterpret_cast<T 
*>(ctx)->*F)(args...); }
+};
+
+}  // namespace details
+
+// convert member functions to eventbuffer callbacks
+// e.g. for member function `void A::f(int x)` from class A
+// EventCallback<&A::f> generate a function
+// void EventCallback<&A::f>::Func(int x, void *ctx)
+// and put `this` pointer of A to `void *ctx`
+template <auto F>
+struct EventCallback : details::EventCallbackImpl<F, decltype(F)> {};
+
+template <auto F>
+constexpr auto EventCallbackFunc = EventCallback<F>::Func;

Reply via email to