This is an automated email from the ASF dual-hosted git repository. junchao pushed a commit to branch cassandra in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git
commit ad8daedc80e0b9e318f630ec5cb85949b377d345 Author: cjcchen <[email protected]> AuthorDate: Mon Dec 1 23:14:01 2025 +0000 add test --- .../ordering/cassandra/algorithm/cassandra.cpp | 40 ++++--- .../cassandra/algorithm/proposal_graph.cpp | 119 +++++++-------------- .../cassandra/algorithm/proposal_manager.cpp | 2 + .../ordering/cassandra/algorithm/proposal_state.h | 6 ++ .../ordering/cassandra_cft/algorithm/cassandra.cpp | 54 ++++++---- .../cassandra_cft/algorithm/proposal_graph.cpp | 41 ++++--- .../ordering/cassandra_cft/framework/consensus.cpp | 2 +- .../ordering/multipaxos/algorithm/multipaxos.cpp | 22 ++-- .../ordering/multipaxos/framework/consensus.cpp | 4 +- .../multipaxos/framework/performance_manager.cpp | 49 +++++++++ .../multipaxos/framework/performance_manager.h | 46 ++++++++ .../consensus/ordering/tusk/algorithm/tusk.cpp | 24 ++++- platform/statistic/stats.cpp | 2 + scripts/deploy/config/cassandra.config | 6 +- .../deploy/config/kv_performance_server_11.conf | 27 +++++ .../deploy/config/kv_performance_server_21.conf | 69 ++++++++++++ scripts/deploy/config/multipaxos.config | 2 +- scripts/deploy/config/tusk.config | 6 +- 18 files changed, 372 insertions(+), 149 deletions(-) diff --git a/platform/consensus/ordering/cassandra/algorithm/cassandra.cpp b/platform/consensus/ordering/cassandra/algorithm/cassandra.cpp index 832e00c0..ddad256f 100644 --- a/platform/consensus/ordering/cassandra/algorithm/cassandra.cpp +++ b/platform/consensus/ordering/cassandra/algorithm/cassandra.cpp @@ -5,6 +5,7 @@ #include "common/crypto/signature_verifier.h" #include "common/utils/utils.h" + namespace resdb { namespace cassandra { namespace cassandra_recv { @@ -17,7 +18,7 @@ Cassandra::Cassandra(int id, int f, int total_num, int block_size, SignatureVeri total_num_ = total_num; f_ = f; is_stop_ = false; - timeout_ms_ = 1000; + timeout_ms_ = 100; //timeout_ms_ = 60000; local_txn_id_ = 1; local_proposal_id_ = 1; @@ -168,7 +169,7 @@ void Cassandra::AsyncCommit() { } } 
LOG(ERROR)<<" commit done:"<<txn_num<<" proposer:"<<p->header().proposer_id()<<" height:"<<p->header().height(); - global_stats_->AddCommitTxn(txn_num); + //global_stats_->AddCommitTxn(txn_num); } } @@ -469,9 +470,8 @@ bool Cassandra::ReceiveProposal(std::unique_ptr<Proposal> proposal) { << " pre id:" << pre_p->header().proposal_id(); } - /* - - if(proposal->header().height() >=250 && proposal->header().height() <300){ +#ifdef NOPOA + if(proposal->header().height() >=850 && proposal->header().height() <1350){ if(Checklimit(0, f_, proposal->header().proposer_id()) || Checklimit(f_+1, 2*f_,proposal->header().proposer_id()) || Checklimit(2*f_+1, 3*f_,proposal->header().proposer_id()) @@ -479,24 +479,34 @@ bool Cassandra::ReceiveProposal(std::unique_ptr<Proposal> proposal) { return true; } } - */ + #endif - if(proposal->header().height() >=150 && proposal->header().height() <200){ + #ifdef GOTPOA + if(proposal->header().height() >=850 && proposal->header().height() <1350){ if(!((proposal->header().proposer_id() <= 2*f_ && id_ <=2*f_) || (proposal->header().proposer_id() > 2*f_ && id_ >2*f_))) { return true; } } - + #endif + - /* - if(proposal->header().height() >=250 && proposal->header().height() <350){ + #ifdef GOTPOR + if(proposal->header().height() >=150 && proposal->header().height() <250){ if(!((proposal->header().proposer_id() <= 2*f_+1 && id_ <=2*f_+1) || (proposal->header().proposer_id() > 2*f_+1 && id_ >2*f_+1))) { return true; } } - */ + #endif + +/* + if(!((proposal->header().proposer_id() <= 2*f_+1 && id_ <=2*f_+1) + || (proposal->header().proposer_id() > 2*f_+1 && id_ >2*f_+1))) { + return true; + } + */ + /* if(proposal->header().height() >=200 && proposal->header().height() <205){ @@ -551,12 +561,16 @@ bool Cassandra::AddProposal(const Proposal& proposal) { AskProposal(proposal); //assert(1==0); } + #ifdef GOPOA else { + #endif return false; + #ifdef GOPOA } + #endif } - LOG(ERROR)<<" proposal blocks:"<<proposal.block_size(); + //LOG(ERROR)<<" proposal 
blocks:"<<proposal.block_size(); for (const Block& block : proposal.block()) { bool ret = false;; for(int i = 0; i< 5; ++i){ @@ -612,7 +626,7 @@ bool Cassandra::AddProposal(const Proposal& proposal) { //LOG(ERROR) << "can vote:"; } } - LOG(ERROR)<<"recv done"; + //LOG(ERROR)<<"recv done"; std::vector<std::unique_ptr<Proposal>> future_g = graph_->GetNotFound( proposal.header().height() + 1, proposal.header().hash()); diff --git a/platform/consensus/ordering/cassandra/algorithm/proposal_graph.cpp b/platform/consensus/ordering/cassandra/algorithm/proposal_graph.cpp index df8ccb42..e5b91135 100644 --- a/platform/consensus/ordering/cassandra/algorithm/proposal_graph.cpp +++ b/platform/consensus/ordering/cassandra/algorithm/proposal_graph.cpp @@ -72,23 +72,6 @@ int ProposalGraph::AddProposal(const Proposal& proposal) { << " current height:" << current_height_<<" from:"<<proposal.header().proposer_id()<<" proposal id:"<<proposal.header().proposal_id(); assert(current_height_ >= latest_commit_.header().height()); - { - int cur_h = proposal.header().height(); - for(int i = 0; i <3 && proposal.history_size()>=3; ++i){ - const auto& sub_history = proposal.history(i); - std::string sub_hash = sub_history.hash(); - auto sub_node_it = node_info_.find(sub_hash); - if(sub_node_it == node_info_.end()){ - continue; - } - LOG(ERROR)<<" history node :"<<sub_node_it->second->proposal.header().proposer_id() - <<" height:"<<sub_node_it->second->proposal.header().height() - <<" hash:"<<sub_hash; - assert(cur_h == sub_node_it->second->proposal.header().height()+1); - cur_h--; - } - } - /* if (proposal.header().height() < current_height_) { @@ -125,6 +108,7 @@ int ProposalGraph::AddProposal(const Proposal& proposal) { LOG(ERROR) << "verify parent fail:" << proposal.header().proposer_id() << " id:" << proposal.header().proposal_id(); + #ifdef GOPOA g_[proposal.header().prehash()].push_back(proposal.header().hash()); auto np = std::make_unique<NodeInfo>(proposal); 
//new_proposals_[proposal.header().hash()] = &np->proposal; @@ -133,12 +117,13 @@ int ProposalGraph::AddProposal(const Proposal& proposal) { // hash:"<<Encode(proposal.header().hash()); node_info_[proposal.header().hash()] = std::move(np); last_node_[proposal.header().height()].insert(proposal.header().hash()); + #endif // assert(1==0); return 2; } - LOG(ERROR)<<"history size:"<<proposal.history_size(); + //LOG(ERROR)<<"history size:"<<proposal.history_size(); /* for (const auto& history : proposal.history()) { std::string hash = history.hash(); @@ -179,8 +164,8 @@ int ProposalGraph::AddProposal(const Proposal& proposal) { if(sub_node_it == node_info_.end()){ continue; } - LOG(ERROR)<<" state:"<<sub_node_it->second->state<<" node:"<<sub_node_it->second->proposal.header().proposer_id() - <<" height:"<<sub_node_it->second->proposal.header().height(); + //LOG(ERROR)<<" state:"<<sub_node_it->second->state<<" node:"<<sub_node_it->second->proposal.header().proposer_id() + //<<" height:"<<sub_node_it->second->proposal.header().height(); assert(cur_h == sub_node_it->second->proposal.header().height()+1); cur_h--; // LOG(ERROR)<<"proposal:("<<node_it->second->proposal.header().proposer_id() @@ -190,45 +175,20 @@ int ProposalGraph::AddProposal(const Proposal& proposal) { num++; } - LOG(ERROR)<<"get num:"<<num; + //LOG(ERROR)<<"get num:"<<num; if(num == 3) { const auto& sub_history = proposal.history(2); std::string sub_hash = sub_history.hash(); auto sub_node_it = node_info_.find(sub_hash); - LOG(ERROR)<<" state:"<<node_it->second->state; - if (sub_node_it->second->state != ProposalState::Committed) { - Commit(sub_hash); - } - } - } - -/* - for (const auto& history : proposal.history()) { - std::string hash = history.hash(); - auto node_it = node_info_.find(hash); - for (auto s : GetStates()) { - if (s >= node_it->second->state && s <= history.state()) { - if (s != history.state()) { - LOG(ERROR) << "update from higher state"; + if(sub_node_it != node_info_.end()){ + 
//LOG(ERROR)<<" state:"<<node_it->second->state; + if (sub_node_it->second->state != ProposalState::Committed) { + Commit(sub_hash); } - node_it->second->votes[s].insert(proposal.header().proposer_id()); } } - // LOG(ERROR)<<"proposal:("<<node_it->second->proposal.header().proposer_id() - //<<","<<node_it->second->proposal.header().proposal_id()<<")" - //<<" history state:"<<history.state()<<" - //num:"<<node_it->second->votes[history.state()].size(); - CheckState(node_it->second.get(), - static_cast<resdb::cassandra::ProposalState>(history.state())); - if (node_it->second->state == ProposalState::PreCommit) { - Commit(hash); - } - //LOG(ERROR)<<"history proposal:("<<node_it->second->proposal.header().proposer_id() - //<<","<<node_it->second->proposal.header().proposal_id() - //<<" state:"<<node_it->second->state; } - */ //LOG(ERROR) << "height:" << current_height_ // << " proposal height:" << proposal.header().height(); @@ -249,24 +209,11 @@ int ProposalGraph::AddProposal(const Proposal& proposal) { g_[proposal.header().prehash()].push_back(proposal.header().hash()); auto np = std::make_unique<NodeInfo>(proposal); //new_proposals_[proposal.header().hash()] = &np->proposal; - LOG(ERROR)<<"add proposal proposer:"<<proposal.header().proposer_id()<<" id:"<<proposal.header().proposal_id()<<" hash:"<<Encode(proposal.header().hash()); + //LOG(ERROR)<<"add proposal proposer:"<<proposal.header().proposer_id()<<" id:"<<proposal.header().proposal_id()<<" hash:"<<Encode(proposal.header().hash()); node_info_[proposal.header().hash()] = std::move(np); last_node_[proposal.header().height()].insert(proposal.header().hash()); } -/* - LOG(ERROR)<<" node info size:"<<node_info_.size(); - std::string hash = proposal.header().hash(); - auto node_it = node_info_.find(hash); - assert(node_it != node_info_.end()); - node_it->second->votes[ProposalState::New].insert(proposal.header().proposer_id()); - - CheckState(node_it->second.get(), - 
static_cast<resdb::cassandra::ProposalState>(ProposalState::New)); - */ - - - //LOG(ERROR)<<"add graph done"; return 0; } @@ -329,6 +276,10 @@ void ProposalGraph::Commit(const std::string& hash) { auto it = node_info_.find(c_hash); if (it == node_info_.end()) { LOG(ERROR) << "node not found, hash:"; + #ifdef GOPOA + commit_p.clear(); + break; + #endif assert(1 == 0); } @@ -362,6 +313,10 @@ void ProposalGraph::Commit(const std::string& hash) { if (commit_p.size() > 1) { LOG(ERROR) << "commit more hash"; } + if(commit_p.size()==0) { + LOG(ERROR) << "commit not ready"; + return; + } assert(commit_p.size()>0); int block_num = 0; int p_num = 0; @@ -390,11 +345,11 @@ void ProposalGraph::Commit(const std::string& hash) { for(auto block : commit_p[i][j]->sub_block()){ - LOG(ERROR) << "commmit proposal from:" << commit_p[i][j]->header().proposer_id() - << " block id:" << block.local_id(); + //LOG(ERROR) << "commmit proposal from:" << commit_p[i][j]->header().proposer_id() + //<< " block id:" << block.local_id(); if(check_.find(std::make_pair(block.local_id(), commit_p[i][j]->header().proposer_id())) != check_.end()){ - LOG(ERROR) << "commmit proposal from:" << commit_p[i][j]->header().proposer_id() - << " block id:" << block.local_id() << "has committed"; + //LOG(ERROR) << "commmit proposal from:" << commit_p[i][j]->header().proposer_id() + // << " block id:" << block.local_id() << "has committed"; } else { check_.insert(std::make_pair(block.local_id(), commit_p[i][j]->header().proposer_id())); @@ -409,7 +364,9 @@ void ProposalGraph::Commit(const std::string& hash) { } } //global_stats_->AddCommitBlock(block_num); - LOG(ERROR)<<" commit proposal num:"<<p_num; + global_stats_->AddCommitTxn(p_num); + //LOG(ERROR)<<" commit proposal num:"<<p_num; + LOG(ERROR)<<"commit proposal from :"<<it->second->proposal.header().proposer_id()<<" id:"<<it->second->proposal.header().proposal_id()<<" height:"<<it->second->proposal.header().height()<<" num:"<<p_num; // TODO clean 
last_node_[it->second->proposal.header().height()].clear(); latest_commit_ = it->second->proposal; @@ -478,7 +435,7 @@ void ProposalGraph::UpdateHistory(Proposal* proposal) { his->set_sender(node_it->second->proposal.header().proposer_id()); his->set_id(node_it->second->proposal.header().proposal_id()); hash = node_it->second->proposal.header().prehash(); - LOG(ERROR)<<" get his proposer:"<<node_it->second->proposal.header().proposer_id()<<" height:"<<node_it->second->proposal.header().height(); + //LOG(ERROR)<<" get his proposer:"<<node_it->second->proposal.header().proposer_id()<<" height:"<<node_it->second->proposal.header().height(); } } @@ -570,7 +527,7 @@ Proposal* ProposalGraph::GetStrongestProposal() { - LOG(ERROR)<<" update his"; + //LOG(ERROR)<<" update his"; UpdateHistory(&sp->proposal); //LOG(ERROR) << "get strong proposal from height:" << current_height_ << " ->(" // << sp->proposal.header().proposer_id() << "," @@ -605,14 +562,14 @@ int ProposalGraph::CompareState(const ProposalState& state1, // p1 < p2 bool ProposalGraph::Compare(const NodeInfo& p1, const NodeInfo& p2) { - LOG(ERROR) << "proposer:" << p1.proposal.header().proposer_id() << " " - << p2.proposal.header().proposer_id() - << "height:" << p1.proposal.header().height() << " " - << p2.proposal.header().height() - <<" state:"<< p1.state<<" "<<p2.state - <<" hash cmp:"<< (p1.proposal.header().hash() < p2.proposal.header().hash()) - <<" cmp num:" << Cmp(p1.proposal.header().proposer_id(), p2.proposal.header().proposer_id()) - <<" sub block:" << p1.proposal.sub_block_size() <<" "<< p2.proposal.sub_block_size(); + //LOG(ERROR) << "proposer:" << p1.proposal.header().proposer_id() << " " + // << p2.proposal.header().proposer_id() + // << "height:" << p1.proposal.header().height() << " " + // << p2.proposal.header().height() + // <<" state:"<< p1.state<<" "<<p2.state + // <<" hash cmp:"<< (p1.proposal.header().hash() < p2.proposal.header().hash()) + // <<" cmp num:" << 
Cmp(p1.proposal.header().proposer_id(), p2.proposal.header().proposer_id()) + // <<" sub block:" << p1.proposal.sub_block_size() <<" "<< p2.proposal.sub_block_size(); if (p1.proposal.header().height() != p2.proposal.header().height()) { return p1.proposal.header().height() < p2.proposal.header().height(); } @@ -622,11 +579,13 @@ bool ProposalGraph::Compare(const NodeInfo& p1, const NodeInfo& p2) { return CompareState(p1.state, p2.state) < 0; } - if(150<=p1.proposal.header().height()&&p1.proposal.header().height()<=250) { + #ifdef GOPOA + if(850<=p1.proposal.header().height()&&p1.proposal.header().height()<=1350) { int h = (p1.proposal.header().height())%2; if ( h == 0) h = 2; return abs(p1.proposal.header().proposer_id() - h ) > abs(p2.proposal.header().proposer_id() - h); } + #endif int h = (p1.proposal.header().height())%total_num_; if ( h == 0) h = total_num_; diff --git a/platform/consensus/ordering/cassandra/algorithm/proposal_manager.cpp b/platform/consensus/ordering/cassandra/algorithm/proposal_manager.cpp index 40a60bee..41c03cd2 100644 --- a/platform/consensus/ordering/cassandra/algorithm/proposal_manager.cpp +++ b/platform/consensus/ordering/cassandra/algorithm/proposal_manager.cpp @@ -415,7 +415,9 @@ int ProposalManager::VerifyProposal(const ProposalQueryResp& resp) { } tmp_proposal_.clear(); + #ifdef GOPOA return 0; + #endif for (const Proposal& p : resp.proposal()) { //LOG(ERROR)<<"verify resp proposal proposer:"<<p.header().proposer_id()<<" id:"<<p.header().proposal_id(); diff --git a/platform/consensus/ordering/cassandra/algorithm/proposal_state.h b/platform/consensus/ordering/cassandra/algorithm/proposal_state.h index 3eb1d83a..876b1be0 100644 --- a/platform/consensus/ordering/cassandra/algorithm/proposal_state.h +++ b/platform/consensus/ordering/cassandra/algorithm/proposal_state.h @@ -1,5 +1,11 @@ #pragma once +//#define GOPOA +//#define GOTPOA +#define GOTPOR + +//#define NOPOA + namespace resdb { namespace cassandra { diff --git 
a/platform/consensus/ordering/cassandra_cft/algorithm/cassandra.cpp b/platform/consensus/ordering/cassandra_cft/algorithm/cassandra.cpp index 4a137957..beb4fb3b 100644 --- a/platform/consensus/ordering/cassandra_cft/algorithm/cassandra.cpp +++ b/platform/consensus/ordering/cassandra_cft/algorithm/cassandra.cpp @@ -17,8 +17,8 @@ Cassandra::Cassandra(int id, int f, int total_num, int block_size, SignatureVeri total_num_ = total_num; f_ = f; is_stop_ = false; - //timeout_ms_ = 10000; - timeout_ms_ = 60000; + timeout_ms_ = 1600; + //timeout_ms_ = 60000; local_txn_id_ = 1; local_proposal_id_ = 1; batch_size_ = block_size; @@ -432,7 +432,19 @@ void Cassandra::ReceiveProposalQueryResp(const ProposalQueryResp& resp) { bool Cassandra::ReceiveProposal(std::unique_ptr<Proposal> proposal) { // std::unique_lock<std::mutex> lk(g_mutex_); { - //LOG(ERROR)<<"recv proposal, height:"<<proposal->header().height()<<" block size:"<<proposal->block_size(); + LOG(ERROR)<<"recv proposal, height:"<<proposal->header().height()<<" block size:"<<proposal->block_size() + << "from :" + << proposal->header().proposer_id() + <<" id:"<<id_ + <<" f:"<<f_ + <<" pass:"<< (((proposal->header().proposer_id() <= total_num_-f_) && (id_ <=total_num_-f_)) + || ((proposal->header().proposer_id() > total_num_-f_) && (id_ >total_num_-f_))); + + if(!((proposal->header().proposer_id() <= total_num_-f_) && (id_ <=total_num_-f_) + || (proposal->header().proposer_id() > total_num_-f_ && id_ >total_num_-f_))) { + return true; + } + std::unique_lock<std::mutex> lk(mutex_); for(const auto& block : proposal->block()){ @@ -440,13 +452,15 @@ bool Cassandra::ReceiveProposal(std::unique_ptr<Proposal> proposal) { proposal_manager_->AddBlock(std::move(new_block)); } - //LOG(ERROR)<<"add block done"; - const Proposal* pre_p = - graph_->GetProposalInfo(proposal->header().prehash()); + LOG(ERROR)<<"add block done:"<<proposal->block_size(); + const Proposal* pre_p = nullptr; + if(proposal->header().prehash().size()>0){ + pre_p 
= graph_->GetProposalInfo(proposal->header().prehash()); + } if (pre_p == nullptr) { - // LOG(ERROR) << "receive proposal from :" - // << proposal->header().proposer_id() - // << " id:" << proposal->header().proposal_id() << "no pre:"; + LOG(ERROR) << "receive proposal from :" + << proposal->header().proposer_id() + << " id:" << proposal->header().proposal_id() << "no pre:"; if(!proposal->header().prehash().empty()) { if (proposal->header().height() > graph_->GetCurrentHeight()) { future_proposal_[proposal->header().height()].push_back( @@ -456,11 +470,11 @@ bool Cassandra::ReceiveProposal(std::unique_ptr<Proposal> proposal) { } assert(proposal->header().prehash().empty()); } else { - //LOG(ERROR) << "receive proposal from :" - // << proposal->header().proposer_id() - // << " id:" << proposal->header().proposal_id() - // << " pre:" << pre_p->header().proposer_id() - // << " pre id:" << pre_p->header().proposal_id(); + LOG(ERROR) << "receive proposal from :" + << proposal->header().proposer_id() + << " id:" << proposal->header().proposal_id() + << " pre:" << pre_p->header().proposer_id() + << " pre id:" << pre_p->header().proposal_id(); } /* @@ -505,7 +519,7 @@ bool Cassandra::AddProposal(const Proposal& proposal) { return false; } - //LOG(ERROR)<<" proposal blocks:"<<proposal.block_size(); + LOG(ERROR)<<" proposal blocks:"<<proposal.block_size(); for (const Block& block : proposal.block()) { bool ret = false;; for(int i = 0; i< 5; ++i){ @@ -548,11 +562,11 @@ bool Cassandra::AddProposal(const Proposal& proposal) { received_num_[proposal.header().height()].insert( proposal.header().proposer_id()); - //LOG(ERROR) << "received current height:" << graph_->GetCurrentHeight() - // << " proposal height:" << proposal.header().height() - // << " num:" << received_num_[graph_->GetCurrentHeight()].size() - // << " from:" << proposal.header().proposer_id() - // << " last vote:" << last_vote_; + LOG(ERROR) << "received current height:" << graph_->GetCurrentHeight() + << " 
proposal height:" << proposal.header().height() + << " num:" << received_num_[graph_->GetCurrentHeight()].size() + << " from:" << proposal.header().proposer_id() + << " last vote:" << last_vote_; if (received_num_[graph_->GetCurrentHeight()].size() == total_num_) { if (last_vote_ < graph_->GetCurrentHeight()) { last_vote_ = graph_->GetCurrentHeight(); diff --git a/platform/consensus/ordering/cassandra_cft/algorithm/proposal_graph.cpp b/platform/consensus/ordering/cassandra_cft/algorithm/proposal_graph.cpp index c2a40ebc..aab721ec 100644 --- a/platform/consensus/ordering/cassandra_cft/algorithm/proposal_graph.cpp +++ b/platform/consensus/ordering/cassandra_cft/algorithm/proposal_graph.cpp @@ -150,19 +150,30 @@ int ProposalGraph::AddProposal(const Proposal& proposal) { const auto& sub_history = proposal.history(i); std::string sub_hash = sub_history.hash(); auto sub_node_it = node_info_.find(sub_hash); - //LOG(ERROR)<<" state:"<<node_it->second->state; - if (node_it->second->state != ProposalState::PoR) { + if(sub_node_it == node_info_.end()){ + continue; + } + LOG(ERROR)<<" state:"<<sub_node_it->second->state<<" node:"<<sub_node_it->second->proposal.header().proposer_id() + <<" height:"<<sub_node_it->second->proposal.header().height(); + if (sub_node_it->second->state != ProposalState::PoR) { break; } num++; } - //LOG(ERROR)<<"get num:"<<num; + LOG(ERROR)<<"get num:"<<num; if(num == need_num) { const auto& sub_history = proposal.history(need_num-1); std::string sub_hash = sub_history.hash(); - Commit(sub_hash); + + auto sub_node_it = node_info_.find(sub_hash); + if(sub_node_it != node_info_.end()){ + //LOG(ERROR)<<" state:"<<node_it->second->state; + if (sub_node_it->second->state != ProposalState::Committed) { + Commit(sub_hash); + } + } } } @@ -241,11 +252,11 @@ return; } int ProposalGraph::CheckState(NodeInfo* node_info, ProposalState state) { - //LOG(ERROR) << "node: (" << node_info->proposal.header().proposer_id() << - //"," - // << 
node_info->proposal.header().proposal_id() - // << ") state:" << node_info->state - // << " vote num:" << node_info->votes[ProposalState::New].size(); + LOG(ERROR) << "node: (" << node_info->proposal.header().proposer_id() << + "," + << node_info->proposal.header().proposal_id() + << ") state:" << node_info->state + << " vote num:" << node_info->votes[ProposalState::New].size(); if(cft_){ if(node_info->votes[ProposalState::New].size() >= f_ + 1) { @@ -267,11 +278,11 @@ int ProposalGraph::CheckState(NodeInfo* node_info, ProposalState state) { } } node_info->state = state; - //LOG(ERROR) << "node: (" << node_info->proposal.header().proposer_id() << - // "," - // << node_info->proposal.header().proposal_id() - // << ") get state:" << node_info->state - // << " vote num:" << node_info->votes[ProposalState::New].size(); + LOG(ERROR) << "node: (" << node_info->proposal.header().proposer_id() << + "," + << node_info->proposal.header().proposal_id() + << ") get state:" << node_info->state + << " vote num:" << node_info->votes[ProposalState::New].size(); return true; @@ -445,7 +456,7 @@ Proposal* ProposalGraph::GetStrongestProposal() { } } - //LOG(ERROR)<<" last node size:"<<last_node_[current_height_].size()<<" height:"<<current_height_<<" from:"<<sp->proposal.header().proposer_id(); + LOG(ERROR)<<" last node size:"<<last_node_[current_height_].size()<<" height:"<<current_height_<<" from:"<<sp->proposal.header().proposer_id(); for (const auto& last_hash : last_node_[current_height_]) { NodeInfo* node_info = node_info_[last_hash].get(); diff --git a/platform/consensus/ordering/cassandra_cft/framework/consensus.cpp b/platform/consensus/ordering/cassandra_cft/framework/consensus.cpp index 05cae0b1..1e558b06 100644 --- a/platform/consensus/ordering/cassandra_cft/framework/consensus.cpp +++ b/platform/consensus/ordering/cassandra_cft/framework/consensus.cpp @@ -37,7 +37,7 @@ Consensus::Consensus(const ResDBConfig& config, std::unique_ptr<TransactionManager> executor) : 
common::Consensus(config, std::move(executor)){ int total_replicas = config_.GetReplicaNum(); - int f = (total_replicas - 1) / 3; + int f = (total_replicas - 1) / 2; Init(); diff --git a/platform/consensus/ordering/multipaxos/algorithm/multipaxos.cpp b/platform/consensus/ordering/multipaxos/algorithm/multipaxos.cpp index 7885a875..00093268 100644 --- a/platform/consensus/ordering/multipaxos/algorithm/multipaxos.cpp +++ b/platform/consensus/ordering/multipaxos/algorithm/multipaxos.cpp @@ -12,7 +12,7 @@ MultiPaxos::MultiPaxos(int id, int f, int total_num, int block_size, SignatureVe LOG(ERROR)<<"id:"<<id<<" f:"<<f<<" total:"<<total_num_; - proposal_manager_ = std::make_unique<ProposalManager>(id, 2*f_+1, verifier); + proposal_manager_ = std::make_unique<ProposalManager>(id, f_+1, verifier); send_thread_ = std::thread(&MultiPaxos::AsyncSend, this); commit_thread_ = std::thread(&MultiPaxos::AsyncCommit, this); commit_seq_thread_ = std::thread(&MultiPaxos::AsyncCommitSeq, this); @@ -53,7 +53,7 @@ void MultiPaxos::AsyncSend() { std::unique_ptr<Proposal> proposal = nullptr; { proposal = proposal_manager_ -> GenerateProposal(txns); - LOG(ERROR)<<"propose view:"<<proposal->header().view()<<" current round:"<<round<<" block size:"<<txns.size(); + //LOG(ERROR)<<"propose view:"<<proposal->header().view()<<" current round:"<<round<<" block size:"<<txns.size(); } broadcast_call_(MessageType::Propose, *proposal); } @@ -132,7 +132,7 @@ void MultiPaxos::AsyncCommitSeq() { int proposer = data->header().proposer(); int round = data->header().view(); int proposal_seq = data->header().proposal_id(); - LOG(ERROR)<<" commit proposer:"<<proposer<< " round:"<<round<<" seq:"<<proposal_seq; + //LOG(ERROR)<<" commit proposer:"<<proposer<< " round:"<<round<<" seq:"<<proposal_seq; for(Transaction& txn : *data->mutable_transactions()){ txn.set_id(seq++); Commit(txn); @@ -167,10 +167,15 @@ bool MultiPaxos::ReceiveProposal(std::unique_ptr<Proposal> proposal) { int proposer = proposal->sender(); 
int seq = proposal->header().proposal_id(); + if(!((proposer <= total_num_-f_) && (id_ <=total_num_-f_) + || (proposer > total_num_-f_ && id_ >total_num_-f_))) { + return true; + } + bool done = false; { std::unique_lock<std::mutex> lk(mutex_); - LOG(ERROR)<<"recv proposal from:"<<proposer<<" round:"<<round<<" seq:"<<seq; + //LOG(ERROR)<<"recv proposal from:"<<proposer<<" round:"<<round<<" seq:"<<seq; receive_[round][proposer] = std::move(proposal); done = true; } @@ -207,9 +212,10 @@ bool MultiPaxos::ReceiveAccept(std::unique_ptr<Proposal> proposal) { std::unique_lock<std::mutex> lk(propose_mutex_); accept_receive_[round].insert(sender); - LOG(ERROR)<<"RECEIVE proposer accept:"<<round<<" size:"<<accept_receive_[round].size() - <<" proposer:"<<proposer<<" sender:"<<sender; + //LOG(ERROR)<<"RECEIVE proposer accept:"<<round<<" size:"<<accept_receive_[round].size() + // <<" proposer:"<<proposer<<" sender:"<<sender; if(accept_receive_[round].size() == f_+1){ + //LOG(ERROR)<<" ====== bc ======="; Broadcast(MessageType::Learn, *proposal); can_propose_[round+1] = true; @@ -219,6 +225,10 @@ bool MultiPaxos::ReceiveAccept(std::unique_ptr<Proposal> proposal) { } bool MultiPaxos::ReceiveLearn(std::unique_ptr<Proposal> proposal) { + int round = proposal->header().view(); + int sender = proposal->sender(); + int proposer = proposal->header().proposer(); +//LOG(ERROR)<<" receive learn:"<<round; commit_q_.Push(std::move(proposal)); return true; } diff --git a/platform/consensus/ordering/multipaxos/framework/consensus.cpp b/platform/consensus/ordering/multipaxos/framework/consensus.cpp index fbefc49c..86d7c9a1 100644 --- a/platform/consensus/ordering/multipaxos/framework/consensus.cpp +++ b/platform/consensus/ordering/multipaxos/framework/consensus.cpp @@ -47,10 +47,10 @@ Consensus::Consensus(const ResDBConfig& config, SetPerformanceManager(GetPerformanceManager()); Init(); - failure_mode_ = config.GetConfigData().failure_mode(); + //failure_mode_ = 
config.GetConfigData().failure_mode(); int total_replicas = config_.GetReplicaNum(); - int f = (total_replicas - 1) / 3; + int f = (total_replicas - 1) / 2; if (config_.GetPublicKeyCertificateInfo() .public_key() diff --git a/platform/consensus/ordering/multipaxos/framework/performance_manager.cpp b/platform/consensus/ordering/multipaxos/framework/performance_manager.cpp new file mode 100644 index 00000000..05831b6d --- /dev/null +++ b/platform/consensus/ordering/multipaxos/framework/performance_manager.cpp @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2019-2022 ExpoLab, UC Davis + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. 
+ * + */ + +#include "platform/consensus/ordering/multipaxos/framework/performance_manager.h" + +#include <glog/logging.h> + +#include "common/utils/utils.h" + +namespace resdb { +namespace multipaxos { + +using comm::CollectorResultCode; + +MultiPaxosPerformanceManager::MultiPaxosPerformanceManager( + const ResDBConfig& config, ReplicaCommunicator* replica_communicator, + SignatureVerifier* verifier) + : PerformanceManager(config, replica_communicator, verifier){ +} + +void MultiPaxosPerformanceManager::SendMessage(const Request& request) { + replica_communicator_->SendMessage(request, 1); +} + + +} // namespace multipaxos +} // namespace resdb diff --git a/platform/consensus/ordering/multipaxos/framework/performance_manager.h b/platform/consensus/ordering/multipaxos/framework/performance_manager.h new file mode 100644 index 00000000..3bbf273b --- /dev/null +++ b/platform/consensus/ordering/multipaxos/framework/performance_manager.h @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2019-2022 ExpoLab, UC Davis + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. 
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + */ + +#pragma once + +#include <future> + +#include "platform/consensus/ordering/common/framework/performance_manager.h" + +namespace resdb { +namespace multipaxos { + +class MultiPaxosPerformanceManager : public common::PerformanceManager { + public: + MultiPaxosPerformanceManager(const ResDBConfig& config, + ReplicaCommunicator* replica_communicator, + SignatureVerifier* verifier); + +protected: + void SendMessage(const Request& request); +}; + +} // namespace multipaxos +} // namespace resdb diff --git a/platform/consensus/ordering/tusk/algorithm/tusk.cpp b/platform/consensus/ordering/tusk/algorithm/tusk.cpp index a0c34878..fd4e2862 100644 --- a/platform/consensus/ordering/tusk/algorithm/tusk.cpp +++ b/platform/consensus/ordering/tusk/algorithm/tusk.cpp @@ -11,17 +11,19 @@ Tusk::Tusk(int id, int f, int total_num, int block_size, SignatureVerifier* veri : ProtocolBase(id, f, total_num), verifier_(verifier) { limit_count_ = 2 * f + 1; batch_size_ = block_size; - proposal_manager_ = std::make_unique<ProposalManager>(id, total_num); - //proposal_manager_ = std::make_unique<ProposalManager>(id, limit_count_); + //proposal_manager_ = std::make_unique<ProposalManager>(id, total_num); + proposal_manager_ = std::make_unique<ProposalManager>(id, limit_count_); execute_id_ = 1; start_ = 0; queue_size_ = 0; + if(id<=2*f+1){ send_thread_ = std::thread(&Tusk::AsyncSend, this); commit_thread_ = std::thread(&Tusk::AsyncCommit, this); execute_thread_ = std::thread(&Tusk::AsyncExecute, this); cert_thread_ = std::thread(&Tusk::AsyncProcessCert, this); + } global_stats_ = Stats::GetGlobalStats(); @@ -138,11 +140,13 @@ void Tusk::AsyncCommit() { for (int r = previous_round + 2; r <= round; r += 2) { int leader 
= GetLeader(r); const Proposal * req = nullptr; + int num = 0; while(!IsStop()){ req = proposal_manager_->GetRequest(r, leader); // req:"<<(req==nullptr); - if (req == nullptr) { - //LOG(ERROR)<<" get leader:"<<leader<<" round:"<<r<<" not exit"; + if (req == nullptr && num < 10) { + LOG(ERROR)<<" get leader:"<<leader<<" round:"<<r<<" not exit"; + num++; //usleep(1000); std::unique_lock<std::mutex> lk(mutex_); vote_cv_.wait_for(lk, std::chrono::microseconds(100), @@ -152,6 +156,9 @@ void Tusk::AsyncCommit() { } break; } + if(req == nullptr){ + continue; + } //LOG(ERROR)<<" get leader:"<<leader<<" round:"<<r<<" delay:"<<(GetCurrentTime() - req->header().create_time()); int reference_num = proposal_manager_->GetReferenceNum(*req); //LOG(ERROR)<<" get leader:"<<leader<<" round:"<<r<<" ref:"<<reference_num; @@ -327,6 +334,13 @@ bool Tusk::ReceiveBlock(std::unique_ptr<Proposal> proposal) { // sleep(10); } */ + + if(!(id_ <= limit_count_ && proposal->header().proposer_id() <= limit_count_)){ + //LOG(ERROR) << "recv block from " << proposal->header().proposer_id() + // << " round:" << proposal->header().round(); + // sleep(10); + return true; + } std::unique_lock<std::mutex> lk(check_block_mutex_); { @@ -348,7 +362,7 @@ void Tusk::ReceiveBlockACK(std::unique_ptr<Metadata> metadata) { int sender = metadata->sender(); std::unique_lock<std::mutex> lk(txn_mutex_); received_num_[hash][sender] = std::move(metadata); - //LOG(ERROR) << "recv block ack from:" << sender << " num:" << received_num_[hash].size()<<" round:"<<round; + LOG(ERROR) << "recv block ack from:" << sender << " num:" << received_num_[hash].size()<<" round:"<<round; if (received_num_[hash].size() == limit_count_) { Certificate cert; for (auto& it : received_num_[hash]) { diff --git a/platform/statistic/stats.cpp b/platform/statistic/stats.cpp index 7d611e10..a9004968 100644 --- a/platform/statistic/stats.cpp +++ b/platform/statistic/stats.cpp @@ -375,6 +375,8 @@ void Stats::MonitorGlobal() { << " commit_txn :" << 
(commit_txn_time - last_commit_txn_time) + << " commit_txn num:" + << (commit_txn_num - last_commit_txn_num) << " commit_txn latency :" << static_cast<double>(commit_txn_time - diff --git a/scripts/deploy/config/cassandra.config b/scripts/deploy/config/cassandra.config index 4701738f..fb5ff19d 100644 --- a/scripts/deploy/config/cassandra.config +++ b/scripts/deploy/config/cassandra.config @@ -1,11 +1,11 @@ { - "clientBatchNum": 100, + "clientBatchNum": 400, "enable_viewchange": false, "recovery_enabled": false, "max_client_complaint_num":10, - "max_process_txn": 8196, + "max_process_txn": 4096, "worker_num": 10, "input_worker_num": 1, "output_worker_num": 5, - "block_size":50 + "block_size":1000 } diff --git a/scripts/deploy/config/kv_performance_server_11.conf b/scripts/deploy/config/kv_performance_server_11.conf new file mode 100644 index 00000000..093fe804 --- /dev/null +++ b/scripts/deploy/config/kv_performance_server_11.conf @@ -0,0 +1,27 @@ +iplist=( +172.31.18.66 +172.31.27.193 +172.31.27.153 +172.31.16.133 +172.31.31.76 +172.31.17.121 +172.31.24.63 +172.31.20.159 +172.31.31.131 +172.31.17.193 +172.31.24.229 +172.31.26.164 +172.31.17.7 +172.31.26.133 +172.31.20.10 +172.31.19.41 +172.31.27.174 +172.31.19.141 +172.31.27.240 +172.31.24.175 +172.31.27.176 +172.31.28.112 +) + +key=~/.ssh/junchao.pem +client_num=11 diff --git a/scripts/deploy/config/kv_performance_server_21.conf b/scripts/deploy/config/kv_performance_server_21.conf new file mode 100644 index 00000000..7ec6b8bf --- /dev/null +++ b/scripts/deploy/config/kv_performance_server_21.conf @@ -0,0 +1,69 @@ +iplist=( +172.31.28.28 +172.31.22.26 +172.31.17.27 +172.31.30.152 +172.31.20.25 +172.31.27.20 +172.31.21.22 +172.31.17.146 +172.31.16.147 +172.31.19.13 +172.31.20.141 +172.31.20.10 +172.31.16.141 +172.31.28.6 +172.31.18.137 +172.31.19.5 +172.31.29.133 +172.31.20.0 +172.31.22.4 +172.31.19.61 +172.31.25.62 +172.31.17.187 +172.31.31.60 +172.31.22.185 +172.31.26.185 +172.31.23.183 +172.31.17.184 
+172.31.31.45 +172.31.25.51 +172.31.18.39 +172.31.19.42 +172.31.30.37 +172.31.25.38 +172.31.26.162 +172.31.18.34 +172.31.27.91 +172.31.20.161 +172.31.19.218 +172.31.24.90 +172.31.23.83 +172.31.16.217 +172.31.17.82 +172.31.18.83 +172.31.20.74 +172.31.25.204 +172.31.20.71 +172.31.28.200 +172.31.17.194 +172.31.29.71 +172.31.23.252 +172.31.21.126 +172.31.29.251 +172.31.26.251 +172.31.19.122 +172.31.30.123 +172.31.18.118 +172.31.25.119 +172.31.30.234 +172.31.26.117 +172.31.25.228 +172.31.31.101 +172.31.25.99 +172.31.24.227 +172.31.23.224 +) + +key=~/.ssh/junchao.pem +client_num=32 diff --git a/scripts/deploy/config/multipaxos.config b/scripts/deploy/config/multipaxos.config index 2716b3da..3a321e6f 100644 --- a/scripts/deploy/config/multipaxos.config +++ b/scripts/deploy/config/multipaxos.config @@ -1,5 +1,5 @@ { - "clientBatchNum": 100, + "clientBatchNum": 400, "enable_viewchange": false, "recovery_enabled": false, "max_client_complaint_num":10, diff --git a/scripts/deploy/config/tusk.config b/scripts/deploy/config/tusk.config index 4e80359b..6deeade5 100644 --- a/scripts/deploy/config/tusk.config +++ b/scripts/deploy/config/tusk.config @@ -1,11 +1,11 @@ { - "clientBatchNum": 800, + "clientBatchNum": 100, "enable_viewchange": false, "recovery_enabled": false, "max_client_complaint_num":10, - "max_process_txn": 1024, + "max_process_txn": 3186, "worker_num": 50, "input_worker_num": 1, "output_worker_num": 20, - "block_size":50 + "block_size":200 }
