This is an automated email from the ASF dual-hosted git repository.

junchao pushed a commit to branch cassandra
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git

commit 089418e3421fe6cec322a8ca439bbef30ada4734
Author: cjcchen <[email protected]>
AuthorDate: Fri Feb 27 05:47:18 2026 +0000

    done
---
 .../ordering/autobahn/algorithm/autobahn.cpp       | 65 ++++++++++++++++------
 .../ordering/autobahn/algorithm/autobahn.h         |  1 +
 .../autobahn/algorithm/proposal_manager.cpp        | 16 ++++--
 scripts/deploy/config/autobahn.config              |  2 +-
 .../deploy/config/kv_performance_server_16.conf    | 12 ++--
 scripts/deploy/config/tusk.config                  |  2 +-
 6 files changed, 68 insertions(+), 30 deletions(-)

diff --git a/platform/consensus/ordering/autobahn/algorithm/autobahn.cpp b/platform/consensus/ordering/autobahn/algorithm/autobahn.cpp
index 8580d74e..a44aa0cf 100644
--- a/platform/consensus/ordering/autobahn/algorithm/autobahn.cpp
+++ b/platform/consensus/ordering/autobahn/algorithm/autobahn.cpp
@@ -97,15 +97,35 @@ void AutoBahn::AsyncDissemination() {
     if(next_block == 1) {
       next_block++;
     }
-    LOG(ERROR)<<" get block :"<<next_block-1;
+    //LOG(ERROR)<<" get block :"<<next_block-1;
+
+    std::vector<std::unique_ptr<Transaction>> txns;
+    while (!IsStop()) {
+      std::unique_ptr<Transaction> txn = txns_.Pop();
+      if (txn == nullptr) {
+        continue;
+      }
+      txns.push_back(std::move(txn));
+      for(int i = 1; i < batch_size_; ++i){
+        std::unique_ptr<Transaction> txn = txns_.Pop(100);
+        if(txn == nullptr){
+          break;
+        }
+        txns.push_back(std::move(txn));
+      }
+
+      proposal_manager_->MakeBlock(txns);
+      txns.clear();
+      break;
+    }
     const Block* block = proposal_manager_->GetLocalBlock(next_block-1);
     if(block == nullptr) {
       next_block--;
       continue;
     }
-    LOG(ERROR)<<" broadcast block :"<<next_block-1<<" id:"<<block->local_id();
+    //LOG(ERROR)<<" broadcast block :"<<next_block-1<<" id:"<<block->local_id();
     Broadcast(MessageType::NewBlocks, *block);
-    LOG(ERROR)<<" broadcast block :"<<next_block-1<<" id:"<<block->local_id()<<" done";
+    //LOG(ERROR)<<" broadcast block :"<<next_block-1<<" id:"<<block->local_id()<<" done";
   }
 }
 
