This is an automated email from the ASF dual-hosted git repository.

junchao pushed a commit to branch cassandra
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git

commit ad8daedc80e0b9e318f630ec5cb85949b377d345
Author: cjcchen <[email protected]>
AuthorDate: Mon Dec 1 23:14:01 2025 +0000

    add test
---
 .../ordering/cassandra/algorithm/cassandra.cpp     |  40 ++++---
 .../cassandra/algorithm/proposal_graph.cpp         | 119 +++++++--------------
 .../cassandra/algorithm/proposal_manager.cpp       |   2 +
 .../ordering/cassandra/algorithm/proposal_state.h  |   6 ++
 .../ordering/cassandra_cft/algorithm/cassandra.cpp |  54 ++++++----
 .../cassandra_cft/algorithm/proposal_graph.cpp     |  41 ++++---
 .../ordering/cassandra_cft/framework/consensus.cpp |   2 +-
 .../ordering/multipaxos/algorithm/multipaxos.cpp   |  22 ++--
 .../ordering/multipaxos/framework/consensus.cpp    |   4 +-
 .../multipaxos/framework/performance_manager.cpp   |  49 +++++++++
 .../multipaxos/framework/performance_manager.h     |  46 ++++++++
 .../consensus/ordering/tusk/algorithm/tusk.cpp     |  24 ++++-
 platform/statistic/stats.cpp                       |   2 +
 scripts/deploy/config/cassandra.config             |   6 +-
 .../deploy/config/kv_performance_server_11.conf    |  27 +++++
 .../deploy/config/kv_performance_server_21.conf    |  69 ++++++++++++
 scripts/deploy/config/multipaxos.config            |   2 +-
 scripts/deploy/config/tusk.config                  |   6 +-
 18 files changed, 372 insertions(+), 149 deletions(-)

diff --git a/platform/consensus/ordering/cassandra/algorithm/cassandra.cpp 
b/platform/consensus/ordering/cassandra/algorithm/cassandra.cpp
index 832e00c0..ddad256f 100644
--- a/platform/consensus/ordering/cassandra/algorithm/cassandra.cpp
+++ b/platform/consensus/ordering/cassandra/algorithm/cassandra.cpp
@@ -5,6 +5,7 @@
 #include "common/crypto/signature_verifier.h"
 #include "common/utils/utils.h"
 
