This is an automated email from the ASF dual-hosted git repository.

junchao pushed a commit to branch recovery_ckpt
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git


The following commit(s) were added to refs/heads/recovery_ckpt by this push:
     new 45ad21d6 add checkpoint recovery
45ad21d6 is described below

commit 45ad21d6e3b8446ad54862d8586ba927900fcf80
Author: Ubuntu <[email protected]>
AuthorDate: Wed Dec 10 17:38:52 2025 +0000

    add checkpoint recovery
---
 chain/storage/leveldb.cpp                          |  50 +++++++-
 chain/storage/leveldb.h                            |   8 ++
 chain/storage/storage.h                            |   2 +
 executor/common/transaction_manager.cpp            |   2 +
 executor/common/transaction_manager.h              |   4 +-
 executor/kv/kv_executor.cpp                        |   6 +-
 executor/kv/kv_executor.h                          |   2 -
 .../consensus/execution/transaction_executor.cpp   |  49 +++++---
 .../consensus/execution/transaction_executor.h     |   5 +-
 .../consensus/ordering/pbft/checkpoint_manager.cpp | 136 +++++++++++++++++----
 .../consensus/ordering/pbft/checkpoint_manager.h   |  17 ++-
 platform/consensus/ordering/pbft/commitment.cpp    |   2 +-
 .../ordering/pbft/consensus_manager_pbft.cpp       | 102 +++++++++++++++-
 .../ordering/pbft/consensus_manager_pbft.h         |   9 ++
 .../ordering/pbft/lock_free_collector_pool.cpp     |  12 ++
 .../ordering/pbft/lock_free_collector_pool.h       |   1 +
 .../consensus/ordering/pbft/message_manager.cpp    |  11 ++
 platform/consensus/ordering/pbft/message_manager.h |   2 +
 .../ordering/pbft/transaction_collector.cpp        |   4 +-
 platform/consensus/recovery/recovery.cpp           | 114 ++++++++++++++++-
 platform/consensus/recovery/recovery.h             |  17 ++-
 platform/proto/resdb.proto                         |   4 +-
 scripts/deploy/config/kv_server.conf               |  10 +-
 scripts/deploy/config/pbft.config                  |  19 ---
 scripts/deploy/script/deploy.sh                    |   5 +-
 service/kv/kv_service.cpp                          |  14 ++-
 tools/generate_region_config.py                    |   1 +
 27 files changed, 516 insertions(+), 92 deletions(-)

diff --git a/chain/storage/leveldb.cpp b/chain/storage/leveldb.cpp
index f000d6c0..f2de42d7 100644
--- a/chain/storage/leveldb.cpp
+++ b/chain/storage/leveldb.cpp
@@ -64,7 +64,9 @@ ResLevelDB::ResLevelDB(std::optional<LevelDBInfo> config) {
     LOG(ERROR) << "initialized block cache" << std::endl;
   }
   global_stats_ = Stats::GetGlobalStats();
+  last_ckpt_ = 0;
   CreateDB(path);
