This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 610bb40a Refactor replication callbacks via templates (#1646)
610bb40a is described below
commit 610bb40ade146c74303b2db98599b1959cb0fbb5
Author: Twice <[email protected]>
AuthorDate: Tue Aug 8 05:22:29 2023 +0800
Refactor replication callbacks via templates (#1646)
Co-authored-by: Binbin <[email protected]>
Co-authored-by: hulk <[email protected]>
---
src/cluster/replication.cc | 237 +++++++++++++++++++++------------------------
src/cluster/replication.h | 43 ++++----
src/common/event_util.h | 23 +++++
3 files changed, 156 insertions(+), 147 deletions(-)
diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc
index 39a2ade7..261c49a5 100644
--- a/src/cluster/replication.cc
+++ b/src/cluster/replication.cc
@@ -161,54 +161,50 @@ void SendString(bufferevent *bev, const std::string
&data) {
evbuffer_add(output, data.c_str(), data.length());
}
-void ReplicationThread::CallbacksStateMachine::ConnEventCB(bufferevent *bev,
int16_t events, void *state_machine_ptr) {
+void ReplicationThread::CallbacksStateMachine::ConnEventCB(bufferevent *bev,
int16_t events) {
if (events & BEV_EVENT_CONNECTED) {
// call write_cb when connected
bufferevent_data_cb write_cb = nullptr;
bufferevent_getcb(bev, nullptr, &write_cb, nullptr, nullptr);
- if (write_cb) write_cb(bev, state_machine_ptr);
+ if (write_cb) write_cb(bev, this);
return;
}
if (events & (BEV_EVENT_ERROR | BEV_EVENT_EOF)) {
LOG(ERROR) << "[replication] connection error/eof, reconnect the master";
// Wait a bit and reconnect
- auto state_m = static_cast<CallbacksStateMachine *>(state_machine_ptr);
- state_m->repl_->repl_state_.store(kReplConnecting,
std::memory_order_relaxed);
+ repl_->repl_state_.store(kReplConnecting, std::memory_order_relaxed);
std::this_thread::sleep_for(std::chrono::seconds(1));
- state_m->Stop();
- state_m->Start();
+ Stop();
+ Start();
}
}
-void ReplicationThread::CallbacksStateMachine::SetReadCB(bufferevent *bev,
bufferevent_data_cb cb,
- void
*state_machine_ptr) {
+void ReplicationThread::CallbacksStateMachine::SetReadCB(bufferevent *bev,
bufferevent_data_cb cb) {
bufferevent_enable(bev, EV_READ);
- bufferevent_setcb(bev, cb, nullptr, ConnEventCB, state_machine_ptr);
+ bufferevent_setcb(bev, cb, nullptr,
EventCallbackFunc<&CallbacksStateMachine::ConnEventCB>, this);
}
-void ReplicationThread::CallbacksStateMachine::SetWriteCB(bufferevent *bev,
bufferevent_data_cb cb,
- void
*state_machine_ptr) {
+void ReplicationThread::CallbacksStateMachine::SetWriteCB(bufferevent *bev,
bufferevent_data_cb cb) {
bufferevent_enable(bev, EV_WRITE);
- bufferevent_setcb(bev, nullptr, cb, ConnEventCB, state_machine_ptr);
+ bufferevent_setcb(bev, nullptr, cb,
EventCallbackFunc<&CallbacksStateMachine::ConnEventCB>, this);
}
-void ReplicationThread::CallbacksStateMachine::EvCallback(bufferevent *bev,
void *ctx) {
- auto self = static_cast<CallbacksStateMachine *>(ctx);
+void ReplicationThread::CallbacksStateMachine::ReadWriteCB(bufferevent *bev) {
LOOP_LABEL:
- assert(self->handler_idx_ <= self->handlers_.size());
- DLOG(INFO) << "[replication] Execute handler[" <<
self->getHandlerName(self->handler_idx_) << "]";
- auto st = self->getHandlerFunc(self->handler_idx_)(bev, self->repl_);
- self->repl_->last_io_time_.store(util::GetTimeStamp(),
std::memory_order_relaxed);
+ assert(handler_idx_ <= handlers_.size());
+ DLOG(INFO) << "[replication] Execute handler[" <<
getHandlerName(handler_idx_) << "]";
+ auto st = getHandlerFunc(handler_idx_)(repl_, bev);
+ repl_->last_io_time_.store(util::GetTimeStamp(), std::memory_order_relaxed);
switch (st) {
case CBState::NEXT:
- ++self->handler_idx_;
+ ++handler_idx_;
[[fallthrough]];
case CBState::PREV:
- if (st == CBState::PREV) --self->handler_idx_;
- if (self->getHandlerEventType(self->handler_idx_) == WRITE) {
- SetWriteCB(bev, EvCallback, ctx);
+ if (st == CBState::PREV) --handler_idx_;
+ if (getHandlerEventType(handler_idx_) == WRITE) {
+ SetWriteCB(bev,
EventCallbackFunc<&CallbacksStateMachine::ReadWriteCB>);
} else {
- SetReadCB(bev, EvCallback, ctx);
+ SetReadCB(bev, EventCallbackFunc<&CallbacksStateMachine::ReadWriteCB>);
}
// invoke the read handler (of next step) directly, as the bev might
// have the data already.
@@ -217,19 +213,19 @@ LOOP_LABEL:
break;
case CBState::QUIT: // state that can not be retry, or all steps are
executed.
bufferevent_free(bev);
- self->bev_ = nullptr;
- self->repl_->repl_state_.store(kReplError, std::memory_order_relaxed);
+ bev_ = nullptr;
+ repl_->repl_state_.store(kReplError, std::memory_order_relaxed);
break;
case CBState::RESTART: // state that can be retried some time later
- self->Stop();
- if (self->repl_->stop_flag_) {
+ Stop();
+ if (repl_->stop_flag_) {
LOG(INFO) << "[replication] Wouldn't restart while the replication
thread was stopped";
break;
}
- self->repl_->repl_state_.store(kReplConnecting,
std::memory_order_relaxed);
+ repl_->repl_state_.store(kReplConnecting, std::memory_order_relaxed);
LOG(INFO) << "[replication] Retry in 10 seconds";
std::this_thread::sleep_for(std::chrono::seconds(10));
- self->Start();
+ Start();
}
}
@@ -243,8 +239,8 @@ void ReplicationThread::CallbacksStateMachine::Start() {
// Note: It may cause data races to use 'masterauth' directly.
// It is acceptable because password change is a low frequency operation.
if (!repl_->srv_->GetConfig()->masterauth.empty()) {
- handlers_.emplace_front(CallbacksStateMachine::READ, "auth read",
authReadCB);
- handlers_.emplace_front(CallbacksStateMachine::WRITE, "auth write",
authWriteCB);
+ handlers_.emplace_front(CallbacksStateMachine::READ, "auth read",
&ReplicationThread::authReadCB);
+ handlers_.emplace_front(CallbacksStateMachine::WRITE, "auth write",
&ReplicationThread::authWriteCB);
}
uint64_t last_connect_timestamp = 0;
@@ -275,9 +271,9 @@ void ReplicationThread::CallbacksStateMachine::Start() {
handler_idx_ = 0;
repl_->incr_state_ = Incr_batch_size;
if (getHandlerEventType(0) == WRITE) {
- SetWriteCB(bev, EvCallback, this);
+ SetWriteCB(bev, EventCallbackFunc<&CallbacksStateMachine::ReadWriteCB>);
} else {
- SetReadCB(bev, EvCallback, this);
+ SetReadCB(bev, EventCallbackFunc<&CallbacksStateMachine::ReadWriteCB>);
}
bev_ = bev;
}
@@ -298,18 +294,17 @@ ReplicationThread::ReplicationThread(std::string host,
uint32_t port, Server *sr
psync_steps_(
this,
CallbacksStateMachine::CallbackList{
-
CallbacksStateMachine::CallbackType{CallbacksStateMachine::WRITE, "dbname
write", checkDBNameWriteCB},
- CallbacksStateMachine::CallbackType{CallbacksStateMachine::READ,
"dbname read", checkDBNameReadCB},
-
CallbacksStateMachine::CallbackType{CallbacksStateMachine::WRITE, "replconf
write", replConfWriteCB},
- CallbacksStateMachine::CallbackType{CallbacksStateMachine::READ,
"replconf read", replConfReadCB},
-
CallbacksStateMachine::CallbackType{CallbacksStateMachine::WRITE, "psync
write", tryPSyncWriteCB},
- CallbacksStateMachine::CallbackType{CallbacksStateMachine::READ,
"psync read", tryPSyncReadCB},
- CallbacksStateMachine::CallbackType{CallbacksStateMachine::READ,
"batch loop", incrementBatchLoopCB}}),
+ CallbackType{CallbacksStateMachine::WRITE, "dbname write",
&ReplicationThread::checkDBNameWriteCB},
+ CallbackType{CallbacksStateMachine::READ, "dbname read",
&ReplicationThread::checkDBNameReadCB},
+ CallbackType{CallbacksStateMachine::WRITE, "replconf write",
&ReplicationThread::replConfWriteCB},
+ CallbackType{CallbacksStateMachine::READ, "replconf read",
&ReplicationThread::replConfReadCB},
+ CallbackType{CallbacksStateMachine::WRITE, "psync write",
&ReplicationThread::tryPSyncWriteCB},
+ CallbackType{CallbacksStateMachine::READ, "psync read",
&ReplicationThread::tryPSyncReadCB},
+ CallbackType{CallbacksStateMachine::READ, "batch loop",
&ReplicationThread::incrementBatchLoopCB}}),
fullsync_steps_(
- this,
- CallbacksStateMachine::CallbackList{
-
CallbacksStateMachine::CallbackType{CallbacksStateMachine::WRITE, "fullsync
write", fullSyncWriteCB},
- CallbacksStateMachine::CallbackType{CallbacksStateMachine::READ,
"fullsync read", fullSyncReadCB}}) {}
+ this, CallbacksStateMachine::CallbackList{
+ CallbackType{CallbacksStateMachine::WRITE, "fullsync
write", &ReplicationThread::fullSyncWriteCB},
+ CallbackType{CallbacksStateMachine::READ, "fullsync read",
&ReplicationThread::fullSyncReadCB}}) {}
Status ReplicationThread::Start(std::function<void()> &&pre_fullsync_cb,
std::function<void()> &&post_fullsync_cb) {
pre_fullsync_cb_ = std::move(pre_fullsync_cb);
@@ -370,15 +365,14 @@ void ReplicationThread::run() {
event_base_free(base_);
}
-ReplicationThread::CBState ReplicationThread::authWriteCB(bufferevent *bev,
void *ctx) {
- auto self = static_cast<ReplicationThread *>(ctx);
- SendString(bev, redis::MultiBulkString({"AUTH",
self->srv_->GetConfig()->masterauth}));
+ReplicationThread::CBState ReplicationThread::authWriteCB(bufferevent *bev) {
+ SendString(bev, redis::MultiBulkString({"AUTH",
srv_->GetConfig()->masterauth}));
LOG(INFO) << "[replication] Auth request was sent, waiting for response";
- self->repl_state_.store(kReplSendAuth, std::memory_order_relaxed);
+ repl_state_.store(kReplSendAuth, std::memory_order_relaxed);
return CBState::NEXT;
}
-ReplicationThread::CBState ReplicationThread::authReadCB(bufferevent *bev,
void *ctx) {
+ReplicationThread::CBState ReplicationThread::authReadCB(bufferevent *bev) {
// NOLINT
auto input = bufferevent_get_input(bev);
UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
if (!line) return CBState::AGAIN;
@@ -391,15 +385,14 @@ ReplicationThread::CBState
ReplicationThread::authReadCB(bufferevent *bev, void
return CBState::NEXT;
}
-ReplicationThread::CBState ReplicationThread::checkDBNameWriteCB(bufferevent
*bev, void *ctx) {
+ReplicationThread::CBState ReplicationThread::checkDBNameWriteCB(bufferevent
*bev) {
SendString(bev, redis::MultiBulkString({"_db_name"}));
- auto self = static_cast<ReplicationThread *>(ctx);
- self->repl_state_.store(kReplCheckDBName, std::memory_order_relaxed);
+ repl_state_.store(kReplCheckDBName, std::memory_order_relaxed);
LOG(INFO) << "[replication] Check db name request was sent, waiting for
response";
return CBState::NEXT;
}
-ReplicationThread::CBState ReplicationThread::checkDBNameReadCB(bufferevent
*bev, void *ctx) {
+ReplicationThread::CBState ReplicationThread::checkDBNameReadCB(bufferevent
*bev) {
auto input = bufferevent_get_input(bev);
UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
if (!line) return CBState::AGAIN;
@@ -412,8 +405,7 @@ ReplicationThread::CBState
ReplicationThread::checkDBNameReadCB(bufferevent *bev
}
return CBState::RESTART;
}
- auto self = static_cast<ReplicationThread *>(ctx);
- std::string db_name = self->storage_->GetName();
+ std::string db_name = storage_->GetName();
if (line.length == db_name.size() && !strncmp(line.get(), db_name.data(),
line.length)) {
// DB name match, we should continue to next step: TryPsync
LOG(INFO) << "[replication] DB name is valid, continue...";
@@ -423,31 +415,29 @@ ReplicationThread::CBState
ReplicationThread::checkDBNameReadCB(bufferevent *bev
return CBState::RESTART;
}
-ReplicationThread::CBState ReplicationThread::replConfWriteCB(bufferevent
*bev, void *ctx) {
- auto self = static_cast<ReplicationThread *>(ctx);
- auto config = self->srv_->GetConfig();
+ReplicationThread::CBState ReplicationThread::replConfWriteCB(bufferevent
*bev) {
+ auto config = srv_->GetConfig();
auto port = config->replica_announce_port > 0 ?
config->replica_announce_port : config->port;
std::vector<std::string> data_to_send{"replconf", "listening-port",
std::to_string(port)};
- if (!self->next_try_without_announce_ip_address_ &&
!config->replica_announce_ip.empty()) {
+ if (!next_try_without_announce_ip_address_ &&
!config->replica_announce_ip.empty()) {
data_to_send.emplace_back("ip-address");
data_to_send.emplace_back(config->replica_announce_ip);
}
SendString(bev, redis::MultiBulkString(data_to_send));
- self->repl_state_.store(kReplReplConf, std::memory_order_relaxed);
+ repl_state_.store(kReplReplConf, std::memory_order_relaxed);
LOG(INFO) << "[replication] replconf request was sent, waiting for response";
return CBState::NEXT;
}
-ReplicationThread::CBState ReplicationThread::replConfReadCB(bufferevent *bev,
void *ctx) {
- auto self = static_cast<ReplicationThread *>(ctx);
+ReplicationThread::CBState ReplicationThread::replConfReadCB(bufferevent *bev)
{
auto input = bufferevent_get_input(bev);
UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
if (!line) return CBState::AGAIN;
// on unknown option: first try without announce ip, if it fails again - do
nothing (to prevent infinite loop)
- if (isUnknownOption(line.get()) &&
!self->next_try_without_announce_ip_address_) {
- self->next_try_without_announce_ip_address_ = true;
+ if (isUnknownOption(line.get()) && !next_try_without_announce_ip_address_) {
+ next_try_without_announce_ip_address_ = true;
LOG(WARNING) << "The old version master, can't handle ip-address, "
<< "try without it again";
// Retry previous state, i.e. send replconf again
@@ -467,14 +457,13 @@ ReplicationThread::CBState
ReplicationThread::replConfReadCB(bufferevent *bev, v
}
}
-ReplicationThread::CBState ReplicationThread::tryPSyncWriteCB(bufferevent
*bev, void *ctx) {
- auto self = static_cast<ReplicationThread *>(ctx);
- auto cur_seq = self->storage_->LatestSeqNumber();
+ReplicationThread::CBState ReplicationThread::tryPSyncWriteCB(bufferevent
*bev) {
+ auto cur_seq = storage_->LatestSeqNumber();
auto next_seq = cur_seq + 1;
std::string replid;
// Get replication id
- std::string replid_in_wal = self->storage_->GetReplIdFromWalBySeq(cur_seq);
+ std::string replid_in_wal = storage_->GetReplIdFromWalBySeq(cur_seq);
// Set if valid replication id
if (replid_in_wal.length() == kReplIdLength) {
replid = replid_in_wal;
@@ -482,7 +471,7 @@ ReplicationThread::CBState
ReplicationThread::tryPSyncWriteCB(bufferevent *bev,
// Maybe there is no WAL, we can get replication id from db since master
// always write replication id into db before any operation when starting
// new "replication history".
- std::string replid_in_db = self->storage_->GetReplIdFromDbEngine();
+ std::string replid_in_db = storage_->GetReplIdFromDbEngine();
if (replid_in_db.length() == kReplIdLength) {
replid = replid_in_db;
}
@@ -490,8 +479,8 @@ ReplicationThread::CBState
ReplicationThread::tryPSyncWriteCB(bufferevent *bev,
// To adapt to old master using old PSYNC, i.e. only use next sequence id.
// Also use old PSYNC if replica can't find replication id from WAL and DB.
- if (!self->srv_->GetConfig()->use_rsid_psync || self->next_try_old_psync_ ||
replid.length() != kReplIdLength) {
- self->next_try_old_psync_ = false; // Reset next_try_old_psync_
+ if (!srv_->GetConfig()->use_rsid_psync || next_try_old_psync_ ||
replid.length() != kReplIdLength) {
+ next_try_old_psync_ = false; // Reset next_try_old_psync_
SendString(bev, redis::MultiBulkString({"PSYNC",
std::to_string(next_seq)}));
LOG(INFO) << "[replication] Try to use psync, next seq: " << next_seq;
} else {
@@ -500,12 +489,11 @@ ReplicationThread::CBState
ReplicationThread::tryPSyncWriteCB(bufferevent *bev,
LOG(INFO) << "[replication] Try to use new psync, current unique
replication sequence id: " << replid << ":"
<< cur_seq;
}
- self->repl_state_.store(kReplSendPSync, std::memory_order_relaxed);
+ repl_state_.store(kReplSendPSync, std::memory_order_relaxed);
return CBState::NEXT;
}
-ReplicationThread::CBState ReplicationThread::tryPSyncReadCB(bufferevent *bev,
void *ctx) {
- auto self = static_cast<ReplicationThread *>(ctx);
+ReplicationThread::CBState ReplicationThread::tryPSyncReadCB(bufferevent *bev)
{
auto input = bufferevent_get_input(bev);
UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
if (!line) return CBState::AGAIN;
@@ -516,7 +504,7 @@ ReplicationThread::CBState
ReplicationThread::tryPSyncReadCB(bufferevent *bev, v
}
if (line[0] == '-' && isWrongPsyncNum(line.get())) {
- self->next_try_old_psync_ = true;
+ next_try_old_psync_ = true;
LOG(WARNING) << "The old version master, can't handle new PSYNC, "
<< "try old PSYNC again";
// Retry previous state, i.e. send PSYNC again
@@ -526,7 +514,7 @@ ReplicationThread::CBState
ReplicationThread::tryPSyncReadCB(bufferevent *bev, v
if (strncmp(line.get(), "+OK", 3) != 0) {
// PSYNC isn't OK, we should use FullSync
// Switch to fullsync state machine
- self->fullsync_steps_.Start();
+ fullsync_steps_.Start();
LOG(INFO) << "[replication] Failed to psync, error: " << line.get() << ",
switch to fullsync";
return CBState::QUIT;
} else {
@@ -536,49 +524,48 @@ ReplicationThread::CBState
ReplicationThread::tryPSyncReadCB(bufferevent *bev, v
}
}
-ReplicationThread::CBState ReplicationThread::incrementBatchLoopCB(bufferevent
*bev, void *ctx) {
+ReplicationThread::CBState ReplicationThread::incrementBatchLoopCB(bufferevent
*bev) {
char *bulk_data = nullptr;
- auto self = static_cast<ReplicationThread *>(ctx);
- self->repl_state_.store(kReplConnected, std::memory_order_relaxed);
+ repl_state_.store(kReplConnected, std::memory_order_relaxed);
auto input = bufferevent_get_input(bev);
while (true) {
- switch (self->incr_state_) {
+ switch (incr_state_) {
case Incr_batch_size: {
// Read bulk length
UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
if (!line) return CBState::AGAIN;
- self->incr_bulk_len_ = line.length > 0 ? std::strtoull(line.get() + 1,
nullptr, 10) : 0;
- if (self->incr_bulk_len_ == 0) {
+ incr_bulk_len_ = line.length > 0 ? std::strtoull(line.get() + 1,
nullptr, 10) : 0;
+ if (incr_bulk_len_ == 0) {
LOG(ERROR) << "[replication] Invalid increment data size";
return CBState::RESTART;
}
- self->incr_state_ = Incr_batch_data;
+ incr_state_ = Incr_batch_data;
break;
}
case Incr_batch_data:
// Read bulk data (batch data)
- if (self->incr_bulk_len_ + 2 <= evbuffer_get_length(input)) { // We
got enough data
- bulk_data = reinterpret_cast<char *>(evbuffer_pullup(input,
static_cast<ssize_t>(self->incr_bulk_len_ + 2)));
- std::string bulk_string = std::string(bulk_data,
self->incr_bulk_len_);
+ if (incr_bulk_len_ + 2 <= evbuffer_get_length(input)) { // We got
enough data
+ bulk_data = reinterpret_cast<char *>(evbuffer_pullup(input,
static_cast<ssize_t>(incr_bulk_len_ + 2)));
+ std::string bulk_string = std::string(bulk_data, incr_bulk_len_);
// master would send the ping heartbeat packet to check whether the
slave was alive or not,
// don't write ping to db here.
if (bulk_string != "ping") {
- auto s =
self->storage_->ReplicaApplyWriteBatch(std::string(bulk_data,
self->incr_bulk_len_));
+ auto s = storage_->ReplicaApplyWriteBatch(std::string(bulk_data,
incr_bulk_len_));
if (!s.IsOK()) {
LOG(ERROR) << "[replication] CRITICAL - Failed to write batch to
local, " << s.Msg() << ". batch: 0x"
<< util::StringToHex(bulk_string);
return CBState::RESTART;
}
- s = self->parseWriteBatch(bulk_string);
+ s = parseWriteBatch(bulk_string);
if (!s.IsOK()) {
LOG(ERROR) << "[replication] CRITICAL - failed to parse write
batch 0x" << util::StringToHex(bulk_string)
<< ": " << s.Msg();
return CBState::RESTART;
}
}
- evbuffer_drain(input, self->incr_bulk_len_ + 2);
- self->incr_state_ = Incr_batch_size;
+ evbuffer_drain(input, incr_bulk_len_ + 2);
+ incr_state_ = Incr_batch_size;
} else {
return CBState::AGAIN;
}
@@ -587,22 +574,20 @@ ReplicationThread::CBState
ReplicationThread::incrementBatchLoopCB(bufferevent *
}
}
-ReplicationThread::CBState ReplicationThread::fullSyncWriteCB(bufferevent
*bev, void *ctx) {
+ReplicationThread::CBState ReplicationThread::fullSyncWriteCB(bufferevent
*bev) {
SendString(bev, redis::MultiBulkString({"_fetch_meta"}));
- auto self = static_cast<ReplicationThread *>(ctx);
- self->repl_state_.store(kReplFetchMeta, std::memory_order_relaxed);
+ repl_state_.store(kReplFetchMeta, std::memory_order_relaxed);
LOG(INFO) << "[replication] Start syncing data with fullsync";
return CBState::NEXT;
}
-ReplicationThread::CBState ReplicationThread::fullSyncReadCB(bufferevent *bev,
void *ctx) {
- auto self = static_cast<ReplicationThread *>(ctx);
+ReplicationThread::CBState ReplicationThread::fullSyncReadCB(bufferevent *bev)
{
auto input = bufferevent_get_input(bev);
- switch (self->fullsync_state_) {
+ switch (fullsync_state_) {
case kFetchMetaID: {
// New version master only sends meta file content
- if (!self->srv_->GetConfig()->master_use_repl_port) {
- self->fullsync_state_ = kFetchMetaContent;
+ if (!srv_->GetConfig()->master_use_repl_port) {
+ fullsync_state_ = kFetchMetaContent;
return CBState::AGAIN;
}
UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
@@ -611,14 +596,13 @@ ReplicationThread::CBState
ReplicationThread::fullSyncReadCB(bufferevent *bev, v
LOG(ERROR) << "[replication] Failed to fetch meta id: " << line.get();
return CBState::RESTART;
}
- self->fullsync_meta_id_ =
- static_cast<rocksdb::BackupID>(line.length > 0 ?
std::strtoul(line.get(), nullptr, 10) : 0);
- if (self->fullsync_meta_id_ == 0) {
+ fullsync_meta_id_ = static_cast<rocksdb::BackupID>(line.length > 0 ?
std::strtoul(line.get(), nullptr, 10) : 0);
+ if (fullsync_meta_id_ == 0) {
LOG(ERROR) << "[replication] Invalid meta id received";
return CBState::RESTART;
}
- self->fullsync_state_ = kFetchMetaSize;
- LOG(INFO) << "[replication] Succeed fetching meta id: " <<
self->fullsync_meta_id_;
+ fullsync_state_ = kFetchMetaSize;
+ LOG(INFO) << "[replication] Succeed fetching meta id: " <<
fullsync_meta_id_;
}
case kFetchMetaSize: {
UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
@@ -627,29 +611,28 @@ ReplicationThread::CBState
ReplicationThread::fullSyncReadCB(bufferevent *bev, v
LOG(ERROR) << "[replication] Failed to fetch meta size: " <<
line.get();
return CBState::RESTART;
}
- self->fullsync_filesize_ = line.length > 0 ? std::strtoull(line.get(),
nullptr, 10) : 0;
- if (self->fullsync_filesize_ == 0) {
+ fullsync_filesize_ = line.length > 0 ? std::strtoull(line.get(),
nullptr, 10) : 0;
+ if (fullsync_filesize_ == 0) {
LOG(ERROR) << "[replication] Invalid meta file size received";
return CBState::RESTART;
}
- self->fullsync_state_ = kFetchMetaContent;
- LOG(INFO) << "[replication] Succeed fetching meta size: " <<
self->fullsync_filesize_;
+ fullsync_state_ = kFetchMetaContent;
+ LOG(INFO) << "[replication] Succeed fetching meta size: " <<
fullsync_filesize_;
}
case kFetchMetaContent: {
std::string target_dir;
engine::Storage::ReplDataManager::MetaInfo meta;
// Master using old version
- if (self->srv_->GetConfig()->master_use_repl_port) {
- if (evbuffer_get_length(input) < self->fullsync_filesize_) {
+ if (srv_->GetConfig()->master_use_repl_port) {
+ if (evbuffer_get_length(input) < fullsync_filesize_) {
return CBState::AGAIN;
}
- auto s =
- engine::Storage::ReplDataManager::ParseMetaAndSave(self->storage_,
self->fullsync_meta_id_, input, &meta);
+ auto s = engine::Storage::ReplDataManager::ParseMetaAndSave(storage_,
fullsync_meta_id_, input, &meta);
if (!s.IsOK()) {
LOG(ERROR) << "[replication] Failed to parse meta and save: " <<
s.Msg();
return CBState::AGAIN;
}
- target_dir = self->srv_->GetConfig()->backup_sync_dir;
+ target_dir = srv_->GetConfig()->backup_sync_dir;
} else {
// Master using new version
UniqueEvbufReadln line(input, EVBUFFER_EOL_CRLF_STRICT);
@@ -663,13 +646,13 @@ ReplicationThread::CBState
ReplicationThread::fullSyncReadCB(bufferevent *bev, v
meta.files.emplace_back(f, 0);
}
- target_dir = self->srv_->GetConfig()->sync_checkpoint_dir;
+ target_dir = srv_->GetConfig()->sync_checkpoint_dir;
// Clean invalid files of checkpoint, "CURRENT" file must be invalid
// because we identify one file by its file number but only "CURRENT"
// file doesn't have number.
auto iter = std::find(need_files.begin(), need_files.end(), "CURRENT");
if (iter != need_files.end()) need_files.erase(iter);
- auto s =
engine::Storage::ReplDataManager::CleanInvalidFiles(self->storage_, target_dir,
need_files);
+ auto s = engine::Storage::ReplDataManager::CleanInvalidFiles(storage_,
target_dir, need_files);
if (!s.IsOK()) {
LOG(WARNING) << "[replication] Failed to clean up invalid files of
the old checkpoint,"
<< " error: " << s.Msg();
@@ -681,19 +664,19 @@ ReplicationThread::CBState
ReplicationThread::fullSyncReadCB(bufferevent *bev, v
}
}
assert(evbuffer_get_length(input) == 0);
- self->fullsync_state_ = kFetchMetaID;
+ fullsync_state_ = kFetchMetaID;
LOG(INFO) << "[replication] Succeeded fetching full data files info,
fetching files in parallel";
// If 'slave-empty-db-before-fullsync' is yes, we call 'pre_fullsync_cb_'
// just like reloading database. And we don't want slave to occupy too
much
// disk space, so we just empty entire database rudely.
- if (self->srv_->GetConfig()->slave_empty_db_before_fullsync) {
- self->pre_fullsync_cb_();
- self->storage_->EmptyDB();
+ if (srv_->GetConfig()->slave_empty_db_before_fullsync) {
+ pre_fullsync_cb_();
+ storage_->EmptyDB();
}
- self->repl_state_.store(kReplFetchSST, std::memory_order_relaxed);
- auto s = self->parallelFetchFile(target_dir, meta.files);
+ repl_state_.store(kReplFetchSST, std::memory_order_relaxed);
+ auto s = parallelFetchFile(target_dir, meta.files);
if (!s.IsOK()) {
LOG(ERROR) << "[replication] Failed to parallel fetch files while " +
s.Msg();
return CBState::RESTART;
@@ -702,22 +685,22 @@ ReplicationThread::CBState
ReplicationThread::fullSyncReadCB(bufferevent *bev, v
// Restore DB from backup
// We already call 'pre_fullsync_cb_' if
'slave-empty-db-before-fullsync' is yes
- if (!self->srv_->GetConfig()->slave_empty_db_before_fullsync)
self->pre_fullsync_cb_();
+ if (!srv_->GetConfig()->slave_empty_db_before_fullsync)
pre_fullsync_cb_();
// For old version, master uses rocksdb backup to implement data snapshot
- if (self->srv_->GetConfig()->master_use_repl_port) {
- s = self->storage_->RestoreFromBackup();
+ if (srv_->GetConfig()->master_use_repl_port) {
+ s = storage_->RestoreFromBackup();
} else {
- s = self->storage_->RestoreFromCheckpoint();
+ s = storage_->RestoreFromCheckpoint();
}
if (!s.IsOK()) {
LOG(ERROR) << "[replication] Failed to restore backup while " +
s.Msg() + ", restart fullsync";
return CBState::RESTART;
}
LOG(INFO) << "[replication] Succeeded restoring the backup, fullsync was
finish";
- self->post_fullsync_cb_();
+ post_fullsync_cb_();
// Switch to psync state machine again
- self->psync_steps_.Start();
+ psync_steps_.Start();
return CBState::QUIT;
}
}
@@ -772,7 +755,7 @@ Status ReplicationThread::parallelFetchFile(const
std::string &dir,
}
unsigned files_count = files.size();
FetchFileCallback fn = [&fetch_cnt, &skip_cnt, files_count](const
std::string &fetch_file,
- const
uint32_t fetch_crc) {
+ uint32_t
fetch_crc) {
fetch_cnt.fetch_add(1);
uint32_t cur_skip_cnt = skip_cnt.load();
uint32_t cur_fetch_cnt = fetch_cnt.load();
diff --git a/src/cluster/replication.h b/src/cluster/replication.h
index 2b6f4c86..6bf5954b 100644
--- a/src/cluster/replication.h
+++ b/src/cluster/replication.h
@@ -57,7 +57,7 @@ enum WriteBatchType {
kBatchTypeStream,
};
-using FetchFileCallback = std::function<void(const std::string, const
uint32_t)>;
+using FetchFileCallback = std::function<void(const std::string &, uint32_t)>;
class FeedSlaveThread {
public:
@@ -118,17 +118,20 @@ class ReplicationThread : private
EventCallbackBase<ReplicationThread> {
READ,
WRITE,
};
- using CallbackType = std::tuple<EventType, std::string,
std::function<State(bufferevent *, void *)>>;
+
+ using CallbackFunc = std::function<State(ReplicationThread *, bufferevent
*)>;
+ using CallbackType = std::tuple<EventType, std::string, CallbackFunc>;
using CallbackList = std::deque<CallbackType>;
+
CallbacksStateMachine(ReplicationThread *repl, CallbackList &&handlers)
: repl_(repl), handlers_(std::move(handlers)) {}
void Start();
void Stop();
- static void EvCallback(bufferevent *bev, void *ctx);
- static void ConnEventCB(bufferevent *bev, int16_t events, void
*state_machine_ptr);
- static void SetReadCB(bufferevent *bev, bufferevent_data_cb cb, void
*state_machine_ptr);
- static void SetWriteCB(bufferevent *bev, bufferevent_data_cb cb, void
*state_machine_ptr);
+ void ReadWriteCB(bufferevent *bev);
+ void ConnEventCB(bufferevent *bev, int16_t events);
+ void SetReadCB(bufferevent *bev, bufferevent_data_cb cb);
+ void SetWriteCB(bufferevent *bev, bufferevent_data_cb cb);
private:
bufferevent *bev_ = nullptr;
@@ -138,11 +141,11 @@ class ReplicationThread : private
EventCallbackBase<ReplicationThread> {
EventType getHandlerEventType(CallbackList::size_type idx) { return
std::get<0>(handlers_[idx]); }
std::string getHandlerName(CallbackList::size_type idx) { return
std::get<1>(handlers_[idx]); }
- std::function<State(bufferevent *, void *)>
getHandlerFunc(CallbackList::size_type idx) {
- return std::get<2>(handlers_[idx]);
- }
+ CallbackFunc getHandlerFunc(CallbackList::size_type idx) { return
std::get<2>(handlers_[idx]); }
};
+ using CallbackType = CallbacksStateMachine::CallbackType;
+
private:
std::thread t_;
std::atomic<bool> stop_flag_ = false;
@@ -181,17 +184,17 @@ class ReplicationThread : private
EventCallbackBase<ReplicationThread> {
void run();
- static CBState authWriteCB(bufferevent *bev, void *ctx);
- static CBState authReadCB(bufferevent *bev, void *ctx);
- static CBState checkDBNameWriteCB(bufferevent *bev, void *ctx);
- static CBState checkDBNameReadCB(bufferevent *bev, void *ctx);
- static CBState replConfWriteCB(bufferevent *bev, void *ctx);
- static CBState replConfReadCB(bufferevent *bev, void *ctx);
- static CBState tryPSyncWriteCB(bufferevent *bev, void *ctx);
- static CBState tryPSyncReadCB(bufferevent *bev, void *ctx);
- static CBState incrementBatchLoopCB(bufferevent *bev, void *ctx);
- static CBState fullSyncWriteCB(bufferevent *bev, void *ctx);
- static CBState fullSyncReadCB(bufferevent *bev, void *ctx);
+ CBState authWriteCB(bufferevent *bev);
+ CBState authReadCB(bufferevent *bev);
+ CBState checkDBNameWriteCB(bufferevent *bev);
+ CBState checkDBNameReadCB(bufferevent *bev);
+ CBState replConfWriteCB(bufferevent *bev);
+ CBState replConfReadCB(bufferevent *bev);
+ CBState tryPSyncWriteCB(bufferevent *bev);
+ CBState tryPSyncReadCB(bufferevent *bev);
+ CBState incrementBatchLoopCB(bufferevent *bev);
+ CBState fullSyncWriteCB(bufferevent *bev);
+ CBState fullSyncReadCB(bufferevent *bev);
// Synchronized-Blocking ops
Status sendAuth(int sock_fd);
diff --git a/src/common/event_util.h b/src/common/event_util.h
index 556f40a4..8df67be3 100644
--- a/src/common/event_util.h
+++ b/src/common/event_util.h
@@ -136,3 +136,26 @@ struct EvconnlistenerBase {
return evconnlistener_new(base, callback<cb>, this, flags, backlog, fd);
}
};
+
+namespace details {
+
+template <auto F, typename>
+struct EventCallbackImpl;
+
+template <auto F, typename T, typename R, typename... Args>
+struct EventCallbackImpl<F, R (T::*)(Args...)> {
+ static R Func(Args... args, void *ctx) { return (reinterpret_cast<T
*>(ctx)->*F)(args...); }
+};
+
+} // namespace details
+
+// convert member functions to eventbuffer callbacks
+// e.g. for member function `void A::f(int x)` from class A
+// EventCallback<&A::f> generate a function
+// void EventCallback<&A::f>::Func(int x, void *ctx)
+// and put `this` pointer of A to `void *ctx`
+template <auto F>
+struct EventCallback : details::EventCallbackImpl<F, decltype(F)> {};
+
+template <auto F>
+constexpr auto EventCallbackFunc = EventCallback<F>::Func;