This is an automated email from the ASF dual-hosted git repository.
junchao pushed a commit to branch recovery_ckpt
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git
The following commit(s) were added to refs/heads/recovery_ckpt by this push:
new 0ef350c5 Chenyi (#201)
0ef350c5 is described below
commit 0ef350c55461e70d82a8eb381c2829414a405958
Author: cjcchen <[email protected]>
AuthorDate: Tue Nov 25 00:27:16 2025 +0800
Chenyi (#201)
* Chenyi latest executed seq num each replica
* Chenyi latest executed seq num each replica update file route
* Chenyi update create file
* Chenyi update write in checkpoint
* Chenyi update write in checkpoint
* Chenyi update in checkpoint_manager
* Chenyi update in checkpoint_manager
* Chenyi update in checkpoint_manager
* Chenyi update in checkpoint_manager with ofstream, solved executor_
* Chenyi update in checkpoint_manager with ofstream, solved executor_, with
test draft, seems deadlock
* Chenyi update in checkpoint_manager with ofstream, solved executor_, with
test draft
* Chenyi update in checkpoint_manager with ofstream, solved executor_, with
test draft
* Chenyi update in checkpoint_manager with ofstream, with test draft
---------
Co-authored-by: cyzhoutt <[email protected]>
---
platform/consensus/checkpoint/checkpoint.h | 1 +
platform/consensus/checkpoint/mock_checkpoint.h | 1 +
.../consensus/execution/transaction_executor.cpp | 17 +++++++++++
.../consensus/execution/transaction_executor.h | 6 +++-
.../consensus/ordering/pbft/checkpoint_manager.cpp | 22 ++++++++++++++
.../consensus/ordering/pbft/checkpoint_manager.h | 3 ++
.../ordering/pbft/checkpoint_manager_test.cpp | 34 ++++++++++++++++++++++
.../consensus/ordering/pbft/message_manager.cpp | 14 ++++++++-
platform/consensus/recovery/recovery.cpp | 25 ++++++++++++++++
platform/consensus/recovery/recovery.h | 2 +-
platform/consensus/recovery/recovery_test.cpp | 2 +-
11 files changed, 123 insertions(+), 4 deletions(-)
diff --git a/platform/consensus/checkpoint/checkpoint.h
b/platform/consensus/checkpoint/checkpoint.h
index 7a5b967c..f21b7ff0 100644
--- a/platform/consensus/checkpoint/checkpoint.h
+++ b/platform/consensus/checkpoint/checkpoint.h
@@ -27,6 +27,7 @@ class CheckPoint {
virtual ~CheckPoint() = default;
virtual uint64_t GetStableCheckpoint() = 0;
+ // virtual uint64_t GetLastExecutedSeq() = 0;
};
} // namespace resdb
diff --git a/platform/consensus/checkpoint/mock_checkpoint.h
b/platform/consensus/checkpoint/mock_checkpoint.h
index 837d895c..4db24f6d 100644
--- a/platform/consensus/checkpoint/mock_checkpoint.h
+++ b/platform/consensus/checkpoint/mock_checkpoint.h
@@ -28,6 +28,7 @@ namespace resdb {
class MockCheckPoint : public CheckPoint {
public:
MOCK_METHOD(uint64_t, GetStableCheckpoint, (), (override));
+ // MOCK_METHOD(uint64_t, GetLastExecutedSeq, (), (override));
};
} // namespace resdb
diff --git a/platform/consensus/execution/transaction_executor.cpp
b/platform/consensus/execution/transaction_executor.cpp
index a62e55f5..d2661828 100644
--- a/platform/consensus/execution/transaction_executor.cpp
+++ b/platform/consensus/execution/transaction_executor.cpp
@@ -315,6 +315,10 @@ void TransactionExecutor::Execute(std::unique_ptr<Request>
request,
}
FinishExecute(request->seq());
+ if (response != nullptr || !response_v.empty()){
+ std::cout<<2<<"testing"<<request->seq()<<std::endl;
+ set_OnExecuteSuccess(request->seq());
+ }
if(response == nullptr){
response = std::make_unique<BatchUserResponse>();
for (auto& s : response_v) {
@@ -350,6 +354,19 @@ void
TransactionExecutor::SetDuplicateManager(DuplicateManager* manager) {
duplicate_manager_ = manager;
}
+// Records `seq` as the latest executed sequence number, but only when it is
+// larger than the value already stored, so the counter never moves backwards.
+void TransactionExecutor::set_OnExecuteSuccess(uint64_t seq) {
+  uint64_t observed = latest_executed_seq_.load(std::memory_order_relaxed);
+  for (;;) {
+    if (seq <= observed) {
+      return;  // Stored value is already at or past `seq`.
+    }
+    // On failure compare_exchange_weak refreshes `observed`, so the next pass
+    // re-checks `seq` against the value that is stored now.
+    if (latest_executed_seq_.compare_exchange_weak(
+            observed, seq, std::memory_order_release,
+            std::memory_order_relaxed)) {
+      return;
+    }
+  }
+}
+
+// Returns the value currently held in latest_executed_seq_ (acquire load).
+uint64_t TransactionExecutor::get_latest_executed_seq() const {
+  const uint64_t latest = latest_executed_seq_.load(std::memory_order_acquire);
+  return latest;
+}
bool TransactionExecutor::SetFlag(uint64_t uid, int f) {
std::unique_lock<std::mutex> lk(f_mutex_[uid % mod]);
diff --git a/platform/consensus/execution/transaction_executor.h
b/platform/consensus/execution/transaction_executor.h
index 6fb8ef39..93f134a1 100644
--- a/platform/consensus/execution/transaction_executor.h
+++ b/platform/consensus/execution/transaction_executor.h
@@ -20,6 +20,8 @@
#pragma once
#include <functional>
#include <thread>
+#include <atomic>
+#include <cstdint>
#include "executor/common/transaction_manager.h"
#include "platform/common/queue/lock_free_queue.h"
@@ -34,6 +36,8 @@ namespace resdb {
// Execute the requests that may contain system information or user requests.
class TransactionExecutor {
public:
+ void set_OnExecuteSuccess(uint64_t seq);
+ uint64_t get_latest_executed_seq() const;
typedef std::function<void(std::unique_ptr<Request>,
std::unique_ptr<BatchUserResponse> resp)>
PostExecuteFunc;
@@ -75,7 +79,7 @@ class TransactionExecutor {
private:
void Execute(std::unique_ptr<Request> request, bool need_execute = true);
void OnlyExecute(std::unique_ptr<Request> request);
-
+ std::atomic<uint64_t> latest_executed_seq_{0};
std::unique_ptr<std::string> DoExecute(const Request& request);
void OrderMessage();
void ExecuteMessage();
diff --git a/platform/consensus/ordering/pbft/checkpoint_manager.cpp
b/platform/consensus/ordering/pbft/checkpoint_manager.cpp
index a5a24ca8..3ee62fe0 100644
--- a/platform/consensus/ordering/pbft/checkpoint_manager.cpp
+++ b/platform/consensus/ordering/pbft/checkpoint_manager.cpp
@@ -317,10 +317,28 @@ void CheckPointManager::UpdateCheckPointStatus() {
if (current_seq == last_ckpt_seq + water_mark) {
last_ckpt_seq = current_seq;
+ if (executor_) {
+ last_executed_seq_ = executor_->get_latest_executed_seq();
+ std::cout<<"In checkpoint"<<std::endl;
+
+ }
if (!is_recovery) {
BroadcastCheckPoint(last_ckpt_seq, last_hash_, stable_hashs,
stable_seqs);
}
+ if(is_recovery){
+ std::cout<<"last_executed_seq_: "<<last_executed_seq_<<std::endl;
+ std::string temp_dir = "/tmp";
+ std::string file_path = temp_dir + "/latest_seqnum.txt";
+ // std::ofstream
log_file("/home/ubuntu/.cache/bazel/_bazel_ubuntu/latest_seqnum.txt");
+ std::ofstream log_file(file_path, std::ios::app);
+ if (!log_file.is_open()) {
+ std::cerr << "Error: Could not open the log file." <<
std::strerror(errno) << std::endl;
+ }
+ log_file << "Lastest_seqnum: " << last_executed_seq_ << std::endl;
+ log_file.flush();
+ log_file.close();
+ }
}
}
return;
@@ -378,4 +396,8 @@ uint64_t CheckPointManager::GetCommittableSeq() {
return committable_seq_;
}
+// void CheckPointManager::SetLastExecutedSeq(uint64_t latest_executed_seq){
+// latest_executed_seq = executor_->get_latest_executed_seq();
+// }
+
} // namespace resdb
diff --git a/platform/consensus/ordering/pbft/checkpoint_manager.h
b/platform/consensus/ordering/pbft/checkpoint_manager.h
index e043978e..ddcc2fbc 100644
--- a/platform/consensus/ordering/pbft/checkpoint_manager.h
+++ b/platform/consensus/ordering/pbft/checkpoint_manager.h
@@ -49,6 +49,7 @@ class CheckPointManager : public CheckPoint {
std::unique_ptr<Request> request);
uint64_t GetStableCheckpoint() override;
+// void SetLastExecutedSeq(uint64_t latest_executed_seq);
StableCheckPoint GetStableCheckpointWithVotes();
bool IsValidCheckpointProof(const StableCheckPoint& stable_ckpt);
@@ -74,6 +75,7 @@ class CheckPointManager : public CheckPoint {
uint64_t GetCommittableSeq();
private:
+
void UpdateCheckPointStatus();
void UpdateStableCheckPointStatus();
void BroadcastCheckPoint(uint64_t seq, const std::string& hash,
@@ -84,6 +86,7 @@ class CheckPointManager : public CheckPoint {
bool Wait();
protected:
+ uint64_t last_executed_seq_ = 0;
ResDBConfig config_;
ReplicaCommunicator* replica_communicator_;
std::unique_ptr<ChainState> txn_db_;
diff --git a/platform/consensus/ordering/pbft/checkpoint_manager_test.cpp
b/platform/consensus/ordering/pbft/checkpoint_manager_test.cpp
index f302e819..eaf6c882 100644
--- a/platform/consensus/ordering/pbft/checkpoint_manager_test.cpp
+++ b/platform/consensus/ordering/pbft/checkpoint_manager_test.cpp
@@ -124,6 +124,40 @@ TEST_F(CheckPointManagerTest, SendCheckPoint) {
LOG(ERROR) << "done";
}
+// Commits five regular requests (seq 0..4) followed by one request flagged as
+// recovery (seq 5) through a TransactionExecutor attached to a
+// CheckPointManager, then blocks until the post-execute callback has fired
+// five times.
+// NOTE(review): this test contains no EXPECT/ASSERT; it only verifies that the
+// callbacks arrive. Consider asserting on the recorded sequence number.
+TEST_F(CheckPointManagerTest, latestSeqCheckpointInRecovery) {
+  auto done = std::make_shared<std::promise<void>>();
+  std::future<void> done_future = done->get_future();
+
+  std::unique_ptr<TransactionManager> trmanager =
+      std::make_unique<TransactionManager>(false, true);
+  std::unique_ptr<SystemInfo> system_info =
+      std::make_unique<SystemInfo>(config_);
+  std::unique_ptr<TransactionExecutor> exe =
+      std::make_unique<TransactionExecutor>(
+          config_,
+          [done, count = std::make_shared<std::atomic<int>>(0)](
+              std::unique_ptr<Request> /*req*/,
+              std::unique_ptr<BatchUserResponse> /*resp*/) {
+            // Signal exactly once, on the fifth callback. Using `==` instead
+            // of `>=` matters because six requests are committed below: a
+            // sixth callback must not call set_value() again, which would
+            // throw std::future_error on an already-satisfied promise.
+            if (++(*count) == 5) {
+              done->set_value();
+            }
+          },
+          system_info.get(), std::move(trmanager));
+  CheckPointManager manager(config_, &replica_communicator_, nullptr);
+  manager.SetExecutor(exe.get());
+
+  // Five regular requests, seq 0..4.
+  for (int i = 0; i <= 4; ++i) {
+    std::unique_ptr<Request> request = std::make_unique<Request>();
+    request->set_seq(i);
+    exe->Commit(std::move(request));
+  }
+  // One request marked as recovery, seq 5.
+  std::unique_ptr<Request> request = std::make_unique<Request>();
+  request->set_seq(5);
+  request->set_is_recovery(true);
+  exe->Commit(std::move(request));
+  done_future.wait();
+}
+
+
TEST_F(CheckPointManagerTest, SendCheckPointOnce) {
std::promise<bool> propose_done;
std::future<bool> propose_done_future = propose_done.get_future();
diff --git a/platform/consensus/ordering/pbft/message_manager.cpp
b/platform/consensus/ordering/pbft/message_manager.cpp
index cc5e187c..44b1c25a 100644
--- a/platform/consensus/ordering/pbft/message_manager.cpp
+++ b/platform/consensus/ordering/pbft/message_manager.cpp
@@ -40,7 +40,19 @@ MessageManager::MessageManager(
std::unique_ptr<BatchUserResponse> resp_msg) {
if (request->is_recovery()) {
if (checkpoint_manager_) {
- checkpoint_manager_->AddCommitData(std::move(request));
+ // checkpoint_manager_->AddCommitData(std::move(request));
+ // std::cout<<"In message here"<<std::endl;
+ // uint64_t latest_executed_seq =
checkpoint_manager_->latest_executed_seq_;
+ // std::string temp_dir = "/tmp";
+ // std::string file_path = temp_dir + "/latest_seqnum.txt";
+ // std::ofstream log_file(file_path, std::ios::app);
+ // // std::ofstream log_file("latest_seqnum.txt");
+ // if (!log_file.is_open()) {
+ // std::cerr << "Error: Could not open the log file." <<
std::strerror(errno) << std::endl;
+ // }
+ // log_file << "Lastest_seqnum: " << latest_executed_seq <<
std::endl;
+ // log_file.flush();
+ // log_file.close();
}
return;
}
diff --git a/platform/consensus/recovery/recovery.cpp
b/platform/consensus/recovery/recovery.cpp
index 51faad5c..5ea24420 100644
--- a/platform/consensus/recovery/recovery.cpp
+++ b/platform/consensus/recovery/recovery.cpp
@@ -24,6 +24,9 @@
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
+#include <fstream>
+#include <stdio.h>
+#include <iostream>
#include <filesystem>
@@ -251,9 +254,26 @@ void Recovery::AddRequest(const Context* context, const
Request* request) {
}
switch (request->type()) {
case Request::TYPE_PRE_PREPARE:
+ // break;
case Request::TYPE_PREPARE:
+ // break;
case Request::TYPE_COMMIT:
+ // break;
case Request::TYPE_CHECKPOINT:
+ // {
+ // uint64_t latest_executed_seq = get_latest_executed_seq_recov();
+ // std::string temp_dir = "/tmp";
+ // std::string file_path = temp_dir + "/latest_seqnum.txt";
+ // std::ofstream log_file(file_path);
+ // // std::ofstream log_file("latest_seqnum.txt");
+ // if (!log_file.is_open()) {
+ // std::cerr << "Error: Could not open the log file." <<
std::strerror(errno) << std::endl;
+ // }
+ // log_file << "Lastest_seqnum: " << latest_executed_seq << std::endl;
+ // log_file.flush();
+ // log_file.close();
+ // }
+ // break;
case Request::TYPE_NEWVIEW:
return WriteLog(context, request);
default:
@@ -261,7 +281,12 @@ void Recovery::AddRequest(const Context* context, const
Request* request) {
}
}
+// uint64_t Recovery::get_latest_executed_seq_recov(){
+// return checkpoint_->GetLastExecutedSeq();
+// }
+
void Recovery::WriteLog(const Context* context, const Request* request) {
+
std::string data;
if (request) {
request->SerializeToString(&data);
diff --git a/platform/consensus/recovery/recovery.h
b/platform/consensus/recovery/recovery.h
index 90f8fc99..f5c30cbe 100644
--- a/platform/consensus/recovery/recovery.h
+++ b/platform/consensus/recovery/recovery.h
@@ -40,7 +40,7 @@ class Recovery {
void Init();
virtual void AddRequest(const Context* context, const Request* request);
-
+ // uint64_t get_latest_executed_seq_recov();
void ReadLogs(std::function<void(const SystemInfoData& data)>
system_callback,
std::function<void(std::unique_ptr<Context> context,
std::unique_ptr<Request> request)>
diff --git a/platform/consensus/recovery/recovery_test.cpp
b/platform/consensus/recovery/recovery_test.cpp
index a7cb1ef8..f05cf53a 100644
--- a/platform/consensus/recovery/recovery_test.cpp
+++ b/platform/consensus/recovery/recovery_test.cpp
@@ -179,7 +179,7 @@ TEST_F(RecoveryTest, CheckPoint) {
{
Recovery recovery(config, &checkpoint_, &system_info_, nullptr);
-
+
for (int i = 1; i < 10; ++i) {
for (int t : types) {
std::unique_ptr<Request> request =