This is an automated email from the ASF dual-hosted git repository.
junchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git
The following commit(s) were added to refs/heads/master by this push:
new 550d5ec9 Recovery ckpt (#222)
550d5ec9 is described below
commit 550d5ec933bef99f5e2e86fc2546b8f5ad0a6ca5
Author: cjcchen <[email protected]>
AuthorDate: Mon Jan 5 02:08:31 2026 +0800
Recovery ckpt (#222)
* Chenyi (#201)
* Chenyi latest executed seq num each replica
* Chenyi latest executed seq num each replica update file route
* Chenyi update create file
* Chenyi update write in checkpoint
* Chenyi update write in checkpoint
* Chenyi update in checkpoint_manager
* Chenyi update in checkpoint_manager
* Chenyi update in checkpoint_manager
* Chenyi update in checkpoint_manager with ofstream, solved executor_
* Chenyi update in checkpoint_manager with ofstream, solved executor_, with
test draft, seems deadlock
* Chenyi update in checkpoint_manager with ofstream, solved executor_, with
test draft
* Chenyi update in checkpoint_manager with ofstream, solved executor_, with
test draft
* Chenyi update in checkpoint_manager with ofstream, with test draft
---------
Co-authored-by: cyzhoutt <[email protected]>
* add seq
* add checkpoint recovery
* remove query
* remove query
* remove query
* format
* fix recovery test
* fix recovery test
* rm get all interfaces
* fix client config
* fix seq 0 in memdb
* fix seq 0 in memdb
* update vc
* update vc
* update vc
* update vc
* update vc
* update vc
* update vc
* update vc
---------
Co-authored-by: cyzhoutt <[email protected]>
---
platform/config/resdb_config_utils.cpp | 14 +-
platform/config/resdb_poc_config.cpp | 1 -
platform/config/resdb_poc_config.h | 1 -
.../consensus/ordering/pbft/checkpoint_manager.cpp | 166 ++++++++++++++++++---
.../consensus/ordering/pbft/checkpoint_manager.h | 24 ++-
.../ordering/pbft/checkpoint_manager_test.cpp | 16 +-
platform/consensus/ordering/pbft/commitment.cpp | 63 +++++---
platform/consensus/ordering/pbft/commitment.h | 3 +
.../consensus/ordering/pbft/commitment_test.cpp | 2 +-
.../ordering/pbft/consensus_manager_pbft.cpp | 49 ++++--
.../ordering/pbft/lock_free_collector_pool.cpp | 4 +-
.../consensus/ordering/pbft/message_manager.cpp | 23 ++-
platform/consensus/ordering/pbft/message_manager.h | 1 -
.../ordering/pbft/performance_manager.cpp | 3 +-
platform/consensus/ordering/pbft/query_test.cpp | 2 +-
.../consensus/ordering/pbft/response_manager.cpp | 14 +-
.../ordering/pbft/transaction_collector.cpp | 2 +-
.../consensus/ordering/pbft/viewchange_manager.cpp | 125 ++++++----------
.../consensus/ordering/pbft/viewchange_manager.h | 2 +-
.../consensus/ordering/poc/pow/block_manager.cpp | 1 -
.../consensus/ordering/poc/pow/block_manager.h | 1 -
.../ordering/poc/pow/consensus_service_pow.cpp | 1 -
.../poc/pow/consensus_service_pow_test.cpp | 1 -
platform/consensus/ordering/poc/pow/merkle.cpp | 1 -
platform/consensus/ordering/poc/pow/merkle.h | 1 -
.../consensus/ordering/poc/pow/miner_manager.h | 1 -
platform/consensus/recovery/recovery.cpp | 37 +++--
platform/networkstrate/consensus_manager.cpp | 4 +-
platform/proto/checkpoint_info.proto | 2 +
platform/proto/resdb.proto | 2 +
scripts/deploy/config/pbft.config | 36 ++---
scripts/format.sh | 2 +-
service/tools/kv/api_tools/kv_service_tools.cpp | 2 +-
tools/generate_region_config.py | 24 ++-
34 files changed, 408 insertions(+), 223 deletions(-)
diff --git a/platform/config/resdb_config_utils.cpp
b/platform/config/resdb_config_utils.cpp
index f3574daf..d0a981d4 100644
--- a/platform/config/resdb_config_utils.cpp
+++ b/platform/config/resdb_config_utils.cpp
@@ -123,19 +123,18 @@ ResConfigData ReadConfigFromFile(const std::string&
file_name) {
}
std::vector<ReplicaInfo> ReadConfig(const std::string& file_name) {
-
std::vector<ReplicaInfo> replicas;
std::stringstream json_data;
std::ifstream infile(file_name.c_str());
if (!infile.is_open()) {
std::cerr << "Failed to open file." << file_name << " " << strerror(errno)
- << std::endl;
+ << std::endl;
return replicas;
}
json_data << infile.rdbuf();
std::string cleanJson = RemoveJsonComments(json_data.str());
-
+
RegionInfo region_info;
JsonParseOptions options;
auto status = JsonStringToMessage(cleanJson, &region_info, options);
@@ -143,9 +142,12 @@ std::vector<ReplicaInfo> ReadConfig(const std::string&
file_name) {
LOG(ERROR) << "parse json :" << file_name << " fail:" << status.message();
}
assert(status.ok());
- for(const auto& replica_info : region_info.replica_info()) {
- LOG(ERROR) << "parse json id:" << replica_info.id() << " ip:" <<
replica_info.ip() <<" port:"<<replica_info.port();
- replicas.push_back(GenerateReplicaInfo(replica_info.id(),
replica_info.ip(), replica_info.port()));
+ for (const auto& replica_info : region_info.replica_info()) {
+ LOG(ERROR) << "parse json id:" << replica_info.id()
+ << " ip:" << replica_info.ip()
+ << " port:" << replica_info.port();
+ replicas.push_back(GenerateReplicaInfo(replica_info.id(),
replica_info.ip(),
+ replica_info.port()));
}
return replicas;
}
diff --git a/platform/config/resdb_poc_config.cpp
b/platform/config/resdb_poc_config.cpp
index 60e01860..37b10c4e 100644
--- a/platform/config/resdb_poc_config.cpp
+++ b/platform/config/resdb_poc_config.cpp
@@ -17,7 +17,6 @@
* under the License.
*/
-
#include "platform/config/resdb_poc_config.h"
namespace resdb {
diff --git a/platform/config/resdb_poc_config.h
b/platform/config/resdb_poc_config.h
index 86f6fbef..fb078855 100644
--- a/platform/config/resdb_poc_config.h
+++ b/platform/config/resdb_poc_config.h
@@ -17,7 +17,6 @@
* under the License.
*/
-
#pragma once
#include "platform/config/resdb_config.h"
diff --git a/platform/consensus/ordering/pbft/checkpoint_manager.cpp
b/platform/consensus/ordering/pbft/checkpoint_manager.cpp
index 2125b049..54dfa15b 100644
--- a/platform/consensus/ordering/pbft/checkpoint_manager.cpp
+++ b/platform/consensus/ordering/pbft/checkpoint_manager.cpp
@@ -28,13 +28,15 @@ namespace resdb {
CheckPointManager::CheckPointManager(const ResDBConfig& config,
ReplicaCommunicator* replica_communicator,
- SignatureVerifier* verifier)
+ SignatureVerifier* verifier,
+ SystemInfo* sys_info)
: config_(config),
replica_communicator_(replica_communicator),
verifier_(verifier),
stop_(false),
txn_accessor_(config),
- highest_prepared_seq_(0) {
+ highest_prepared_seq_(0),
+ sys_info_(sys_info) {
current_stable_seq_ = 0;
if (config_.GetConfigData().enable_viewchange()) {
config_.EnableCheckPoint(true);
@@ -64,6 +66,11 @@ void CheckPointManager::Stop() {
}
}
+void CheckPointManager::SetResetExecute(
+ std::function<void(uint64_t seq)> func) {
+ reset_execute_func_ = func;
+}
+
std::string GetHash(const std::string& h1, const std::string& h2) {
return SignatureVerifier::CalculateHash(h1 + h2);
}
@@ -109,6 +116,7 @@ int
CheckPointManager::ProcessCheckPoint(std::unique_ptr<Context> context,
}
uint64_t checkpoint_seq = checkpoint_data.seq();
uint32_t sender_id = request->sender_id();
+ LOG(ERROR) << " receive ckpt:" << checkpoint_seq << " from:" << sender_id;
int water_mark = config_.GetCheckPointWaterMark();
if (checkpoint_seq % water_mark) {
LOG(ERROR) << "checkpoint seq not invalid:" << checkpoint_seq;
@@ -162,11 +170,39 @@ bool CheckPointManager::Wait() {
void CheckPointManager::CheckHealthy() {
uint32_t current_time = time(nullptr);
- for(int i = 1; i <= config_.GetReplicaNum(); ++i) {
- if( current_time - last_update_time_[i] > replica_timeout_ ) {
+
+ std::map<uint64_t, int> seqs;
+
+ for (int i = 1; i <= config_.GetReplicaNum(); ++i) {
+ if (last_update_time_.find(i) == last_update_time_.end() ||
+ last_update_time_[i] == 0) {
+ continue;
+ }
+ LOG(ERROR) << " check healthy, replica:" << i
+ << " current time:" << current_time
+ << " last time:" << last_update_time_[i]
+ << " timeout:" << replica_timeout_
+ << " pass:" << current_time - last_update_time_[i];
+ if (current_time - last_update_time_[i] > replica_timeout_) {
TimeoutHandler(i);
}
+ seqs[status_[i]]++;
}
+
+ uint64_t unstable_check_ckpt = 0;
+ for (auto it : seqs) {
+ int num = 0;
+ for (auto sit : seqs) {
+ if (sit.first < it.first) {
+ continue;
+ }
+ num += sit.second;
+ }
+ if (num >= config_.GetMinDataReceiveNum()) {
+ unstable_check_ckpt = std::max(unstable_check_ckpt, it.first);
+ }
+ }
+ SetUnstableCkpt(unstable_check_ckpt);
}
void CheckPointManager::UpdateStableCheckPointStatus() {
@@ -197,8 +233,11 @@ void CheckPointManager::UpdateStableCheckPointStatus() {
new_data_ = 0;
}
- // LOG(ERROR) << "current stable seq:" << current_stable_seq_
- // << " stable seq:" << stable_seq;
+ LOG(ERROR) << "current stable seq:" << current_stable_seq_
+ << " stable seq:" << stable_seq;
+ if (stable_seq == 0) {
+ continue;
+ }
std::vector<SignatureInfo> votes;
if (current_stable_seq_ < stable_seq) {
std::lock_guard<std::mutex> lk(mutex_);
@@ -223,9 +262,6 @@ void CheckPointManager::UpdateStableCheckPointStatus() {
*stable_ckpt_.add_signatures() = vote;
}
current_stable_seq_ = stable_seq;
- // LOG(INFO) << "done. stable seq:" << current_stable_seq_
- // << " votes:" << stable_ckpt_.DebugString();
- // LOG(INFO) << "done. stable seq:" << current_stable_seq_;
}
UpdateStableCheckPointCallback(current_stable_seq_);
}
@@ -248,7 +284,14 @@ void CheckPointManager::TimeoutHandler(uint32_t replica) {
}
}
-void CheckPointManager::SetLastCommit(uint64_t seq) { last_seq_ = seq; }
+void CheckPointManager::SetLastCommit(uint64_t seq) {
+ LOG(ERROR) << " set last commit:" << seq;
+ last_seq_ = seq;
+ std::lock_guard<std::mutex> lk(lt_mutex_);
+ committed_status_.clear();
+}
+
+uint64_t CheckPointManager::GetLastCommit() { return last_seq_; }
int CheckPointManager::ProcessStatusSync(std::unique_ptr<Context> context,
std::unique_ptr<Request> request) {
@@ -259,9 +302,14 @@ int
CheckPointManager::ProcessStatusSync(std::unique_ptr<Context> context,
}
uint64_t seq = checkpoint_data.seq();
uint32_t sender_id = request->sender_id();
+ uint32_t primary_id = checkpoint_data.primary_id();
+ uint32_t view = checkpoint_data.view();
+
status_[sender_id] = seq;
last_update_time_[sender_id] = time(nullptr);
- LOG(ERROR) << " received from :" << sender_id << " commit status:" << seq;
+ view_status_[sender_id] = std::make_pair(primary_id, view);
+ LOG(ERROR) << " received from :" << sender_id << " commit status:" << seq
+ << " primary:" << primary_id << " view:" << view;
return 0;
}
@@ -277,12 +325,37 @@ void CheckPointManager::CheckStatus(uint64_t last_seq) {
if (seqs.size() <= f + 1) {
return;
}
- uint64_t min_seq = seqs[f + 1];
+ // uint64_t min_seq = seqs[f + 1];
+ uint64_t min_seq = seqs.back();
LOG(ERROR) << " check last seq:" << last_seq << " max seq:" << min_seq;
if (last_seq < min_seq) {
// need recovery from others
- BroadcastRecovery(last_seq + 1, std::min(min_seq, last_seq + 100));
+ reset_execute_func_(last_seq + 1);
+ BroadcastRecovery(last_seq + 1, std::min(min_seq, last_seq + 500));
+ }
+}
+
+void CheckPointManager::CheckSysStatus() {
+ int f = config_.GetMaxMaliciousReplicaNum();
+
+ std::map<std::pair<int, uint64_t>, int> views;
+ int current_primary = 0;
+ uint64_t current_view = 0;
+ for (auto it : view_status_) {
+ views[it.second]++;
+ if (views[it.second] >= 2 * f + 1) {
+ current_primary = it.second.first;
+ current_view = it.second.second;
+ }
+ }
+
+ if (current_primary > 0 && current_primary != sys_info_->GetPrimaryId() &&
+ current_view > sys_info_->GetCurrentView()) {
+ sys_info_->SetCurrentView(current_view);
+ sys_info_->SetPrimary(current_primary);
+ LOG(ERROR) << " change to primary:" << current_primary
+ << " view:" << current_view;
}
}
@@ -296,15 +369,21 @@ void CheckPointManager::SyncStatus() {
std::unique_ptr<Request> checkpoint_request = NewRequest(
Request::TYPE_STATUS_SYNC, Request(), config_.GetSelfInfo().id());
checkpoint_data.set_seq(last_seq);
+ checkpoint_data.set_view(sys_info_->GetCurrentView());
+ checkpoint_data.set_primary_id(sys_info_->GetPrimaryId());
checkpoint_data.SerializeToString(checkpoint_request->mutable_data());
replica_communicator_->BroadCast(*checkpoint_request);
LOG(ERROR) << " sync status last seq:" << last_seq
- << " last time:" << last_time;
- if (last_check_seq == last_seq && last_time > 300) {
+ << " last time:" << last_time
+ << " primary:" << sys_info_->GetPrimaryId()
+ << " view:" << sys_info_->GetCurrentView();
+ if (last_check_seq == last_seq && last_time > 5) {
CheckStatus(last_seq);
last_time = 0;
}
+ CheckSysStatus();
+
if (last_seq != last_check_seq) {
last_check_seq = last_seq;
last_time = 0;
@@ -325,6 +404,8 @@ void CheckPointManager::UpdateCheckPointStatus() {
while (!stop_) {
std::unique_ptr<Request> request = nullptr;
if (!pendings.empty()) {
+ LOG(ERROR) << " last seq:" << last_seq_
+ << " pending:" << pendings.begin()->second->seq();
if (pendings.begin()->second->seq() == last_seq_ + 1) {
request = std::move(pendings.begin()->second);
pendings.erase(pendings.begin());
@@ -338,8 +419,8 @@ void CheckPointManager::UpdateCheckPointStatus() {
}
std::string hash_ = request->hash();
uint64_t current_seq = request->seq();
- // LOG(ERROR) << "update checkpoint seq :" << last_seq_ << " current:" <<
- // current_seq;
+ LOG(ERROR) << "update checkpoint seq :" << last_seq_
+ << " current:" << current_seq;
if (current_seq != last_seq_ + 1) {
LOG(ERROR) << "seq invalid:" << last_seq_ << " current:" << current_seq;
if (current_seq > last_seq_ + 1) {
@@ -354,13 +435,13 @@ void CheckPointManager::UpdateCheckPointStatus() {
}
bool is_recovery = request->is_recovery();
- if (current_seq == last_ckpt_seq + water_mark) {
+ LOG(ERROR) << " current seq:" << current_seq << " water mark:" <<
water_mark
+ << " current stable seq:" << current_stable_seq_;
+ if (current_seq > 0 && current_seq % water_mark == 0) {
last_ckpt_seq = current_seq;
- if (!is_recovery) {
- BroadcastCheckPoint(last_ckpt_seq, last_hash_, stable_hashs,
- stable_seqs);
- }
+ BroadcastCheckPoint(last_ckpt_seq, last_hash_, stable_hashs,
stable_seqs);
}
+ ClearCommittedStatus(current_seq);
}
return;
}
@@ -411,10 +492,12 @@ CheckPointManager::PopStableSeqHash() {
uint64_t CheckPointManager::GetHighestPreparedSeq() {
std::lock_guard<std::mutex> lk(lt_mutex_);
+ LOG(ERROR) << "get high prepared seq:" << highest_prepared_seq_;
return highest_prepared_seq_;
}
void CheckPointManager::SetHighestPreparedSeq(uint64_t seq) {
+ LOG(ERROR) << "set high prepared seq:" << seq;
std::lock_guard<std::mutex> lk(lt_mutex_);
highest_prepared_seq_ = seq;
}
@@ -429,6 +512,45 @@ uint64_t CheckPointManager::GetCommittableSeq() {
return committable_seq_;
}
+void CheckPointManager::SetUnstableCkpt(uint64_t unstable_check_ckpt) {
+ LOG(ERROR) << " set unstable ckpt:" << unstable_check_ckpt;
+ {
+ std::lock_guard<std::mutex> lk(lt_mutex_);
+ unstable_check_ckpt_ = unstable_check_ckpt;
+ }
+}
+
+uint64_t CheckPointManager::GetUnstableCkpt() {
+ std::lock_guard<std::mutex> lk(lt_mutex_);
+ LOG(ERROR) << " get unstable ckpt:" << unstable_check_ckpt_;
+ return unstable_check_ckpt_;
+}
+
+void CheckPointManager::AddCommitState(uint64_t seq) {
+ LOG(ERROR) << " add commited state:" << seq;
+ std::lock_guard<std::mutex> lk(lt_mutex_);
+ committed_status_[seq] = true;
+}
+
+bool CheckPointManager::IsCommitted(uint64_t seq) {
+ std::lock_guard<std::mutex> lk(lt_mutex_);
+ if (seq < last_seq_) {
+ return true;
+ }
+ return committed_status_.find(seq) != committed_status_.end();
+}
+
+void CheckPointManager::ClearCommittedStatus(uint64_t seq) {
+ std::lock_guard<std::mutex> lk(lt_mutex_);
+ while (!committed_status_.empty()) {
+ if (committed_status_.begin()->first <= seq) {
+ committed_status_.erase(committed_status_.begin());
+ } else {
+ break;
+ }
+ }
+}
+
// void CheckPointManager::SetLastExecutedSeq(uint64_t latest_executed_seq){
// latest_executed_seq = executor_->get_latest_executed_seq();
// }
diff --git a/platform/consensus/ordering/pbft/checkpoint_manager.h
b/platform/consensus/ordering/pbft/checkpoint_manager.h
index d1c33aa4..8a916385 100644
--- a/platform/consensus/ordering/pbft/checkpoint_manager.h
+++ b/platform/consensus/ordering/pbft/checkpoint_manager.h
@@ -38,10 +38,11 @@ class CheckPointManager : public CheckPoint {
public:
CheckPointManager(const ResDBConfig& config,
ReplicaCommunicator* replica_communicator,
- SignatureVerifier* verifier);
+ SignatureVerifier* verifier, SystemInfo* sys_info);
virtual ~CheckPointManager();
void SetLastCommit(uint64_t seq);
+ uint64_t GetLastCommit();
void AddCommitData(std::unique_ptr<Request> request);
int ProcessCheckPoint(std::unique_ptr<Context> context,
@@ -76,6 +77,17 @@ class CheckPointManager : public CheckPoint {
uint64_t GetCommittableSeq();
+ void SetUnstableCkpt(uint64_t unstable_check_ckpt);
+
+ uint64_t GetUnstableCkpt();
+
+ void AddCommitState(uint64_t seq);
+
+ bool IsCommitted(uint64_t seq);
+ void ClearCommittedStatus(uint64_t seq);
+
+ void SetResetExecute(std::function<void(uint64_t seq)>);
+
private:
void UpdateCheckPointStatus();
void UpdateStableCheckPointStatus();
@@ -90,6 +102,7 @@ class CheckPointManager : public CheckPoint {
void SyncStatus();
void StatusProcess();
void CheckStatus(uint64_t last_seq);
+ void CheckSysStatus();
void CheckHealthy();
protected:
@@ -124,8 +137,13 @@ class CheckPointManager : public CheckPoint {
std::string last_hash_, committable_hash_;
sem_t committable_seq_signal_;
std::map<int, uint64_t> status_;
- std::map<int,int> last_update_time_;
- int replica_timeout_ = 120;
+ std::map<int, int> last_update_time_;
+ int replica_timeout_ = 60;
+ uint64_t unstable_check_ckpt_;
+ std::map<int, uint64_t> committed_status_;
+ std::function<void(uint64_t)> reset_execute_func_;
+ SystemInfo* sys_info_;
+ std::map<int, std::pair<int, uint64_t>> view_status_;
};
} // namespace resdb
diff --git a/platform/consensus/ordering/pbft/checkpoint_manager_test.cpp
b/platform/consensus/ordering/pbft/checkpoint_manager_test.cpp
index f302e819..0f7ebc88 100644
--- a/platform/consensus/ordering/pbft/checkpoint_manager_test.cpp
+++ b/platform/consensus/ordering/pbft/checkpoint_manager_test.cpp
@@ -48,7 +48,7 @@ class MyCheckPointManager : public CheckPointManager {
ReplicaCommunicator* replica_communicator,
SignatureVerifier* verifier,
std::function<void(int64_t)> call_back = nullptr)
- : CheckPointManager(config, replica_communicator, verifier),
+ : CheckPointManager(config, replica_communicator, verifier, &sys_info_),
call_back_(call_back) {}
void UpdateStableCheckPointCallback(int64_t stable_checkpoint) {
@@ -57,6 +57,7 @@ class MyCheckPointManager : public CheckPointManager {
}
}
std::function<void(int64_t)> call_back_;
+ SystemInfo sys_info_;
};
ResConfigData GetConfigData() {
@@ -106,7 +107,8 @@ class CheckPointManagerTest : public Test {
TEST_F(CheckPointManagerTest, SendCheckPoint) {
config_.SetViewchangeCommitTimeout(100);
- CheckPointManager manager(config_, &replica_communicator_, nullptr);
+ SystemInfo sys_info;
+ CheckPointManager manager(config_, &replica_communicator_, nullptr,
&sys_info);
for (int i = 1; i <= 5; ++i) {
std::unique_ptr<Request> request = std::make_unique<Request>();
@@ -132,7 +134,8 @@ TEST_F(CheckPointManagerTest, SendCheckPointOnce) {
propose_done.set_value(true);
}));
- CheckPointManager manager(config_, &replica_communicator_, nullptr);
+ SystemInfo sys_info;
+ CheckPointManager manager(config_, &replica_communicator_, nullptr,
&sys_info);
for (int i = 1; i <= 5; ++i) {
std::unique_ptr<Request> request = std::make_unique<Request>();
request->set_seq(i);
@@ -159,7 +162,8 @@ TEST_F(CheckPointManagerTest, SendCheckPointTwo) {
}
}));
- CheckPointManager manager(config_, &replica_communicator_, nullptr);
+ SystemInfo sys_info;
+ CheckPointManager manager(config_, &replica_communicator_, nullptr,
&sys_info);
std::unique_ptr<Request> request = std::make_unique<Request>();
for (int i = 1; i <= 5; ++i) {
std::unique_ptr<Request> request = std::make_unique<Request>();
@@ -255,7 +259,9 @@ TEST_F(CheckPointManagerTest, Votes) {
std::promise<bool> propose_done;
std::future<bool> propose_done_future = propose_done.get_future();
- CheckPointManager manager(config_, &replica_communicator_, &mock_verifier);
+
+ SystemInfo sys_info;
+ CheckPointManager manager(config_, &replica_communicator_, &mock_verifier,
&sys_info);
EXPECT_CALL(replica_communicator_, BroadCast)
.WillRepeatedly(Invoke([&](const google::protobuf::Message& message) {
for (int i = 1; i <= 3; ++i) {
diff --git a/platform/consensus/ordering/pbft/commitment.cpp
b/platform/consensus/ordering/pbft/commitment.cpp
index 914fc623..463d0161 100644
--- a/platform/consensus/ordering/pbft/commitment.cpp
+++ b/platform/consensus/ordering/pbft/commitment.cpp
@@ -97,13 +97,6 @@ int Commitment::ProcessNewRequest(std::unique_ptr<Context>
context,
return -3;
}
- /*
- if(SignatureVerifier::CalculateHash(user_request->data()) !=
- user_request->hash()){ LOG(ERROR) << "the hash and data of the user request
- don't match, reject"; return -2;
- }
- */
-
// check signatures
bool valid = verifier_->VerifyMessage(user_request->data(),
user_request->data_signature());
@@ -128,6 +121,7 @@ int Commitment::ProcessNewRequest(std::unique_ptr<Context>
context,
// Artificially make the primary stop proposing new trasactions.
if (!seq.ok()) {
+ LOG(ERROR) << " seq fail";
duplicate_manager_->EraseProposed(user_request->hash());
global_stats_->SeqFail();
Request request;
@@ -167,12 +161,37 @@ int
Commitment::ProcessProposeMsg(std::unique_ptr<Context> context,
if (message_manager_->GetNextSeq() == 0 ||
request->seq() == message_manager_->GetNextSeq()) {
message_manager_->SetNextSeq(request->seq() + 1);
- } else {
+ while (!pending_recovery_.empty()) {
+ LOG(ERROR) << " pending size:" << pending_recovery_.size()
+ << " first:" << pending_recovery_.begin()->first
+ << " next:" << message_manager_->GetNextSeq();
+ if (pending_recovery_.begin()->first <=
+ message_manager_->GetNextSeq()) {
+ if (pending_recovery_.begin()->first ==
+ message_manager_->GetNextSeq()) {
+ message_manager_->SetNextSeq(pending_recovery_.begin()->first + 1);
+ message_manager_->AddConsensusMsg(
+ pending_recovery_.begin()->second.first->signature,
+ std::move(pending_recovery_.begin()->second.second));
+ }
+ pending_recovery_.erase(pending_recovery_.begin());
+ } else {
+ break;
+ }
+ }
+
+ } else if (request->seq() > message_manager_->GetNextSeq()) {
+ uint64_t seq = request->seq();
+ pending_recovery_[seq] =
+ std::make_pair(std::move(context), std::move(request));
+ return 0;
+ } else if (!request->force_recovery()) {
LOG(ERROR) << " recovery request not valid:"
<< " current seq:" << message_manager_->GetNextSeq()
<< " data seq:" << request->seq();
return 0;
}
+ request->set_force_recovery(false);
return message_manager_->AddConsensusMsg(context->signature,
std::move(request));
}
@@ -183,14 +202,6 @@ int Commitment::ProcessProposeMsg(std::unique_ptr<Context>
context,
return -2;
}
- /*
- if(request->hash() != "null" + std::to_string(request->seq())
- && SignatureVerifier::CalculateHash(request->data()) !=
request->hash())
- { LOG(ERROR) << "the hash and data of the request don't match, reject";
- return -2;
- }
- */
-
if (request->sender_id() != config_.GetSelfInfo().id()) {
if (pre_verify_func_ && !pre_verify_func_(*request)) {
LOG(ERROR) << " check by the user func fail";
@@ -242,8 +253,15 @@ int Commitment::ProcessPrepareMsg(std::unique_ptr<Context>
context,
return -2;
}
if (request->is_recovery()) {
- return message_manager_->AddConsensusMsg(context->signature,
- std::move(request));
+ uint64_t seq = request->seq();
+ CollectorResultCode ret = message_manager_->AddConsensusMsg(
+ context->signature, std::move(request));
+ if (ret == CollectorResultCode::STATE_CHANGED) {
+ if (message_manager_->GetHighestPreparedSeq() < seq) {
+ message_manager_->SetHighestPreparedSeq(seq);
+ }
+ }
+ return ret;
}
// global_stats_->IncPrepare();
std::unique_ptr<Request> commit_request = resdb::NewRequest(
@@ -252,12 +270,12 @@ int
Commitment::ProcessPrepareMsg(std::unique_ptr<Context> context,
// Add request to message_manager.
// If it has received enough same requests(2f+1), broadcast the commit
// message.
- uint64_t seq_ = request->seq();
+ uint64_t seq = request->seq();
CollectorResultCode ret =
message_manager_->AddConsensusMsg(context->signature,
std::move(request));
if (ret == CollectorResultCode::STATE_CHANGED) {
- if (message_manager_->GetHighestPreparedSeq() < seq_) {
- message_manager_->SetHighestPreparedSeq(seq_);
+ if (message_manager_->GetHighestPreparedSeq() < seq) {
+ message_manager_->SetHighestPreparedSeq(seq);
}
// If need qc, sign the data
if (need_qc_ && verifier_) {
@@ -284,6 +302,7 @@ int Commitment::ProcessCommitMsg(std::unique_ptr<Context>
context,
<< " context:" << (context == nullptr);
return -2;
}
+ uint64_t seq = request->seq();
if (request->is_recovery()) {
return message_manager_->AddConsensusMsg(context->signature,
std::move(request));
@@ -319,7 +338,7 @@ int Commitment::PostProcessExecutedMsg() {
request.set_current_view(batch_resp->current_view());
request.set_proxy_id(batch_resp->proxy_id());
request.set_primary_id(batch_resp->primary_id());
- // LOG(ERROR)<<"send back to proxy:"<<batch_resp->proxy_id();
+ LOG(ERROR) << "send back to proxy:" << batch_resp->proxy_id();
batch_resp->SerializeToString(request.mutable_data());
replica_communicator_->SendMessage(request, request.proxy_id());
}
diff --git a/platform/consensus/ordering/pbft/commitment.h
b/platform/consensus/ordering/pbft/commitment.h
index 03d77cf3..53708fca 100644
--- a/platform/consensus/ordering/pbft/commitment.h
+++ b/platform/consensus/ordering/pbft/commitment.h
@@ -73,6 +73,9 @@ class Commitment {
bool need_qc_ = false;
std::mutex mutex_;
+ std::map<uint64_t,
+ std::pair<std::unique_ptr<Context>, std::unique_ptr<Request>>>
+ pending_recovery_;
std::unique_ptr<DuplicateManager> duplicate_manager_;
};
diff --git a/platform/consensus/ordering/pbft/commitment_test.cpp
b/platform/consensus/ordering/pbft/commitment_test.cpp
index 1b8b24a7..156362e6 100644
--- a/platform/consensus/ordering/pbft/commitment_test.cpp
+++ b/platform/consensus/ordering/pbft/commitment_test.cpp
@@ -58,7 +58,7 @@ class CommitmentTest : public Test {
global_stats_(Stats::GetGlobalStats(1)),
config_(GenerateConfig()),
system_info_(config_),
- checkpoint_manager_(config_, &replica_communicator_, &verifier_),
+ checkpoint_manager_(config_, &replica_communicator_, &verifier_,
&system_info_),
message_manager_(std::make_unique<MessageManager>(
config_, nullptr, &checkpoint_manager_, &system_info_)),
commitment_(
diff --git a/platform/consensus/ordering/pbft/consensus_manager_pbft.cpp
b/platform/consensus/ordering/pbft/consensus_manager_pbft.cpp
index 7425e3c7..d7b38807 100644
--- a/platform/consensus/ordering/pbft/consensus_manager_pbft.cpp
+++ b/platform/consensus/ordering/pbft/consensus_manager_pbft.cpp
@@ -32,7 +32,8 @@ ConsensusManagerPBFT::ConsensusManagerPBFT(
: ConsensusManager(config),
system_info_(std::make_unique<SystemInfo>(config)),
checkpoint_manager_(std::make_unique<CheckPointManager>(
- config, GetBroadCastClient(), GetSignatureVerifier())),
+ config, GetBroadCastClient(), GetSignatureVerifier(),
+ system_info_.get())),
message_manager_(std::make_unique<MessageManager>(
config, std::move(executor), checkpoint_manager_.get(),
system_info_.get())),
@@ -65,6 +66,8 @@ ConsensusManagerPBFT::ConsensusManagerPBFT(
recovery_->ReadLogs(
[&](const SystemInfoData& data) {
+ LOG(ERROR) << " read data info:" << data.view()
+ << " primary:" << data.primary_id();
system_info_->SetCurrentView(data.view());
system_info_->SetPrimary(data.primary_id());
},
@@ -72,6 +75,7 @@ ConsensusManagerPBFT::ConsensusManagerPBFT(
return InternalConsensusCommit(std::move(context), std::move(request));
},
[&](int seq) { message_manager_->SetNextCommitSeq(seq + 1); });
+ LOG(ERROR) << " recovery is done";
}
void ConsensusManagerPBFT::SetNeedCommitQC(bool need_qc) {
@@ -79,6 +83,7 @@ void ConsensusManagerPBFT::SetNeedCommitQC(bool need_qc) {
}
void ConsensusManagerPBFT::Start() {
+ LOG(ERROR) << " ======= start";
ConsensusManager::Start();
recovery_thread_ =
std::thread(&ConsensusManagerPBFT::RemoteRecoveryProcess, this);
@@ -143,8 +148,9 @@ ConsensusManagerPBFT::PopComplainedRequest() {
// The implementation of PBFT.
int ConsensusManagerPBFT::ConsensusCommit(std::unique_ptr<Context> context,
std::unique_ptr<Request> request) {
- // LOG(INFO) << "recv impl type:" << request->type() << " "
- // << "sender id:" << request->sender_id();
+ LOG(INFO) << "recv impl type:" << request->type() << " "
+ << "sender id:" << request->sender_id()
+ << " primary:" << system_info_->GetPrimaryId();
// If it is in viewchange, push the request to the queue
// for the requests from the new view which come before
// the local new view done.
@@ -190,8 +196,11 @@ int
ConsensusManagerPBFT::ConsensusCommit(std::unique_ptr<Context> context,
int ConsensusManagerPBFT::InternalConsensusCommit(
std::unique_ptr<Context> context, std::unique_ptr<Request> request) {
- // LOG(ERROR) << "recv impl type:" << request->type() << " "
- // << "sender id:" << request->sender_id()<<" seq:"<<request->seq();
+ LOG(ERROR) << "recv impl type:" << request->type() << " "
+ << "sender id:" << request->sender_id()
+ << " seq:" << request->seq()
+ << " primary:" << system_info_->GetPrimaryId()
+ << " is convery:" << request->is_recovery();
switch (request->type()) {
case Request::TYPE_CLIENT_REQUEST:
@@ -284,7 +293,11 @@ int ConsensusManagerPBFT::ProcessRecoveryData(
return -2;
}
LOG(ERROR) << " obtain min seq:" << recovery_data.min_seq()
- << " max seq:" << recovery_data.max_seq();
+ << " max seq:" << recovery_data.max_seq()
+ << " from:" << request->sender_id();
+ if (request->sender_id() == config_.GetSelfInfo().id()) {
+ return 0;
+ }
RecoveryResponse response;
int ret = recovery_->GetData(recovery_data, response);
if (ret) {
@@ -334,23 +347,27 @@ void ConsensusManagerPBFT::RemoteRecoveryProcess() {
for (int i = 0; i < recovery_data.request().size(); i++) {
uint64_t seq = recovery_data.request(i).seq();
int type = recovery_data.request(i).type();
- if (data.find(seq) != data.end()) {
- // LOG(ERROR)<<" check recovery remote seq:"<<seq<<" type:"<<type<<"
has
- // been recovered.";
+ if (checkpoint_manager_->IsCommitted(seq)) {
+ LOG(ERROR) << " check recovery remote seq:" << seq << " type:" << type
+ << " has been recovered.";
+ continue;
+ }
+
+ uint64_t last_seq = checkpoint_manager_->GetLastCommit();
+ if (seq - last_seq > 1000) {
+ LOG(ERROR) << " check seq:" << seq << " last:" << last_seq
+ << " data is missing, skip.";
continue;
}
auto context = std::make_unique<Context>();
context->signature = recovery_data.signature(i);
auto request = std::make_unique<Request>(recovery_data.request(i));
- // write to log
- // recovery_->AddRequest(context.get(), request.get());
- InternalConsensusCommit(std::move(context), std::move(request));
- }
+ view_change_manager_->SetCurrentViewAndNewPrimary(
+ request->current_view());
+ request->set_force_recovery(true);
- for (int i = 0; i < recovery_data.request().size(); i++) {
- uint64_t seq = recovery_data.request(i).seq();
- data.insert(seq);
+ InternalConsensusCommit(std::move(context), std::move(request));
}
}
}
diff --git a/platform/consensus/ordering/pbft/lock_free_collector_pool.cpp
b/platform/consensus/ordering/pbft/lock_free_collector_pool.cpp
index bf335c17..8aff2e1d 100644
--- a/platform/consensus/ordering/pbft/lock_free_collector_pool.cpp
+++ b/platform/consensus/ordering/pbft/lock_free_collector_pool.cpp
@@ -68,8 +68,8 @@ void LockFreeCollectorPool::Update(uint64_t seq) {
LOG(ERROR) << "seq not match, skip update:" << seq;
return;
}
- // LOG(ERROR)<<" update:"<<(idx^capacity_)<<" seq:"<<seq+capacity_<<
- // " cap:"<<capacity_<<" update seq:"<<seq;
+ LOG(ERROR) << " update:" << (idx ^ capacity_) << " seq:" << seq + capacity_
+ << " cap:" << capacity_ << " update seq:" << seq;
collector_[idx ^ capacity_] = std::make_unique<TransactionCollector>(
seq + capacity_, executor_, enable_viewchange_);
}
diff --git a/platform/consensus/ordering/pbft/message_manager.cpp
b/platform/consensus/ordering/pbft/message_manager.cpp
index 743f7aeb..7b418f4a 100644
--- a/platform/consensus/ordering/pbft/message_manager.cpp
+++ b/platform/consensus/ordering/pbft/message_manager.cpp
@@ -63,6 +63,8 @@ MessageManager::MessageManager(
transaction_executor_->SetSeqUpdateNotifyFunc(
[&](uint64_t seq) { collector_pool_->Update(seq - 1); });
checkpoint_manager_->SetExecutor(transaction_executor_.get());
+ checkpoint_manager_->SetResetExecute(
+ [&](uint64_t seq) { SetNextCommitSeq(seq); });
}
MessageManager::~MessageManager() {
@@ -84,6 +86,7 @@ uint64_t MessageManager ::GetCurrentView() const {
}
// Overwrites next_seq_ with `seq`. The value being replaced is logged before
// the assignment and the new value is logged after it, so a reset of the
// sequence counter can be traced from the log alone.
// NOTE(review): both traces use ERROR severity although neither reports a
// failure -- confirm this is intentional and not leftover debugging.
void MessageManager::SetNextSeq(uint64_t seq) {
+ LOG(ERROR) << "set next old seq:" << next_seq_;
next_seq_ = seq;
LOG(ERROR) << "set next seq:" << next_seq_;
}
@@ -158,7 +161,6 @@ bool MessageManager::MayConsensusChangeStatus(
return status->compare_exchange_strong(
old_status, TransactionStatue::READY_EXECUTE,
std::memory_order_acq_rel, std::memory_order_acq_rel);
- return true;
}
break;
}
@@ -174,6 +176,7 @@ bool MessageManager::MayConsensusChangeStatus(
CollectorResultCode MessageManager::AddConsensusMsg(
const SignatureInfo& signature, std::unique_ptr<Request> request) {
if (request == nullptr || !IsValidMsg(*request)) {
+ LOG(ERROR) << " msg not invalid";
return CollectorResultCode::INVALID;
}
@@ -181,6 +184,10 @@ CollectorResultCode MessageManager::AddConsensusMsg(
uint64_t seq = request->seq();
int resp_received_count = 0;
int proxy_id = request->proxy_id();
+ if (checkpoint_manager_->IsCommitted(seq)) {
+ LOG(ERROR) << " seq:" << seq << " type:" << type << " has been committed";
+ return CollectorResultCode::STATE_CHANGED;
+ }
int ret = collector_pool_->GetCollector(seq)->AddRequest(
std::move(request), signature, type == Request::TYPE_PRE_PREPARE,
@@ -194,9 +201,15 @@ CollectorResultCode MessageManager::AddConsensusMsg(
if (ret == 1) {
SetLastCommittedTime(proxy_id);
} else if (ret != 0) {
+ LOG(ERROR) << " add request fail";
return CollectorResultCode::INVALID;
}
if (resp_received_count > 0) {
+ if (type == Request::TYPE_COMMIT) {
+ if (checkpoint_manager_) {
+ checkpoint_manager_->AddCommitState(seq);
+ }
+ }
return CollectorResultCode::STATE_CHANGED;
}
return CollectorResultCode::OK;
@@ -206,10 +219,6 @@ std::vector<RequestInfo>
MessageManager::GetPreparedProof(uint64_t seq) {
return collector_pool_->GetCollector(seq)->GetPreparedProof();
}
-TransactionStatue MessageManager::GetTransactionState(uint64_t seq) {
- return collector_pool_->GetCollector(seq)->GetStatus();
-}
-
int MessageManager::GetReplicaState(ReplicaState* state) {
*state->mutable_replica_config() = config_.GetConfigData();
return 0;
@@ -220,9 +229,11 @@ Storage* MessageManager::GetStorage() {
}
// Re-bases the manager on `seq`. In order, it:
//   - stores `seq` via SetNextSeq() and SetHighestPreparedSeq(),
//   - resets the collector pool to `seq`,
//   - tells the checkpoint manager that the last commit is `seq - 1`,
//   - hands `seq` to the executor via SetPendingExecutedSeq().
// The MessageManager constructor also registers this function as the
// checkpoint manager's SetResetExecute() callback.
// NOTE(review): that callback receives a uint64_t, but the parameter here is
// `int`, so large sequence numbers would be narrowed -- confirm the range.
// NOTE(review): `seq - 1` is negative when seq == 0; verify callers never
// pass 0, or that SetLastCommit() tolerates it.
// NOTE(review): `return <expr>;` in a void function presumably relies on
// SetPendingExecutedSeq() returning void -- confirm.
void MessageManager::SetNextCommitSeq(int seq) {
+ LOG(ERROR) << " set next commit seq:" << seq;
SetNextSeq(seq);
+ SetHighestPreparedSeq(seq);
collector_pool_->Reset(seq);
- checkpoint_manager_->SetLastCommit(seq);
+ checkpoint_manager_->SetLastCommit(seq - 1);
return transaction_executor_->SetPendingExecutedSeq(seq);
}
diff --git a/platform/consensus/ordering/pbft/message_manager.h
b/platform/consensus/ordering/pbft/message_manager.h
index a835be28..0e99add9 100644
--- a/platform/consensus/ordering/pbft/message_manager.h
+++ b/platform/consensus/ordering/pbft/message_manager.h
@@ -76,7 +76,6 @@ class MessageManager {
// if the request has been prepared, having received 2f+1
// pre-prepare messages.
std::vector<RequestInfo> GetPreparedProof(uint64_t seq);
- TransactionStatue GetTransactionState(uint64_t seq);
void SetNextCommitSeq(int seq);
diff --git a/platform/consensus/ordering/pbft/performance_manager.cpp
b/platform/consensus/ordering/pbft/performance_manager.cpp
index 46a3b587..f7ea4562 100644
--- a/platform/consensus/ordering/pbft/performance_manager.cpp
+++ b/platform/consensus/ordering/pbft/performance_manager.cpp
@@ -195,7 +195,8 @@ CollectorResultCode PerformanceManager::AddResponseMsg(
std::unique_ptr<BatchUserResponse> batch_response =
std::make_unique<BatchUserResponse>();
- if (!batch_response->ParseFromString(request->data())) {
+ if (!batch_response->ParseFromString(request->data()) ||
+ request->seq() == 0) {
LOG(ERROR) << "parse response fail:" << request->data().size()
<< " seq:" << request->seq();
return CollectorResultCode::INVALID;
diff --git a/platform/consensus/ordering/pbft/query_test.cpp
b/platform/consensus/ordering/pbft/query_test.cpp
index 25f276b1..b8639c6f 100644
--- a/platform/consensus/ordering/pbft/query_test.cpp
+++ b/platform/consensus/ordering/pbft/query_test.cpp
@@ -59,7 +59,7 @@ class QueryTest : public Test {
: global_stats_(Stats::GetGlobalStats(1)),
config_(GenerateConfig()),
system_info_(config_),
- checkpoint_manager_(config_, &replica_communicator_, nullptr),
+ checkpoint_manager_(config_, &replica_communicator_, nullptr,
&system_info_),
message_manager_(config_, nullptr, &checkpoint_manager_,
&system_info_),
recovery_(config_, &checkpoint_manager_, &system_info_, nullptr),
query_(config_, &recovery_),
diff --git a/platform/consensus/ordering/pbft/response_manager.cpp
b/platform/consensus/ordering/pbft/response_manager.cpp
index f490c003..1143f232 100644
--- a/platform/consensus/ordering/pbft/response_manager.cpp
+++ b/platform/consensus/ordering/pbft/response_manager.cpp
@@ -184,8 +184,17 @@ CollectorResultCode ResponseManager::AddResponseMsg(
return CollectorResultCode::INVALID;
}
+ LOG(ERROR) << " receive request seq:" << request->seq()
+ << " type:" << request->type()
+ << " local id:" << batch_response->local_id();
uint64_t seq = batch_response->local_id();
request->set_seq(seq);
+ if (seq == 0) {
+ LOG(ERROR) << " local id is invalid:" << seq
+ << " request seq:" << request->seq()
+ << " type:" << request->type();
+ return CollectorResultCode::INVALID;
+ }
int type = request->type();
int resp_received_count = 0;
@@ -202,6 +211,7 @@ CollectorResultCode ResponseManager::AddResponseMsg(
if (ret != 0) {
return CollectorResultCode::INVALID;
}
+ LOG(ERROR) << " get receive count:" << resp_received_count << " seq:" << seq;
if (resp_received_count > 0) {
collector_pool_->Update(seq);
RemoveWaitingResponseRequest(hash);
@@ -336,8 +346,8 @@ int ResponseManager::DoBatch(
new_request->set_proxy_id(config_.GetSelfInfo().id());
replica_communicator_->SendMessage(*new_request, GetPrimary());
send_num_++;
- // LOG(INFO) << "send msg to primary:" << GetPrimary()
- // << " batch size:" << batch_req.size();
+ LOG(INFO) << "send msg to primary:" << GetPrimary()
+ << " batch size:" << batch_req.size();
AddWaitingResponseRequest(std::move(new_request));
return 0;
}
diff --git a/platform/consensus/ordering/pbft/transaction_collector.cpp
b/platform/consensus/ordering/pbft/transaction_collector.cpp
index accb0536..d5c52e6d 100644
--- a/platform/consensus/ordering/pbft/transaction_collector.cpp
+++ b/platform/consensus/ordering/pbft/transaction_collector.cpp
@@ -205,7 +205,7 @@ int TransactionCollector::Commit() {
auto main_request = atomic_mian_request_.Reference();
if (main_request == nullptr) {
- LOG(ERROR) << "no main";
+ LOG(ERROR) << "no main:" << seq_;
return -2;
}
diff --git a/platform/consensus/ordering/pbft/viewchange_manager.cpp
b/platform/consensus/ordering/pbft/viewchange_manager.cpp
index 8ff17cbe..2dd24117 100644
--- a/platform/consensus/ordering/pbft/viewchange_manager.cpp
+++ b/platform/consensus/ordering/pbft/viewchange_manager.cpp
@@ -87,8 +87,8 @@ ViewChangeManager::ViewChangeManager(const ResDBConfig&
config,
if (config_.GetConfigData().enable_viewchange()) {
collector_pool_ = message_manager->GetCollectorPool();
sem_init(&viewchange_timer_signal_, 0, 0);
- server_checking_timeout_thread_ =
- std::thread(&ViewChangeManager::MonitoringViewChangeTimeOut, this);
+ // server_checking_timeout_thread_ =
+ // std::thread(&ViewChangeManager::MonitoringViewChangeTimeOut, this);
checkpoint_state_thread_ =
std::thread(&ViewChangeManager::MonitoringCheckpointState, this);
}
@@ -120,7 +120,7 @@ void ViewChangeManager::MayStart() {
}
checkpoint_manager_->SetTimeoutHandler([&](int replica_id) {
- if( system_info_->GetPrimaryId() != replica_id ) {
+ if (system_info_->GetPrimaryId() != replica_id) {
return;
}
@@ -175,37 +175,6 @@ bool ViewChangeManager::IsValidViewChangeMsg(
return false;
}
- if (!checkpoint_manager_->IsValidCheckpointProof(
- view_change_message.stable_ckpt())) {
- LOG(ERROR) << "stable checkpoint invalid";
- return false;
- }
-
- uint64_t stable_seq = view_change_message.stable_ckpt().seq();
-
- for (const auto& prepared_msg : view_change_message.prepared_msg()) {
- if (prepared_msg.seq() <= stable_seq) {
- continue;
- }
- // If there is less than 2f+1 proof, reject.
- if (prepared_msg.proof_size() < config_.GetMinDataReceiveNum()) {
- LOG(ERROR) << "proof[" << prepared_msg.proof_size()
- << "] not enough:" << config_.GetMinDataReceiveNum();
- return false;
- }
- for (const auto& proof : prepared_msg.proof()) {
- if (proof.request().seq() != prepared_msg.seq()) {
- LOG(ERROR) << "proof seq not match";
- return false;
- }
- std::string data;
- proof.request().SerializeToString(&data);
- if (!verifier_->VerifyMessage(data, proof.signature())) {
- LOG(ERROR) << "proof signature not valid";
- return false;
- }
- }
- }
return true;
}
@@ -232,7 +201,8 @@ void
ViewChangeManager::SetCurrentViewAndNewPrimary(uint64_t view_number) {
config_.GetReplicaInfos()[(view_number - 1) % replicas.size()].id();
system_info_->SetPrimary(id);
global_stats_->ChangePrimary(id);
- LOG(ERROR) << "View Change Happened";
+ LOG(ERROR) << "View Change Happened: primary:" << id
+ << " view:" << view_number;
}
std::vector<std::unique_ptr<Request>> ViewChangeManager::GetPrepareMsg(
@@ -241,51 +211,53 @@ std::vector<std::unique_ptr<Request>>
ViewChangeManager::GetPrepareMsg(
for (const auto& msg : new_view_message.viewchange_messages()) {
for (const auto& msg : msg.prepared_msg()) {
uint64_t seq = msg.seq();
- prepared_msg[seq] = msg.proof(0).request();
+ if (prepared_msg.find(seq) == prepared_msg.end()) {
+ prepared_msg[seq] = msg.proof(0).request();
+ } else if (prepared_msg[seq].type() < msg.proof(0).request().type()) {
+ prepared_msg[seq] = msg.proof(0).request();
+ }
}
}
uint64_t min_s = std::numeric_limits<uint64_t>::max();
+ uint64_t max_seq = 1;
for (const auto& msg : new_view_message.viewchange_messages()) {
min_s = std::min(min_s, msg.stable_ckpt().seq());
+ max_seq = std::max(max_seq, msg.stable_ckpt().seq());
}
- uint64_t max_seq = 1;
if (prepared_msg.size() > 0) {
max_seq = (--prepared_msg.end())->first;
}
- LOG(INFO) << "[GP] min_s: " << min_s << " max_seq: " << max_seq;
+ LOG(ERROR) << "[GP] min_s: " << min_s << " max_seq: " << max_seq;
std::vector<std::unique_ptr<Request>> redo_request;
// Resent all the request with the current view number.
for (auto i = min_s + 1; i <= max_seq; ++i) {
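+ // Every seq in (min_s, max_seq] is re-proposed below as a new pre-prepare
+ // request with empty data, signed by the new primary. The check here only
+ // logs whether that seq was a sequence hole or had already been prepared.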
if (prepared_msg.find(i) == prepared_msg.end()) {
- // for sequence hole, create a new request with empty data and
- // sign by the new primary.
- std::unique_ptr<Request> user_request = resdb::NewRequest(
- Request::TYPE_PRE_PREPARE, Request(), config_.GetSelfInfo().id());
- user_request->set_seq(i);
- user_request->set_current_view(new_view_message.view_number());
- user_request->set_hash("null" + std::to_string(i));
-
- if (verifier_ && need_sign) {
- std::string data;
- auto signature_or = verifier_->SignMessage(data);
- if (!signature_or.ok()) {
- LOG(ERROR) << "Sign message fail";
- continue;
- }
- *user_request->mutable_data_signature() = *signature_or;
- }
- redo_request.push_back(std::move(user_request));
+ LOG(ERROR) << " seq:" << i << " not prepared, in new view";
} else {
- std::unique_ptr<Request> commit_request = resdb::NewRequest(
- Request::TYPE_COMMIT, prepared_msg[i], config_.GetSelfInfo().id());
- commit_request->set_seq(i);
- commit_request->set_current_view(new_view_message.view_number());
- redo_request.push_back(std::move(commit_request));
+ LOG(ERROR) << " seq:" << i
+ << " prepared, type:" << prepared_msg[i].type();
+ }
+ std::unique_ptr<Request> user_request = resdb::NewRequest(
+ Request::TYPE_PRE_PREPARE, Request(), config_.GetSelfInfo().id());
+ user_request->set_seq(i);
+ user_request->set_current_view(new_view_message.view_number());
+ user_request->set_hash("null" + std::to_string(i));
+ if (verifier_ && need_sign) {
+ std::string data;
+ auto signature_or = verifier_->SignMessage(data);
+ if (!signature_or.ok()) {
+ LOG(ERROR) << "Sign message fail";
+ continue;
+ }
+ *user_request->mutable_data_signature() = *signature_or;
}
+ redo_request.push_back(std::move(user_request));
}
return redo_request;
@@ -298,6 +270,11 @@ int
ViewChangeManager::ProcessNewView(std::unique_ptr<Context> context,
LOG(ERROR) << "Parsing new_view_msg failed.";
return -2;
}
+ if (request->is_recovery()) {
+ LOG(ERROR) << " set new view";
+ SetCurrentViewAndNewPrimary(new_view_message.view_number());
+ return 0;
+ }
LOG(INFO) << "Received NEW-VIEW for view " << new_view_message.view_number();
// Check if new view is from next expected primary
if (new_view_message.view_number() !=
@@ -352,19 +329,12 @@ int
ViewChangeManager::ProcessNewView(std::unique_ptr<Context> context,
SetCurrentViewAndNewPrimary(new_view_message.view_number());
message_manager_->SetNextSeq(max_seq + 1);
- LOG(INFO) << "SetNexSeq: " << max_seq + 1;
+ LOG(ERROR) << "SetNexSeq: " << max_seq + 1;
// All is fine.
for (size_t i = 0; i < request_list.size(); ++i) {
if (new_view_message.request(i).type() ==
static_cast<int>(Request::TYPE_PRE_PREPARE)) {
- new_view_message.request(i);
- auto non_proposed_hashes =
- collector_pool_->GetCollector(new_view_message.request(i).seq())
- ->GetAllStoredHash();
- for (auto& hash : non_proposed_hashes) {
- duplicate_manager_->EraseProposed(hash);
- }
replica_communicator_->SendMessage(new_view_message.request(i),
config_.GetSelfInfo());
} else {
@@ -460,6 +430,7 @@ void ViewChangeManager::SendNewViewMsg(uint64_t
view_number) {
// Broadcast my new view request.
std::unique_ptr<Request> request =
NewRequest(Request::TYPE_NEWVIEW, Request(), config_.GetSelfInfo().id());
+
new_view_message.SerializeToString(request->mutable_data());
replica_communicator_->BroadCast(*request);
}
@@ -484,14 +455,16 @@ void ViewChangeManager::SendViewChangeMsg() {
// P - P is a set containing a set Pm for each request m that prepared at i
// with a sequence number higher than n.
- int max_seq = checkpoint_manager_->GetHighestPreparedSeq();
- LOG(INFO) << "Check prepared or committed txns from "
- << view_change_message.stable_ckpt().seq() + 1 << " to " <<
max_seq;
-
- for (int i = view_change_message.stable_ckpt().seq() + 1; i <= max_seq; ++i)
{
+ uint64_t max_seq = checkpoint_manager_->GetHighestPreparedSeq();
+ uint64_t min_seq = view_change_message.stable_ckpt().seq();
+ min_seq = std::max(min_seq, checkpoint_manager_->GetUnstableCkpt());
+ view_change_message.mutable_stable_ckpt()->set_seq(min_seq);
+ LOG(ERROR) << "Check prepared or committed txns from " << min_seq + 1
+ << " to " << max_seq;
+
+ for (int i = min_seq + 1; i <= max_seq; ++i) {
// seq i has been prepared or committed.
- if (message_manager_->GetTransactionState(i) >=
- TransactionStatue::READY_COMMIT) {
+ if (checkpoint_manager_->IsCommitted(i)) {
std::vector<RequestInfo> proof_info =
message_manager_->GetPreparedProof(i);
assert(proof_info.size() >= config_.GetMinDataReceiveNum());
diff --git a/platform/consensus/ordering/pbft/viewchange_manager.h
b/platform/consensus/ordering/pbft/viewchange_manager.h
index 4cc2b7cd..880ab486 100644
--- a/platform/consensus/ordering/pbft/viewchange_manager.h
+++ b/platform/consensus/ordering/pbft/viewchange_manager.h
@@ -107,6 +107,7 @@ class ViewChangeManager {
void AddNewViewTimer();
void CheckComplaintTimeout();
void SetDuplicateManager(DuplicateManager* manager);
+ void SetCurrentViewAndNewPrimary(uint64_t view_number);
private:
void SendViewChangeMsg();
@@ -115,7 +116,6 @@ class ViewChangeManager {
uint32_t AddRequest(const ViewChangeMessage& viewchange_message,
uint32_t sender);
bool IsNextPrimary(uint64_t view_number);
- void SetCurrentViewAndNewPrimary(uint64_t view_number);
std::vector<std::unique_ptr<Request>> GetPrepareMsg(
const NewViewMessage& new_view_message, bool need_sign = true);
diff --git a/platform/consensus/ordering/poc/pow/block_manager.cpp
b/platform/consensus/ordering/poc/pow/block_manager.cpp
index d6c5205e..ae8c31bc 100644
--- a/platform/consensus/ordering/poc/pow/block_manager.cpp
+++ b/platform/consensus/ordering/poc/pow/block_manager.cpp
@@ -17,7 +17,6 @@
* under the License.
*/
-
#include "platform/consensus/ordering/poc/pow/block_manager.h"
#include <glog/logging.h>
diff --git a/platform/consensus/ordering/poc/pow/block_manager.h
b/platform/consensus/ordering/poc/pow/block_manager.h
index 765ed6d4..b47f7f53 100644
--- a/platform/consensus/ordering/poc/pow/block_manager.h
+++ b/platform/consensus/ordering/poc/pow/block_manager.h
@@ -17,7 +17,6 @@
* under the License.
*/
-
#pragma once
#include "absl/status/status.h"
diff --git a/platform/consensus/ordering/poc/pow/consensus_service_pow.cpp
b/platform/consensus/ordering/poc/pow/consensus_service_pow.cpp
index 36ea9a93..3a863973 100644
--- a/platform/consensus/ordering/poc/pow/consensus_service_pow.cpp
+++ b/platform/consensus/ordering/poc/pow/consensus_service_pow.cpp
@@ -17,7 +17,6 @@
* under the License.
*/
-
#include "platform/consensus/ordering/poc/pow/consensus_service_pow.h"
#include "common/utils/utils.h"
diff --git a/platform/consensus/ordering/poc/pow/consensus_service_pow_test.cpp
b/platform/consensus/ordering/poc/pow/consensus_service_pow_test.cpp
index cda65e18..97081c89 100644
--- a/platform/consensus/ordering/poc/pow/consensus_service_pow_test.cpp
+++ b/platform/consensus/ordering/poc/pow/consensus_service_pow_test.cpp
@@ -17,7 +17,6 @@
* under the License.
*/
-
#include "ordering/poc/pow/consensus_service_pow.h"
#include <glog/logging.h>
diff --git a/platform/consensus/ordering/poc/pow/merkle.cpp
b/platform/consensus/ordering/poc/pow/merkle.cpp
index d4bdd018..ef0309cc 100644
--- a/platform/consensus/ordering/poc/pow/merkle.cpp
+++ b/platform/consensus/ordering/poc/pow/merkle.cpp
@@ -17,7 +17,6 @@
* under the License.
*/
-
#include "platform/consensus/ordering/poc/pow/merkle.h"
#include "platform/consensus/ordering/poc/pow/miner_utils.h"
diff --git a/platform/consensus/ordering/poc/pow/merkle.h
b/platform/consensus/ordering/poc/pow/merkle.h
index 05225767..ae576103 100644
--- a/platform/consensus/ordering/poc/pow/merkle.h
+++ b/platform/consensus/ordering/poc/pow/merkle.h
@@ -17,7 +17,6 @@
* under the License.
*/
-
#pragma once
#include "platform/consensus/ordering/poc/proto/pow.pb.h"
diff --git a/platform/consensus/ordering/poc/pow/miner_manager.h
b/platform/consensus/ordering/poc/pow/miner_manager.h
index bc13ddeb..f976cbd9 100644
--- a/platform/consensus/ordering/poc/pow/miner_manager.h
+++ b/platform/consensus/ordering/poc/pow/miner_manager.h
@@ -17,7 +17,6 @@
* under the License.
*/
-
#pragma once
#include "platform/config/resdb_poc_config.h"
diff --git a/platform/consensus/recovery/recovery.cpp
b/platform/consensus/recovery/recovery.cpp
index 8c265b25..bf24ebc7 100644
--- a/platform/consensus/recovery/recovery.cpp
+++ b/platform/consensus/recovery/recovery.cpp
@@ -114,8 +114,8 @@ void Recovery::UpdateStableCheckPoint() {
}
while (!stop_) {
int64_t latest_ckpt = checkpoint_->GetStableCheckpoint();
- LOG(ERROR) << "get stable ckpt:" << latest_ckpt;
- if (last_ckpt_ == latest_ckpt) {
+ LOG(ERROR) << "get stable ckpt:" << latest_ckpt << " last:" << last_ckpt_;
+ if (last_ckpt_ >= latest_ckpt) {
sleep(recovery_ckpt_time_s_);
continue;
}
@@ -127,7 +127,7 @@ void Recovery::UpdateStableCheckPoint() {
void Recovery::GetLastFile() {
std::string dir = std::filesystem::path(file_path_).parent_path();
last_ckpt_ = -1;
- int m_time_s = 0;
+ uint64_t m_time_s = 0;
for (const auto& entry : std::filesystem::directory_iterator(dir)) {
std::string dir = std::filesystem::path(entry.path()).parent_path();
std::string file_name = std::filesystem::path(entry.path()).stem();
@@ -147,11 +147,12 @@ void Recovery::GetLastFile() {
LOG(ERROR) << "get path:" << entry.path() << " min:" << min_seq
<< " time:" << time_s;
if (min_seq == -1) {
- if (last_ckpt_ == -1 || m_time_s < time_s) {
+ if (m_time_s < time_s) {
file_path_ = entry.path();
last_ckpt_ = ckpt;
+ LOG(ERROR) << "get last path:" << file_name << " min:" << min_seq
+ << " time_s:" << time_s << " min:" << m_time_s;
m_time_s = time_s;
- LOG(ERROR) << "get last path:" << file_name << " min:" << min_seq;
}
}
}
@@ -234,14 +235,15 @@ void Recovery::OpenFile(const std::string& path) {
}
lseek(fd_, 0, SEEK_END);
- LOG(INFO) << "open file:" << path << " pos:" << lseek(fd_, 0, SEEK_CUR)
- << " fd:" << fd_;
+ LOG(ERROR) << "open file:" << path << " pos:" << lseek(fd_, 0, SEEK_CUR)
+ << " fd:" << fd_;
assert(fd_ >= 0);
}
void Recovery::WriteSystemInfo() {
int view = system_info_->GetCurrentView();
int primary_id = system_info_->GetPrimaryId();
+ LOG(ERROR) << "write system info:" << primary_id << " view:" << view;
SystemInfoData data;
data.set_view(view);
data.set_primary_id(primary_id);
@@ -261,7 +263,6 @@ void Recovery::AddRequest(const Context* context, const
Request* request) {
case Request::TYPE_PRE_PREPARE:
case Request::TYPE_PREPARE:
case Request::TYPE_COMMIT:
- case Request::TYPE_CHECKPOINT:
case Request::TYPE_NEWVIEW:
return WriteLog(context, request);
default:
@@ -405,7 +406,6 @@ Recovery::GetRecoveryFiles(int64_t ckpt) {
}
}
LOG(ERROR) << "file max ckpt:" << last_ckpt << " storage ckpt:" << ckpt;
- last_ckpt = std::min(last_ckpt, ckpt);
std::vector<std::pair<int64_t, std::string>> list;
std::vector<std::pair<int64_t, std::string>> e_list;
@@ -499,10 +499,8 @@ void Recovery::ReadLogsFromFiles(
LOG(ERROR) << "parse info fail:" << data.size();
return;
}
- LOG(INFO) << "read system info:" << info.DebugString();
- if (file_idx == 0) {
- system_callback(info);
- }
+ LOG(ERROR) << "read system info:" << info.DebugString();
+ system_callback(info);
}
std::vector<std::unique_ptr<RecoveryData>> request_list;
@@ -532,13 +530,12 @@ void Recovery::ReadLogsFromFiles(
uint64_t max_seq = 0;
for (std::unique_ptr<RecoveryData>& recovery_data : request_list) {
// LOG(ERROR)<<" ckpt :"<<ckpt<<" recovery data
- // seq:"<<recovery_data->request->seq();
- if (ckpt < recovery_data->request->seq()) {
+ // seq:"<<recovery_data->request->seq()<<"
+ // type:"<<recovery_data->request->type();
+ if (ckpt < recovery_data->request->seq() ||
+ recovery_data->request->type() == Request::TYPE_NEWVIEW) {
recovery_data->request->set_is_recovery(true);
max_seq = recovery_data->request->seq();
- // LOG(ERROR)<<" ??? call back:"<<recovery_data->request->seq()<<"
- // call_back:"<<(call_back?"1":"0"); InsertCache(*recovery_data->context,
- // *recovery_data->request);
call_back(std::move(recovery_data->context),
std::move(recovery_data->request));
}
@@ -616,8 +613,8 @@ Recovery::GetDataFromRecoveryFiles(uint64_t need_min_seq,
path.second, need_min_seq - 1, 0, [&](const SystemInfoData& data) {},
[&](std::unique_ptr<Context> context,
std::unique_ptr<Request> request) {
- LOG(ERROR) << "check get data from recovery file seq:"
- << request->seq();
+ // LOG(ERROR) << "check get data from recovery file seq:"
+ // << request->seq();
if (request->seq() >= need_min_seq &&
request->seq() <= need_max_seq) {
LOG(ERROR) << "get data from recovery file seq:" << request->seq();
diff --git a/platform/networkstrate/consensus_manager.cpp
b/platform/networkstrate/consensus_manager.cpp
index b3fb1062..006bf96e 100644
--- a/platform/networkstrate/consensus_manager.cpp
+++ b/platform/networkstrate/consensus_manager.cpp
@@ -141,7 +141,9 @@ void ConsensusManager::SendHeartBeat() {
LOG(ERROR) << " server:" << config_.GetSelfInfo().id() << " sends HB"
<< " is ready:" << is_ready_
<< " client size:" << client_replicas.size()
- << " svr size:" << replicas.size();
+ << " svr size:" << replicas.size()
+ << " primary:" << hb_info.primary()
+ << " version:" << hb_info.version();
Request request;
request.set_type(Request::TYPE_HEART_BEAT);
diff --git a/platform/proto/checkpoint_info.proto
b/platform/proto/checkpoint_info.proto
index 6d4de8ae..d842d7a6 100644
--- a/platform/proto/checkpoint_info.proto
+++ b/platform/proto/checkpoint_info.proto
@@ -35,6 +35,8 @@ message CheckPointData {
SignatureInfo hash_signature = 3;
repeated bytes hashs = 4;
repeated uint64 seqs = 5;
+ uint64 primary_id = 6;
+ uint64 view = 7;
}
message StableCheckPoint {
diff --git a/platform/proto/resdb.proto b/platform/proto/resdb.proto
index 958b3295..12347b09 100644
--- a/platform/proto/resdb.proto
+++ b/platform/proto/resdb.proto
@@ -90,6 +90,8 @@ message Request {
int64 create_time = 24;
int64 commit_time = 25;
bytes data_hash = 26;
+ bool force_recovery =27;
+
}
// The response message containing response
diff --git a/scripts/deploy/config/pbft.config
b/scripts/deploy/config/pbft.config
index e4fe4960..1de013ab 100644
--- a/scripts/deploy/config/pbft.config
+++ b/scripts/deploy/config/pbft.config
@@ -1,21 +1,21 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
{
"clientBatchNum": 100,
diff --git a/scripts/format.sh b/scripts/format.sh
index fe25b4c7..41286ecc 100644
--- a/scripts/format.sh
+++ b/scripts/format.sh
@@ -17,7 +17,7 @@
# under the License.
#
-cd ..
+#cd ..
bazel build @com_github_bazelbuild_buildtools//buildifier:buildifier
find . ! -path "./deps/*" -type f -regex ".*\.cpp\|.*\.h" | xargs
clang-format -i
bazel-bin/external/com_github_bazelbuild_buildtools/buildifier/buildifier_/buildifier
-r .
diff --git a/service/tools/kv/api_tools/kv_service_tools.cpp
b/service/tools/kv/api_tools/kv_service_tools.cpp
index b234d550..71953ff7 100644
--- a/service/tools/kv/api_tools/kv_service_tools.cpp
+++ b/service/tools/kv/api_tools/kv_service_tools.cpp
@@ -101,7 +101,7 @@ void OldAPI(char** argv) {
printf("client get value fail\n");
}
} else if (cmd == "getvalues") {
- printf("client getvalues value fail\n");
+ printf("client getvalues value fail\n");
} else if (cmd == "getrange") {
auto res = client.GetRange(key, key2);
if (res != nullptr) {
diff --git a/tools/generate_region_config.py b/tools/generate_region_config.py
index 417499b8..d27703b8 100644
--- a/tools/generate_region_config.py
+++ b/tools/generate_region_config.py
@@ -18,6 +18,7 @@
import os
import json
import sys
+import re
from platform.proto.replica_info_pb2 import ResConfigData,ReplicaInfo
from google.protobuf.json_format import MessageToJson
from google.protobuf.json_format import Parse, ParseDict
@@ -64,13 +65,22 @@ def GenerateJsonConfig(file_name, output_file,
template_file):
old_json=json.loads(s)
template_json = {}
- with open(template_file) as f:
- lines=f.readlines()
- for l in lines:
- l=l.strip()
- s=''.join(lines)
- template_json=json.loads(s)
-
+ with open(template_file, 'r', encoding='utf-8') as f:
+ content = f.read()
+
+ content = re.sub(r'/\*(.*?)\*/', '', content, flags=re.DOTALL)
+
+ lines = content.splitlines()
+ clean_lines = []
+ for line in lines:
+ line = re.sub(r'//.*', '', line)
+ if line.strip():
+ clean_lines.append(line)
+
+ clean_content = '\n'.join(clean_lines)
+
+ template_json = json.loads(clean_content)
+
for (k,v) in template_json.items():
old_json[k] = v