This is an automated email from the ASF dual-hosted git repository.

junchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 89361043 Fix Memdb with seq 0 (#219)
89361043 is described below

commit 8936104368a58a27fa0bacf3120dfa905a338324
Author: cjcchen <[email protected]>
AuthorDate: Thu Jan 1 23:05:51 2026 +0800

    Fix Memdb with seq 0 (#219)
    
    * Chenyi (#201)
    
    * Chenyi lastest executed seq num each replica
    
    * Chenyi lastest executed seq num each replica update file route
    
    * Chenyi update create file
    
    * Chenyi update write in checkpoint
    
    * Chenyi update write in checkpoint
    
    * Chenyi update in checkpoint_manager
    
    * Chenyi update in checkpoint_manager
    
    * Chenyi update in checkpoint_manager
    
    * Chenyi update in checkpoint_manager with ofstream, solved executor_
    
    * Chenyi update in checkpoint_manager with ofstream, solved executor_, with test draft, seems deadlock
    
    * Chenyi update in checkpoint_manager with ofstream, solved executor_, with test draft
    
    * Chenyi update in checkpoint_manager with ofstream, solved executor_, with test draft
    
    * Chenyi update in checkpoint_manager with ofstream, with test draft
    
    ---------
    
    Co-authored-by: cyzhoutt <[email protected]>
    
    * add seq
    
    * add checkpoint recovery
    
    * remove query
    
    * remove query
    
    * remove query
    
    * format
    
    * fix recovery test
    
    * fix recovery test
    
    * rm get all interfaces
    
    * fix client config
    
    * fix seq 0 in memdb
    
    * fix seq 0 in memdb
    
    ---------
    
    Co-authored-by: cyzhoutt <[email protected]>
---
 chain/storage/kv_storage_test.cpp                   |  4 ++++
 chain/storage/memory_db.cpp                         |  2 +-
 .../consensus/ordering/pbft/checkpoint_manager.cpp  | 21 +++++++++++++++++++--
 .../consensus/ordering/pbft/checkpoint_manager.h    |  8 ++++++--
 platform/consensus/ordering/pbft/message_manager.h  |  1 +
 .../consensus/ordering/pbft/viewchange_manager.cpp  |  6 +++++-
 service/tools/config/interface/service.config       |  2 +-
 7 files changed, 37 insertions(+), 7 deletions(-)

diff --git a/chain/storage/kv_storage_test.cpp b/chain/storage/kv_storage_test.cpp
index 6ffe1e42..343cdbf9 100644
--- a/chain/storage/kv_storage_test.cpp
+++ b/chain/storage/kv_storage_test.cpp
@@ -152,6 +152,10 @@ TEST_P(KVStorageTest, SetValueWithSeq) {
 
   EXPECT_EQ(storage->GetValueWithSeq("test_key", 4),
             std::make_pair(std::string(""), static_cast<uint64_t>(0)));
+
+  EXPECT_EQ(
+      storage->GetValueWithSeq("test_key", 0),
+      std::make_pair(std::string("test_value_v2"), static_cast<uint64_t>(3)));
 }
 
 TEST_P(KVStorageTest, GetAllValueWithSeq) {
diff --git a/chain/storage/memory_db.cpp b/chain/storage/memory_db.cpp
index d5e68c55..aac3bc8c 100644
--- a/chain/storage/memory_db.cpp
+++ b/chain/storage/memory_db.cpp
@@ -64,7 +64,7 @@ std::pair<std::string, uint64_t> MemoryDB::GetValueWithSeq(
     auto it = search_it->second.end();
     do {
       --it;
-      if (it->second == seq) {
+      if (it->second == seq || seq == 0) {
         return *it;
       }
       if (it->second < seq) {
diff --git a/platform/consensus/ordering/pbft/checkpoint_manager.cpp b/platform/consensus/ordering/pbft/checkpoint_manager.cpp
index 323c12d5..2125b049 100644
--- a/platform/consensus/ordering/pbft/checkpoint_manager.cpp
+++ b/platform/consensus/ordering/pbft/checkpoint_manager.cpp
@@ -160,6 +160,15 @@ bool CheckPointManager::Wait() {
                       [&] { return new_data_ > 0; });
 }
 
+void CheckPointManager::CheckHealthy() {
+  uint32_t current_time = time(nullptr);
+  for(int i = 1; i <= config_.GetReplicaNum(); ++i) {
+    if( current_time - last_update_time_[i] > replica_timeout_ ) {
+      TimeoutHandler(i);
+    }
+  }
+}
+
 void CheckPointManager::UpdateStableCheckPointStatus() {
   uint64_t last_committable_seq = 0;
   while (!stop_) {
@@ -223,13 +232,19 @@ void CheckPointManager::UpdateStableCheckPointStatus() {
 }
 
 void CheckPointManager::SetTimeoutHandler(
-    std::function<void()> timeout_handler) {
+    std::function<void(int)> timeout_handler) {
   timeout_handler_ = timeout_handler;
 }
 
 void CheckPointManager::TimeoutHandler() {
   if (timeout_handler_) {
-    timeout_handler_();
+    timeout_handler_(0);
+  }
+}
+
+void CheckPointManager::TimeoutHandler(uint32_t replica) {
+  if (timeout_handler_) {
+    timeout_handler_(replica);
   }
 }
 
@@ -245,6 +260,7 @@ int CheckPointManager::ProcessStatusSync(std::unique_ptr<Context> context,
   uint64_t seq = checkpoint_data.seq();
   uint32_t sender_id = request->sender_id();
   status_[sender_id] = seq;
+  last_update_time_[sender_id] = time(nullptr);
   LOG(ERROR) << " received from :" << sender_id << " commit status:" << seq;
   return 0;
 }
@@ -293,6 +309,7 @@ void CheckPointManager::SyncStatus() {
       last_check_seq = last_seq;
       last_time = 0;
     }
+    CheckHealthy();
     sleep(10);
     last_time++;
   }
diff --git a/platform/consensus/ordering/pbft/checkpoint_manager.h b/platform/consensus/ordering/pbft/checkpoint_manager.h
index 8152311a..d1c33aa4 100644
--- a/platform/consensus/ordering/pbft/checkpoint_manager.h
+++ b/platform/consensus/ordering/pbft/checkpoint_manager.h
@@ -54,13 +54,14 @@ class CheckPointManager : public CheckPoint {
   StableCheckPoint GetStableCheckpointWithVotes();
   bool IsValidCheckpointProof(const StableCheckPoint& stable_ckpt);
 
-  void SetTimeoutHandler(std::function<void()> timeout_handler);
+  void SetTimeoutHandler(std::function<void(int)> timeout_handler);
   virtual void UpdateStableCheckPointCallback(
       int64_t current_stable_checkpoint) {}
 
   void Stop();
 
   void TimeoutHandler();
+  void TimeoutHandler(uint32_t replica);
 
   void WaitSignal();
   std::unique_ptr<std::pair<uint64_t, std::string>> PopStableSeqHash();
@@ -89,6 +90,7 @@ class CheckPointManager : public CheckPoint {
   void SyncStatus();
   void StatusProcess();
   void CheckStatus(uint64_t last_seq);
+  void CheckHealthy();
 
  protected:
   uint64_t last_executed_seq_ = 0;
@@ -107,7 +109,7 @@ class CheckPointManager : public CheckPoint {
   LockFreeQueue<Request> data_queue_;
   std::mutex cv_mutex_;
   std::condition_variable cv_;
-  std::function<void()> timeout_handler_;
+  std::function<void(int)> timeout_handler_;
   StableCheckPoint stable_ckpt_;
   int new_data_ = 0;
   LockFreeQueue<std::pair<uint64_t, std::string>> stable_hash_queue_;
@@ -122,6 +124,8 @@ class CheckPointManager : public CheckPoint {
   std::string last_hash_, committable_hash_;
   sem_t committable_seq_signal_;
   std::map<int, uint64_t> status_;
+  std::map<int,int> last_update_time_;
+  int replica_timeout_ = 120;
 };
 
 }  // namespace resdb
diff --git a/platform/consensus/ordering/pbft/message_manager.h b/platform/consensus/ordering/pbft/message_manager.h
index e31272fa..a835be28 100644
--- a/platform/consensus/ordering/pbft/message_manager.h
+++ b/platform/consensus/ordering/pbft/message_manager.h
@@ -135,6 +135,7 @@ class MessageManager {
 
   std::mutex lct_lock_;
   std::map<uint64_t, uint64_t> last_committed_time_;
+  std::map<uint64_t, uint32_t> last_update_time_;
 };
 
 }  // namespace resdb
diff --git a/platform/consensus/ordering/pbft/viewchange_manager.cpp b/platform/consensus/ordering/pbft/viewchange_manager.cpp
index 3084c7a1..8ff17cbe 100644
--- a/platform/consensus/ordering/pbft/viewchange_manager.cpp
+++ b/platform/consensus/ordering/pbft/viewchange_manager.cpp
@@ -119,7 +119,11 @@ void ViewChangeManager::MayStart() {
     return;
   }
 
-  checkpoint_manager_->SetTimeoutHandler([&]() {
+  checkpoint_manager_->SetTimeoutHandler([&](int replica_id) {
+    if( system_info_->GetPrimaryId() != replica_id ) {
+      return;
+    }
+
     // LOG(ERROR) << "checkpoint timeout";
     if (status_ == ViewChangeStatus::NONE) {
       view_change_counter_ = 1;
diff --git a/service/tools/config/interface/service.config b/service/tools/config/interface/service.config
index a6fd052b..a437dbc7 100644
--- a/service/tools/config/interface/service.config
+++ b/service/tools/config/interface/service.config
@@ -19,7 +19,7 @@
 "replica_info":[
 {
 "id":5,
-"ip":"172.31.57.186",
+"ip":"127.0.0.1",
 "port":17005
 }
 ]

Reply via email to