This is an automated email from the ASF dual-hosted git repository.

junchao pushed a commit to branch recovery_ckpt
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git


The following commit(s) were added to refs/heads/recovery_ckpt by this push:
     new 85af6ebf update vc
85af6ebf is described below

commit 85af6ebf5364a2e9137c785c6689ba5b0859c02d
Author: Ubuntu <[email protected]>
AuthorDate: Sun Jan 4 17:12:48 2026 +0000

    update vc
---
 chain/storage/storage.h                            |   1 +
 .../consensus/execution/transaction_executor.cpp   |   2 +
 .../consensus/ordering/pbft/checkpoint_manager.cpp | 166 ++++++++++++++++++---
 .../consensus/ordering/pbft/checkpoint_manager.h   |  23 ++-
 platform/consensus/ordering/pbft/commitment.cpp    |  63 +++++---
 platform/consensus/ordering/pbft/commitment.h      |   1 +
 .../ordering/pbft/consensus_manager_pbft.cpp       |  43 ++++--
 .../ordering/pbft/lock_free_collector_pool.cpp     |   4 +-
 .../consensus/ordering/pbft/message_manager.cpp    |  24 ++-
 platform/consensus/ordering/pbft/message_manager.h |   2 +-
 .../ordering/pbft/performance_manager.cpp          |   2 +-
 .../consensus/ordering/pbft/response_manager.cpp   |   9 +-
 .../ordering/pbft/transaction_collector.cpp        |   7 +-
 .../consensus/ordering/pbft/viewchange_manager.cpp |  83 +++++++----
 .../consensus/ordering/pbft/viewchange_manager.h   |   2 +-
 platform/consensus/recovery/recovery.cpp           |  38 +++--
 platform/networkstrate/consensus_manager.cpp       |   4 +-
 platform/proto/checkpoint_info.proto               |   2 +
 platform/proto/resdb.proto                         |   2 +
 scripts/deploy/config/pbft.config                  |  36 ++---
 service/tools/config/interface/service.config      |   2 +-
 tools/generate_region_config.py                    |  25 +++-
 22 files changed, 396 insertions(+), 145 deletions(-)