@@ -194,7 +214,7 @@ bool AutoBahn::ReceiveProposal(std::unique_ptr<Proposal> proposal) {
 }
 
 bool AutoBahn::ReceiveVote(std::unique_ptr<Proposal> vote) {
-  LOG(ERROR)<<"recv vote ack:"<<vote->slot_id()<<" from:"<<vote->sender_id(); 
+  //LOG(ERROR)<<"recv vote ack:"<<vote->slot_id()<<" from:"<<vote->sender_id(); 
 
   std::unique_ptr<Proposal> vote_cpy = std::make_unique<Proposal>(*vote);
 
@@ -203,13 +223,13 @@ bool AutoBahn::ReceiveVote(std::unique_ptr<Proposal> vote) {
   int sender = vote->sender_id();
   vote_ack_[vote->slot_id()].insert(std::make_pair(vote->sender_id(), std::move(vote)));
 
-  LOG(ERROR)<<"recv vote ack:"<<slot_id<<" from:"<<sender
-    << " num:"<<vote_ack_[slot_id].size();
+  //LOG(ERROR)<<"recv vote ack:"<<slot_id<<" from:"<<sender
+  //  << " num:"<<vote_ack_[slot_id].size();
 
   if (vote_ack_[slot_id].size() >= 2*f_ + 1){
       PrepareDone(std::move(vote_cpy));
   }
-  LOG(ERROR)<<"recv vote ack done";
+  //LOG(ERROR)<<"recv vote ack done";
   return true;
 }
 
@@ -291,7 +311,7 @@ void AutoBahn::AsyncPrepare() {
       int delay = 1000;
       int wait_time = GetCurrentTime() - votes.begin()->second.first;
       wait_time = delay - wait_time;
-      LOG(ERROR)<<" view :"<<view<<" wait time:"<<wait_time;
+      //LOG(ERROR)<<" view :"<<view<<" wait time:"<<wait_time;
       if(wait_time> 0) {
         usleep(wait_time);
       }
@@ -351,16 +371,27 @@ void AutoBahn::Commit(std::unique_ptr<Proposal> proposal) {
     int block_owner = block.sender_id();
     int block_id = block.local_id();
     //LOG(ERROR)<<" commit :"<<block_owner<<" block id :"<<block_id;
-  
-    Block * data_block = proposal_manager_->GetBlock(block_owner, block_id);
-    assert(data_block != nullptr);
-
-    //LOG(ERROR)<<" txn size:"<<data_block->mutable_data()->transaction_size();
-    for (Transaction& txn :
-        *data_block->mutable_data()->mutable_transaction()) {
-      txn.set_id(execute_id_++);
-      commit_(txn);
+
+    int last_block_id = commit_block_[block_owner];
+
+    for(int i = last_block_id+1; i <= block_id; i++){
+      Block * data_block = nullptr;
+      while(data_block == nullptr){
+        data_block = proposal_manager_->GetBlock(block_owner, i);
+        if(data_block == nullptr) {
+          usleep(100);
+        }
+      }
+      assert(data_block != nullptr);
+
+      //LOG(ERROR)<<" txn size:"<<data_block->mutable_data()->transaction_size()<<" block  owner:"<<block_owner<<" id:"<<i;
+      for (Transaction& txn :
+          *data_block->mutable_data()->mutable_transaction()) {
+        txn.set_id(execute_id_++);
+        commit_(txn);
+      }
     }
+    commit_block_[block_owner] = block_id;
   }
 }
 
diff --git a/platform/consensus/ordering/autobahn/algorithm/autobahn.h b/platform/consensus/ordering/autobahn/algorithm/autobahn.h
index e4364daf..ab84c89a 100644
--- a/platform/consensus/ordering/autobahn/algorithm/autobahn.h
+++ b/platform/consensus/ordering/autobahn/algorithm/autobahn.h
@@ -68,6 +68,7 @@ class AutoBahn: public common::ProtocolBase {
   std::map<int, std::map<int, std::unique_ptr<Proposal>>> vote_ack_ ;
   std::map<int, std::set<int>>  commit_ack_;
   Stats* global_stats_;
+  std::map<int, int64_t> commit_block_;
 };
 
 }  // namespace autobahn
diff --git a/platform/consensus/ordering/autobahn/algorithm/proposal_manager.cpp b/platform/consensus/ordering/autobahn/algorithm/proposal_manager.cpp
index b34dcb79..7d00e01d 100644
--- a/platform/consensus/ordering/autobahn/algorithm/proposal_manager.cpp
+++ b/platform/consensus/ordering/autobahn/algorithm/proposal_manager.cpp
@@ -66,6 +66,9 @@ Block* ProposalManager::GetBlock(int sender, int64_t block_id) {
   std::unique_lock<std::mutex> lk(mutex_);
   //LOG(ERROR)<<" get block from sender:"<<sender<<" block id:"<<block_id;
   auto it = pending_blocks_[sender].find(block_id);
+  if(it == pending_blocks_[sender].end()) {
+    return nullptr;
+  }
   assert(it != pending_blocks_[sender].end());
   return it->second.get();
 }
@@ -201,11 +204,13 @@ void ProposalManager::IncreaseView() {
 
 std::pair<int, std::map<int, int64_t>> ProposalManager::GetCut() {
   std::map<int, int64_t> blocks;
-  std::unique_lock<std::mutex> lk(slot_mutex_);
-  for(auto it : slot_state_) {
-    if(it.second.first == current_slot_) {
-      blocks[it.first]=it.second.second;
-      //LOG(ERROR)<<"get cut sender:"<<it.first<<" block:"<<it.second.second;
+  {
+    std::unique_lock<std::mutex> lk(slot_mutex_);
+    for(auto it : slot_state_) {
+      if(it.second.first == current_slot_) {
+        blocks[it.first]=it.second.second;
+        //LOG(ERROR)<<"get cut sender:"<<it.first<<" block:"<<it.second.second;
+      }
     }
   }
   current_slot_++;
@@ -219,6 +224,7 @@ std::unique_ptr<Proposal> ProposalManager::GenerateProposal(int slot, const std:
     for (auto& it: blocks) {
       Block* block = proposal->add_block();
       Block* data_block = GetBlock(it.first, it.second);
+      assert(data_block != nullptr);
       data_hash += data_block->hash();
       //LOG(ERROR)<<" gene proposal block from:"<<data_block->sender_id()<<" block id:"<<data_block->local_id();
       *block->mutable_sign_info() = data_block->sign_info();
diff --git a/scripts/deploy/config/autobahn.config b/scripts/deploy/config/autobahn.config
index f445a70a..bcf3e8d3 100644
--- a/scripts/deploy/config/autobahn.config
+++ b/scripts/deploy/config/autobahn.config
@@ -4,7 +4,7 @@
   "recovery_enabled": false,
   "max_client_complaint_num":10,
   "max_process_txn": 4096,
-  "worker_num": 10,
+  "worker_num": 5,
   "input_worker_num": 1,
   "output_worker_num": 5,
   "block_size":100
diff --git a/scripts/deploy/config/kv_performance_server_16.conf b/scripts/deploy/config/kv_performance_server_16.conf
index b4194eb4..8604fec1 100644
--- a/scripts/deploy/config/kv_performance_server_16.conf
+++ b/scripts/deploy/config/kv_performance_server_16.conf
@@ -1,27 +1,21 @@
 iplist=(
 172.31.18.66
-172.31.27.193
 172.31.27.153
-172.31.16.133
 172.31.31.76
-172.31.17.121
 172.31.24.63
 172.31.20.159
 172.31.25.33
-172.31.19.32
 172.31.30.103
 172.31.31.166
 172.31.30.230
 172.31.16.98
 172.31.18.177
-172.31.20.176
 172.31.18.170
 172.31.30.105
 172.31.27.190
 172.31.20.125
 172.31.20.124
 172.31.25.251
-172.31.26.198
 172.31.31.197
 172.31.17.129
 172.31.19.128
@@ -31,6 +25,12 @@ iplist=(
 172.31.22.70
 172.31.19.90
 172.31.18.21
+172.31.18.78
+172.31.25.28
+172.31.28.219
+172.31.20.219
+172.31.19.26
+172.31.25.95
 )
 
 key=~/.ssh/junchao.pem
diff --git a/scripts/deploy/config/tusk.config b/scripts/deploy/config/tusk.config
index 6deeade5..5f0fbd90 100644
--- a/scripts/deploy/config/tusk.config
+++ b/scripts/deploy/config/tusk.config
@@ -1,5 +1,5 @@
 {
-  "clientBatchNum": 100,
+  "clientBatchNum": 400,
   "enable_viewchange": false,
   "recovery_enabled": false,
   "max_client_complaint_num":10,

Reply via email to