This is an automated email from the ASF dual-hosted git repository.

junchao pushed a commit to branch cassandra
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git

commit 095f817151cdcb7b60df86a2a1617ead2c0c4f2e
Author: cjcchen <[email protected]>
AuthorDate: Tue Nov 4 08:38:53 2025 +0000

    add poa/por
---
 .../ordering/cassandra/algorithm/cassandra.cpp     | 117 +++++++---
 .../cassandra/algorithm/proposal_graph.cpp         | 231 ++++++++++++++-----
 .../ordering/cassandra/algorithm/proposal_graph.h  |   5 +-
 .../cassandra/algorithm/proposal_manager.cpp       |  35 ++-
 .../ordering/cassandra/algorithm/proposal_state.h  |   9 +
 .../ordering/cassandra/proto/proposal.proto        |   1 +
 .../ordering/cassandra_graph/algorithm/BUILD       |  76 ++++++
 .../algorithm/cassandra.cpp                        |   0
 .../ordering/cassandra_graph/algorithm/cassandra.h | 114 +++++++++
 .../algorithm/proposal_graph.cpp                   |   0
 .../algorithm/proposal_graph.h                     |   0
 .../algorithm/proposal_manager.cpp                 |   0
 .../cassandra_graph/algorithm/proposal_manager.h   |  73 ++++++
 .../algorithm/proposal_state.h                     |   0
 .../ordering/cassandra_graph/algorithm/ranking.cpp |  12 +
 .../ordering/cassandra_graph/algorithm/ranking.h   |  14 ++
 .../ordering/cassandra_graph/framework/BUILD       |  16 ++
 .../cassandra_graph/framework/consensus.cpp        | 171 ++++++++++++++
 .../ordering/cassandra_graph/framework/consensus.h |  59 +++++
 .../cassandra_graph/framework/consensus_test.cpp   | 179 ++++++++++++++
 .../consensus/ordering/cassandra_graph/proto/BUILD |  16 ++
 .../proto/proposal.proto                           |   0
 .../common/framework/performance_manager.cpp       |   2 +-
 scripts/deploy/config/cassandra.config             |   2 +-
 scripts/deploy/config/kv_performance_server.conf   |  14 +-
 .../deploy/config/kv_performance_server_32.conf    | 128 +++++------
 .../deploy/config/kv_performance_server_64.conf    | 256 ++++++++++-----------
 scripts/null                                       |   2 +-
 28 files changed, 1240 insertions(+), 292 deletions(-)

diff --git a/platform/consensus/ordering/cassandra/algorithm/cassandra.cpp 
b/platform/consensus/ordering/cassandra/algorithm/cassandra.cpp
index f32ba2d9..1152ff0e 100644
--- a/platform/consensus/ordering/cassandra/algorithm/cassandra.cpp
+++ b/platform/consensus/ordering/cassandra/algorithm/cassandra.cpp
@@ -30,7 +30,7 @@ Cassandra::Cassandra(int id, int f, int total_num, 
SignatureVerifier* verifier)
   precommitted_num_ = 0;
   execute_id_ = 1;
 
