This is an automated email from the ASF dual-hosted git repository.
junchao pushed a commit to branch recovery_ckpt
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git
The following commit(s) were added to refs/heads/recovery_ckpt by this push:
new 45ad21d6 add checkpoint recovery
45ad21d6 is described below
commit 45ad21d6e3b8446ad54862d8586ba927900fcf80
Author: Ubuntu <[email protected]>
AuthorDate: Wed Dec 10 17:38:52 2025 +0000
add checkpoint recovery
---
chain/storage/leveldb.cpp | 50 +++++++-
chain/storage/leveldb.h | 8 ++
chain/storage/storage.h | 2 +
executor/common/transaction_manager.cpp | 2 +
executor/common/transaction_manager.h | 4 +-
executor/kv/kv_executor.cpp | 6 +-
executor/kv/kv_executor.h | 2 -
.../consensus/execution/transaction_executor.cpp | 49 +++++---
.../consensus/execution/transaction_executor.h | 5 +-
.../consensus/ordering/pbft/checkpoint_manager.cpp | 136 +++++++++++++++++----
.../consensus/ordering/pbft/checkpoint_manager.h | 17 ++-
platform/consensus/ordering/pbft/commitment.cpp | 2 +-
.../ordering/pbft/consensus_manager_pbft.cpp | 102 +++++++++++++++-
.../ordering/pbft/consensus_manager_pbft.h | 9 ++
.../ordering/pbft/lock_free_collector_pool.cpp | 12 ++
.../ordering/pbft/lock_free_collector_pool.h | 1 +
.../consensus/ordering/pbft/message_manager.cpp | 11 ++
platform/consensus/ordering/pbft/message_manager.h | 2 +
.../ordering/pbft/transaction_collector.cpp | 4 +-
platform/consensus/recovery/recovery.cpp | 114 ++++++++++++++++-
platform/consensus/recovery/recovery.h | 17 ++-
platform/proto/resdb.proto | 4 +-
scripts/deploy/config/kv_server.conf | 10 +-
scripts/deploy/config/pbft.config | 19 ---
scripts/deploy/script/deploy.sh | 5 +-
service/kv/kv_service.cpp | 14 ++-
tools/generate_region_config.py | 1 +
27 files changed, 516 insertions(+), 92 deletions(-)
diff --git a/chain/storage/leveldb.cpp b/chain/storage/leveldb.cpp
index f000d6c0..f2de42d7 100644
--- a/chain/storage/leveldb.cpp
+++ b/chain/storage/leveldb.cpp
@@ -64,7 +64,9 @@ ResLevelDB::ResLevelDB(std::optional<LevelDBInfo> config) {
LOG(ERROR) << "initialized block cache" << std::endl;
}
global_stats_ = Stats::GetGlobalStats();
+ last_ckpt_ = 0;
CreateDB(path);
+ last_ckpt_ = GetLastCheckpointInternal();
}
void ResLevelDB::CreateDB(const std::string& path) {
@@ -95,6 +97,7 @@ ResLevelDB::~ResLevelDB() {
int ResLevelDB::SetValueWithSeq(const std::string& key,
const std::string& value, uint64_t seq) {
+ LOG(ERROR) << "set value seq:" << seq;
std::string value_str = GetValue(key);
ValueHistory history;
if (!history.ParseFromString(value_str)) {
@@ -109,6 +112,9 @@ int ResLevelDB::SetValueWithSeq(const std::string& key,
if (last_seq > seq) {
LOG(ERROR) << "seq is small, last:" << last_seq << " new seq:" << seq;
+
+ UpdateLastCkpt(last_seq);
+
return -2;
}
@@ -120,9 +126,14 @@ int ResLevelDB::SetValueWithSeq(const std::string& key,
history.mutable_value()->erase(history.mutable_value()->begin());
}
- LOG(ERROR)<<" set value, string:"<<key<<" seq:"<<seq;
+ LOG(ERROR)<<" set value, string:"<<key<<" seq:"<<seq<<" last seq:"<<last_seq;
history.SerializeToString(&value_str);
- return SetValue(key, value_str);
+ int ret = SetValue(key, value_str);
+ if(ret) {
+ return ret;
+ }
+ UpdateLastCkpt(seq);
+ return 0;
}
std::pair<std::string, uint64_t> ResLevelDB::GetValueWithSeq(
@@ -396,5 +407,40 @@ std::vector<std::pair<std::string, int>>
ResLevelDB::GetTopHistory(
return resp;
}
+// Key under which ResLevelDB persists its last checkpoint seq.
+// NOTE(review): this shares the keyspace with user keys written through
+// SetValue/SetValueWithSeq, so a client Set() on "leveldb_checkpoint" would
+// clobber it; a reserved prefix would avoid that (changing the key breaks
+// existing databases).
+const std::string ckpt_key = "leveldb_checkpoint";
+
+// The in-memory checkpoint is written to disk once every this many advances.
+constexpr int kCkptPersistInterval = 100;
+
+// Advances the in-memory checkpoint to `seq` (it never moves backwards) and
+// persists it via SetLastCheckpoint every kCkptPersistInterval advances.
+// A failed persist is retried on the next advance instead of waiting for a
+// whole new interval.
+void ResLevelDB::UpdateLastCkpt(uint64_t seq) {
+  LOG(ERROR) << " update ckpt seq:" << seq << " last:" << last_ckpt_
+             << " update time:" << update_time_;
+  if (last_ckpt_ > seq) {
+    return;
+  }
+  last_ckpt_ = seq;
+  update_time_++;
+  if (update_time_ >= kCkptPersistInterval && last_ckpt_ > 0) {
+    if (SetLastCheckpoint(last_ckpt_) == 0) {
+      update_time_ = 0;
+    } else {
+      LOG(ERROR) << " persist ckpt fail:" << last_ckpt_;
+    }
+  }
+}
+
+// Writes `ckpt` under ckpt_key as a decimal string and returns SetValue's
+// result (0 on success).
+int ResLevelDB::SetLastCheckpoint(uint64_t ckpt) {
+  LOG(ERROR) << " update last ckpt :" << ckpt;
+  return SetValue(ckpt_key, std::to_string(ckpt));
+}
+
+// Reads the persisted checkpoint. Returns 0 when none is stored or when the
+// stored value is not a plain unsigned decimal number. (std::stoll would
+// throw out of the ResLevelDB constructor on a corrupted value, and it parses
+// into a signed type.)
+uint64_t ResLevelDB::GetLastCheckpointInternal() {
+  const std::string value = GetValue(ckpt_key);
+  if (value.empty()) {
+    return 0;
+  }
+  if (value.find_first_not_of("0123456789") == std::string::npos) {
+    try {
+      return std::stoull(value);
+    } catch (...) {
+      // Only out-of-range is possible here; fall through and report it.
+    }
+  }
+  LOG(ERROR) << " invalid last ckpt value:" << value;
+  return 0;
+}
+
+// Returns the in-memory checkpoint once one has been recorded, otherwise the
+// value persisted on disk. The in-memory value may be ahead of the persisted
+// one because persistence is batched (see UpdateLastCkpt).
+uint64_t ResLevelDB::GetLastCheckpoint() {
+  if (last_ckpt_ > 0) {
+    return last_ckpt_;
+  }
+  return GetLastCheckpointInternal();
+}
+
} // namespace storage
} // namespace resdb
diff --git a/chain/storage/leveldb.h b/chain/storage/leveldb.h
index 3cf97c50..064a9a7d 100644
--- a/chain/storage/leveldb.h
+++ b/chain/storage/leveldb.h
@@ -73,8 +73,14 @@ class ResLevelDB : public Storage {
bool Flush() override;
+ virtual uint64_t GetLastCheckpoint() override;
+
+ virtual int SetLastCheckpoint(uint64_t ckpt);
+
private:
void CreateDB(const std::string& path);
+ uint64_t GetLastCheckpointInternal();
+ void UpdateLastCkpt(uint64_t seq);
private:
std::unique_ptr<leveldb::DB> db_ = nullptr;
@@ -85,6 +91,8 @@ class ResLevelDB : public Storage {
protected:
Stats* global_stats_ = nullptr;
std::unique_ptr<LRUCache<std::string, std::string>> block_cache_;
+ uint64_t last_ckpt_;
+ int update_time_ = 0;
};
} // namespace storage
diff --git a/chain/storage/storage.h b/chain/storage/storage.h
index 445f0c1f..a4a7309e 100644
--- a/chain/storage/storage.h
+++ b/chain/storage/storage.h
@@ -58,6 +58,8 @@ class Storage {
const std::string& key, int number) = 0;
virtual bool Flush() { return true; };
+
+ virtual uint64_t GetLastCheckpoint() { return 0; }
void SetMaxHistoryNum(int num) { max_history_ = num; }
protected:
diff --git a/executor/common/transaction_manager.cpp
b/executor/common/transaction_manager.cpp
index 372a00f3..6cd96037 100644
--- a/executor/common/transaction_manager.cpp
+++ b/executor/common/transaction_manager.cpp
@@ -64,6 +64,7 @@ std::unique_ptr<std::string>
TransactionManager::ExecuteRequest(
std::vector<std::unique_ptr<std::string>>
TransactionManager::ExecuteBatchDataWithSeq(
uint64_t seq, const
std::vector<std::unique_ptr<google::protobuf::Message>>& requests) {
+ seq_ = seq;
return ExecuteBatchData(requests);
}
@@ -85,6 +86,7 @@ std::vector<std::unique_ptr<std::string>>
TransactionManager::ExecuteBatchData(
std::unique_ptr<BatchUserResponse> TransactionManager::ExecuteBatchWithSeq(
uint64_t seq, const BatchUserRequest& request) {
+ LOG(ERROR)<<" execute batch seq:"<<seq_;
seq_ = seq;
return ExecuteBatch(request);
}
diff --git a/executor/common/transaction_manager.h
b/executor/common/transaction_manager.h
index c0436faf..48c1fc2c 100644
--- a/executor/common/transaction_manager.h
+++ b/executor/common/transaction_manager.h
@@ -55,7 +55,7 @@ class TransactionManager {
bool NeedResponse();
- virtual Storage* GetStorage() { return nullptr; };
+ virtual Storage* GetStorage() { return storage_ ? storage_.get(): nullptr; }
protected:
virtual std::unique_ptr<google::protobuf::Message> ParseData(
@@ -63,6 +63,8 @@ class TransactionManager {
virtual std::unique_ptr<std::string> ExecuteRequest(
const google::protobuf::Message& request);
uint64_t seq_ = 0;
+
+ std::unique_ptr<Storage> storage_;
private:
bool is_out_of_order_ = false;
bool need_response_ = true;
diff --git a/executor/kv/kv_executor.cpp b/executor/kv/kv_executor.cpp
index d1e7bea8..d31a65fa 100644
--- a/executor/kv/kv_executor.cpp
+++ b/executor/kv/kv_executor.cpp
@@ -24,8 +24,9 @@
namespace resdb {
-KVExecutor::KVExecutor(std::unique_ptr<Storage> storage)
- : storage_(std::move(storage)) {
+KVExecutor::KVExecutor(std::unique_ptr<Storage> storage){
+ storage_=std::move(storage);
+ LOG(ERROR)<<" init storage:"<<storage_.get();
contract_manager_ =
std::make_unique<resdb::contract::ContractTransactionManager>(storage_.get());
}
@@ -135,6 +136,7 @@ std::unique_ptr<std::string> KVExecutor::ExecuteData(
}
void KVExecutor::Set(const std::string& key, const std::string& value) {
+ LOG(ERROR)<<" set key:"<<key<<" seq:"<<seq_;
storage_->SetValueWithSeq(key, value, seq_);
}
diff --git a/executor/kv/kv_executor.h b/executor/kv/kv_executor.h
index fef12597..81e3cc08 100644
--- a/executor/kv/kv_executor.h
+++ b/executor/kv/kv_executor.h
@@ -57,8 +57,6 @@ class KVExecutor : public TransactionManager {
void GetTopHistory(const std::string& key, int top_number, Items* items);
private:
- std::unique_ptr<Storage> storage_;
-
std::unique_ptr<TransactionManager> contract_manager_;
};
diff --git a/platform/consensus/execution/transaction_executor.cpp
b/platform/consensus/execution/transaction_executor.cpp
index c601cc18..4a04c978 100644
--- a/platform/consensus/execution/transaction_executor.cpp
+++ b/platform/consensus/execution/transaction_executor.cpp
@@ -72,7 +72,7 @@ void TransactionExecutor::RegisterExecute(int64_t seq) {
void TransactionExecutor::WaitForExecute(int64_t seq) {
if (execute_thread_num_ == 1) return;
int pre_idx = (seq - 1 + blucket_num_) % blucket_num_;
-
+ //LOG(ERROR)<<"wait for:"<<seq<<" pre idx:"<<pre_idx<<"
thread:"<<execute_thread_num_;
while (!IsStop()) {
std::unique_lock<std::mutex> lk(mutex_);
cv_.wait_for(lk, std::chrono::milliseconds(10000), [&] {
@@ -82,14 +82,14 @@ void TransactionExecutor::WaitForExecute(int64_t seq) {
break;
}
}
- // LOG(ERROR)<<"wait for :"<<seq<<" done";
+ //LOG(ERROR)<<"wait for :"<<seq<<" done";
}
void TransactionExecutor::FinishExecute(int64_t seq) {
if (execute_thread_num_ == 1) return;
int idx = seq % blucket_num_;
std::unique_lock<std::mutex> lk(mutex_);
- // LOG(ERROR)<<"finish :"<<seq<<" done";
+ //LOG(ERROR)<<"finish :"<<seq<<" done"<<" idx:"<<idx;
blucket_[idx] = 3;
cv_.notify_all();
}
@@ -115,6 +115,7 @@ void TransactionExecutor::Stop() {
}
Storage* TransactionExecutor::GetStorage() {
+ assert(transaction_manager_ );
return transaction_manager_ ? transaction_manager_->GetStorage() : nullptr;
}
@@ -132,6 +133,11 @@ uint64_t TransactionExecutor::GetMaxPendingExecutedSeq() {
return next_execute_seq_ - 1;
}
+// Makes `seq` the next sequence the executor will run; OrderMessage then
+// treats any request with a lower seq as already executed.
+void TransactionExecutor::SetPendingExecutedSeq(int seq) {
+  next_execute_seq_ = seq;
+}
+
bool TransactionExecutor::NeedResponse() {
return transaction_manager_ == nullptr ||
transaction_manager_->NeedResponse();
@@ -174,8 +180,8 @@ void TransactionExecutor::OrderMessage() {
global_stats_->IncExecute();
uint64_t seq = message->seq();
if (next_execute_seq_ > seq) {
- // LOG(INFO) << "request seq:" << seq << " has been executed"
- // << " next seq:" << next_execute_seq_;
+ LOG(INFO) << "request seq:" << seq << " has been executed"
+ << " next seq:" << next_execute_seq_;
continue;
}
@@ -205,13 +211,18 @@ void
TransactionExecutor::AddExecuteMessage(std::unique_ptr<Request> message) {
void TransactionExecutor::ExecuteMessage() {
while (!IsStop()) {
- auto message = execute_queue_.Pop();
- if (message == nullptr) {
- continue;
- }
+ std::unique_ptr<Request> message = nullptr;
bool need_execute = true;
- if (transaction_manager_ && transaction_manager_->IsOutOfOrder()) {
- need_execute = false;
+ {
+ std::unique_lock<std::mutex> lk(e_mutex_);
+ message = execute_queue_.Pop();
+ if (message == nullptr) {
+ continue;
+ }
+ if (transaction_manager_ && transaction_manager_->IsOutOfOrder()) {
+ need_execute = false;
+ }
+ RegisterExecute(message->seq());
}
Execute(std::move(message), need_execute);
}
@@ -255,7 +266,6 @@ void
TransactionExecutor::OnlyExecute(std::unique_ptr<Request> request) {
void TransactionExecutor::Execute(std::unique_ptr<Request> request,
bool need_execute) {
- RegisterExecute(request->seq());
std::unique_ptr<BatchUserRequest> batch_request = nullptr;
std::unique_ptr<std::vector<std::unique_ptr<google::protobuf::Message>>>
data;
std::vector<std::unique_ptr<google::protobuf::Message>> * data_p = nullptr;
@@ -283,9 +293,9 @@ void TransactionExecutor::Execute(std::unique_ptr<Request>
request,
}
assert(batch_request_p);
- // LOG(INFO) << " get request batch size:"
- // << batch_request.user_requests_size()<<" proxy id:"
- // <<request->proxy_id()<<" need execute:"<<need_execute;
+ //LOG(ERROR) << " get request batch size:"
+ // << batch_request_p->user_requests_size()<<" proxy id:"
+ // <<request->proxy_id()<<" need execute:"<<need_execute<<"
seq:"<<request->seq();
std::unique_ptr<BatchUserResponse> response;
global_stats_->GetTransactionDetails(*batch_request_p);
@@ -307,6 +317,13 @@ void TransactionExecutor::Execute(std::unique_ptr<Request>
request,
}
WaitForExecute(request->seq());
+ if(last_seq_==0){
+ last_seq_ = request->seq();
+ }
+ else {
+ assert(last_seq_+1 == request->seq());
+ last_seq_++;
+ }
if(data_p->empty() || (*data_p)[0] == nullptr){
response =
transaction_manager_->ExecuteBatchWithSeq(request->seq(), *batch_request_p);
}
@@ -316,7 +333,6 @@ void TransactionExecutor::Execute(std::unique_ptr<Request>
request,
FinishExecute(request->seq());
if (response != nullptr || !response_v.empty()){
- std::cout<<2<<"testing"<<request->seq()<<std::endl;
set_OnExecuteSuccess(request->seq());
}
if(response == nullptr){
@@ -342,7 +358,6 @@ void TransactionExecutor::Execute(std::unique_ptr<Request>
request,
response->set_local_id(batch_request_p->local_id());
response->set_seq(request->seq());
-
if (post_exec_func_) {
post_exec_func_(std::move(request), std::move(response));
}
diff --git a/platform/consensus/execution/transaction_executor.h
b/platform/consensus/execution/transaction_executor.h
index 93f134a1..1cc15373 100644
--- a/platform/consensus/execution/transaction_executor.h
+++ b/platform/consensus/execution/transaction_executor.h
@@ -57,6 +57,7 @@ class TransactionExecutor {
// The max seq S that can be executed (have received all the seq before S).
uint64_t GetMaxPendingExecutedSeq();
+ void SetPendingExecutedSeq(int seq);
// When a transaction is ready to be executed (have received all the seq
// before Txn) PreExecute func will be called.
@@ -75,7 +76,6 @@ class TransactionExecutor {
void FinishExecute(int64_t seq);
void Prepare(std::unique_ptr<Request> request);
-
private:
void Execute(std::unique_ptr<Request> request, bool need_execute = true);
void OnlyExecute(std::unique_ptr<Request> request);
@@ -121,7 +121,8 @@ class TransactionExecutor {
static const int blucket_num_ = 1024;
int blucket_[blucket_num_];
std::condition_variable cv_;
- std::mutex mutex_;
+ std::mutex mutex_, e_mutex_;
+ int32_t last_seq_ = 0;
enum PrepareType {
Start_Prepare = 1,
diff --git a/platform/consensus/ordering/pbft/checkpoint_manager.cpp
b/platform/consensus/ordering/pbft/checkpoint_manager.cpp
index 3ee62fe0..e96e9c8c 100644
--- a/platform/consensus/ordering/pbft/checkpoint_manager.cpp
+++ b/platform/consensus/ordering/pbft/checkpoint_manager.cpp
@@ -45,6 +45,8 @@ CheckPointManager::CheckPointManager(const ResDBConfig&
config,
std::thread(&CheckPointManager::UpdateStableCheckPointStatus, this);
checkpoint_thread_ =
std::thread(&CheckPointManager::UpdateCheckPointStatus, this);
+ status_thread_ =
+ std::thread(&CheckPointManager::SyncStatus, this);
}
sem_init(&committable_seq_signal_, 0, 0);
}
@@ -59,6 +61,9 @@ void CheckPointManager::Stop() {
if (stable_checkpoint_thread_.joinable()) {
stable_checkpoint_thread_.join();
}
+ if (status_thread_.joinable()) {
+ status_thread_.join();
+ }
}
std::string GetHash(const std::string& h1, const std::string& h2) {
@@ -287,24 +292,114 @@ void CheckPointManager::TimeoutHandler() {
}
}
+// Records `seq` as the last committed sequence, e.g. after recovery moves the
+// replica forward. last_seq_ is also advanced on checkpoint_thread_ by
+// UpdateCheckPointStatus, so the write is taken under lt_mutex_.
+// NOTE(review): assumes lt_mutex_ is the lock UpdateCheckPointStatus holds
+// around last_seq_++ (that line is outside this hunk) - confirm.
+void CheckPointManager::SetLastCommit(uint64_t seq) {
+  std::lock_guard<std::mutex> lk(lt_mutex_);
+  last_seq_ = seq;
+}
+
+// Raises the highest consensus seq observed so far; it never decreases.
+void CheckPointManager::SetMaxSeq(uint64_t seq) {
+  std::lock_guard<std::mutex> lk(seq_mutex_);
+  max_seq_ = std::max(max_seq_, seq);
+}
+
+// Returns the highest consensus seq recorded by SetMaxSeq.
+uint64_t CheckPointManager::GetMaxSeq() {
+  std::lock_guard<std::mutex> lk(seq_mutex_);
+  return max_seq_;
+}
+
+// Records the last committed seq reported by another replica
+// (TYPE_STATUS_SYNC). status_ is read by CheckStatus on status_thread_, so
+// both sides access it under seq_mutex_.
+// Returns -2 if the payload cannot be parsed, otherwise 0.
+int CheckPointManager::ProcessStatusSync(std::unique_ptr<Context> context,
+                                         std::unique_ptr<Request> request) {
+  CheckPointData checkpoint_data;
+  if (!checkpoint_data.ParseFromString(request->data())) {
+    LOG(ERROR) << "parse checkpont data fail:";
+    return -2;
+  }
+  uint64_t seq = checkpoint_data.seq();
+  uint32_t sender_id = request->sender_id();
+  {
+    std::lock_guard<std::mutex> lk(seq_mutex_);
+    status_[sender_id] = seq;
+  }
+  LOG(ERROR) << " received from :" << sender_id << " commit status:" << seq;
+  return 0;
+}
+
+// Compares our last committed seq with the seqs other replicas reported.
+// Finds the highest seq that at least f+1 reporters have reached, so at least
+// one non-faulty replica vouches for it. If we are behind it, requests the
+// missing range in chunks of at most 100 seqs.
+void CheckPointManager::CheckStatus(uint64_t last_seq) {
+  std::vector<uint64_t> seqs;
+  {
+    std::lock_guard<std::mutex> lk(seq_mutex_);
+    seqs.reserve(status_.size());
+    for (const auto& entry : status_) {
+      seqs.push_back(entry.second);
+    }
+  }
+
+  const size_t quorum =
+      static_cast<size_t>(config_.GetMaxMaliciousReplicaNum()) + 1;
+  if (seqs.size() < quorum) {
+    return;
+  }
+  // Ascending order: the element with exactly f entries above it is reached
+  // by f+1 reporters.
+  std::sort(seqs.begin(), seqs.end());
+  uint64_t min_seq = seqs[seqs.size() - quorum];
+
+  LOG(ERROR) << " check last seq:" << last_seq << " max seq:" << min_seq;
+  if (last_seq < min_seq) {
+    // need recovery from others
+    BroadcastRecovery(last_seq + 1, std::min(min_seq, last_seq + 100));
+  }
+}
+
+// Loop run on status_thread_. About every 10s it broadcasts our last
+// committed seq (TYPE_STATUS_SYNC). When that seq has not advanced for more
+// than 300 rounds, it runs CheckStatus to request missing data.
+// NOTE(review): 300 rounds x 10s is about 50 minutes of no progress - confirm
+// that threshold is intended.
+void CheckPointManager::SyncStatus() {
+  uint64_t last_check_seq = 0;
+  uint64_t last_time = 0;
+  while (!stop_) {
+    uint64_t last_seq = 0;
+    {
+      // last_seq_ is advanced concurrently on checkpoint_thread_.
+      std::lock_guard<std::mutex> lk(lt_mutex_);
+      last_seq = last_seq_;
+    }
+
+    CheckPointData checkpoint_data;
+    std::unique_ptr<Request> checkpoint_request = NewRequest(
+        Request::TYPE_STATUS_SYNC, Request(), config_.GetSelfInfo().id());
+    checkpoint_data.set_seq(last_seq);
+    checkpoint_data.SerializeToString(checkpoint_request->mutable_data());
+    replica_communicator_->BroadCast(*checkpoint_request);
+
+    LOG(ERROR) << " sync status last seq:" << last_seq
+               << " last time:" << last_time;
+    if (last_check_seq == last_seq && last_time > 300) {
+      CheckStatus(last_seq);
+      last_time = 0;
+    }
+    if (last_seq != last_check_seq) {
+      last_check_seq = last_seq;
+      last_time = 0;
+    }
+    // Sleep ~10s in 1s steps so Stop() does not block up to 10s on join.
+    for (int i = 0; i < 10 && !stop_; ++i) {
+      sleep(1);
+    }
+    last_time++;
+  }
+}
+
+
void CheckPointManager::UpdateCheckPointStatus() {
uint64_t last_ckpt_seq = 0;
int water_mark = config_.GetCheckPointWaterMark();
int timeout_ms = config_.GetViewchangeCommitTimeout();
std::vector<std::string> stable_hashs;
std::vector<uint64_t> stable_seqs;
+ std::map<uint64_t, std::unique_ptr<Request>> pendings;
while (!stop_) {
- auto request = data_queue_.Pop(timeout_ms);
+ std::unique_ptr<Request> request = nullptr;
+ if(!pendings.empty()){
+ if(pendings.begin()->second->seq() == last_seq_+1){
+ request = std::move(pendings.begin()->second);
+ pendings.erase(pendings.begin());
+ }
+ }
+ if(request == nullptr){
+ request = data_queue_.Pop(timeout_ms);
+ }
if (request == nullptr) {
- // if (last_seq > 0) {
- // TimeoutHandler();
- // }
continue;
}
std::string hash_ = request->hash();
uint64_t current_seq = request->seq();
+ //LOG(ERROR) << "update checkpoint seq :" << last_seq_ << " current:" <<
current_seq;
if (current_seq != last_seq_ + 1) {
LOG(ERROR) << "seq invalid:" << last_seq_ << " current:" << current_seq;
+ if(current_seq > last_seq_ +1) {
+ pendings[current_seq] = std::move(request);
+ }
continue;
}
{
@@ -313,32 +408,14 @@ void CheckPointManager::UpdateCheckPointStatus() {
last_seq_++;
}
bool is_recovery = request->is_recovery();
- txn_db_->Put(std::move(request));
+ //txn_db_->Put(std::move(request));
if (current_seq == last_ckpt_seq + water_mark) {
last_ckpt_seq = current_seq;
- if (executor_) {
- last_executed_seq_ = executor_->get_latest_executed_seq();
- std::cout<<"In checkpoint"<<std::endl;
-
- }
if (!is_recovery) {
BroadcastCheckPoint(last_ckpt_seq, last_hash_, stable_hashs,
stable_seqs);
}
- if(is_recovery){
- std::cout<<"last_executed_seq_: "<<last_executed_seq_<<std::endl;
- std::string temp_dir = "/tmp";
- std::string file_path = temp_dir + "/latest_seqnum.txt";
- // std::ofstream
log_file("/home/ubuntu/.cache/bazel/_bazel_ubuntu/latest_seqnum.txt");
- std::ofstream log_file(file_path, std::ios::app);
- if (!log_file.is_open()) {
- std::cerr << "Error: Could not open the log file." <<
std::strerror(errno) << std::endl;
- }
- log_file << "Lastest_seqnum: " << last_executed_seq_ << std::endl;
- log_file.flush();
- log_file.close();
- }
}
}
return;
@@ -366,6 +443,19 @@ void CheckPointManager::BroadcastCheckPoint(
replica_communicator_->BroadCast(*checkpoint_request);
}
+// Asks every replica (TYPE_RECOVERY_DATA) for the requests whose seq lies in
+// [min_seq, max_seq]. Replies arrive as TYPE_RECOVERY_DATA_RESP.
+void CheckPointManager::BroadcastRecovery(uint64_t min_seq, uint64_t max_seq) {
+  RecoveryRequest range;
+  range.set_min_seq(min_seq);
+  range.set_max_seq(max_seq);
+
+  std::unique_ptr<Request> recovery_request = NewRequest(
+      Request::TYPE_RECOVERY_DATA, Request(), config_.GetSelfInfo().id());
+  range.SerializeToString(recovery_request->mutable_data());
+
+  LOG(ERROR) << " recovery request [" << min_seq << "," << max_seq << "]";
+  replica_communicator_->BroadCast(*recovery_request);
+}
+
void CheckPointManager::WaitSignal() {
std::unique_lock<std::mutex> lk(mutex_);
signal_.wait(lk, [&] { return !stable_hash_queue_.Empty(); });
diff --git a/platform/consensus/ordering/pbft/checkpoint_manager.h
b/platform/consensus/ordering/pbft/checkpoint_manager.h
index ddcc2fbc..e41aecae 100644
--- a/platform/consensus/ordering/pbft/checkpoint_manager.h
+++ b/platform/consensus/ordering/pbft/checkpoint_manager.h
@@ -43,10 +43,15 @@ class CheckPointManager : public CheckPoint {
ChainState* GetTxnDB();
uint64_t GetMaxTxnSeq();
+ void SetLastCommit(uint64_t seq);
+ void SetMaxSeq(uint64_t seq);
+ uint64_t GetMaxSeq();
void AddCommitData(std::unique_ptr<Request> request);
int ProcessCheckPoint(std::unique_ptr<Context> context,
std::unique_ptr<Request> request);
+ int ProcessStatusSync(std::unique_ptr<Context> context,
+ std::unique_ptr<Request> request);
uint64_t GetStableCheckpoint() override;
// void SetLastExecutedSeq(uint64_t latest_executed_seq);
@@ -84,13 +89,19 @@ class CheckPointManager : public CheckPoint {
void Notify();
bool Wait();
+ void BroadcastRecovery(uint64_t min_seq, uint64_t max_seq);
+
+ void SyncStatus();
+ void StatusProcess();
+ void CheckStatus(uint64_t last_seq);
+
protected:
uint64_t last_executed_seq_ = 0;
ResDBConfig config_;
ReplicaCommunicator* replica_communicator_;
std::unique_ptr<ChainState> txn_db_;
- std::thread checkpoint_thread_, stable_checkpoint_thread_;
+ std::thread checkpoint_thread_, stable_checkpoint_thread_, status_thread_;
SignatureVerifier* verifier_;
std::atomic<bool> stop_;
std::map<std::pair<uint64_t, std::string>, std::set<uint32_t>> sender_ckpt_;
@@ -109,13 +120,15 @@ class CheckPointManager : public CheckPoint {
LockFreeQueue<std::pair<uint64_t, std::string>> stable_hash_queue_;
std::condition_variable signal_;
ResDBTxnAccessor txn_accessor_;
- std::mutex lt_mutex_;
+ std::mutex lt_mutex_, seq_mutex_;
uint64_t last_seq_ = 0;
+ uint64_t max_seq_ = 0;
TransactionExecutor* executor_;
std::atomic<uint64_t> highest_prepared_seq_;
uint64_t committable_seq_ = 0;
std::string last_hash_, committable_hash_;
sem_t committable_seq_signal_;
+ std::map<int, uint64_t>status_;
};
} // namespace resdb
diff --git a/platform/consensus/ordering/pbft/commitment.cpp
b/platform/consensus/ordering/pbft/commitment.cpp
index 4d048331..1f7793a6 100644
--- a/platform/consensus/ordering/pbft/commitment.cpp
+++ b/platform/consensus/ordering/pbft/commitment.cpp
@@ -171,7 +171,7 @@ int Commitment::ProcessProposeMsg(std::unique_ptr<Context>
context,
LOG(ERROR) << " recovery request not valid:"
<< " current seq:" << message_manager_->GetNextSeq()
<< " data seq:" << request->seq();
- return 0;
+ //return 0;
}
return message_manager_->AddConsensusMsg(context->signature,
std::move(request));
diff --git a/platform/consensus/ordering/pbft/consensus_manager_pbft.cpp
b/platform/consensus/ordering/pbft/consensus_manager_pbft.cpp
index 72103eab..8288b07b 100644
--- a/platform/consensus/ordering/pbft/consensus_manager_pbft.cpp
+++ b/platform/consensus/ordering/pbft/consensus_manager_pbft.cpp
@@ -69,15 +69,30 @@ ConsensusManagerPBFT::ConsensusManagerPBFT(
system_info_->SetPrimary(data.primary_id());
},
[&](std::unique_ptr<Context> context, std::unique_ptr<Request> request) {
+
+ if(config_.GetSelfInfo().id() == 3 && request->seq() == 188000) {
+ LOG(ERROR)<<" skip seq:"<<request->seq();
+ return 0;
+ }
+
+
+
return InternalConsensusCommit(std::move(context), std::move(request));
+ },
+ [&](int seq) {
+ message_manager_->SetNextCommitSeq(seq+1);
});
+
}
void ConsensusManagerPBFT::SetNeedCommitQC(bool need_qc) {
commitment_->SetNeedCommitQC(need_qc);
}
-void ConsensusManagerPBFT::Start() { ConsensusManager::Start(); }
+// Starts the base consensus manager, then the thread that applies recovery
+// data received from other replicas (RemoteRecoveryProcess).
+// NOTE(review): recovery_thread_ must be joined before destruction; no join
+// is visible in this change - confirm.
+void ConsensusManagerPBFT::Start() {
+  ConsensusManager::Start();
+  recovery_thread_ =
+      std::thread(&ConsensusManagerPBFT::RemoteRecoveryProcess, this);
+}
std::vector<ReplicaInfo> ConsensusManagerPBFT::GetReplicas() {
return message_manager_->GetReplicas();
@@ -185,8 +200,8 @@ int
ConsensusManagerPBFT::ConsensusCommit(std::unique_ptr<Context> context,
int ConsensusManagerPBFT::InternalConsensusCommit(
std::unique_ptr<Context> context, std::unique_ptr<Request> request) {
- // LOG(INFO) << "recv impl type:" << request->type() << " "
- // << "sender id:" << request->sender_id()<<" seq:"<<request->seq();
+ //LOG(ERROR) << "recv impl type:" << request->type() << " "
+ // << "sender id:" << request->sender_id()<<" seq:"<<request->seq();
switch (request->type()) {
case Request::TYPE_CLIENT_REQUEST:
@@ -236,6 +251,9 @@ int ConsensusManagerPBFT::InternalConsensusCommit(
case Request::TYPE_CHECKPOINT:
return checkpoint_manager_->ProcessCheckPoint(std::move(context),
std::move(request));
+ case Request::TYPE_STATUS_SYNC:
+ return checkpoint_manager_->ProcessStatusSync(std::move(context),
+ std::move(request));
case Request::TYPE_VIEWCHANGE:
return view_change_manager_->ProcessViewChange(std::move(context),
std::move(request));
@@ -249,6 +267,10 @@ int ConsensusManagerPBFT::InternalConsensusCommit(
std::move(request));
case Request::TYPE_CUSTOM_QUERY:
return query_->ProcessCustomQuery(std::move(context),
std::move(request));
+ case Request::TYPE_RECOVERY_DATA:
+ return ProcessRecoveryData(std::move(context), std::move(request));
+ case Request::TYPE_RECOVERY_DATA_RESP:
+ return ProcessRecoveryDataResponse(std::move(context),
std::move(request));
}
return 0;
}
@@ -263,4 +285,78 @@ void ConsensusManagerPBFT::SetPreVerifyFunc(
commitment_->SetPreVerifyFunc(func);
}
+// Serves a TYPE_RECOVERY_DATA request from another replica. It fetches the
+// requested [min_seq, max_seq] range through recovery_->GetData and sends
+// the result to the sender as TYPE_RECOVERY_DATA_RESP.
+// Returns -2 for a malformed request, GetData's non-zero code if it fails,
+// otherwise 0.
+int ConsensusManagerPBFT::ProcessRecoveryData(std::unique_ptr<Context> context,
+                                              std::unique_ptr<Request> request) {
+  RecoveryRequest recovery_data;
+  if (!recovery_data.ParseFromString(request->data())) {
+    LOG(ERROR) << "parse recovery request fail:";
+    return -2;
+  }
+  LOG(ERROR) << " obtain min seq:" << recovery_data.min_seq()
+             << " max seq:" << recovery_data.max_seq();
+  // Reject inverted ranges before touching the recovery log.
+  if (recovery_data.min_seq() > recovery_data.max_seq()) {
+    LOG(ERROR) << " invalid recovery range from:" << request->sender_id();
+    return -2;
+  }
+
+  RecoveryResponse response;
+  int ret = recovery_->GetData(recovery_data, response);
+  if (ret) {
+    return ret;
+  }
+
+  std::unique_ptr<Request> response_data = NewRequest(
+      Request::TYPE_RECOVERY_DATA_RESP, Request(), config_.GetSelfInfo().id());
+  response.SerializeToString(response_data->mutable_data());
+
+  LOG(ERROR) << " obtain min seq:" << recovery_data.min_seq()
+             << " max seq:" << recovery_data.max_seq()
+             << " data size:" << response.request_size();
+
+  GetBroadCastClient()->SendMessage(*response_data, request->sender_id());
+  return 0;
+}
+
+// Hands a TYPE_RECOVERY_DATA_RESP over to recovery_thread_ by queueing it for
+// RemoteRecoveryProcess. The context is not used. Always returns 0.
+int ConsensusManagerPBFT::ProcessRecoveryDataResponse(
+    std::unique_ptr<Context> context, std::unique_ptr<Request> request) {
+  recovery_queue_.Push(std::move(request));
+  return 0;
+}
+
+// Loop run on recovery_thread_. It replays the requests carried by each
+// TYPE_RECOVERY_DATA_RESP queued by ProcessRecoveryDataResponse. A seq is
+// replayed only from the first response that contains it.
+// NOTE(review): replayed messages go straight to InternalConsensusCommit;
+// confirm the per-type handlers verify context->signature, since responses
+// come from other (possibly faulty) replicas.
+void ConsensusManagerPBFT::RemoteRecoveryProcess() {
+  // Seqs already replayed from an earlier response.
+  // NOTE(review): this set grows without bound; consider pruning below the
+  // stable checkpoint.
+  std::set<uint64_t> recovered_seqs;
+
+  while (IsRunning()) {
+    auto response = recovery_queue_.Pop();
+    if (response == nullptr) {
+      continue;
+    }
+
+    RecoveryResponse recovery_data;
+    if (!recovery_data.ParseFromString(response->data())) {
+      LOG(ERROR) << "parse recovery response fail:";
+      continue;
+    }
+
+    LOG(ERROR) << " receive recovery data from " << response->sender_id()
+               << " data size:" << recovery_data.request_size()
+               << " signature:" << recovery_data.signature_size();
+
+    // request(i) is paired with signature(i); reject a response whose
+    // signature list is shorter instead of indexing past its end.
+    if (recovery_data.signature_size() < recovery_data.request_size()) {
+      LOG(ERROR) << " recovery data from " << response->sender_id()
+                 << " has fewer signatures than requests";
+      continue;
+    }
+
+    for (int i = 0; i < recovery_data.request_size(); ++i) {
+      if (recovered_seqs.count(recovery_data.request(i).seq()) > 0) {
+        continue;
+      }
+      auto context = std::make_unique<Context>();
+      context->signature = recovery_data.signature(i);
+      InternalConsensusCommit(
+          std::move(context),
+          std::make_unique<Request>(recovery_data.request(i)));
+    }
+
+    // Mark seqs only after the whole response is replayed. A response can
+    // hold several messages (types) for one seq, and all of them must go
+    // through.
+    for (int i = 0; i < recovery_data.request_size(); ++i) {
+      recovered_seqs.insert(recovery_data.request(i).seq());
+    }
+  }
+}
+
+
} // namespace resdb
diff --git a/platform/consensus/ordering/pbft/consensus_manager_pbft.h
b/platform/consensus/ordering/pbft/consensus_manager_pbft.h
index bea50990..55048a3d 100644
--- a/platform/consensus/ordering/pbft/consensus_manager_pbft.h
+++ b/platform/consensus/ordering/pbft/consensus_manager_pbft.h
@@ -55,6 +55,11 @@ class ConsensusManagerPBFT : public ConsensusManager {
void SetPreVerifyFunc(std::function<bool(const Request&)>);
void SetNeedCommitQC(bool need_qc);
+ int ProcessRecoveryData(std::unique_ptr<Context> context,
+ std::unique_ptr<Request> request);
+
+ int ProcessRecoveryDataResponse(std::unique_ptr<Context> context,
+ std::unique_ptr<Request> request);
protected:
int InternalConsensusCommit(std::unique_ptr<Context> context,
std::unique_ptr<Request> request);
@@ -67,6 +72,8 @@ class ConsensusManagerPBFT : public ConsensusManager {
absl::StatusOr<std::pair<std::unique_ptr<Context>, std::unique_ptr<Request>>>
PopComplainedRequest();
+ void RemoteRecoveryProcess();
+
protected:
std::unique_ptr<SystemInfo> system_info_;
std::unique_ptr<CheckPointManager> checkpoint_manager_;
@@ -83,6 +90,8 @@ class ConsensusManagerPBFT : public ConsensusManager {
std::queue<std::pair<std::unique_ptr<Context>, std::unique_ptr<Request>>>
request_complained_;
std::mutex mutex_;
+ std::thread recovery_thread_;
+ LockFreeQueue<Request> recovery_queue_;
};
} // namespace resdb
diff --git a/platform/consensus/ordering/pbft/lock_free_collector_pool.cpp
b/platform/consensus/ordering/pbft/lock_free_collector_pool.cpp
index 4336b66d..2068beed 100644
--- a/platform/consensus/ordering/pbft/lock_free_collector_pool.cpp
+++ b/platform/consensus/ordering/pbft/lock_free_collector_pool.cpp
@@ -50,6 +50,18 @@ LockFreeCollectorPool::LockFreeCollectorPool(const
std::string& name,
<< " enable viewchange:" << enable_viewchange_ << " done";
}
+// Rebuilds every collector slot so the ring starts at `start_seq`. The slot
+// at position (idx + i) modulo the ring size, where idx = start_seq & mask_,
+// gets a fresh collector for seq start_seq + i. Seqs stay uint64_t; the
+// previous `int` counter truncated seqs above INT_MAX.
+// NOTE(review): not synchronized with concurrent GetCollector/Update callers.
+void LockFreeCollectorPool::Reset(uint64_t start_seq) {
+  const size_t ring_size = capacity_ << 1;
+  const size_t idx = start_seq & mask_;
+  LOG(ERROR) << " reset collector:" << start_seq;
+  for (size_t i = 0; i < ring_size; ++i) {
+    collector_[(i + idx) % ring_size] = std::make_unique<TransactionCollector>(
+        start_seq + i, executor_, enable_viewchange_);
+  }
+  LOG(ERROR) << " reset collector done:" << start_seq;
+}
+
void LockFreeCollectorPool::Update(uint64_t seq) {
uint32_t idx = seq & mask_;
if (collector_[idx]->Seq() != seq) {
diff --git a/platform/consensus/ordering/pbft/lock_free_collector_pool.h
b/platform/consensus/ordering/pbft/lock_free_collector_pool.h
index 5335c4fd..22757827 100644
--- a/platform/consensus/ordering/pbft/lock_free_collector_pool.h
+++ b/platform/consensus/ordering/pbft/lock_free_collector_pool.h
@@ -33,6 +33,7 @@ class LockFreeCollectorPool {
TransactionCollector* GetCollector(uint64_t seq);
void Update(uint64_t seq);
+ void Reset(uint64_t start_seq);
private:
std::string name_;
diff --git a/platform/consensus/ordering/pbft/message_manager.cpp
b/platform/consensus/ordering/pbft/message_manager.cpp
index 2c8b08cc..7489da58 100644
--- a/platform/consensus/ordering/pbft/message_manager.cpp
+++ b/platform/consensus/ordering/pbft/message_manager.cpp
@@ -177,10 +177,14 @@ CollectorResultCode MessageManager::AddConsensusMsg(
if (request == nullptr || !IsValidMsg(*request)) {
return CollectorResultCode::INVALID;
}
+
int type = request->type();
uint64_t seq = request->seq();
int resp_received_count = 0;
int proxy_id = request->proxy_id();
+ if(checkpoint_manager_){
+ checkpoint_manager_->SetMaxSeq(seq);
+ }
int ret = collector_pool_->GetCollector(seq)->AddRequest(
std::move(request), signature, type == Request::TYPE_PRE_PREPARE,
@@ -242,6 +246,13 @@ Storage* MessageManager::GetStorage() {
return transaction_executor_->GetStorage();
}
+// Repositions the commit pipeline so that `seq` is the next sequence to be
+// committed and executed (used after recovery).
+// The checkpoint manager tracks the LAST committed seq and accepts only
+// last_seq_ + 1 next. It is therefore given seq - 1; passing seq would make
+// it drop `seq` itself.
+void MessageManager::SetNextCommitSeq(int seq) {
+  SetNextSeq(seq);
+  collector_pool_->Reset(seq);
+  if (checkpoint_manager_) {
+    checkpoint_manager_->SetLastCommit(seq > 0 ? seq - 1 : 0);
+  }
+  transaction_executor_->SetPendingExecutedSeq(seq);
+}
+
void MessageManager::SetLastCommittedTime(uint64_t proxy_id) {
lct_lock_.lock();
last_committed_time_[proxy_id] = GetCurrentTime();
diff --git a/platform/consensus/ordering/pbft/message_manager.h
b/platform/consensus/ordering/pbft/message_manager.h
index c51fd22a..d5d7a4b2 100644
--- a/platform/consensus/ordering/pbft/message_manager.h
+++ b/platform/consensus/ordering/pbft/message_manager.h
@@ -84,6 +84,8 @@ class MessageManager {
std::vector<RequestInfo> GetPreparedProof(uint64_t seq);
TransactionStatue GetTransactionState(uint64_t seq);
+ void SetNextCommitSeq(int seq);
+
// ============= System information ========
// Obtain the current replica list.
std::vector<ReplicaInfo> GetReplicas();
diff --git a/platform/consensus/ordering/pbft/transaction_collector.cpp
b/platform/consensus/ordering/pbft/transaction_collector.cpp
index 0c69c401..650beec9 100644
--- a/platform/consensus/ordering/pbft/transaction_collector.cpp
+++ b/platform/consensus/ordering/pbft/transaction_collector.cpp
@@ -90,8 +90,8 @@ int TransactionCollector::AddRequest(
}
if (seq_ != static_cast<uint64_t>(request->seq())) {
- // LOG(ERROR) << "data invalid, seq not the same:" << seq
- // << " collect seq:" << seq_;
+ LOG(ERROR) << "data invalid, seq not the same:" << seq
+ << " collect seq:" << seq_;
return -2;
}
diff --git a/platform/consensus/recovery/recovery.cpp
b/platform/consensus/recovery/recovery.cpp
index 5ea24420..5b722d75 100644
--- a/platform/consensus/recovery/recovery.cpp
+++ b/platform/consensus/recovery/recovery.cpp
@@ -55,6 +55,7 @@ Recovery::Recovery(const ResDBConfig& config, CheckPoint*
checkpoint,
}
if (recovery_enabled_ == false) {
+ LOG(INFO) << "recovery is not enabled:" << recovery_enabled_;
return;
}
@@ -83,8 +84,10 @@ Recovery::Recovery(const ResDBConfig& config, CheckPoint*
checkpoint,
}
+// Selects the log file to use (GetLastFile sets file_path_), calls SwitchFile
+// on it, then runs UpdateStableCheckPoint on the background ckpt_thread_.
void Recovery::Init() {
GetLastFile();
SwitchFile(file_path_);
ckpt_thread_ = std::thread(&Recovery::UpdateStableCheckPoint, this);
}
@@ -141,6 +144,7 @@ void Recovery::GetLastFile() {
int64_t time_s =
std::stoll(file_name.substr(time_pos + 1, min_seq_pos - time_pos - 1));
+ LOG(ERROR) << "get path:" << entry.path() << " min:" << min_seq<<"
time:"<<time_s;
if (min_seq == -1) {
if (last_ckpt_ == -1 || m_time_s < time_s) {
file_path_ = entry.path();
@@ -306,6 +310,9 @@ void Recovery::WriteLog(const Context* context, const
Request* request) {
AppendData(sig);
Flush();
+ if(context){
+ InsertCache(*context, *request);
+ }
}
void Recovery::AppendData(const std::string& data) {
@@ -407,7 +414,7 @@ bool Recovery::Read(int fd, size_t len, char* data) {
}
std::pair<std::vector<std::pair<int64_t, std::string>>, int64_t>
-Recovery::GetRecoveryFiles() {
+Recovery::GetRecoveryFiles(int64_t ckpt) {
std::string dir = std::filesystem::path(file_path_).parent_path();
int64_t last_ckpt = 0;
for (const auto& entry : std::filesystem::directory_iterator(dir)) {
@@ -421,6 +428,8 @@ Recovery::GetRecoveryFiles() {
last_ckpt = ckpt;
}
}
+ LOG(ERROR)<<"file max ckpt:"<<last_ckpt <<" storage ckpt:"<<ckpt;
+ last_ckpt = std::min(last_ckpt, ckpt);
std::vector<std::pair<int64_t, std::string>> list;
std::vector<std::pair<int64_t, std::string>> e_list;
@@ -462,13 +471,19 @@ void Recovery::ReadLogs(
std::function<void(const SystemInfoData& data)> system_callback,
std::function<void(std::unique_ptr<Context> context,
std::unique_ptr<Request> request)>
- call_back) {
+ call_back,
+ std::function<void(int)> set_start_point) {
if (recovery_enabled_ == false) {
return;
}
+ assert(storage_);
+ int64_t storage_ckpt = storage_->GetLastCheckpoint();
+ LOG(ERROR)<<" storage ckpt:"<<storage_ckpt;
std::unique_lock<std::mutex> lk(mutex_);
- auto recovery_files_pair = GetRecoveryFiles();
+
+ auto recovery_files_pair = GetRecoveryFiles(storage_ckpt);
int64_t ckpt = recovery_files_pair.second;
+ set_start_point(ckpt);
int idx = 0;
for (auto path : recovery_files_pair.first) {
ReadLogsFromFiles(path.second, ckpt, idx++, system_callback, call_back);
@@ -538,9 +553,12 @@ void Recovery::ReadLogsFromFiles(
}
uint64_t max_seq = 0;
for (std::unique_ptr<RecoveryData>& recovery_data : request_list) {
+ //LOG(ERROR)<<" ckpt :"<<ckpt<<" recovery data
seq:"<<recovery_data->request->seq();
if (ckpt < recovery_data->request->seq()) {
recovery_data->request->set_is_recovery(true);
max_seq = recovery_data->request->seq();
+ //LOG(ERROR)<<" ??? call back:"<<recovery_data->request->seq()<<"
call_back:"<<(call_back?"1":"0");
+ //InsertCache(*recovery_data->context, *recovery_data->request);
call_back(std::move(recovery_data->context),
std::move(recovery_data->request));
}
@@ -552,4 +570,94 @@ void Recovery::ReadLogsFromFiles(
close(fd);
}
+// Stores a copy of (signature, request) under request.seq() in cache_.
+// NOTE(review): WriteLog calls this for every logged request that has a
+// context, but no reader of cache_ is visible in this file; without a bound
+// the map would grow for the lifetime of the process. Once more than
+// kMaxCachedSeqs distinct sequence numbers are cached, the smallest ones are
+// evicted (tune the cap as needed).
+void Recovery::InsertCache(const Context& context, const Request& request) {
+  constexpr size_t kMaxCachedSeqs = 1 << 14;
+  std::unique_lock<std::mutex> lk(data_mutex_);
+  cache_[request.seq()].emplace_back(context.signature, request);
+  // cache_ is a std::map keyed by seq, so begin() holds the smallest seq.
+  while (cache_.size() > kMaxCachedSeqs) {
+    cache_.erase(cache_.begin());
+  }
+}
+
+// Fills `response` with every logged request whose sequence number lies in
+// [request.min_seq(), request.max_seq()], read back from the recovery log
+// files in ascending sequence order. Each request's signature is appended to
+// response.signature at the same index as the request. Always returns 0.
+int Recovery::GetData(const RecoveryRequest& request,
+                      RecoveryResponse& response) {
+  const auto logged =
+      GetDataFromRecoveryFiles(request.min_seq(), request.max_seq());
+  for (const auto& seq_entry : logged) {
+    for (const auto& [context, req] : seq_entry.second) {
+      *response.add_signature() = context->signature;
+      *response.add_request() = *req;
+    }
+  }
+  // The in-memory cache_ is not consulted here; requests always come from
+  // the on-disk log files.
+  return 0;
+}
+
+// Reads the recovery log files in the directory of file_path_ and returns
+// every logged (context, request) pair whose sequence number lies in
+// [need_min_seq, need_max_seq], grouped by sequence number.
+//
+// File stems are parsed as "..._<time>_<min_seq>_<max_seq>_<last field>".
+// Files with min_seq == -1 (presumably the log still being appended to) carry
+// no usable range, so only the newest of them (largest <time>) is read, in
+// addition to the finished files whose [min_seq, max_seq] overlaps the
+// requested range. Files are read in ascending <time> order.
+// NOTE(review): file_path_ is read without holding mutex_; confirm nothing
+// updates it concurrently with this call.
+std::map<uint64_t, std::vector<std::pair<std::unique_ptr<Context>,
+                                         std::unique_ptr<Request>>>>
+Recovery::GetDataFromRecoveryFiles(uint64_t need_min_seq,
+                                   uint64_t need_max_seq) {
+  std::map<uint64_t, std::vector<std::pair<std::unique_ptr<Context>,
+                                           std::unique_ptr<Request>>>>
+      res;
+  if (need_min_seq > need_max_seq) {
+    return res;  // Empty range: nothing can match.
+  }
+
+  const std::string dir = std::filesystem::path(file_path_).parent_path();
+
+  // (time, path) of finished files overlapping the requested range.
+  std::vector<std::pair<int64_t, std::string>> list;
+  // (time, path) of files whose min_seq is -1.
+  std::vector<std::pair<int64_t, std::string>> e_list;
+
+  for (const auto& entry : std::filesystem::directory_iterator(dir)) {
+    const std::string file_name = entry.path().stem().string();
+    const std::string ext = entry.path().extension().string();
+    if (ext != ".log") continue;
+
+    int pos = file_name.rfind("_");
+    int max_seq_pos = file_name.rfind("_", pos - 1);
+    int64_t max_seq =
+        std::stoll(file_name.substr(max_seq_pos + 1, pos - max_seq_pos - 1));
+    int min_seq_pos = file_name.rfind("_", max_seq_pos - 1);
+    int64_t min_seq = std::stoll(
+        file_name.substr(min_seq_pos + 1, max_seq_pos - min_seq_pos - 1));
+    int time_pos = file_name.rfind("_", min_seq_pos - 1);
+    int64_t time =
+        std::stoll(file_name.substr(time_pos + 1, min_seq_pos - time_pos - 1));
+
+    if (min_seq == -1) {
+      // Must not also land in `list`: it would be read once from there and
+      // again (if newest) from e_list below, duplicating its requests.
+      e_list.emplace_back(time, entry.path().string());
+      continue;
+    }
+    // Compare as signed: the parsed bounds are int64_t and may be negative.
+    if (max_seq < static_cast<int64_t>(need_min_seq) ||
+        min_seq > static_cast<int64_t>(need_max_seq)) {
+      continue;  // No overlap with the requested range.
+    }
+    list.emplace_back(time, entry.path().string());
+  }
+
+  // e_list may be empty; taking back()/max of an empty vector is undefined.
+  if (!e_list.empty()) {
+    list.push_back(*std::max_element(e_list.begin(), e_list.end()));
+  }
+  std::sort(list.begin(), list.end());
+
+  // ReadLogsFromFiles forwards only requests whose seq is greater than its
+  // ckpt argument, so pass need_min_seq - 1 (computed signed so that it does
+  // not wrap around when need_min_seq is 0).
+  const int64_t read_after = static_cast<int64_t>(need_min_seq) - 1;
+  for (const auto& file : list) {
+    ReadLogsFromFiles(
+        file.second, read_after, 0, [](const SystemInfoData&) {},
+        [&](std::unique_ptr<Context> context, std::unique_ptr<Request> request) {
+          if (request->seq() < need_min_seq || request->seq() > need_max_seq) {
+            return;
+          }
+          LOG(ERROR) << "get data from recovery file seq:" << request->seq();
+          res[request->seq()].push_back(
+              std::make_pair(std::move(context), std::move(request)));
+        });
+  }
+  return res;
+}
+
+
} // namespace resdb
diff --git a/platform/consensus/recovery/recovery.h
b/platform/consensus/recovery/recovery.h
index f5c30cbe..b8497394 100644
--- a/platform/consensus/recovery/recovery.h
+++ b/platform/consensus/recovery/recovery.h
@@ -44,11 +44,16 @@ class Recovery {
void ReadLogs(std::function<void(const SystemInfoData& data)>
system_callback,
std::function<void(std::unique_ptr<Context> context,
std::unique_ptr<Request> request)>
- call_back);
+ call_back,
+ std::function<void(int)> start_point
+ );
int64_t GetMaxSeq();
int64_t GetMinSeq();
+ int GetData(const RecoveryRequest& request,
+ RecoveryResponse &response);
+
private:
struct RecoveryData {
std::unique_ptr<Context> context;
@@ -75,7 +80,7 @@ class Recovery {
void UpdateStableCheckPoint();
std::pair<std::vector<std::pair<int64_t, std::string>>, int64_t>
- GetRecoveryFiles();
+ GetRecoveryFiles(int64_t ckpt);
void ReadLogsFromFiles(
const std::string& path, int64_t ckpt, int file_idx,
std::function<void(const SystemInfoData& data)> system_callback,
@@ -83,6 +88,11 @@ class Recovery {
std::unique_ptr<Request> request)>
call_back);
+ void InsertCache(const Context&context, const Request& request);
+
+ std::map<uint64_t, std::vector<std::pair<std::unique_ptr<Context>,
std::unique_ptr<Request>> >>
+ GetDataFromRecoveryFiles(uint64_t need_min_seq, uint64_t need_max_seq);
+
protected:
ResDBConfig config_;
CheckPoint* checkpoint_;
@@ -92,7 +102,7 @@ class Recovery {
std::string file_path_, base_file_path_;
size_t buffer_size_ = 0;
int fd_;
- std::mutex mutex_;
+ std::mutex mutex_, data_mutex_;
int64_t last_ckpt_;
int64_t min_seq_, max_seq_;
@@ -101,6 +111,7 @@ class Recovery {
int recovery_ckpt_time_s_;
SystemInfo* system_info_;
Storage* storage_;
+ std::map<uint64_t, std::vector<std::pair<SignatureInfo, Request>>> cache_;
};
} // namespace resdb
diff --git a/platform/proto/resdb.proto b/platform/proto/resdb.proto
index 47edac38..5c2ccb60 100644
--- a/platform/proto/resdb.proto
+++ b/platform/proto/resdb.proto
@@ -57,8 +57,9 @@ message Request {
TYPE_NEWVIEW= 17;
TYPE_CUSTOM_QUERY = 18;
TYPE_CUSTOM_CONSENSUS = 19;
+ TYPE_STATUS_SYNC = 20;
- NUM_OF_TYPE = 20; // the total number of types.
+ NUM_OF_TYPE = 21; // the total number of types.
// Used to create the collector.
};
int32 type = 1;
@@ -175,6 +176,7 @@ message RecoveryRequest {
message RecoveryResponse {
repeated Request request = 1;
+ repeated SignatureInfo signature = 2;
}
message RequestWithProof {
diff --git a/scripts/deploy/config/kv_server.conf
b/scripts/deploy/config/kv_server.conf
index abfc222e..d5af12e6 100644
--- a/scripts/deploy/config/kv_server.conf
+++ b/scripts/deploy/config/kv_server.conf
@@ -18,10 +18,10 @@
#
iplist=(
-172.31.25.4
-172.31.27.205
-172.31.27.141
-172.31.20.222
-172.31.28.162
+172.31.57.186
+172.31.57.186
+172.31.57.186
+172.31.57.186
+172.31.57.186
)
diff --git a/scripts/deploy/config/pbft.config
b/scripts/deploy/config/pbft.config
index e4fe4960..93e2ba29 100644
--- a/scripts/deploy/config/pbft.config
+++ b/scripts/deploy/config/pbft.config
@@ -1,22 +1,3 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
{
"clientBatchNum": 100,
"enable_viewchange": true,
diff --git a/scripts/deploy/script/deploy.sh b/scripts/deploy/script/deploy.sh
index 8bae565b..bab480bb 100755
--- a/scripts/deploy/script/deploy.sh
+++ b/scripts/deploy/script/deploy.sh
@@ -19,6 +19,8 @@
set -e
+export TEMPLATE_PATH=$PWD/config/pbft.config
+
# load environment parameters
. ./script/env.sh
@@ -70,7 +72,8 @@ deploy/script/generate_key.sh ${BAZEL_WORKSPACE_PATH}
${output_key_path} ${#ipli
deploy/script/generate_config.sh ${BAZEL_WORKSPACE_PATH} ${output_key_path}
${output_cert_path} ${output_path} ${admin_key_path} ${deploy_iplist[@]}
# build kv server
-bazel build ${server}
+#bazel build ${server}
+bazel build ${server} --define="enable_leveldb=True"
if [ $? != 0 ]
then
diff --git a/service/kv/kv_service.cpp b/service/kv/kv_service.cpp
index 269fb807..170b22bd 100644
--- a/service/kv/kv_service.cpp
+++ b/service/kv/kv_service.cpp
@@ -31,6 +31,11 @@
using namespace resdb;
using namespace resdb::storage;
+
+// Logs the received signal, then restores the default action and re-raises
+// the signal so the process still terminates (e.g. Ctrl-C / SIGINT) instead
+// of the signal being silently swallowed.
+// NOTE(review): LOG() is not async-signal-safe (it may allocate or take
+// locks); it is kept only as a best-effort diagnostic. SIGKILL cannot be
+// caught, so installing this handler for it has no effect.
+void SignalHandler(int sig_num) {
+  LOG(ERROR)<<" signal:"<<sig_num<<" call"<<" ======================";
+  signal(sig_num, SIG_DFL);
+  raise(sig_num);
+}
+
void ShowUsage() {
printf("<config> <private_key> <cert_file> [logging_dir]\n");
}
@@ -51,7 +56,10 @@ int main(int argc, char** argv) {
exit(0);
}
google::InitGoogleLogging(argv[0]);
- FLAGS_minloglevel = 1;
+ FLAGS_minloglevel = 0;
+ signal(SIGINT, SignalHandler);
+ signal(SIGKILL, SignalHandler);
+
char* config_file = argv[1];
char* private_key_file = argv[2];
@@ -71,10 +79,10 @@ int main(int argc, char** argv) {
ResConfigData config_data = config->GetConfigData();
std::string db_path = std::to_string(config->GetSelfInfo().port()) + "_db/";
- LOG(INFO) << "db path:" << db_path;
+ LOG(ERROR) << "db path:" << db_path;
auto server = GenerateResDBServer(
config_file, private_key_file, cert_file,
std::make_unique<KVExecutor>(NewStorage(db_path, config_data)), nullptr);
server->Run();
-}
\ No newline at end of file
+}
diff --git a/tools/generate_region_config.py b/tools/generate_region_config.py
index c3d001f2..417499b8 100644
--- a/tools/generate_region_config.py
+++ b/tools/generate_region_config.py
@@ -81,4 +81,5 @@ if __name__ == "__main__":
template_config = None
if len(sys.argv)>3:
template_config = sys.argv[3]
+ print("generate json config:",template_config)
GenerateJsonConfig(sys.argv[1], sys.argv[2], template_config)