+
 namespace resdb {
 namespace cassandra {
 namespace cassandra_recv {
@@ -17,7 +18,7 @@ Cassandra::Cassandra(int id, int f, int total_num, int 
block_size, SignatureVeri
   total_num_ = total_num;
   f_ = f;
   is_stop_ = false;
-  timeout_ms_ = 1000;
+  timeout_ms_ = 100;
   //timeout_ms_ = 60000;
   local_txn_id_ = 1;
   local_proposal_id_ = 1;
@@ -168,7 +169,7 @@ void Cassandra::AsyncCommit() {
       }
     }
     LOG(ERROR)<<" commit done:"<<txn_num<<" 
proposer:"<<p->header().proposer_id()<<" height:"<<p->header().height();
-    global_stats_->AddCommitTxn(txn_num);
+    //global_stats_->AddCommitTxn(txn_num);
   }
 
 }
@@ -469,9 +470,8 @@ bool Cassandra::ReceiveProposal(std::unique_ptr<Proposal> 
proposal) {
                  << " pre id:" << pre_p->header().proposal_id();
     }
 
-  /*
-
-    if(proposal->header().height() >=250 && proposal->header().height() <300){
+#ifdef NOPOA
+    if(proposal->header().height() >=850 && proposal->header().height() <1350){
       if(Checklimit(0, f_, proposal->header().proposer_id())
           || Checklimit(f_+1, 2*f_,proposal->header().proposer_id())
           || Checklimit(2*f_+1, 3*f_,proposal->header().proposer_id())
@@ -479,24 +479,34 @@ bool Cassandra::ReceiveProposal(std::unique_ptr<Proposal> 
proposal) {
         return true;
       }
     }
-    */
+    #endif
 
-    if(proposal->header().height() >=150 && proposal->header().height() <200){
+  #ifdef GOTPOA
+    if(proposal->header().height() >=850 && proposal->header().height() <1350){
         if(!((proposal->header().proposer_id() <= 2*f_ && id_ <=2*f_)
         || (proposal->header().proposer_id() > 2*f_ && id_ >2*f_))) {
           return true;
         }
       }
-   
+      #endif
+
 
-   /*
-    if(proposal->header().height() >=250 && proposal->header().height() <350){
+  #ifdef GOTPOR
+    if(proposal->header().height() >=150 && proposal->header().height() <250){
         if(!((proposal->header().proposer_id() <= 2*f_+1 && id_ <=2*f_+1)
         || (proposal->header().proposer_id() > 2*f_+1 && id_ >2*f_+1))) {
           return true;
         }
       }
-      */
+    #endif
+
+/*
+    if(!((proposal->header().proposer_id() <= 2*f_+1 && id_ <=2*f_+1)
+          || (proposal->header().proposer_id() > 2*f_+1 && id_ >2*f_+1))) {
+      return true;
+    }
+    */
+
 
     /*
       if(proposal->header().height() >=200 && proposal->header().height() 
<205){
@@ -551,12 +561,16 @@ bool Cassandra::AddProposal(const Proposal& proposal) {
       AskProposal(proposal);
       //assert(1==0);
     }
+  #ifdef GOPOA
     else {
+  #endif
       return false;
+  #ifdef GOPOA
     }
+  #endif
   }
 
-  LOG(ERROR)<<" proposal blocks:"<<proposal.block_size();
+  //LOG(ERROR)<<" proposal blocks:"<<proposal.block_size();
   for (const Block& block : proposal.block()) {
     bool ret = false;;
     for(int i = 0; i< 5; ++i){
@@ -612,7 +626,7 @@ bool Cassandra::AddProposal(const Proposal& proposal) {
       //LOG(ERROR) << "can vote:";
     }
   }
-   LOG(ERROR)<<"recv done";
+   //LOG(ERROR)<<"recv done";
 
   std::vector<std::unique_ptr<Proposal>> future_g = graph_->GetNotFound(
       proposal.header().height() + 1, proposal.header().hash());
diff --git a/platform/consensus/ordering/cassandra/algorithm/proposal_graph.cpp 
b/platform/consensus/ordering/cassandra/algorithm/proposal_graph.cpp
index df8ccb42..e5b91135 100644
--- a/platform/consensus/ordering/cassandra/algorithm/proposal_graph.cpp
+++ b/platform/consensus/ordering/cassandra/algorithm/proposal_graph.cpp
@@ -72,23 +72,6 @@ int ProposalGraph::AddProposal(const Proposal& proposal) {
              << " current height:" << current_height_<<" 
from:"<<proposal.header().proposer_id()<<" proposal 
id:"<<proposal.header().proposal_id();
   assert(current_height_ >= latest_commit_.header().height());
 
-  {
-    int cur_h = proposal.header().height();
-    for(int i = 0; i <3 && proposal.history_size()>=3; ++i){
-      const auto& sub_history = proposal.history(i);
-      std::string sub_hash = sub_history.hash();
-      auto sub_node_it = node_info_.find(sub_hash);
-      if(sub_node_it == node_info_.end()){
-        continue;
-      }
-      LOG(ERROR)<<" history node 
:"<<sub_node_it->second->proposal.header().proposer_id()
-        <<" height:"<<sub_node_it->second->proposal.header().height()
-        <<" hash:"<<sub_hash;
-      assert(cur_h == sub_node_it->second->proposal.header().height()+1);
-      cur_h--;
-    }
-  }
-
 
   /*
   if (proposal.header().height() < current_height_) {
@@ -125,6 +108,7 @@ int ProposalGraph::AddProposal(const Proposal& proposal) {
     LOG(ERROR) << "verify parent fail:" << proposal.header().proposer_id()
                << " id:" << proposal.header().proposal_id();
 
+  #ifdef GOPOA
     g_[proposal.header().prehash()].push_back(proposal.header().hash());
     auto np = std::make_unique<NodeInfo>(proposal);
     //new_proposals_[proposal.header().hash()] = &np->proposal;
@@ -133,12 +117,13 @@ int ProposalGraph::AddProposal(const Proposal& proposal) {
     // hash:"<<Encode(proposal.header().hash());
     node_info_[proposal.header().hash()] = std::move(np);
     last_node_[proposal.header().height()].insert(proposal.header().hash());
+  #endif
 
     // assert(1==0);
     return 2;
   }
 
-  LOG(ERROR)<<"history size:"<<proposal.history_size();
+  //LOG(ERROR)<<"history size:"<<proposal.history_size();
    /*
   for (const auto& history : proposal.history()) {
     std::string hash = history.hash();
@@ -179,8 +164,8 @@ int ProposalGraph::AddProposal(const Proposal& proposal) {
       if(sub_node_it == node_info_.end()){
         continue;
       }
-      LOG(ERROR)<<" state:"<<sub_node_it->second->state<<" 
node:"<<sub_node_it->second->proposal.header().proposer_id()
-      <<" height:"<<sub_node_it->second->proposal.header().height();
+      //LOG(ERROR)<<" state:"<<sub_node_it->second->state<<" 
node:"<<sub_node_it->second->proposal.header().proposer_id()
+      //<<" height:"<<sub_node_it->second->proposal.header().height();
       assert(cur_h == sub_node_it->second->proposal.header().height()+1);
       cur_h--;
     // 
LOG(ERROR)<<"proposal:("<<node_it->second->proposal.header().proposer_id()
@@ -190,45 +175,20 @@ int ProposalGraph::AddProposal(const Proposal& proposal) {
       num++;
     }
   
-    LOG(ERROR)<<"get num:"<<num; 
+    //LOG(ERROR)<<"get num:"<<num; 
     if(num == 3) {
       const auto& sub_history = proposal.history(2);
       std::string sub_hash = sub_history.hash();
 
       auto sub_node_it = node_info_.find(sub_hash);
-      LOG(ERROR)<<" state:"<<node_it->second->state;
-      if (sub_node_it->second->state != ProposalState::Committed) {
-        Commit(sub_hash);
-      }
-    }
-  }
-
-/*
-  for (const auto& history : proposal.history()) {
-    std::string hash = history.hash();
-    auto node_it = node_info_.find(hash);
-    for (auto s : GetStates()) {
-      if (s >= node_it->second->state && s <= history.state()) {
-        if (s != history.state()) {
-          LOG(ERROR) << "update from higher state";
+      if(sub_node_it != node_info_.end()){
+        //LOG(ERROR)<<" state:"<<node_it->second->state;
+        if (sub_node_it->second->state != ProposalState::Committed) {
+          Commit(sub_hash);
         }
-        node_it->second->votes[s].insert(proposal.header().proposer_id());
       }
     }
-    // 
LOG(ERROR)<<"proposal:("<<node_it->second->proposal.header().proposer_id()
-    //<<","<<node_it->second->proposal.header().proposal_id()<<")"
-    //<<" history state:"<<history.state()<<"
-    //num:"<<node_it->second->votes[history.state()].size();
-    CheckState(node_it->second.get(),
-               static_cast<resdb::cassandra::ProposalState>(history.state()));
-    if (node_it->second->state == ProposalState::PreCommit) {
-      Commit(hash);
-    }
-    //LOG(ERROR)<<"history 
proposal:("<<node_it->second->proposal.header().proposer_id()
-    //<<","<<node_it->second->proposal.header().proposal_id()
-    //<<" state:"<<node_it->second->state;
   }
-  */
 
     //LOG(ERROR) << "height:" << current_height_
      //          << " proposal height:" << proposal.header().height();
@@ -249,24 +209,11 @@ int ProposalGraph::AddProposal(const Proposal& proposal) {
     g_[proposal.header().prehash()].push_back(proposal.header().hash());
     auto np = std::make_unique<NodeInfo>(proposal);
     //new_proposals_[proposal.header().hash()] = &np->proposal;
-    LOG(ERROR)<<"add proposal proposer:"<<proposal.header().proposer_id()<<" 
id:"<<proposal.header().proposal_id()<<" 
hash:"<<Encode(proposal.header().hash());
+    //LOG(ERROR)<<"add proposal proposer:"<<proposal.header().proposer_id()<<" 
id:"<<proposal.header().proposal_id()<<" 
hash:"<<Encode(proposal.header().hash());
     node_info_[proposal.header().hash()] = std::move(np);
     last_node_[proposal.header().height()].insert(proposal.header().hash());
   }
 
-/*
-  LOG(ERROR)<<" node info size:"<<node_info_.size();
-  std::string hash = proposal.header().hash();
-  auto node_it = node_info_.find(hash);
-  assert(node_it != node_info_.end());
-  
node_it->second->votes[ProposalState::New].insert(proposal.header().proposer_id());
-
-  CheckState(node_it->second.get(),
-               
static_cast<resdb::cassandra::ProposalState>(ProposalState::New));
-               */
-
-
-
   //LOG(ERROR)<<"add graph done";
   return 0;
 }
@@ -329,6 +276,10 @@ void ProposalGraph::Commit(const std::string& hash) {
       auto it = node_info_.find(c_hash);
       if (it == node_info_.end()) {
         LOG(ERROR) << "node not found, hash:";
+  #ifdef GOPOA
+        commit_p.clear();
+        break;
+  #endif
         assert(1 == 0);
       }
 
@@ -362,6 +313,10 @@ void ProposalGraph::Commit(const std::string& hash) {
   if (commit_p.size() > 1) {
     LOG(ERROR) << "commit more hash";
   }
+  if(commit_p.size()==0) {
+    LOG(ERROR) << "commit not ready";
+    return;
+  }
   assert(commit_p.size()>0);
   int block_num = 0;
   int p_num = 0;
@@ -390,11 +345,11 @@ void ProposalGraph::Commit(const std::string& hash) {
 
 
       for(auto block : commit_p[i][j]->sub_block()){
-        LOG(ERROR) << "commmit proposal from:" << 
commit_p[i][j]->header().proposer_id()
-                   << " block id:" << block.local_id();
+        //LOG(ERROR) << "commmit proposal from:" << 
commit_p[i][j]->header().proposer_id()
+                   //<< " block id:" << block.local_id();
         if(check_.find(std::make_pair(block.local_id(), 
commit_p[i][j]->header().proposer_id())) != check_.end()){
-          LOG(ERROR) << "commmit proposal from:" << 
commit_p[i][j]->header().proposer_id()
-                   << " block id:" << block.local_id() << "has committed";
+          //LOG(ERROR) << "commmit proposal from:" << 
commit_p[i][j]->header().proposer_id()
+          //         << " block id:" << block.local_id() << "has committed";
         }
         else {
           check_.insert(std::make_pair(block.local_id(), 
commit_p[i][j]->header().proposer_id()));
@@ -409,7 +364,9 @@ void ProposalGraph::Commit(const std::string& hash) {
     }
   }
   //global_stats_->AddCommitBlock(block_num);
-  LOG(ERROR)<<" commit proposal num:"<<p_num;
+  global_stats_->AddCommitTxn(p_num);
+  //LOG(ERROR)<<" commit proposal num:"<<p_num;
+  LOG(ERROR)<<"commit proposal from 
:"<<it->second->proposal.header().proposer_id()<<" 
id:"<<it->second->proposal.header().proposal_id()<<" 
height:"<<it->second->proposal.header().height()<<" num:"<<p_num;
   // TODO clean
   last_node_[it->second->proposal.header().height()].clear();
   latest_commit_ = it->second->proposal;
@@ -478,7 +435,7 @@ void ProposalGraph::UpdateHistory(Proposal* proposal) {
     his->set_sender(node_it->second->proposal.header().proposer_id());
     his->set_id(node_it->second->proposal.header().proposal_id());
     hash = node_it->second->proposal.header().prehash();
-    LOG(ERROR)<<" get his 
proposer:"<<node_it->second->proposal.header().proposer_id()<<" 
height:"<<node_it->second->proposal.header().height();
+    //LOG(ERROR)<<" get his 
proposer:"<<node_it->second->proposal.header().proposer_id()<<" 
height:"<<node_it->second->proposal.header().height();
   }
 }
 
@@ -570,7 +527,7 @@ Proposal* ProposalGraph::GetStrongestProposal() {
 
 
 
-  LOG(ERROR)<<" update his";
+  //LOG(ERROR)<<" update his";
   UpdateHistory(&sp->proposal);
    //LOG(ERROR) << "get strong proposal from height:" << current_height_ << " 
->("
    //          << sp->proposal.header().proposer_id() << ","
@@ -605,14 +562,14 @@ int ProposalGraph::CompareState(const ProposalState& 
state1,
 
 // p1 < p2
 bool ProposalGraph::Compare(const NodeInfo& p1, const NodeInfo& p2) {
-  LOG(ERROR) << "proposer:" << p1.proposal.header().proposer_id() << " "
-             << p2.proposal.header().proposer_id()
-            << "height:" << p1.proposal.header().height() << " "
-            << p2.proposal.header().height()
-            <<" state:"<< p1.state<<" "<<p2.state
-           <<" hash cmp:"<< (p1.proposal.header().hash() < 
p2.proposal.header().hash())
-           <<" cmp num:" << Cmp(p1.proposal.header().proposer_id(), 
p2.proposal.header().proposer_id())
-           <<" sub block:" << p1.proposal.sub_block_size() <<" "<< 
p2.proposal.sub_block_size();
+  //LOG(ERROR) << "proposer:" << p1.proposal.header().proposer_id() << " "
+  //           << p2.proposal.header().proposer_id()
+  //          << "height:" << p1.proposal.header().height() << " "
+  //          << p2.proposal.header().height()
+  //          <<" state:"<< p1.state<<" "<<p2.state
+  //         <<" hash cmp:"<< (p1.proposal.header().hash() < 
p2.proposal.header().hash())
+  //         <<" cmp num:" << Cmp(p1.proposal.header().proposer_id(), 
p2.proposal.header().proposer_id())
+  //         <<" sub block:" << p1.proposal.sub_block_size() <<" "<< 
p2.proposal.sub_block_size();
   if (p1.proposal.header().height() != p2.proposal.header().height()) {
     return p1.proposal.header().height() < p2.proposal.header().height();
   }
@@ -622,11 +579,13 @@ bool ProposalGraph::Compare(const NodeInfo& p1, const 
NodeInfo& p2) {
     return CompareState(p1.state, p2.state) < 0;
   }
 
-  if(150<=p1.proposal.header().height()&&p1.proposal.header().height()<=250) {
+  #ifdef GOPOA
+  if(850<=p1.proposal.header().height()&&p1.proposal.header().height()<=1350) {
     int h = (p1.proposal.header().height())%2;
     if ( h == 0) h = 2;
     return abs(p1.proposal.header().proposer_id() - h ) > 
abs(p2.proposal.header().proposer_id() - h);
   }
+  #endif
 
   int h = (p1.proposal.header().height())%total_num_;
   if ( h == 0) h = total_num_;
diff --git 
a/platform/consensus/ordering/cassandra/algorithm/proposal_manager.cpp 
b/platform/consensus/ordering/cassandra/algorithm/proposal_manager.cpp
index 40a60bee..41c03cd2 100644
--- a/platform/consensus/ordering/cassandra/algorithm/proposal_manager.cpp
+++ b/platform/consensus/ordering/cassandra/algorithm/proposal_manager.cpp
@@ -415,7 +415,9 @@ int ProposalManager::VerifyProposal(const 
ProposalQueryResp& resp) {
   }
 
   tmp_proposal_.clear();
+  #ifdef GOPOA
   return 0;
+  #endif
 
   for (const Proposal& p : resp.proposal()) {
     //LOG(ERROR)<<"verify resp proposal 
proposer:"<<p.header().proposer_id()<<" id:"<<p.header().proposal_id();
diff --git a/platform/consensus/ordering/cassandra/algorithm/proposal_state.h 
b/platform/consensus/ordering/cassandra/algorithm/proposal_state.h
index 3eb1d83a..876b1be0 100644
--- a/platform/consensus/ordering/cassandra/algorithm/proposal_state.h
+++ b/platform/consensus/ordering/cassandra/algorithm/proposal_state.h
@@ -1,5 +1,11 @@
 #pragma once
 
+//#define GOPOA
+//#define GOTPOA
+#define GOTPOR
+
+//#define NOPOA
+
 namespace resdb {
 namespace cassandra {
 
diff --git a/platform/consensus/ordering/cassandra_cft/algorithm/cassandra.cpp 
b/platform/consensus/ordering/cassandra_cft/algorithm/cassandra.cpp
index 4a137957..beb4fb3b 100644
--- a/platform/consensus/ordering/cassandra_cft/algorithm/cassandra.cpp
+++ b/platform/consensus/ordering/cassandra_cft/algorithm/cassandra.cpp
@@ -17,8 +17,8 @@ Cassandra::Cassandra(int id, int f, int total_num, int 
block_size, SignatureVeri
   total_num_ = total_num;
   f_ = f;
   is_stop_ = false;
-  //timeout_ms_ = 10000;
-  timeout_ms_ = 60000;
+  timeout_ms_ = 1600;
+  //timeout_ms_ = 60000;
   local_txn_id_ = 1;
   local_proposal_id_ = 1;
   batch_size_ = block_size;
@@ -432,7 +432,19 @@ void Cassandra::ReceiveProposalQueryResp(const 
ProposalQueryResp& resp) {
 bool Cassandra::ReceiveProposal(std::unique_ptr<Proposal> proposal) {
   // std::unique_lock<std::mutex> lk(g_mutex_);
   {
-    //LOG(ERROR)<<"recv proposal, height:"<<proposal->header().height()<<" 
block size:"<<proposal->block_size();
+    LOG(ERROR)<<"recv proposal, height:"<<proposal->header().height()<<" block 
size:"<<proposal->block_size()
+      << "from :"
+                 << proposal->header().proposer_id()
+                 <<" id:"<<id_
+                 <<" f:"<<f_
+                 <<" pass:"<< (((proposal->header().proposer_id() <= 
total_num_-f_) && (id_ <=total_num_-f_))
+          || ((proposal->header().proposer_id() > total_num_-f_) && (id_ 
>total_num_-f_)));
+
+    if(!((proposal->header().proposer_id() <= total_num_-f_) && (id_ 
<=total_num_-f_)
+          || (proposal->header().proposer_id() > total_num_-f_ && id_ 
>total_num_-f_))) {
+      return true;
+    }
+
     std::unique_lock<std::mutex> lk(mutex_);
   
     for(const auto& block : proposal->block()){
@@ -440,13 +452,15 @@ bool Cassandra::ReceiveProposal(std::unique_ptr<Proposal> 
proposal) {
       proposal_manager_->AddBlock(std::move(new_block));
     }
 
-    //LOG(ERROR)<<"add block done";
-    const Proposal* pre_p =
-        graph_->GetProposalInfo(proposal->header().prehash());
+    LOG(ERROR)<<"add block done:"<<proposal->block_size();
+    const Proposal* pre_p = nullptr;
+    if(proposal->header().prehash().size()>0){
+      pre_p = graph_->GetProposalInfo(proposal->header().prehash());
+    }
     if (pre_p == nullptr) {
-     // LOG(ERROR) << "receive proposal from :"
-      //           << proposal->header().proposer_id()
-       //          << " id:" << proposal->header().proposal_id() << "no pre:";
+      LOG(ERROR) << "receive proposal from :"
+                 << proposal->header().proposer_id()
+                 << " id:" << proposal->header().proposal_id() << "no pre:";
        if(!proposal->header().prehash().empty()) {
          if (proposal->header().height() > graph_->GetCurrentHeight()) {
            future_proposal_[proposal->header().height()].push_back(
@@ -456,11 +470,11 @@ bool Cassandra::ReceiveProposal(std::unique_ptr<Proposal> 
proposal) {
        }
       assert(proposal->header().prehash().empty());
     } else {
-      //LOG(ERROR) << "receive proposal from :"
-      //           << proposal->header().proposer_id()
-      //           << " id:" << proposal->header().proposal_id()
-      //           << " pre:" << pre_p->header().proposer_id()
-      //           << " pre id:" << pre_p->header().proposal_id();
+      LOG(ERROR) << "receive proposal from :"
+                 << proposal->header().proposer_id()
+                 << " id:" << proposal->header().proposal_id()
+                 << " pre:" << pre_p->header().proposer_id()
+                 << " pre id:" << pre_p->header().proposal_id();
     }
 
     /*
@@ -505,7 +519,7 @@ bool Cassandra::AddProposal(const Proposal& proposal) {
     return false;
   }
 
-  //LOG(ERROR)<<" proposal blocks:"<<proposal.block_size();
+  LOG(ERROR)<<" proposal blocks:"<<proposal.block_size();
   for (const Block& block : proposal.block()) {
     bool ret = false;;
     for(int i = 0; i< 5; ++i){
@@ -548,11 +562,11 @@ bool Cassandra::AddProposal(const Proposal& proposal) {
 
   received_num_[proposal.header().height()].insert(
       proposal.header().proposer_id());
-  //LOG(ERROR) << "received current height:" << graph_->GetCurrentHeight()
-  //           << " proposal height:" << proposal.header().height()
-  //           << " num:" << received_num_[graph_->GetCurrentHeight()].size()
-  //           << " from:" << proposal.header().proposer_id()
-  //           << " last vote:" << last_vote_;
+  LOG(ERROR) << "received current height:" << graph_->GetCurrentHeight()
+             << " proposal height:" << proposal.header().height()
+             << " num:" << received_num_[graph_->GetCurrentHeight()].size()
+             << " from:" << proposal.header().proposer_id()
+             << " last vote:" << last_vote_;
   if (received_num_[graph_->GetCurrentHeight()].size() == total_num_) {
     if (last_vote_ < graph_->GetCurrentHeight()) {
       last_vote_ = graph_->GetCurrentHeight();
diff --git 
a/platform/consensus/ordering/cassandra_cft/algorithm/proposal_graph.cpp 
b/platform/consensus/ordering/cassandra_cft/algorithm/proposal_graph.cpp
index c2a40ebc..aab721ec 100644
--- a/platform/consensus/ordering/cassandra_cft/algorithm/proposal_graph.cpp
+++ b/platform/consensus/ordering/cassandra_cft/algorithm/proposal_graph.cpp
@@ -150,19 +150,30 @@ int ProposalGraph::AddProposal(const Proposal& proposal) {
       const auto& sub_history = proposal.history(i);
       std::string sub_hash = sub_history.hash();
       auto sub_node_it = node_info_.find(sub_hash);
-      //LOG(ERROR)<<" state:"<<node_it->second->state;
-      if (node_it->second->state != ProposalState::PoR) {
+      if(sub_node_it == node_info_.end()){
+        continue;
+      }
+      LOG(ERROR)<<" state:"<<sub_node_it->second->state<<" 
node:"<<sub_node_it->second->proposal.header().proposer_id()
+      <<" height:"<<sub_node_it->second->proposal.header().height();
+      if (sub_node_it->second->state != ProposalState::PoR) {
         break;
       }
       num++;
     }
   
     
-    //LOG(ERROR)<<"get num:"<<num; 
+    LOG(ERROR)<<"get num:"<<num; 
     if(num == need_num) {
       const auto& sub_history = proposal.history(need_num-1);
       std::string sub_hash = sub_history.hash();
-      Commit(sub_hash);
+
+      auto sub_node_it = node_info_.find(sub_hash);
+      if(sub_node_it != node_info_.end()){
+        //LOG(ERROR)<<" state:"<<node_it->second->state;
+        if (sub_node_it->second->state != ProposalState::Committed) {
+          Commit(sub_hash);
+        }
+      }
     }
   }
 
@@ -241,11 +252,11 @@ return;
 }
 
 int ProposalGraph::CheckState(NodeInfo* node_info, ProposalState state) {
-   //LOG(ERROR) << "node: (" << node_info->proposal.header().proposer_id() <<
-   //","
-    //         << node_info->proposal.header().proposal_id()
-     //        << ") state:" << node_info->state
-      //       << " vote num:" << node_info->votes[ProposalState::New].size();
+   LOG(ERROR) << "node: (" << node_info->proposal.header().proposer_id() <<
+   ","
+             << node_info->proposal.header().proposal_id()
+             << ") state:" << node_info->state
+             << " vote num:" << node_info->votes[ProposalState::New].size();
 
   if(cft_){
     if(node_info->votes[ProposalState::New].size() >= f_ + 1) {
@@ -267,11 +278,11 @@ int ProposalGraph::CheckState(NodeInfo* node_info, 
ProposalState state) {
     }
   }
   node_info->state = state;
-  //LOG(ERROR) << "node: (" << node_info->proposal.header().proposer_id() <<
-  // ","
-  //           << node_info->proposal.header().proposal_id()
-  //           << ") get state:" << node_info->state
-  //           << " vote num:" << node_info->votes[ProposalState::New].size();
+  LOG(ERROR) << "node: (" << node_info->proposal.header().proposer_id() <<
+   ","
+             << node_info->proposal.header().proposal_id()
+             << ") get state:" << node_info->state
+             << " vote num:" << node_info->votes[ProposalState::New].size();
 
 
   return true;
@@ -445,7 +456,7 @@ Proposal* ProposalGraph::GetStrongestProposal() {
     }
   }
 
-  //LOG(ERROR)<<" last node size:"<<last_node_[current_height_].size()<<" 
height:"<<current_height_<<" from:"<<sp->proposal.header().proposer_id();
+  LOG(ERROR)<<" last node size:"<<last_node_[current_height_].size()<<" 
height:"<<current_height_<<" from:"<<sp->proposal.header().proposer_id();
 
   for (const auto& last_hash : last_node_[current_height_]) {
     NodeInfo* node_info = node_info_[last_hash].get();
diff --git a/platform/consensus/ordering/cassandra_cft/framework/consensus.cpp 
b/platform/consensus/ordering/cassandra_cft/framework/consensus.cpp
index 05cae0b1..1e558b06 100644
--- a/platform/consensus/ordering/cassandra_cft/framework/consensus.cpp
+++ b/platform/consensus/ordering/cassandra_cft/framework/consensus.cpp
@@ -37,7 +37,7 @@ Consensus::Consensus(const ResDBConfig& config,
                      std::unique_ptr<TransactionManager> executor)
     : common::Consensus(config, std::move(executor)){
   int total_replicas = config_.GetReplicaNum();
-  int f = (total_replicas - 1) / 3;
+  int f = (total_replicas - 1) / 2;
 
   Init();
 
diff --git a/platform/consensus/ordering/multipaxos/algorithm/multipaxos.cpp 
b/platform/consensus/ordering/multipaxos/algorithm/multipaxos.cpp
index 7885a875..00093268 100644
--- a/platform/consensus/ordering/multipaxos/algorithm/multipaxos.cpp
+++ b/platform/consensus/ordering/multipaxos/algorithm/multipaxos.cpp
@@ -12,7 +12,7 @@ MultiPaxos::MultiPaxos(int id, int f, int total_num, int 
block_size, SignatureVe
 
     LOG(ERROR)<<"id:"<<id<<" f:"<<f<<" total:"<<total_num_;
 
-    proposal_manager_ = std::make_unique<ProposalManager>(id, 2*f_+1, 
verifier);
+    proposal_manager_ = std::make_unique<ProposalManager>(id, f_+1, verifier);
     send_thread_ = std::thread(&MultiPaxos::AsyncSend, this);
     commit_thread_ = std::thread(&MultiPaxos::AsyncCommit, this);
     commit_seq_thread_ = std::thread(&MultiPaxos::AsyncCommitSeq, this);
@@ -53,7 +53,7 @@ void MultiPaxos::AsyncSend() {
     std::unique_ptr<Proposal> proposal =  nullptr;
     {
       proposal = proposal_manager_ -> GenerateProposal(txns);
-      LOG(ERROR)<<"propose view:"<<proposal->header().view()<<" current 
round:"<<round<<" block size:"<<txns.size();
+      //LOG(ERROR)<<"propose view:"<<proposal->header().view()<<" current 
round:"<<round<<" block size:"<<txns.size();
     }
     broadcast_call_(MessageType::Propose, *proposal);
   }
@@ -132,7 +132,7 @@ void MultiPaxos::AsyncCommitSeq() {
     int proposer = data->header().proposer();
     int round = data->header().view();
     int proposal_seq = data->header().proposal_id();
-    LOG(ERROR)<<" commit proposer:"<<proposer<< " round:"<<round<<" 
seq:"<<proposal_seq;
+    //LOG(ERROR)<<" commit proposer:"<<proposer<< " round:"<<round<<" 
seq:"<<proposal_seq;
     for(Transaction& txn : *data->mutable_transactions()){
       txn.set_id(seq++);
       Commit(txn);
@@ -167,10 +167,15 @@ bool 
MultiPaxos::ReceiveProposal(std::unique_ptr<Proposal> proposal) {
   int proposer = proposal->sender();
   int seq = proposal->header().proposal_id();
 
+  if(!((proposer <= total_num_-f_) && (id_ <=total_num_-f_)
+        || (proposer  > total_num_-f_ && id_ >total_num_-f_))) {
+    return true;
+  }
+
   bool done = false;
   {
     std::unique_lock<std::mutex> lk(mutex_);
-    LOG(ERROR)<<"recv proposal from:"<<proposer<<" round:"<<round<<" 
seq:"<<seq;
+    //LOG(ERROR)<<"recv proposal from:"<<proposer<<" round:"<<round<<" 
seq:"<<seq;
     receive_[round][proposer] = std::move(proposal);
     done = true;
   }
@@ -207,9 +212,10 @@ bool MultiPaxos::ReceiveAccept(std::unique_ptr<Proposal> 
proposal) {
 
   std::unique_lock<std::mutex> lk(propose_mutex_);
   accept_receive_[round].insert(sender);
-  LOG(ERROR)<<"RECEIVE proposer accept:"<<round<<" 
size:"<<accept_receive_[round].size()
-    <<" proposer:"<<proposer<<" sender:"<<sender;
+  //LOG(ERROR)<<"RECEIVE proposer accept:"<<round<<" 
size:"<<accept_receive_[round].size()
+   // <<" proposer:"<<proposer<<" sender:"<<sender;
   if(accept_receive_[round].size() == f_+1){
+    //LOG(ERROR)<<" ====== bc =======";
     Broadcast(MessageType::Learn, *proposal);
 
     can_propose_[round+1] = true;
@@ -219,6 +225,10 @@ bool MultiPaxos::ReceiveAccept(std::unique_ptr<Proposal> 
proposal) {
 }
 
 bool MultiPaxos::ReceiveLearn(std::unique_ptr<Proposal> proposal) {
+  int round = proposal->header().view();
+  int sender = proposal->sender();
+  int proposer = proposal->header().proposer();
+//LOG(ERROR)<<" receive learn:"<<round;
   commit_q_.Push(std::move(proposal));
   return true;
 }
diff --git a/platform/consensus/ordering/multipaxos/framework/consensus.cpp 
b/platform/consensus/ordering/multipaxos/framework/consensus.cpp
index fbefc49c..86d7c9a1 100644
--- a/platform/consensus/ordering/multipaxos/framework/consensus.cpp
+++ b/platform/consensus/ordering/multipaxos/framework/consensus.cpp
@@ -47,10 +47,10 @@ Consensus::Consensus(const ResDBConfig& config,
   SetPerformanceManager(GetPerformanceManager());
 
   Init();
-  failure_mode_ = config.GetConfigData().failure_mode();
+  //failure_mode_ = config.GetConfigData().failure_mode();
 
   int total_replicas = config_.GetReplicaNum();
-  int f = (total_replicas - 1) / 3;
+  int f = (total_replicas - 1) / 2;
 
   if (config_.GetPublicKeyCertificateInfo()
           .public_key()
diff --git 
a/platform/consensus/ordering/multipaxos/framework/performance_manager.cpp 
b/platform/consensus/ordering/multipaxos/framework/performance_manager.cpp
new file mode 100644
index 00000000..05831b6d
--- /dev/null
+++ b/platform/consensus/ordering/multipaxos/framework/performance_manager.cpp
@@ -0,0 +1,49 @@
+/*
+ * Copyright (c) 2019-2022 ExpoLab, UC Davis
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ */
+
+#include 
"platform/consensus/ordering/multipaxos/framework/performance_manager.h"
+
+#include <glog/logging.h>
+
+#include "common/utils/utils.h"
+
+namespace resdb {
+namespace multipaxos {
+
+using comm::CollectorResultCode;
+
+MultiPaxosPerformanceManager::MultiPaxosPerformanceManager(
+    const ResDBConfig& config, ReplicaCommunicator* replica_communicator,
+    SignatureVerifier* verifier)
+    : PerformanceManager(config, replica_communicator, verifier){
+}
+
+void MultiPaxosPerformanceManager::SendMessage(const Request& request) {
+  replica_communicator_->SendMessage(request, 1);
+}
+
+
+}  // namespace common
+}  // namespace resdb
diff --git 
a/platform/consensus/ordering/multipaxos/framework/performance_manager.h 
b/platform/consensus/ordering/multipaxos/framework/performance_manager.h
new file mode 100644
index 00000000..3bbf273b
--- /dev/null
+++ b/platform/consensus/ordering/multipaxos/framework/performance_manager.h
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2019-2022 ExpoLab, UC Davis
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ */
+
+#pragma once
+
+#include <future>
+
+#include "platform/consensus/ordering/common/framework/performance_manager.h"
+
+namespace resdb {
+namespace multipaxos {
+
+class MultiPaxosPerformanceManager : public common::PerformanceManager {
+ public:
+  MultiPaxosPerformanceManager(const ResDBConfig& config,
+                     ReplicaCommunicator* replica_communicator,
+                     SignatureVerifier* verifier);
+
+protected:
+  void SendMessage(const Request& request);
+};
+
+}  // namespace fairdag
+}  // namespace resdb
diff --git a/platform/consensus/ordering/tusk/algorithm/tusk.cpp 
b/platform/consensus/ordering/tusk/algorithm/tusk.cpp
index a0c34878..fd4e2862 100644
--- a/platform/consensus/ordering/tusk/algorithm/tusk.cpp
+++ b/platform/consensus/ordering/tusk/algorithm/tusk.cpp
@@ -11,17 +11,19 @@ Tusk::Tusk(int id, int f, int total_num, int block_size, 
SignatureVerifier* veri
     : ProtocolBase(id, f, total_num), verifier_(verifier) {
   limit_count_ = 2 * f + 1;
   batch_size_ = block_size;
-  proposal_manager_ = std::make_unique<ProposalManager>(id, total_num);
-  //proposal_manager_ = std::make_unique<ProposalManager>(id, limit_count_);
+  //proposal_manager_ = std::make_unique<ProposalManager>(id, total_num);
+  proposal_manager_ = std::make_unique<ProposalManager>(id, limit_count_);
 
   execute_id_ = 1;
   start_ = 0;
   queue_size_ = 0;
 
+  if(id<=2*f+1){
   send_thread_ = std::thread(&Tusk::AsyncSend, this);
   commit_thread_ = std::thread(&Tusk::AsyncCommit, this);
   execute_thread_ = std::thread(&Tusk::AsyncExecute, this);
   cert_thread_ = std::thread(&Tusk::AsyncProcessCert, this);
+  }
 
   global_stats_ = Stats::GetGlobalStats();
 
@@ -138,11 +140,13 @@ void Tusk::AsyncCommit() {
     for (int r = previous_round + 2; r <= round; r += 2) {
       int leader = GetLeader(r);
       const Proposal * req = nullptr;
+      int num = 0;
       while(!IsStop()){
         req = proposal_manager_->GetRequest(r, leader);
         // req:"<<(req==nullptr);
-        if (req == nullptr) {
-          //LOG(ERROR)<<" get leader:"<<leader<<" round:"<<r<<" not exit";
+        if (req == nullptr && num < 10) {
+          LOG(ERROR)<<" get leader:"<<leader<<" round:"<<r<<" not exit";
+          num++;
           //usleep(1000);
           std::unique_lock<std::mutex> lk(mutex_);
           vote_cv_.wait_for(lk, std::chrono::microseconds(100),
@@ -152,6 +156,9 @@ void Tusk::AsyncCommit() {
         }
         break;
       }
+      if(req == nullptr){
+        continue;
+      }
       //LOG(ERROR)<<" get leader:"<<leader<<" round:"<<r<<" 
delay:"<<(GetCurrentTime() - req->header().create_time());
       int reference_num = proposal_manager_->GetReferenceNum(*req);
       //LOG(ERROR)<<" get leader:"<<leader<<" round:"<<r<<" 
ref:"<<reference_num;
@@ -327,6 +334,13 @@ bool Tusk::ReceiveBlock(std::unique_ptr<Proposal> 
proposal) {
    // sleep(10);
   }
   */
+
+  if(!(id_ <= limit_count_ && proposal->header().proposer_id() <= 
limit_count_)){
+  //LOG(ERROR) << "recv block from " << proposal->header().proposer_id()
+   //          << " round:" << proposal->header().round();
+   // sleep(10);
+   return true;
+  }
   
       std::unique_lock<std::mutex> lk(check_block_mutex_);
   {
@@ -348,7 +362,7 @@ void Tusk::ReceiveBlockACK(std::unique_ptr<Metadata> 
metadata) {
   int sender = metadata->sender();
   std::unique_lock<std::mutex> lk(txn_mutex_);
   received_num_[hash][sender] = std::move(metadata);
-  //LOG(ERROR) << "recv block ack from:" << sender << " num:" << 
received_num_[hash].size()<<" round:"<<round;
+  LOG(ERROR) << "recv block ack from:" << sender << " num:" << 
received_num_[hash].size()<<" round:"<<round;
   if (received_num_[hash].size() == limit_count_) {
     Certificate cert;
     for (auto& it : received_num_[hash]) {
diff --git a/platform/statistic/stats.cpp b/platform/statistic/stats.cpp
index 7d611e10..a9004968 100644
--- a/platform/statistic/stats.cpp
+++ b/platform/statistic/stats.cpp
@@ -375,6 +375,8 @@ void Stats::MonitorGlobal() {
             << " commit_txn :"
                       << (commit_txn_time - last_commit_txn_time) 
 
+              << " commit_txn num:"
+                 << (commit_txn_num - last_commit_txn_num) 
 
               << " commit_txn latency :"
                  << static_cast<double>(commit_txn_time -
diff --git a/scripts/deploy/config/cassandra.config 
b/scripts/deploy/config/cassandra.config
index 4701738f..fb5ff19d 100644
--- a/scripts/deploy/config/cassandra.config
+++ b/scripts/deploy/config/cassandra.config
@@ -1,11 +1,11 @@
 {
-  "clientBatchNum": 100,
+  "clientBatchNum": 400,
   "enable_viewchange": false,
   "recovery_enabled": false,
   "max_client_complaint_num":10,
-  "max_process_txn": 8196,
+  "max_process_txn": 4096,
   "worker_num": 10,
   "input_worker_num": 1,
   "output_worker_num": 5,
-  "block_size":50
+  "block_size":1000
 }
diff --git a/scripts/deploy/config/kv_performance_server_11.conf 
b/scripts/deploy/config/kv_performance_server_11.conf
new file mode 100644
index 00000000..093fe804
--- /dev/null
+++ b/scripts/deploy/config/kv_performance_server_11.conf
@@ -0,0 +1,27 @@
+iplist=(
+172.31.18.66
+172.31.27.193
+172.31.27.153
+172.31.16.133
+172.31.31.76
+172.31.17.121
+172.31.24.63
+172.31.20.159
+172.31.31.131
+172.31.17.193
+172.31.24.229
+172.31.26.164
+172.31.17.7
+172.31.26.133
+172.31.20.10
+172.31.19.41
+172.31.27.174
+172.31.19.141
+172.31.27.240
+172.31.24.175
+172.31.27.176
+172.31.28.112
+)
+
+key=~/.ssh/junchao.pem
+client_num=11
diff --git a/scripts/deploy/config/kv_performance_server_21.conf 
b/scripts/deploy/config/kv_performance_server_21.conf
new file mode 100644
index 00000000..7ec6b8bf
--- /dev/null
+++ b/scripts/deploy/config/kv_performance_server_21.conf
@@ -0,0 +1,69 @@
+iplist=(
+172.31.28.28
+172.31.22.26
+172.31.17.27
+172.31.30.152
+172.31.20.25
+172.31.27.20
+172.31.21.22
+172.31.17.146
+172.31.16.147
+172.31.19.13
+172.31.20.141
+172.31.20.10
+172.31.16.141
+172.31.28.6
+172.31.18.137
+172.31.19.5
+172.31.29.133
+172.31.20.0
+172.31.22.4
+172.31.19.61
+172.31.25.62
+172.31.17.187
+172.31.31.60
+172.31.22.185
+172.31.26.185
+172.31.23.183
+172.31.17.184
+172.31.31.45
+172.31.25.51
+172.31.18.39
+172.31.19.42
+172.31.30.37
+172.31.25.38
+172.31.26.162
+172.31.18.34
+172.31.27.91
+172.31.20.161
+172.31.19.218
+172.31.24.90
+172.31.23.83
+172.31.16.217
+172.31.17.82
+172.31.18.83
+172.31.20.74
+172.31.25.204
+172.31.20.71
+172.31.28.200
+172.31.17.194
+172.31.29.71
+172.31.23.252
+172.31.21.126
+172.31.29.251
+172.31.26.251
+172.31.19.122
+172.31.30.123
+172.31.18.118
+172.31.25.119
+172.31.30.234
+172.31.26.117
+172.31.25.228
+172.31.31.101
+172.31.25.99
+172.31.24.227
+172.31.23.224
+)
+
+key=~/.ssh/junchao.pem
+client_num=32
diff --git a/scripts/deploy/config/multipaxos.config 
b/scripts/deploy/config/multipaxos.config
index 2716b3da..3a321e6f 100644
--- a/scripts/deploy/config/multipaxos.config
+++ b/scripts/deploy/config/multipaxos.config
@@ -1,5 +1,5 @@
 {
-  "clientBatchNum": 100,
+  "clientBatchNum": 400,
   "enable_viewchange": false,
   "recovery_enabled": false,
   "max_client_complaint_num":10,
diff --git a/scripts/deploy/config/tusk.config 
b/scripts/deploy/config/tusk.config
index 4e80359b..6deeade5 100644
--- a/scripts/deploy/config/tusk.config
+++ b/scripts/deploy/config/tusk.config
@@ -1,11 +1,11 @@
 {
-  "clientBatchNum": 800,
+  "clientBatchNum": 100,
   "enable_viewchange": false,
   "recovery_enabled": false,
   "max_client_complaint_num":10,
-  "max_process_txn": 1024,
+  "max_process_txn": 3186,
   "worker_num": 50,
   "input_worker_num": 1,
   "output_worker_num": 20,
-  "block_size":50
+  "block_size":200
 }


Reply via email to