This is an automated email from the ASF dual-hosted git repository. junchao pushed a commit to branch cassandra in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git
commit 089418e3421fe6cec322a8ca439bbef30ada4734 Author: cjcchen <[email protected]> AuthorDate: Fri Feb 27 05:47:18 2026 +0000 done --- .../ordering/autobahn/algorithm/autobahn.cpp | 65 ++++++++++++++++------ .../ordering/autobahn/algorithm/autobahn.h | 1 + .../autobahn/algorithm/proposal_manager.cpp | 16 ++++-- scripts/deploy/config/autobahn.config | 2 +- .../deploy/config/kv_performance_server_16.conf | 12 ++-- scripts/deploy/config/tusk.config | 2 +- 6 files changed, 68 insertions(+), 30 deletions(-) diff --git a/platform/consensus/ordering/autobahn/algorithm/autobahn.cpp b/platform/consensus/ordering/autobahn/algorithm/autobahn.cpp index 8580d74e..a44aa0cf 100644 --- a/platform/consensus/ordering/autobahn/algorithm/autobahn.cpp +++ b/platform/consensus/ordering/autobahn/algorithm/autobahn.cpp @@ -97,15 +97,35 @@ void AutoBahn::AsyncDissemination() { if(next_block == 1) { next_block++; } - LOG(ERROR)<<" get block :"<<next_block-1; + //LOG(ERROR)<<" get block :"<<next_block-1; + + std::vector<std::unique_ptr<Transaction>> txns; + while (!IsStop()) { + std::unique_ptr<Transaction> txn = txns_.Pop(); + if (txn == nullptr) { + continue; + } + txns.push_back(std::move(txn)); + for(int i = 1; i < batch_size_; ++i){ + std::unique_ptr<Transaction> txn = txns_.Pop(100); + if(txn == nullptr){ + break; + } + txns.push_back(std::move(txn)); + } + + proposal_manager_->MakeBlock(txns); + txns.clear(); + break; + } const Block* block = proposal_manager_->GetLocalBlock(next_block-1); if(block == nullptr) { next_block--; continue; } - LOG(ERROR)<<" broadcast block :"<<next_block-1<<" id:"<<block->local_id(); + //LOG(ERROR)<<" broadcast block :"<<next_block-1<<" id:"<<block->local_id(); Broadcast(MessageType::NewBlocks, *block); - LOG(ERROR)<<" broadcast block :"<<next_block-1<<" id:"<<block->local_id()<<" done"; + //LOG(ERROR)<<" broadcast block :"<<next_block-1<<" id:"<<block->local_id()<<" done"; } } @@ -194,7 +214,7 @@ bool AutoBahn::ReceiveProposal(std::unique_ptr<Proposal> proposal) { } bool AutoBahn::ReceiveVote(std::unique_ptr<Proposal> vote) { - LOG(ERROR)<<"recv vote ack:"<<vote->slot_id()<<" from:"<<vote->sender_id(); + //LOG(ERROR)<<"recv vote ack:"<<vote->slot_id()<<" from:"<<vote->sender_id(); std::unique_ptr<Proposal> vote_cpy = std::make_unique<Proposal>(*vote); @@ -203,13 +223,13 @@ bool AutoBahn::ReceiveVote(std::unique_ptr<Proposal> vote) { int sender = vote->sender_id(); vote_ack_[vote->slot_id()].insert(std::make_pair(vote->sender_id(), std::move(vote))); - LOG(ERROR)<<"recv vote ack:"<<slot_id<<" from:"<<sender - << " num:"<<vote_ack_[slot_id].size(); + //LOG(ERROR)<<"recv vote ack:"<<slot_id<<" from:"<<sender + // << " num:"<<vote_ack_[slot_id].size(); if (vote_ack_[slot_id].size() >= 2*f_ + 1){ PrepareDone(std::move(vote_cpy)); } - LOG(ERROR)<<"recv vote ack done"; + //LOG(ERROR)<<"recv vote ack done"; return true; } @@ -291,7 +311,7 @@ void AutoBahn::AsyncPrepare() { int delay = 1000; int wait_time = GetCurrentTime() - votes.begin()->second.first; wait_time = delay - wait_time; - LOG(ERROR)<<" view :"<<view<<" wait time:"<<wait_time; + //LOG(ERROR)<<" view :"<<view<<" wait time:"<<wait_time; if(wait_time> 0) { usleep(wait_time); } @@ -351,16 +371,27 @@ void AutoBahn::Commit(std::unique_ptr<Proposal> proposal) { int block_owner = block.sender_id(); int block_id = block.local_id(); //LOG(ERROR)<<" commit :"<<block_owner<<" block id :"<<block_id; - - Block * data_block = proposal_manager_->GetBlock(block_owner, block_id); - assert(data_block != nullptr); - - //LOG(ERROR)<<" txn size:"<<data_block->mutable_data()->transaction_size(); - for (Transaction& txn : - *data_block->mutable_data()->mutable_transaction()) { - txn.set_id(execute_id_++); - commit_(txn); + + int last_block_id = commit_block_[block_owner]; + + for(int i = last_block_id+1; i <= block_id; i++){ + Block * data_block = nullptr; + while(data_block == nullptr){ + data_block = proposal_manager_->GetBlock(block_owner, i); + if(data_block == nullptr) { + usleep(100); + } + } + assert(data_block != nullptr); + + //LOG(ERROR)<<" txn size:"<<data_block->mutable_data()->transaction_size()<<" block owner:"<<block_owner<<" id:"<<i; + for (Transaction& txn : + *data_block->mutable_data()->mutable_transaction()) { + txn.set_id(execute_id_++); + commit_(txn); + } } + commit_block_[block_owner] = block_id; } } diff --git a/platform/consensus/ordering/autobahn/algorithm/autobahn.h b/platform/consensus/ordering/autobahn/algorithm/autobahn.h index e4364daf..ab84c89a 100644 --- a/platform/consensus/ordering/autobahn/algorithm/autobahn.h +++ b/platform/consensus/ordering/autobahn/algorithm/autobahn.h @@ -68,6 +68,7 @@ class AutoBahn: public common::ProtocolBase { std::map<int, std::map<int, std::unique_ptr<Proposal>>> vote_ack_ ; std::map<int, std::set<int>> commit_ack_; Stats* global_stats_; + std::map<int, int64_t> commit_block_; }; } // namespace autobahn diff --git a/platform/consensus/ordering/autobahn/algorithm/proposal_manager.cpp b/platform/consensus/ordering/autobahn/algorithm/proposal_manager.cpp index b34dcb79..7d00e01d 100644 --- a/platform/consensus/ordering/autobahn/algorithm/proposal_manager.cpp +++ b/platform/consensus/ordering/autobahn/algorithm/proposal_manager.cpp @@ -66,6 +66,9 @@ Block* ProposalManager::GetBlock(int sender, int64_t block_id) { std::unique_lock<std::mutex> lk(mutex_); //LOG(ERROR)<<" get block from sender:"<<sender<<" block id:"<<block_id; auto it = pending_blocks_[sender].find(block_id); + if(it == pending_blocks_[sender].end()) { + return nullptr; + } assert(it != pending_blocks_[sender].end()); return it->second.get(); } @@ -201,11 +204,13 @@ void ProposalManager::IncreaseView() { std::pair<int, std::map<int, int64_t>> ProposalManager::GetCut() { std::map<int, int64_t> blocks; - std::unique_lock<std::mutex> lk(slot_mutex_); - for(auto it : slot_state_) { - if(it.second.first == current_slot_) { - blocks[it.first]=it.second.second; - //LOG(ERROR)<<"get cut sender:"<<it.first<<" block:"<<it.second.second; + { + std::unique_lock<std::mutex> lk(slot_mutex_); + for(auto it : slot_state_) { + if(it.second.first == current_slot_) { + blocks[it.first]=it.second.second; + //LOG(ERROR)<<"get cut sender:"<<it.first<<" block:"<<it.second.second; + } } } current_slot_++; @@ -219,6 +224,7 @@ std::unique_ptr<Proposal> ProposalManager::GenerateProposal(int slot, const std: for (auto& it: blocks) { Block* block = proposal->add_block(); Block* data_block = GetBlock(it.first, it.second); + assert(data_block != nullptr); data_hash += data_block->hash(); //LOG(ERROR)<<" gene proposal block from:"<<data_block->sender_id()<<" block id:"<<data_block->local_id(); *block->mutable_sign_info() = data_block->sign_info(); diff --git a/scripts/deploy/config/autobahn.config b/scripts/deploy/config/autobahn.config index f445a70a..bcf3e8d3 100644 --- a/scripts/deploy/config/autobahn.config +++ b/scripts/deploy/config/autobahn.config @@ -4,7 +4,7 @@ "recovery_enabled": false, "max_client_complaint_num":10, "max_process_txn": 4096, - "worker_num": 10, + "worker_num": 5, "input_worker_num": 1, "output_worker_num": 5, "block_size":100 diff --git a/scripts/deploy/config/kv_performance_server_16.conf b/scripts/deploy/config/kv_performance_server_16.conf index b4194eb4..8604fec1 100644 --- a/scripts/deploy/config/kv_performance_server_16.conf +++ b/scripts/deploy/config/kv_performance_server_16.conf @@ -1,27 +1,21 @@ iplist=( 172.31.18.66 -172.31.27.193 172.31.27.153 -172.31.16.133 172.31.31.76 -172.31.17.121 172.31.24.63 172.31.20.159 172.31.25.33 -172.31.19.32 172.31.30.103 172.31.31.166 172.31.30.230 172.31.16.98 172.31.18.177 -172.31.20.176 172.31.18.170 172.31.30.105 172.31.27.190 172.31.20.125 172.31.20.124 172.31.25.251 -172.31.26.198 172.31.31.197 172.31.17.129 172.31.19.128 @@ -31,6 +25,12 @@ iplist=( 172.31.22.70 172.31.19.90 172.31.18.21 +172.31.18.78 +172.31.25.28 +172.31.28.219 +172.31.20.219 +172.31.19.26 +172.31.25.95 ) key=~/.ssh/junchao.pem diff --git a/scripts/deploy/config/tusk.config b/scripts/deploy/config/tusk.config index 6deeade5..5f0fbd90 100644 --- a/scripts/deploy/config/tusk.config +++ b/scripts/deploy/config/tusk.config @@ -1,5 +1,5 @@ { - "clientBatchNum": 100, + "clientBatchNum": 400, "enable_viewchange": false, "recovery_enabled": false, "max_client_complaint_num":10,
