This is an automated email from the ASF dual-hosted git repository.
junchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git
The following commit(s) were added to refs/heads/master by this push:
new 89361043 Fix Memdb with seq 0 (#219)
89361043 is described below
commit 8936104368a58a27fa0bacf3120dfa905a338324
Author: cjcchen <[email protected]>
AuthorDate: Thu Jan 1 23:05:51 2026 +0800
Fix Memdb with seq 0 (#219)
* Chenyi (#201)
* Chenyi latest executed seq num each replica
* Chenyi latest executed seq num each replica update file route
* Chenyi update create file
* Chenyi update write in checkpoint
* Chenyi update write in checkpoint
* Chenyi update in checkpoint_manager
* Chenyi update in checkpoint_manager
* Chenyi update in checkpoint_manager
* Chenyi update in checkpoint_manager with ofstream, solved executor_
* Chenyi update in checkpoint_manager with ofstream, solved executor_, with
test draft, seems deadlock
* Chenyi update in checkpoint_manager with ofstream, solved executor_, with
test draft
* Chenyi update in checkpoint_manager with ofstream, solved executor_, with
test draft
* Chenyi update in checkpoint_manager with ofstream, with test draft
---------
Co-authored-by: cyzhoutt <[email protected]>
* add seq
* add checkpoint recovery
* remove query
* remove query
* remove query
* format
* fix recovery test
* fix recovery test
* rm get all interfaces
* fix client config
* fix seq 0 in memdb
* fix seq 0 in memdb
---------
Co-authored-by: cyzhoutt <[email protected]>
---
chain/storage/kv_storage_test.cpp | 4 ++++
chain/storage/memory_db.cpp | 2 +-
.../consensus/ordering/pbft/checkpoint_manager.cpp | 21 +++++++++++++++++++--
.../consensus/ordering/pbft/checkpoint_manager.h | 8 ++++++--
platform/consensus/ordering/pbft/message_manager.h | 1 +
.../consensus/ordering/pbft/viewchange_manager.cpp | 6 +++++-
service/tools/config/interface/service.config | 2 +-
7 files changed, 37 insertions(+), 7 deletions(-)
diff --git a/chain/storage/kv_storage_test.cpp
b/chain/storage/kv_storage_test.cpp
index 6ffe1e42..343cdbf9 100644
--- a/chain/storage/kv_storage_test.cpp
+++ b/chain/storage/kv_storage_test.cpp
@@ -152,6 +152,10 @@ TEST_P(KVStorageTest, SetValueWithSeq) {
EXPECT_EQ(storage->GetValueWithSeq("test_key", 4),
std::make_pair(std::string(""), static_cast<uint64_t>(0)));
+
+ EXPECT_EQ(
+ storage->GetValueWithSeq("test_key", 0),
+ std::make_pair(std::string("test_value_v2"), static_cast<uint64_t>(3)));
}
TEST_P(KVStorageTest, GetAllValueWithSeq) {
diff --git a/chain/storage/memory_db.cpp b/chain/storage/memory_db.cpp
index d5e68c55..aac3bc8c 100644
--- a/chain/storage/memory_db.cpp
+++ b/chain/storage/memory_db.cpp
@@ -64,7 +64,7 @@ std::pair<std::string, uint64_t> MemoryDB::GetValueWithSeq(
auto it = search_it->second.end();
do {
--it;
- if (it->second == seq) {
+ if (it->second == seq || seq == 0) {
return *it;
}
if (it->second < seq) {
diff --git a/platform/consensus/ordering/pbft/checkpoint_manager.cpp
b/platform/consensus/ordering/pbft/checkpoint_manager.cpp
index 323c12d5..2125b049 100644
--- a/platform/consensus/ordering/pbft/checkpoint_manager.cpp
+++ b/platform/consensus/ordering/pbft/checkpoint_manager.cpp
@@ -160,6 +160,15 @@ bool CheckPointManager::Wait() {
[&] { return new_data_ > 0; });
}
+void CheckPointManager::CheckHealthy() {
+ uint32_t current_time = time(nullptr);
+ for(int i = 1; i <= config_.GetReplicaNum(); ++i) {
+ if( current_time - last_update_time_[i] > replica_timeout_ ) {
+ TimeoutHandler(i);
+ }
+ }
+}
+
void CheckPointManager::UpdateStableCheckPointStatus() {
uint64_t last_committable_seq = 0;
while (!stop_) {
@@ -223,13 +232,19 @@ void CheckPointManager::UpdateStableCheckPointStatus() {
}
void CheckPointManager::SetTimeoutHandler(
- std::function<void()> timeout_handler) {
+ std::function<void(int)> timeout_handler) {
timeout_handler_ = timeout_handler;
}
void CheckPointManager::TimeoutHandler() {
if (timeout_handler_) {
- timeout_handler_();
+ timeout_handler_(0);
+ }
+}
+
+void CheckPointManager::TimeoutHandler(uint32_t replica) {
+ if (timeout_handler_) {
+ timeout_handler_(replica);
}
}
@@ -245,6 +260,7 @@ int
CheckPointManager::ProcessStatusSync(std::unique_ptr<Context> context,
uint64_t seq = checkpoint_data.seq();
uint32_t sender_id = request->sender_id();
status_[sender_id] = seq;
+ last_update_time_[sender_id] = time(nullptr);
LOG(ERROR) << " received from :" << sender_id << " commit status:" << seq;
return 0;
}
@@ -293,6 +309,7 @@ void CheckPointManager::SyncStatus() {
last_check_seq = last_seq;
last_time = 0;
}
+ CheckHealthy();
sleep(10);
last_time++;
}
diff --git a/platform/consensus/ordering/pbft/checkpoint_manager.h
b/platform/consensus/ordering/pbft/checkpoint_manager.h
index 8152311a..d1c33aa4 100644
--- a/platform/consensus/ordering/pbft/checkpoint_manager.h
+++ b/platform/consensus/ordering/pbft/checkpoint_manager.h
@@ -54,13 +54,14 @@ class CheckPointManager : public CheckPoint {
StableCheckPoint GetStableCheckpointWithVotes();
bool IsValidCheckpointProof(const StableCheckPoint& stable_ckpt);
- void SetTimeoutHandler(std::function<void()> timeout_handler);
+ void SetTimeoutHandler(std::function<void(int)> timeout_handler);
virtual void UpdateStableCheckPointCallback(
int64_t current_stable_checkpoint) {}
void Stop();
void TimeoutHandler();
+ void TimeoutHandler(uint32_t replica);
void WaitSignal();
std::unique_ptr<std::pair<uint64_t, std::string>> PopStableSeqHash();
@@ -89,6 +90,7 @@ class CheckPointManager : public CheckPoint {
void SyncStatus();
void StatusProcess();
void CheckStatus(uint64_t last_seq);
+ void CheckHealthy();
protected:
uint64_t last_executed_seq_ = 0;
@@ -107,7 +109,7 @@ class CheckPointManager : public CheckPoint {
LockFreeQueue<Request> data_queue_;
std::mutex cv_mutex_;
std::condition_variable cv_;
- std::function<void()> timeout_handler_;
+ std::function<void(int)> timeout_handler_;
StableCheckPoint stable_ckpt_;
int new_data_ = 0;
LockFreeQueue<std::pair<uint64_t, std::string>> stable_hash_queue_;
@@ -122,6 +124,8 @@ class CheckPointManager : public CheckPoint {
std::string last_hash_, committable_hash_;
sem_t committable_seq_signal_;
std::map<int, uint64_t> status_;
+ std::map<int,int> last_update_time_;
+ int replica_timeout_ = 120;
};
} // namespace resdb
diff --git a/platform/consensus/ordering/pbft/message_manager.h
b/platform/consensus/ordering/pbft/message_manager.h
index e31272fa..a835be28 100644
--- a/platform/consensus/ordering/pbft/message_manager.h
+++ b/platform/consensus/ordering/pbft/message_manager.h
@@ -135,6 +135,7 @@ class MessageManager {
std::mutex lct_lock_;
std::map<uint64_t, uint64_t> last_committed_time_;
+ std::map<uint64_t, uint32_t> last_update_time_;
};
} // namespace resdb
diff --git a/platform/consensus/ordering/pbft/viewchange_manager.cpp
b/platform/consensus/ordering/pbft/viewchange_manager.cpp
index 3084c7a1..8ff17cbe 100644
--- a/platform/consensus/ordering/pbft/viewchange_manager.cpp
+++ b/platform/consensus/ordering/pbft/viewchange_manager.cpp
@@ -119,7 +119,11 @@ void ViewChangeManager::MayStart() {
return;
}
- checkpoint_manager_->SetTimeoutHandler([&]() {
+ checkpoint_manager_->SetTimeoutHandler([&](int replica_id) {
+ if( system_info_->GetPrimaryId() != replica_id ) {
+ return;
+ }
+
// LOG(ERROR) << "checkpoint timeout";
if (status_ == ViewChangeStatus::NONE) {
view_change_counter_ = 1;
diff --git a/service/tools/config/interface/service.config
b/service/tools/config/interface/service.config
index a6fd052b..a437dbc7 100644
--- a/service/tools/config/interface/service.config
+++ b/service/tools/config/interface/service.config
@@ -19,7 +19,7 @@
"replica_info":[
{
"id":5,
-"ip":"172.31.57.186",
+"ip":"127.0.0.1",
"port":17005
}
]