This is an automated email from the ASF dual-hosted git repository.

junchao pushed a commit to branch recovery_ckpt
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git


The following commit(s) were added to refs/heads/recovery_ckpt by this push:
     new 0ef350c5 Chenyi (#201)
0ef350c5 is described below

commit 0ef350c55461e70d82a8eb381c2829414a405958
Author: cjcchen <[email protected]>
AuthorDate: Tue Nov 25 00:27:16 2025 +0800

    Chenyi (#201)
    
    * Chenyi latest executed seq num each replica
    
    * Chenyi latest executed seq num each replica update file route
    
    * Chenyi update create file
    
    * Chenyi update write in checkpoint
    
    * Chenyi update write in checkpoint
    
    * Chenyi update in checkpoint_manager
    
    * Chenyi update in checkpoint_manager
    
    * Chenyi update in checkpoint_manager
    
    * Chenyi update in checkpoint_manager with ofstream, solved executor_
    
    * Chenyi update in checkpoint_manager with ofstream, solved executor_, with 
test draft, seems deadlock
    
    * Chenyi update in checkpoint_manager with ofstream, solved executor_, with 
test draft
    
    * Chenyi update in checkpoint_manager with ofstream, solved executor_, with 
test draft
    
    * Chenyi update in checkpoint_manager with ofstream, with test draft
    
    ---------
    
    Co-authored-by: cyzhoutt <[email protected]>
---
 platform/consensus/checkpoint/checkpoint.h         |  1 +
 platform/consensus/checkpoint/mock_checkpoint.h    |  1 +
 .../consensus/execution/transaction_executor.cpp   | 17 +++++++++++
 .../consensus/execution/transaction_executor.h     |  6 +++-
 .../consensus/ordering/pbft/checkpoint_manager.cpp | 22 ++++++++++++++
 .../consensus/ordering/pbft/checkpoint_manager.h   |  3 ++
 .../ordering/pbft/checkpoint_manager_test.cpp      | 34 ++++++++++++++++++++++
 .../consensus/ordering/pbft/message_manager.cpp    | 14 ++++++++-
 platform/consensus/recovery/recovery.cpp           | 25 ++++++++++++++++
 platform/consensus/recovery/recovery.h             |  2 +-
 platform/consensus/recovery/recovery_test.cpp      |  2 +-
 11 files changed, 123 insertions(+), 4 deletions(-)

diff --git a/platform/consensus/checkpoint/checkpoint.h 
b/platform/consensus/checkpoint/checkpoint.h
index 7a5b967c..f21b7ff0 100644
--- a/platform/consensus/checkpoint/checkpoint.h
+++ b/platform/consensus/checkpoint/checkpoint.h
@@ -27,6 +27,7 @@ class CheckPoint {
   virtual ~CheckPoint() = default;
 
   virtual uint64_t GetStableCheckpoint() = 0;
+  // virtual uint64_t GetLastExecutedSeq() = 0;
 };
 
 }  // namespace resdb
diff --git a/platform/consensus/checkpoint/mock_checkpoint.h 
b/platform/consensus/checkpoint/mock_checkpoint.h
index 837d895c..4db24f6d 100644
--- a/platform/consensus/checkpoint/mock_checkpoint.h
+++ b/platform/consensus/checkpoint/mock_checkpoint.h
@@ -28,6 +28,7 @@ namespace resdb {
 class MockCheckPoint : public CheckPoint {
  public:
   MOCK_METHOD(uint64_t, GetStableCheckpoint, (), (override));
+  // MOCK_METHOD(uint64_t, GetLastExecutedSeq, (), (override));
 };
 
 }  // namespace resdb
diff --git a/platform/consensus/execution/transaction_executor.cpp 
b/platform/consensus/execution/transaction_executor.cpp
index a62e55f5..d2661828 100644
--- a/platform/consensus/execution/transaction_executor.cpp
+++ b/platform/consensus/execution/transaction_executor.cpp
@@ -315,6 +315,10 @@ void TransactionExecutor::Execute(std::unique_ptr<Request> 
request,
            }
       FinishExecute(request->seq());
 
+      if (response != nullptr || !response_v.empty()){
+        std::cout<<2<<"testing"<<request->seq()<<std::endl;
+        set_OnExecuteSuccess(request->seq());
+      }
       if(response == nullptr){
              response = std::make_unique<BatchUserResponse>();
              for (auto& s : response_v) {
@@ -350,6 +354,19 @@ void 
TransactionExecutor::SetDuplicateManager(DuplicateManager* manager) {
   duplicate_manager_ = manager;
 }
 
+void TransactionExecutor::set_OnExecuteSuccess(uint64_t seq) {
+  // Monotonic max: raise latest_executed_seq_ to `seq`; never lower it.
+  uint64_t cur = latest_executed_seq_.load(std::memory_order_relaxed);
+  while (seq > cur && !latest_executed_seq_.compare_exchange_weak(
+            cur, seq, std::memory_order_release, std::memory_order_relaxed)) {
+    /* CAS failed: `cur` now holds the stored value; re-test seq > cur. */
+  }
+  // A plain store (latest_executed_seq_ = seq) could move the value backwards.
+}
+
+uint64_t TransactionExecutor::get_latest_executed_seq() const {  // Current latest_executed_seq_.
+  return latest_executed_seq_.load(std::memory_order_acquire);
+}
 
 bool TransactionExecutor::SetFlag(uint64_t uid, int f) {
   std::unique_lock<std::mutex> lk(f_mutex_[uid % mod]);
diff --git a/platform/consensus/execution/transaction_executor.h 
b/platform/consensus/execution/transaction_executor.h
index 6fb8ef39..93f134a1 100644
--- a/platform/consensus/execution/transaction_executor.h
+++ b/platform/consensus/execution/transaction_executor.h
@@ -20,6 +20,8 @@
 #pragma once
 #include <functional>
 #include <thread>
+#include <atomic>
+#include <cstdint>
 
 #include "executor/common/transaction_manager.h"
 #include "platform/common/queue/lock_free_queue.h"
@@ -34,6 +36,8 @@ namespace resdb {
 // Execute the requests that may contain system information or user requests.
 class TransactionExecutor {
  public:
+  void set_OnExecuteSuccess(uint64_t seq);
+  uint64_t get_latest_executed_seq() const;
   typedef std::function<void(std::unique_ptr<Request>,
                              std::unique_ptr<BatchUserResponse> resp)>
       PostExecuteFunc;
@@ -75,7 +79,7 @@ class TransactionExecutor {
  private:
   void Execute(std::unique_ptr<Request> request, bool need_execute = true);
   void OnlyExecute(std::unique_ptr<Request> request);
-
+  std::atomic<uint64_t> latest_executed_seq_{0};
   std::unique_ptr<std::string> DoExecute(const Request& request);
   void OrderMessage();
   void ExecuteMessage();
diff --git a/platform/consensus/ordering/pbft/checkpoint_manager.cpp 
b/platform/consensus/ordering/pbft/checkpoint_manager.cpp
index a5a24ca8..3ee62fe0 100644
--- a/platform/consensus/ordering/pbft/checkpoint_manager.cpp
+++ b/platform/consensus/ordering/pbft/checkpoint_manager.cpp
@@ -317,10 +317,28 @@ void CheckPointManager::UpdateCheckPointStatus() {
 
     if (current_seq == last_ckpt_seq + water_mark) {
       last_ckpt_seq = current_seq;
+      if (executor_) {         
+        last_executed_seq_ = executor_->get_latest_executed_seq();
+        std::cout<<"In checkpoint"<<std::endl;    
+        
+      }
       if (!is_recovery) {
         BroadcastCheckPoint(last_ckpt_seq, last_hash_, stable_hashs,
                             stable_seqs);
       }
+      if(is_recovery){
+        std::cout<<"last_executed_seq_: "<<last_executed_seq_<<std::endl;
+        std::string temp_dir = "/tmp";
+        std::string file_path = temp_dir + "/latest_seqnum.txt";
+        // std::ofstream 
log_file("/home/ubuntu/.cache/bazel/_bazel_ubuntu/latest_seqnum.txt");
+        std::ofstream log_file(file_path, std::ios::app); 
+        if (!log_file.is_open()) { 
+          std::cerr << "Error: Could not open the log file." << 
std::strerror(errno) << std::endl; 
+        } 
+        log_file << "Lastest_seqnum: " << last_executed_seq_ << std::endl; 
+        log_file.flush(); 
+        log_file.close();
+      }
     }
   }
   return;
@@ -378,4 +396,8 @@ uint64_t CheckPointManager::GetCommittableSeq() {
   return committable_seq_;
 }
 
+// void CheckPointManager::SetLastExecutedSeq(uint64_t latest_executed_seq){
+//   latest_executed_seq = executor_->get_latest_executed_seq();
+// }
+
 }  // namespace resdb
diff --git a/platform/consensus/ordering/pbft/checkpoint_manager.h 
b/platform/consensus/ordering/pbft/checkpoint_manager.h
index e043978e..ddcc2fbc 100644
--- a/platform/consensus/ordering/pbft/checkpoint_manager.h
+++ b/platform/consensus/ordering/pbft/checkpoint_manager.h
@@ -49,6 +49,7 @@ class CheckPointManager : public CheckPoint {
                         std::unique_ptr<Request> request);
 
   uint64_t GetStableCheckpoint() override;
+//   void SetLastExecutedSeq(uint64_t latest_executed_seq);
   StableCheckPoint GetStableCheckpointWithVotes();
   bool IsValidCheckpointProof(const StableCheckPoint& stable_ckpt);
 
@@ -74,6 +75,7 @@ class CheckPointManager : public CheckPoint {
   uint64_t GetCommittableSeq();
 
  private:
+  
   void UpdateCheckPointStatus();
   void UpdateStableCheckPointStatus();
   void BroadcastCheckPoint(uint64_t seq, const std::string& hash,
@@ -84,6 +86,7 @@ class CheckPointManager : public CheckPoint {
   bool Wait();
 
  protected:
+  uint64_t last_executed_seq_ = 0;
   ResDBConfig config_;
   ReplicaCommunicator* replica_communicator_;
   std::unique_ptr<ChainState> txn_db_;
diff --git a/platform/consensus/ordering/pbft/checkpoint_manager_test.cpp 
b/platform/consensus/ordering/pbft/checkpoint_manager_test.cpp
index f302e819..eaf6c882 100644
--- a/platform/consensus/ordering/pbft/checkpoint_manager_test.cpp
+++ b/platform/consensus/ordering/pbft/checkpoint_manager_test.cpp
@@ -124,6 +124,40 @@ TEST_F(CheckPointManagerTest, SendCheckPoint) {
   LOG(ERROR) << "done";
 }
 
+TEST_F(CheckPointManagerTest, latestSeqCheckpointInRecovery) {
+  auto done = std::make_shared<std::promise<void>>();
+  std::future<void> done_future = done->get_future();
+  
+  std::unique_ptr<TransactionManager> trmanager = 
std::make_unique<TransactionManager>(false, true);
+  std::unique_ptr<SystemInfo> system_info = 
std::make_unique<SystemInfo>(config_);
+  std::unique_ptr<TransactionExecutor> exe = 
std::make_unique<TransactionExecutor>(
+    config_,
+    [done, count = 
std::make_shared<std::atomic<int>>(0)](std::unique_ptr<Request> /*req*/,
+                                                        
std::unique_ptr<BatchUserResponse> /*resp*/) {
+    // NOTE(review): ">= 5" matches a 6th call too; 2nd set_value() throws. Confirm.
+      if (++(*count) >= 5) {
+        done->set_value();
+      }
+    },
+    system_info.get(), std::move(trmanager));
+  CheckPointManager manager(config_, &replica_communicator_, nullptr);
+  manager.SetExecutor(exe.get());
+
+  // Commit seqs 0..4 normally, then seq 5 flagged as a recovery request.
+  
+  for (int i = 0; i <= 4; ++i) {
+    std::unique_ptr<Request> request = std::make_unique<Request>();
+    request->set_seq(i);
+    exe->Commit(std::move(request));
+  }
+  std::unique_ptr<Request> request = std::make_unique<Request>();
+  request->set_seq(5);
+  request->set_is_recovery(true);
+  exe->Commit(std::move(request));
+  done_future.wait();
+}
+
+
 TEST_F(CheckPointManagerTest, SendCheckPointOnce) {
   std::promise<bool> propose_done;
   std::future<bool> propose_done_future = propose_done.get_future();
diff --git a/platform/consensus/ordering/pbft/message_manager.cpp 
b/platform/consensus/ordering/pbft/message_manager.cpp
index cc5e187c..44b1c25a 100644
--- a/platform/consensus/ordering/pbft/message_manager.cpp
+++ b/platform/consensus/ordering/pbft/message_manager.cpp
@@ -40,7 +40,19 @@ MessageManager::MessageManager(
               std::unique_ptr<BatchUserResponse> resp_msg) {
             if (request->is_recovery()) {
               if (checkpoint_manager_) {
-                checkpoint_manager_->AddCommitData(std::move(request));
+                // checkpoint_manager_->AddCommitData(std::move(request));
+                // std::cout<<"In message here"<<std::endl;
+                // uint64_t latest_executed_seq = 
checkpoint_manager_->latest_executed_seq_;
+                // std::string temp_dir = "/tmp";
+                // std::string file_path = temp_dir + "/latest_seqnum.txt";
+                // std::ofstream log_file(file_path, std::ios::app);
+                // // std::ofstream log_file("latest_seqnum.txt"); 
+                // if (!log_file.is_open()) { 
+                //   std::cerr << "Error: Could not open the log file." << 
std::strerror(errno) << std::endl; 
+                // } 
+                // log_file << "Lastest_seqnum: " << latest_executed_seq << 
std::endl; 
+                // log_file.flush(); 
+                // log_file.close();
               }
               return;
             }
diff --git a/platform/consensus/recovery/recovery.cpp 
b/platform/consensus/recovery/recovery.cpp
index 51faad5c..5ea24420 100644
--- a/platform/consensus/recovery/recovery.cpp
+++ b/platform/consensus/recovery/recovery.cpp
@@ -24,6 +24,9 @@
 #include <sys/stat.h>
 #include <sys/types.h>
 #include <unistd.h>
+#include <fstream>
+#include <stdio.h>
+#include <iostream>
 
 #include <filesystem>
 
@@ -251,9 +254,26 @@ void Recovery::AddRequest(const Context* context, const 
Request* request) {
   }
   switch (request->type()) {
     case Request::TYPE_PRE_PREPARE:
+      // break;
     case Request::TYPE_PREPARE:
+      // break;
     case Request::TYPE_COMMIT:
+      // break;
     case Request::TYPE_CHECKPOINT:
+    // {
+    //   uint64_t latest_executed_seq = get_latest_executed_seq_recov();
+    //   std::string temp_dir = "/tmp";
+    //   std::string file_path = temp_dir + "/latest_seqnum.txt";
+    //   std::ofstream log_file(file_path);
+    //   // std::ofstream log_file("latest_seqnum.txt"); 
+    //   if (!log_file.is_open()) { 
+    //     std::cerr << "Error: Could not open the log file." << 
std::strerror(errno) << std::endl; 
+    //   } 
+    //   log_file << "Lastest_seqnum: " << latest_executed_seq << std::endl; 
+    //   log_file.flush(); 
+    //   log_file.close();
+    // }
+      // break;
     case Request::TYPE_NEWVIEW:
       return WriteLog(context, request);
     default:
@@ -261,7 +281,12 @@ void Recovery::AddRequest(const Context* context, const 
Request* request) {
   }
 }
 
+// uint64_t Recovery::get_latest_executed_seq_recov(){
+//   return checkpoint_->GetLastExecutedSeq();
+// }
+
 void Recovery::WriteLog(const Context* context, const Request* request) {
+
   std::string data;
   if (request) {
     request->SerializeToString(&data);
diff --git a/platform/consensus/recovery/recovery.h 
b/platform/consensus/recovery/recovery.h
index 90f8fc99..f5c30cbe 100644
--- a/platform/consensus/recovery/recovery.h
+++ b/platform/consensus/recovery/recovery.h
@@ -40,7 +40,7 @@ class Recovery {
   void Init();
 
   virtual void AddRequest(const Context* context, const Request* request);
-
+  // uint64_t get_latest_executed_seq_recov();
   void ReadLogs(std::function<void(const SystemInfoData& data)> 
system_callback,
                 std::function<void(std::unique_ptr<Context> context,
                                    std::unique_ptr<Request> request)>
diff --git a/platform/consensus/recovery/recovery_test.cpp 
b/platform/consensus/recovery/recovery_test.cpp
index a7cb1ef8..f05cf53a 100644
--- a/platform/consensus/recovery/recovery_test.cpp
+++ b/platform/consensus/recovery/recovery_test.cpp
@@ -179,7 +179,7 @@ TEST_F(RecoveryTest, CheckPoint) {
 
   {
     Recovery recovery(config, &checkpoint_, &system_info_, nullptr);
-
+    
     for (int i = 1; i < 10; ++i) {
       for (int t : types) {
         std::unique_ptr<Request> request =

Reply via email to