This is an automated email from the ASF dual-hosted git repository.

junchao pushed a commit to branch recovery_ckpt
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git


The following commit(s) were added to refs/heads/recovery_ckpt by this push:
     new dfd49a68 fix seq 0 in memdb
dfd49a68 is described below

commit dfd49a6865e5c86ec7d55a760f9f747398be5cda
Author: Ubuntu <[email protected]>
AuthorDate: Thu Jan 1 14:56:50 2026 +0000

    fix seq 0 in memdb
---
 chain/storage/kv_storage_test.cpp                   |  4 ++++
 chain/storage/memory_db.cpp                         |  2 +-
 .../consensus/ordering/pbft/checkpoint_manager.cpp  | 21 +++++++++++++++++++--
 .../consensus/ordering/pbft/checkpoint_manager.h    |  8 ++++++--
 platform/consensus/ordering/pbft/message_manager.h  |  1 +
 .../consensus/ordering/pbft/viewchange_manager.cpp  |  6 +++++-
 6 files changed, 36 insertions(+), 6 deletions(-)

diff --git a/chain/storage/kv_storage_test.cpp b/chain/storage/kv_storage_test.cpp
index 6ffe1e42..343cdbf9 100644
--- a/chain/storage/kv_storage_test.cpp
+++ b/chain/storage/kv_storage_test.cpp
@@ -152,6 +152,10 @@ TEST_P(KVStorageTest, SetValueWithSeq) {
 
   EXPECT_EQ(storage->GetValueWithSeq("test_key", 4),
             std::make_pair(std::string(""), static_cast<uint64_t>(0)));
+
+  EXPECT_EQ(
+      storage->GetValueWithSeq("test_key", 0),
+      std::make_pair(std::string("test_value_v2"), static_cast<uint64_t>(3)));
 }
 
 TEST_P(KVStorageTest, GetAllValueWithSeq) {
diff --git a/chain/storage/memory_db.cpp b/chain/storage/memory_db.cpp
index d5e68c55..aac3bc8c 100644
--- a/chain/storage/memory_db.cpp
+++ b/chain/storage/memory_db.cpp
@@ -64,7 +64,7 @@ std::pair<std::string, uint64_t> MemoryDB::GetValueWithSeq(
     auto it = search_it->second.end();
     do {
       --it;
-      if (it->second == seq) {
+      if (it->second == seq || seq == 0) {
         return *it;
       }
       if (it->second < seq) {
diff --git a/platform/consensus/ordering/pbft/checkpoint_manager.cpp b/platform/consensus/ordering/pbft/checkpoint_manager.cpp
index 323c12d5..2125b049 100644
--- a/platform/consensus/ordering/pbft/checkpoint_manager.cpp
+++ b/platform/consensus/ordering/pbft/checkpoint_manager.cpp
@@ -160,6 +160,15 @@ bool CheckPointManager::Wait() {
                       [&] { return new_data_ > 0; });
 }
 
+void CheckPointManager::CheckHealthy() {
+  uint32_t current_time = time(nullptr);
+  for(int i = 1; i <= config_.GetReplicaNum(); ++i) {
+    if( current_time - last_update_time_[i] > replica_timeout_ ) {
+      TimeoutHandler(i);
+    }
+  }
+}
+
 void CheckPointManager::UpdateStableCheckPointStatus() {
   uint64_t last_committable_seq = 0;
   while (!stop_) {
@@ -223,13 +232,19 @@ void CheckPointManager::UpdateStableCheckPointStatus() {
 }
 
 void CheckPointManager::SetTimeoutHandler(
-    std::function<void()> timeout_handler) {
+    std::function<void(int)> timeout_handler) {
   timeout_handler_ = timeout_handler;
 }
 
 void CheckPointManager::TimeoutHandler() {
   if (timeout_handler_) {
-    timeout_handler_();
+    timeout_handler_(0);
+  }
+}
+
+void CheckPointManager::TimeoutHandler(uint32_t replica) {
+  if (timeout_handler_) {
+    timeout_handler_(replica);
   }
 }
 
@@ -245,6 +260,7 @@ int CheckPointManager::ProcessStatusSync(std::unique_ptr<Context> context,
   uint64_t seq = checkpoint_data.seq();
   uint32_t sender_id = request->sender_id();
   status_[sender_id] = seq;
+  last_update_time_[sender_id] = time(nullptr);
   LOG(ERROR) << " received from :" << sender_id << " commit status:" << seq;
   return 0;
 }
@@ -293,6 +309,7 @@ void CheckPointManager::SyncStatus() {
       last_check_seq = last_seq;
       last_time = 0;
     }
+    CheckHealthy();
     sleep(10);
     last_time++;
   }
diff --git a/platform/consensus/ordering/pbft/checkpoint_manager.h b/platform/consensus/ordering/pbft/checkpoint_manager.h
index 8152311a..d1c33aa4 100644
--- a/platform/consensus/ordering/pbft/checkpoint_manager.h
+++ b/platform/consensus/ordering/pbft/checkpoint_manager.h
@@ -54,13 +54,14 @@ class CheckPointManager : public CheckPoint {
   StableCheckPoint GetStableCheckpointWithVotes();
   bool IsValidCheckpointProof(const StableCheckPoint& stable_ckpt);
 
-  void SetTimeoutHandler(std::function<void()> timeout_handler);
+  void SetTimeoutHandler(std::function<void(int)> timeout_handler);
   virtual void UpdateStableCheckPointCallback(
       int64_t current_stable_checkpoint) {}
 
   void Stop();
 
   void TimeoutHandler();
+  void TimeoutHandler(uint32_t replica);
 
   void WaitSignal();
   std::unique_ptr<std::pair<uint64_t, std::string>> PopStableSeqHash();
@@ -89,6 +90,7 @@ class CheckPointManager : public CheckPoint {
   void SyncStatus();
   void StatusProcess();
   void CheckStatus(uint64_t last_seq);
+  void CheckHealthy();
 
  protected:
   uint64_t last_executed_seq_ = 0;
@@ -107,7 +109,7 @@ class CheckPointManager : public CheckPoint {
   LockFreeQueue<Request> data_queue_;
   std::mutex cv_mutex_;
   std::condition_variable cv_;
-  std::function<void()> timeout_handler_;
+  std::function<void(int)> timeout_handler_;
   StableCheckPoint stable_ckpt_;
   int new_data_ = 0;
   LockFreeQueue<std::pair<uint64_t, std::string>> stable_hash_queue_;
@@ -122,6 +124,8 @@ class CheckPointManager : public CheckPoint {
   std::string last_hash_, committable_hash_;
   sem_t committable_seq_signal_;
   std::map<int, uint64_t> status_;
+  std::map<int,int> last_update_time_;
+  int replica_timeout_ = 120;
 };
 
 }  // namespace resdb
diff --git a/platform/consensus/ordering/pbft/message_manager.h b/platform/consensus/ordering/pbft/message_manager.h
index e31272fa..a835be28 100644
--- a/platform/consensus/ordering/pbft/message_manager.h
+++ b/platform/consensus/ordering/pbft/message_manager.h
@@ -135,6 +135,7 @@ class MessageManager {
 
   std::mutex lct_lock_;
   std::map<uint64_t, uint64_t> last_committed_time_;
+  std::map<uint64_t, uint32_t> last_update_time_;
 };
 
 }  // namespace resdb
diff --git a/platform/consensus/ordering/pbft/viewchange_manager.cpp b/platform/consensus/ordering/pbft/viewchange_manager.cpp
index 3084c7a1..8ff17cbe 100644
--- a/platform/consensus/ordering/pbft/viewchange_manager.cpp
+++ b/platform/consensus/ordering/pbft/viewchange_manager.cpp
@@ -119,7 +119,11 @@ void ViewChangeManager::MayStart() {
     return;
   }
 
-  checkpoint_manager_->SetTimeoutHandler([&]() {
+  checkpoint_manager_->SetTimeoutHandler([&](int replica_id) {
+    if( system_info_->GetPrimaryId() != replica_id ) {
+      return;
+    }
+
     // LOG(ERROR) << "checkpoint timeout";
     if (status_ == ViewChangeStatus::NONE) {
       view_change_counter_ = 1;

Reply via email to