+  last_ckpt_ = GetLastCheckpointInternal();
 }
 
 void ResLevelDB::CreateDB(const std::string& path) {
@@ -95,6 +97,7 @@ ResLevelDB::~ResLevelDB() {
 
 int ResLevelDB::SetValueWithSeq(const std::string& key,
                                     const std::string& value, uint64_t seq) {
+    LOG(ERROR) << "set value seq:" << seq;
   std::string value_str = GetValue(key);
   ValueHistory history;
   if (!history.ParseFromString(value_str)) {
@@ -109,6 +112,9 @@ int ResLevelDB::SetValueWithSeq(const std::string& key,
 
   if (last_seq > seq) {
     LOG(ERROR) << "seq is small, last:" <<  last_seq << " new seq:" << seq;
+
+    UpdateLastCkpt(last_seq);
+    
     return -2;
   }
 
@@ -120,9 +126,14 @@ int ResLevelDB::SetValueWithSeq(const std::string& key,
     history.mutable_value()->erase(history.mutable_value()->begin());
   }
 
-  LOG(ERROR)<<" set value, string:"<<key<<" seq:"<<seq;
+  LOG(ERROR)<<" set value, string:"<<key<<" seq:"<<seq<<" last seq:"<<last_seq;
   history.SerializeToString(&value_str);
-  return SetValue(key, value_str);
+  int ret = SetValue(key, value_str);
+  if(ret) {
+         return ret;
+  }
+  UpdateLastCkpt(seq);
+  return 0;
 }
 
 std::pair<std::string, uint64_t> ResLevelDB::GetValueWithSeq(
@@ -396,5 +407,40 @@ std::vector<std::pair<std::string, int>> ResLevelDB::GetTopHistory(
   return resp;
 }
 
+const std::string ckpt_key = "leveldb_checkpoint";
+
+void ResLevelDB::UpdateLastCkpt(uint64_t seq) {
+  LOG(ERROR)<<" update ckpt seq:"<<seq<<" last:"<<last_ckpt_<<" update time:"<<update_time_;
+  if(last_ckpt_ > seq) {
+    return;
+  }
+  last_ckpt_ = seq;
+  update_time_++;
+  if(update_time_%100 == 0 && last_ckpt_>0){
+    SetLastCheckpoint(last_ckpt_);
+    update_time_ = 0;
+  }
+}
+
+int ResLevelDB::SetLastCheckpoint(uint64_t ckpt) {
+  LOG(ERROR)<<" update last ckpt :"<<ckpt;
+  return SetValue(ckpt_key, std::to_string(ckpt));
+}
+
+uint64_t ResLevelDB::GetLastCheckpointInternal() {
+       std::string value = GetValue(ckpt_key);
+       if(value.empty()) {
+               return 0;
+       }
+       return std::stoll(value);
+}
+
+uint64_t ResLevelDB::GetLastCheckpoint() {
+       if(last_ckpt_ >0) {
+                 return last_ckpt_;
+       }
+       return GetLastCheckpointInternal();
+}
+
 }  // namespace storage
 }  // namespace resdb
diff --git a/chain/storage/leveldb.h b/chain/storage/leveldb.h
index 3cf97c50..064a9a7d 100644
--- a/chain/storage/leveldb.h
+++ b/chain/storage/leveldb.h
@@ -73,8 +73,14 @@ class ResLevelDB : public Storage {
 
   bool Flush() override;
 
+  virtual uint64_t GetLastCheckpoint() override;
+
+  virtual int SetLastCheckpoint(uint64_t ckpt);
+
  private:
   void CreateDB(const std::string& path);
+  uint64_t GetLastCheckpointInternal();
+  void UpdateLastCkpt(uint64_t seq);
 
  private:
   std::unique_ptr<leveldb::DB> db_ = nullptr;
@@ -85,6 +91,8 @@ class ResLevelDB : public Storage {
  protected:
   Stats* global_stats_ = nullptr;
   std::unique_ptr<LRUCache<std::string, std::string>> block_cache_;
+  uint64_t last_ckpt_;
+  int update_time_ = 0;
 };
 
 }  // namespace storage
diff --git a/chain/storage/storage.h b/chain/storage/storage.h
index 445f0c1f..a4a7309e 100644
--- a/chain/storage/storage.h
+++ b/chain/storage/storage.h
@@ -58,6 +58,8 @@ class Storage {
       const std::string& key, int number) = 0;
 
   virtual bool Flush() { return true; };
+
+  virtual uint64_t GetLastCheckpoint() { return 0; }
   
   void SetMaxHistoryNum(int num) { max_history_ = num; }
   protected:
diff --git a/executor/common/transaction_manager.cpp b/executor/common/transaction_manager.cpp
index 372a00f3..6cd96037 100644
--- a/executor/common/transaction_manager.cpp
+++ b/executor/common/transaction_manager.cpp
@@ -64,6 +64,7 @@ std::unique_ptr<std::string> TransactionManager::ExecuteRequest(
 
 std::vector<std::unique_ptr<std::string>> TransactionManager::ExecuteBatchDataWithSeq(
     uint64_t seq, const std::vector<std::unique_ptr<google::protobuf::Message>>& requests) {
+       seq_ = seq;
     return ExecuteBatchData(requests);
 }
 
@@ -85,6 +86,7 @@ std::vector<std::unique_ptr<std::string>> TransactionManager::ExecuteBatchData(
 
 std::unique_ptr<BatchUserResponse> TransactionManager::ExecuteBatchWithSeq(
     uint64_t seq, const BatchUserRequest& request) {
+       LOG(ERROR)<<" execute batch seq:"<<seq_;
     seq_ = seq;
   return ExecuteBatch(request);
 }
diff --git a/executor/common/transaction_manager.h b/executor/common/transaction_manager.h
index c0436faf..48c1fc2c 100644
--- a/executor/common/transaction_manager.h
+++ b/executor/common/transaction_manager.h
@@ -55,7 +55,7 @@ class TransactionManager {
 
   bool NeedResponse();
 
-  virtual Storage* GetStorage() { return nullptr; };
+  virtual Storage* GetStorage() { return storage_ ? storage_.get(): nullptr; }
 
  protected:
   virtual std::unique_ptr<google::protobuf::Message> ParseData(
@@ -63,6 +63,8 @@ class TransactionManager {
   virtual std::unique_ptr<std::string> ExecuteRequest(
       const google::protobuf::Message& request);
   uint64_t seq_ = 0;
+
+  std::unique_ptr<Storage> storage_;
  private:
   bool is_out_of_order_ = false;
   bool need_response_ = true;
diff --git a/executor/kv/kv_executor.cpp b/executor/kv/kv_executor.cpp
index d1e7bea8..d31a65fa 100644
--- a/executor/kv/kv_executor.cpp
+++ b/executor/kv/kv_executor.cpp
@@ -24,8 +24,9 @@
 
 namespace resdb {
 
-KVExecutor::KVExecutor(std::unique_ptr<Storage> storage)
-    : storage_(std::move(storage)) {
+KVExecutor::KVExecutor(std::unique_ptr<Storage> storage){
+    storage_=std::move(storage);
+    LOG(ERROR)<<" init storage:"<<storage_.get();
     contract_manager_ = std::make_unique<resdb::contract::ContractTransactionManager>(storage_.get());
 }
 
@@ -135,6 +136,7 @@ std::unique_ptr<std::string> KVExecutor::ExecuteData(
 }
 
 void KVExecutor::Set(const std::string& key, const std::string& value) {
+       LOG(ERROR)<<" set key:"<<key<<" seq:"<<seq_;
   storage_->SetValueWithSeq(key, value, seq_);
 }
 
diff --git a/executor/kv/kv_executor.h b/executor/kv/kv_executor.h
index fef12597..81e3cc08 100644
--- a/executor/kv/kv_executor.h
+++ b/executor/kv/kv_executor.h
@@ -57,8 +57,6 @@ class KVExecutor : public TransactionManager {
   void GetTopHistory(const std::string& key, int top_number, Items* items);
 
  private:
-  std::unique_ptr<Storage> storage_;
-
   std::unique_ptr<TransactionManager> contract_manager_;
 };
 
diff --git a/platform/consensus/execution/transaction_executor.cpp b/platform/consensus/execution/transaction_executor.cpp
index c601cc18..4a04c978 100644
--- a/platform/consensus/execution/transaction_executor.cpp
+++ b/platform/consensus/execution/transaction_executor.cpp
@@ -72,7 +72,7 @@ void TransactionExecutor::RegisterExecute(int64_t seq) {
 void TransactionExecutor::WaitForExecute(int64_t seq) {
   if (execute_thread_num_ == 1) return;
   int pre_idx = (seq - 1 + blucket_num_) % blucket_num_;
-
+  //LOG(ERROR)<<"wait for:"<<seq<<" pre idx:"<<pre_idx<<" thread:"<<execute_thread_num_;
   while (!IsStop()) {
     std::unique_lock<std::mutex> lk(mutex_);
     cv_.wait_for(lk, std::chrono::milliseconds(10000), [&] {
@@ -82,14 +82,14 @@ void TransactionExecutor::WaitForExecute(int64_t seq) {
       break;
     }
   }
-  // LOG(ERROR)<<"wait for :"<<seq<<" done";
+  //LOG(ERROR)<<"wait for :"<<seq<<" done";
 }
 
 void TransactionExecutor::FinishExecute(int64_t seq) {
   if (execute_thread_num_ == 1) return;
   int idx = seq % blucket_num_;
   std::unique_lock<std::mutex> lk(mutex_);
-  // LOG(ERROR)<<"finish :"<<seq<<" done";
+  //LOG(ERROR)<<"finish :"<<seq<<" done"<<" idx:"<<idx;
   blucket_[idx] = 3;
   cv_.notify_all();
 }
@@ -115,6 +115,7 @@ void TransactionExecutor::Stop() {
 }
 
 Storage* TransactionExecutor::GetStorage() {
+  assert(transaction_manager_ );
   return transaction_manager_ ? transaction_manager_->GetStorage() : nullptr;
 }
 
@@ -132,6 +133,11 @@ uint64_t TransactionExecutor::GetMaxPendingExecutedSeq() {
   return next_execute_seq_ - 1;
 }
 
+void TransactionExecutor::SetPendingExecutedSeq(int seq) {
+  //LOG(ERROR)<<" seq next pending seq:"<<seq;
+  next_execute_seq_ = seq;
+}
+
 bool TransactionExecutor::NeedResponse() {
   return transaction_manager_ == nullptr ||
          transaction_manager_->NeedResponse();
@@ -174,8 +180,8 @@ void TransactionExecutor::OrderMessage() {
       global_stats_->IncExecute();
       uint64_t seq = message->seq();
       if (next_execute_seq_ > seq) {
-        // LOG(INFO) << "request seq:" << seq << " has been executed"
-        // << " next seq:" << next_execute_seq_;
+         LOG(INFO) << "request seq:" << seq << " has been executed"
+         << " next seq:" << next_execute_seq_;
         continue;
       }
 
@@ -205,13 +211,18 @@ void TransactionExecutor::AddExecuteMessage(std::unique_ptr<Request> message) {
 
 void TransactionExecutor::ExecuteMessage() {
   while (!IsStop()) {
-    auto message = execute_queue_.Pop();
-    if (message == nullptr) {
-      continue;
-    }
+    std::unique_ptr<Request> message = nullptr;
     bool need_execute = true;
-    if (transaction_manager_ && transaction_manager_->IsOutOfOrder()) {
-      need_execute = false;
+    {
+      std::unique_lock<std::mutex> lk(e_mutex_);
+      message = execute_queue_.Pop();
+      if (message == nullptr) {
+        continue;
+      }
+      if (transaction_manager_ && transaction_manager_->IsOutOfOrder()) {
+        need_execute = false;
+      }
+      RegisterExecute(message->seq());
     }
     Execute(std::move(message), need_execute);
   }
@@ -255,7 +266,6 @@ void TransactionExecutor::OnlyExecute(std::unique_ptr<Request> request) {
 
 void TransactionExecutor::Execute(std::unique_ptr<Request> request,
                                   bool need_execute) {
-  RegisterExecute(request->seq());
   std::unique_ptr<BatchUserRequest> batch_request = nullptr;
   std::unique_ptr<std::vector<std::unique_ptr<google::protobuf::Message>>> data;
   std::vector<std::unique_ptr<google::protobuf::Message>> * data_p = nullptr;
@@ -283,9 +293,9 @@ void TransactionExecutor::Execute(std::unique_ptr<Request> request,
   }
   assert(batch_request_p);
 
-  // LOG(INFO) << " get request batch size:"
-  // << batch_request.user_requests_size()<<" proxy id:"
-  //  <<request->proxy_id()<<" need execute:"<<need_execute;
+  //LOG(ERROR) << " get request batch size:"
+  // << batch_request_p->user_requests_size()<<" proxy id:"
+  //  <<request->proxy_id()<<" need execute:"<<need_execute<<" seq:"<<request->seq();
 
   std::unique_ptr<BatchUserResponse> response;
   global_stats_->GetTransactionDetails(*batch_request_p);
@@ -307,6 +317,13 @@ void TransactionExecutor::Execute(std::unique_ptr<Request> request,
       }
 
       WaitForExecute(request->seq());
+      if(last_seq_==0){
+        last_seq_ = request->seq();
+      }
+      else {
+        assert(last_seq_+1 == request->seq());
+        last_seq_++;
+      }
            if(data_p->empty() || (*data_p)[0] == nullptr){
                    response = transaction_manager_->ExecuteBatchWithSeq(request->seq(), *batch_request_p);
            }
@@ -316,7 +333,6 @@ void TransactionExecutor::Execute(std::unique_ptr<Request> request,
       FinishExecute(request->seq());
 
       if (response != nullptr || !response_v.empty()){
-        std::cout<<2<<"testing"<<request->seq()<<std::endl;
         set_OnExecuteSuccess(request->seq());
       }
       if(response == nullptr){
@@ -342,7 +358,6 @@ void TransactionExecutor::Execute(std::unique_ptr<Request> request,
   response->set_local_id(batch_request_p->local_id());
 
   response->set_seq(request->seq());
-
   if (post_exec_func_) {
     post_exec_func_(std::move(request), std::move(response));
   }
diff --git a/platform/consensus/execution/transaction_executor.h b/platform/consensus/execution/transaction_executor.h
index 93f134a1..1cc15373 100644
--- a/platform/consensus/execution/transaction_executor.h
+++ b/platform/consensus/execution/transaction_executor.h
@@ -57,6 +57,7 @@ class TransactionExecutor {
 
   // The max seq S that can be executed (have received all the seq before S).
   uint64_t GetMaxPendingExecutedSeq();
+  void SetPendingExecutedSeq(int seq);
 
   // When a transaction is ready to be executed (have received all the seq
   // before Txn) PreExecute func will be called.
@@ -75,7 +76,6 @@ class TransactionExecutor {
   void FinishExecute(int64_t seq);
 
   void Prepare(std::unique_ptr<Request> request);
-
  private:
   void Execute(std::unique_ptr<Request> request, bool need_execute = true);
   void OnlyExecute(std::unique_ptr<Request> request);
@@ -121,7 +121,8 @@ class TransactionExecutor {
   static const int blucket_num_ = 1024;
   int blucket_[blucket_num_];
   std::condition_variable cv_;
-  std::mutex mutex_;
+  std::mutex mutex_, e_mutex_;
+  int32_t last_seq_ = 0;
 
   enum PrepareType {
     Start_Prepare = 1,
diff --git a/platform/consensus/ordering/pbft/checkpoint_manager.cpp b/platform/consensus/ordering/pbft/checkpoint_manager.cpp
index 3ee62fe0..e96e9c8c 100644
--- a/platform/consensus/ordering/pbft/checkpoint_manager.cpp
+++ b/platform/consensus/ordering/pbft/checkpoint_manager.cpp
@@ -45,6 +45,8 @@ CheckPointManager::CheckPointManager(const ResDBConfig& config,
         std::thread(&CheckPointManager::UpdateStableCheckPointStatus, this);
     checkpoint_thread_ =
         std::thread(&CheckPointManager::UpdateCheckPointStatus, this);
+    status_thread_ =
+        std::thread(&CheckPointManager::SyncStatus, this);
   }
   sem_init(&committable_seq_signal_, 0, 0);
 }
@@ -59,6 +61,9 @@ void CheckPointManager::Stop() {
   if (stable_checkpoint_thread_.joinable()) {
     stable_checkpoint_thread_.join();
   }
+  if (status_thread_.joinable()) {
+    status_thread_.join();
+  }
 }
 
 std::string GetHash(const std::string& h1, const std::string& h2) {
@@ -287,24 +292,114 @@ void CheckPointManager::TimeoutHandler() {
   }
 }
 
+void CheckPointManager::SetLastCommit(uint64_t seq) {
+  last_seq_ = seq;
+}
+
+void CheckPointManager::SetMaxSeq(uint64_t seq) {
+  std::lock_guard<std::mutex> lk(seq_mutex_);
+  max_seq_ = std::max(max_seq_, seq);
+}
+
+uint64_t CheckPointManager::GetMaxSeq() {
+  std::lock_guard<std::mutex> lk(seq_mutex_);
+  return max_seq_;
+}
+
+int CheckPointManager::ProcessStatusSync(std::unique_ptr<Context> context,
+                                         std::unique_ptr<Request> request) {
+
+  CheckPointData checkpoint_data;
+  if (!checkpoint_data.ParseFromString(request->data())) {
+    LOG(ERROR) << "parse checkpont data fail:";
+    return -2;
+  }
+  uint64_t seq = checkpoint_data.seq();
+  uint32_t sender_id = request->sender_id();
+  status_[sender_id] = seq;
+  LOG(ERROR)<<" received from :"<<sender_id<<" commit status:"<<seq;
+  return 0;
+}
+
+void CheckPointManager::CheckStatus(uint64_t last_seq) {
+  std::vector<uint64_t> seqs;
+  for(auto it : status_) {
+    seqs.push_back(it.second);
+  }
+  
+  sort(seqs.begin(), seqs.end());
+  int f = config_.GetMaxMaliciousReplicaNum();
+
+  if(seqs.size() <= f+1) {
+    return;
+  }
+  uint64_t min_seq = seqs[f+1];
+
+
+  LOG(ERROR)<<" check last seq:"<<last_seq<<" max seq:"<<min_seq;
+  if(last_seq < min_seq) {
+    // need recovery from others
+    BroadcastRecovery(last_seq+1,  std::min(min_seq,last_seq+100));
+  }
+}
+
+void CheckPointManager::SyncStatus() {
+  uint64_t last_check_seq = 0;
+  uint64_t last_time = 0;
+  while (!stop_) {
+    uint64_t last_seq = last_seq_;
+
+    CheckPointData checkpoint_data;
+    std::unique_ptr<Request> checkpoint_request = NewRequest(
+        Request::TYPE_STATUS_SYNC, Request(), config_.GetSelfInfo().id());
+    checkpoint_data.set_seq(last_seq);
+    checkpoint_data.SerializeToString(checkpoint_request->mutable_data());
+    replica_communicator_->BroadCast(*checkpoint_request);
+
+    LOG(ERROR)<<" sync status last seq:"<<last_seq<<" last time:"<<last_time;
+    if(last_check_seq == last_seq && last_time > 300) {
+      CheckStatus(last_seq);
+      last_time = 0;
+    }
+    if(last_seq != last_check_seq) {
+      last_check_seq = last_seq;
+      last_time = 0;
+    }
+    sleep(10);
+    last_time++;
+  }
+}
+
+
 void CheckPointManager::UpdateCheckPointStatus() {
   uint64_t last_ckpt_seq = 0;
   int water_mark = config_.GetCheckPointWaterMark();
   int timeout_ms = config_.GetViewchangeCommitTimeout();
   std::vector<std::string> stable_hashs;
   std::vector<uint64_t> stable_seqs;
+  std::map<uint64_t, std::unique_ptr<Request>> pendings;
   while (!stop_) {
-    auto request = data_queue_.Pop(timeout_ms);
+    std::unique_ptr<Request> request = nullptr;
+    if(!pendings.empty()){
+      if(pendings.begin()->second->seq() == last_seq_+1){
+        request = std::move(pendings.begin()->second);
+        pendings.erase(pendings.begin());
+      }
+    }
+    if(request == nullptr){
+      request = data_queue_.Pop(timeout_ms);
+    }
     if (request == nullptr) {
-      // if (last_seq > 0) {
-      //   TimeoutHandler();
-      // }
       continue;
     }
     std::string hash_ = request->hash();
     uint64_t current_seq = request->seq();
+    //LOG(ERROR) << "update checkpoint seq :" << last_seq_ << " current:" << current_seq;
     if (current_seq != last_seq_ + 1) {
       LOG(ERROR) << "seq invalid:" << last_seq_ << " current:" << current_seq;
+      if(current_seq > last_seq_ +1) {
+        pendings[current_seq] = std::move(request);
+      }
       continue;
     }
     {
@@ -313,32 +408,14 @@ void CheckPointManager::UpdateCheckPointStatus() {
       last_seq_++;
     }
     bool is_recovery = request->is_recovery();
-    txn_db_->Put(std::move(request));
+    //txn_db_->Put(std::move(request));
 
     if (current_seq == last_ckpt_seq + water_mark) {
       last_ckpt_seq = current_seq;
-      if (executor_) {         
-        last_executed_seq_ = executor_->get_latest_executed_seq();
-        std::cout<<"In checkpoint"<<std::endl;    
-        
-      }
       if (!is_recovery) {
         BroadcastCheckPoint(last_ckpt_seq, last_hash_, stable_hashs,
                             stable_seqs);
       }
-      if(is_recovery){
-        std::cout<<"last_executed_seq_: "<<last_executed_seq_<<std::endl;
-        std::string temp_dir = "/tmp";
-        std::string file_path = temp_dir + "/latest_seqnum.txt";
-        // std::ofstream log_file("/home/ubuntu/.cache/bazel/_bazel_ubuntu/latest_seqnum.txt");
-        std::ofstream log_file(file_path, std::ios::app); 
-        if (!log_file.is_open()) { 
-          std::cerr << "Error: Could not open the log file." << std::strerror(errno) << std::endl; 
-        } 
-        log_file << "Lastest_seqnum: " << last_executed_seq_ << std::endl; 
-        log_file.flush(); 
-        log_file.close();
-      }
     }
   }
   return;
@@ -366,6 +443,19 @@ void CheckPointManager::BroadcastCheckPoint(
   replica_communicator_->BroadCast(*checkpoint_request);
 }
 
+void CheckPointManager::BroadcastRecovery(
+    uint64_t min_seq,  uint64_t max_seq) {
+  RecoveryRequest recovery_data;
+  std::unique_ptr<Request> recovery_request = NewRequest(
+      Request::TYPE_RECOVERY_DATA, Request(), config_.GetSelfInfo().id());
+  recovery_data.set_min_seq(min_seq);
+  recovery_data.set_max_seq(max_seq);
+  recovery_data.SerializeToString(recovery_request->mutable_data());
+
+  LOG(ERROR)<<" recovery request ["<<min_seq<<","<<max_seq<<"]";
+  replica_communicator_->BroadCast(*recovery_request);
+}
+
 void CheckPointManager::WaitSignal() {
   std::unique_lock<std::mutex> lk(mutex_);
   signal_.wait(lk, [&] { return !stable_hash_queue_.Empty(); });
diff --git a/platform/consensus/ordering/pbft/checkpoint_manager.h b/platform/consensus/ordering/pbft/checkpoint_manager.h
index ddcc2fbc..e41aecae 100644
--- a/platform/consensus/ordering/pbft/checkpoint_manager.h
+++ b/platform/consensus/ordering/pbft/checkpoint_manager.h
@@ -43,10 +43,15 @@ class CheckPointManager : public CheckPoint {
 
   ChainState* GetTxnDB();
   uint64_t GetMaxTxnSeq();
+  void SetLastCommit(uint64_t seq);
+  void SetMaxSeq(uint64_t seq);
+  uint64_t GetMaxSeq();
 
   void AddCommitData(std::unique_ptr<Request> request);
   int ProcessCheckPoint(std::unique_ptr<Context> context,
                         std::unique_ptr<Request> request);
+  int ProcessStatusSync(std::unique_ptr<Context> context,
+                        std::unique_ptr<Request> request);
 
   uint64_t GetStableCheckpoint() override;
 //   void SetLastExecutedSeq(uint64_t latest_executed_seq);
@@ -84,13 +89,19 @@ class CheckPointManager : public CheckPoint {
 
   void Notify();
   bool Wait();
+  void BroadcastRecovery(uint64_t min_seq,  uint64_t max_seq);
+
+  void SyncStatus();
+  void StatusProcess();
+  void CheckStatus(uint64_t last_seq);
+
 
  protected:
   uint64_t last_executed_seq_ = 0;
   ResDBConfig config_;
   ReplicaCommunicator* replica_communicator_;
   std::unique_ptr<ChainState> txn_db_;
-  std::thread checkpoint_thread_, stable_checkpoint_thread_;
+  std::thread checkpoint_thread_, stable_checkpoint_thread_, status_thread_;
   SignatureVerifier* verifier_;
   std::atomic<bool> stop_;
   std::map<std::pair<uint64_t, std::string>, std::set<uint32_t>> sender_ckpt_;
@@ -109,13 +120,15 @@ class CheckPointManager : public CheckPoint {
   LockFreeQueue<std::pair<uint64_t, std::string>> stable_hash_queue_;
   std::condition_variable signal_;
   ResDBTxnAccessor txn_accessor_;
-  std::mutex lt_mutex_;
+  std::mutex lt_mutex_, seq_mutex_;
   uint64_t last_seq_ = 0;
+  uint64_t max_seq_ = 0;
   TransactionExecutor* executor_;
   std::atomic<uint64_t> highest_prepared_seq_;
   uint64_t committable_seq_ = 0;
   std::string last_hash_, committable_hash_;
   sem_t committable_seq_signal_;
+  std::map<int, uint64_t>status_;
 };
 
 }  // namespace resdb
diff --git a/platform/consensus/ordering/pbft/commitment.cpp b/platform/consensus/ordering/pbft/commitment.cpp
index 4d048331..1f7793a6 100644
--- a/platform/consensus/ordering/pbft/commitment.cpp
+++ b/platform/consensus/ordering/pbft/commitment.cpp
@@ -171,7 +171,7 @@ int Commitment::ProcessProposeMsg(std::unique_ptr<Context> context,
       LOG(ERROR) << " recovery request not valid:"
                  << " current seq:" << message_manager_->GetNextSeq()
                  << " data seq:" << request->seq();
-      return 0;
+      //return 0;
     }
     return message_manager_->AddConsensusMsg(context->signature,
                                              std::move(request));
diff --git a/platform/consensus/ordering/pbft/consensus_manager_pbft.cpp b/platform/consensus/ordering/pbft/consensus_manager_pbft.cpp
index 72103eab..8288b07b 100644
--- a/platform/consensus/ordering/pbft/consensus_manager_pbft.cpp
+++ b/platform/consensus/ordering/pbft/consensus_manager_pbft.cpp
@@ -69,15 +69,30 @@ ConsensusManagerPBFT::ConsensusManagerPBFT(
         system_info_->SetPrimary(data.primary_id());
       },
       [&](std::unique_ptr<Context> context, std::unique_ptr<Request> request) {
+
+      if(config_.GetSelfInfo().id() == 3 && request->seq() == 188000) {
+        LOG(ERROR)<<" skip seq:"<<request->seq();
+        return 0;
+      }
+
+
+
         return InternalConsensusCommit(std::move(context), std::move(request));
+      },
+      [&](int seq) {
+        message_manager_->SetNextCommitSeq(seq+1);      
       });
+
 }
 
 void ConsensusManagerPBFT::SetNeedCommitQC(bool need_qc) {
   commitment_->SetNeedCommitQC(need_qc);
 }
 
-void ConsensusManagerPBFT::Start() { ConsensusManager::Start(); }
+void ConsensusManagerPBFT::Start() { 
+  ConsensusManager::Start(); 
+  recovery_thread_ = std::thread(&ConsensusManagerPBFT::RemoteRecoveryProcess, this);
+}
 
 std::vector<ReplicaInfo> ConsensusManagerPBFT::GetReplicas() {
   return message_manager_->GetReplicas();
@@ -185,8 +200,8 @@ int ConsensusManagerPBFT::ConsensusCommit(std::unique_ptr<Context> context,
 
 int ConsensusManagerPBFT::InternalConsensusCommit(
     std::unique_ptr<Context> context, std::unique_ptr<Request> request) {
-  // LOG(INFO) << "recv impl type:" << request->type() << " "
-  //         << "sender id:" << request->sender_id()<<" seq:"<<request->seq();
+   //LOG(ERROR) << "recv impl type:" << request->type() << " "
+   //        << "sender id:" << request->sender_id()<<" seq:"<<request->seq();
 
   switch (request->type()) {
     case Request::TYPE_CLIENT_REQUEST:
@@ -236,6 +251,9 @@ int ConsensusManagerPBFT::InternalConsensusCommit(
     case Request::TYPE_CHECKPOINT:
       return checkpoint_manager_->ProcessCheckPoint(std::move(context),
                                                     std::move(request));
+    case Request::TYPE_STATUS_SYNC:
+      return checkpoint_manager_->ProcessStatusSync(std::move(context),
+                                                    std::move(request));
     case Request::TYPE_VIEWCHANGE:
       return view_change_manager_->ProcessViewChange(std::move(context),
                                                      std::move(request));
@@ -249,6 +267,10 @@ int ConsensusManagerPBFT::InternalConsensusCommit(
                                             std::move(request));
     case Request::TYPE_CUSTOM_QUERY:
       return query_->ProcessCustomQuery(std::move(context), std::move(request));
+    case Request::TYPE_RECOVERY_DATA:
+      return ProcessRecoveryData(std::move(context), std::move(request));
+    case Request::TYPE_RECOVERY_DATA_RESP:
+      return ProcessRecoveryDataResponse(std::move(context), std::move(request));
   }
   return 0;
 }
@@ -263,4 +285,78 @@ void ConsensusManagerPBFT::SetPreVerifyFunc(
   commitment_->SetPreVerifyFunc(func);
 }
 
+int ConsensusManagerPBFT::ProcessRecoveryData(std::unique_ptr<Context> context,
+                                         std::unique_ptr<Request> request) {
+  RecoveryRequest recovery_data;
+  if (!recovery_data.ParseFromString(request->data())) {
+    LOG(ERROR) << "parse checkpont data fail:";
+    return -2;
+  }
+  LOG(ERROR)<<" obtain min seq:"<<recovery_data.min_seq()<<" max seq:"<<recovery_data.max_seq();
+  RecoveryResponse response;
+  int ret = recovery_->GetData(recovery_data, response);
+  if(ret) {
+    return ret;
+  }
+
+  std::unique_ptr<Request> response_data = NewRequest(
+      Request::TYPE_RECOVERY_DATA_RESP, Request(), config_.GetSelfInfo().id());
+
+  response.SerializeToString(response_data->mutable_data());
+
+  LOG(ERROR)<<" obtain min seq:"<<recovery_data.min_seq()<<" max seq:"<<recovery_data.max_seq()<<" data size:"<<response.request_size();
+
+  GetBroadCastClient()->SendMessage(*response_data, request->sender_id());
+
+  return 0;
+}
+
+int ConsensusManagerPBFT::ProcessRecoveryDataResponse(std::unique_ptr<Context> context,
+                                         std::unique_ptr<Request> request) {
+  recovery_queue_.Push(std::move(request));
+  return 0;
+}
+
+void ConsensusManagerPBFT::RemoteRecoveryProcess(){
+  uint64_t last_recovery = 0;
+  std::set<uint64_t> data;
+
+  while(IsRunning()){
+    auto request = recovery_queue_.Pop();
+    if(request == nullptr) {
+      continue;
+    }
+
+    RecoveryResponse recovery_data;
+    if (!recovery_data.ParseFromString(request->data())) {
+      LOG(ERROR) << "parse checkpont data fail:";
+      continue;
+    }
+
+    LOG(ERROR)<<" receive recovery  data from " <<request->sender_id()<<" data size:"<<recovery_data.request_size()<<" signrue:"<<recovery_data.signature_size();
+
+    for(int i = 0; i < recovery_data.request().size(); i++) {
+      uint64_t seq = recovery_data.request(i).seq();
+      int type = recovery_data.request(i).type();
+      if(data.find(seq) != data.end()) {
+      //LOG(ERROR)<<" check recovery remote seq:"<<seq<<" type:"<<type<<" has been recovered.";
+        continue;
+      }
+
+      auto context = std::make_unique<Context>();
+      context->signature = recovery_data.signature(i);
+      auto request = std::make_unique<Request>(recovery_data.request(i));
+      //write to log
+      //recovery_->AddRequest(context.get(), request.get());
+      InternalConsensusCommit(std::move(context), std::move(request));
+    }
+
+    for(int i = 0; i < recovery_data.request().size(); i++) {
+      uint64_t seq = recovery_data.request(i).seq();
+      data.insert(seq);
+    }
+  }
+}
+
+
 }  // namespace resdb
diff --git a/platform/consensus/ordering/pbft/consensus_manager_pbft.h b/platform/consensus/ordering/pbft/consensus_manager_pbft.h
index bea50990..55048a3d 100644
--- a/platform/consensus/ordering/pbft/consensus_manager_pbft.h
+++ b/platform/consensus/ordering/pbft/consensus_manager_pbft.h
@@ -55,6 +55,11 @@ class ConsensusManagerPBFT : public ConsensusManager {
   void SetPreVerifyFunc(std::function<bool(const Request&)>);
   void SetNeedCommitQC(bool need_qc);
 
+  int ProcessRecoveryData(std::unique_ptr<Context> context,
+      std::unique_ptr<Request> request);
+
+  int ProcessRecoveryDataResponse(std::unique_ptr<Context> context,
+                                         std::unique_ptr<Request> request);
  protected:
   int InternalConsensusCommit(std::unique_ptr<Context> context,
                               std::unique_ptr<Request> request);
@@ -67,6 +72,8 @@ class ConsensusManagerPBFT : public ConsensusManager {
   absl::StatusOr<std::pair<std::unique_ptr<Context>, std::unique_ptr<Request>>>
   PopComplainedRequest();
 
+  void RemoteRecoveryProcess();
+
  protected:
   std::unique_ptr<SystemInfo> system_info_;
   std::unique_ptr<CheckPointManager> checkpoint_manager_;
@@ -83,6 +90,8 @@ class ConsensusManagerPBFT : public ConsensusManager {
   std::queue<std::pair<std::unique_ptr<Context>, std::unique_ptr<Request>>>
       request_complained_;
   std::mutex mutex_;
+  std::thread recovery_thread_;
+  LockFreeQueue<Request> recovery_queue_;
 };
 
 }  // namespace resdb
diff --git a/platform/consensus/ordering/pbft/lock_free_collector_pool.cpp 
b/platform/consensus/ordering/pbft/lock_free_collector_pool.cpp
index 4336b66d..2068beed 100644
--- a/platform/consensus/ordering/pbft/lock_free_collector_pool.cpp
+++ b/platform/consensus/ordering/pbft/lock_free_collector_pool.cpp
@@ -50,6 +50,18 @@ LockFreeCollectorPool::LockFreeCollectorPool(const 
std::string& name,
              << " enable viewchange:" << enable_viewchange_ << " done";
 }
 
+// Replaces every collector in the pool with a fresh TransactionCollector.
+//
+// The slot selected by `start_seq & mask_` (the same indexing Update() uses)
+// is assigned sequence number `start_seq`; each following slot, wrapping
+// around the pool, is assigned the next consecutive sequence number.
+//
+// @param start_seq sequence number given to the first rebuilt collector.
+void LockFreeCollectorPool::Reset(uint64_t start_seq) {
+  const size_t slot_count = capacity_ << 1;
+  const size_t first_slot = start_seq & mask_;
+  // Keep the running sequence 64-bit wide: storing it in an `int` would
+  // truncate large sequence numbers before they reach the collectors.
+  uint64_t seq = start_seq;
+  LOG(ERROR) << " reset collector:" << start_seq;
+  for (size_t i = 0; i < slot_count; ++i) {
+    const size_t pos = (i + first_slot) % slot_count;
+    collector_[pos] = std::make_unique<TransactionCollector>(
+        seq++, executor_, enable_viewchange_);
+  }
+  LOG(ERROR) << " reset collector:" << start_seq;
+}
+
 void LockFreeCollectorPool::Update(uint64_t seq) {
   uint32_t idx = seq & mask_;
   if (collector_[idx]->Seq() != seq) {
diff --git a/platform/consensus/ordering/pbft/lock_free_collector_pool.h 
b/platform/consensus/ordering/pbft/lock_free_collector_pool.h
index 5335c4fd..22757827 100644
--- a/platform/consensus/ordering/pbft/lock_free_collector_pool.h
+++ b/platform/consensus/ordering/pbft/lock_free_collector_pool.h
@@ -33,6 +33,7 @@ class LockFreeCollectorPool {
 
   TransactionCollector* GetCollector(uint64_t seq);
   void Update(uint64_t seq);
+  void Reset(uint64_t start_seq);
 
  private:
   std::string name_;
diff --git a/platform/consensus/ordering/pbft/message_manager.cpp 
b/platform/consensus/ordering/pbft/message_manager.cpp
index 2c8b08cc..7489da58 100644
--- a/platform/consensus/ordering/pbft/message_manager.cpp
+++ b/platform/consensus/ordering/pbft/message_manager.cpp
@@ -177,10 +177,14 @@ CollectorResultCode MessageManager::AddConsensusMsg(
   if (request == nullptr || !IsValidMsg(*request)) {
     return CollectorResultCode::INVALID;
   }
+
   int type = request->type();
   uint64_t seq = request->seq();
   int resp_received_count = 0;
   int proxy_id = request->proxy_id();
+  if(checkpoint_manager_){
+    checkpoint_manager_->SetMaxSeq(seq);
+  }
 
   int ret = collector_pool_->GetCollector(seq)->AddRequest(
       std::move(request), signature, type == Request::TYPE_PRE_PREPARE,
@@ -242,6 +246,13 @@ Storage* MessageManager::GetStorage() {
   return transaction_executor_->GetStorage();
 }
 
+// Repositions the manager at sequence number `seq`: forwards it to
+// SetNextSeq(), rebuilds the collector pool starting at `seq`, records it as
+// the checkpoint manager's last commit, and hands it to the transaction
+// executor as the pending executed sequence.
+void MessageManager::SetNextCommitSeq(int seq) {
+  SetNextSeq(seq);
+  collector_pool_->Reset(seq);
+  // AddConsensusMsg() treats checkpoint_manager_ as optional, so guard it
+  // here as well instead of dereferencing a possibly-null pointer.
+  if (checkpoint_manager_) {
+    checkpoint_manager_->SetLastCommit(seq);
+  }
+  transaction_executor_->SetPendingExecutedSeq(seq);
+}
+
 void MessageManager::SetLastCommittedTime(uint64_t proxy_id) {
   lct_lock_.lock();
   last_committed_time_[proxy_id] = GetCurrentTime();
diff --git a/platform/consensus/ordering/pbft/message_manager.h 
b/platform/consensus/ordering/pbft/message_manager.h
index c51fd22a..d5d7a4b2 100644
--- a/platform/consensus/ordering/pbft/message_manager.h
+++ b/platform/consensus/ordering/pbft/message_manager.h
@@ -84,6 +84,8 @@ class MessageManager {
   std::vector<RequestInfo> GetPreparedProof(uint64_t seq);
   TransactionStatue GetTransactionState(uint64_t seq);
 
+  void SetNextCommitSeq(int seq);
+
   // =============  System information ========
   // Obtain the current replica list.
   std::vector<ReplicaInfo> GetReplicas();
diff --git a/platform/consensus/ordering/pbft/transaction_collector.cpp 
b/platform/consensus/ordering/pbft/transaction_collector.cpp
index 0c69c401..650beec9 100644
--- a/platform/consensus/ordering/pbft/transaction_collector.cpp
+++ b/platform/consensus/ordering/pbft/transaction_collector.cpp
@@ -90,8 +90,8 @@ int TransactionCollector::AddRequest(
   }
 
   if (seq_ != static_cast<uint64_t>(request->seq())) {
-    // LOG(ERROR) << "data invalid, seq not the same:" << seq
-    //           << " collect seq:" << seq_;
+     LOG(ERROR) << "data invalid, seq not the same:" << seq
+               << " collect seq:" << seq_;
     return -2;
   }
 
diff --git a/platform/consensus/recovery/recovery.cpp 
b/platform/consensus/recovery/recovery.cpp
index 5ea24420..5b722d75 100644
--- a/platform/consensus/recovery/recovery.cpp
+++ b/platform/consensus/recovery/recovery.cpp
@@ -55,6 +55,7 @@ Recovery::Recovery(const ResDBConfig& config, CheckPoint* 
checkpoint,
   }
 
   if (recovery_enabled_ == false) {
+    LOG(INFO) << "recovery is not enabled:" << recovery_enabled_;
     return;
   }
 
@@ -83,8 +84,10 @@ Recovery::Recovery(const ResDBConfig& config, CheckPoint* 
checkpoint,
 }
 
 void Recovery::Init() {
+LOG(ERROR)<<" init";
   GetLastFile();
   SwitchFile(file_path_);
+LOG(ERROR)<<" init done";
 
   ckpt_thread_ = std::thread(&Recovery::UpdateStableCheckPoint, this);
 }
@@ -141,6 +144,7 @@ void Recovery::GetLastFile() {
 
     int64_t time_s =
         std::stoll(file_name.substr(time_pos + 1, min_seq_pos - time_pos - 1));
+    LOG(ERROR) << "get path:" << entry.path() << " min:" << min_seq<<" 
time:"<<time_s;
     if (min_seq == -1) {
       if (last_ckpt_ == -1 || m_time_s < time_s) {
         file_path_ = entry.path();
@@ -306,6 +310,9 @@ void Recovery::WriteLog(const Context* context, const 
Request* request) {
   AppendData(sig);
 
   Flush();
+  if(context){
+    InsertCache(*context, *request);
+  }
 }
 
 void Recovery::AppendData(const std::string& data) {
@@ -407,7 +414,7 @@ bool Recovery::Read(int fd, size_t len, char* data) {
 }
 
 std::pair<std::vector<std::pair<int64_t, std::string>>, int64_t>
-Recovery::GetRecoveryFiles() {
+Recovery::GetRecoveryFiles(int64_t ckpt) {
   std::string dir = std::filesystem::path(file_path_).parent_path();
   int64_t last_ckpt = 0;
   for (const auto& entry : std::filesystem::directory_iterator(dir)) {
@@ -421,6 +428,8 @@ Recovery::GetRecoveryFiles() {
       last_ckpt = ckpt;
     }
   }
+  LOG(ERROR)<<"file max ckpt:"<<last_ckpt <<" storage ckpt:"<<ckpt;
+  last_ckpt = std::min(last_ckpt, ckpt);
   std::vector<std::pair<int64_t, std::string>> list;
 
   std::vector<std::pair<int64_t, std::string>> e_list;
@@ -462,13 +471,19 @@ void Recovery::ReadLogs(
     std::function<void(const SystemInfoData& data)> system_callback,
     std::function<void(std::unique_ptr<Context> context,
                        std::unique_ptr<Request> request)>
-        call_back) {
+        call_back,
+        std::function<void(int)> set_start_point) {
   if (recovery_enabled_ == false) {
     return;
   }
+  assert(storage_);
+  int64_t storage_ckpt = storage_->GetLastCheckpoint();
+  LOG(ERROR)<<" storage ckpt:"<<storage_ckpt;
   std::unique_lock<std::mutex> lk(mutex_);
-  auto recovery_files_pair = GetRecoveryFiles();
+
+  auto recovery_files_pair = GetRecoveryFiles(storage_ckpt);
   int64_t ckpt = recovery_files_pair.second;
+  set_start_point(ckpt);
   int idx = 0;
   for (auto path : recovery_files_pair.first) {
     ReadLogsFromFiles(path.second, ckpt, idx++, system_callback, call_back);
@@ -538,9 +553,12 @@ void Recovery::ReadLogsFromFiles(
   }
   uint64_t max_seq = 0;
   for (std::unique_ptr<RecoveryData>& recovery_data : request_list) {
+    //LOG(ERROR)<<" ckpt :"<<ckpt<<" recovery data 
seq:"<<recovery_data->request->seq();
     if (ckpt < recovery_data->request->seq()) {
       recovery_data->request->set_is_recovery(true);
       max_seq = recovery_data->request->seq();
+      //LOG(ERROR)<<" ??? call back:"<<recovery_data->request->seq()<<" 
call_back:"<<(call_back?"1":"0");
+      //InsertCache(*recovery_data->context, *recovery_data->request);
       call_back(std::move(recovery_data->context),
                 std::move(recovery_data->request));
     }
@@ -552,4 +570,94 @@ void Recovery::ReadLogsFromFiles(
   close(fd);
 }
 
+// Appends a copy of (context.signature, request) to the cache entry keyed by
+// the request's sequence number, holding data_mutex_ while doing so.
+// NOTE(review): nothing visible in this change erases from cache_, and the
+// only code that reads it is commented out - confirm the cache is trimmed
+// somewhere, otherwise it grows with every logged request.
+void Recovery::InsertCache(const Context& context, const Request& request) {
+  const std::lock_guard<std::mutex> guard(data_mutex_);
+  auto& entries = cache_[request.seq()];
+  entries.emplace_back(context.signature, request);
+}
+
+// Fills `response` with every logged request whose sequence number lies in
+// [request.min_seq(), request.max_seq()], as read back from the recovery
+// files. Signatures and requests are appended in lock-step, so
+// response.signature(i) belongs to response.request(i).
+//
+// Always returns 0.
+int Recovery::GetData(const RecoveryRequest& request,
+                      RecoveryResponse& response) {
+  const auto found =
+      GetDataFromRecoveryFiles(request.min_seq(), request.max_seq());
+
+  for (const auto& seq_entries : found) {
+    for (const auto& entry : seq_entries.second) {
+      const auto& context = entry.first;
+      const auto& logged_request = entry.second;
+      *response.add_signature() = context->signature;
+      *response.add_request() = *logged_request;
+    }
+  }
+
+  // Disabled alternative that served the data from the in-memory cache:
+  /*
+  for(int i = request.min_seq(); i <= request.max_seq(); ++i) {
+    std::unique_lock<std::mutex> lk(data_mutex_);
+    if(cache_.find(i) != cache_.end()) {
+      LOG(ERROR)<<" get data from cache i:"<<i<<" size:"<<cache_[i].size();
+      for(const auto& req : cache_[i]){
+        *response.add_signature() = req.first;
+        *response.add_request() = req.second;
+      }
+    }
+  }
+  */
+  return 0;
+}
+
+// Reads back the logged (context, request) pairs whose sequence number lies
+// in [need_min_seq, need_max_seq], grouped by sequence number.
+//
+// Every "*.log" file in the directory of file_path_ is considered. The last
+// four '_'-separated fields of the file stem are decoded as
+// <time>_<min_seq>_<max_seq>_<suffix> (the suffix is not used here):
+//   - files with min_seq == -1 (presumably logs that are still open - confirm)
+//     are set aside and only the newest one, by time, is read;
+//   - other files are read only if [min_seq, max_seq] overlaps the requested
+//     range.
+// Selected files are processed in ascending time order.
+//
+// @param need_min_seq lowest sequence number wanted (inclusive).
+// @param need_max_seq highest sequence number wanted (inclusive).
+// @return map from sequence number to the entries logged for it.
+std::map<uint64_t, std::vector<std::pair<std::unique_ptr<Context>,
+                                         std::unique_ptr<Request>>>>
+Recovery::GetDataFromRecoveryFiles(uint64_t need_min_seq,
+                                   uint64_t need_max_seq) {
+  const std::string dir = std::filesystem::path(file_path_).parent_path();
+
+  std::vector<std::pair<int64_t, std::string>> list;
+  std::vector<std::pair<int64_t, std::string>> e_list;
+
+  for (const auto& entry : std::filesystem::directory_iterator(dir)) {
+    std::string file_name = std::filesystem::path(entry.path()).stem();
+    std::string ext = std::filesystem::path(entry.path()).extension();
+    if (ext != ".log") continue;
+    int pos = file_name.rfind("_");
+
+    int max_seq_pos = file_name.rfind("_", pos - 1);
+    int64_t max_seq =
+        std::stoll(file_name.substr(max_seq_pos + 1, pos - max_seq_pos - 1));
+
+    int min_seq_pos = file_name.rfind("_", max_seq_pos - 1);
+    int64_t min_seq = std::stoll(
+        file_name.substr(min_seq_pos + 1, max_seq_pos - min_seq_pos - 1));
+
+    int time_pos = file_name.rfind("_", min_seq_pos - 1);
+    int64_t time =
+        std::stoll(file_name.substr(time_pos + 1, min_seq_pos - time_pos - 1));
+
+    if (min_seq == -1) {
+      // Only the newest of these files is read (added after the loop). Do not
+      // fall through into `list`, or that file would be read twice and its
+      // requests returned in duplicate.
+      e_list.push_back(std::make_pair(time, entry.path()));
+      continue;
+    }
+    // Skip files whose sequence range cannot overlap the requested one. The
+    // casts make the signed/unsigned comparison explicit.
+    if (static_cast<uint64_t>(max_seq) < need_min_seq ||
+        static_cast<uint64_t>(min_seq) > need_max_seq) {
+      continue;
+    }
+    list.push_back(std::make_pair(time, entry.path()));
+  }
+
+  // back() on an empty vector is undefined behaviour, so only add the newest
+  // min_seq == -1 file when at least one was found.
+  if (!e_list.empty()) {
+    std::sort(e_list.begin(), e_list.end());
+    list.push_back(e_list.back());
+  }
+  std::sort(list.begin(), list.end());
+
+  // ReadLogsFromFiles only reports requests with seq > ckpt. Clamp at zero so
+  // need_min_seq == 0 does not wrap around to the largest uint64_t value.
+  const int64_t ckpt =
+      need_min_seq == 0 ? 0 : static_cast<int64_t>(need_min_seq - 1);
+
+  std::map<uint64_t, std::vector<std::pair<std::unique_ptr<Context>,
+                                           std::unique_ptr<Request>>>>
+      res;
+  for (const auto& path : list) {
+    ReadLogsFromFiles(
+        path.second, ckpt, 0, [](const SystemInfoData&) {},
+        [&](std::unique_ptr<Context> context,
+            std::unique_ptr<Request> request) {
+          LOG(ERROR) << "check get data from recovery file seq:"
+                     << request->seq();
+          if (request->seq() >= need_min_seq &&
+              request->seq() <= need_max_seq) {
+            LOG(ERROR) << "get data from recovery file seq:" << request->seq();
+            res[request->seq()].push_back(
+                std::make_pair(std::move(context), std::move(request)));
+          }
+        });
+  }
+
+  return res;
+}
+
+
 }  // namespace resdb
diff --git a/platform/consensus/recovery/recovery.h 
b/platform/consensus/recovery/recovery.h
index f5c30cbe..b8497394 100644
--- a/platform/consensus/recovery/recovery.h
+++ b/platform/consensus/recovery/recovery.h
@@ -44,11 +44,16 @@ class Recovery {
   void ReadLogs(std::function<void(const SystemInfoData& data)> 
system_callback,
                 std::function<void(std::unique_ptr<Context> context,
                                    std::unique_ptr<Request> request)>
-                    call_back);
+                    call_back,
+                std::function<void(int)> start_point 
+                    );
 
   int64_t GetMaxSeq();
   int64_t GetMinSeq();
 
+  int GetData(const RecoveryRequest& request,
+      RecoveryResponse &response);
+
  private:
   struct RecoveryData {
     std::unique_ptr<Context> context;
@@ -75,7 +80,7 @@ class Recovery {
 
   void UpdateStableCheckPoint();
   std::pair<std::vector<std::pair<int64_t, std::string>>, int64_t>
-  GetRecoveryFiles();
+  GetRecoveryFiles(int64_t ckpt);
   void ReadLogsFromFiles(
       const std::string& path, int64_t ckpt, int file_idx,
       std::function<void(const SystemInfoData& data)> system_callback,
@@ -83,6 +88,11 @@ class Recovery {
                          std::unique_ptr<Request> request)>
           call_back);
 
+  void InsertCache(const Context&context, const Request& request);
+
+  std::map<uint64_t, std::vector<std::pair<std::unique_ptr<Context>, 
std::unique_ptr<Request>> >>
+    GetDataFromRecoveryFiles(uint64_t need_min_seq, uint64_t need_max_seq);
+
  protected:
   ResDBConfig config_;
   CheckPoint* checkpoint_;
@@ -92,7 +102,7 @@ class Recovery {
   std::string file_path_, base_file_path_;
   size_t buffer_size_ = 0;
   int fd_;
-  std::mutex mutex_;
+  std::mutex mutex_, data_mutex_;
 
   int64_t last_ckpt_;
   int64_t min_seq_, max_seq_;
@@ -101,6 +111,7 @@ class Recovery {
   int recovery_ckpt_time_s_;
   SystemInfo* system_info_;
   Storage* storage_;
+  std::map<uint64_t, std::vector<std::pair<SignatureInfo, Request>>> cache_;
 };
 
 }  // namespace resdb
diff --git a/platform/proto/resdb.proto b/platform/proto/resdb.proto
index 47edac38..5c2ccb60 100644
--- a/platform/proto/resdb.proto
+++ b/platform/proto/resdb.proto
@@ -57,8 +57,9 @@ message Request {
         TYPE_NEWVIEW= 17;
         TYPE_CUSTOM_QUERY = 18;
         TYPE_CUSTOM_CONSENSUS = 19;
+        TYPE_STATUS_SYNC = 20;
 
-        NUM_OF_TYPE = 20; // the total number of types.
+        NUM_OF_TYPE = 21; // the total number of types.
                        // Used to create the collector.
     };
     int32 type = 1;
@@ -175,6 +176,7 @@ message RecoveryRequest {
 
 message RecoveryResponse {
   repeated Request request = 1;
+  repeated SignatureInfo signature = 2;
 }
 
 message RequestWithProof {
diff --git a/scripts/deploy/config/kv_server.conf 
b/scripts/deploy/config/kv_server.conf
index abfc222e..d5af12e6 100644
--- a/scripts/deploy/config/kv_server.conf
+++ b/scripts/deploy/config/kv_server.conf
@@ -18,10 +18,10 @@
 #
 
 iplist=(
-172.31.25.4
-172.31.27.205
-172.31.27.141
-172.31.20.222
-172.31.28.162
+172.31.57.186
+172.31.57.186
+172.31.57.186
+172.31.57.186
+172.31.57.186
 )
 
diff --git a/scripts/deploy/config/pbft.config 
b/scripts/deploy/config/pbft.config
index e4fe4960..93e2ba29 100644
--- a/scripts/deploy/config/pbft.config
+++ b/scripts/deploy/config/pbft.config
@@ -1,22 +1,3 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#  http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
 {
   "clientBatchNum": 100,
   "enable_viewchange": true,
diff --git a/scripts/deploy/script/deploy.sh b/scripts/deploy/script/deploy.sh
index 8bae565b..bab480bb 100755
--- a/scripts/deploy/script/deploy.sh
+++ b/scripts/deploy/script/deploy.sh
@@ -19,6 +19,8 @@
 
 set -e
 
+export TEMPLATE_PATH=$PWD/config/pbft.config
+
 # load environment parameters
 . ./script/env.sh
 
@@ -70,7 +72,8 @@ deploy/script/generate_key.sh ${BAZEL_WORKSPACE_PATH} 
${output_key_path} ${#ipli
 deploy/script/generate_config.sh ${BAZEL_WORKSPACE_PATH} ${output_key_path} 
${output_cert_path} ${output_path} ${admin_key_path} ${deploy_iplist[@]}
 
 # build kv server
-bazel build ${server}
+#bazel build ${server} 
+bazel build ${server} --define="enable_leveldb=True"
 
 if [ $? != 0 ]
 then
diff --git a/service/kv/kv_service.cpp b/service/kv/kv_service.cpp
index 269fb807..170b22bd 100644
--- a/service/kv/kv_service.cpp
+++ b/service/kv/kv_service.cpp
@@ -31,6 +31,11 @@
 using namespace resdb;
 using namespace resdb::storage;
 
+
+// Signal handler that logs the received signal number and then returns.
+// NOTE(review): the handler returns without exiting, so a signal routed here
+// no longer terminates the process - confirm that is intended for SIGINT.
+// NOTE(review): LOG() is presumably not async-signal-safe; verify before
+// relying on this outside of debugging.
+void SignalHandler(int sig_num) {
+  LOG(ERROR)<<" signal:"<<sig_num<<" call"<<" ======================";
+}
+
 void ShowUsage() {
   printf("<config> <private_key> <cert_file> [logging_dir]\n");
 }
@@ -51,7 +56,10 @@ int main(int argc, char** argv) {
     exit(0);
   }
   google::InitGoogleLogging(argv[0]);
-  FLAGS_minloglevel = 1;
+  FLAGS_minloglevel = 0;
+  signal(SIGINT, SignalHandler);
+  signal(SIGKILL, SignalHandler);
+
 
   char* config_file = argv[1];
   char* private_key_file = argv[2];
@@ -71,10 +79,10 @@ int main(int argc, char** argv) {
   ResConfigData config_data = config->GetConfigData();
 
   std::string db_path = std::to_string(config->GetSelfInfo().port()) + "_db/";
-  LOG(INFO) << "db path:" << db_path;
+  LOG(ERROR) << "db path:" << db_path;
 
   auto server = GenerateResDBServer(
       config_file, private_key_file, cert_file,
       std::make_unique<KVExecutor>(NewStorage(db_path, config_data)), nullptr);
   server->Run();
-}
\ No newline at end of file
+}
diff --git a/tools/generate_region_config.py b/tools/generate_region_config.py
index c3d001f2..417499b8 100644
--- a/tools/generate_region_config.py
+++ b/tools/generate_region_config.py
@@ -81,4 +81,5 @@ if __name__ == "__main__":
     template_config = None
     if len(sys.argv)>3:
       template_config = sys.argv[3]
+    print("generate json config:",template_config)
     GenerateJsonConfig(sys.argv[1], sys.argv[2], template_config)

Reply via email to