This is an automated email from the ASF dual-hosted git repository. junchao pushed a commit to branch cassandra in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git
commit 195469c7885a9781690eb2e1a557cbe5eee0befd Author: cjcchen <[email protected]> AuthorDate: Sat Feb 28 07:23:48 2026 +0000 add --- .../ordering/autobahn/algorithm/autobahn.cpp | 62 ++++++++++++++++++-- .../ordering/autobahn/algorithm/autobahn.h | 12 +++- .../autobahn/algorithm/proposal_manager.cpp | 2 +- .../ordering/pbft/performance_manager.cpp | 4 +- scripts/deploy/config/autobahn.config | 4 +- .../deploy/config/kv_performance_server_16_2.conf | 67 ++++++---------------- .../deploy/config/kv_performance_server_32.conf | 3 +- ...ver_32.conf => kv_performance_server_32_2.conf} | 35 +---------- ...ver_32.conf => kv_performance_server_48_2.conf} | 36 +++++++++++- .../deploy/config/{autobahn.config => pbft.config} | 7 +-- scripts/deploy/config/poe.config | 4 +- scripts/deploy/performance/hs_performance.sh | 2 +- scripts/deploy/performance/pbft_performance.sh | 5 ++ 13 files changed, 137 insertions(+), 106 deletions(-) diff --git a/platform/consensus/ordering/autobahn/algorithm/autobahn.cpp b/platform/consensus/ordering/autobahn/algorithm/autobahn.cpp index a44aa0cf..8aa3dd56 100644 --- a/platform/consensus/ordering/autobahn/algorithm/autobahn.cpp +++ b/platform/consensus/ordering/autobahn/algorithm/autobahn.cpp @@ -21,6 +21,9 @@ AutoBahn::AutoBahn(int id, int f, int total_num, int block_size, SignatureVerifi timeout_ms_ = 60000; batch_size_ = block_size; execute_id_ = 1; + is_leader_ = id_ == 1; + cur_slot_ = 1; + use_hs_ = true; proposal_manager_ = std::make_unique<ProposalManager>(id, total_num_, f_, verifier); @@ -142,17 +145,48 @@ bool AutoBahn::WaitForNextView(int view) { return proposal_manager_->ReadyView(view); } +bool AutoBahn::WaitForNextLeader() { + std::unique_lock<std::mutex> lk(leader_mutex_); + //LOG(ERROR)<<"wait for next view:"<<view; + leader_cv_.wait_for(lk, std::chrono::microseconds(timeout_ms_ * 1000), + [&] { return is_leader_; }); + return is_leader_; +} + +void AutoBahn::StartNextLeader(int slot_id) { + //LOG(ERROR)<<" start leader slot:"<<slot_id; + std::unique_lock<std::mutex> lk(leader_mutex_); + cur_slot_ = slot_id; + is_leader_ = true; + leader_cv_.notify_all(); +} + void AutoBahn::AsyncConsensus() { while (!IsStop()) { - if (id_ != 1) { - break; + if(use_hs_){ + if(!WaitForNextLeader()){ + continue; + } + { + std::unique_lock<std::mutex> lk(leader_mutex_); + is_leader_ = false; + } + } + else { + if (!is_leader_) { + break; + } } int view = proposal_manager_->GetCurrentView(); if(!WaitForNextView(view)) { continue; } std::pair<int, std::map<int, int64_t>> blocks = proposal_manager_->GetCut(); - auto proposal = proposal_manager_->GenerateProposal(blocks.first, blocks.second); + int slot_id = blocks.first; + if(use_hs_){ + slot_id = cur_slot_; + } + auto proposal = proposal_manager_->GenerateProposal(slot_id, blocks.second); Broadcast(MessageType::NewProposal, *proposal); } } @@ -303,12 +337,15 @@ void AutoBahn::AsyncPrepare() { //LOG(ERROR)<<" obtain slot vote:"<<p->slot_id(); int slot_id = p->slot_id(); votes[slot_id] = std::make_pair(GetCurrentTime(), std::move(p)); + if(use_hs_){ + view = slot_id; + } while(!votes.empty() && votes.begin()->first <= view) { if(votes.begin()->first < view) { votes.erase(votes.begin()); continue; } - int delay = 1000; + int delay = 10000; int wait_time = GetCurrentTime() - votes.begin()->second.first; wait_time = delay - wait_time; //LOG(ERROR)<<" view :"<<view<<" wait time:"<<wait_time; @@ -316,8 +353,13 @@ void AutoBahn::AsyncPrepare() { usleep(wait_time); } Prepare(std::move(votes.begin()->second.second)); + if(use_hs_){ + view = votes.begin()->first+1; + } + else { + view++; + } votes.erase(votes.begin()); - view++; } } } @@ -366,6 +408,7 @@ void AutoBahn::Prepare(std::unique_ptr<Proposal> vote) { void AutoBahn::Commit(std::unique_ptr<Proposal> proposal) { auto raw_proposal = proposal_manager_->GetProposalData(proposal->slot_id()); assert(raw_proposal != nullptr); + int slot_id = proposal->slot_id(); //LOG(ERROR)<<" proposal proposal slot id:"<<proposal->slot_id(); for(const auto& block : raw_proposal->block()) { int block_owner = block.sender_id(); @@ -379,6 +422,7 @@ void AutoBahn::Commit(std::unique_ptr<Proposal> proposal) { while(data_block == nullptr){ data_block = proposal_manager_->GetBlock(block_owner, i); if(data_block == nullptr) { + //LOG(ERROR)<<" wait block:"<<block_owner<<" id:"<<i; usleep(100); } } @@ -393,6 +437,14 @@ void AutoBahn::Commit(std::unique_ptr<Proposal> proposal) { } commit_block_[block_owner] = block_id; } + if(use_hs_){ + int view = (slot_id+1) % total_num_; + if(view == 0) view = total_num_; + //LOG(ERROR)<<" next view:"<<view<<" id:"<<id_<<" total num:"<<total_num_; + if(view == id_){ + StartNextLeader(slot_id+1); + } + } } diff --git a/platform/consensus/ordering/autobahn/algorithm/autobahn.h b/platform/consensus/ordering/autobahn/algorithm/autobahn.h index ab84c89a..766a5c31 100644 --- a/platform/consensus/ordering/autobahn/algorithm/autobahn.h +++ b/platform/consensus/ordering/autobahn/algorithm/autobahn.h @@ -49,8 +49,12 @@ class AutoBahn: public common::ProtocolBase { bool IsFastCommit(const Proposal& proposal); + bool WaitForNextLeader(); + void StartNextLeader(int slot); + + private: - std::condition_variable bc_block_cv_, view_cv_; + std::condition_variable bc_block_cv_, view_cv_, leader_cv_; LockFreeQueue<Transaction> txns_; LockFreeQueue<Proposal> prepare_queue_, commit_queue_; std::unique_ptr<ProposalManager> proposal_manager_; @@ -63,12 +67,16 @@ class AutoBahn: public common::ProtocolBase { std::thread block_thread_, dissemi_thread_, consensus_thread_, prepare_thread_, commit_thread_; - std::mutex block_mutex_, bc_mutex_, view_mutex_, vote_mutex_, commit_mutex_; + std::mutex block_mutex_, bc_mutex_, view_mutex_, vote_mutex_, commit_mutex_, leader_mutex_; std::map<int, std::map<int, SignInfo>> block_ack_; std::map<int, std::map<int, std::unique_ptr<Proposal>>> vote_ack_ ; std::map<int, std::set<int>> commit_ack_; Stats* global_stats_; std::map<int, int64_t> commit_block_; + + bool is_leader_; + int cur_slot_; + bool use_hs_; }; } // namespace autobahn diff --git a/platform/consensus/ordering/autobahn/algorithm/proposal_manager.cpp b/platform/consensus/ordering/autobahn/algorithm/proposal_manager.cpp index 7d00e01d..dee83287 100644 --- a/platform/consensus/ordering/autobahn/algorithm/proposal_manager.cpp +++ b/platform/consensus/ordering/autobahn/algorithm/proposal_manager.cpp @@ -51,7 +51,7 @@ void ProposalManager::AddBlock(std::unique_ptr<Block> block) { int sender = block->sender_id(); int block_id = block->local_id(); - //LOG(ERROR)<<"add block from sender:"<<sender<<" id:"<<block_id; + LOG(ERROR)<<"add block from sender:"<<sender<<" id:"<<block_id; if(block_id>1) { assert(block->last_sign_info_size() >= f_+1); diff --git a/platform/consensus/ordering/pbft/performance_manager.cpp b/platform/consensus/ordering/pbft/performance_manager.cpp index a5ba979f..20d28194 100644 --- a/platform/consensus/ordering/pbft/performance_manager.cpp +++ b/platform/consensus/ordering/pbft/performance_manager.cpp @@ -107,12 +107,12 @@ int PerformanceManager::StartEval() { return 0; } eval_started_ = true; - for (int i = 0; i < 60000000000; ++i) { + for (int i = 0; i < 60000000; ++i) { std::unique_ptr<QueueItem> queue_item = std::make_unique<QueueItem>(); queue_item->context = nullptr; queue_item->user_request = GenerateUserRequest(); batch_queue_.Push(std::move(queue_item)); - if (i == 20000000) { + if (i == 200000) { eval_ready_promise_.set_value(true); } } diff --git a/scripts/deploy/config/autobahn.config b/scripts/deploy/config/autobahn.config index bcf3e8d3..f36bee92 100644 --- a/scripts/deploy/config/autobahn.config +++ b/scripts/deploy/config/autobahn.config @@ -4,8 +4,8 @@ "recovery_enabled": false, "max_client_complaint_num":10, "max_process_txn": 4096, - "worker_num": 5, + "worker_num": 10, "input_worker_num": 1, "output_worker_num": 5, - "block_size":100 + "block_size":300 } diff --git a/scripts/deploy/config/kv_performance_server_16_2.conf b/scripts/deploy/config/kv_performance_server_16_2.conf index e77ce569..25c717e3 100644 --- a/scripts/deploy/config/kv_performance_server_16_2.conf +++ b/scripts/deploy/config/kv_performance_server_16_2.conf @@ -1,53 +1,22 @@ iplist=( -172.31.25.224 -172.31.20.228 -172.31.18.224 -172.31.16.230 -172.31.31.229 -172.31.26.233 -172.31.29.231 -172.31.17.244 -172.31.17.236 -172.31.26.249 -172.31.18.247 -172.31.16.151 -172.31.31.151 -172.31.31.155 -172.31.17.155 -172.31.22.98 -172.31.19.156 -172.31.22.106 -172.31.24.98 -172.31.23.110 -172.31.27.109 -172.31.31.111 -172.31.28.110 -172.31.20.117 -172.31.30.116 -172.31.16.120 -172.31.28.119 -172.31.27.136 -172.31.20.133 -172.31.22.136 -172.31.26.136 -172.31.30.139 -172.31.21.137 -172.31.16.140 -172.31.26.139 -172.31.21.141 -172.31.27.140 -172.31.24.142 -172.31.31.142 -172.31.31.146 -172.31.19.145 -172.31.23.149 -172.31.30.149 -172.31.22.218 -172.31.17.214 -172.31.30.221 -172.31.24.220 -172.31.29.162 +172.31.18.66 +172.31.27.153 +172.31.31.76 +172.31.24.63 +172.31.20.159 +172.31.25.33 +172.31.30.103 +172.31.31.166 +172.31.30.230 +172.31.16.98 +172.31.18.177 +172.31.18.170 +172.31.30.105 +172.31.27.190 +172.31.20.125 +172.31.20.124 +172.31.25.251 ) key=~/.ssh/junchao.pem -client_num=32 +client_num=1 diff --git a/scripts/deploy/config/kv_performance_server_32.conf b/scripts/deploy/config/kv_performance_server_32.conf index 50b7fcc2..25141bd8 100644 --- a/scripts/deploy/config/kv_performance_server_32.conf +++ b/scripts/deploy/config/kv_performance_server_32.conf @@ -50,7 +50,8 @@ iplist=( 172.31.28.230 172.31.29.123 172.31.19.120 -172.31.31.176 +172.31.25.69 +#172.31.31.176 172.31.19.174 172.31.22.196 172.31.22.4 diff --git a/scripts/deploy/config/kv_performance_server_32.conf b/scripts/deploy/config/kv_performance_server_32_2.conf similarity index 51% copy from scripts/deploy/config/kv_performance_server_32.conf copy to scripts/deploy/config/kv_performance_server_32_2.conf index 50b7fcc2..268c90f4 100644 --- a/scripts/deploy/config/kv_performance_server_32.conf +++ b/scripts/deploy/config/kv_performance_server_32_2.conf @@ -32,40 +32,7 @@ iplist=( 172.31.19.90 172.31.18.21 172.31.25.83 -172.31.18.78 -172.31.25.28 -172.31.28.219 -172.31.20.219 -172.31.19.26 -172.31.23.31 -172.31.25.95 -172.31.25.96 -172.31.30.165 -172.31.31.36 -172.31.16.226 -172.31.31.97 -172.31.24.238 -172.31.29.108 -172.31.30.108 -172.31.28.230 -172.31.29.123 -172.31.19.120 -172.31.31.176 -172.31.19.174 -172.31.22.196 -172.31.22.4 -172.31.29.2 -172.31.26.254 -172.31.31.9 -172.31.16.201 -172.31.30.72 -172.31.31.71 -172.31.21.148 -172.31.17.147 -172.31.21.207 -#172.31.30.15 -#172.31.26.29 ) key=~/.ssh/junchao.pem -client_num=32 +client_num=1 diff --git a/scripts/deploy/config/kv_performance_server_32.conf b/scripts/deploy/config/kv_performance_server_48_2.conf similarity index 67% copy from scripts/deploy/config/kv_performance_server_32.conf copy to scripts/deploy/config/kv_performance_server_48_2.conf index 50b7fcc2..d6839de6 100644 --- a/scripts/deploy/config/kv_performance_server_32.conf +++ b/scripts/deploy/config/kv_performance_server_48_2.conf @@ -63,9 +63,39 @@ iplist=( 172.31.21.148 172.31.17.147 172.31.21.207 -#172.31.30.15 -#172.31.26.29 +172.31.30.15 +172.31.26.29 +172.31.18.217 +172.31.25.21 +172.31.26.84 +172.31.28.30 +172.31.19.222 +172.31.23.93 +172.31.31.131 +172.31.17.193 +172.31.24.229 +172.31.26.164 +172.31.17.7 +172.31.26.133 +172.31.20.10 +172.31.19.41 +172.31.27.174 +172.31.19.141 +172.31.27.240 +172.31.24.175 +172.31.27.176 +172.31.28.112 +172.31.25.210 +172.31.24.50 +172.31.19.151 +172.31.30.23 +172.31.27.57 +172.31.23.56 +172.31.20.61 +172.31.30.28 +172.31.31.62 +172.31.19.96 ) key=~/.ssh/junchao.pem -client_num=32 +client_num=48 diff --git a/scripts/deploy/config/autobahn.config b/scripts/deploy/config/pbft.config similarity index 62% copy from scripts/deploy/config/autobahn.config copy to scripts/deploy/config/pbft.config index bcf3e8d3..d20f841c 100644 --- a/scripts/deploy/config/autobahn.config +++ b/scripts/deploy/config/pbft.config @@ -4,8 +4,7 @@ "recovery_enabled": false, "max_client_complaint_num":10, "max_process_txn": 4096, - "worker_num": 5, - "input_worker_num": 1, - "output_worker_num": 5, - "block_size":100 + "worker_num": 16, + "input_worker_num": 5, + "output_worker_num": 5 } diff --git a/scripts/deploy/config/poe.config b/scripts/deploy/config/poe.config index c5092a94..8355bfe7 100644 --- a/scripts/deploy/config/poe.config +++ b/scripts/deploy/config/poe.config @@ -1,9 +1,9 @@ { - "clientBatchNum": 100, + "clientBatchNum": 400, "enable_viewchange": false, "recovery_enabled": false, "max_client_complaint_num":10, - "max_process_txn": 32, + "max_process_txn": 2048, "worker_num": 2, "input_worker_num": 1, "output_worker_num": 10 diff --git a/scripts/deploy/performance/hs_performance.sh b/scripts/deploy/performance/hs_performance.sh index 2eb03d82..3e653ed6 100755 --- a/scripts/deploy/performance/hs_performance.sh +++ b/scripts/deploy/performance/hs_performance.sh @@ -16,7 +16,7 @@ if [ ! -f "$config_file" ]; then break; fi echo "get cofigfile:"$config_file -/home/ubuntu/nexres/bazel-bin/benchmark/protocols/pbft/kv_service_tools $config_file +/home/ubuntu/asf_resilientdb/bazel-bin/benchmark/protocols/pbft/kv_service_tools $config_file done sleep 60 diff --git a/scripts/deploy/performance/pbft_performance.sh b/scripts/deploy/performance/pbft_performance.sh new file mode 100755 index 00000000..99ecbf15 --- /dev/null +++ b/scripts/deploy/performance/pbft_performance.sh @@ -0,0 +1,5 @@ +export server=//benchmark/protocols/pbft:kv_server_performance +export TEMPLATE_PATH=$PWD/config/pbft.config + +./performance/run_performance.sh $* +echo $0