diff --git a/chain/storage/storage.h b/chain/storage/storage.h
index 8fa95fee..a0af01bc 100644
--- a/chain/storage/storage.h
+++ b/chain/storage/storage.h
@@ -61,6 +61,7 @@ class Storage {
 
   virtual bool Flush() { return true; };
 
+  virtual uint64_t GetStableCheckpoint() { return 0; }
   virtual uint64_t GetLastCheckpoint() { return 0; }
 
   void SetMaxHistoryNum(int num) { max_history_ = num; }
diff --git a/platform/consensus/execution/transaction_executor.cpp b/platform/consensus/execution/transaction_executor.cpp
index 0a7401bc..bc74cf1e 100644
--- a/platform/consensus/execution/transaction_executor.cpp
+++ b/platform/consensus/execution/transaction_executor.cpp
@@ -146,6 +146,7 @@ bool TransactionExecutor::NeedResponse() {
 
 int TransactionExecutor::Commit(std::unique_ptr<Request> message) {
   global_stats_->IncPendingExecute();
+  LOG(ERROR)<<" commit msg:"<<message->seq();
   if (transaction_manager_ && transaction_manager_->IsOutOfOrder()) {
     // LOG(ERROR)<<"add out of order exe:"<<message->seq()<<" from
     // proxy:"<<message->proxy_id();
@@ -163,6 +164,7 @@ void TransactionExecutor::AddNewData(std::unique_ptr<Request> message) {
 }
 
 std::unique_ptr<Request> TransactionExecutor::GetNextData() {
+  //LOG(ERROR)<<" want next:"<<next_execute_seq_;
   if (candidates_.empty() || candidates_.begin()->first != next_execute_seq_) {
     return nullptr;
   }
diff --git a/platform/consensus/ordering/pbft/checkpoint_manager.cpp b/platform/consensus/ordering/pbft/checkpoint_manager.cpp
index 2125b049..c23641bf 100644
--- a/platform/consensus/ordering/pbft/checkpoint_manager.cpp
+++ b/platform/consensus/ordering/pbft/checkpoint_manager.cpp
@@ -28,13 +28,15 @@ namespace resdb {
 
 CheckPointManager::CheckPointManager(const ResDBConfig& config,
                                      ReplicaCommunicator* replica_communicator,
-                                     SignatureVerifier* verifier)
+                                     SignatureVerifier* verifier,
+                                     SystemInfo * sys_info)
     : config_(config),
       replica_communicator_(replica_communicator),
       verifier_(verifier),
       stop_(false),
       txn_accessor_(config),
-      highest_prepared_seq_(0) {
+      highest_prepared_seq_(0),
+      sys_info_(sys_info){
   current_stable_seq_ = 0;
   if (config_.GetConfigData().enable_viewchange()) {
     config_.EnableCheckPoint(true);
@@ -64,6 +66,10 @@ void CheckPointManager::Stop() {
   }
 }
 
+void CheckPointManager::SetResetExecute(std::function<void (uint64_t seq)> func) {
+  reset_execute_func_ = func;
+}
+
 std::string GetHash(const std::string& h1, const std::string& h2) {
   return SignatureVerifier::CalculateHash(h1 + h2);
 }
@@ -75,10 +81,12 @@ uint64_t CheckPointManager::GetStableCheckpoint() {
 
 StableCheckPoint CheckPointManager::GetStableCheckpointWithVotes() {
   std::lock_guard<std::mutex> lk(mutex_);
+  LOG(ERROR)<<"get stable ckpt:"<<stable_ckpt_.DebugString();
   return stable_ckpt_;
 }
 
 void CheckPointManager::AddCommitData(std::unique_ptr<Request> request) {
+  LOG(ERROR)<<" add commit data:"<<request->seq();
   if (config_.IsCheckPointEnabled()) {
     data_queue_.Push(std::move(request));
   }
@@ -109,6 +117,7 @@ int CheckPointManager::ProcessCheckPoint(std::unique_ptr<Context> context,
   }
   uint64_t checkpoint_seq = checkpoint_data.seq();
   uint32_t sender_id = request->sender_id();
+  LOG(ERROR)<<" receive ckpt:"<<checkpoint_seq<<" from:"<<sender_id;
   int water_mark = config_.GetCheckPointWaterMark();
   if (checkpoint_seq % water_mark) {
     LOG(ERROR) << "checkpoint seq not invalid:" << checkpoint_seq;
@@ -162,11 +171,38 @@ bool CheckPointManager::Wait() {
 
 void CheckPointManager::CheckHealthy() {
   uint32_t current_time = time(nullptr);
+
+  std::map<uint64_t, int> seqs;
+
   for(int i = 1; i <= config_.GetReplicaNum(); ++i) {
+    if (last_update_time_.find(i) == last_update_time_.end() || last_update_time_[i] == 0) {
+      continue;
+    }
+    LOG(ERROR)<<" check healthy, replica:"<<i
+    <<" current time:"<<current_time
+    <<" last time:"<<last_update_time_[i]
+    <<" timeout:"<<replica_timeout_
+    <<" pass:"<<current_time - last_update_time_[i];
     if( current_time - last_update_time_[i] > replica_timeout_ ) {
       TimeoutHandler(i);
     }
+    seqs[status_[i]]++;
   }
+
+  uint64_t unstable_check_ckpt = 0;
+  for(auto it : seqs) {
+    int num = 0;
+    for(auto sit: seqs) {
+      if(sit.first < it.first) {
+        continue;
+      }
+      num += sit.second;
+    }
+    if(num >= config_.GetMinDataReceiveNum()) {
+      unstable_check_ckpt = std::max(unstable_check_ckpt, it.first);
+    }
+  }
+  SetUnstableCkpt(unstable_check_ckpt);
 }
 
 void CheckPointManager::UpdateStableCheckPointStatus() {
@@ -180,6 +216,7 @@ void CheckPointManager::UpdateStableCheckPointStatus() {
     {
       std::lock_guard<std::mutex> lk(mutex_);
       for (auto it : sender_ckpt_) {
+         LOG(ERROR)<<" get ckpt seq :"<<it.first.first<<" size:"<<it.second.size();
         if (it.second.size() >=
             static_cast<size_t>(config_.GetMinCheckpointReceiveNum())) {
           committable_seq_ = it.first.first;
@@ -197,8 +234,11 @@ void CheckPointManager::UpdateStableCheckPointStatus() {
       new_data_ = 0;
     }
 
-    // LOG(ERROR) << "current stable seq:" << current_stable_seq_
-    //  << " stable seq:" << stable_seq;
+     LOG(ERROR) << "current stable seq:" << current_stable_seq_
+      << " stable seq:" << stable_seq;
+     if(stable_seq == 0) {
+       continue;
+     }
     std::vector<SignatureInfo> votes;
     if (current_stable_seq_ < stable_seq) {
       std::lock_guard<std::mutex> lk(mutex_);
@@ -223,9 +263,9 @@ void CheckPointManager::UpdateStableCheckPointStatus() {
         *stable_ckpt_.add_signatures() = vote;
       }
       current_stable_seq_ = stable_seq;
-      // LOG(INFO) << "done. stable seq:" << current_stable_seq_
-      //           << " votes:" << stable_ckpt_.DebugString();
-      // LOG(INFO) << "done. stable seq:" << current_stable_seq_;
+       LOG(INFO) << "done. stable seq:" << current_stable_seq_
+                 << " votes:" << stable_ckpt_.DebugString();
+       LOG(INFO) << "done. stable seq:" << current_stable_seq_;
     }
     UpdateStableCheckPointCallback(current_stable_seq_);
   }
@@ -248,7 +288,16 @@ void CheckPointManager::TimeoutHandler(uint32_t replica) {
   }
 }
 
-void CheckPointManager::SetLastCommit(uint64_t seq) { last_seq_ = seq; }
+void CheckPointManager::SetLastCommit(uint64_t seq) { 
+  LOG(ERROR)<<" set last commit:"<<seq;
+  last_seq_ = seq; 
+  std::lock_guard<std::mutex> lk(lt_mutex_);
+  committed_status_.clear();
+}
+
+uint64_t CheckPointManager::GetLastCommit() {
+  return last_seq_;
+}
 
 int CheckPointManager::ProcessStatusSync(std::unique_ptr<Context> context,
                                          std::unique_ptr<Request> request) {
@@ -259,9 +308,13 @@ int CheckPointManager::ProcessStatusSync(std::unique_ptr<Context> context,
   }
   uint64_t seq = checkpoint_data.seq();
   uint32_t sender_id = request->sender_id();
+  uint32_t primary_id = checkpoint_data.primary_id();
+  uint32_t view = checkpoint_data.view();
+
   status_[sender_id] = seq;
   last_update_time_[sender_id] = time(nullptr);
-  LOG(ERROR) << " received from :" << sender_id << " commit status:" << seq;
+  view_status_[sender_id] = std::make_pair(primary_id, view);
+  LOG(ERROR) << " received from :" << sender_id << " commit status:" << seq<<" primary:"<<primary_id<<" view:"<<view;
   return 0;
 }
 
@@ -277,15 +330,40 @@ void CheckPointManager::CheckStatus(uint64_t last_seq) {
   if (seqs.size() <= f + 1) {
     return;
   }
-  uint64_t min_seq = seqs[f + 1];
+  //uint64_t min_seq = seqs[f + 1];
+  uint64_t min_seq = seqs.back();
 
   LOG(ERROR) << " check last seq:" << last_seq << " max seq:" << min_seq;
   if (last_seq < min_seq) {
     // need recovery from others
-    BroadcastRecovery(last_seq + 1, std::min(min_seq, last_seq + 100));
+    reset_execute_func_(last_seq + 1);
+    BroadcastRecovery(last_seq + 1, std::min(min_seq, last_seq + 500));
+  }
+}
+
+void CheckPointManager::CheckSysStatus() {
+  int f = config_.GetMaxMaliciousReplicaNum();
+
+  std::map<std::pair<int, uint64_t>, int > views;
+  int current_primary = 0;
+  uint64_t current_view= 0;
+  for(auto it : view_status_) {
+    views[it.second]++;
+    if(views[it.second] >= 2 * f+1){
+      current_primary = it.second.first;
+      current_view = it.second.second;
+    }
   }
+
+  if(current_primary > 0 && current_primary != sys_info_->GetPrimaryId() && current_view > sys_info_->GetCurrentView()) {
+    sys_info_->SetCurrentView(current_view);
+    sys_info_->SetPrimary(current_primary);
+    LOG(ERROR)<<" change to primary:"<<current_primary<<" view:"<<current_view;
+  }
+
 }
 
+
 void CheckPointManager::SyncStatus() {
   uint64_t last_check_seq = 0;
   uint64_t last_time = 0;
@@ -296,15 +374,21 @@ void CheckPointManager::SyncStatus() {
     std::unique_ptr<Request> checkpoint_request = NewRequest(
         Request::TYPE_STATUS_SYNC, Request(), config_.GetSelfInfo().id());
     checkpoint_data.set_seq(last_seq);
+    checkpoint_data.set_view(sys_info_->GetCurrentView());
+    checkpoint_data.set_primary_id(sys_info_->GetPrimaryId());
     checkpoint_data.SerializeToString(checkpoint_request->mutable_data());
     replica_communicator_->BroadCast(*checkpoint_request);
 
     LOG(ERROR) << " sync status last seq:" << last_seq
-               << " last time:" << last_time;
-    if (last_check_seq == last_seq && last_time > 300) {
+               << " last time:" << last_time
+               << " primary:" << sys_info_->GetPrimaryId()
+               << " view:" << sys_info_->GetCurrentView();
+    if (last_check_seq == last_seq && last_time > 5) {
       CheckStatus(last_seq);
       last_time = 0;
     }
+    CheckSysStatus();
+
     if (last_seq != last_check_seq) {
       last_check_seq = last_seq;
       last_time = 0;
@@ -325,6 +409,7 @@ void CheckPointManager::UpdateCheckPointStatus() {
   while (!stop_) {
     std::unique_ptr<Request> request = nullptr;
     if (!pendings.empty()) {
+      LOG(ERROR)<<" last seq:"<<last_seq_<<" pending:"<<pendings.begin()->second->seq();
       if (pendings.begin()->second->seq() == last_seq_ + 1) {
         request = std::move(pendings.begin()->second);
         pendings.erase(pendings.begin());
@@ -338,8 +423,7 @@ void CheckPointManager::UpdateCheckPointStatus() {
     }
     std::string hash_ = request->hash();
     uint64_t current_seq = request->seq();
-    // LOG(ERROR) << "update checkpoint seq :" << last_seq_ << " current:" <<
-    // current_seq;
+    LOG(ERROR) << "update checkpoint seq :" << last_seq_ << " current:" << current_seq;
     if (current_seq != last_seq_ + 1) {
       LOG(ERROR) << "seq invalid:" << last_seq_ << " current:" << current_seq;
       if (current_seq > last_seq_ + 1) {
@@ -354,13 +438,15 @@ void CheckPointManager::UpdateCheckPointStatus() {
     }
     bool is_recovery = request->is_recovery();
 
-    if (current_seq == last_ckpt_seq + water_mark) {
+    LOG(ERROR)<<" current seq:"<<current_seq<<" water mark:"<<water_mark<<" current stable seq:"<<current_stable_seq_;
+    if (current_seq > 0 && current_seq % water_mark == 0) {
       last_ckpt_seq = current_seq;
-      if (!is_recovery) {
-        BroadcastCheckPoint(last_ckpt_seq, last_hash_, stable_hashs,
+      //if (!is_recovery) {
+      BroadcastCheckPoint(last_ckpt_seq, last_hash_, stable_hashs,
                             stable_seqs);
-      }
+      //}
     }
+    ClearCommittedStatus(current_seq);
   }
   return;
 }
@@ -411,10 +497,12 @@ CheckPointManager::PopStableSeqHash() {
 
 uint64_t CheckPointManager::GetHighestPreparedSeq() {
   std::lock_guard<std::mutex> lk(lt_mutex_);
+  LOG(ERROR)<<"get high prepared seq:"<<highest_prepared_seq_;
   return highest_prepared_seq_;
 }
 
 void CheckPointManager::SetHighestPreparedSeq(uint64_t seq) {
+  LOG(ERROR)<<"set high prepared seq:"<<seq;
   std::lock_guard<std::mutex> lk(lt_mutex_);
   highest_prepared_seq_ = seq;
 }
@@ -429,6 +517,46 @@ uint64_t CheckPointManager::GetCommittableSeq() {
   return committable_seq_;
 }
 
+void CheckPointManager::SetUnstableCkpt(uint64_t unstable_check_ckpt) {
+  LOG(ERROR)<<" set unstable ckpt:"<<unstable_check_ckpt;
+  {
+  std::lock_guard<std::mutex> lk(lt_mutex_);
+  unstable_check_ckpt_ = unstable_check_ckpt;
+  }
+}
+
+uint64_t CheckPointManager::GetUnstableCkpt() {
+  std::lock_guard<std::mutex> lk(lt_mutex_);
+  LOG(ERROR)<<" get unstable ckpt:"<<unstable_check_ckpt_;
+  return unstable_check_ckpt_;
+}
+
+void CheckPointManager::AddCommitState(uint64_t seq) {
+  LOG(ERROR)<<" add commited state:"<<seq;
+  std::lock_guard<std::mutex> lk(lt_mutex_);
+  committed_status_[seq] = true;
+}
+
+bool CheckPointManager::IsCommitted(uint64_t seq) {
+  std::lock_guard<std::mutex> lk(lt_mutex_);
+  if(seq < last_seq_) {
+    return true;
+  }
+  return committed_status_.find(seq) != committed_status_.end();
+}
+
+void CheckPointManager::ClearCommittedStatus(uint64_t seq) {
+  std::lock_guard<std::mutex> lk(lt_mutex_);
+  while(!committed_status_.empty()){
+    if(committed_status_.begin()->first <= seq) {
+      committed_status_.erase(committed_status_.begin());
+    }
+    else {
+      break;
+    }
+  }
+}
+
 // void CheckPointManager::SetLastExecutedSeq(uint64_t latest_executed_seq){
 //   latest_executed_seq = executor_->get_latest_executed_seq();
 // }
diff --git a/platform/consensus/ordering/pbft/checkpoint_manager.h b/platform/consensus/ordering/pbft/checkpoint_manager.h
index d1c33aa4..88db50ab 100644
--- a/platform/consensus/ordering/pbft/checkpoint_manager.h
+++ b/platform/consensus/ordering/pbft/checkpoint_manager.h
@@ -38,10 +38,12 @@ class CheckPointManager : public CheckPoint {
  public:
   CheckPointManager(const ResDBConfig& config,
                     ReplicaCommunicator* replica_communicator,
-                    SignatureVerifier* verifier);
+                    SignatureVerifier* verifier,
+                    SystemInfo * sys_info);
   virtual ~CheckPointManager();
 
   void SetLastCommit(uint64_t seq);
+  uint64_t GetLastCommit();
 
   void AddCommitData(std::unique_ptr<Request> request);
   int ProcessCheckPoint(std::unique_ptr<Context> context,
@@ -76,6 +78,17 @@ class CheckPointManager : public CheckPoint {
 
   uint64_t GetCommittableSeq();
 
+  void SetUnstableCkpt(uint64_t unstable_check_ckpt);
+
+  uint64_t GetUnstableCkpt();
+
+  void AddCommitState(uint64_t seq);
+
+  bool IsCommitted(uint64_t seq);
+  void ClearCommittedStatus(uint64_t seq);
+
+  void SetResetExecute(std::function<void (uint64_t seq)>);
+
  private:
   void UpdateCheckPointStatus();
   void UpdateStableCheckPointStatus();
@@ -90,6 +103,7 @@ class CheckPointManager : public CheckPoint {
   void SyncStatus();
   void StatusProcess();
   void CheckStatus(uint64_t last_seq);
+  void CheckSysStatus();
   void CheckHealthy();
 
  protected:
@@ -125,7 +139,12 @@ class CheckPointManager : public CheckPoint {
   sem_t committable_seq_signal_;
   std::map<int, uint64_t> status_;
   std::map<int,int> last_update_time_;
-  int replica_timeout_ = 120;
+  int replica_timeout_ = 60;
+  uint64_t unstable_check_ckpt_;
+  std::map<int, uint64_t> committed_status_;
+  std::function<void (uint64_t)> reset_execute_func_;
+  SystemInfo * sys_info_;
+  std::map<int, std::pair<int, uint64_t> > view_status_;
 };
 
 }  // namespace resdb
diff --git a/platform/consensus/ordering/pbft/commitment.cpp b/platform/consensus/ordering/pbft/commitment.cpp
index 914fc623..f8a23b9f 100644
--- a/platform/consensus/ordering/pbft/commitment.cpp
+++ b/platform/consensus/ordering/pbft/commitment.cpp
@@ -97,13 +97,6 @@ int Commitment::ProcessNewRequest(std::unique_ptr<Context> context,
     return -3;
   }
 
-  /*
-  if(SignatureVerifier::CalculateHash(user_request->data()) !=
-  user_request->hash()){ LOG(ERROR) << "the hash and data of the user request
-  don't match, reject"; return -2;
-  }
-  */
-
   // check signatures
   bool valid = verifier_->VerifyMessage(user_request->data(),
                                         user_request->data_signature());
@@ -124,10 +117,12 @@ int Commitment::ProcessNewRequest(std::unique_ptr<Context> context,
     return -2;
   }
   auto seq = message_manager_->AssignNextSeq();
+  LOG(ERROR)<<" get new txn seq:"<<(*seq);
 
   // Artificially make the primary stop proposing new trasactions.
 
   if (!seq.ok()) {
+    LOG(ERROR)<<" seq fail";
     duplicate_manager_->EraseProposed(user_request->hash());
     global_stats_->SeqFail();
     Request request;
@@ -167,12 +162,32 @@ int Commitment::ProcessProposeMsg(std::unique_ptr<Context> context,
     if (message_manager_->GetNextSeq() == 0 ||
         request->seq() == message_manager_->GetNextSeq()) {
       message_manager_->SetNextSeq(request->seq() + 1);
-    } else {
+      while(!pending_recovery_.empty()) {
+        LOG(ERROR)<<" pending size:"<<pending_recovery_.size()<<" first:"<<pending_recovery_.begin()->first<<" next:"<<message_manager_->GetNextSeq();
+        if(pending_recovery_.begin()->first <= message_manager_->GetNextSeq()) {
+          if(pending_recovery_.begin()->first == message_manager_->GetNextSeq()){
+            message_manager_->SetNextSeq(pending_recovery_.begin()->first + 1);
+            message_manager_->AddConsensusMsg(pending_recovery_.begin()->second.first->signature,
+                std::move(pending_recovery_.begin()->second.second));
+          }
+          pending_recovery_.erase(pending_recovery_.begin());
+        }
+        else {
+          break;
+        }
+      }
+
+    } else if( request->seq() > message_manager_->GetNextSeq() ) {
+      uint64_t seq = request->seq();
+      pending_recovery_[seq] = std::make_pair(std::move(context), std::move(request));
+      return 0;
+    }else if(!request->force_recovery()){
       LOG(ERROR) << " recovery request not valid:"
                  << " current seq:" << message_manager_->GetNextSeq()
                  << " data seq:" << request->seq();
       return 0;
     }
+    request->set_force_recovery(false);
     return message_manager_->AddConsensusMsg(context->signature,
                                              std::move(request));
   }
@@ -183,13 +198,6 @@ int Commitment::ProcessProposeMsg(std::unique_ptr<Context> context,
     return -2;
   }
 
-  /*
-    if(request->hash() != "null" + std::to_string(request->seq())
-        && SignatureVerifier::CalculateHash(request->data()) != request->hash())
-    { LOG(ERROR) << "the hash and data of the request don't match, reject";
-      return -2;
-    }
-    */
 
   if (request->sender_id() != config_.GetSelfInfo().id()) {
     if (pre_verify_func_ && !pre_verify_func_(*request)) {
@@ -228,6 +236,7 @@ int Commitment::ProcessProposeMsg(std::unique_ptr<Context> context,
   // message.
   CollectorResultCode ret =
       message_manager_->AddConsensusMsg(context->signature, std::move(request));
+      LOG(ERROR)<<" ret:"<<ret;
   if (ret == CollectorResultCode::STATE_CHANGED) {
     replica_communicator_->BroadCast(*prepare_request);
   }
@@ -241,9 +250,18 @@ int Commitment::ProcessPrepareMsg(std::unique_ptr<Context> context,
     LOG(ERROR) << "user request doesn't contain signature, reject";
     return -2;
   }
+  LOG(ERROR)<< " prepare seq:"<<request->seq();
   if (request->is_recovery()) {
-    return message_manager_->AddConsensusMsg(context->signature,
-                                             std::move(request));
+    uint64_t seq = request->seq();
+    CollectorResultCode ret = message_manager_->AddConsensusMsg(context->signature,
+        std::move(request));
+    if (ret == CollectorResultCode::STATE_CHANGED) {
+      if (message_manager_->GetHighestPreparedSeq() < seq) {
+        message_manager_->SetHighestPreparedSeq(seq);
+      }
+    }
+  LOG(ERROR)<< " prepare seq:"<<seq<<" ret:"<<ret;
+    return ret;
   }
   // global_stats_->IncPrepare();
   std::unique_ptr<Request> commit_request = resdb::NewRequest(
@@ -252,12 +270,13 @@ int Commitment::ProcessPrepareMsg(std::unique_ptr<Context> context,
   // Add request to message_manager.
   // If it has received enough same requests(2f+1), broadcast the commit
   // message.
-  uint64_t seq_ = request->seq();
+  uint64_t seq = request->seq();
   CollectorResultCode ret =
       message_manager_->AddConsensusMsg(context->signature, std::move(request));
+    LOG(ERROR)<<" add msg seq:"<<seq<<" ret:"<<ret;
   if (ret == CollectorResultCode::STATE_CHANGED) {
-    if (message_manager_->GetHighestPreparedSeq() < seq_) {
-      message_manager_->SetHighestPreparedSeq(seq_);
+    if (message_manager_->GetHighestPreparedSeq() < seq) {
+      message_manager_->SetHighestPreparedSeq(seq);
     }
     // If need qc, sign the data
     if (need_qc_ && verifier_) {
@@ -284,6 +303,8 @@ int Commitment::ProcessCommitMsg(std::unique_ptr<Context> context,
                << " context:" << (context == nullptr);
     return -2;
   }
+  LOG(ERROR)<< " commit seq:"<<request->seq();
+  uint64_t seq = request->seq();
   if (request->is_recovery()) {
     return message_manager_->AddConsensusMsg(context->signature,
                                              std::move(request));
@@ -319,7 +340,7 @@ int Commitment::PostProcessExecutedMsg() {
     request.set_current_view(batch_resp->current_view());
     request.set_proxy_id(batch_resp->proxy_id());
     request.set_primary_id(batch_resp->primary_id());
-    // LOG(ERROR)<<"send back to proxy:"<<batch_resp->proxy_id();
+    LOG(ERROR)<<"send back to proxy:"<<batch_resp->proxy_id();
     batch_resp->SerializeToString(request.mutable_data());
     replica_communicator_->SendMessage(request, request.proxy_id());
   }
diff --git a/platform/consensus/ordering/pbft/commitment.h b/platform/consensus/ordering/pbft/commitment.h
index 03d77cf3..329fcdc0 100644
--- a/platform/consensus/ordering/pbft/commitment.h
+++ b/platform/consensus/ordering/pbft/commitment.h
@@ -73,6 +73,7 @@ class Commitment {
   bool need_qc_ = false;
 
   std::mutex mutex_;
+  std::map<uint64_t, std::pair<std::unique_ptr<Context>, std::unique_ptr<Request>> > pending_recovery_;
   std::unique_ptr<DuplicateManager> duplicate_manager_;
 };
 
diff --git a/platform/consensus/ordering/pbft/consensus_manager_pbft.cpp b/platform/consensus/ordering/pbft/consensus_manager_pbft.cpp
index 7425e3c7..ffdd1a03 100644
--- a/platform/consensus/ordering/pbft/consensus_manager_pbft.cpp
+++ b/platform/consensus/ordering/pbft/consensus_manager_pbft.cpp
@@ -32,7 +32,7 @@ ConsensusManagerPBFT::ConsensusManagerPBFT(
     : ConsensusManager(config),
       system_info_(std::make_unique<SystemInfo>(config)),
       checkpoint_manager_(std::make_unique<CheckPointManager>(
-          config, GetBroadCastClient(), GetSignatureVerifier())),
+          config, GetBroadCastClient(), GetSignatureVerifier(), system_info_.get())),
       message_manager_(std::make_unique<MessageManager>(
           config, std::move(executor), checkpoint_manager_.get(),
           system_info_.get())),
@@ -65,6 +65,7 @@ ConsensusManagerPBFT::ConsensusManagerPBFT(
 
   recovery_->ReadLogs(
       [&](const SystemInfoData& data) {
+        LOG(ERROR)<<" read data info:"<<data.view()<<" primary:"<<data.primary_id();
         system_info_->SetCurrentView(data.view());
         system_info_->SetPrimary(data.primary_id());
       },
@@ -72,6 +73,7 @@ ConsensusManagerPBFT::ConsensusManagerPBFT(
         return InternalConsensusCommit(std::move(context), std::move(request));
       },
       [&](int seq) { message_manager_->SetNextCommitSeq(seq + 1); });
+ LOG(ERROR)<<" recovery is done";
 }
 
 void ConsensusManagerPBFT::SetNeedCommitQC(bool need_qc) {
@@ -79,6 +81,7 @@ void ConsensusManagerPBFT::SetNeedCommitQC(bool need_qc) {
 }
 
 void ConsensusManagerPBFT::Start() {
+ LOG(ERROR)<<" ======= start";
   ConsensusManager::Start();
   recovery_thread_ =
       std::thread(&ConsensusManagerPBFT::RemoteRecoveryProcess, this);
@@ -143,8 +146,8 @@ ConsensusManagerPBFT::PopComplainedRequest() {
 // The implementation of PBFT.
 int ConsensusManagerPBFT::ConsensusCommit(std::unique_ptr<Context> context,
                                           std::unique_ptr<Request> request) {
-  // LOG(INFO) << "recv impl type:" << request->type() << " "
-  //          << "sender id:" << request->sender_id();
+   LOG(INFO) << "recv impl type:" << request->type() << " "
+            << "sender id:" << request->sender_id()<<" primary:"<<system_info_->GetPrimaryId();
   // If it is in viewchange, push the request to the queue
   // for the requests from the new view which come before
   // the local new view done.
@@ -190,8 +193,10 @@ int ConsensusManagerPBFT::ConsensusCommit(std::unique_ptr<Context> context,
 
 int ConsensusManagerPBFT::InternalConsensusCommit(
     std::unique_ptr<Context> context, std::unique_ptr<Request> request) {
-  // LOG(ERROR) << "recv impl type:" << request->type() << " "
-  //        << "sender id:" << request->sender_id()<<" seq:"<<request->seq();
+   LOG(ERROR) << "recv impl type:" << request->type() << " "
+          << "sender id:" << request->sender_id()<<" seq:"<<request->seq() 
+          <<" primary:"<<system_info_->GetPrimaryId() 
+          << " is convery:"<<request->is_recovery() ;
 
   switch (request->type()) {
     case Request::TYPE_CLIENT_REQUEST:
@@ -284,7 +289,11 @@ int ConsensusManagerPBFT::ProcessRecoveryData(
     return -2;
   }
   LOG(ERROR) << " obtain min seq:" << recovery_data.min_seq()
-             << " max seq:" << recovery_data.max_seq();
+             << " max seq:" << recovery_data.max_seq() 
+             << " from:" << request->sender_id();
+  if (request->sender_id() == config_.GetSelfInfo().id()) {
+    return 0;
+  }
   RecoveryResponse response;
   int ret = recovery_->GetData(recovery_data, response);
   if (ret) {
@@ -331,26 +340,28 @@ void ConsensusManagerPBFT::RemoteRecoveryProcess() {
                << " data size:" << recovery_data.request_size()
                << " signrue:" << recovery_data.signature_size();
 
+
     for (int i = 0; i < recovery_data.request().size(); i++) {
       uint64_t seq = recovery_data.request(i).seq();
       int type = recovery_data.request(i).type();
-      if (data.find(seq) != data.end()) {
-        // LOG(ERROR)<<" check recovery remote seq:"<<seq<<" type:"<<type<<" has
-        // been recovered.";
+      if(checkpoint_manager_->IsCommitted(seq)) {
+         LOG(ERROR)<<" check recovery remote seq:"<<seq<<" type:"<<type<<" has been recovered.";
+        continue;
+      }
+
+      uint64_t last_seq = checkpoint_manager_->GetLastCommit();
+      if( seq - last_seq > 1000 ) {
+         LOG(ERROR)<<" check seq:"<<seq<<" last:"<<last_seq<<" data is missing, skip.";
         continue;
       }
 
       auto context = std::make_unique<Context>();
       context->signature = recovery_data.signature(i);
       auto request = std::make_unique<Request>(recovery_data.request(i));
-      // write to log
-      // recovery_->AddRequest(context.get(), request.get());
-      InternalConsensusCommit(std::move(context), std::move(request));
-    }
+      view_change_manager_->SetCurrentViewAndNewPrimary(request->current_view());
+      request->set_force_recovery(true);
 
-    for (int i = 0; i < recovery_data.request().size(); i++) {
-      uint64_t seq = recovery_data.request(i).seq();
-      data.insert(seq);
+      InternalConsensusCommit(std::move(context), std::move(request));
     }
   }
 }
diff --git a/platform/consensus/ordering/pbft/lock_free_collector_pool.cpp b/platform/consensus/ordering/pbft/lock_free_collector_pool.cpp
index bf335c17..a6f8593b 100644
--- a/platform/consensus/ordering/pbft/lock_free_collector_pool.cpp
+++ b/platform/consensus/ordering/pbft/lock_free_collector_pool.cpp
@@ -68,8 +68,8 @@ void LockFreeCollectorPool::Update(uint64_t seq) {
     LOG(ERROR) << "seq not match, skip update:" << seq;
     return;
   }
-  // LOG(ERROR)<<" update:"<<(idx^capacity_)<<" seq:"<<seq+capacity_<<
-  //     " cap:"<<capacity_<<" update seq:"<<seq;
+   LOG(ERROR)<<" update:"<<(idx^capacity_)<<" seq:"<<seq+capacity_<<
+         " cap:"<<capacity_<<" update seq:"<<seq;
   collector_[idx ^ capacity_] = std::make_unique<TransactionCollector>(
       seq + capacity_, executor_, enable_viewchange_);
 }
diff --git a/platform/consensus/ordering/pbft/message_manager.cpp b/platform/consensus/ordering/pbft/message_manager.cpp
index 743f7aeb..830373d5 100644
--- a/platform/consensus/ordering/pbft/message_manager.cpp
+++ b/platform/consensus/ordering/pbft/message_manager.cpp
@@ -63,6 +63,10 @@ MessageManager::MessageManager(
   transaction_executor_->SetSeqUpdateNotifyFunc(
       [&](uint64_t seq) { collector_pool_->Update(seq - 1); });
   checkpoint_manager_->SetExecutor(transaction_executor_.get());
+  checkpoint_manager_->SetResetExecute([&](uint64_t seq) {
+    SetNextCommitSeq(seq);
+  });
+
 }
 
 MessageManager::~MessageManager() {
@@ -84,6 +88,7 @@ uint64_t MessageManager ::GetCurrentView() const {
 }
 
 void MessageManager::SetNextSeq(uint64_t seq) {
+  LOG(ERROR) << "set next old seq:" << next_seq_;
   next_seq_ = seq;
   LOG(ERROR) << "set next seq:" << next_seq_;
 }
@@ -158,7 +163,6 @@ bool MessageManager::MayConsensusChangeStatus(
         return status->compare_exchange_strong(
             old_status, TransactionStatue::READY_EXECUTE,
             std::memory_order_acq_rel, std::memory_order_acq_rel);
-        return true;
       }
       break;
   }
@@ -174,6 +178,7 @@ bool MessageManager::MayConsensusChangeStatus(
 CollectorResultCode MessageManager::AddConsensusMsg(
     const SignatureInfo& signature, std::unique_ptr<Request> request) {
   if (request == nullptr || !IsValidMsg(*request)) {
+    LOG(ERROR)<<" msg not invalid";
     return CollectorResultCode::INVALID;
   }
 
@@ -181,6 +186,11 @@ CollectorResultCode MessageManager::AddConsensusMsg(
   uint64_t seq = request->seq();
   int resp_received_count = 0;
   int proxy_id = request->proxy_id();
+  if(checkpoint_manager_->IsCommitted(seq)){
+    LOG(ERROR)<<" seq:"<<seq<<" type:"<<type<<" has been committed";
+    return CollectorResultCode::STATE_CHANGED;
+  }
+
 
   int ret = collector_pool_->GetCollector(seq)->AddRequest(
       std::move(request), signature, type == Request::TYPE_PRE_PREPARE,
@@ -194,9 +204,15 @@ CollectorResultCode MessageManager::AddConsensusMsg(
   if (ret == 1) {
     SetLastCommittedTime(proxy_id);
   } else if (ret != 0) {
+     LOG(ERROR)<<" add request fail";
     return CollectorResultCode::INVALID;
   }
   if (resp_received_count > 0) {
+    if(type == Request::TYPE_COMMIT) {
+      if(checkpoint_manager_) {
+        checkpoint_manager_->AddCommitState(seq);
+      }
+    }
     return CollectorResultCode::STATE_CHANGED;
   }
   return CollectorResultCode::OK;
@@ -206,9 +222,11 @@ std::vector<RequestInfo> 
MessageManager::GetPreparedProof(uint64_t seq) {
   return collector_pool_->GetCollector(seq)->GetPreparedProof();
 }
 
+/*
 TransactionStatue MessageManager::GetTransactionState(uint64_t seq) {
   return collector_pool_->GetCollector(seq)->GetStatus();
 }
+*/
 
 int MessageManager::GetReplicaState(ReplicaState* state) {
   *state->mutable_replica_config() = config_.GetConfigData();
@@ -220,9 +238,11 @@ Storage* MessageManager::GetStorage() {
 }
 
 void MessageManager::SetNextCommitSeq(int seq) {
+  LOG(ERROR)<<" set next commit seq:"<<seq;
   SetNextSeq(seq);
+  SetHighestPreparedSeq(seq);
   collector_pool_->Reset(seq);
-  checkpoint_manager_->SetLastCommit(seq);
+  checkpoint_manager_->SetLastCommit(seq-1);
   return transaction_executor_->SetPendingExecutedSeq(seq);
 }
 
diff --git a/platform/consensus/ordering/pbft/message_manager.h 
b/platform/consensus/ordering/pbft/message_manager.h
index a835be28..c4abe350 100644
--- a/platform/consensus/ordering/pbft/message_manager.h
+++ b/platform/consensus/ordering/pbft/message_manager.h
@@ -76,7 +76,7 @@ class MessageManager {
   // if the request has been prepared, having received 2f+1
   // pre-prepare messages.
   std::vector<RequestInfo> GetPreparedProof(uint64_t seq);
-  TransactionStatue GetTransactionState(uint64_t seq);
+  //TransactionStatue GetTransactionState(uint64_t seq);
 
   void SetNextCommitSeq(int seq);
 
diff --git a/platform/consensus/ordering/pbft/performance_manager.cpp 
b/platform/consensus/ordering/pbft/performance_manager.cpp
index 46a3b587..631b8f4c 100644
--- a/platform/consensus/ordering/pbft/performance_manager.cpp
+++ b/platform/consensus/ordering/pbft/performance_manager.cpp
@@ -195,7 +195,7 @@ CollectorResultCode PerformanceManager::AddResponseMsg(
 
   std::unique_ptr<BatchUserResponse> batch_response =
       std::make_unique<BatchUserResponse>();
-  if (!batch_response->ParseFromString(request->data())) {
+  if (!batch_response->ParseFromString(request->data()) || request->seq() == 
0) {
     LOG(ERROR) << "parse response fail:" << request->data().size()
                << " seq:" << request->seq();
     return CollectorResultCode::INVALID;
diff --git a/platform/consensus/ordering/pbft/response_manager.cpp 
b/platform/consensus/ordering/pbft/response_manager.cpp
index f490c003..28cafce2 100644
--- a/platform/consensus/ordering/pbft/response_manager.cpp
+++ b/platform/consensus/ordering/pbft/response_manager.cpp
@@ -184,8 +184,13 @@ CollectorResultCode ResponseManager::AddResponseMsg(
     return CollectorResultCode::INVALID;
   }
 
+  LOG(ERROR)<<" receive request seq:"<<request->seq()<<" 
type:"<<request->type()<<" local id:"<<batch_response->local_id();
   uint64_t seq = batch_response->local_id();
   request->set_seq(seq);
+  if(seq == 0) {
+    LOG(ERROR)<<" local id is invalid:"<<seq<<" request 
seq:"<<request->seq()<<" type:"<<request->type();
+    return CollectorResultCode::INVALID;
+  }
 
   int type = request->type();
   int resp_received_count = 0;
@@ -202,6 +207,7 @@ CollectorResultCode ResponseManager::AddResponseMsg(
   if (ret != 0) {
     return CollectorResultCode::INVALID;
   }
+  LOG(ERROR)<<" get receive count:"<<resp_received_count<<" seq:"<<seq;
   if (resp_received_count > 0) {
     collector_pool_->Update(seq);
     RemoveWaitingResponseRequest(hash);
@@ -336,8 +342,7 @@ int ResponseManager::DoBatch(
   new_request->set_proxy_id(config_.GetSelfInfo().id());
   replica_communicator_->SendMessage(*new_request, GetPrimary());
   send_num_++;
-  // LOG(INFO) << "send msg to primary:" << GetPrimary()
-  //          << " batch size:" << batch_req.size();
+   LOG(INFO) << "send msg to primary:" << GetPrimary() << " batch size:" << 
batch_req.size();
   AddWaitingResponseRequest(std::move(new_request));
   return 0;
 }
diff --git a/platform/consensus/ordering/pbft/transaction_collector.cpp 
b/platform/consensus/ordering/pbft/transaction_collector.cpp
index accb0536..6327ad69 100644
--- a/platform/consensus/ordering/pbft/transaction_collector.cpp
+++ b/platform/consensus/ordering/pbft/transaction_collector.cpp
@@ -83,6 +83,7 @@ int TransactionCollector::AddRequest(
   uint64_t seq = request->seq();
   uint64_t view = request->current_view();
   if (is_committed_) {
+    LOG(ERROR)<<" seq:"<<seq<<" is committed:";
     return -2;
   }
   if (status_.load() == EXECUTED) {
@@ -116,6 +117,7 @@ int TransactionCollector::AddRequest(
       LOG(ERROR) << "set main request data fail";
       return -2;
     }
+    LOG(ERROR)<<" seq:"<<seq<<" add main:"<<this;
     view_ = view;
     call_back(*main_request->request.get(), 1, nullptr, &status_, force);
     return 0;
@@ -142,6 +144,7 @@ int TransactionCollector::AddRequest(
             if (atomic_mian_request_.Reference() != nullptr &&
                 atomic_mian_request_.Reference()->request->hash() != hash) {
               atomic_mian_request_.Clear();
+              LOG(ERROR)<<" seq:"<<seq_<<" main clear";
               for (auto it = other_main_request_.begin();
                    it != other_main_request_.end(); it++) {
                 if ((*it)->request->hash() == hash) {
@@ -149,6 +152,7 @@ int TransactionCollector::AddRequest(
                   request_info->signature = (*it)->signature;
                   request_info->request = std::move((*it)->request);
                   atomic_mian_request_.Set(request_info);
+                  LOG(ERROR)<<" seq:"<<seq_<<" reset main";
                   break;
                 }
               }
@@ -203,9 +207,10 @@ int TransactionCollector::Commit() {
     return -2;
   }
 
+  LOG(ERROR)<<" seq:"<<seq_<<" commit:"<<this;
   auto main_request = atomic_mian_request_.Reference();
   if (main_request == nullptr) {
-    LOG(ERROR) << "no main";
+    LOG(ERROR) << "no main:"<<seq_;
     return -2;
   }
 
diff --git a/platform/consensus/ordering/pbft/viewchange_manager.cpp 
b/platform/consensus/ordering/pbft/viewchange_manager.cpp
index 8ff17cbe..70468ed3 100644
--- a/platform/consensus/ordering/pbft/viewchange_manager.cpp
+++ b/platform/consensus/ordering/pbft/viewchange_manager.cpp
@@ -87,8 +87,8 @@ ViewChangeManager::ViewChangeManager(const ResDBConfig& 
config,
   if (config_.GetConfigData().enable_viewchange()) {
     collector_pool_ = message_manager->GetCollectorPool();
     sem_init(&viewchange_timer_signal_, 0, 0);
-    server_checking_timeout_thread_ =
-        std::thread(&ViewChangeManager::MonitoringViewChangeTimeOut, this);
+    //server_checking_timeout_thread_ =
+    //    std::thread(&ViewChangeManager::MonitoringViewChangeTimeOut, this);
     checkpoint_state_thread_ =
         std::thread(&ViewChangeManager::MonitoringCheckpointState, this);
   }
@@ -175,11 +175,6 @@ bool ViewChangeManager::IsValidViewChangeMsg(
     return false;
   }
 
-  if (!checkpoint_manager_->IsValidCheckpointProof(
-          view_change_message.stable_ckpt())) {
-    LOG(ERROR) << "stable checkpoint invalid";
-    return false;
-  }
 
   uint64_t stable_seq = view_change_message.stable_ckpt().seq();
 
@@ -202,7 +197,7 @@ bool ViewChangeManager::IsValidViewChangeMsg(
       proof.request().SerializeToString(&data);
       if (!verifier_->VerifyMessage(data, proof.signature())) {
         LOG(ERROR) << "proof signature not valid";
-        return false;
+        //return false;
       }
     }
   }
@@ -232,43 +227,59 @@ void 
ViewChangeManager::SetCurrentViewAndNewPrimary(uint64_t view_number) {
       config_.GetReplicaInfos()[(view_number - 1) % replicas.size()].id();
   system_info_->SetPrimary(id);
   global_stats_->ChangePrimary(id);
-  LOG(ERROR) << "View Change Happened";
+  LOG(ERROR) << "View Change Happened: primary:"<<id<<" view:"<<view_number;
 }
 
 std::vector<std::unique_ptr<Request>> ViewChangeManager::GetPrepareMsg(
     const NewViewMessage& new_view_message, bool need_sign) {
   std::map<uint64_t, Request> prepared_msg;  // <sequence, digest>
+  int idx = 0;
   for (const auto& msg : new_view_message.viewchange_messages()) {
+    ++idx;
     for (const auto& msg : msg.prepared_msg()) {
       uint64_t seq = msg.seq();
-      prepared_msg[seq] = msg.proof(0).request();
+      LOG(ERROR)<<" get prepared msg:"<<seq<<" idx:"<<idx;
+      for(auto & pf: msg.proof()) {
+        LOG(ERROR)<<" get prepared msg:"<<seq<<" proof 
type:"<<pf.request().type();
+      }
+      if(prepared_msg.find(seq) == prepared_msg.end()){
+        prepared_msg[seq] = msg.proof(0).request();
+      }
+      else if(prepared_msg[seq].type() < msg.proof(0).request().type()){
+        prepared_msg[seq] = msg.proof(0).request();
+      }
     }
   }
 
   uint64_t min_s = std::numeric_limits<uint64_t>::max();
+  uint64_t max_seq = 1;
   for (const auto& msg : new_view_message.viewchange_messages()) {
     min_s = std::min(min_s, msg.stable_ckpt().seq());
+    max_seq = std::max(max_seq, msg.stable_ckpt().seq());
   }
 
-  uint64_t max_seq = 1;
   if (prepared_msg.size() > 0) {
     max_seq = (--prepared_msg.end())->first;
   }
 
-  LOG(INFO) << "[GP] min_s: " << min_s << " max_seq: " << max_seq;
+  LOG(ERROR) << "[GP] min_s: " << min_s << " max_seq: " << max_seq;
 
   std::vector<std::unique_ptr<Request>> redo_request;
   // Resent all the request with the current view number.
   for (auto i = min_s + 1; i <= max_seq; ++i) {
-    if (prepared_msg.find(i) == prepared_msg.end()) {
       // for sequence hole, create a new request with empty data and
       // sign by the new primary.
+      if(prepared_msg.find(i) == prepared_msg.end()) {
+        LOG(ERROR)<<" seq:"<<i<<" not prepared, in new view";
+      }
+      else {
+        LOG(ERROR)<<" seq:"<<i<<" prepared, type:"<<prepared_msg[i].type();
+      }
       std::unique_ptr<Request> user_request = resdb::NewRequest(
           Request::TYPE_PRE_PREPARE, Request(), config_.GetSelfInfo().id());
       user_request->set_seq(i);
       user_request->set_current_view(new_view_message.view_number());
       user_request->set_hash("null" + std::to_string(i));
-
       if (verifier_ && need_sign) {
         std::string data;
         auto signature_or = verifier_->SignMessage(data);
@@ -279,13 +290,6 @@ std::vector<std::unique_ptr<Request>> 
ViewChangeManager::GetPrepareMsg(
         *user_request->mutable_data_signature() = *signature_or;
       }
       redo_request.push_back(std::move(user_request));
-    } else {
-      std::unique_ptr<Request> commit_request = resdb::NewRequest(
-          Request::TYPE_COMMIT, prepared_msg[i], config_.GetSelfInfo().id());
-      commit_request->set_seq(i);
-      commit_request->set_current_view(new_view_message.view_number());
-      redo_request.push_back(std::move(commit_request));
-    }
   }
 
   return redo_request;
@@ -298,6 +302,11 @@ int 
ViewChangeManager::ProcessNewView(std::unique_ptr<Context> context,
     LOG(ERROR) << "Parsing new_view_msg failed.";
     return -2;
   }
+  if(request->is_recovery()) {
+    LOG(ERROR)<<" set new view";
+    SetCurrentViewAndNewPrimary(new_view_message.view_number());
+    return 0;
+  }
   LOG(INFO) << "Received NEW-VIEW for view " << new_view_message.view_number();
   // Check if new view is from next expected primary
   if (new_view_message.view_number() !=
@@ -328,6 +337,7 @@ int 
ViewChangeManager::ProcessNewView(std::unique_ptr<Context> context,
     return -2;
   }
 
+  LOG(ERROR)<<" request list size:"<<request_list.size();
   std::set<uint64_t> seq_set;
   // only check the data.
   for (size_t i = 0; i < request_list.size(); ++i) {
@@ -335,6 +345,7 @@ int 
ViewChangeManager::ProcessNewView(std::unique_ptr<Context> context,
       LOG(ERROR) << "data not match";
       return -2;
     }
+    LOG(ERROR)<<" set request seq:"<<request_list[i]->seq();
     seq_set.insert(request_list[i]->seq());
   }
 
@@ -352,31 +363,37 @@ int 
ViewChangeManager::ProcessNewView(std::unique_ptr<Context> context,
 
   SetCurrentViewAndNewPrimary(new_view_message.view_number());
   message_manager_->SetNextSeq(max_seq + 1);
-  LOG(INFO) << "SetNexSeq: " << max_seq + 1;
+  LOG(ERROR) << "SetNexSeq: " << max_seq + 1;
 
   // All is fine.
   for (size_t i = 0; i < request_list.size(); ++i) {
     if (new_view_message.request(i).type() ==
         static_cast<int>(Request::TYPE_PRE_PREPARE)) {
-      new_view_message.request(i);
+        /*
       auto non_proposed_hashes =
           collector_pool_->GetCollector(new_view_message.request(i).seq())
               ->GetAllStoredHash();
       for (auto& hash : non_proposed_hashes) {
         duplicate_manager_->EraseProposed(hash);
       }
+      */
+      //LOG(ERROR)<<"get prepare:"<<new_view_message.request(i).seq();
       replica_communicator_->SendMessage(new_view_message.request(i),
                                          config_.GetSelfInfo());
+      //LOG(ERROR)<<"get prepare:"<<new_view_message.request(i).seq()<<" done";
     } else {
       if (new_view_message.request(i).seq() >
           checkpoint_manager_->GetHighestPreparedSeq()) {
         checkpoint_manager_->SetHighestPreparedSeq(
             new_view_message.request(i).seq());
       }
+      LOG(ERROR)<<"broad new prepare:"<<new_view_message.request(i).type();
       replica_communicator_->BroadCast(new_view_message.request(i));
+      LOG(ERROR)<<"broad new prepare:"<<new_view_message.request(i).type()<<" 
done";
     }
   }
 
+  LOG(ERROR)<<" change state";
   ChangeStatue(ViewChangeStatus::NONE);
   return config_.GetSelfInfo().id() == system_info_->GetPrimaryId() ? -4 : 0;
 }
@@ -460,6 +477,7 @@ void ViewChangeManager::SendNewViewMsg(uint64_t 
view_number) {
   // Broadcast my new view request.
   std::unique_ptr<Request> request =
       NewRequest(Request::TYPE_NEWVIEW, Request(), config_.GetSelfInfo().id());
+
   new_view_message.SerializeToString(request->mutable_data());
   replica_communicator_->BroadCast(*request);
 }
@@ -484,19 +502,22 @@ void ViewChangeManager::SendViewChangeMsg() {
 
   // P - P is a set containing a set Pm for each request m that prepared at i
   // with a sequence number higher than n.
-  int max_seq = checkpoint_manager_->GetHighestPreparedSeq();
-  LOG(INFO) << "Check prepared or committed txns from "
-            << view_change_message.stable_ckpt().seq() + 1 << " to " << 
max_seq;
-
-  for (int i = view_change_message.stable_ckpt().seq() + 1; i <= max_seq; ++i) 
{
+  uint64_t max_seq = checkpoint_manager_->GetHighestPreparedSeq();
+  uint64_t min_seq = view_change_message.stable_ckpt().seq();
+  min_seq = std::max(min_seq, checkpoint_manager_->GetUnstableCkpt());
+  view_change_message.mutable_stable_ckpt()->set_seq(min_seq);
+  LOG(ERROR) << "Check prepared or committed txns from "
+            << min_seq + 1 << " to " << max_seq;
+
+  for (int i = min_seq + 1; i <= max_seq; ++i) {
     // seq i has been prepared or committed.
-    if (message_manager_->GetTransactionState(i) >=
-        TransactionStatue::READY_COMMIT) {
+    if (checkpoint_manager_->IsCommitted(i)) {
       std::vector<RequestInfo> proof_info =
           message_manager_->GetPreparedProof(i);
       assert(proof_info.size() >= config_.GetMinDataReceiveNum());
       auto txn = view_change_message.add_prepared_msg();
       txn->set_seq(i);
+      LOG(ERROR)<<" get seq:"<<i<<" state: commit";
       for (const auto& info : proof_info) {
         auto proof = txn->add_proof();
         *proof->mutable_request() = *info.request;
@@ -509,6 +530,7 @@ void ViewChangeManager::SendViewChangeMsg() {
   std::unique_ptr<Request> request = NewRequest(
       Request::TYPE_VIEWCHANGE, Request(), config_.GetSelfInfo().id());
   view_change_message.SerializeToString(request->mutable_data());
+  LOG(ERROR) << "Broadcast view change";
   replica_communicator_->BroadCast(*request);
 }
 
@@ -571,6 +593,7 @@ void ViewChangeManager::MonitoringViewChangeTimeOut() {
       // [DK9] if the primary cannot get enough viewchange messages before the
       // timer is out, then it broadcasts its viewchanges messages and starts
       // the timer again.
+      LOG(ERROR) << "status :"<<status_;
       if (status_ == ViewChangeStatus::READY_VIEW_CHANGE &&
           viewchange_timeout->view == system_info_->GetCurrentView()) {
         LOG(ERROR) << "It is time to rebroacast viewchange messages";
diff --git a/platform/consensus/ordering/pbft/viewchange_manager.h 
b/platform/consensus/ordering/pbft/viewchange_manager.h
index 4cc2b7cd..880ab486 100644
--- a/platform/consensus/ordering/pbft/viewchange_manager.h
+++ b/platform/consensus/ordering/pbft/viewchange_manager.h
@@ -107,6 +107,7 @@ class ViewChangeManager {
   void AddNewViewTimer();
   void CheckComplaintTimeout();
   void SetDuplicateManager(DuplicateManager* manager);
+  void SetCurrentViewAndNewPrimary(uint64_t view_number);
 
  private:
   void SendViewChangeMsg();
@@ -115,7 +116,6 @@ class ViewChangeManager {
   uint32_t AddRequest(const ViewChangeMessage& viewchange_message,
                       uint32_t sender);
   bool IsNextPrimary(uint64_t view_number);
-  void SetCurrentViewAndNewPrimary(uint64_t view_number);
   std::vector<std::unique_ptr<Request>> GetPrepareMsg(
       const NewViewMessage& new_view_message, bool need_sign = true);
 
diff --git a/platform/consensus/recovery/recovery.cpp 
b/platform/consensus/recovery/recovery.cpp
index 8c265b25..62923f63 100644
--- a/platform/consensus/recovery/recovery.cpp
+++ b/platform/consensus/recovery/recovery.cpp
@@ -114,8 +114,8 @@ void Recovery::UpdateStableCheckPoint() {
   }
   while (!stop_) {
     int64_t latest_ckpt = checkpoint_->GetStableCheckpoint();
-    LOG(ERROR) << "get stable ckpt:" << latest_ckpt;
-    if (last_ckpt_ == latest_ckpt) {
+    LOG(ERROR) << "get stable ckpt:" << latest_ckpt<<" last:"<<last_ckpt_;
+    if (last_ckpt_ >= latest_ckpt) {
       sleep(recovery_ckpt_time_s_);
       continue;
     }
@@ -127,7 +127,7 @@ void Recovery::UpdateStableCheckPoint() {
 void Recovery::GetLastFile() {
   std::string dir = std::filesystem::path(file_path_).parent_path();
   last_ckpt_ = -1;
-  int m_time_s = 0;
+  uint64_t m_time_s = 0;
   for (const auto& entry : std::filesystem::directory_iterator(dir)) {
     std::string dir = std::filesystem::path(entry.path()).parent_path();
     std::string file_name = std::filesystem::path(entry.path()).stem();
@@ -147,11 +147,11 @@ void Recovery::GetLastFile() {
     LOG(ERROR) << "get path:" << entry.path() << " min:" << min_seq
                << " time:" << time_s;
     if (min_seq == -1) {
-      if (last_ckpt_ == -1 || m_time_s < time_s) {
+      if (m_time_s < time_s) {
         file_path_ = entry.path();
         last_ckpt_ = ckpt;
+        LOG(ERROR) << "get last path:" << file_name << " min:" << min_seq<<" 
time_s:"<<time_s<<" min:"<<m_time_s;
         m_time_s = time_s;
-        LOG(ERROR) << "get last path:" << file_name << " min:" << min_seq;
       }
     }
   }
@@ -234,7 +234,7 @@ void Recovery::OpenFile(const std::string& path) {
   }
 
   lseek(fd_, 0, SEEK_END);
-  LOG(INFO) << "open file:" << path << " pos:" << lseek(fd_, 0, SEEK_CUR)
+  LOG(ERROR) << "open file:" << path << " pos:" << lseek(fd_, 0, SEEK_CUR)
             << " fd:" << fd_;
   assert(fd_ >= 0);
 }
@@ -242,6 +242,7 @@ void Recovery::OpenFile(const std::string& path) {
 void Recovery::WriteSystemInfo() {
   int view = system_info_->GetCurrentView();
   int primary_id = system_info_->GetPrimaryId();
+  LOG(ERROR)<<"write system info:"<<primary_id<<" view:"<<view;
   SystemInfoData data;
   data.set_view(view);
   data.set_primary_id(primary_id);
@@ -261,7 +262,6 @@ void Recovery::AddRequest(const Context* context, const 
Request* request) {
     case Request::TYPE_PRE_PREPARE:
     case Request::TYPE_PREPARE:
     case Request::TYPE_COMMIT:
-    case Request::TYPE_CHECKPOINT:
     case Request::TYPE_NEWVIEW:
       return WriteLog(context, request);
     default:
@@ -270,6 +270,7 @@ void Recovery::AddRequest(const Context* context, const 
Request* request) {
 }
 
 void Recovery::WriteLog(const Context* context, const Request* request) {
+  LOG(ERROR)<<" write log request type:"<<request->type();
   std::string data;
   if (request) {
     request->SerializeToString(&data);
@@ -405,7 +406,6 @@ Recovery::GetRecoveryFiles(int64_t ckpt) {
     }
   }
   LOG(ERROR) << "file max ckpt:" << last_ckpt << " storage ckpt:" << ckpt;
-  last_ckpt = std::min(last_ckpt, ckpt);
   std::vector<std::pair<int64_t, std::string>> list;
 
   std::vector<std::pair<int64_t, std::string>> e_list;
@@ -499,10 +499,10 @@ void Recovery::ReadLogsFromFiles(
       LOG(ERROR) << "parse info fail:" << data.size();
       return;
     }
-    LOG(INFO) << "read system info:" << info.DebugString();
-    if (file_idx == 0) {
-      system_callback(info);
-    }
+    LOG(ERROR) << "read system info:" << info.DebugString();
+    //if (file_idx == 0) {
+    system_callback(info);
+    //}
   }
 
   std::vector<std::unique_ptr<RecoveryData>> request_list;
@@ -531,14 +531,11 @@ void Recovery::ReadLogsFromFiles(
   }
   uint64_t max_seq = 0;
   for (std::unique_ptr<RecoveryData>& recovery_data : request_list) {
-    // LOG(ERROR)<<" ckpt :"<<ckpt<<" recovery data
-    // seq:"<<recovery_data->request->seq();
-    if (ckpt < recovery_data->request->seq()) {
+     LOG(ERROR)<<" ckpt :"<<ckpt<<" recovery data 
seq:"<<recovery_data->request->seq()<<" type:"<<recovery_data->request->type();
+    if (ckpt < recovery_data->request->seq() || recovery_data->request->type() 
== Request::TYPE_NEWVIEW) {
       recovery_data->request->set_is_recovery(true);
       max_seq = recovery_data->request->seq();
-      // LOG(ERROR)<<" ??? call back:"<<recovery_data->request->seq()<<"
-      // call_back:"<<(call_back?"1":"0"); InsertCache(*recovery_data->context,
-      // *recovery_data->request);
+      LOG(ERROR)<<" ??? call back:"<<recovery_data->request->seq()<<" 
call_back:"<<(call_back?"1":"0"); 
       call_back(std::move(recovery_data->context),
                 std::move(recovery_data->request));
     }
@@ -611,13 +608,14 @@ Recovery::GetDataFromRecoveryFiles(uint64_t need_min_seq,
   std::map<uint64_t, std::vector<std::pair<std::unique_ptr<Context>,
                                            std::unique_ptr<Request>>>>
       res;
+  int is_in = 0;
   for (const auto& path : list) {
     ReadLogsFromFiles(
         path.second, need_min_seq - 1, 0, [&](const SystemInfoData& data) {},
         [&](std::unique_ptr<Context> context,
             std::unique_ptr<Request> request) {
-          LOG(ERROR) << "check get data from recovery file seq:"
-                     << request->seq();
+          //LOG(ERROR) << "check get data from recovery file seq:"
+          //           << request->seq();
           if (request->seq() >= need_min_seq &&
               request->seq() <= need_max_seq) {
             LOG(ERROR) << "get data from recovery file seq:" << request->seq();
diff --git a/platform/networkstrate/consensus_manager.cpp 
b/platform/networkstrate/consensus_manager.cpp
index b3fb1062..600b9452 100644
--- a/platform/networkstrate/consensus_manager.cpp
+++ b/platform/networkstrate/consensus_manager.cpp
@@ -141,7 +141,9 @@ void ConsensusManager::SendHeartBeat() {
   LOG(ERROR) << " server:" << config_.GetSelfInfo().id() << " sends HB"
              << " is ready:" << is_ready_
              << " client size:" << client_replicas.size()
-             << " svr size:" << replicas.size();
+             << " svr size:" << replicas.size() 
+             << "  primary:"<<hb_info.primary()
+             << " version:" << hb_info.version();
 
   Request request;
   request.set_type(Request::TYPE_HEART_BEAT);
diff --git a/platform/proto/checkpoint_info.proto 
b/platform/proto/checkpoint_info.proto
index 6d4de8ae..d842d7a6 100644
--- a/platform/proto/checkpoint_info.proto
+++ b/platform/proto/checkpoint_info.proto
@@ -35,6 +35,8 @@ message CheckPointData {
   SignatureInfo hash_signature = 3;
   repeated bytes hashs = 4;
   repeated uint64 seqs = 5;
+  uint64 primary_id = 6;
+  uint64 view = 7;
 }
 
 message StableCheckPoint {
diff --git a/platform/proto/resdb.proto b/platform/proto/resdb.proto
index 958b3295..12347b09 100644
--- a/platform/proto/resdb.proto
+++ b/platform/proto/resdb.proto
@@ -90,6 +90,8 @@ message Request {
     int64 create_time = 24;
     int64 commit_time = 25;
     bytes data_hash = 26;
+    bool force_recovery =27;
+
 }
 
 // The response message containing response
diff --git a/scripts/deploy/config/pbft.config 
b/scripts/deploy/config/pbft.config
index e4fe4960..1de013ab 100644
--- a/scripts/deploy/config/pbft.config
+++ b/scripts/deploy/config/pbft.config
@@ -1,21 +1,21 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#  http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
 
 {
   "clientBatchNum": 100,
diff --git a/service/tools/config/interface/service.config 
b/service/tools/config/interface/service.config
index a437dbc7..a6fd052b 100644
--- a/service/tools/config/interface/service.config
+++ b/service/tools/config/interface/service.config
@@ -19,7 +19,7 @@
 "replica_info":[
 {
 "id":5,
-"ip":"127.0.0.1",
+"ip":"172.31.57.186",
 "port":17005
 }
 ]
diff --git a/tools/generate_region_config.py b/tools/generate_region_config.py
index 417499b8..e012f389 100644
--- a/tools/generate_region_config.py
+++ b/tools/generate_region_config.py
@@ -18,6 +18,7 @@
 import os
 import json
 import sys
+import re
 from platform.proto.replica_info_pb2 import ResConfigData,ReplicaInfo
 from google.protobuf.json_format import MessageToJson
 from google.protobuf.json_format import Parse, ParseDict
@@ -64,13 +65,23 @@ def GenerateJsonConfig(file_name, output_file, 
template_file):
         old_json=json.loads(s)
 
       template_json = {}
-      with open(template_file) as f:
-        lines=f.readlines()
-        for l in lines:
-          l=l.strip()
-        s=''.join(lines)
-        template_json=json.loads(s)
-  
+      with open(template_file, 'r', encoding='utf-8') as f:
+        content = f.read()
+
+      content = re.sub(r'/\*(.*?)\*/', '', content, flags=re.DOTALL)
+
+      lines = content.splitlines()
+      clean_lines = []
+      for line in lines:
+          line = re.sub(r'//.*', '', line) 
+          if line.strip(): 
+              clean_lines.append(line)
+
+      clean_content = '\n'.join(clean_lines)
+
+      # Parse the cleaned JSON
+      template_json = json.loads(clean_content)
+
       for (k,v) in template_json.items():
         old_json[k] = v
 

Reply via email to