This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new f402be06 fix(replication): didn't resume the db status after restarting full sync (#2549)
f402be06 is described below
commit f402be06d5a85b38ca36d4ff2ccadd04f60e1f2b
Author: hulk <[email protected]>
AuthorDate: Sat Sep 21 14:09:28 2024 +0800
fix(replication): didn't resume the db status after restarting full sync (#2549)
Currently, the pre_fullsync_cb stops the task runner and sets the DB loading status to yes, but those states are never resumed if the full sync has to restart. This leaves the server in restoring status until it eventually resyncs from the master successfully. To fix this, we need to call the post_fullsync_cb to resume those statuses before restarting full sync.
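
As a rough illustration of the intended pairing (a minimal C++ sketch with hypothetical names, not the actual kvrocks code), every restart path taken after the pre callback has succeeded should also run the post callback:

#include <functional>

enum class CBState { NEXT, RESTART };

// Sketch: pre_cb puts the server into loading state and may fail when the
// thread is asked to stop; post_cb restarts the task runner and clears the
// loading flag. Every early RESTART after a successful pre_cb must call post_cb.
CBState FullSyncStep(const std::function<bool()> &pre_cb, const std::function<void()> &post_cb,
                     const std::function<bool()> &fetch_files, const std::function<bool()> &restore_db) {
  if (!pre_cb()) return CBState::RESTART;  // nothing to undo yet
  if (!fetch_files()) {
    post_cb();  // resume statuses before retrying full sync
    return CBState::RESTART;
  }
  if (!restore_db()) {
    post_cb();  // same cleanup on the restore failure path
    return CBState::RESTART;
  }
  return CBState::NEXT;  // success: the caller resumes statuses later
}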
This PR also uses try_lock so that the replication thread can be stopped while preparing to restore the DB.
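
The waiting loop could look roughly like the following (a simplified sketch; the workers' lock is modeled as a plain std::mutex and the names are illustrative, not the kvrocks API):

#include <atomic>
#include <chrono>
#include <mutex>
#include <thread>

// Sketch: poll the workers' lock instead of blocking on it, so a stop request
// can interrupt the wait while the restore is being prepared.
bool WaitForWorkersToFinish(std::mutex &workers_lock, const std::atomic<bool> &stop_flag) {
  while (!workers_lock.try_lock()) {
    if (stop_flag.load()) return false;  // replication thread was asked to stop
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
  }
  workers_lock.unlock();  // only needed to observe that all workers finished
  return true;
}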
---
src/cluster/replication.cc | 14 +++++++++-----
src/cluster/replication.h | 5 +++--
src/server/server.cc | 28 ++++++++++++++++++----------
src/server/server.h | 2 +-
src/storage/storage.cc | 1 +
5 files changed, 32 insertions(+), 18 deletions(-)
diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc
index 29da2941..14575703 100644
--- a/src/cluster/replication.cc
+++ b/src/cluster/replication.cc
@@ -337,7 +337,7 @@ ReplicationThread::ReplicationThread(std::string host, uint32_t port, Server *sr
CallbackType{CallbacksStateMachine::WRITE, "fullsync write", &ReplicationThread::fullSyncWriteCB},
CallbackType{CallbacksStateMachine::READ, "fullsync read", &ReplicationThread::fullSyncReadCB}}) {}
-Status ReplicationThread::Start(std::function<void()> &&pre_fullsync_cb, std::function<void()> &&post_fullsync_cb) {
+Status ReplicationThread::Start(std::function<bool()> &&pre_fullsync_cb, std::function<void()> &&post_fullsync_cb) {
pre_fullsync_cb_ = std::move(pre_fullsync_cb);
post_fullsync_cb_ = std::move(post_fullsync_cb);
@@ -700,25 +700,28 @@ ReplicationThread::CBState ReplicationThread::fullSyncReadCB(bufferevent *bev) {
fullsync_state_ = kFetchMetaID;
LOG(INFO) << "[replication] Succeeded fetching full data files info, fetching files in parallel";
+ bool pre_fullsync_done = false;
// If 'slave-empty-db-before-fullsync' is yes, we call 'pre_fullsync_cb_'
// just like reloading database. And we don't want slave to occupy too much
// disk space, so we just empty entire database rudely.
if (srv_->GetConfig()->slave_empty_db_before_fullsync) {
- pre_fullsync_cb_();
+ if (!pre_fullsync_cb_()) return CBState::RESTART;
+ pre_fullsync_done = true;
storage_->EmptyDB();
}
repl_state_.store(kReplFetchSST, std::memory_order_relaxed);
auto s = parallelFetchFile(target_dir, meta.files);
if (!s.IsOK()) {
+ if (pre_fullsync_done) post_fullsync_cb_();
LOG(ERROR) << "[replication] Failed to parallel fetch files while " + s.Msg();
return CBState::RESTART;
}
LOG(INFO) << "[replication] Succeeded fetching files in parallel, restoring the backup";
- // Restore DB from backup
- // We already call 'pre_fullsync_cb_' if 'slave-empty-db-before-fullsync' is yes
- if (!srv_->GetConfig()->slave_empty_db_before_fullsync) pre_fullsync_cb_();
+ // Don't need to call 'pre_fullsync_cb_' again if it was called before
+ if (!pre_fullsync_done && !pre_fullsync_cb_()) return CBState::RESTART;
+
// For old version, master uses rocksdb backup to implement data snapshot
if (srv_->GetConfig()->master_use_repl_port) {
s = storage_->RestoreFromBackup();
@@ -727,6 +730,7 @@ ReplicationThread::CBState ReplicationThread::fullSyncReadCB(bufferevent *bev) {
}
if (!s.IsOK()) {
LOG(ERROR) << "[replication] Failed to restore backup while " + s.Msg() + ", restart fullsync";
+ post_fullsync_cb_();
return CBState::RESTART;
}
LOG(INFO) << "[replication] Succeeded restoring the backup, fullsync was finish";
diff --git a/src/cluster/replication.h b/src/cluster/replication.h
index e499de8b..8325b162 100644
--- a/src/cluster/replication.h
+++ b/src/cluster/replication.h
@@ -95,8 +95,9 @@ class FeedSlaveThread {
class ReplicationThread : private EventCallbackBase<ReplicationThread> {
public:
explicit ReplicationThread(std::string host, uint32_t port, Server *srv);
- Status Start(std::function<void()> &&pre_fullsync_cb, std::function<void()> &&post_fullsync_cb);
+ Status Start(std::function<bool()> &&pre_fullsync_cb, std::function<void()> &&post_fullsync_cb);
void Stop();
+ bool IsStopped() const { return stop_flag_; }
ReplState State() { return repl_state_.load(std::memory_order_relaxed); }
int64_t LastIOTimeSecs() const { return last_io_time_secs_.load(std::memory_order_relaxed); }
@@ -159,7 +160,7 @@ class ReplicationThread : private EventCallbackBase<ReplicationThread> {
bool next_try_old_psync_ = false;
bool next_try_without_announce_ip_address_ = false;
- std::function<void()> pre_fullsync_cb_;
+ std::function<bool()> pre_fullsync_cb_;
std::function<void()> post_fullsync_cb_;
// Internal states managed by FullSync procedure
diff --git a/src/server/server.cc b/src/server/server.cc
index 393a70d8..c6c4b851 100644
--- a/src/server/server.cc
+++ b/src/server/server.cc
@@ -281,7 +281,7 @@ Status Server::AddMaster(const std::string &host, uint32_t port, bool force_reco
if (GetConfig()->master_use_repl_port) master_listen_port += 1;
replication_thread_ = std::make_unique<ReplicationThread>(host, master_listen_port, this);
- auto s = replication_thread_->Start([this]() { PrepareRestoreDB(); },
+ auto s = replication_thread_->Start([this]() { return PrepareRestoreDB(); },
[this]() {
this->is_loading_ = false;
if (auto s = task_runner_.Start(); !s) {
@@ -1336,18 +1336,11 @@ std::string Server::GetRocksDBStatsJson() const {
// This function is called by replication thread when finished fetching all files from its master.
// Before restoring the db from backup or checkpoint, we should
// guarantee other threads don't access DB and its column families, then close db.
-void Server::PrepareRestoreDB() {
+bool Server::PrepareRestoreDB() {
// Stop feeding slaves thread
LOG(INFO) << "[server] Disconnecting slaves...";
DisconnectSlaves();
- // Stop task runner
- LOG(INFO) << "[server] Stopping the task runner and clear task queue...";
- task_runner_.Cancel();
- if (auto s = task_runner_.Join(); !s) {
- LOG(WARNING) << "[server] " << s.Msg();
- }
-
// If the DB is restored, the object 'db_' will be destroyed, but
// 'db_' will be accessed in data migration task. To avoid wrong
// accessing, data migration task should be stopped before restoring DB
@@ -1362,12 +1355,27 @@ void Server::PrepareRestoreDB() {
// ASAP to avoid user can't receive responses for long time, because the following
// 'CloseDB' may cost much time to acquire DB mutex.
LOG(INFO) << "[server] Waiting workers for finishing executing commands...";
- { auto exclusivity = WorkExclusivityGuard(); }
+ while (!works_concurrency_rw_lock_.try_lock()) {
+ if (replication_thread_->IsStopped()) {
+ is_loading_ = false;
+ return false;
+ }
+ usleep(1000);
+ }
+ works_concurrency_rw_lock_.unlock();
+
+ // Stop task runner
+ LOG(INFO) << "[server] Stopping the task runner and clear task queue...";
+ task_runner_.Cancel();
+ if (auto s = task_runner_.Join(); !s) {
+ LOG(WARNING) << "[server] " << s.Msg();
+ }
// Cron thread, compaction checker thread, full synchronization thread
// may always run in the background, we need to close db, so they don't actually work.
LOG(INFO) << "[server] Waiting for closing DB...";
storage->CloseDB();
+ return true;
}
void Server::WaitNoMigrateProcessing() {
diff --git a/src/server/server.h b/src/server/server.h
index 1bb639ba..7d8c8327 100644
--- a/src/server/server.h
+++ b/src/server/server.h
@@ -245,7 +245,7 @@ class Server {
std::string GetRocksDBStatsJson() const;
ReplState GetReplicationState();
- void PrepareRestoreDB();
+ bool PrepareRestoreDB();
void WaitNoMigrateProcessing();
Status AsyncCompactDB(const std::string &begin_key = "", const std::string &end_key = "");
Status AsyncBgSaveDB();
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index 6754ac37..f4627eae 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -482,6 +482,7 @@ Status Storage::RestoreFromCheckpoint() {
// Clean old backups and checkpoints because server will work on the new db
PurgeOldBackups(0, 0);
rocksdb::DestroyDB(config_->checkpoint_dir, rocksdb::Options());
+ rocksdb::DestroyDB(tmp_dir, rocksdb::Options());
// Maybe there is no database directory
auto s = env_->CreateDirIfMissing(config_->db_dir);