-  graph_ = std::make_unique<ProposalGraph>(f_);
+  graph_ = std::make_unique<ProposalGraph>(f_, id);
   proposal_manager_ = std::make_unique<ProposalManager>(id, graph_.get());
 
   graph_->SetCommitCallBack(
@@ -93,16 +93,21 @@ void Cassandra::AsyncConsensus() {
 
 bool Cassandra::WaitVote(int height) {
   std::unique_lock<std::mutex> lk(mutex_);
+  //LOG(ERROR)<<"wait vote height:"<<height;
   vote_cv_.wait_for(lk, std::chrono::microseconds(timeout_ms_ * 1000),
                     [&] { return can_vote_[height]; });
   if (!can_vote_[height]) {
     LOG(ERROR) << "wait vote time out"
                << " can vote:" << can_vote_[height] << " height:" << height;
   }
+  //LOG(ERROR)<<"wait vote height:"<<height<<" done";
   return true;
 }
 
 void Cassandra::AsyncCommit() {
+
+  std::set<std::pair<int, int>> committed;
+
   while (!is_stop_) {
     std::unique_ptr<Proposal> p = execute_queue_.Pop(timeout_ms_ * 1000);
     if (p == nullptr) {
@@ -110,19 +115,43 @@ void Cassandra::AsyncCommit() {
       continue;
     }
      //LOG(ERROR) << "execute proposal from proposer:" <<
-     //        p->header().proposer_id()
-     //          << " id:" << p->header().proposal_id()
-     //          << " height:" << p->header().height()
-     //          << " block size:" << p->block_size();
+      //       p->header().proposer_id()
+       //        << " id:" << p->header().proposal_id()
+        //       << " height:" << p->header().height()
+         //      << " block size:" << p->block_size()
+          //     << " subblock size:"<<p->sub_block_size();
 
     int txn_num = 0;
-    for (const Block& block : p->block()) {
+    for (const Block& block : p->sub_block()) {
       std::unique_lock<std::mutex> lk(mutex_);
+      //LOG(ERROR)<<"!!!!!!!!! commit proposal from:"
+      //  <<p->header().proposer_id()
+      //  <<"local id:"<<block.local_id();
       std::unique_ptr<Block> data_block =
           proposal_manager_->GetBlock(block.hash(), p->header().proposer_id());
-       //LOG(ERROR)<<"!!!!!!!!! commit proposal from:"
+      if(data_block == nullptr){
+      //  LOG(ERROR)<<"!!!!!!!!! proposal from:" <<p->header().proposer_id()
+       //   <<"local id:"<<block.local_id()
+        //  <<" has been committed";
+          //assert(1==0);
+        continue;
+      }
+
+
+      //LOG(ERROR)<<"!!!!!!!!! commit proposal from:"
       //<<p->header().proposer_id()<<" txn size:"
-      //<<data_block->data().transaction_size()<<" 
height:"<<p->header().height();
+      //<<data_block->data().transaction_size()<<" 
height:"<<p->header().height()
+      //<<"local id:"<<block.local_id();
+      auto it = committed.find(std::make_pair(p->header().proposer_id(), 
block.local_id()));
+      if( it != committed.end()){
+        //LOG(ERROR)<<"!!!!!!!!! proposal from:" <<p->header().proposer_id()
+         // <<"local id:"<<block.local_id()
+          //<<" has been committed";
+          assert(1==0);
+          continue;
+      }
+      committed.insert(std::make_pair(p->header().proposer_id(), 
block.local_id()));
+      
       if (p->header().proposer_id() == id_) {
         execute_num_ += data_block->data().transaction_size();
         // LOG(ERROR) << "recv num:" << recv_num_
@@ -131,7 +160,7 @@ void Cassandra::AsyncCommit() {
         //           << " block delay:" << (GetCurrentTime() -
         //           data_block->create_time());
       }
-
+    //LOG(ERROR)<<" txn size:"<<data_block->mutable_data()->transaction_size();
       for (Transaction& txn :
            *data_block->mutable_data()->mutable_transaction()) {
         txn.set_id(execute_id_++);
@@ -139,15 +168,18 @@ void Cassandra::AsyncCommit() {
         commit_(txn);
       }
     }
+    //LOG(ERROR)<<" commit done";
     global_stats_->AddCommitTxn(txn_num);
   }
+
 }
 
 void Cassandra::CommitProposal(const Proposal& p) {
-   //LOG(ERROR) << "commit proposal from proposer:" << p.header().proposer_id()
-   //          << " id:" << p.header().proposal_id()
-   //          << " height:" << p.header().height()
-   //          << " block size:" << p.block_size();
+   LOG(ERROR) << "commit proposal from proposer:" << p.header().proposer_id()
+             << " id:" << p.header().proposal_id()
+             << " height:" << p.header().height()
+             << " block size:" << p.block_size()
+             << " subblock size:"<< p.sub_block_size();
   if (p.block_size() == 0) {
     return;
   }
@@ -262,16 +294,19 @@ void Cassandra::BroadcastTxn() {
     global_stats_->AddCommitBlock(txns.size());
     std::unique_ptr<Block> block = proposal_manager_->MakeBlock(txns);
     assert(block != nullptr);
-    //LOG(ERROR)<<" send block:"<<block->local_id();
-    Broadcast(MessageType::NewBlocks, *block);
+    LOG(ERROR)<<" send block:"<<block->local_id();
+    //Broadcast(MessageType::NewBlocks, *block);
+    std::string hash = block->hash();
+    int local_id = block->local_id();
     proposal_manager_->AddLocalBlock(std::move(block));
+    proposal_manager_->BlockReady(hash, local_id);
     txns.clear();
   }
 }
 
 void Cassandra::ReceiveBlock(std::unique_ptr<Block> block) {
   // std::unique_lock<std::mutex> lk(g_mutex_);
-  //LOG(ERROR)<<"recv block from:"<<block->sender_id()<<" block 
id:"<<block->local_id();
+  LOG(ERROR)<<"recv block from:"<<block->sender_id()<<" block 
id:"<<block->local_id();
   BlockACK block_ack;
   block_ack.set_hash(block->hash());
   block_ack.set_sender_id(block->sender_id());
@@ -355,10 +390,10 @@ int Cassandra::SendTxn(int round) {
   }
   proposal_manager_->AddLocalProposal(*proposal);
 
-  //LOG(ERROR) << "====== bc proposal block size:" << proposal->block_size()
-             //<< " round:" << round
-             //<< " id:" << proposal->header().proposal_id()
-             //<< " weak links:"<< proposal->weak_proposals().hash_size();
+  LOG(ERROR) << "====== bc proposal block size:" << proposal->block_size()
+             << " round:" << round
+             << " id:" << proposal->header().proposal_id()
+             << " weak links:"<< proposal->weak_proposals().hash_size();
 
   Broadcast(MessageType::NewProposal, *proposal);
 
@@ -423,14 +458,21 @@ void Cassandra::ReceiveProposalQueryResp(const 
ProposalQueryResp& resp) {
 bool Cassandra::ReceiveProposal(std::unique_ptr<Proposal> proposal) {
   // std::unique_lock<std::mutex> lk(g_mutex_);
   {
-    // LOG(ERROR)<<"recv proposal";
+    LOG(ERROR)<<"recv proposal, height:"<<proposal->header().height()<<" block 
size:"<<proposal->block_size();
     std::unique_lock<std::mutex> lk(mutex_);
+  
+    for(const auto& block : proposal->block()){
+      std::unique_ptr<Block> new_block = std::make_unique<Block>(block);
+      proposal_manager_->AddBlock(std::move(new_block));
+    }
+
+    //LOG(ERROR)<<"add block done";
     const Proposal* pre_p =
         graph_->GetProposalInfo(proposal->header().prehash());
     if (pre_p == nullptr) {
-      //LOG(ERROR) << "receive proposal from :"
+     // LOG(ERROR) << "receive proposal from :"
       //           << proposal->header().proposer_id()
-      //           << " id:" << proposal->header().proposal_id() << "no pre:";
+       //          << " id:" << proposal->header().proposal_id() << "no pre:";
        if(!proposal->header().prehash().empty()) {
          if (proposal->header().height() > graph_->GetCurrentHeight()) {
            future_proposal_[proposal->header().height()].push_back(
@@ -462,7 +504,9 @@ bool Cassandra::ReceiveProposal(std::unique_ptr<Proposal> 
proposal) {
           std::move(proposal));
       return true;
     }
+    //LOG(ERROR)<<" add proposal"; 
     AddProposal(*proposal);
+    //LOG(ERROR)<<" add proposal done"; 
 
     auto it = future_proposal_.find(graph_->GetCurrentHeight());
     if (it != future_proposal_.end()) {
@@ -472,7 +516,7 @@ bool Cassandra::ReceiveProposal(std::unique_ptr<Proposal> 
proposal) {
       future_proposal_.erase(it);
     }
   }
-  // LOG(ERROR)<<"receive proposal done";
+   //LOG(ERROR)<<"receive proposal done";
   return true;
 }
 
@@ -481,20 +525,24 @@ bool Cassandra::AddProposal(const Proposal& proposal) {
   if (ret != 0) {
     if (ret == 2) {
       LOG(ERROR) << "verify proposal fail";
+      assert(1==0);
       AskProposal(proposal);
     }
     return false;
   }
 
+  //LOG(ERROR)<<" proposal blocks:"<<proposal.block_size();
   for (const Block& block : proposal.block()) {
     bool ret = false;;
     for(int i = 0; i< 5; ++i){
       ret = proposal_manager_->ContainBlock(block);
+      //LOG(ERROR)<<" contain block:"<<ret;
       if (ret == false) {
-        LOG(ERROR) << "======== block from:" << block.sender_id()
-          << " block id:" << block.local_id() << " not exist";
-         usleep(1000);
+       // LOG(ERROR) << "======== block from:" << block.sender_id()
+        //  << " block id:" << block.local_id() << " not exist";
+         //usleep(1000);
         //AskBlock(block);
+        assert(1==0);
         continue;
       }
       else {
@@ -504,9 +552,10 @@ bool Cassandra::AddProposal(const Proposal& proposal) {
     assert(ret);
   }
 
+
   {
     std::unique_lock<std::mutex> lk(g_mutex_);
-    // LOG(ERROR) << "add proposal";
+     //LOG(ERROR) << "add proposal to graph";
     int v_ret = graph_->AddProposal(proposal);
     if (v_ret != 0) {
       LOG(ERROR) << "add proposal fail, ret:" << v_ret;
@@ -525,11 +574,11 @@ bool Cassandra::AddProposal(const Proposal& proposal) {
 
   received_num_[proposal.header().height()].insert(
       proposal.header().proposer_id());
-  //LOG(ERROR) << "received current height:" << graph_->GetCurrentHeight()
-  //           << " proposal height:" << proposal.header().height()
-  //           << " num:" << received_num_[graph_->GetCurrentHeight()].size()
-  //           << " from:" << proposal.header().proposer_id()
-  //           << " last vote:" << last_vote_;
+  LOG(ERROR) << "received current height:" << graph_->GetCurrentHeight()
+             << " proposal height:" << proposal.header().height()
+             << " num:" << received_num_[graph_->GetCurrentHeight()].size()
+             << " from:" << proposal.header().proposer_id()
+             << " last vote:" << last_vote_;
   if (received_num_[graph_->GetCurrentHeight()].size() == total_num_) {
     if (last_vote_ < graph_->GetCurrentHeight()) {
       last_vote_ = graph_->GetCurrentHeight();
@@ -538,7 +587,7 @@ bool Cassandra::AddProposal(const Proposal& proposal) {
       //LOG(ERROR) << "can vote:";
     }
   }
-  // LOG(ERROR)<<"recv done";
+   //LOG(ERROR)<<"recv done";
 
   std::vector<std::unique_ptr<Proposal>> future_g = graph_->GetNotFound(
       proposal.header().height() + 1, proposal.header().hash());
diff --git a/platform/consensus/ordering/cassandra/algorithm/proposal_graph.cpp 
b/platform/consensus/ordering/cassandra/algorithm/proposal_graph.cpp
index c3236049..326ccea5 100644
--- a/platform/consensus/ordering/cassandra/algorithm/proposal_graph.cpp
+++ b/platform/consensus/ordering/cassandra/algorithm/proposal_graph.cpp
@@ -11,12 +11,14 @@ namespace resdb {
 namespace cassandra {
 namespace cassandra_recv {
 
+/*
 std::vector<ProposalState> GetStates() {
   return std::vector<ProposalState>{ProposalState::New, 
ProposalState::Prepared,
                                     ProposalState::PreCommit};
 }
+*/
 
-ProposalGraph::ProposalGraph(int fault_num) : f_(fault_num) {
+ProposalGraph::ProposalGraph(int fault_num, int id) : f_(fault_num),id_(id) {
   ranking_ = std::make_unique<Ranking>();
   current_height_ = 0;
   global_stats_ = Stats::GetGlobalStats();
@@ -59,7 +61,7 @@ void ProposalGraph::AddProposalOnly(const Proposal& proposal) 
{
 
 int ProposalGraph::AddProposal(const Proposal& proposal) {
    //LOG(ERROR) << "add proposal height:" << proposal.header().height()
-   //          << " current height:" << current_height_<<" 
from:"<<proposal.header().proposer_id()<<" proposal 
id:"<<proposal.header().proposal_id();
+    //         << " current height:" << current_height_<<" 
from:"<<proposal.header().proposer_id()<<" proposal 
id:"<<proposal.header().proposal_id();
   assert(current_height_ >= latest_commit_.header().height());
   /*
   if (proposal.header().height() < current_height_) {
@@ -74,6 +76,7 @@ int ProposalGraph::AddProposal(const Proposal& proposal) {
         proposal.header().proposer_id());
   } else {
     while (!pending_header_.empty()) {
+    LOG(ERROR)<<" pending heade:"<<pending_header_.begin()->first<<" current 
height:"<<current_height_;
       if (pending_header_.begin()->first <= current_height_) {
         pending_header_.erase(pending_header_.begin());
       } else {
@@ -98,7 +101,7 @@ int ProposalGraph::AddProposal(const Proposal& proposal) {
     return 2;
   }
 
-  // LOG(ERROR)<<"history size:"<<proposal.history_size();
+   //LOG(ERROR)<<"history size:"<<proposal.history_size();
   for (const auto& history : proposal.history()) {
     std::string hash = history.hash();
     auto node_it = node_info_.find(hash);
@@ -114,6 +117,50 @@ int ProposalGraph::AddProposal(const Proposal& proposal) {
     */
   }
 
+  //LOG(ERROR)<<" proposal history:"<<proposal.history_size();
+  if(proposal.history_size()>0){
+    const auto& history = proposal.history(0);
+    std::string hash = history.hash();
+    auto node_it = node_info_.find(hash);
+    
node_it->second->votes[ProposalState::New].insert(proposal.header().proposer_id());
+    CheckState(node_it->second.get(),
+        static_cast<resdb::cassandra::ProposalState>(history.state()));
+
+    if (node_it->second->state == ProposalState::PoR 
+      && node_it->second->proposal.header().proposer_id() == id_){
+      //LOG(ERROR)<<" remove por 
blocks:"<<node_it->second->proposal.sub_block_size();
+      std::set<std::string> exist;
+      for(auto block : node_it->second->proposal.sub_block()) {
+        if(new_blocks_.find(block.hash()) != new_blocks_.end()){
+          new_blocks_.erase(new_blocks_.find(block.hash()));
+          exist.insert(block.hash());
+        }
+      }
+
+      //RemoveBlocks(node_it->second->proposal);
+    }
+
+    int num = 0;
+    for(int i = 0; i <3 && proposal.history_size()>=3; ++i){
+      const auto& sub_history = proposal.history(i);
+      std::string sub_hash = sub_history.hash();
+      auto sub_node_it = node_info_.find(sub_hash);
+      //LOG(ERROR)<<" state:"<<node_it->second->state;
+      if (node_it->second->state != ProposalState::PoR) {
+        break;
+      }
+      num++;
+    }
+  
+    //LOG(ERROR)<<"get num:"<<num; 
+    if(num == 3) {
+      const auto& sub_history = proposal.history(2);
+      std::string sub_hash = sub_history.hash();
+      Commit(sub_hash);
+    }
+  }
+
+/*
   for (const auto& history : proposal.history()) {
     std::string hash = history.hash();
     auto node_it = node_info_.find(hash);
@@ -138,7 +185,10 @@ int ProposalGraph::AddProposal(const Proposal& proposal) {
     //<<","<<node_it->second->proposal.header().proposal_id()
     //<<" state:"<<node_it->second->state;
   }
+  */
 
+    //LOG(ERROR) << "height:" << current_height_
+     //          << " proposal height:" << proposal.header().height();
   if (proposal.header().height() < current_height_) {
     LOG(ERROR) << "height not match:" << current_height_
                << " proposal height:" << proposal.header().height();
@@ -147,7 +197,7 @@ int ProposalGraph::AddProposal(const Proposal& proposal) {
     // id:"<<proposal.header().proposal_id();
     // g_[proposal.header().prehash()].push_back(proposal.header().hash());
     auto np = std::make_unique<NodeInfo>(proposal);
-    new_proposals_[proposal.header().hash()] = &np->proposal;
+    //new_proposals_[proposal.header().hash()] = &np->proposal;
     node_info_[proposal.header().hash()] = std::move(np);
     last_node_[proposal.header().height()].insert(proposal.header().hash());
 
@@ -155,45 +205,59 @@ int ProposalGraph::AddProposal(const Proposal& proposal) {
   } else {
     g_[proposal.header().prehash()].push_back(proposal.header().hash());
     auto np = std::make_unique<NodeInfo>(proposal);
-    new_proposals_[proposal.header().hash()] = &np->proposal;
+    //new_proposals_[proposal.header().hash()] = &np->proposal;
     // LOG(ERROR)<<"add proposal proposer:"<<proposal.header().proposer_id()<<"
     // id:"<<proposal.header().proposal_id()<<"
     // hash:"<<Encode(proposal.header().hash());
     node_info_[proposal.header().hash()] = std::move(np);
     last_node_[proposal.header().height()].insert(proposal.header().hash());
   }
+
+/*
+  LOG(ERROR)<<" node info size:"<<node_info_.size();
+  std::string hash = proposal.header().hash();
+  auto node_it = node_info_.find(hash);
+  assert(node_it != node_info_.end());
+  
node_it->second->votes[ProposalState::New].insert(proposal.header().proposer_id());
+
+  CheckState(node_it->second.get(),
+               
static_cast<resdb::cassandra::ProposalState>(ProposalState::New));
+               */
+
+
+
+  //LOG(ERROR)<<"add graph done";
   return 0;
 }
 
 void ProposalGraph::UpgradeState(ProposalState& state) {
-  switch (state) {
-    case None:
-    case New:
-      state = Prepared;
-      break;
-    case Prepared:
-      state = PreCommit;
-      break;
-    default:
-      break;
-  }
+return;
 }
 
 int ProposalGraph::CheckState(NodeInfo* node_info, ProposalState state) {
-  // LOG(ERROR) << "node: (" << node_info->proposal.header().proposer_id() <<
+   //LOG(ERROR) << "node: (" << node_info->proposal.header().proposer_id() <<
+   //","
+    //         << node_info->proposal.header().proposal_id()
+     //        << ") state:" << node_info->state
+      //       << " vote num:" << node_info->votes[ProposalState::New].size();
+
+  if(node_info->votes[ProposalState::New].size() >= 2 * f_ + 1) {
+      state = PoR;
+  }
+  else if(node_info->votes[ProposalState::New].size() >= f_ + 1) {
+      state = PoA;
+  }
+  else {
+      state = New;
+  }
+  node_info->state = state;
+  //LOG(ERROR) << "node: (" << node_info->proposal.header().proposer_id() <<
   // ","
   //           << node_info->proposal.header().proposal_id()
-  //           << ") state:" << node_info->state
-  //           << " vote num:" << node_info->votes[node_info->state].size();
+  //           << ") get state:" << node_info->state
+  //           << " vote num:" << node_info->votes[ProposalState::New].size();
+
 
-  while (node_info->votes[node_info->state].size() >= 2 * f_ + 1) {
-    UpgradeState(node_info->state);
-    // LOG(ERROR) << "==========  proposal:("
-    //           << node_info->proposal.header().proposer_id() << ","
-    //           << node_info->proposal.header().proposal_id() << ")"
-    //           << " upgrate state:" << node_info->state << (GetCurrentTime() 
-
-    //           node_info->proposal.create_time());
-  }
   return true;
 }
 
@@ -205,18 +269,13 @@ void ProposalGraph::Commit(const std::string& hash) {
     return;
   }
 
-  if (it->second->state != ProposalState::PreCommit) {
-    LOG(ERROR) << "hash not committed:" << hash;
-    assert(1 == 0);
-    return;
-  }
-
+ // LOG(ERROR) << "commit, hash:";
   std::set<std::string> is_main_hash;
   is_main_hash.insert(hash);
   
   int from_proposer = it->second->proposal.header().proposer_id();
   int from_proposal_id = it->second->proposal.header().proposal_id();
-   //LOG(ERROR)<<"commit :"<<it->second->proposal.header().proposer_id()<<" 
id:"<<it->second->proposal.header().proposal_id();
+//  LOG(ERROR)<<"commit :"<<it->second->proposal.header().proposer_id()<<" 
id:"<<it->second->proposal.header().proposal_id();
 
   std::vector<std::vector<Proposal*>> commit_p;
   auto bfs = [&]() {
@@ -234,12 +293,15 @@ void ProposalGraph::Commit(const std::string& hash) {
 
       Proposal* p = &it->second->proposal;
       if (it->second->state == ProposalState::Committed) {
-        // LOG(ERROR)<<" try commit proposal,
-        // sender:"<<p->header().proposer_id()
-        //<<" proposal id:"<<p->header().proposal_id()<<" has been committed";
         continue;
       }
 
+      //LOG(ERROR)<<" bfs sub block size :"<<p->sub_block_size();
+      /*
+      for(auto block : p->sub_block()){
+        LOG(ERROR)<<" get sub block proposer:"<<p->header().proposer_id()<<" 
local id:"<<block.local_id();
+      }
+    */
       it->second->state = ProposalState::Committed;
       if (is_main_hash.find(c_hash) != is_main_hash.end()) {
         commit_num_[p->header().proposer_id()]++;
@@ -253,6 +315,7 @@ void ProposalGraph::Commit(const std::string& hash) {
       //<<" weak proposal size:"<<p->weak_proposals().hash_size();
        //LOG(ERROR)<<"push p:"<<p->header().proposer_id();
       for (const std::string& w_hash : p->weak_proposals().hash()) {
+        assert(1==0);
         auto it = node_info_.find(w_hash);
         if (it == node_info_.end()) {
           LOG(ERROR) << "node not found, hash:";
@@ -286,11 +349,11 @@ void ProposalGraph::Commit(const std::string& hash) {
       }
       */
       //LOG(ERROR) << "commmit proposal:"
-      //           << commit_p[i][j]->header().proposer_id()
-      //           << " height:" << commit_p[i][j]->header().height()
-      //           << " idx:" << j 
-      //           << " delay:" << (GetCurrentTime() - 
commit_p[i][j]->create_time()) 
-      //           << " commit from:"<< from_proposer<<" 
id:"<<from_proposal_id;
+       //          << commit_p[i][j]->header().proposer_id()
+        //         << " height:" << commit_p[i][j]->header().height()
+         //        << " idx:" << j 
+          //       << " delay:" << (GetCurrentTime() - 
commit_p[i][j]->create_time()) 
+           //      << " commit from:"<< from_proposer<<" 
id:"<<from_proposal_id;
       block_num += commit_p[i][j]->block_size();
       if (commit_callback_) {
         commit_callback_(*commit_p[i][j]);
@@ -317,7 +380,7 @@ std::vector<std::unique_ptr<Proposal>> 
ProposalGraph::GetNotFound(
   if (pre_it != it->second.end()) {
     ret = std::move(pre_it->second);
     it->second.erase(pre_it);
-    LOG(ERROR) << "found future height:" << height;
+    //LOG(ERROR) << "found future height:" << height;
   }
   return ret;
 }
@@ -336,7 +399,7 @@ bool ProposalGraph::VerifyParent(const Proposal& proposal) {
 
   auto it = node_info_.find(prehash);
   if (it == node_info_.end()) {
-    LOG(ERROR) << "prehash not here";
+  //  LOG(ERROR) << "prehash not here";
     
not_found_proposal_[proposal.header().height()][proposal.header().prehash()]
         .push_back(std::make_unique<Proposal>(proposal));
     return false;
@@ -382,6 +445,44 @@ Proposal* ProposalGraph::GetStrongestProposal() {
     }
   }
 
+  //LOG(ERROR)<<" last node size:"<<last_node_[current_height_].size()<<" 
height:"<<current_height_<<" from:"<<sp->proposal.header().proposer_id();
+
+  for (const auto& last_hash : last_node_[current_height_]) {
+    NodeInfo* node_info = node_info_[last_hash].get();
+  //  LOG(ERROR)<<" node info:"<<node_info->proposal.header().proposer_id()<<" 
sub blocks:"<<node_info->proposal.sub_block_size();
+
+    if(node_info->proposal.header().proposer_id() != id_){
+      continue;
+    }
+
+//    LOG(ERROR)<<" node info:"<<node_info->proposal.header().proposer_id()<<" 
sub blocks:"<<node_info->proposal.sub_block_size()<<" node 
state:"<<node_info->state;
+    if(node_info->state == ProposalState::PoR) {
+      assert(node_info->proposal.header().proposer_id() == 
sp->proposal.header().proposer_id());
+      for(auto sub_block : node_info->proposal.sub_block()){ 
+        if(new_blocks_.find(sub_block.hash()) != new_blocks_.end()){
+          new_blocks_.erase(new_blocks_.find(sub_block.hash()));
+        }
+      }
+      continue;
+    }
+    
+    if(node_info->proposal.header().proposer_id() == 
sp->proposal.header().proposer_id()) {
+      continue;
+    }
+
+    if(sp == node_info) {
+      continue;
+    }
+
+//    LOG(ERROR)<<"get sub block size:"<<node_info->proposal.sub_block_size();
+    for(auto sub_block : node_info->proposal.sub_block()){ 
+      new_blocks_[sub_block.hash()] = sub_block;
+    }
+    node_info_.erase(node_info_.find(last_hash));
+  }
+
+
+
   UpdateHistory(&sp->proposal);
    //LOG(ERROR) << "get strong proposal from height:" << current_height_ << " 
->("
    //          << sp->proposal.header().proposer_id() << ","
@@ -390,14 +491,14 @@ Proposal* ProposalGraph::GetStrongestProposal() {
 }
 
 bool ProposalGraph::Cmp(int id1, int id2) {
-  // LOG(ERROR) << "commit commit num:" << id1 << " " << id2
-  //           << " commit  time:" << commit_num_[id1] << " " <<
-  //           commit_num_[id2];
-  if (commit_num_[id1] + 1 < commit_num_[id2]) {
+   //LOG(ERROR) << "commit commit num:" << id1 << " " << id2
+   //          << " commit  time:" << commit_num_[id1] << " " <<
+   //          commit_num_[id2];
+  if (commit_num_[id1]  < commit_num_[id2]) {
     return false;
   }
 
-  if (commit_num_[id1] > commit_num_[id2] + 1) {
+  if (commit_num_[id1] > commit_num_[id2] ) {
     return true;
   }
   return id1 < id2;
@@ -416,10 +517,14 @@ int ProposalGraph::CompareState(const ProposalState& 
state1,
 
 // p1 < p2
 bool ProposalGraph::Compare(const NodeInfo& p1, const NodeInfo& p2) {
-  // LOG(ERROR) << "proposer:" << p1.proposal.header().proposer_id() << " "
+  //LOG(ERROR) << "proposer:" << p1.proposal.header().proposer_id() << " "
   //           << p2.proposal.header().proposer_id()
   //          << "height:" << p1.proposal.header().height() << " "
-  //          << p2.proposal.header().height();
+  //          << p2.proposal.header().height()
+  //          <<" state:"<< p1.state<<" "<<p2.state
+  //          <<" hash cmp:"<< (p1.proposal.header().hash() < 
p2.proposal.header().hash())
+  //          <<" cmp num:" << Cmp(p1.proposal.header().proposer_id(), 
p2.proposal.header().proposer_id())
+  //          <<" sub block:" << p1.proposal.sub_block_size() <<" "<< 
p2.proposal.sub_block_size();
   if (p1.proposal.header().height() != p2.proposal.header().height()) {
     return p1.proposal.header().height() < p2.proposal.header().height();
   }
@@ -429,6 +534,16 @@ bool ProposalGraph::Compare(const NodeInfo& p1, const 
NodeInfo& p2) {
     return CompareState(p1.state, p2.state) < 0;
   }
 
+  int h = (p1.proposal.header().height())%64;
+  if ( h == 0) h = 64;
+  //LOG(ERROR)<<" check height cmp:"<<abs(p1.proposal.header().proposer_id() - 
h )<<" "<<abs(p2.proposal.header().proposer_id() - h);
+  //return abs(p1.proposal.header().proposer_id() - h ) > 
abs(p2.proposal.header().proposer_id() - h);
+
+  if (abs(p1.proposal.sub_block_size() - p2.proposal.sub_block_size()) > 5) {
+    //return p1.proposal.sub_block_size() < p2.proposal.sub_block_size();
+  }
+  return p1.proposal.header().hash() < p2.proposal.header().hash();
+
   if (p1.proposal.header().proposer_id() ==
       p2.proposal.header().proposer_id()) {
     return p1.proposal.header().proposal_id() <
@@ -456,7 +571,7 @@ Proposal* ProposalGraph::GetLatestStrongestProposal() {
 ProposalState ProposalGraph::GetProposalState(const std::string& hash) const {
   auto node_it = node_info_.find(hash);
   if (node_it == node_info_.end()) {
-    return ProposalState::None;
+    return ProposalState::New;
   }
   return node_it->second->state;
 }
@@ -475,9 +590,11 @@ int ProposalGraph::GetCurrentHeight() { return 
current_height_; }
 std::vector<Proposal*> ProposalGraph::GetNewProposals(int height) {
   std::vector<Proposal*> ps;
   for (auto it : new_proposals_) {
+  /*
     if (it.second->header().height() >= height) {
       continue;
     }
+    */
     ps.push_back(it.second);
   }
   for (Proposal* p : ps) {
@@ -486,6 +603,16 @@ std::vector<Proposal*> ProposalGraph::GetNewProposals(int 
height) {
   return ps;
 }
 
+std::vector<Block> ProposalGraph::GetNewBlocks() {
+  std::vector<Block> ps;
+  for (auto it : new_blocks_) {
+    ps.push_back(it.second);
+  }
+  //new_blocks_.clear();
+  return ps;
+}
+
+
 }  // namespace cassandra_recv
 }  // namespace cassandra
 }  // namespace resdb
diff --git a/platform/consensus/ordering/cassandra/algorithm/proposal_graph.h 
b/platform/consensus/ordering/cassandra/algorithm/proposal_graph.h
index ee21834c..696c4890 100644
--- a/platform/consensus/ordering/cassandra/algorithm/proposal_graph.h
+++ b/platform/consensus/ordering/cassandra/algorithm/proposal_graph.h
@@ -13,7 +13,7 @@ namespace cassandra_recv {
 
 class ProposalGraph {
  public:
-  ProposalGraph(int fault_num);
+  ProposalGraph(int fault_num, int id);
   inline void SetCommitCallBack(std::function<void(const Proposal&)> func) {
     commit_callback_ = func;
   }
@@ -34,6 +34,7 @@ class ProposalGraph {
                                                      const std::string& hash);
 
   std::vector<Proposal*> GetNewProposals(int height);
+  std::vector<Block> GetNewBlocks();
 
  private:
   struct NodeInfo {
@@ -80,6 +81,8 @@ class ProposalGraph {
       not_found_proposal_;
   std::map<std::string, Proposal*> new_proposals_;
   Stats* global_stats_;
+  int id_;
+  std::map<std::string, Block> new_blocks_;
 };
 
 }  // namespace cassandra_recv
diff --git 
a/platform/consensus/ordering/cassandra/algorithm/proposal_manager.cpp 
b/platform/consensus/ordering/cassandra/algorithm/proposal_manager.cpp
index 8e7028b5..d887ff20 100644
--- a/platform/consensus/ordering/cassandra/algorithm/proposal_manager.cpp
+++ b/platform/consensus/ordering/cassandra/algorithm/proposal_manager.cpp
@@ -119,8 +119,10 @@ std::unique_ptr<Block> ProposalManager::GetBlock(const 
std::string& hash,
     std::unique_lock<std::mutex> lk(p_mutex_);
     auto it = pending_blocks_[sender].find(hash);
     if (it == pending_blocks_[sender].end()) {
-      LOG(ERROR) << "block from sender:" << sender << " not found";
+      //LOG(ERROR) << "block from sender:" << sender << " not found";
+      return nullptr;
       need_wait = true;
+      assert(1==0);
       continue;
     }
     assert(it != pending_blocks_[sender].end());
@@ -174,17 +176,28 @@ std::unique_ptr<Proposal> 
ProposalManager::GenerateProposal(int round,
       return nullptr;
       // LOG(ERROR) << "generate wait proposal block size:" << blocks_.size();
     }
-    int max_block = 10;
+    int max_block = 1;
     int num = 0;
     int64_t current_time = GetCurrentTime();
     proposal->set_create_time(current_time);
+    //LOG(ERROR)<<"block size:"<<blocks_.size();
     for (auto& block : blocks_) {
       data += block->hash();
       Block* ab = proposal->add_block();
+      *ab = *block;
+
+      Block * sb = proposal->add_sub_block();
+      sb->set_hash(block->hash());
+      sb->set_sender_id(block->sender_id());
+      sb->set_local_id(block->local_id());
+      sb->set_create_time(block->create_time());
+
+      /*
       ab->set_hash(block->hash());
       ab->set_sender_id(block->sender_id());
       ab->set_local_id(block->local_id());
       ab->set_create_time(block->create_time());
+      */
       //LOG(ERROR) << " add block:" << block->local_id()
       //           << " block delay:" << (current_time - block->create_time())
       //           << " block size:" << blocks_.size()
@@ -223,9 +236,19 @@ std::unique_ptr<Proposal> 
ProposalManager::GenerateProposal(int round,
     *proposal->mutable_history() = last->history();
   }
 
+  
+  {
+    std::vector<Block> ps = graph_->GetNewBlocks();
+    for(auto block : ps) {
+      auto sub_block = proposal->add_sub_block();
+      *sub_block = block;
+    }
+    //LOG(ERROR)<<" proposal sub block size:"<<proposal->sub_block_size();
+  }
+
   {
     std::vector<Proposal*> ps = graph_->GetNewProposals(round);
-    // LOG(ERROR)<<"get weak p from round:"<<round<<" size:"<<ps.size();
+    //LOG(ERROR)<<"get weak p from round:"<<round<<" size:"<<ps.size();
     for (Proposal* p : ps) {
       if (p->header().height() >= round) {
         LOG(ERROR) << "round invalid:" << round
@@ -236,8 +259,10 @@ std::unique_ptr<Proposal> 
ProposalManager::GenerateProposal(int round,
       if (p->header().hash() == last->header().hash()) {
         continue;
       }
-      // LOG(ERROR)<<"add weak p:"<<p->header().height()<<"
-      // proposer:"<<p->header().proposer_id();
+      if(p->header().proposer_id() != id_){
+        continue;
+      }
+      LOG(ERROR)<<"add weak p:"<<p->header().height()<<" 
proposer:"<<p->header().proposer_id()<<" height:"<<p->header().height();
       *proposal->mutable_weak_proposals()->add_hash() = p->header().hash();
       data += p->header().hash();
     }
diff --git a/platform/consensus/ordering/cassandra/algorithm/proposal_state.h 
b/platform/consensus/ordering/cassandra/algorithm/proposal_state.h
index b25bc293..3eb1d83a 100644
--- a/platform/consensus/ordering/cassandra/algorithm/proposal_state.h
+++ b/platform/consensus/ordering/cassandra/algorithm/proposal_state.h
@@ -3,6 +3,7 @@
 namespace resdb {
 namespace cassandra {
 
+/*
 enum ProposalState {
   None = 0,
   New = 1,
@@ -11,6 +12,14 @@ enum ProposalState {
   PreCommit = 4,
   Committed = 5,
 };
+*/
+
+enum ProposalState {
+  New = 0,
+  PoA = 1,
+  PoR = 2,
+  Committed = 3
+};
 
 }
 }  // namespace resdb
diff --git a/platform/consensus/ordering/cassandra/proto/proposal.proto 
b/platform/consensus/ordering/cassandra/proto/proposal.proto
index 1fe96ba1..644d5fa7 100644
--- a/platform/consensus/ordering/cassandra/proto/proposal.proto
+++ b/platform/consensus/ordering/cassandra/proto/proposal.proto
@@ -41,6 +41,7 @@ message Proposal {
   uint64 create_time = 5;
   repeated Block block = 6;
   WeakProposal weak_proposals = 7;
+  repeated Block sub_block = 8;
 };
 
 enum MessageType {
diff --git a/platform/consensus/ordering/cassandra_graph/algorithm/BUILD 
b/platform/consensus/ordering/cassandra_graph/algorithm/BUILD
new file mode 100644
index 00000000..1d86e0fe
--- /dev/null
+++ b/platform/consensus/ordering/cassandra_graph/algorithm/BUILD
@@ -0,0 +1,76 @@
+package(default_visibility = 
["//platform/consensus/ordering/cassandra:__subpackages__"])
+
+cc_library(
+    name = "proposal_state",
+    hdrs = ["proposal_state.h"],
+)
+
+cc_library(
+    name = "proposal_manager",
+    srcs = ["proposal_manager.cpp"],
+    hdrs = ["proposal_manager.h"],
+    deps = [
+        ":proposal_graph",
+        "//common:comm",
+        "//platform/statistic:stats",
+        "//common/crypto:signature_verifier",
+        "//common/utils",
+        "//platform/consensus/ordering/cassandra/proto:proposal_cc_proto",
+    ],
+)
+
+cc_library(
+    name = "ranking",
+    srcs = ["ranking.cpp"],
+    hdrs = ["ranking.h"],
+    deps = [
+        "//common:comm",
+    ],
+)
+
+cc_library(
+    name = "proposal_graph",
+    srcs = ["proposal_graph.cpp"],
+    hdrs = ["proposal_graph.h"],
+    deps = [
+        ":ranking",
+        ":proposal_state",
+        "//platform/statistic:stats",
+        "//common:comm",
+        "//common/utils",
+        "//platform/consensus/ordering/cassandra/proto:proposal_cc_proto",
+    ],
+)
+
+cc_library(
+    name = "cassandra",
+    srcs = ["cassandra.cpp"],
+    hdrs = ["cassandra.h"],
+    deps = [
+        ":proposal_graph",
+        ":proposal_manager",
+        "//platform/statistic:stats",
+        "//common:comm",
+        "//common/crypto:signature_verifier",
+        "//platform/consensus/ordering/common/algorithm:protocol_base",
+        "//platform/common/queue:lock_free_queue",
+    ],
+)
+
+cc_test(
+    name = "proposal_graph_test",
+    srcs = ["proposal_graph_test.cpp"],
+    deps = [
+        ":proposal_graph",
+        "//common/test:test_main",
+    ],
+)
+
+cc_test(
+    name = "cassandra_test",
+    srcs = ["cassandra_test.cpp"],
+    deps = [
+        ":cassandra",
+        "//common/test:test_main",
+    ],
+)
diff --git a/platform/consensus/ordering/cassandra/algorithm/cassandra.cpp 
b/platform/consensus/ordering/cassandra_graph/algorithm/cassandra.cpp
similarity index 100%
copy from platform/consensus/ordering/cassandra/algorithm/cassandra.cpp
copy to platform/consensus/ordering/cassandra_graph/algorithm/cassandra.cpp
diff --git a/platform/consensus/ordering/cassandra_graph/algorithm/cassandra.h 
b/platform/consensus/ordering/cassandra_graph/algorithm/cassandra.h
new file mode 100644
index 00000000..b4a5e221
--- /dev/null
+++ b/platform/consensus/ordering/cassandra_graph/algorithm/cassandra.h
@@ -0,0 +1,114 @@
+#pragma once
+
+#include <deque>
+#include <map>
+#include <queue>
+#include <thread>
+
+#include "platform/common/queue/lock_free_queue.h"
+#include "platform/consensus/ordering/common/algorithm/protocol_base.h"
+#include "platform/consensus/ordering/cassandra/algorithm/proposal_graph.h"
+#include "platform/consensus/ordering/cassandra/algorithm/proposal_manager.h"
+#include "platform/consensus/ordering/cassandra/proto/proposal.pb.h"
+#include "platform/statistic/stats.h"
+
+namespace resdb {
+namespace cassandra {
+namespace cassandra_recv {
+
+class Cassandra: public common::ProtocolBase {
+ public:
+  Cassandra(int id, int f, int total_num, SignatureVerifier* verifier);
+  ~Cassandra();
+
+  void CheckBlock(const std::string& hash, int local_id);
+  void ReceiveBlock(std::unique_ptr<Block> block);
+  void ReceiveBlockACK(std::unique_ptr<BlockACK> block);
+  bool ReceiveTransaction(std::unique_ptr<Transaction> txn);
+  bool ReceiveProposal(std::unique_ptr<Proposal> proposal);
+  bool ReceiveVote(const VoteMessage& msg);
+  bool ReceivePrepare(const VoteMessage& msg);
+  int ReceiveRecovery(const CommittedProposals& proposals);
+
+  bool ReceiveVoteACK(const VoteMessage& msg);
+  bool ReceiveCommit(const VoteMessage& msg);
+
+  void AskBlock(const Block& block);
+  void SendBlock(const BlockQuery& block);
+
+  void AskProposal(const Proposal& proposal);
+  void SendProposal(const ProposalQuery& query);
+  void ReceiveProposalQueryResp(const ProposalQueryResp& resp);
+  void PrepareProposal(const Proposal& p);
+
+
+  void SetPrepareFunction(std::function<int(const Transaction&)> prepare);
+
+ private:
+  bool IsStop();
+
+  int SendTxn(int round);
+
+  void Commit(const VoteMessage& msg);
+  void CommitProposal(const Proposal& p);
+
+  void AsyncConsensus();
+  void AsyncCommit();
+  void AsyncPrepare();
+  bool WaitVote(int);
+  void WaitCommit();
+
+  bool CheckHistory(const Proposal& proposal);
+
+  void Reset();
+  bool CheckState(MessageType type, ProposalState state);
+
+  void TrySendRecoveery(const Proposal& proposal);
+  void BroadcastTxn();
+  bool AddProposal(const Proposal& proposal);
+
+  bool ProcessProposal(std::unique_ptr<Proposal> proposal);
+
+ private:
+  std::unique_ptr<ProposalGraph> graph_;
+  LockFreeQueue<Transaction> txns_;
+  std::unique_ptr<ProposalManager> proposal_manager_;
+  SignatureVerifier* verifier_;
+  std::mutex mutex_, g_mutex_;
+  std::map<int, std::set<int>> received_num_;
+  // int state_;
+  int id_, total_num_, f_, batch_size_;
+  std::atomic<int> is_stop_;
+  int timeout_ms_;
+  int local_txn_id_, local_proposal_id_;
+  LockFreeQueue<Proposal> commit_queue_, execute_queue_, prepare_queue_;
+  std::thread commit_thread_, consensus_thread_, block_thread_, 
prepare_thread_;
+  std::condition_variable vote_cv_;
+  std::map<int, bool> can_vote_;
+  std::atomic<int> committed_num_;
+  int voting_, start_ = false;
+  std::map<int, std::vector<std::unique_ptr<Transaction>>> uncommitted_txn_;
+
+  bool use_linear_ = false;
+  int recv_num_ = 0;
+  int execute_num_ = 0;
+  int pending_num_ = 0;
+  std::atomic<int> executed_;
+  std::atomic<int> precommitted_num_;
+  int last_vote_ = 0;
+  int execute_id_;
+
+  std::mutex block_mutex_;
+  std::set<int> received_;
+  std::map<int, std::set<int>> block_ack_;
+  std::map<int, std::vector<std::unique_ptr<Proposal>>> future_proposal_;
+
+  std::function<int(const Transaction&)> prepare_;
+  int current_round_;
+
+  Stats* global_stats_;
+};
+
+}  // namespace cassandra_recv
+}  // namespace cassandra
+}  // namespace resdb
diff --git a/platform/consensus/ordering/cassandra/algorithm/proposal_graph.cpp 
b/platform/consensus/ordering/cassandra_graph/algorithm/proposal_graph.cpp
similarity index 100%
copy from platform/consensus/ordering/cassandra/algorithm/proposal_graph.cpp
copy to platform/consensus/ordering/cassandra_graph/algorithm/proposal_graph.cpp
diff --git a/platform/consensus/ordering/cassandra/algorithm/proposal_graph.h 
b/platform/consensus/ordering/cassandra_graph/algorithm/proposal_graph.h
similarity index 100%
copy from platform/consensus/ordering/cassandra/algorithm/proposal_graph.h
copy to platform/consensus/ordering/cassandra_graph/algorithm/proposal_graph.h
diff --git 
a/platform/consensus/ordering/cassandra/algorithm/proposal_manager.cpp 
b/platform/consensus/ordering/cassandra_graph/algorithm/proposal_manager.cpp
similarity index 100%
copy from platform/consensus/ordering/cassandra/algorithm/proposal_manager.cpp
copy to 
platform/consensus/ordering/cassandra_graph/algorithm/proposal_manager.cpp
diff --git 
a/platform/consensus/ordering/cassandra_graph/algorithm/proposal_manager.h 
b/platform/consensus/ordering/cassandra_graph/algorithm/proposal_manager.h
new file mode 100644
index 00000000..4a937470
--- /dev/null
+++ b/platform/consensus/ordering/cassandra_graph/algorithm/proposal_manager.h
@@ -0,0 +1,73 @@
+#pragma once
+
+#include <condition_variable>
+#include <list>
+
+#include "platform/consensus/ordering/cassandra/algorithm/proposal_graph.h"
+#include "platform/consensus/ordering/cassandra/proto/proposal.pb.h"
+#include "platform/statistic/stats.h"
+
+namespace resdb {
+namespace cassandra {
+namespace cassandra_recv {
+
+class ProposalManager {
+ public:
+  ProposalManager(int32_t id, ProposalGraph* graph);
+
+  int VerifyProposal(const Proposal& proposal);
+
+  void AddLocalBlock(std::unique_ptr<Block> block);
+  void AddBlock(std::unique_ptr<Block> block);
+  std::unique_ptr<Block> MakeBlock(
+      std::vector<std::unique_ptr<Transaction>>& txn);
+
+  std::unique_ptr<Proposal> GenerateProposal(int round, bool need_empty);
+  int CurrentRound();
+
+  void ClearProposal(const Proposal& p);
+  std::unique_ptr<Block> GetBlock(const std::string& hash, int sender);
+  Block* GetBlockSnap(const std::string& hash, int sender);
+
+  bool ContainBlock(const std::string& hash, int sender);
+  bool ContainBlock(const Block& block);
+  bool WaitBlock();
+  void BlockReady(const std::string& hash, int local_id);
+  const Block* QueryBlock(const std::string& hash);
+  std::unique_ptr<ProposalQueryResp> QueryProposal(const std::string& hash);
+  bool VerifyProposal(const Proposal* proposal);
+  void AddTmpProposal(std::unique_ptr<Proposal> proposal);
+  void ReleaseTmpProposal(const Proposal& proposal);
+  int VerifyProposalHistory(const Proposal* p);
+  void AddLocalProposal(const Proposal& proposal);
+  void RemoveLocalProposal(const std::string& hash);
+
+  int VerifyProposal(const ProposalQueryResp& resp);
+
+ private:
+  void ObtainHistoryProposal(const Proposal* p,
+                             std::set<std::pair<int, int>>& v,
+                             std::vector<const Proposal*>& resp,
+                             int current_height);
+  Proposal* GetLocalProposal(const std::string& hash);
+
+ private:
+  int32_t id_;
+  ProposalGraph* graph_;
+  int64_t local_proposal_id_ = 1, local_block_id_ = 1;
+
+  std::map<std::string, std::unique_ptr<Block>> pending_blocks_[512];
+  std::list<std::unique_ptr<Block>> blocks_;
+  std::mutex mutex_, p_mutex_, q_mutex_;
+  std::condition_variable notify_;
+  std::map<int, std::unique_ptr<Block>> blocks_candidates_;
+  std::map<std::string, std::unique_ptr<Proposal>> tmp_proposal_;
+
+  std::mutex t_mutex_;
+  std::map<std::string, std::unique_ptr<Proposal>> local_proposal_;
+  Stats* global_stats_;
+};
+
+}  // namespace cassandra_recv
+}  // namespace cassandra
+}  // namespace resdb
diff --git a/platform/consensus/ordering/cassandra/algorithm/proposal_state.h 
b/platform/consensus/ordering/cassandra_graph/algorithm/proposal_state.h
similarity index 100%
copy from platform/consensus/ordering/cassandra/algorithm/proposal_state.h
copy to platform/consensus/ordering/cassandra_graph/algorithm/proposal_state.h
diff --git a/platform/consensus/ordering/cassandra_graph/algorithm/ranking.cpp 
b/platform/consensus/ordering/cassandra_graph/algorithm/ranking.cpp
new file mode 100644
index 00000000..d8454e43
--- /dev/null
+++ b/platform/consensus/ordering/cassandra_graph/algorithm/ranking.cpp
@@ -0,0 +1,12 @@
+
+#include "platform/consensus/ordering/cassandra/algorithm/ranking.h"
+
+namespace resdb {
+namespace cassandra {
+namespace cassandra_recv {
+
+int Ranking::GetRank(int proposer_id) { return proposer_id; }
+
+}  // namespace cassandra_recv
+}  // namespace cassandra
+}  // namespace resdb
diff --git a/platform/consensus/ordering/cassandra_graph/algorithm/ranking.h 
b/platform/consensus/ordering/cassandra_graph/algorithm/ranking.h
new file mode 100644
index 00000000..d3d2dfb6
--- /dev/null
+++ b/platform/consensus/ordering/cassandra_graph/algorithm/ranking.h
@@ -0,0 +1,14 @@
+#pragma once
+
+namespace resdb {
+namespace cassandra {
+namespace cassandra_recv {
+
+class Ranking {
+ public:
+  int GetRank(int proposer_id);
+};
+
+}  // namespace cassandra_recv
+}  // namespace cassandra
+}  // namespace resdb
diff --git a/platform/consensus/ordering/cassandra_graph/framework/BUILD 
b/platform/consensus/ordering/cassandra_graph/framework/BUILD
new file mode 100644
index 00000000..a764ecff
--- /dev/null
+++ b/platform/consensus/ordering/cassandra_graph/framework/BUILD
@@ -0,0 +1,16 @@
+package(default_visibility = ["//visibility:private"])
+
+cc_library(
+    name = "consensus",
+    srcs = ["consensus.cpp"],
+    hdrs = ["consensus.h"],
+    visibility = [
+        "//visibility:public",
+    ],
+    deps = [
+        "//common/utils",
+        "//platform/consensus/ordering/common/framework:consensus",
+        "//platform/consensus/ordering/cassandra/algorithm:cassandra",
+    ],
+)
+
diff --git 
a/platform/consensus/ordering/cassandra_graph/framework/consensus.cpp 
b/platform/consensus/ordering/cassandra_graph/framework/consensus.cpp
new file mode 100644
index 00000000..69989bed
--- /dev/null
+++ b/platform/consensus/ordering/cassandra_graph/framework/consensus.cpp
@@ -0,0 +1,171 @@
+/*
+ * Copyright (c) 2019-2022 ExpoLab, UC Davis
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ */
+
+#include "platform/consensus/ordering/cassandra/framework/consensus.h"
+
+#include <glog/logging.h>
+#include <unistd.h>
+
+#include "common/utils/utils.h"
+
+namespace resdb {
+namespace cassandra {
+
+Consensus::Consensus(const ResDBConfig& config,
+                     std::unique_ptr<TransactionManager> executor)
+    : common::Consensus(config, std::move(executor)){
+  int total_replicas = config_.GetReplicaNum();
+  int f = (total_replicas - 1) / 3;
+
+  Init();
+
+  start_ = 0;
+
+  if (config_.GetPublicKeyCertificateInfo()
+          .public_key()
+          .public_key_info()
+          .type() != CertificateKeyInfo::CLIENT) {
+    cassandra_ = std::make_unique<cassandra_recv::Cassandra>(
+        config_.GetSelfInfo().id(), f,
+                                   total_replicas, GetSignatureVerifier());
+
+    InitProtocol(cassandra_.get());
+
+
+    cassandra_->SetPrepareFunction([&](const Transaction& msg) { 
+      return Prepare(msg); 
+    });
+  }
+}
+
+int Consensus::ProcessCustomConsensus(std::unique_ptr<Request> request) {
+  //LOG(ERROR)<<"receive commit:"<<request->type()<<" 
"<<MessageType_Name(request->user_type());
+  if (request->user_type() == MessageType::NewBlocks) {
+    std::unique_ptr<Block> block = std::make_unique<Block>();
+    if (!block->ParseFromString(request->data())) {
+      assert(1 == 0);
+      LOG(ERROR) << "parse proposal fail";
+      return -1;
+    }
+    cassandra_->ReceiveBlock(std::move(block));
+    return 0;
+  } else if (request->user_type() == MessageType::CMD_BlockACK) {
+    std::unique_ptr<BlockACK> block_ack = std::make_unique<BlockACK>();
+    if (!block_ack->ParseFromString(request->data())) {
+      LOG(ERROR) << "parse proposal fail";
+      assert(1 == 0);
+      return -1;
+    }
+    cassandra_->ReceiveBlockACK(std::move(block_ack));
+    return 0;
+
+  } else if (request->user_type() == MessageType::NewProposal) {
+    // LOG(ERROR)<<"receive proposal:";
+    std::unique_ptr<Proposal> proposal = std::make_unique<Proposal>();
+    if (!proposal->ParseFromString(request->data())) {
+      LOG(ERROR) << "parse proposal fail";
+      assert(1 == 0);
+      return -1;
+    }
+    if (!cassandra_->ReceiveProposal(std::move(proposal))) {
+      return -1;
+    }
+    return 0;
+  } else if (request->user_type() == MessageType::CMD_BlockQuery) {
+    std::unique_ptr<BlockQuery> block = std::make_unique<BlockQuery>();
+    if (!block->ParseFromString(request->data())) {
+      assert(1 == 0);
+      LOG(ERROR) << "parse proposal fail";
+      return -1;
+    }
+    cassandra_->SendBlock(*block);
+    return 0;
+  } else if (request->user_type() == MessageType::CMD_ProposalQuery) {
+    std::unique_ptr<ProposalQuery> query =
+      std::make_unique<ProposalQuery>();
+    if (!query->ParseFromString(request->data())) {
+      assert(1 == 0);
+      LOG(ERROR) << "parse proposal fail";
+      return -1;
+    }
+    cassandra_->SendProposal(*query);
+  } else if (request->user_type() ==
+      MessageType::CMD_ProposalQueryResponse) {
+    std::unique_ptr<ProposalQueryResp> resp =
+      std::make_unique<ProposalQueryResp>();
+    if (!resp->ParseFromString(request->data())) {
+      assert(1 == 0);
+      LOG(ERROR) << "parse proposal fail";
+      return -1;
+    }
+    cassandra_->ReceiveProposalQueryResp(*resp);
+  } 
+  return 0;
+}
+
+int Consensus::ProcessNewTransaction(std::unique_ptr<Request> request) {
+  std::unique_ptr<Transaction> txn = std::make_unique<Transaction>();
+  txn->set_data(request->data());
+  txn->set_hash(request->hash());
+  txn->set_proxy_id(request->proxy_id());
+  //LOG(ERROR)<<"receive txn";
+  return cassandra_->ReceiveTransaction(std::move(txn));
+}
+
+int Consensus::CommitMsg(const google::protobuf::Message& msg) {
+  return CommitMsgInternal(dynamic_cast<const Transaction&>(msg));
+}
+
+int Consensus::CommitMsgInternal(const Transaction& txn) {
+  //LOG(ERROR)<<"commit txn:"<<txn.id()<<" proxy id:"<<txn.proxy_id()<<" 
uid:"<<txn.uid();
+  std::unique_ptr<Request> request = std::make_unique<Request>();
+  request->set_queuing_time(txn.queuing_time());
+  request->set_data(txn.data());
+  request->set_seq(txn.id());
+  request->set_uid(txn.uid());
+  //if (txn.proposer_id() == config_.GetSelfInfo().id()) {
+    request->set_proxy_id(txn.proxy_id());
+   // LOG(ERROR)<<"commit txn:"<<txn.id()<<" proxy id:"<<request->uid();
+    //assert(request->uid()>0);
+  //}
+
+  transaction_executor_->AddExecuteMessage(std::move(request));
+  return 0;
+}
+
+
+int Consensus::Prepare(const Transaction& txn) {
+  // LOG(ERROR)<<"prepare txn:"<<txn.id()<<" proxy id:"<<txn.proxy_id()<<"
+  // uid:"<<txn.uid();
+  std::unique_ptr<Request> request = std::make_unique<Request>();
+  request->set_data(txn.data());
+  request->set_uid(txn.uid());
+  transaction_executor_->Prepare(std::move(request));
+  return 0;
+}
+
+
+}  // namespace cassandra
+}  // namespace resdb
diff --git a/platform/consensus/ordering/cassandra_graph/framework/consensus.h 
b/platform/consensus/ordering/cassandra_graph/framework/consensus.h
new file mode 100644
index 00000000..a0069c9d
--- /dev/null
+++ b/platform/consensus/ordering/cassandra_graph/framework/consensus.h
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2019-2022 ExpoLab, UC Davis
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ */
+
+#pragma once
+
+#include "executor/common/transaction_manager.h"
+#include "platform/consensus/ordering/common/framework/consensus.h"
+#include "platform/consensus/ordering/cassandra/algorithm/cassandra.h"
+#include "platform/networkstrate/consensus_manager.h"
+
+namespace resdb {
+namespace cassandra {
+
+class Consensus : public common::Consensus {
+ public:
+  Consensus(const ResDBConfig& config,
+            std::unique_ptr<TransactionManager> transaction_manager);
+  virtual ~Consensus() = default;
+
+ private:
+  int ProcessCustomConsensus(std::unique_ptr<Request> request) override;
+  int ProcessNewTransaction(std::unique_ptr<Request> request) override;
+  int CommitMsg(const google::protobuf::Message& msg) override;
+  int CommitMsgInternal(const Transaction& txn);
+
+  int Prepare(const Transaction& txn);
+
+ protected:
+  std::unique_ptr<cassandra_recv::Cassandra> cassandra_;
+  Stats* global_stats_;
+  int64_t start_;
+  std::mutex mutex_;
+  int send_num_[200];
+};
+
+}  // namespace cassandra
+}  // namespace resdb
diff --git 
a/platform/consensus/ordering/cassandra_graph/framework/consensus_test.cpp 
b/platform/consensus/ordering/cassandra_graph/framework/consensus_test.cpp
new file mode 100644
index 00000000..2c8834a8
--- /dev/null
+++ b/platform/consensus/ordering/cassandra_graph/framework/consensus_test.cpp
@@ -0,0 +1,179 @@
+/*
+ * Copyright (c) 2019-2022 ExpoLab, UC Davis
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ */
+
+#include "platform/consensus/ordering/cassandra/framework/consensus.h"
+
+#include <glog/logging.h>
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include <future>
+
+#include "common/test/test_macros.h"
+#include "executor/common/mock_transaction_manager.h"
+#include "platform/config/resdb_config_utils.h"
+#include "platform/networkstrate/mock_replica_communicator.h"
+
+namespace resdb {
+namespace cassandra {
+namespace {
+
+using ::resdb::testing::EqualsProto;
+using ::testing::_;
+using ::testing::Invoke;
+using ::testing::Test;
+
+ResDBConfig GetConfig() {
+  ResDBConfig config({GenerateReplicaInfo(1, "127.0.0.1", 1234),
+                      GenerateReplicaInfo(2, "127.0.0.1", 1235),
+                      GenerateReplicaInfo(3, "127.0.0.1", 1236),
+                      GenerateReplicaInfo(4, "127.0.0.1", 1237)},
+                     GenerateReplicaInfo(1, "127.0.0.1", 1234));
+  return config;
+}
+
+class ConsensusTest : public Test {
+ public:
+  ConsensusTest() : config_(GetConfig()) {
+    auto transaction_manager =
+        std::make_unique<MockTransactionExecutorDataImpl>();
+    mock_transaction_manager_ = transaction_manager.get();
+    consensus_ =
+        std::make_unique<Consensus>(config_, std::move(transaction_manager));
+    consensus_->SetCommunicator(&replica_communicator_);
+  }
+
+  void AddTransaction(const std::string& data) {
+    auto request = std::make_unique<Request>();
+    request->set_type(Request::TYPE_NEW_TXNS);
+
+    Transaction txn;
+
+    BatchUserRequest batch_request;
+    auto req = batch_request.add_user_requests();
+    req->mutable_request()->set_data(data);
+
+    batch_request.set_local_id(1);
+    batch_request.SerializeToString(txn.mutable_data());
+
+    txn.SerializeToString(request->mutable_data());
+
+    EXPECT_EQ(consensus_->ConsensusCommit(nullptr, std::move(request)), 0);
+  }
+
+ protected:
+  ResDBConfig config_;
+  MockTransactionExecutorDataImpl* mock_transaction_manager_;
+  MockReplicaCommunicator replica_communicator_;
+  std::unique_ptr<TransactionManager> transaction_manager_;
+  std::unique_ptr<Consensus> consensus_;
+};
+
+TEST_F(ConsensusTest, NormalCase) {
+  std::promise<bool> commit_done;
+  std::future<bool> commit_done_future = commit_done.get_future();
+
+  EXPECT_CALL(replica_communicator_, BroadCast)
+      .WillRepeatedly(Invoke([&](const google::protobuf::Message& msg) {
+        Request request = *dynamic_cast<const Request*>(&msg);
+
+        if (request.user_type() == MessageType::NewProposal) {
+          LOG(ERROR) << "bc new proposal";
+          consensus_->ConsensusCommit(nullptr,
+                                      std::make_unique<Request>(request));
+          LOG(ERROR) << "recv proposal done";
+        }
+        if (request.user_type() == MessageType::Vote) {
+          LOG(ERROR) << "bc vote";
+
+          VoteMessage ack_msg;
+          assert(ack_msg.ParseFromString(request.data()));
+          for (int i = 1; i <= 3; ++i) {
+            ack_msg.set_proposer_id(i);
+            auto new_req = std::make_unique<Request>(request);
+            ack_msg.SerializeToString(new_req->mutable_data());
+
+            consensus_->ConsensusCommit(nullptr, std::move(new_req));
+          }
+        }
+        // LOG(ERROR)<<"bc type:"<<request->type()<<" user
+        // type:"<<request->user_type();
+        if (request.user_type() == MessageType::Prepare) {
+          LOG(ERROR) << "bc prepare";
+
+          VoteMessage ack_msg;
+          assert(ack_msg.ParseFromString(request.data()));
+          for (int i = 1; i <= 3; ++i) {
+            ack_msg.set_proposer_id(i);
+            auto new_req = std::make_unique<Request>(request);
+            ack_msg.SerializeToString(new_req->mutable_data());
+
+            consensus_->ConsensusCommit(nullptr, std::move(new_req));
+          }
+        }
+        if (request.user_type() == MessageType::Voteprep) {
+          LOG(ERROR) << "bc voterep:";
+
+          VoteMessage ack_msg;
+          assert(ack_msg.ParseFromString(request.data()));
+          for (int i = 1; i <= 3; ++i) {
+            ack_msg.set_proposer_id(i);
+            auto new_req = std::make_unique<Request>(request);
+            ack_msg.SerializeToString(new_req->mutable_data());
+            LOG(ERROR) << "new request type:" << new_req->user_type();
+
+            consensus_->ConsensusCommit(nullptr, std::move(new_req));
+          }
+        }
+        LOG(ERROR) << "done";
+        return 0;
+      }));
+
+  EXPECT_CALL(*mock_transaction_manager_, ExecuteData)
+      .WillOnce(Invoke([&](const std::string& msg) {
+        LOG(ERROR) << "execute txn:" << msg;
+        EXPECT_EQ(msg, "transaction1");
+        return nullptr;
+      }));
+
+  EXPECT_CALL(replica_communicator_, SendMessage(_, 0))
+      .WillRepeatedly(
+          Invoke([&](const google::protobuf::Message& msg, int64_t) {
+            Request request = *dynamic_cast<const Request*>(&msg);
+            if (request.type() == Request::TYPE_RESPONSE) {
+              LOG(ERROR) << "get response";
+              commit_done.set_value(true);
+            }
+            return;
+          }));
+
+  AddTransaction("transaction1");
+
+  commit_done_future.get();
+}
+
+}  // namespace
+}  // namespace cassandra
+}  // namespace resdb
diff --git a/platform/consensus/ordering/cassandra_graph/proto/BUILD 
b/platform/consensus/ordering/cassandra_graph/proto/BUILD
new file mode 100644
index 00000000..558db374
--- /dev/null
+++ b/platform/consensus/ordering/cassandra_graph/proto/BUILD
@@ -0,0 +1,16 @@
+package(default_visibility = 
["//platform/consensus/ordering/cassandra:__subpackages__"])
+
+load("@rules_cc//cc:defs.bzl", "cc_proto_library")
+load("@rules_proto//proto:defs.bzl", "proto_library")
+load("@rules_proto_grpc//python:defs.bzl", "python_proto_library")
+
+proto_library(
+    name = "proposal_proto",
+    srcs = ["proposal.proto"],
+    #visibility = ["//visibility:public"],
+)
+
+cc_proto_library(
+    name = "proposal_cc_proto",
+    deps = [":proposal_proto"],
+)
diff --git a/platform/consensus/ordering/cassandra/proto/proposal.proto 
b/platform/consensus/ordering/cassandra_graph/proto/proposal.proto
similarity index 100%
copy from platform/consensus/ordering/cassandra/proto/proposal.proto
copy to platform/consensus/ordering/cassandra_graph/proto/proposal.proto
diff --git 
a/platform/consensus/ordering/common/framework/performance_manager.cpp 
b/platform/consensus/ordering/common/framework/performance_manager.cpp
index a5fc7ca8..cfc99add 100644
--- a/platform/consensus/ordering/common/framework/performance_manager.cpp
+++ b/platform/consensus/ordering/common/framework/performance_manager.cpp
@@ -272,7 +272,7 @@ int PerformanceManager::DoBatch(
   global_stats_->BroadCastMsg();
   send_num_++;
   sum_ += batch_req.size();
-  //LOG(ERROR)<<"send num:"<<send_num_<<" total num:"<<total_num_<<" 
sum:"<<sum_<<" to:"<<GetPrimary();
+  LOG(ERROR)<<"send num:"<<send_num_<<" total num:"<<total_num_<<" 
sum:"<<sum_<<" to:"<<GetPrimary();
   if (total_num_++ == 1000000) {
     stop_ = true;
     LOG(WARNING) << "total num is done:" << total_num_;
diff --git a/scripts/deploy/config/cassandra.config 
b/scripts/deploy/config/cassandra.config
index 2d91c531..645ba9bf 100644
--- a/scripts/deploy/config/cassandra.config
+++ b/scripts/deploy/config/cassandra.config
@@ -3,7 +3,7 @@
   "enable_viewchange": false,
   "recovery_enabled": false,
   "max_client_complaint_num":10,
-  "max_process_txn": 512,
+  "max_process_txn": 4096,
   "worker_num": 10,
   "input_worker_num": 1,
   "output_worker_num": 5
diff --git a/scripts/deploy/config/kv_performance_server.conf 
b/scripts/deploy/config/kv_performance_server.conf
index 98253c0a..8cb28551 100644
--- a/scripts/deploy/config/kv_performance_server.conf
+++ b/scripts/deploy/config/kv_performance_server.conf
@@ -1,8 +1,12 @@
 iplist=(
-172.31.25.232
-172.31.17.37
-172.31.30.17
-172.31.27.41
+172.31.81.8
+172.31.86.61
+172.31.84.9
+172.31.87.9
+172.31.95.18
+172.31.92.145
+172.31.81.89
+172.31.93.214
 )
 
-client_num=1
+client_num=4
diff --git a/scripts/deploy/config/kv_performance_server_32.conf 
b/scripts/deploy/config/kv_performance_server_32.conf
index 7a3ee111..58bfaf76 100644
--- a/scripts/deploy/config/kv_performance_server_32.conf
+++ b/scripts/deploy/config/kv_performance_server_32.conf
@@ -1,68 +1,68 @@
 iplist=(
-172.31.45.163
-172.31.32.226
-172.31.41.104
-172.31.47.100
-172.31.42.174
-172.31.36.105
-172.31.47.113
-172.31.45.112
-172.31.42.121
-172.31.47.116
-172.31.45.187
-172.31.45.250
-172.31.47.192
-172.31.40.62
-172.31.41.198
-172.31.45.196
-172.31.37.160
-172.31.33.9
-172.31.37.136
-172.31.44.139
-172.31.36.138
-172.31.40.211
-172.31.36.209
-172.31.36.21
-172.31.38.85
-172.31.38.151
-172.31.32.87
-172.31.39.153
-172.31.42.217
-172.31.47.218
-172.31.34.90
-172.31.36.92
-172.31.44.19
-172.31.44.89
-172.31.46.181
-172.31.41.184
-172.31.46.177
-172.31.38.180
-172.31.35.64
-172.31.38.192
-172.31.32.187
-172.31.38.190
-172.31.32.133
-172.31.33.74
-172.31.33.1
-172.31.42.130
-172.31.35.144
-172.31.33.210
-172.31.39.11
-172.31.46.79
-172.31.40.224
-172.31.39.225
-172.31.47.227
-172.31.37.166
-172.31.46.98
-172.31.44.163
-172.31.32.231
-172.31.35.169
-172.31.40.166
-172.31.45.102
-172.31.44.172
-172.31.46.173
-172.31.45.171
-172.31.46.235
+172.31.81.8
+172.31.86.61
+172.31.84.9
+172.31.87.9
+172.31.95.18
+172.31.92.145
+172.31.81.89
+172.31.93.214
+172.31.80.26
+172.31.93.218
+172.31.95.29
+172.31.84.91
+172.31.89.162
+172.31.90.98
+172.31.83.165
+172.31.88.162
+172.31.85.40
+172.31.84.168
+172.31.82.239
+172.31.82.106
+172.31.85.51
+172.31.81.47
+172.31.88.182
+172.31.93.51
+172.31.83.55
+172.31.87.119
+172.31.89.185
+172.31.87.184
+172.31.83.58
+172.31.88.186
+172.31.89.189
+172.31.84.125
+172.31.88.196
+172.31.94.129
+172.31.95.251
+172.31.94.183
+172.31.82.74
+172.31.95.72
+172.31.88.136
+172.31.82.132
+172.31.95.15
+172.31.95.205
+172.31.93.75
+172.31.82.75
+172.31.83.220
+172.31.92.215
+172.31.83.21
+172.31.87.166
+172.31.95.228
+172.31.85.163
+172.31.80.225
+172.31.88.43
+172.31.91.107
+172.31.92.42
+172.31.89.105
+172.31.91.175
+172.31.94.238
+172.31.91.173
+172.31.94.237
+172.31.90.245
+172.31.92.52
+172.31.93.240
+172.31.82.48
+172.31.89.96
 )
 
 key=~/.ssh/junchao.pem
diff --git a/scripts/deploy/config/kv_performance_server_64.conf 
b/scripts/deploy/config/kv_performance_server_64.conf
index 24237ad3..1b75f088 100644
--- a/scripts/deploy/config/kv_performance_server_64.conf
+++ b/scripts/deploy/config/kv_performance_server_64.conf
@@ -1,132 +1,132 @@
 iplist=(
-172.31.45.163
-172.31.32.226
-172.31.41.104
-172.31.47.100
-172.31.42.174
-172.31.36.105
-172.31.47.113
-172.31.45.112
-172.31.42.121
-172.31.47.116
-172.31.45.187
-172.31.45.250
-172.31.47.192
-172.31.40.62
-172.31.41.198
-172.31.45.196
-172.31.37.160
-172.31.33.9
-172.31.37.136
-172.31.44.139
-172.31.36.138
-172.31.40.211
-172.31.36.209
-172.31.36.21
-172.31.38.85
-172.31.38.151
-172.31.32.87
-172.31.39.153
-172.31.42.217
-172.31.47.218
-172.31.34.90
-172.31.36.92
-172.31.44.96
-172.31.47.254
-172.31.40.126
-172.31.39.65
-172.31.45.195
-172.31.35.112
-172.31.47.242
-172.31.45.117
-172.31.32.119
-172.31.35.234
-172.31.45.106
-172.31.41.110
-172.31.42.238
-172.31.34.97
-172.31.35.97
-172.31.39.97
-172.31.34.99
-172.31.41.40
-172.31.44.168
-172.31.38.45
-172.31.40.48
-172.31.34.221
-172.31.33.222
-172.31.37.34
-172.31.34.34
-172.31.35.202
-172.31.37.85
-172.31.37.215
-172.31.36.217
-172.31.40.199
-172.31.36.201
-172.31.34.202
-172.31.41.202
-172.31.37.0
-172.31.46.129
-172.31.32.131
-172.31.46.5
-172.31.47.188
-172.31.34.189
-172.31.40.61
-172.31.47.190
-172.31.42.181
-172.31.37.56
-172.31.44.60
-172.31.42.60
-172.31.47.50
-172.31.36.51
-172.31.34.180
-172.31.42.180
-172.31.46.28
-172.31.46.156
-172.31.47.158
-172.31.34.22
-172.31.39.22
-172.31.38.153
-172.31.43.156
-172.31.44.9
-172.31.38.141
-172.31.40.17
-172.31.42.18
-172.31.40.134
-172.31.34.6
-172.31.33.135
-172.31.42.137
-172.31.44.19
-172.31.44.89
-172.31.46.181
-172.31.41.184
-172.31.46.177
-172.31.38.180
-172.31.35.64
-172.31.38.192
-172.31.32.187
-172.31.38.190
-172.31.32.133
-172.31.33.74
-172.31.33.1
-172.31.42.130
-172.31.35.144
-172.31.33.210
-172.31.39.11
-172.31.46.79
-172.31.40.224
-172.31.39.225
-172.31.47.227
-172.31.37.166
-172.31.46.98
-172.31.44.163
-172.31.32.231
-172.31.35.169
-172.31.40.166
-172.31.45.102
-172.31.44.172
-172.31.46.173
-172.31.45.171
-172.31.46.235
+172.31.81.8
+172.31.86.61
+172.31.84.9
+172.31.87.9
+172.31.95.18
+172.31.92.145
+172.31.81.89
+172.31.93.214
+172.31.80.26
+172.31.93.218
+172.31.95.29
+172.31.84.91
+172.31.89.162
+172.31.90.98
+172.31.83.165
+172.31.88.162
+172.31.85.40
+172.31.84.168
+172.31.82.239
+172.31.82.106
+172.31.85.51
+172.31.81.47
+172.31.88.182
+172.31.93.51
+172.31.83.55
+172.31.87.119
+172.31.89.185
+172.31.87.184
+172.31.83.58
+172.31.88.186
+172.31.89.189
+172.31.84.125
+172.31.92.60
+172.31.95.58
+172.31.85.7
+172.31.91.135
+172.31.93.9
+172.31.84.8
+172.31.84.140
+172.31.88.139
+172.31.86.144
+172.31.95.142
+172.31.91.18
+172.31.87.16
+172.31.91.156
+172.31.82.26
+172.31.95.158
+172.31.91.158
+172.31.90.34
+172.31.89.33
+172.31.86.39
+172.31.87.167
+172.31.89.42
+172.31.95.41
+172.31.92.44
+172.31.80.171
+172.31.83.176
+172.31.89.48
+172.31.93.179
+172.31.93.49
+172.31.82.53
+172.31.80.180
+172.31.89.182
+172.31.85.54
+172.31.94.64
+172.31.80.255
+172.31.89.194
+172.31.90.64
+172.31.87.198
+172.31.92.196
+172.31.83.208
+172.31.94.207
+172.31.92.83
+172.31.84.81
+172.31.89.87
+172.31.94.215
+172.31.82.218
+172.31.87.217
+172.31.87.220
+172.31.95.219
+172.31.88.103
+172.31.87.98
+172.31.89.107
+172.31.86.106
+172.31.94.107
+172.31.95.235
+172.31.85.240
+172.31.93.108
+172.31.88.113
+172.31.93.112
+172.31.85.246
+172.31.93.114
+172.31.92.251
+172.31.82.246
+172.31.85.255
+172.31.90.252
+172.31.88.196
+172.31.94.129
+172.31.95.251
+172.31.94.183
+172.31.82.74
+172.31.95.72
+172.31.88.136
+172.31.82.132
+172.31.95.15
+172.31.95.205
+172.31.93.75
+172.31.82.75
+172.31.83.220
+172.31.92.215
+172.31.83.21
+172.31.87.166
+172.31.95.228
+172.31.85.163
+172.31.80.225
+172.31.88.43
+172.31.91.107
+172.31.92.42
+172.31.89.105
+172.31.91.175
+172.31.94.238
+172.31.91.173
+172.31.94.237
+172.31.90.245
+172.31.92.52
+172.31.93.240
+172.31.82.48
+172.31.89.96
 )
 
 key=~/.ssh/junchao.pem
diff --git a/scripts/null b/scripts/null
index 1afefe98..93ada22f 100644
--- a/scripts/null
+++ b/scripts/null
@@ -1 +1 @@
-/home/ubuntu/asf_resilientdb/service/contract/benchmark/data/smallbank.json: 
No such file or directory
+/home/ubuntu/asf-resilientdb/service/contract/benchmark/data/smallbank.json: 
No such file or directory

Reply via email to