This is an automated email from the ASF dual-hosted git repository.

junchao pushed a commit to branch cassandra
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git

commit 52f4525919da35671cc26b6601f6648f16fb6d1d
Author: cjcchen <[email protected]>
AuthorDate: Fri Jan 10 16:54:14 2025 +0000

    commit
---
 benchmark/protocols/cassandra_cft/BUILD            |  16 +
 .../cassandra_cft/kv_server_performance.cpp        |  89 ++++
 .../protocols/cassandra_cft/kv_service_tools.cpp   |  60 +--
 .../ordering/cassandra_cft/algorithm/BUILD         |  76 ++++
 .../ordering/cassandra_cft/algorithm/cassandra.cpp | 274 ++++++++++++
 .../ordering/cassandra_cft/algorithm/cassandra.h   | 100 +++++
 .../cassandra_cft/algorithm/proposal_graph.cpp     | 477 +++++++++++++++++++++
 .../cassandra_cft/algorithm/proposal_graph.h       |  85 ++++
 .../cassandra_cft/algorithm/proposal_manager.cpp   | 238 ++++++++++
 .../cassandra_cft/algorithm/proposal_manager.h     |  64 +++
 .../cassandra_cft/algorithm/proposal_state.h       |  16 +
 .../ordering/cassandra_cft/algorithm/ranking.cpp   |  10 +
 .../ordering/cassandra_cft/algorithm/ranking.h     |  12 +
 .../ordering/cassandra_cft/framework/BUILD         |  16 +
 .../framework/consensus.cpp                        |  84 ++--
 .../framework/consensus.h                          |  22 +-
 .../cassandra_cft/framework/consensus_test.cpp     | 179 ++++++++
 .../consensus/ordering/cassandra_cft/proto/BUILD   |  16 +
 .../ordering/cassandra_cft/proto/proposal.proto    | 124 ++++++
 .../common/framework/performance_manager.cpp       |   3 +
 .../ordering/multipaxos/algorithm/multipaxos.cpp   |  66 ++-
 .../ordering/multipaxos/algorithm/multipaxos.h     |   6 +-
 .../multipaxos/algorithm/proposal_manager.cpp      |   1 -
 .../ordering/multipaxos/framework/consensus.cpp    |  15 +
 .../ordering/multipaxos/framework/consensus.h      |   1 +
 .../ordering/multipaxos/proto/proposal.proto       |   1 +
 platform/networkstrate/consensus_manager.cpp       |   2 +-
 platform/networkstrate/replica_communicator.cpp    |  79 +++-
 platform/networkstrate/replica_communicator.h      |  13 +
 platform/proto/replica_info.proto                  |   1 +
 .../{multipaxos.config => cassandra_cft.config}    |   7 +-
 .../deploy/config/kv_performance_server_16.conf    |  64 +--
 .../deploy/config/kv_performance_server_32.conf    | 128 +++---
 scripts/deploy/config/kv_performance_server_4.conf |  13 +
 .../deploy/config/kv_performance_server_48.conf    | 101 +++++
 .../deploy/config/kv_performance_server_64.conf    | 256 +++++------
 scripts/deploy/config/kv_performance_server_8.conf |  32 +-
 scripts/deploy/config/multipaxos.config            |   7 +-
 .../performance/cassandra_cft_performance.sh       |   5 +
 scripts/deploy/performance/run_performance.sh      |   5 +-
 scripts/deploy/script/deploy.sh                    |   1 +
 41 files changed, 2421 insertions(+), 344 deletions(-)

diff --git a/benchmark/protocols/cassandra_cft/BUILD 
b/benchmark/protocols/cassandra_cft/BUILD
new file mode 100644
index 00000000..cef949a3
--- /dev/null
+++ b/benchmark/protocols/cassandra_cft/BUILD
@@ -0,0 +1,16 @@
+package(default_visibility = ["//visibility:private"])  # Targets in this package are private to it.
+
+load("@bazel_skylib//rules:common_settings.bzl", "bool_flag")  # NOTE(review): bool_flag is not referenced in the lines shown here.
+
+cc_binary(  # Benchmark server binary built from kv_server_performance.cpp.
+    name = "kv_server_performance",
+    srcs = ["kv_server_performance.cpp"],
+    deps = [
+        "//chain/storage:memory_db",
+        "//executor/kv:kv_executor",
+        "//platform/config:resdb_config_utils",
+        "//platform/consensus/ordering/cassandra_cft/framework:consensus",  # cassandra_cft consensus framework
+        "//service/utils:server_factory",
+    ],
+)
+
diff --git a/benchmark/protocols/cassandra_cft/kv_server_performance.cpp 
b/benchmark/protocols/cassandra_cft/kv_server_performance.cpp
new file mode 100644
index 00000000..ed4746e4
--- /dev/null
+++ b/benchmark/protocols/cassandra_cft/kv_server_performance.cpp
@@ -0,0 +1,89 @@
+/*
+ * Copyright (c) 2019-2022 ExpoLab, UC Davis
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ */
+
+#include <glog/logging.h>
+
+#include "chain/storage/memory_db.h"
+#include "executor/kv/kv_executor.h"
+#include "platform/config/resdb_config_utils.h"
+#include "platform/consensus/ordering/cassandra_cft/framework/consensus.h"
+#include "platform/networkstrate/service_network.h"
+#include "platform/statistic/stats.h"
+#include "proto/kv/kv.pb.h"
+
+using namespace resdb;
+using namespace resdb::cassandra_cft;
+using namespace resdb::storage;
+
+// Writes the expected command-line shape to stdout.
+void ShowUsage() {
+  fputs("<config> <private_key> <cert_file> [logging_dir]\n", stdout);
+}
+
+// Builds a two-character key out of two decimal digits drawn from rand(),
+// first digit first.
+std::string GetRandomKey() {
+  std::string key = std::to_string(rand() % 10);
+  key += std::to_string(rand() % 10);
+  return key;
+}
+
+/**
+ * Entry point of the cassandra_cft KV performance server.
+ *
+ * Arguments:
+ *   argv[1]  config file, argv[2] private key file, argv[3] cert file; all
+ *            three are forwarded to GenerateResDBConfig.
+ *   argv[4]  optional; when present it is passed to Stats::SetPrometheus.
+ *
+ * The server runs in performance mode and generates its own SET requests
+ * with a random two-digit key and the value "helloworld".
+ */
+int main(int argc, char** argv) {
+  // All three positional arguments are mandatory. The guard used to be
+  // `argc < 3`, which let a two-argument invocation through and then read
+  // argv[3] (the terminating null pointer in that case) as the cert file.
+  if (argc < 4) {
+    ShowUsage();
+    exit(0);
+  }
+
+  // google::InitGoogleLogging(argv[0]);
+  // FLAGS_minloglevel = google::GLOG_WARNING;
+
+  char* config_file = argv[1];
+  char* private_key_file = argv[2];
+  char* cert_file = argv[3];
+
+  if (argc >= 5) {
+    auto monitor_port = Stats::GetGlobalStats(5);
+    monitor_port->SetPrometheus(argv[4]);
+  }
+
+  std::unique_ptr<ResDBConfig> config =
+      GenerateResDBConfig(config_file, private_key_file, cert_file);
+
+  config->RunningPerformance(true);
+  ResConfigData config_data = config->GetConfigData();
+
+  auto performance_consens = std::make_unique<Consensus>(
+      *config, std::make_unique<KVExecutor>(std::make_unique<MemoryDB>()));
+  // Each call produces one serialized KVRequest used as benchmark load.
+  performance_consens->SetupPerformanceDataFunc([]() {
+    KVRequest request;
+    request.set_cmd(KVRequest::SET);
+    request.set_key(GetRandomKey());
+    request.set_value("helloworld");
+    std::string request_data;
+    request.SerializeToString(&request_data);
+    return request_data;
+  });
+
+  auto server = std::make_unique<ServiceNetwork>(
+      *config, std::move(performance_consens));
+  server->Run();
+}
diff --git a/platform/consensus/ordering/multipaxos/framework/consensus.h 
b/benchmark/protocols/cassandra_cft/kv_service_tools.cpp
similarity index 56%
copy from platform/consensus/ordering/multipaxos/framework/consensus.h
copy to benchmark/protocols/cassandra_cft/kv_service_tools.cpp
index 4d08c3ac..17858ef2 100644
--- a/platform/consensus/ordering/multipaxos/framework/consensus.h
+++ b/benchmark/protocols/cassandra_cft/kv_service_tools.cpp
@@ -23,31 +23,35 @@
  *
  */
 
-#pragma once
-
-#include "executor/common/transaction_manager.h"
-#include "platform/consensus/execution/transaction_executor.h"
-#include "platform/consensus/ordering/multipaxos/algorithm/multipaxos.h"
-#include "platform/consensus/ordering/common/framework/consensus.h"
-#include "platform/networkstrate/consensus_manager.h"
-
-namespace resdb {
-namespace multipaxos {
-
-class Consensus : public common::Consensus{
- public:
-  Consensus(const ResDBConfig& config,
-            std::unique_ptr<TransactionManager> transaction_manager);
-
-  protected:
-  int ProcessCustomConsensus(std::unique_ptr<Request> request) override;
-  int ProcessNewTransaction(std::unique_ptr<Request> request) override;
-  int CommitMsg(const google::protobuf::Message& msg) override;
-  int CommitMsgInternal(const Transaction& txn);
-
-  private:
-    std::unique_ptr<MultiPaxos> multipaxos_;
-};
-
-}  // namespace tusk
-}  // namespace resdb
+#include <fcntl.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include <fstream>
+
+#include "common/proto/signature_info.pb.h"
+#include "interface/kv/kv_client.h"
+#include "platform/config/resdb_config_utils.h"
+
+using resdb::GenerateReplicaInfo;
+using resdb::GenerateResDBConfig;
+using resdb::KVClient;
+using resdb::ReplicaInfo;
+using resdb::ResDBConfig;
+
+// Sends a single "start" write through a KVClient built from the config
+// file named on the command line, then prints a marker line.
+int main(int argc, char** argv) {
+  const bool has_config_arg = argc >= 2;
+  if (!has_config_arg) {
+    printf("<config path>\n");
+    return 0;
+  }
+
+  std::string config_path = argv[1];
+  ResDBConfig config = GenerateResDBConfig(config_path);
+  config.SetClientTimeoutMs(100000);
+
+  KVClient client(config);
+  client.Set("start", "value");
+
+  printf("start benchmark\n");
+}
diff --git a/platform/consensus/ordering/cassandra_cft/algorithm/BUILD 
b/platform/consensus/ordering/cassandra_cft/algorithm/BUILD
new file mode 100644
index 00000000..a3429088
--- /dev/null
+++ b/platform/consensus/ordering/cassandra_cft/algorithm/BUILD
@@ -0,0 +1,76 @@
+package(default_visibility = 
["//platform/consensus/ordering/cassandra_cft:__subpackages__"])  # Visible to packages under cassandra_cft only.
+
+cc_library(  # Header-only target for proposal_state.h.
+    name = "proposal_state",
+    hdrs = ["proposal_state.h"],
+)
+
+cc_library(  # ProposalManager; depends on the proposal graph and the proposal proto.
+    name = "proposal_manager",
+    srcs = ["proposal_manager.cpp"],
+    hdrs = ["proposal_manager.h"],
+    deps = [
+        ":proposal_graph",
+        "//common:comm",
+        "//platform/statistic:stats",
+        "//common/crypto:signature_verifier",
+        "//common/utils",
+        "//platform/consensus/ordering/cassandra_cft/proto:proposal_cc_proto",
+    ],
+)
+
+cc_library(  # Ranking helper used by proposal_graph.
+    name = "ranking",
+    srcs = ["ranking.cpp"],
+    hdrs = ["ranking.h"],
+    deps = [
+        "//common:comm",
+    ],
+)
+
+cc_library(  # ProposalGraph; depends on ranking and proposal_state.
+    name = "proposal_graph",
+    srcs = ["proposal_graph.cpp"],
+    hdrs = ["proposal_graph.h"],
+    deps = [
+        ":ranking",
+        ":proposal_state",
+        "//platform/statistic:stats",
+        "//common:comm",
+        "//common/utils",
+        "//platform/consensus/ordering/cassandra_cft/proto:proposal_cc_proto",
+    ],
+)
+
+cc_library(  # The Cassandra protocol class (cassandra.cpp / cassandra.h).
+    name = "cassandra",
+    srcs = ["cassandra.cpp"],
+    hdrs = ["cassandra.h"],
+    deps = [
+        ":proposal_graph",
+        ":proposal_manager",
+        "//platform/statistic:stats",
+        "//common:comm",
+        "//common/crypto:signature_verifier",
+        "//platform/consensus/ordering/common/algorithm:protocol_base",
+        "//platform/common/queue:lock_free_queue",
+    ],
+)
+
+cc_test(
+    name = "proposal_graph_test",
+    srcs = ["proposal_graph_test.cpp"],  # NOTE(review): this source is not listed in this commit's diffstat; confirm it exists.
+    deps = [
+        ":proposal_graph",
+        "//common/test:test_main",
+    ],
+)
+
+cc_test(
+    name = "cassandra_cft_test",
+    srcs = ["cassandra_cft_test.cpp"],  # NOTE(review): this source is not listed in this commit's diffstat; confirm it exists.
+    deps = [
+        ":cassandra",
+        "//common/test:test_main",
+    ],
+)
diff --git a/platform/consensus/ordering/cassandra_cft/algorithm/cassandra.cpp 
b/platform/consensus/ordering/cassandra_cft/algorithm/cassandra.cpp
new file mode 100644
index 00000000..e4d257a2
--- /dev/null
+++ b/platform/consensus/ordering/cassandra_cft/algorithm/cassandra.cpp
@@ -0,0 +1,274 @@
+#include "platform/consensus/ordering/cassandra_cft/algorithm/cassandra.h"
+
+#include <glog/logging.h>
+
+#include "common/crypto/signature_verifier.h"
+#include "common/utils/utils.h"
+
+#define Fail
+
+namespace resdb {
+namespace cassandra_cft {
+
+Cassandra::Cassandra(int id, int f, int total_num, bool failure_mode)  // Sets counters, builds the graph and proposal manager, then starts the worker threads.
+    : ProtocolBase(id, f, total_num), failure_mode_(failure_mode) {
+
+  LOG(ERROR) << "get proposal graph";
+  id_ = id;
+  total_num_ = total_num;
+  f_ = f;
+  is_stop_ = false;
+  //timeout_ms_ = 10000;
+  timeout_ms_ = 60000;
+  local_txn_id_ = 1;
+  local_proposal_id_ = 1;
+  batch_size_ = 15;  // Upper bound on transactions BroadcastTxn() packs into one block.
+
+  recv_num_ = 0;
+  execute_num_ = 0;
+  executed_ = 0;
+  committed_num_ = 0;
+  precommitted_num_ = 0;
+  execute_id_ = 1;
+
+  graph_ = std::make_unique<ProposalGraph>(f_);
+  proposal_manager_ = std::make_unique<ProposalManager>(id, f, total_num, 
graph_.get());  // The manager receives a non-owning pointer to graph_.
+
+  graph_->SetCommitCallBack(
+      [&](const Proposal& proposal) {  // Graph-side commit callback currently does nothing.
+      //CommitProposal(proposal); 
+    });
+
+  proposal_manager_->SetCommitCallBack(
+      [&](std::unique_ptr<Proposal> proposal) {  // Manager-side commits are queued for AsyncCommit().
+      CommitProposal(std::move(proposal));
+    });
+
+  //Reset();
+
+  //consensus_thread_ = std::thread(&Cassandra::AsyncConsensus, this);
+
+  block_thread_ = std::thread(&Cassandra::BroadcastTxn, this);  // Batches transactions into proposals and broadcasts them.
+  sbc_thread_ = std::thread(&Cassandra::SBC, this);  // Drives the per-round send/receive/vote loop.
+
+  commit_thread_ = std::thread(&Cassandra::AsyncCommit, this);  // Delivers committed transactions.
+
+  global_stats_ = Stats::GetGlobalStats();  // NOTE(review): assigned after the worker threads above have already started.
+
+  //prepare_thread_ = std::thread(&Cassandra::AsyncPrepare, this);
+}
+
+/**
+ * Requests shutdown and waits for every worker thread.
+ *
+ * The constructor starts block_thread_, sbc_thread_ and commit_thread_.
+ * Destroying a std::thread that is still joinable calls std::terminate, so
+ * block_thread_ and sbc_thread_ have to be joined here as well; previously
+ * only consensus_thread_, commit_thread_ and prepare_thread_ were.
+ *
+ * is_stop_ is raised first because each worker loop polls IsStop().
+ * NOTE(review): this relies on the queue Pop() calls in those loops
+ * returning periodically, as the nullptr checks there suggest; confirm.
+ */
+Cassandra::~Cassandra() {
+  is_stop_ = true;
+  std::thread* const workers[] = {&consensus_thread_, &commit_thread_,
+                                  &prepare_thread_, &block_thread_,
+                                  &sbc_thread_};
+  for (std::thread* worker : workers) {
+    // Threads that were never started are not joinable and are skipped.
+    if (worker->joinable()) {
+      worker->join();
+    }
+  }
+}
+
+// Stores the callback in prepare_.
+void Cassandra::SetPrepareFunction(
+    std::function<int(const Transaction&)> prepare) {
+  prepare_ = std::move(prepare);
+}
+
+// True once is_stop_ has been set to a non-zero value.
+bool Cassandra::IsStop() {
+  return is_stop_ != 0;
+}
+
+// Stamps the transaction with the current time, queues it on txns_ and
+// bumps the receive counter. Always returns true.
+bool Cassandra::ReceiveTransaction(std::unique_ptr<Transaction> txn) {
+  const auto received_at = GetCurrentTime();
+  txn->set_create_time(received_at);
+  txns_.Push(std::move(txn));
+  ++recv_num_;
+  return true;
+}
+
+// Under mutex_: records `round` via SetSendNext and wakes one waiter on
+// vote_cv_.
+void Cassandra::Notify(int round) {
+  std::lock_guard<std::mutex> guard(mutex_);
+  SetSendNext(round);
+  vote_cv_.notify_one();
+}
+
+void Cassandra::BroadcastTxn() {  // Worker loop: batch queued transactions into a block and broadcast it once the current round may be sent.
+  std::vector<std::unique_ptr<Transaction>> txns;
+  while (!IsStop()) {
+    std::unique_ptr<Transaction> txn = txns_.Pop();  // nullptr means nothing was dequeued this time.
+    if (txn == nullptr) {
+      continue;
+    }
+    txns.push_back(std::move(txn));
+
+    while (!IsStop()) {  // Wait until sent_next_ matches the manager's current round.
+      //int current_round = proposal_manager_->CurrentRound();
+      std::unique_lock<std::mutex> lk(mutex_);
+      vote_cv_.wait_for(lk, std::chrono::microseconds(10000),
+                        [&] { return 
CanSendNext(proposal_manager_->CurrentRound()); });
+
+      if (CanSendNext(proposal_manager_->CurrentRound())) {
+        start_ = 1;
+        break;
+      }
+    }
+
+    for(int i = 1; i < batch_size_; ++i){  // Top the batch up to batch_size_ transactions in total.
+      std::unique_ptr<Transaction> txn = txns_.Pop(0);  // NOTE(review): presumably a non-blocking pop; confirm against LockFreeQueue.
+      if(txn == nullptr){
+        break;
+      }
+      txns.push_back(std::move(txn));
+    }
+
+    //LOG(ERROR)<<" make block";
+    std::unique_ptr<Proposal> proposal = proposal_manager_->MakeBlock(txns);
+    //LOG(ERROR)<<" send new proposal :"<<txns.size()<<" 
height:"<<proposal->header().height();
+    Broadcast(MessageType::NewProposal, *proposal);
+    txns.clear();
+    SetSBCSendReady(proposal->header().height());  // Tells SBC() that this height has been sent.
+    //LOG(ERROR)<<" make block done";
+  }
+}
+
+// Under send_ready_mutex_: records `round` in has_sent_ and wakes one
+// waiter on send_ready_cv_.
+void Cassandra::SetSBCSendReady(int round) {
+  std::lock_guard<std::mutex> guard(send_ready_mutex_);
+  has_sent_ = round;
+  send_ready_cv_.notify_one();
+}
+
+// True when `round` equals the value last stored in has_sent_.
+bool Cassandra::SBCSendReady(int round) {
+  const bool matches = (round == has_sent_);
+  return matches;
+}
+
+// Stores `round` in sent_next_; no locking is done here.
+void Cassandra::SetSendNext(int round) {
+  sent_next_ = round;
+}
+
+// True when `round` equals the value stored in sent_next_.
+bool Cassandra::CanSendNext(int round) {
+  const bool matches = (round == sent_next_);
+  return matches;
+}
+
+
+// Forwards to ProposalManager::Ready for `round`.
+bool Cassandra::SBCRecvReady(int round) {
+  const bool ready = proposal_manager_->Ready(round);
+  return ready;
+}
+
+// Wakes one waiter on recv_ready_cv_ while holding recv_ready_mutex_.
+void Cassandra::NotifyRecvReady() {
+  std::lock_guard<std::mutex> guard(recv_ready_mutex_);
+  recv_ready_cv_.notify_one();
+}
+
+void Cassandra::SBC() {  // Worker loop: per round, wait for our own send, wait (bounded) for peers, check votes, then release the next send.
+  std::vector<std::unique_ptr<Transaction>> txns;  // NOTE(review): not used anywhere in this function.
+  int round = 1;
+  while (!IsStop()) {
+    while (!IsStop()) {  // Phase 1: block until BroadcastTxn() reports this round as sent.
+      //int current_round = proposal_manager_->CurrentRound();
+      std::unique_lock<std::mutex> lk(send_ready_mutex_);
+      send_ready_cv_.wait_for(lk, std::chrono::microseconds(10000),
+                        [&] { return SBCSendReady(round); });
+
+      if (SBCSendReady(round)) {
+        break;
+      }
+    }
+    //LOG(ERROR)<<" send ready round ready:"<<round;
+
+    for(int i = 0; i < 10; ++i) {  // Phase 2: at most 10 waits of up to 100000 microseconds each.
+      //int current_round = proposal_manager_->CurrentRound();
+      std::unique_lock<std::mutex> lk(recv_ready_mutex_);
+      recv_ready_cv_.wait_for(lk, std::chrono::microseconds(100000),
+                        [&] { return SBCRecvReady(round); });
+      //if(SBCRecvReady(round)){
+      if(proposal_manager_->MayReady(round)){  // The exit test uses MayReady, not the Ready predicate waited on above.
+        break;
+      }
+    }
+    //LOG(ERROR)<<" cehck recv round ready:"<<round;
+    proposal_manager_->CheckVote(round-1);  // Votes are checked for the previous round.
+    //LOG(ERROR)<<" notify";
+    Notify(proposal_manager_->CurrentRound());  // Unblocks BroadcastTxn() for the manager's current round.
+    round++;
+  }
+}
+
+
+// Hands a received proposal to the proposal manager and signals the SBC
+// loop. In failure mode, proposals from proposer 1 with a height above 10
+// are dropped without being recorded. Always returns true.
+bool Cassandra::ReceiveProposal(std::unique_ptr<Proposal> proposal) {
+  const int round = proposal->header().height();
+  const int proposer = proposal->header().proposer_id();
+
+  const bool drop = failure_mode_ && round > 10 && proposer == 1;
+  if (!drop) {
+    proposal_manager_->AddProposal(std::move(proposal));
+    NotifyRecvReady();
+  }
+  return true;
+}
+
+// Queues the proposal on commit_q_, which AsyncCommit() drains.
+void Cassandra::CommitProposal(std::unique_ptr<Proposal> proposal) {
+  commit_q_.Push(std::move(proposal));
+}
+
+// Commit worker: runs until IsStop(). For each proposal taken from
+// commit_q_ it collects that proposal plus, breadth-first, every proposal
+// referenced through node_info that is not yet in committed_, then passes
+// each transaction to commit_ with an id that grows by one per transaction
+// (starting at 1 for the lifetime of this loop).
+void Cassandra::AsyncCommit() {
+  int next_seq = 1;
+  while (!IsStop()) {
+    std::unique_ptr<Proposal> head = commit_q_.Pop();
+    if (head == nullptr) {
+      continue;
+    }
+    if (head->header().height() > 1) {
+      assert(head->node_info_size() > 0);
+    }
+
+    // The queued proposal itself must not have been committed before.
+    const int head_round = head->header().height();
+    const int head_proposer = head->header().proposer_id();
+    assert(committed_.find(std::make_pair(head_proposer, head_round)) ==
+           committed_.end());
+    committed_.insert(std::make_pair(head_proposer, head_round));
+
+    std::queue<std::unique_ptr<Proposal>> frontier;
+    frontier.push(std::move(head));
+    std::vector<std::unique_ptr<Proposal>> ordered;
+    while (!frontier.empty()) {
+      std::unique_ptr<Proposal> current = std::move(frontier.front());
+      frontier.pop();
+      for (auto node_info : current->node_info()) {
+        const int round = node_info.round();
+        const int proposer = node_info.proposer();
+        // insert() reports through .second whether the pair was new.
+        if (!committed_.insert(std::make_pair(proposer, round)).second) {
+          continue;
+        }
+        std::unique_ptr<Proposal> referenced =
+            proposal_manager_->FetchProposal(node_info);
+        assert(referenced != nullptr);
+        frontier.push(std::move(referenced));
+      }
+      ordered.push_back(std::move(current));
+    }
+
+    // Deliver in discovery order: the queued proposal first.
+    for (auto& committed_proposal : ordered) {
+      for (Transaction& txn : *committed_proposal->mutable_transactions()) {
+        txn.set_id(next_seq++);
+        commit_(txn);
+      }
+    }
+  }
+}
+
+
+}  // namespace cassandra_cft
+}  // namespace resdb
diff --git a/platform/consensus/ordering/cassandra_cft/algorithm/cassandra.h 
b/platform/consensus/ordering/cassandra_cft/algorithm/cassandra.h
new file mode 100644
index 00000000..b7d1b85f
--- /dev/null
+++ b/platform/consensus/ordering/cassandra_cft/algorithm/cassandra.h
@@ -0,0 +1,100 @@
+#pragma once
+
+#include <deque>
+#include <map>
+#include <queue>
+#include <thread>
+
+#include "platform/common/queue/lock_free_queue.h"
+#include "platform/consensus/ordering/common/algorithm/protocol_base.h"
+#include "platform/consensus/ordering/cassandra_cft/algorithm/proposal_graph.h"
+#include 
"platform/consensus/ordering/cassandra_cft/algorithm/proposal_manager.h"
+#include "platform/consensus/ordering/cassandra_cft/proto/proposal.pb.h"
+#include "platform/statistic/stats.h"
+
+namespace resdb {
+namespace cassandra_cft {
+
+// Protocol driver for the cassandra_cft ordering algorithm.
+//
+// The constructor starts worker threads running BroadcastTxn(), SBC() and
+// AsyncCommit(); ReceiveTransaction() and ReceiveProposal() feed them.
+// Members the constructor does not assign carry in-class initializers so
+// that none of them starts out with an indeterminate value.
+class Cassandra: public common::ProtocolBase {
+ public:
+  Cassandra(int id, int f, int total_num, bool failure_mode);
+  ~Cassandra();
+
+  // Hands a proposal received from a peer to the proposal manager.
+  bool ReceiveProposal(std::unique_ptr<Proposal> proposal);
+
+  void SetPrepareFunction(std::function<int(const Transaction&)> prepare);
+  // Queues a client transaction for batching into a block.
+  bool ReceiveTransaction(std::unique_ptr<Transaction> txn);
+
+  // Body of block_thread_.
+  void BroadcastTxn();
+
+ private:
+  bool IsStop();
+
+  int SendTxn(int round);
+
+  void Notify(int round);
+  void CommitProposal(std::unique_ptr<Proposal> proposal);
+  // Body of commit_thread_.
+  void AsyncCommit();
+
+  void SetSBCSendReady(int round);
+  bool SBCSendReady(int round);
+  void RecvSBCSendReady(int round);
+  bool SBCRecvReady(int round);
+  void NotifyRecvReady();
+
+  // Body of sbc_thread_.
+  void SBC();
+  void SetSendNext(int round);
+  bool CanSendNext(int round);
+
+ private:
+  std::unique_ptr<ProposalGraph> graph_;
+  LockFreeQueue<Transaction> txns_;
+  std::unique_ptr<ProposalManager> proposal_manager_;
+  // Not assigned by the constructor; starts out null.
+  SignatureVerifier* verifier_ = nullptr;
+  std::mutex mutex_, g_mutex_;
+  std::map<int, std::set<int>> received_num_;
+  // int state_;
+  int id_, total_num_, f_, batch_size_;
+  bool failure_mode_ = false;
+  std::atomic<int> is_stop_;
+  int timeout_ms_;
+  int local_txn_id_, local_proposal_id_;
+  LockFreeQueue<Proposal> commit_q_, execute_queue_, prepare_queue_;
+  std::thread commit_thread_, consensus_thread_, block_thread_,
+      prepare_thread_, sbc_thread_;
+  std::condition_variable vote_cv_;
+  std::map<int, bool> can_vote_;
+  std::atomic<int> committed_num_;
+  // voting_ previously had no initializer at all.
+  int voting_ = 0, start_ = false;
+  std::map<int, std::vector<std::unique_ptr<Transaction>>> uncommitted_txn_;
+
+  bool use_linear_ = false;
+  int recv_num_ = 0;
+  int execute_num_ = 0;
+  int pending_num_ = 0;
+  std::atomic<int> executed_;
+  std::atomic<int> precommitted_num_;
+  int last_vote_ = 0;
+  int execute_id_;
+
+  std::mutex block_mutex_;
+  std::set<int> received_;
+  std::map<int, std::set<int>> block_ack_;
+  std::map<int, std::vector<std::unique_ptr<Proposal>>> future_proposal_;
+  // (proposer, round) pairs already handed to the commit path.
+  std::set<std::pair<int,int> > committed_;
+
+  std::function<int(const Transaction&)> prepare_;
+
+  // Not assigned by the constructor; starts out at 0.
+  int current_round_ = 0;
+
+  std::mutex send_ready_mutex_, recv_ready_mutex_;
+  std::condition_variable send_ready_cv_, recv_ready_cv_;
+  int has_sent_ = 0;
+  int sent_next_ = 1;
+
+  Stats* global_stats_;
+
+};
+
+}  // namespace cassandra_cft
+}  // namespace resdb
diff --git 
a/platform/consensus/ordering/cassandra_cft/algorithm/proposal_graph.cpp 
b/platform/consensus/ordering/cassandra_cft/algorithm/proposal_graph.cpp
new file mode 100644
index 00000000..dcfa032e
--- /dev/null
+++ b/platform/consensus/ordering/cassandra_cft/algorithm/proposal_graph.cpp
@@ -0,0 +1,477 @@
+#include "platform/consensus/ordering/cassandra_cft/algorithm/proposal_graph.h"
+
+#include <glog/logging.h>
+
+#include <queue>
+#include <stack>
+
+#include "common/utils/utils.h"
+
+namespace resdb {
+namespace cassandra_cft {
+
+// Returns the three states that collect votes: New, Prepared, PreCommit,
+// in that order.
+std::vector<ProposalState> GetStates() {
+  std::vector<ProposalState> states;
+  states.push_back(ProposalState::New);
+  states.push_back(ProposalState::Prepared);
+  states.push_back(ProposalState::PreCommit);
+  return states;
+}
+
+// Starts an empty graph at height 0 for the given fault bound.
+ProposalGraph::ProposalGraph(int fault_num) : f_(fault_num) {
+  current_height_ = 0;
+  ranking_ = std::make_unique<Ranking>();
+  global_stats_ = Stats::GetGlobalStats();
+}
+
+// Advances current_height_ by one.
+void ProposalGraph::IncreaseHeight() {
+  ++current_height_;
+}
+
+// Jumps current_height_ to `height` when at least one node is recorded at
+// that height; otherwise only logs that recovery is needed.
+void ProposalGraph::TryUpgradeHeight(int height) {
+  if (last_node_[height].empty()) {
+    LOG(ERROR) << "need to recovery";
+    return;
+  }
+  current_height_ = height;
+}
+
+// Renders every byte of `hash` as its decimal integer value and
+// concatenates the results without separators.
+std::string Encode(const std::string& hash) {
+  std::string digits;
+  for (const char byte : hash) {
+    const int value = byte;
+    digits += std::to_string(value);
+  }
+  return digits;
+}
+
+// Records the proposal in node_info_ under its header hash unless an entry
+// for that hash already exists. Nothing else in the graph is touched.
+void ProposalGraph::AddProposalOnly(const Proposal& proposal) {
+  const std::string& key = proposal.header().hash();
+  if (node_info_.find(key) != node_info_.end()) {
+    return;
+  }
+  node_info_[key] = std::make_unique<NodeInfo>(proposal);
+}
+
+int ProposalGraph::AddProposal(const Proposal& proposal) {  // Returns 0 when linked into the graph, 1 on a height mismatch, 2 when VerifyParent fails.
+   //LOG(ERROR) << "add proposal height:" << proposal.header().height()
+   //          << " current height:" << current_height_<<" 
from:"<<proposal.header().proposer_id()<<" proposal 
id:"<<proposal.header().proposal_id();
+  assert(current_height_ >= latest_commit_.header().height());
+  /*
+  if (proposal.header().height() < current_height_) {
+    LOG(ERROR) << "height not match:" << current_height_
+               << " proposal height:" << proposal.header().height();
+    return false;
+  }
+  */
+
+  if (proposal.header().height() > current_height_) {  // Remember which proposers are ahead of the current height.
+    pending_header_[proposal.header().height()].insert(
+        proposal.header().proposer_id());
+  } else {
+    while (!pending_header_.empty()) {  // Drop bookkeeping for heights at or below the current one.
+      if (pending_header_.begin()->first <= current_height_) {
+        pending_header_.erase(pending_header_.begin());
+      } else {
+        break;
+      }
+    }
+  }
+
+  if (proposal.header().height() > current_height_ + 1) {  // More than one height ahead: not added.
+    LOG(ERROR) << "height not match:" << current_height_
+               << " proposal height:" << proposal.header().height();
+    if (pending_header_[proposal.header().height()].size() >= f_ + 1) {  // At least f_ + 1 distinct proposers at that height: try to jump.
+      TryUpgradeHeight(proposal.header().height());
+    }
+    return 1;
+  }
+
+  if (!VerifyParent(proposal)) {
+    LOG(ERROR) << "verify parent fail:" << proposal.header().proposer_id()
+               << " id:" << proposal.header().proposal_id();
+    // assert(1==0);
+    return 2;
+  }
+
+  // LOG(ERROR)<<"history size:"<<proposal.history_size();
+  for (const auto& history : proposal.history()) {  // First pass: every history entry must already be known.
+    std::string hash = history.hash();
+    auto node_it = node_info_.find(hash);
+    assert(node_it != node_info_.end());  // NOTE(review): with NDEBUG this check vanishes and the next loop would dereference end().
+    /*
+    if (node_it == node_info_.end()) {
+      LOG(ERROR) << " history node not found";
+      return false;
+    }
+    else {
+      LOG(ERROR)<<"find history";
+    }
+    */
+  }
+
+  for (const auto& history : proposal.history()) {  // Second pass: record this proposer's votes on each history entry.
+    std::string hash = history.hash();
+    auto node_it = node_info_.find(hash);
+    for (auto s : GetStates()) {
+      if (s >= node_it->second->state && s <= history.state()) {  // Vote for every state from the node's current one up to the reported one.
+        if (s != history.state()) {
+          LOG(ERROR) << "update from higher state";
+        }
+        node_it->second->votes[s].insert(proposal.header().proposer_id());
+      }
+    }
+    // 
LOG(ERROR)<<"proposal:("<<node_it->second->proposal.header().proposer_id()
+    //<<","<<node_it->second->proposal.header().proposal_id()<<")"
+    //<<" history state:"<<history.state()<<"
+    //num:"<<node_it->second->votes[history.state()].size();
+    CheckState(node_it->second.get(),
+               
static_cast<resdb::cassandra_cft::ProposalState>(history.state()));
+    if (node_it->second->state == ProposalState::PreCommit) {
+      Commit(hash);
+    }
+    //LOG(ERROR)<<"history 
proposal:("<<node_it->second->proposal.header().proposer_id()
+    //<<","<<node_it->second->proposal.header().proposal_id()
+    //<<" state:"<<node_it->second->state;
+  }
+
+  if (proposal.header().height() < current_height_) {  // Stale proposal: stored, but not linked into g_, and reported as 1.
+    LOG(ERROR) << "height not match:" << current_height_
+               << " proposal height:" << proposal.header().height();
+
+    // LOG(ERROR)<<"add proposal proposer:"<<proposal.header().proposer_id()<<"
+    // id:"<<proposal.header().proposal_id();
+    // g_[proposal.header().prehash()].push_back(proposal.header().hash());
+    auto np = std::make_unique<NodeInfo>(proposal);
+    new_proposals_[proposal.header().hash()] = &np->proposal;
+    node_info_[proposal.header().hash()] = std::move(np);
+    last_node_[proposal.header().height()].insert(proposal.header().hash());
+
+    return 1;
+  } else {
+    g_[proposal.header().prehash()].push_back(proposal.header().hash());  // Edge from the parent hash to this proposal.
+    auto np = std::make_unique<NodeInfo>(proposal);
+    new_proposals_[proposal.header().hash()] = &np->proposal;  // Non-owning pointer into the NodeInfo that node_info_ takes over below.
+    // LOG(ERROR)<<"add proposal proposer:"<<proposal.header().proposer_id()<<"
+    // id:"<<proposal.header().proposal_id()<<"
+    // hash:"<<Encode(proposal.header().hash());
+    node_info_[proposal.header().hash()] = std::move(np);
+    last_node_[proposal.header().height()].insert(proposal.header().hash());
+  }
+  return 0;
+}
+
+// Moves `state` one step forward: None/New become Prepared, Prepared
+// becomes PreCommit. Any other state is left as it is.
+void ProposalGraph::UpgradeState(ProposalState& state) {
+  if (state == None || state == New) {
+    state = Prepared;
+  } else if (state == Prepared) {
+    state = PreCommit;
+  }
+}
+
+/**
+ * Promotes a node for as long as its current state holds at least
+ * 2 * f_ + 1 votes.
+ *
+ * UpgradeState() leaves a state unchanged once it has no successor (for
+ * example PreCommit). If such a state itself held 2 * f_ + 1 votes, the
+ * loop condition stayed true forever and the caller hung; the loop now
+ * stops as soon as an upgrade makes no progress.
+ *
+ * @param node_info node whose state and votes are inspected and updated.
+ * @param state     accepted for interface compatibility; not read here.
+ * @return always true (1).
+ */
+int ProposalGraph::CheckState(NodeInfo* node_info, ProposalState state) {
+  while (node_info->votes[node_info->state].size() >= 2 * f_ + 1) {
+    const ProposalState before = node_info->state;
+    UpgradeState(node_info->state);
+    if (node_info->state == before) {
+      // No further state to move to; nothing more can change.
+      break;
+    }
+  }
+  return true;
+}
+
+void ProposalGraph::Commit(const std::string& hash) {  // Currently does nothing: the whole implementation below is commented out and `hash` is unused.
+/*
+  auto it = node_info_.find(hash);
+  if (it == node_info_.end()) {
+    LOG(ERROR) << "node not found, hash:" << hash;
+    assert(1 == 0);
+    return;
+  }
+
+  if (it->second->state != ProposalState::PreCommit) {
+    LOG(ERROR) << "hash not committed:" << hash;
+    assert(1 == 0);
+    return;
+  }
+
+  std::set<std::string> is_main_hash;
+  is_main_hash.insert(hash);
+  
+  int from_proposer = it->second->proposal.header().proposer_id();
+  int from_proposal_id = it->second->proposal.header().proposal_id();
+   //LOG(ERROR)<<"commit :"<<it->second->proposal.header().proposer_id()<<" 
id:"<<it->second->proposal.header().proposal_id();
+
+  std::vector<std::vector<Proposal*>> commit_p;
+  auto bfs = [&]() {
+    std::queue<std::string> q;
+    q.push(hash);
+    while (!q.empty()) {
+      std::string c_hash = q.front();
+      q.pop();
+
+      auto it = node_info_.find(c_hash);
+      if (it == node_info_.end()) {
+        LOG(ERROR) << "node not found, hash:";
+        assert(1 == 0);
+      }
+
+      Proposal* p = &it->second->proposal;
+      if (it->second->state == ProposalState::Committed) {
+        // LOG(ERROR)<<" try commit proposal,
+        // sender:"<<p->header().proposer_id()
+        //<<" proposal id:"<<p->header().proposal_id()<<" has been committed";
+        continue;
+      }
+
+      it->second->state = ProposalState::Committed;
+      if (is_main_hash.find(c_hash) != is_main_hash.end()) {
+        commit_num_[p->header().proposer_id()]++;
+        // LOG(ERROR)<<"commit main node:"<<p->header().proposer_id();
+        is_main_hash.insert(p->header().prehash());
+        commit_p.push_back(std::vector<Proposal*>());
+      }
+
+      commit_p.back().push_back(p);
+       //LOG(ERROR)<<"commit node:"<<p->header().proposer_id()<<" 
id:"<<p->header().proposal_id()
+      //<<" weak proposal size:"<<p->weak_proposals().hash_size();
+       //LOG(ERROR)<<"push p:"<<p->header().proposer_id();
+      for (const std::string& w_hash : p->weak_proposals().hash()) {
+        auto it = node_info_.find(w_hash);
+        if (it == node_info_.end()) {
+          LOG(ERROR) << "node not found, hash:";
+          assert(1 == 0);
+        }
+
+        // LOG(ERROR)<<"add weak
+        // proposal:"<<it->second->proposal.header().proposer_id()<<"
+        // id:"<<it->second->proposal.header().proposal_id();
+        q.push(w_hash);
+      }
+      if (!p->header().prehash().empty()) {
+        q.push(p->header().prehash());
+      }
+    }
+  };
+
+  bfs();
+  if (commit_p.size() > 1) {
+    LOG(ERROR) << "commit more hash";
+  }
+  int block_num = 0;
+  for (int i = commit_p.size() - 1; i >= 0; i--) {
+    for (int j = 0; j < commit_p[i].size(); ++j) {
+      block_num += commit_p[i][j]->block_size();
+      if (commit_callback_) {
+        commit_callback_(*commit_p[i][j]);
+      }
+    }
+  }
+  //global_stats_->AddCommitBlock(block_num);
+  */
+
+  // TODO clean
+  //last_node_[it->second->proposal.header().height()].clear();
+  //latest_commit_ = it->second->proposal;
+  //it->second->state = ProposalState::Committed;
+  // Clear(latest_commit_.header().hash());
+}
+
+// Removes and returns the proposals parked in not_found_proposal_ under
+// (height, hash). Returns an empty vector when nothing is parked there.
+std::vector<std::unique_ptr<Proposal>> ProposalGraph::GetNotFound(
+    int height, const std::string& hash) {
+  std::vector<std::unique_ptr<Proposal>> parked;
+
+  const auto height_it = not_found_proposal_.find(height);
+  if (height_it == not_found_proposal_.end()) {
+    return parked;
+  }
+
+  auto& by_prehash = height_it->second;
+  const auto entry = by_prehash.find(hash);
+  if (entry == by_prehash.end()) {
+    return parked;
+  }
+
+  // Take ownership of the waiting proposals before dropping the map entry.
+  parked = std::move(entry->second);
+  by_prehash.erase(entry);
+  LOG(ERROR) << "found future height:" << height;
+  return parked;
+}
+
+// Checks that `proposal` links to a known parent.
+//
+// Returns true when its prehash equals the hash of latest_commit_, or when the
+// prehash names a node in node_info_ whose height is exactly one below the
+// proposal's height. When the parent is not in node_info_, a copy of the
+// proposal is parked in not_found_proposal_ (keyed by height, then prehash)
+// and false is returned. A height mismatch also returns false.
+bool ProposalGraph::VerifyParent(const Proposal& proposal) {
+  const auto& header = proposal.header();
+  const std::string& parent_hash = header.prehash();
+
+  if (parent_hash == latest_commit_.header().hash()) {
+    return true;
+  }
+
+  const auto parent_it = node_info_.find(parent_hash);
+  if (parent_it == node_info_.end()) {
+    LOG(ERROR) << "prehash not here";
+    not_found_proposal_[header.height()][parent_hash].push_back(
+        std::make_unique<Proposal>(proposal));
+    return false;
+  }
+
+  const auto parent_height = parent_it->second->proposal.header().height();
+  if (header.height() != parent_height + 1) {
+    LOG(ERROR) << "link to invalid proposal, height:" << header.height()
+               << " pre height:" << parent_height;
+    return false;
+  }
+  return true;
+}
+
+// Rebuilds proposal->history() from the graph: starting at the proposal's own
+// hash and following prehash links, it records (hash, state, sender, id) for
+// at most kHistoryDepth nodes. The walk stops early when the chain reaches an
+// empty prehash or a hash that is not present in node_info_.
+void ProposalGraph::UpdateHistory(Proposal* proposal) {
+  constexpr int kHistoryDepth = 3;
+
+  proposal->mutable_history()->Clear();
+  std::string hash = proposal->header().hash();
+
+  for (int i = 0; i < kHistoryDepth && !hash.empty(); ++i) {
+    const auto node_it = node_info_.find(hash);
+    if (node_it == node_info_.end()) {
+      // The ancestor is not tracked in node_info_; dereferencing end() would
+      // be undefined behaviour, so stop here.
+      break;
+    }
+    const NodeInfo& node = *node_it->second;
+
+    auto* his = proposal->add_history();
+    his->set_hash(hash);
+    his->set_state(node.state);
+    his->set_sender(node.proposal.header().proposer_id());
+    his->set_id(node.proposal.header().proposal_id());
+    hash = node.proposal.header().prehash();
+  }
+}
+
+// Returns the highest-ranked proposal (per Compare()) among the hashes
+// recorded in last_node_ for current_height_, after refreshing its history.
+// Returns nullptr when there is no entry for the current height or when none
+// of the recorded hashes resolves to a node in node_info_.
+Proposal* ProposalGraph::GetStrongestProposal() {
+  const auto height_it = last_node_.find(current_height_);
+  if (height_it == last_node_.end()) {
+    LOG(ERROR) << "no data:" << current_height_;
+    return nullptr;
+  }
+
+  NodeInfo* sp = nullptr;
+  for (const auto& last_hash : height_it->second) {
+    // Use find() instead of operator[]: operator[] would insert a null entry
+    // for an unknown hash and the null would then be dereferenced.
+    const auto node_it = node_info_.find(last_hash);
+    if (node_it == node_info_.end() || node_it->second == nullptr) {
+      LOG(ERROR) << "last node not found, height:" << current_height_;
+      continue;
+    }
+    NodeInfo* node_info = node_it->second.get();
+    if (sp == nullptr || Compare(*sp, *node_info)) {
+      sp = node_info;
+    }
+  }
+
+  if (sp == nullptr) {
+    // Empty hash set (or nothing resolvable) for this height.
+    LOG(ERROR) << "no data:" << current_height_;
+    return nullptr;
+  }
+
+  UpdateHistory(&sp->proposal);
+  return &sp->proposal;
+}
+
+// Ranks two proposer ids using commit_num_, tolerating a difference of one:
+// returns false when id1's count trails id2's by more than one, true when it
+// leads by more than one, and otherwise falls back to id1 < id2.
+// Note: reading commit_num_ with operator[] creates a zero entry for an id
+// that has none yet.
+bool ProposalGraph::Cmp(int id1, int id2) {
+  const int commits1 = commit_num_[id1];
+  const int commits2 = commit_num_[id2];
+
+  if (commits1 + 1 < commits2) {
+    return false;
+  }
+  if (commits1 > commits2 + 1) {
+    return true;
+  }
+  return id1 < id2;
+}
+
+// Maps a proposal state to its ranking score, which is simply the numeric
+// value of the enumerator.
+int ProposalGraph::StateScore(const ProposalState& state) {
+  return static_cast<int>(state);
+}
+
+// Three-way comparison of two states by StateScore(): negative when state1
+// scores lower, zero when equal, positive when state1 scores higher.
+int ProposalGraph::CompareState(const ProposalState& state1,
+                                const ProposalState& state2) {
+  const int score1 = StateScore(state1);
+  const int score2 = StateScore(state2);
+  return score1 - score2;
+}
+
+// Returns true when p1 ranks below p2. Criteria are applied in order:
+//   1. lower height ranks lower;
+//   2. lower state score (CompareState) ranks lower;
+//   3. for the same proposer, lower proposal id ranks lower;
+//   4. otherwise the result of Cmp() on the two proposer ids.
+bool ProposalGraph::Compare(const NodeInfo& p1, const NodeInfo& p2) {
+  const auto& h1 = p1.proposal.header();
+  const auto& h2 = p2.proposal.header();
+
+  if (h1.height() != h2.height()) {
+    return h1.height() < h2.height();
+  }
+
+  const int state_diff = CompareState(p1.state, p2.state);
+  if (state_diff != 0) {
+    return state_diff < 0;
+  }
+
+  if (h1.proposer_id() == h2.proposer_id()) {
+    return h1.proposal_id() < h2.proposal_id();
+  }
+
+  return Cmp(h1.proposer_id(), h2.proposer_id());
+}
+
+// Returns the strongest proposal at the current height, or a pointer to
+// latest_commit_ when GetStrongestProposal() has nothing to offer. Having
+// nothing while current_height_ > 0 is treated as an invariant violation.
+Proposal* ProposalGraph::GetLatestStrongestProposal() {
+  Proposal* strongest = GetStrongestProposal();
+  if (strongest != nullptr) {
+    return strongest;
+  }
+
+  if (current_height_ > 0) {
+    assert(1 == 0);
+  }
+  return &latest_commit_;
+}
+
+// Looks up the state recorded for `hash`; ProposalState::None when the hash
+// is not in node_info_.
+ProposalState ProposalGraph::GetProposalState(const std::string& hash) const {
+  const auto found = node_info_.find(hash);
+  return found == node_info_.end() ? ProposalState::None
+                                   : found->second->state;
+}
+
+// Returns a pointer to the proposal stored for `hash`, or nullptr (after
+// logging the encoded hash) when the hash is not in node_info_.
+const Proposal* ProposalGraph::GetProposalInfo(const std::string& hash) const {
+  const auto found = node_info_.find(hash);
+  if (found != node_info_.end()) {
+    return &found->second->proposal;
+  }
+
+  LOG(ERROR) << "hash not found:" << Encode(hash);
+  return nullptr;
+}
+
+// Returns the graph's current height (current_height_).
+int ProposalGraph::GetCurrentHeight() { return current_height_; }
+
+// Removes from new_proposals_ every proposal whose height is strictly below
+// `height` and returns them. The returned pointers are non-owning.
+std::vector<Proposal*> ProposalGraph::GetNewProposals(int height) {
+  std::vector<Proposal*> ps;
+  // Bind by const reference: iterating by value copied each (key, pointer)
+  // pair, including the key string.
+  for (const auto& entry : new_proposals_) {
+    Proposal* p = entry.second;
+    if (p->header().height() < height) {
+      ps.push_back(p);
+    }
+  }
+  for (Proposal* p : ps) {
+    // Erase by key: erase(find(...)) is undefined behaviour if the key is
+    // missing, whereas erase(key) is then a no-op.
+    new_proposals_.erase(p->header().hash());
+  }
+  return ps;
+}
+
+}  // namespace cassandra_cft
+}  // namespace resdb
diff --git 
a/platform/consensus/ordering/cassandra_cft/algorithm/proposal_graph.h 
b/platform/consensus/ordering/cassandra_cft/algorithm/proposal_graph.h
new file mode 100644
index 00000000..a0917b54
--- /dev/null
+++ b/platform/consensus/ordering/cassandra_cft/algorithm/proposal_graph.h
@@ -0,0 +1,85 @@
+#pragma once
+
+#include <map>
+
+#include "platform/consensus/ordering/cassandra_cft/algorithm/proposal_state.h"
+#include "platform/consensus/ordering/cassandra_cft/algorithm/ranking.h"
+#include "platform/consensus/ordering/cassandra_cft/proto/proposal.pb.h"
+#include "platform/statistic/stats.h"
+
+namespace resdb {
+namespace cassandra_cft {
+
+// Graph of proposals keyed by hash. Each node carries the proposal plus a
+// ProposalState; the graph can report the strongest proposal at the current
+// height and parks proposals whose parent has not been seen yet.
+class ProposalGraph {
+ public:
+  ProposalGraph(int fault_num);
+  // Stores the callback in commit_callback_.
+  inline void SetCommitCallBack(std::function<void(const Proposal&)> func) {
+    commit_callback_ = func;
+  }
+
+  int AddProposal(const Proposal& proposal);
+  void AddProposalOnly(const Proposal& proposal);
+
+  // Strongest proposal at the current height, or latest_commit_ when there is
+  // none.
+  Proposal* GetLatestStrongestProposal();
+  // Proposal stored for `hash`, or nullptr when unknown.
+  const Proposal* GetProposalInfo(const std::string& hash) const;
+
+  int GetCurrentHeight();
+
+  void Clear(const std::string& hash);
+  void IncreaseHeight();
+  // State stored for `hash`, or ProposalState::None when unknown.
+  ProposalState GetProposalState(const std::string& hash) const;
+
+  // Removes and returns the proposals parked under (height, hash).
+  std::vector<std::unique_ptr<Proposal>> GetNotFound(int height,
+                                                     const std::string& hash);
+
+  // Removes and returns the entries of new_proposals_ whose height is below
+  // `height`.
+  std::vector<Proposal*> GetNewProposals(int height);
+
+ private:
+  // Per-proposal bookkeeping. Inside this class, `NodeInfo` names this struct.
+  struct NodeInfo {
+    Proposal proposal;
+    ProposalState state;
+    int score;
+    int is_main;
+    // std::set<int> received_num[5];
+    std::map<int, std::set<int>> votes;
+
+    // Copies the proposal; a new node starts in ProposalState::New.
+    NodeInfo(const Proposal& proposal)
+        : proposal(proposal), state(ProposalState::New), score(0), is_main(0) 
{}
+  };
+
+  // True when the proposal's parent is latest_commit_ or a known node exactly
+  // one height below; otherwise false (unknown parents get the proposal parked
+  // in not_found_proposal_).
+  bool VerifyParent(const Proposal& proposal);
+
+  // True when p1 ranks below p2.
+  bool Compare(const NodeInfo& p1, const NodeInfo& p2);
+  // Proposer-id tie-breaker based on commit_num_.
+  bool Cmp(int id1, int id2);
+  int StateScore(const ProposalState& state);
+  int CompareState(const ProposalState& state1, const ProposalState& state2);
+
+  // Highest-ranked node recorded in last_node_ for current_height_.
+  Proposal* GetStrongestProposal();
+
+  // Rewrites proposal->history() by following prehash links.
+  void UpdateHistory(Proposal* proposal);
+  int CheckState(NodeInfo* node_info, ProposalState state);
+  void UpgradeState(ProposalState& state);
+  void TryUpgradeHeight(int height);
+
+  void Commit(const std::string& hash);
+
+ private:
+  Proposal latest_commit_;
+  std::map<std::string, std::vector<std::string>> g_;
+  // hash -> node bookkeeping.
+  std::map<std::string, std::unique_ptr<NodeInfo>> node_info_;
+  std::map<std::string, std::vector<VoteMessage>> not_found_;
+  std::unique_ptr<Ranking> ranking_;
+  // proposer id -> counter consulted by Cmp().
+  std::map<int, int> commit_num_;
+  // height -> hashes considered by GetStrongestProposal().
+  std::map<int, std::set<std::string>> last_node_;
+  int current_height_;
+  uint32_t f_;
+  std::function<void(const Proposal&)> commit_callback_;
+  std::map<int, std::set<int>> pending_header_;
+  // height -> prehash -> proposals waiting for that parent; filled by
+  // VerifyParent() and drained by GetNotFound().
+  std::map<int, std::map<std::string, std::vector<std::unique_ptr<Proposal>>>>
+      not_found_proposal_;
+  // Non-owning pointers drained by GetNewProposals(); GetNewProposals() erases
+  // entries using the proposal's header hash as the key.
+  std::map<std::string, Proposal*> new_proposals_;
+  Stats* global_stats_;
+};
+
+}  // namespace cassandra_cft
+}  // namespace resdb
diff --git 
a/platform/consensus/ordering/cassandra_cft/algorithm/proposal_manager.cpp 
b/platform/consensus/ordering/cassandra_cft/algorithm/proposal_manager.cpp
new file mode 100644
index 00000000..ad163356
--- /dev/null
+++ b/platform/consensus/ordering/cassandra_cft/algorithm/proposal_manager.cpp
@@ -0,0 +1,238 @@
+#include 
"platform/consensus/ordering/cassandra_cft/algorithm/proposal_manager.h"
+
+#include <glog/logging.h>
+
+#include "common/crypto/signature_verifier.h"
+#include "common/utils/utils.h"
+
+namespace resdb {
+namespace cassandra_cft {
+
+// Sets up a manager for replica `id` with fault bound `f` and `total_num`
+// replicas. Rounds and local proposal ids start at 1; the global Stats
+// instance is cached in global_stats_.
+ProposalManager::ProposalManager(int32_t id, int f, int total_num,
+                                 ProposalGraph* graph)
+    : id_(id),
+      f_(f),
+      total_num_(total_num),
+      graph_(graph),
+      local_proposal_id_(1),
+      round_(1),
+      global_stats_(Stats::GetGlobalStats()) {}
+
+// Returns round_, i.e. the height MakeBlock() will assign to the next
+// proposal it builds. The value is read without taking mutex_.
+int ProposalManager::CurrentRound() {
+  return round_;
+}
+
+// Builds a new local proposal: stamps it with the current round as height
+// (then advances round_), marks this replica as proposer and sender, copies
+// every transaction from `txns` into it, and finally attaches graph/vote
+// information through GeneateGraph().
+std::unique_ptr<Proposal> ProposalManager::MakeBlock(
+    std::vector<std::unique_ptr<Transaction>>& txns) {
+  auto block = std::make_unique<Proposal>();
+
+  auto* header = block->mutable_header();
+  header->set_height(round_++);
+  header->set_proposer_id(id_);
+  block->set_sender(id_);
+
+  for (const auto& txn : txns) {
+    *block->add_transactions() = *txn;
+  }
+
+  GeneateGraph(block.get());
+  return block;
+}
+
+// Fills proposal->node_info() with one entry per known proposer, then (for
+// rounds after the first) fills in the vote via GenerateVoteInfo().
+//
+// For each proposer in proposals_: if its latest recorded round is below the
+// proposal's round, that record is used as-is; otherwise, for round > 1, the
+// proposer's proposal from round-1 is referenced instead. Holds mutex_ for
+// the whole call, including the GenerateVoteInfo() step.
+void ProposalManager::GeneateGraph(Proposal* proposal) {
+  const int round = proposal->header().height();
+  std::unique_lock<std::mutex> lk(mutex_);
+  // Iterate by const reference; iterating by value copied every NodeInfo.
+  for (const auto& entry : proposals_) {
+    const int proposer = entry.first;
+    const NodeInfo& latest = entry.second;
+    if (latest.round() < round) {
+      *proposal->add_node_info() = latest;
+      continue;
+    }
+    if (round <= 1) {
+      continue;
+    }
+
+    // Look the previous-round slot up without operator[]: operator[] would
+    // create a null slot, and the old assert dereferenced that slot directly.
+    const ProposalInfo* prev = nullptr;
+    const auto round_it = proposals_rounds_.find(round - 1);
+    if (round_it != proposals_rounds_.end()) {
+      const auto proposer_it = round_it->second.find(proposer);
+      if (proposer_it != round_it->second.end()) {
+        prev = proposer_it->second.get();
+      }
+    }
+    assert(prev != nullptr && prev->proposal != nullptr);
+    if (prev == nullptr || prev->proposal == nullptr) {
+      // Only reachable when asserts are compiled out.
+      LOG(ERROR) << " missing proposal round:" << (round - 1)
+                 << " proposer:" << proposer;
+      continue;
+    }
+    *proposal->add_node_info() = GetNodeInfo(*prev->proposal);
+  }
+  if (round > 1) {
+    GenerateVoteInfo(proposal);
+  }
+}
+
+// Chooses which previous-round proposer this proposal votes for and writes the
+// choice into proposal->vote().
+//
+// Among the node_info entries whose round equals height-1, the proposer with
+// the largest vote_record_ value wins; on ties the first one encountered is
+// kept. Asserts that at least one such entry exists. Reading vote_record_
+// with operator[] creates a zero entry for proposers not recorded yet.
+// NOTE(review): takes no lock itself; GeneateGraph() calls it with mutex_ held.
+void ProposalManager::GenerateVoteInfo(Proposal * proposal) {
+  const int round = proposal->header().height()-1;
+  int proposer = 1;
+  int vote_round = -1;
+
+  for(const auto& node : proposal->node_info()) {
+    if(node.round() == round) {
+      const int recorded = vote_record_[node.proposer()];
+      if(recorded > vote_round) {
+        vote_round = recorded;
+        proposer = node.proposer();
+      }
+    }
+  }
+  assert(vote_round>=0);
+
+  NodeInfo * vote_info = proposal->mutable_vote();
+  vote_info->set_proposer(proposer);
+  vote_info->set_round(round);
+}
+
+// Records a received proposal: under mutex_ it becomes the proposer's latest
+// entry in proposals_ and is stored (with ownership) in
+// proposals_rounds_[round][proposer], replacing any previous entry there.
+void ProposalManager::AddProposal(std::unique_ptr<Proposal> proposal) {
+  const int round = proposal->header().height();
+  const int proposer = proposal->header().proposer_id();
+  const NodeInfo latest = GetNodeInfo(*proposal);
+
+  std::unique_lock<std::mutex> lk(mutex_);
+  proposals_[proposer] = latest;
+  LOG(ERROR)<<" add proposal round:"<<round<<" from proposer:"<<proposer;
+  auto& slot = proposals_rounds_[round][proposer];
+  slot = std::make_unique<ProposalInfo>(std::move(proposal));
+  assert(proposals_rounds_[round][proposer]->proposal != nullptr);
+}
+
+// Transfers ownership of the proposal stored for (info.round(),
+// info.proposer()) to the caller. The slot stays in proposals_rounds_ but its
+// proposal pointer is null afterwards. A missing slot or an already-fetched
+// proposal trips the assert; with asserts compiled out a missing slot yields
+// nullptr instead of dereferencing a null pointer.
+std::unique_ptr<Proposal> ProposalManager::FetchProposal(const NodeInfo& info) {
+  const int round = info.round();
+  const int proposer = info.proposer();
+  std::unique_lock<std::mutex> lk(mutex_);
+  LOG(ERROR)<<" fetch proposer:"<<proposer<<" round:"<<round;
+
+  // Look up without operator[], which would insert a null slot and make the
+  // per-round size() used by Ready()/MayReady() over-count.
+  ProposalInfo* pi = nullptr;
+  const auto round_it = proposals_rounds_.find(round);
+  if (round_it != proposals_rounds_.end()) {
+    const auto proposer_it = round_it->second.find(proposer);
+    if (proposer_it != round_it->second.end()) {
+      pi = proposer_it->second.get();
+    }
+  }
+  assert(pi != nullptr && pi->proposal != nullptr);
+  if (pi == nullptr) {
+    return nullptr;
+  }
+  return std::move(pi->proposal);
+}
+
+// Counts the vote carried by `proposal` toward the (round, proposer) it names.
+// When the target has collected exactly f_+1 distinct voters, the round is
+// recorded in vote_record_ for that proposer and the proposer in
+// vote_proposer_ for that round.
+// NOTE(review): does not lock mutex_ itself — confirm callers hold it.
+void ProposalManager::AddVote(const Proposal& proposal) {
+  const NodeInfo& vote_info = proposal.vote();
+  const int round = vote_info.round();
+  const int proposer = vote_info.proposer();
+
+  ProposalInfo *pi = proposals_rounds_[round][proposer].get();
+  assert(pi != nullptr);
+
+  pi->vote.insert(proposal.header().proposer_id());
+  if(pi->vote.size() != f_+1) {
+    return;
+  }
+
+  assert(round > vote_record_[proposer]);
+  vote_record_[proposer] = round;
+  vote_proposer_[round] = proposer;
+}
+
+
+// Decides and commits the leader proposal of `round`.
+//
+// Tallies the votes carried by the proposals stored for round+1; a proposer
+// whose tally reaches f_+1 becomes the leader (if several do, the last one to
+// reach it wins). The leader's proposal for `round` is then moved out of
+// proposals_rounds_ and passed to CommitProposal(). If that proposal has not
+// arrived yet, the call polls until it does. Rounds below 1 are ignored.
+void ProposalManager::CheckVote(int round) {
+  if(round < 1) {
+    return;
+  }
+
+  int leader = -1;
+  {
+    std::unique_lock<std::mutex> lk(mutex_);
+    std::map<int,int> num;
+    const auto next_it = proposals_rounds_.find(round+1);
+    if(next_it != proposals_rounds_.end()) {
+      for(const auto& it : next_it->second) {
+        const ProposalInfo *rpi = it.second.get();
+        // The proposal may already have been moved out (e.g. FetchProposal).
+        if(rpi == nullptr || rpi->proposal == nullptr) {
+          continue;
+        }
+        const int proposer = rpi->proposal->vote().proposer();
+        if(++num[proposer] == f_+1) {
+          leader = proposer;
+        }
+      }
+    }
+  }
+
+  if(leader == -1) {
+    // No proposer gathered f_+1 votes. Previously this fell through with
+    // proposer == -1 and polled forever for a proposal that cannot exist,
+    // inserting a null slot under key -1 on every iteration.
+    LOG(ERROR)<<" check vote: no leader for round:"<<round;
+    return;
+  }
+
+  std::unique_ptr<Proposal> commit_p = nullptr;
+  while(true){
+    {
+      std::unique_lock<std::mutex> lk(mutex_);
+      // find() instead of operator[] so that waiting does not create empty
+      // slots that Ready()/MayReady() would count.
+      const auto round_it = proposals_rounds_.find(round);
+      if(round_it != proposals_rounds_.end()) {
+        const auto leader_it = round_it->second.find(leader);
+        if(leader_it != round_it->second.end() && leader_it->second != nullptr) {
+          // NOTE(review): the moved pointer is null if this proposal was
+          // already fetched or committed; that is forwarded unchanged.
+          commit_p = std::move(leader_it->second->proposal);
+          break;
+        }
+      }
+    }
+    // Sleep with the mutex released so AddProposal() can make progress.
+    usleep(100);
+  }
+  CommitProposal(std::move(commit_p));
+}
+
+// Builds the (round, proposer) summary of a proposal from its header's height
+// and proposer id.
+NodeInfo ProposalManager::GetNodeInfo(const Proposal& proposal) {
+  const auto& header = proposal.header();
+  const int proposer = header.proposer_id();
+  const int round = header.height();
+
+  NodeInfo summary;
+  summary.set_proposer(proposer);
+  summary.set_round(round);
+  return summary;
+}
+
+// Hands ownership of `proposal` to the callback registered in commit_func_.
+// NOTE(review): commit_func_ is invoked without checking that it has been
+// set — confirm SetCommitCallBack() always runs first.
+void ProposalManager::CommitProposal(std::unique_ptr<Proposal> proposal) {
+  commit_func_(std::move(proposal));
+}
+
+// True when the manager may build its next proposal: always in round 1,
+// otherwise once the previous round holds exactly total_num_ proposals.
+bool ProposalManager::Ready() {
+  if(round_ != 1){
+    std::unique_lock<std::mutex> lk(mutex_);
+    const auto& prev_round = proposals_rounds_[round_-1];
+    return prev_round.size() == total_num_;
+  }
+  return true;
+}
+
+// True when `round` holds exactly total_num_ proposals. Rounds below 1 are
+// always reported as ready.
+bool ProposalManager::Ready(int round) {
+  if(round >= 1){
+    std::unique_lock<std::mutex> lk(mutex_);
+    const auto& received = proposals_rounds_[round];
+    return received.size() == total_num_;
+  }
+  return true;
+}
+
+// True when `round` holds at least f_+1 proposals. Rounds below 1 are always
+// reported as ready.
+bool ProposalManager::MayReady(int round){
+  if(round >= 1){
+    std::unique_lock<std::mutex> lk(mutex_);
+    const auto& received = proposals_rounds_[round];
+    return received.size() >= f_+1;
+  }
+  return true;
+}
+
+
+}  // namespace cassandra_cft
+}  // namespace resdb
diff --git 
a/platform/consensus/ordering/cassandra_cft/algorithm/proposal_manager.h 
b/platform/consensus/ordering/cassandra_cft/algorithm/proposal_manager.h
new file mode 100644
index 00000000..05f7a2af
--- /dev/null
+++ b/platform/consensus/ordering/cassandra_cft/algorithm/proposal_manager.h
@@ -0,0 +1,64 @@
+#pragma once
+
+#include <condition_variable>
+#include <list>
+
+#include "platform/consensus/ordering/cassandra_cft/algorithm/proposal_graph.h"
+#include "platform/consensus/ordering/cassandra_cft/proto/proposal.pb.h"
+#include "platform/statistic/stats.h"
+
+namespace resdb {
+namespace cassandra_cft {
+
+// A stored proposal together with the set of voter ids collected for it.
+// `proposal` becomes null once ownership has been moved out of the entry.
+struct ProposalInfo {
+  std::unique_ptr<Proposal> proposal;
+  std::set<int> vote;
+
+  ProposalInfo(std::unique_ptr<Proposal> p) : proposal(std::move(p)) {}
+};
+
+// Keeps the proposals received per round and per proposer, builds this
+// replica's own proposals, and forwards a round's winning proposal to the
+// registered commit callback.
+class ProposalManager {
+ public:
+  ProposalManager(int32_t id, int f, int total_num, ProposalGraph* graph);
+
+  // Registers the callback that CommitProposal() forwards proposals to.
+  void SetCommitCallBack(std::function<void(std::unique_ptr<Proposal>)> func) {
+    commit_func_ = func;
+  }
+
+  // --- Building local proposals ---
+  std::unique_ptr<Proposal> MakeBlock(
+      std::vector<std::unique_ptr<Transaction>>& txns);
+  void GeneateGraph(Proposal* proposal);
+  void GenerateVoteInfo(Proposal* proposal);
+  NodeInfo GetNodeInfo(const Proposal& proposal);
+
+  // --- Storing and retrieving received proposals ---
+  void AddProposal(std::unique_ptr<Proposal> proposal);
+  std::unique_ptr<Proposal> FetchProposal(const NodeInfo& info);
+
+  // --- Voting and commit ---
+  void AddVote(const Proposal& proposal);
+  void CheckVote(int round);
+  void CommitProposal(std::unique_ptr<Proposal> proposal);
+
+  // --- Round progress ---
+  int CurrentRound();
+  bool Ready();
+  bool Ready(int round);
+  bool MayReady(int round);
+
+ private:
+  int32_t id_;
+  int f_;
+  int total_num_;
+  ProposalGraph* graph_;
+  int64_t local_proposal_id_ = 1;
+  int64_t local_block_id_ = 1;
+  int round_;
+
+  // proposer id -> (round, proposer) of the last proposal added for it.
+  std::map<int, NodeInfo> proposals_;
+  // round -> proposer id -> stored proposal and its voters.
+  std::map<int, std::map<int, std::unique_ptr<ProposalInfo>>>
+      proposals_rounds_;
+  // proposer id -> round recorded by AddVote() once f_+1 voters are reached.
+  std::map<int, int> vote_record_;
+  std::function<void(std::unique_ptr<Proposal>)> commit_func_;
+  // round -> proposer recorded by AddVote().
+  std::map<int, int> vote_proposer_;
+
+  std::mutex mutex_;
+
+  Stats* global_stats_;
+};
+
+}  // namespace cassandra_cft
+}  // namespace resdb
diff --git 
a/platform/consensus/ordering/cassandra_cft/algorithm/proposal_state.h 
b/platform/consensus/ordering/cassandra_cft/algorithm/proposal_state.h
new file mode 100644
index 00000000..ba33525e
--- /dev/null
+++ b/platform/consensus/ordering/cassandra_cft/algorithm/proposal_state.h
@@ -0,0 +1,16 @@
+#pragma once
+
+namespace resdb {
+namespace cassandra_cft {
+
+// State attached to a proposal in ProposalGraph. The numeric values matter:
+// ProposalGraph::StateScore() returns them unchanged, so a larger value ranks
+// higher when proposals are compared.
+enum ProposalState {
+  // Returned by ProposalGraph::GetProposalState() for an unknown hash.
+  None = 0,
+  // Initial state assigned by ProposalGraph::NodeInfo's constructor.
+  New = 1,
+  Voted = 2,
+  Prepared = 3,
+  PreCommit = 4,
+  Committed = 5,
+};
+
+}
+}  // namespace resdb
diff --git a/platform/consensus/ordering/cassandra_cft/algorithm/ranking.cpp 
b/platform/consensus/ordering/cassandra_cft/algorithm/ranking.cpp
new file mode 100644
index 00000000..cf3871f5
--- /dev/null
+++ b/platform/consensus/ordering/cassandra_cft/algorithm/ranking.cpp
@@ -0,0 +1,10 @@
+
+#include "platform/consensus/ordering/cassandra_cft/algorithm/ranking.h"
+
+namespace resdb {
+namespace cassandra_cft {
+
+// Returns the rank of a proposer; currently the proposer id itself.
+int Ranking::GetRank(int proposer_id) { return proposer_id; }
+
+}  // namespace cassandra_cft
+}  // namespace resdb
diff --git a/platform/consensus/ordering/cassandra_cft/algorithm/ranking.h 
b/platform/consensus/ordering/cassandra_cft/algorithm/ranking.h
new file mode 100644
index 00000000..925af9be
--- /dev/null
+++ b/platform/consensus/ordering/cassandra_cft/algorithm/ranking.h
@@ -0,0 +1,12 @@
+#pragma once
+
+namespace resdb {
+namespace cassandra_cft {
+
+// Maps a proposer id to a rank value.
+class Ranking {
+ public:
+  // Rank for `proposer_id`; the implementation in ranking.cpp returns the id
+  // unchanged.
+  int GetRank(int proposer_id);
+};
+
+}  // namespace cassandra_cft
+}  // namespace resdb
diff --git a/platform/consensus/ordering/cassandra_cft/framework/BUILD 
b/platform/consensus/ordering/cassandra_cft/framework/BUILD
new file mode 100644
index 00000000..f9fcfe10
--- /dev/null
+++ b/platform/consensus/ordering/cassandra_cft/framework/BUILD
@@ -0,0 +1,16 @@
+package(default_visibility = ["//visibility:private"])
+
+# Publicly visible `consensus` library built from consensus.cpp/consensus.h.
+# Depends on the shared ordering framework and the cassandra_cft algorithm.
+cc_library(
+    name = "consensus",
+    srcs = ["consensus.cpp"],
+    hdrs = ["consensus.h"],
+    visibility = [
+        "//visibility:public",
+    ],
+    deps = [
+        "//common/utils",
+        "//platform/consensus/ordering/common/framework:consensus",
+        "//platform/consensus/ordering/cassandra_cft/algorithm:cassandra",
+    ],
+)
+
diff --git a/platform/consensus/ordering/multipaxos/framework/consensus.cpp 
b/platform/consensus/ordering/cassandra_cft/framework/consensus.cpp
similarity index 60%
copy from platform/consensus/ordering/multipaxos/framework/consensus.cpp
copy to platform/consensus/ordering/cassandra_cft/framework/consensus.cpp
index b20d4e5e..de88e159 100644
--- a/platform/consensus/ordering/multipaxos/framework/consensus.cpp
+++ b/platform/consensus/ordering/cassandra_cft/framework/consensus.cpp
@@ -23,7 +23,7 @@
  *
  */
 
-#include "platform/consensus/ordering/multipaxos/framework/consensus.h"
+#include "platform/consensus/ordering/cassandra_cft/framework/consensus.h"
 
 #include <glog/logging.h>
 #include <unistd.h>
@@ -31,57 +31,49 @@
 #include "common/utils/utils.h"
 
 namespace resdb {
-namespace multipaxos {
+namespace cassandra_cft {
 
 Consensus::Consensus(const ResDBConfig& config,
                      std::unique_ptr<TransactionManager> executor)
-    : common::Consensus(config, std::move(executor)) {
-
-  Init();
-
+    : common::Consensus(config, std::move(executor)){
   int total_replicas = config_.GetReplicaNum();
   int f = (total_replicas - 1) / 3;
 
+  Init();
+
+  start_ = 0;
   if (config_.GetPublicKeyCertificateInfo()
           .public_key()
           .public_key_info()
           .type() != CertificateKeyInfo::CLIENT) {
-    multipaxos_= std::make_unique<MultiPaxos>(config_.GetSelfInfo().id(), f, 
total_replicas, GetSignatureVerifier());
-    InitProtocol(multipaxos_.get());
+    cassandra_cft_ = std::make_unique<Cassandra>(
+        config_.GetSelfInfo().id(), f,
+                                   total_replicas, 
config.GetConfigData().failure_mode());
+
+    InitProtocol(cassandra_cft_.get());
+
+
+    cassandra_cft_->SetPrepareFunction([&](const Transaction& msg) { 
+      return Prepare(msg); 
+    });
   }
 }
 
 int Consensus::ProcessCustomConsensus(std::unique_ptr<Request> request) {
-  //LOG(ERROR)<<"recv request:"<<MessageType_Name(request->user_type());
-  //int64_t current_time = GetCurrentTime();
-
-  if(request->user_type() == MessageType::Propose) {
-    std::unique_ptr<Proposal> p = std::make_unique<Proposal>();
-    if (!p->ParseFromString(request->data())) {
+  //LOG(ERROR)<<"receive commit:"<<request->type()<<" 
"<<MessageType_Name(request->user_type());
+  if (request->user_type() == MessageType::NewProposal) {
+    // LOG(ERROR)<<"receive proposal:";
+    std::unique_ptr<Proposal> proposal = std::make_unique<Proposal>();
+    if (!proposal->ParseFromString(request->data())) {
       LOG(ERROR) << "parse proposal fail";
       assert(1 == 0);
       return -1;
     }
-    multipaxos_->ReceiveProposal(std::move(p));
-  }
-  else if(request->user_type() == MessageType::Query) {
-    std::unique_ptr<Proposal> p = std::make_unique<Proposal>();
-    if (!p->ParseFromString(request->data())) {
-      LOG(ERROR) << "parse proposal fail";
-      assert(1 == 0);
+    if (!cassandra_cft_->ReceiveProposal(std::move(proposal))) {
       return -1;
     }
-    //multipaxos_->ReceivePropose(std::move(p));
-  }
-  else if(request->user_type() == MessageType::Learn) {
-    std::unique_ptr<Proposal> p = std::make_unique<Proposal>();
-    if (!p->ParseFromString(request->data())) {
-      LOG(ERROR) << "parse proposal fail";
-      assert(1 == 0);
-      return -1;
-    }
-    multipaxos_->ReceiveLearn(std::move(p));
-  }
+    return 0;
+  } 
   return 0;
 }
 
@@ -90,8 +82,8 @@ int Consensus::ProcessNewTransaction(std::unique_ptr<Request> 
request) {
   txn->set_data(request->data());
   txn->set_hash(request->hash());
   txn->set_proxy_id(request->proxy_id());
-  txn->set_user_seq(request->user_seq());
-  return multipaxos_->ReceiveTransaction(std::move(txn));
+  //LOG(ERROR)<<"receive txn";
+  return cassandra_cft_->ReceiveTransaction(std::move(txn));
 }
 
 int Consensus::CommitMsg(const google::protobuf::Message& msg) {
@@ -99,15 +91,33 @@ int Consensus::CommitMsg(const google::protobuf::Message& 
msg) {
 }
 
 int Consensus::CommitMsgInternal(const Transaction& txn) {
-  //LOG(ERROR)<<"commit txn:"<<txn.id()<<" proxy id:"<<txn.proxy_id();
+  //LOG(ERROR)<<"commit txn:"<<txn.id()<<" proxy id:"<<txn.proxy_id()<<" 
uid:"<<txn.uid();
   std::unique_ptr<Request> request = std::make_unique<Request>();
+  request->set_queuing_time(txn.queuing_time());
   request->set_data(txn.data());
   request->set_seq(txn.id());
-  request->set_proxy_id(txn.proxy_id());
+  request->set_uid(txn.uid());
+  //if (txn.proposer_id() == config_.GetSelfInfo().id()) {
+    request->set_proxy_id(txn.proxy_id());
+   // LOG(ERROR)<<"commit txn:"<<txn.id()<<" proxy id:"<<request->uid();
+    //assert(request->uid()>0);
+  //}
+
   transaction_executor_->AddExecuteMessage(std::move(request));
   return 0;
 }
 
 
-}  // namespace fairdag
+// Wraps the transaction's uid and data in a Request and hands it to the
+// transaction executor's Prepare(). Always returns 0.
+int Consensus::Prepare(const Transaction& txn) {
+  auto prepare_request = std::make_unique<Request>();
+  prepare_request->set_uid(txn.uid());
+  prepare_request->set_data(txn.data());
+
+  transaction_executor_->Prepare(std::move(prepare_request));
+  return 0;
+}
+
+
+}  // namespace cassandra_cft
 }  // namespace resdb
diff --git a/platform/consensus/ordering/multipaxos/framework/consensus.h 
b/platform/consensus/ordering/cassandra_cft/framework/consensus.h
similarity index 81%
copy from platform/consensus/ordering/multipaxos/framework/consensus.h
copy to platform/consensus/ordering/cassandra_cft/framework/consensus.h
index 4d08c3ac..956bda2b 100644
--- a/platform/consensus/ordering/multipaxos/framework/consensus.h
+++ b/platform/consensus/ordering/cassandra_cft/framework/consensus.h
@@ -26,28 +26,34 @@
 #pragma once
 
 #include "executor/common/transaction_manager.h"
-#include "platform/consensus/execution/transaction_executor.h"
-#include "platform/consensus/ordering/multipaxos/algorithm/multipaxos.h"
 #include "platform/consensus/ordering/common/framework/consensus.h"
+#include "platform/consensus/ordering/cassandra_cft/algorithm/cassandra.h"
 #include "platform/networkstrate/consensus_manager.h"
 
 namespace resdb {
-namespace multipaxos {
+namespace cassandra_cft {
 
-class Consensus : public common::Consensus{
+class Consensus : public common::Consensus {
  public:
   Consensus(const ResDBConfig& config,
             std::unique_ptr<TransactionManager> transaction_manager);
+  virtual ~Consensus() = default;
 
-  protected:
+ private:
   int ProcessCustomConsensus(std::unique_ptr<Request> request) override;
   int ProcessNewTransaction(std::unique_ptr<Request> request) override;
   int CommitMsg(const google::protobuf::Message& msg) override;
   int CommitMsgInternal(const Transaction& txn);
 
-  private:
-    std::unique_ptr<MultiPaxos> multipaxos_;
+  int Prepare(const Transaction& txn);
+
+ protected:
+  std::unique_ptr<Cassandra> cassandra_cft_;
+  Stats* global_stats_;
+  int64_t start_;
+  std::mutex mutex_;
+  int send_num_[200];
 };
 
-}  // namespace tusk
+}  // namespace cassandra_cft
 }  // namespace resdb
diff --git 
a/platform/consensus/ordering/cassandra_cft/framework/consensus_test.cpp 
b/platform/consensus/ordering/cassandra_cft/framework/consensus_test.cpp
new file mode 100644
index 00000000..2c8834a8
--- /dev/null
+++ b/platform/consensus/ordering/cassandra_cft/framework/consensus_test.cpp
@@ -0,0 +1,179 @@
+/*
+ * Copyright (c) 2019-2022 ExpoLab, UC Davis
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ */
+
+#include "platform/consensus/ordering/cassandra_cft/framework/consensus.h"
+
+#include <glog/logging.h>
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include <future>
+
+#include "common/test/test_macros.h"
+#include "executor/common/mock_transaction_manager.h"
+#include "platform/config/resdb_config_utils.h"
+#include "platform/networkstrate/mock_replica_communicator.h"
+
+namespace resdb {
+namespace cassandra_cft {
+namespace {
+
+using ::resdb::testing::EqualsProto;
+using ::testing::_;
+using ::testing::Invoke;
+using ::testing::Test;
+
+// Builds a four-replica localhost config in which this node is replica 1.
+ResDBConfig GetConfig() {
+  return ResDBConfig({GenerateReplicaInfo(1, "127.0.0.1", 1234),
+                      GenerateReplicaInfo(2, "127.0.0.1", 1235),
+                      GenerateReplicaInfo(3, "127.0.0.1", 1236),
+                      GenerateReplicaInfo(4, "127.0.0.1", 1237)},
+                     GenerateReplicaInfo(1, "127.0.0.1", 1234));
+}
+
+// Fixture owning a Consensus wired to a mock executor and mock communicator.
+class ConsensusTest : public Test {
+ public:
+  ConsensusTest() : config_(GetConfig()) {
+    auto transaction_manager =
+        std::make_unique<MockTransactionExecutorDataImpl>();
+    mock_transaction_manager_ = transaction_manager.get();
+    consensus_ =
+        std::make_unique<Consensus>(config_, std::move(transaction_manager));
+    consensus_->SetCommunicator(&replica_communicator_);
+  }
+
+  void AddTransaction(const std::string& data) {
+    auto request = std::make_unique<Request>();
+    request->set_type(Request::TYPE_NEW_TXNS);
+
+    Transaction txn;
+
+    BatchUserRequest batch_request;
+    auto req = batch_request.add_user_requests();
+    req->mutable_request()->set_data(data);
+
+    batch_request.set_local_id(1);
+    batch_request.SerializeToString(txn.mutable_data());
+
+    txn.SerializeToString(request->mutable_data());
+
+    EXPECT_EQ(consensus_->ConsensusCommit(nullptr, std::move(request)), 0);
+  }
+
+ protected:
+  ResDBConfig config_;
+  MockTransactionExecutorDataImpl* mock_transaction_manager_;
+  MockReplicaCommunicator replica_communicator_;
+  std::unique_ptr<Consensus> consensus_;
+};
+
+// Submits one transaction and loops broadcast protocol messages back into
+// the node; payloads are parsed with EXPECT_TRUE so parsing survives NDEBUG.
+TEST_F(ConsensusTest, NormalCase) {
+  std::promise<bool> commit_done;
+  std::future<bool> commit_done_future = commit_done.get_future();
+  EXPECT_CALL(replica_communicator_, BroadCast)
+      .WillRepeatedly(Invoke([&](const google::protobuf::Message& msg) {
+        Request request = *dynamic_cast<const Request*>(&msg);
+
+        if (request.user_type() == MessageType::NewProposal) {
+          LOG(ERROR) << "bc new proposal";
+          consensus_->ConsensusCommit(nullptr,
+                                      std::make_unique<Request>(request));
+          LOG(ERROR) << "recv proposal done";
+        }
+        if (request.user_type() == MessageType::Vote) {
+          LOG(ERROR) << "bc vote";
+
+          VoteMessage ack_msg;
+          EXPECT_TRUE(ack_msg.ParseFromString(request.data()));
+          for (int i = 1; i <= 3; ++i) {
+            ack_msg.set_proposer_id(i);
+            auto new_req = std::make_unique<Request>(request);
+            ack_msg.SerializeToString(new_req->mutable_data());
+
+            consensus_->ConsensusCommit(nullptr, std::move(new_req));
+          }
+        }
+        if (request.user_type() == MessageType::Prepare) {
+          LOG(ERROR) << "bc prepare";
+
+          VoteMessage ack_msg;
+          EXPECT_TRUE(ack_msg.ParseFromString(request.data()));
+          for (int i = 1; i <= 3; ++i) {
+            ack_msg.set_proposer_id(i);
+            auto new_req = std::make_unique<Request>(request);
+            ack_msg.SerializeToString(new_req->mutable_data());
+
+            consensus_->ConsensusCommit(nullptr, std::move(new_req));
+          }
+        }
+        // NOTE(review): this package's MessageType declares no Voteprep value;
+        // VoteAck is presumed to be the intended type -- confirm.
+        if (request.user_type() == MessageType::VoteAck) {
+          LOG(ERROR) << "bc voterep:";
+
+          VoteMessage ack_msg;
+          EXPECT_TRUE(ack_msg.ParseFromString(request.data()));
+          for (int i = 1; i <= 3; ++i) {
+            ack_msg.set_proposer_id(i);
+            auto new_req = std::make_unique<Request>(request);
+            ack_msg.SerializeToString(new_req->mutable_data());
+            LOG(ERROR) << "new request type:" << new_req->user_type();
+
+            consensus_->ConsensusCommit(nullptr, std::move(new_req));
+          }
+        }
+        LOG(ERROR) << "done";
+        return 0;
+      }));
+
+  EXPECT_CALL(*mock_transaction_manager_, ExecuteData)
+      .WillOnce(Invoke([&](const std::string& msg) {
+        LOG(ERROR) << "execute txn:" << msg;
+        EXPECT_EQ(msg, "transaction1");
+        return nullptr;
+      }));
+
+  EXPECT_CALL(replica_communicator_, SendMessage(_, 0))
+      .WillRepeatedly(
+          Invoke([&](const google::protobuf::Message& msg, int64_t) {
+            Request request = *dynamic_cast<const Request*>(&msg);
+            if (request.type() == Request::TYPE_RESPONSE) {
+              LOG(ERROR) << "get response";
+              commit_done.set_value(true);
+            }
+          }));
+
+  AddTransaction("transaction1");
+
+  commit_done_future.get();
+}
+
+}  // namespace
+}  // namespace cassandra_cft
+}  // namespace resdb
diff --git a/platform/consensus/ordering/cassandra_cft/proto/BUILD 
b/platform/consensus/ordering/cassandra_cft/proto/BUILD
new file mode 100644
index 00000000..87992193
--- /dev/null
+++ b/platform/consensus/ordering/cassandra_cft/proto/BUILD
@@ -0,0 +1,16 @@
+package(default_visibility = 
["//platform/consensus/ordering/cassandra_cft:__subpackages__"])
+
+# Protobuf definitions for the cassandra_cft ordering package.
+# Only C++ bindings are generated here, so no python proto rules are loaded.
+load("@rules_cc//cc:defs.bzl", "cc_proto_library")
+load("@rules_proto//proto:defs.bzl", "proto_library")
+
+proto_library(
+    name = "proposal_proto",
+    srcs = ["proposal.proto"],
+)
+
+cc_proto_library(
+    name = "proposal_cc_proto",
+    deps = [":proposal_proto"],
+)
diff --git a/platform/consensus/ordering/cassandra_cft/proto/proposal.proto 
b/platform/consensus/ordering/cassandra_cft/proto/proposal.proto
new file mode 100644
index 00000000..81e2ce79
--- /dev/null
+++ b/platform/consensus/ordering/cassandra_cft/proto/proposal.proto
@@ -0,0 +1,124 @@
+
+syntax = "proto3";
+
+package resdb.cassandra_cft;
+
+message Transaction{
+  int32 id = 1;
+  bytes data = 2;
+  bytes hash = 3;
+  int32 proxy_id = 4;
+  int32 proposer_id = 5;
+  int64 uid = 6;
+  int64 create_time = 7;
+  int64 queuing_time = 8;
+}
+
+message Header {
+  bytes hash = 1;
+  int32 height = 2;
+  int32 proposer_id = 3;
+  int32 proposal_id = 4;
+  bytes prehash = 5;
+}
+
+message History {
+  bytes hash = 1;
+  int32 state = 2;
+  int32 sender = 3;
+  int32 id = 4;
+}
+
+message WeakProposal {
+  repeated bytes hash = 1;
+}
+
+message Proposal {
+  Header header = 1;
+  repeated Transaction transactions = 2;
+  uint64 create_time = 3;
+  repeated NodeInfo node_info = 4;
+  repeated History history = 5;
+  int32 sender = 6;
+  NodeInfo vote = 7;
+};
+
+enum MessageType {
+  NewProposal = 0;
+  Vote = 1;
+  Prepare = 2;
+  VoteAck = 3;
+  Commit = 4;
+  Recovery = 5;
+  NewBlocks = 6;
+  ProposalAck = 7;
+  CMD_BlockACK = 8;
+  CMD_BlockQuery = 9;
+  CMD_SingleBlock = 10;
+  CMD_ProposalQuery = 11;
+  CMD_ProposalQueryResponse = 12;
+};
+
+message VoteMessage {
+  bytes hash = 1;
+  int32 proposer_id = 2;
+  MessageType type = 3;
+  int32 sender_id = 4;
+  int32 proposal_id = 5;
+};
+
+message CommittedProposals{
+  repeated Proposal proposals = 1;
+  int32 sender_id = 2;
+};
+
+message HashValue {
+  repeated uint64 bits = 1;
+}
+
+message Block {
+  message BlockData { 
+    repeated Transaction transaction = 1;
+  }
+  BlockData data = 1;
+  bytes hash = 2;
+  int32 sender_id = 3;
+  int32 local_id = 4;
+  int64 create_time = 5;
+}
+
+message BlockACK {
+  bytes hash = 2;
+  int32 sender_id = 3;
+  int32 local_id = 4;
+  int32 responder = 5;
+}
+
+message BlockQuery {
+  bytes hash = 2;
+  int32 proposer = 3;
+  int32 local_id = 4;
+  int32 sender = 5;
+}
+
+message ProposalQuery {
+  bytes hash = 2;
+  int32 proposer = 3;
+  int32 id = 4;
+  int32 sender = 5;
+}
+
+message ProposalQueryResp {
+  repeated Proposal proposal = 1;
+}
+
+message ProposalResponse {
+  Header header = 1;
+  int32 leader = 2;
+}
+
+
+message NodeInfo {
+  int32 round = 1;
+  int32 proposer = 2;
+}
diff --git 
a/platform/consensus/ordering/common/framework/performance_manager.cpp 
b/platform/consensus/ordering/common/framework/performance_manager.cpp
index 883c6ea7..a5fc7ca8 100644
--- a/platform/consensus/ordering/common/framework/performance_manager.cpp
+++ b/platform/consensus/ordering/common/framework/performance_manager.cpp
@@ -44,6 +44,7 @@ PerformanceManager::PerformanceManager(
   stop_ = false;
   eval_started_ = false;
   eval_ready_future_ = eval_ready_promise_.get_future();
+  LOG(ERROR)<<"?????";
   if (config_.GetPublicKeyCertificateInfo()
           .public_key()
           .public_key_info()
@@ -53,6 +54,7 @@ PerformanceManager::PerformanceManager(
           std::thread(&PerformanceManager::BatchProposeMsg, this);
     }
   }
+  LOG(ERROR)<<"?????";
   global_stats_ = Stats::GetGlobalStats();
   send_num_ = 0;
   total_num_ = 0;
@@ -62,6 +64,7 @@ PerformanceManager::PerformanceManager(
   if (primary_ == 0) primary_ = replica_num_;
   local_id_ = 1;
   sum_ = 0;
+  LOG(ERROR)<<"?????";
 }
 
 PerformanceManager::~PerformanceManager() {
diff --git a/platform/consensus/ordering/multipaxos/algorithm/multipaxos.cpp 
b/platform/consensus/ordering/multipaxos/algorithm/multipaxos.cpp
index a7966898..aab869e5 100644
--- a/platform/consensus/ordering/multipaxos/algorithm/multipaxos.cpp
+++ b/platform/consensus/ordering/multipaxos/algorithm/multipaxos.cpp
@@ -16,9 +16,11 @@ MultiPaxos::MultiPaxos(int id, int f, int total_num, 
SignatureVerifier * verifie
     send_thread_ = std::thread(&MultiPaxos::AsyncSend, this);
     commit_thread_ = std::thread(&MultiPaxos::AsyncCommit, this);
     commit_seq_thread_ = std::thread(&MultiPaxos::AsyncCommitSeq, this);
-    batch_size_ = 10;
+    learn_thread_ = std::thread(&MultiPaxos::AsyncLearn, this);
+    batch_size_ = 15;
     start_seq_ = 1;
-  }
+    master_ = 1;
+}
 
 MultiPaxos::~MultiPaxos() {
 }
@@ -91,15 +93,23 @@ void MultiPaxos::AsyncCommit() {
     int proposer = p->header().proposer();
     int round = p->header().view();
     std::unique_ptr<Proposal> data = nullptr;
-    {
+    while(data == nullptr) {
       std::unique_lock<std::mutex> lk(mutex_);
       auto it = receive_.find(round);
+      if(it == receive_.end() || it->second.count(proposer) == 0){
+        lk.unlock();  // never sleep while holding mutex_
+        usleep(100);
+        continue;
+      }
       assert(it != receive_.end());
       auto dit = it->second.find(proposer);
+      // Both lookups succeed here: the guard above releases mutex_ and
+      // backs off whenever no entry for (round, proposer) exists yet, so
+      // ReceiveProposal can take the lock and store it.
       assert(dit != it->second.end());
       data = std::move(dit->second);
-      assert(data != nullptr);
     }
+    assert(data != nullptr);
     AddCommitData(std::move(data));
   }
 }
@@ -111,10 +121,10 @@ void MultiPaxos::AsyncCommitSeq() {
       break;
     }
     std::unique_ptr<Proposal> data = GetCommitData();
-    int proposer = data->header().proposer();
-    int round = data->header().view();
-    int proposal_seq = data->header().proposal_id();
-    LOG(ERROR)<<" commit proposer:"<<proposer<< " round:"<<round<<" 
seq:"<<proposal_seq;
+    //int proposer = data->header().proposer();
+    //int round = data->header().view();
+    //int proposal_seq = data->header().proposal_id();
+    //LOG(ERROR)<<" commit proposer:"<<proposer<< " round:"<<round<<" 
seq:"<<proposal_seq;
     for(Transaction& txn : *data->mutable_transactions()){
       txn.set_id(seq++);
       Commit(txn);
@@ -122,7 +132,34 @@ void MultiPaxos::AsyncCommitSeq() {
   }
 }
 
+// Learner loop: commits a proposal once f_+1 Learn senders are recorded.
+void MultiPaxos::AsyncLearn() {
+  while (!IsStop()) {
+    auto proposal = learn_q_.Pop();
+    if (proposal == nullptr) {
+      continue;  // nothing dequeued; re-check the stop flag
+    }
+
+    const int round = proposal->header().view();
+    const int sender = proposal->sender();
+
+    std::unique_lock<std::mutex> lk(learn_mutex_);
+    auto& senders = learn_receive_[round];  // one lookup instead of three
+    senders.insert(sender);
+    // Equality rather than >= -- presumably so that later Learn messages
+    // for an already-committed round do not commit it again.
+    if (senders.size() == static_cast<size_t>(f_ + 1)) {
+      CommitProposal(std::move(proposal));
+    }
+  }
+}
+
+
 bool MultiPaxos::ReceiveTransaction(std::unique_ptr<Transaction> txn) {
+  if(id_ != master_) {
+    SendMessage(MessageType::Redirect, *txn, master_);
+    return true;
+  }
   txn->set_proposer(id_);
   txns_.Push(std::move(txn));
   return true;
@@ -135,8 +172,8 @@ bool MultiPaxos::ReceiveProposal(std::unique_ptr<Proposal> 
proposal) {
 
   bool done = false;
   {
-    //LOG(ERROR)<<"recv proposal from:"<<proposer<<" round:"<<round<<" 
seq:"<<seq;
     std::unique_lock<std::mutex> lk(mutex_);
+    //LOG(ERROR)<<"recv proposal from:"<<proposer<<" round:"<<round<<" 
seq:"<<seq;
     receive_[round][proposer] = std::move(proposal);
     done = true;
   }
@@ -152,16 +189,7 @@ bool MultiPaxos::ReceiveProposal(std::unique_ptr<Proposal> 
proposal) {
 }
 
 bool MultiPaxos::ReceiveLearn(std::unique_ptr<Proposal> proposal) {
-  int round = proposal->header().view();
-  int sender = proposal->sender();
-  int proposer = proposal->header().proposer();
-
-  std::unique_lock<std::mutex> lk(learn_mutex_);
-  learn_receive_[round].insert(sender);
-  LOG(ERROR)<<"RECEIVE proposer learn:"<<round<<" 
size:"<<learn_receive_[round].size()<<" proposer:"<<proposer;
-  if(learn_receive_[round].size() == f_+1){
-    CommitProposal(std::move(proposal));
-  }
+  learn_q_.Push(std::move(proposal));
   return true;
 }
 
diff --git a/platform/consensus/ordering/multipaxos/algorithm/multipaxos.h 
b/platform/consensus/ordering/multipaxos/algorithm/multipaxos.h
index 6ade5312..e9838593 100644
--- a/platform/consensus/ordering/multipaxos/algorithm/multipaxos.h
+++ b/platform/consensus/ordering/multipaxos/algorithm/multipaxos.h
@@ -26,6 +26,7 @@ class MultiPaxos: public common::ProtocolBase {
     void AsyncSend();
     void AsyncCommit();
     void AsyncCommitSeq();
+    void AsyncLearn();
 
     void CommitProposal(std::unique_ptr<Proposal> p);
 
@@ -36,11 +37,11 @@ class MultiPaxos: public common::ProtocolBase {
 
  private:
   LockFreeQueue<Transaction> txns_;
-  LockFreeQueue<Proposal> commit_q_;
+  LockFreeQueue<Proposal> commit_q_, learn_q_;
 
   std::unique_ptr<ProposalManager> proposal_manager_;
 
-  std::thread send_thread_, commit_thread_, commit_seq_thread_;
+  std::thread send_thread_, commit_thread_, commit_seq_thread_, learn_thread_;
 
   int batch_size_;
 
@@ -54,6 +55,7 @@ class MultiPaxos: public common::ProtocolBase {
   std::map<int, std::unique_ptr<Proposal> > commit_data_;
   std::condition_variable vote_cv_;
   int start_seq_;
+  int master_;
 };
 
 }  // namespace tusk
diff --git 
a/platform/consensus/ordering/multipaxos/algorithm/proposal_manager.cpp 
b/platform/consensus/ordering/multipaxos/algorithm/proposal_manager.cpp
index 1a55f8db..2dd3e410 100644
--- a/platform/consensus/ordering/multipaxos/algorithm/proposal_manager.cpp
+++ b/platform/consensus/ordering/multipaxos/algorithm/proposal_manager.cpp
@@ -11,7 +11,6 @@ ProposalManager::ProposalManager(int32_t id, int limit_count, 
SignatureVerifier*
     : id_(id) {
     round_ = 1;
     seq_ = 1;
-    assert(verifier_ != nullptr);
 }
 
 std::unique_ptr<Proposal> ProposalManager::GenerateProposal(
diff --git a/platform/consensus/ordering/multipaxos/framework/consensus.cpp 
b/platform/consensus/ordering/multipaxos/framework/consensus.cpp
index b20d4e5e..f2442587 100644
--- a/platform/consensus/ordering/multipaxos/framework/consensus.cpp
+++ b/platform/consensus/ordering/multipaxos/framework/consensus.cpp
@@ -38,6 +38,7 @@ Consensus::Consensus(const ResDBConfig& config,
     : common::Consensus(config, std::move(executor)) {
 
   Init();
+  failure_mode_ = config.GetConfigData().failure_mode();
 
   int total_replicas = config_.GetReplicaNum();
   int f = (total_replicas - 1) / 3;
@@ -54,6 +55,10 @@ Consensus::Consensus(const ResDBConfig& config,
 int Consensus::ProcessCustomConsensus(std::unique_ptr<Request> request) {
   //LOG(ERROR)<<"recv request:"<<MessageType_Name(request->user_type());
   //int64_t current_time = GetCurrentTime();
+  //LOG(ERROR)<<" failure more:"<<failure_mode_;
+  if(failure_mode_ && config_.GetSelfInfo().id() == 2) {
+    return 0;
+  }
 
   if(request->user_type() == MessageType::Propose) {
     std::unique_ptr<Proposal> p = std::make_unique<Proposal>();
@@ -64,6 +69,16 @@ int 
Consensus::ProcessCustomConsensus(std::unique_ptr<Request> request) {
     }
     multipaxos_->ReceiveProposal(std::move(p));
   }
+  else if(request->user_type() == MessageType::Redirect) {
+    std::unique_ptr<Transaction> p = std::make_unique<Transaction>();
+    if (!p->ParseFromString(request->data())) {
+      LOG(ERROR) << "parse proposal fail";
+      assert(1 == 0);
+      return -1;
+    }
+    multipaxos_->ReceiveTransaction(std::move(p));
+
+  }
   else if(request->user_type() == MessageType::Query) {
     std::unique_ptr<Proposal> p = std::make_unique<Proposal>();
     if (!p->ParseFromString(request->data())) {
diff --git a/platform/consensus/ordering/multipaxos/framework/consensus.h 
b/platform/consensus/ordering/multipaxos/framework/consensus.h
index 4d08c3ac..f47a275c 100644
--- a/platform/consensus/ordering/multipaxos/framework/consensus.h
+++ b/platform/consensus/ordering/multipaxos/framework/consensus.h
@@ -47,6 +47,7 @@ class Consensus : public common::Consensus{
 
   private:
     std::unique_ptr<MultiPaxos> multipaxos_;
+    bool failure_mode_ = false;
 };
 
 }  // namespace tusk
diff --git a/platform/consensus/ordering/multipaxos/proto/proposal.proto 
b/platform/consensus/ordering/multipaxos/proto/proposal.proto
index 83875513..1b924e7f 100644
--- a/platform/consensus/ordering/multipaxos/proto/proposal.proto
+++ b/platform/consensus/ordering/multipaxos/proto/proposal.proto
@@ -31,5 +31,6 @@ enum MessageType {
   Propose = 1;
   Learn = 2;
   Query = 3;
+  Redirect = 4;
 };
 
diff --git a/platform/networkstrate/consensus_manager.cpp 
b/platform/networkstrate/consensus_manager.cpp
index 26725cbf..fa9acbab 100644
--- a/platform/networkstrate/consensus_manager.cpp
+++ b/platform/networkstrate/consensus_manager.cpp
@@ -138,7 +138,7 @@ void ConsensusManager::HeartBeat() {
       if (config_.IsTestMode()) {
         sleep_time = 1;
       } else {
-        sleep_time = 30;
+        sleep_time = 60 * 2;
       }
     }
   }
diff --git a/platform/networkstrate/replica_communicator.cpp 
b/platform/networkstrate/replica_communicator.cpp
index 6b7f06e6..056e82e2 100644
--- a/platform/networkstrate/replica_communicator.cpp
+++ b/platform/networkstrate/replica_communicator.cpp
@@ -34,7 +34,8 @@ ReplicaCommunicator::ReplicaCommunicator(
       verifier_(verifier),
       is_running_(false),
       batch_queue_("bc_batch", tcp_batch),
-      is_use_long_conn_(is_use_long_conn) {
+      is_use_long_conn_(is_use_long_conn),
+      tcp_batch_(tcp_batch) {
   global_stats_ = Stats::GetGlobalStats();
   if (is_use_long_conn_) {
     worker_ = std::make_unique<boost::asio::io_service::work>(io_service_);
@@ -45,6 +46,7 @@ ReplicaCommunicator::ReplicaCommunicator(
   LOG(ERROR)<<" tcp batch:"<<tcp_batch;
 
   StartBroadcastInBackGround();
+
 }
 
 ReplicaCommunicator::~ReplicaCommunicator() {
@@ -122,6 +124,78 @@ void ReplicaCommunicator::StartBroadcastInBackGround() {
   });
 }
 
+// Creates the batch queue for (ip, port) and starts a background thread that
+// drains it, sending each batch to that replica via SendMessageFromPool().
+// Mutates single_bq_/single_thread_; SendSingleMessage calls it under smutex_.
+void ReplicaCommunicator::StartSingleInBackGround(const std::string& ip,
+                                                  int port) {
+  auto queue = std::make_unique<BatchQueue<std::unique_ptr<QueueItem>>>(
+      "s_batch", tcp_batch_);
+  BatchQueue<std::unique_ptr<QueueItem>>* bq = queue.get();
+  single_bq_[std::make_pair(ip, port)] = std::move(queue);
+  // Match on ip AND port: replicas may share an ip (e.g. all on localhost),
+  // in which case matching on ip alone would select the wrong replica.
+  ReplicaInfo replica_info;
+  const auto pick = [&](const auto& candidates) {
+    for (const auto& replica : candidates) {
+      if (replica.ip() == ip && replica.port() == port) {
+        replica_info = replica;
+        return true;
+      }
+    }
+    return false;
+  };
+  if (!pick(replicas_) && !pick(GetClientReplicas())) {
+    replica_info.set_ip(ip);  // unknown endpoint: still target ip/port
+    replica_info.set_port(port);
+  }
+  // The thread outlives this call, so capture by value rather than with [&].
+  single_thread_.push_back(std::thread([this, bq, replica_info] {
+    while (IsRunning()) {
+      std::vector<std::unique_ptr<QueueItem>> batch_req = bq->Pop(50000);
+      if (batch_req.empty()) {
+        continue;
+      }
+      BroadcastData broadcast_data;
+      for (auto& queue_item : batch_req) {
+        broadcast_data.add_data()->swap(queue_item->data);
+      }
+      global_stats_->SendBroadCastMsg(broadcast_data.data_size());
+      if (SendMessageFromPool(broadcast_data, {replica_info}) < 0) {
+        LOG(ERROR) << "broadcast request fail:";
+      }
+    }
+  }));
+}
+
+
+// Sends `message` to one replica. With long connections the serialized
+// message is pushed onto that replica's batch queue (created on first use)
+// and 0 is returned; otherwise SendMessageInternal()'s result is returned.
+int ReplicaCommunicator::SendSingleMessage(
+    const google::protobuf::Message& message,
+    const ReplicaInfo& replica_info) {
+  global_stats_->BroadCastMsg();
+  if (!is_use_long_conn_) {
+    LOG(ERROR) << "send internal";
+    // Address only the requested replica; passing replicas_ here would
+    // deliver the message to every replica.
+    return SendMessageInternal(message,
+                               std::vector<ReplicaInfo>{replica_info});
+  }
+  auto item = std::make_unique<QueueItem>();
+  item->data = NetChannel::GetRawMessageString(message, verifier_);
+  const auto key = std::make_pair(replica_info.ip(), replica_info.port());
+  std::lock_guard<std::mutex> lk(smutex_);
+  if (single_bq_.find(key) == single_bq_.end()) {
+    StartSingleInBackGround(key.first, key.second);
+  }
+  auto& queue = single_bq_[key];
+  assert(queue != nullptr);
+  queue->Push(std::move(item));
+  return 0;
+}
+
 int ReplicaCommunicator::SendMessage(const google::protobuf::Message& message) 
{
   global_stats_->BroadCastMsg();
   if (is_use_long_conn_) {
@@ -137,6 +211,8 @@ int ReplicaCommunicator::SendMessage(const 
google::protobuf::Message& message) {
 
 int ReplicaCommunicator::SendMessage(const google::protobuf::Message& message,
                                      const ReplicaInfo& replica_info) {
+  return SendSingleMessage(message, replica_info);
+
   if (is_use_long_conn_) {
     std::string data = NetChannel::GetRawMessageString(message, verifier_);
     BroadcastData broadcast_data;
@@ -215,6 +291,7 @@ AsyncReplicaClient* ReplicaCommunicator::GetClientFromPool(
     auto client = std::make_unique<AsyncReplicaClient>(
         &io_service_, ip, port + (is_use_long_conn_ ? 10000 : 0), true);
     client_pools_[std::make_pair(ip, port)] = std::move(client);
+    //StartSingleInBackGround(ip, port);
   }
   return client_pools_[std::make_pair(ip, port)].get();
 }
diff --git a/platform/networkstrate/replica_communicator.h 
b/platform/networkstrate/replica_communicator.h
index 87994770..239e4d21 100644
--- a/platform/networkstrate/replica_communicator.h
+++ b/platform/networkstrate/replica_communicator.h
@@ -73,6 +73,11 @@ class ReplicaCommunicator {
   bool IsRunning() const;
   bool IsInPool(const ReplicaInfo& replica_info);
 
+  void StartSingleInBackGround(const std::string& ip, int port);
+
+  int SendSingleMessage(const google::protobuf::Message& message, 
+      const ReplicaInfo& replica_info);
+
  private:
   std::vector<ReplicaInfo> replicas_;
   SignatureVerifier* verifier_;
@@ -93,6 +98,14 @@ class ReplicaCommunicator {
   std::vector<std::thread> worker_threads_;
   std::vector<ReplicaInfo> clients_;
   std::mutex mutex_;
+  
+
+
+  std::map<std::pair<std::string, int>, 
+    std::unique_ptr<BatchQueue<std::unique_ptr<QueueItem>>>> single_bq_;
+  std::vector<std::thread> single_thread_;
+  int tcp_batch_;
+  std::mutex smutex_;
 };
 
 }  // namespace resdb
diff --git a/platform/proto/replica_info.proto 
b/platform/proto/replica_info.proto
index 9c81dade..aeda4643 100644
--- a/platform/proto/replica_info.proto
+++ b/platform/proto/replica_info.proto
@@ -45,6 +45,7 @@ message ResConfigData{
   optional int32 max_client_complaint_num = 21;
 
   optional int32 duplicate_check_frequency_useconds = 22;
+  optional bool failure_mode = 23;
 }
 
 message ReplicaStates {
diff --git a/scripts/deploy/config/multipaxos.config 
b/scripts/deploy/config/cassandra_cft.config
similarity index 60%
copy from scripts/deploy/config/multipaxos.config
copy to scripts/deploy/config/cassandra_cft.config
index 1ac9fe46..c63d58a7 100644
--- a/scripts/deploy/config/multipaxos.config
+++ b/scripts/deploy/config/cassandra_cft.config
@@ -3,8 +3,9 @@
   "enable_viewchange": false,
   "recovery_enabled": false,
   "max_client_complaint_num":10,
-  "max_process_txn": 256,
-  "worker_num": 30,
+  "max_process_txn": 64,
+  "worker_num": 20,
   "input_worker_num": 1,
-  "output_worker_num": 10
+  "output_worker_num": 5,
+  "failure_mode" : false
 }
diff --git a/scripts/deploy/config/kv_performance_server_16.conf 
b/scripts/deploy/config/kv_performance_server_16.conf
index 981ff4eb..1f6a31a0 100644
--- a/scripts/deploy/config/kv_performance_server_16.conf
+++ b/scripts/deploy/config/kv_performance_server_16.conf
@@ -1,36 +1,36 @@
 iplist=(
-172.31.29.163
-172.31.22.39
-172.31.16.32
-172.31.27.98
-172.31.23.49
-172.31.18.178
-172.31.25.109
-172.31.17.109
-172.31.18.116
-172.31.25.117
-172.31.28.50
-172.31.24.115
-172.31.17.251
-172.31.17.252
-172.31.27.122
-172.31.26.123
-172.31.20.132
-172.31.20.70
-172.31.24.255
-172.31.25.132
-172.31.24.15
-172.31.20.16
-172.31.26.76
-172.31.18.15
-172.31.29.23
-172.31.19.215
-172.31.19.17
-172.31.31.149
-172.31.17.92
-172.31.31.159
-172.31.26.155
-172.31.24.155
+172.31.45.163
+172.31.32.226
+172.31.41.104
+172.31.47.100
+172.31.42.174
+172.31.36.105
+172.31.47.113
+172.31.45.112
+172.31.42.121
+172.31.47.116
+172.31.45.187
+172.31.45.250
+172.31.47.192
+172.31.40.62
+172.31.41.198
+172.31.45.196
+172.31.37.160
+172.31.33.9
+172.31.37.136
+172.31.44.139
+172.31.36.138
+172.31.40.211
+172.31.36.209
+172.31.36.21
+172.31.38.85
+172.31.38.151
+172.31.32.87
+172.31.39.153
+172.31.42.217
+172.31.47.218
+172.31.34.90
+172.31.36.92
 )
 
 key=~/.ssh/junchao.pem
diff --git a/scripts/deploy/config/kv_performance_server_32.conf 
b/scripts/deploy/config/kv_performance_server_32.conf
index aab16fae..7a3ee111 100644
--- a/scripts/deploy/config/kv_performance_server_32.conf
+++ b/scripts/deploy/config/kv_performance_server_32.conf
@@ -1,68 +1,68 @@
 iplist=(
-172.31.31.59
-172.31.19.187
-172.31.28.180
-172.31.27.51
-172.31.20.3
-172.31.24.2
-172.31.29.63
-172.31.18.59
-172.31.26.174
-172.31.28.43
-172.31.27.40
-172.31.27.36
-172.31.20.179
-172.31.17.176
-172.31.27.48
-172.31.31.174
-172.31.20.20
-172.31.27.19
-172.31.17.17
-172.31.16.16
-172.31.23.31
-172.31.21.158
-172.31.24.152
-172.31.18.23
-172.31.16.11
-172.31.27.137
-172.31.26.6
-172.31.24.133
-172.31.20.14
-172.31.17.13
-172.31.18.141
-172.31.24.11
-172.31.30.254
-172.31.16.252
-172.31.29.120
-172.31.22.248
-172.31.28.200
-172.31.26.71
-172.31.24.197
-172.31.26.255
-172.31.24.233
-172.31.19.227
-172.31.28.98
-172.31.18.224
-172.31.19.119
-172.31.31.113
-172.31.25.111
-172.31.26.233
-172.31.17.218
-172.31.26.89
-172.31.16.87
-172.31.18.87
-172.31.26.34
-172.31.27.160
-172.31.24.93
-172.31.24.92
-172.31.18.83
-172.31.29.210
-172.31.30.76
-172.31.19.73
-172.31.21.87
-172.31.28.213
-172.31.23.211
-172.31.16.211
+172.31.45.163
+172.31.32.226
+172.31.41.104
+172.31.47.100
+172.31.42.174
+172.31.36.105
+172.31.47.113
+172.31.45.112
+172.31.42.121
+172.31.47.116
+172.31.45.187
+172.31.45.250
+172.31.47.192
+172.31.40.62
+172.31.41.198
+172.31.45.196
+172.31.37.160
+172.31.33.9
+172.31.37.136
+172.31.44.139
+172.31.36.138
+172.31.40.211
+172.31.36.209
+172.31.36.21
+172.31.38.85
+172.31.38.151
+172.31.32.87
+172.31.39.153
+172.31.42.217
+172.31.47.218
+172.31.34.90
+172.31.36.92
+172.31.44.19
+172.31.44.89
+172.31.46.181
+172.31.41.184
+172.31.46.177
+172.31.38.180
+172.31.35.64
+172.31.38.192
+172.31.32.187
+172.31.38.190
+172.31.32.133
+172.31.33.74
+172.31.33.1
+172.31.42.130
+172.31.35.144
+172.31.33.210
+172.31.39.11
+172.31.46.79
+172.31.40.224
+172.31.39.225
+172.31.47.227
+172.31.37.166
+172.31.46.98
+172.31.44.163
+172.31.32.231
+172.31.35.169
+172.31.40.166
+172.31.45.102
+172.31.44.172
+172.31.46.173
+172.31.45.171
+172.31.46.235
 )
 
 key=~/.ssh/junchao.pem
diff --git a/scripts/deploy/config/kv_performance_server_4.conf 
b/scripts/deploy/config/kv_performance_server_4.conf
new file mode 100644
index 00000000..c03f863c
--- /dev/null
+++ b/scripts/deploy/config/kv_performance_server_4.conf
@@ -0,0 +1,13 @@
+iplist=(
+172.31.33.62
+172.31.35.77
+172.31.40.74
+172.31.39.30
+172.31.44.157
+172.31.34.48
+172.31.39.186
+172.31.46.4
+)
+
+key=~/.ssh/junchao.pem
+client_num=4
diff --git a/scripts/deploy/config/kv_performance_server_48.conf 
b/scripts/deploy/config/kv_performance_server_48.conf
new file mode 100644
index 00000000..335ea701
--- /dev/null
+++ b/scripts/deploy/config/kv_performance_server_48.conf
@@ -0,0 +1,101 @@
+iplist=(
+172.31.45.163
+172.31.32.226
+172.31.41.104
+172.31.47.100
+172.31.42.174
+172.31.36.105
+172.31.47.113
+172.31.45.112
+172.31.42.121
+172.31.47.116
+172.31.45.187
+172.31.45.250
+172.31.47.192
+172.31.40.62
+172.31.41.198
+172.31.45.196
+172.31.37.160
+172.31.33.9
+172.31.37.136
+172.31.44.139
+172.31.36.138
+172.31.40.211
+172.31.36.209
+172.31.36.21
+172.31.38.85
+172.31.38.151
+172.31.32.87
+172.31.39.153
+172.31.42.217
+172.31.47.218
+172.31.34.90
+172.31.36.92
+172.31.44.96
+172.31.47.254
+172.31.40.126
+172.31.39.65
+172.31.45.195
+172.31.35.112
+172.31.47.242
+172.31.45.117
+172.31.32.119
+172.31.35.234
+172.31.45.106
+172.31.41.110
+172.31.42.238
+172.31.34.97
+172.31.35.97
+172.31.39.97
+172.31.34.99
+172.31.41.40
+172.31.44.168
+172.31.38.45
+172.31.40.48
+172.31.34.221
+172.31.33.222
+172.31.37.34
+172.31.34.34
+172.31.35.202
+172.31.37.85
+172.31.37.215
+172.31.36.217
+172.31.40.199
+172.31.36.201
+172.31.34.202
+172.31.41.202
+172.31.37.0
+172.31.46.129
+172.31.32.131
+172.31.46.5
+172.31.47.188
+172.31.34.189
+172.31.40.61
+172.31.47.190
+172.31.42.181
+172.31.37.56
+172.31.44.60
+172.31.42.60
+172.31.47.50
+172.31.36.51
+172.31.34.180
+172.31.42.180
+172.31.46.28
+172.31.46.156
+172.31.47.158
+172.31.34.22
+172.31.39.22
+172.31.38.153
+172.31.43.156
+172.31.44.9
+172.31.38.141
+172.31.40.17
+172.31.42.18
+172.31.40.134
+172.31.34.6
+172.31.33.135
+172.31.42.137
+)
+
+key=~/.ssh/junchao.pem
+client_num=48
diff --git a/scripts/deploy/config/kv_performance_server_64.conf 
b/scripts/deploy/config/kv_performance_server_64.conf
index 7e8be748..24237ad3 100644
--- a/scripts/deploy/config/kv_performance_server_64.conf
+++ b/scripts/deploy/config/kv_performance_server_64.conf
@@ -1,132 +1,132 @@
 iplist=(
-172.31.29.160
-172.31.22.239
-172.31.23.112
-172.31.21.177
-172.31.23.242
-172.31.26.233
-172.31.23.235
-172.31.16.108
-172.31.22.174
-172.31.23.56
-172.31.30.57
-172.31.22.58
-172.31.16.127
-172.31.16.245
-172.31.17.246
-172.31.17.118
-172.31.24.247
-172.31.24.204
-172.31.23.12
-172.31.29.140
-172.31.28.141
-172.31.31.3
-172.31.30.199
-172.31.29.73
-172.31.27.75
-172.31.19.87
-172.31.25.31
-172.31.18.95
-172.31.23.208
-172.31.22.83
-172.31.20.214
-172.31.16.87
-172.31.26.116
-172.31.27.244
-172.31.18.245
-172.31.25.121
-172.31.29.121
-172.31.27.254
-172.31.21.255
-172.31.27.255
-172.31.28.96
-172.31.21.231
-172.31.21.236
-172.31.24.237
-172.31.30.240
-172.31.22.242
-172.31.18.49
-172.31.18.178
-172.31.17.179
-172.31.26.181
-172.31.31.54
-172.31.29.55
-172.31.30.188
-172.31.27.190
-172.31.17.162
-172.31.31.168
-172.31.20.169
-172.31.22.171
-172.31.28.172
-172.31.24.173
-172.31.28.45
-172.31.19.174
-172.31.31.215
-172.31.22.216
-172.31.19.217
-172.31.28.217
-172.31.19.220
-172.31.18.221
-172.31.21.32
-172.31.17.34
-172.31.29.69
-172.31.23.202
-172.31.16.205
-172.31.30.208
-172.31.20.82
-172.31.28.211
-172.31.30.83
-172.31.30.84
-172.31.25.28
-172.31.18.30
-172.31.28.14
-172.31.19.14
-172.31.30.144
-172.31.31.149
-172.31.28.22
-172.31.31.22
-172.31.20.23
-172.31.22.155
-172.31.26.191
-172.31.21.1
-172.31.17.130
-172.31.17.6
-172.31.29.9
-172.31.29.12
-172.31.18.142
-172.31.26.142
-172.31.29.220
-172.31.24.157
-172.31.27.64
-172.31.30.5
-172.31.30.139
-172.31.27.13
-172.31.22.60
-172.31.22.189
-172.31.31.62
-172.31.24.192
-172.31.23.82
-172.31.23.83
-172.31.29.25
-172.31.23.154
-172.31.23.143
-172.31.29.16
-172.31.29.144
-172.31.22.144
-172.31.20.35
-172.31.25.228
-172.31.24.164
-172.31.16.165
-172.31.22.33
-172.31.29.161
-172.31.26.168
-172.31.31.173
-172.31.31.53
-172.31.20.54
-172.31.24.165
-172.31.26.230
-172.31.18.230
-172.31.22.104
+172.31.45.163
+172.31.32.226
+172.31.41.104
+172.31.47.100
+172.31.42.174
+172.31.36.105
+172.31.47.113
+172.31.45.112
+172.31.42.121
+172.31.47.116
+172.31.45.187
+172.31.45.250
+172.31.47.192
+172.31.40.62
+172.31.41.198
+172.31.45.196
+172.31.37.160
+172.31.33.9
+172.31.37.136
+172.31.44.139
+172.31.36.138
+172.31.40.211
+172.31.36.209
+172.31.36.21
+172.31.38.85
+172.31.38.151
+172.31.32.87
+172.31.39.153
+172.31.42.217
+172.31.47.218
+172.31.34.90
+172.31.36.92
+172.31.44.96
+172.31.47.254
+172.31.40.126
+172.31.39.65
+172.31.45.195
+172.31.35.112
+172.31.47.242
+172.31.45.117
+172.31.32.119
+172.31.35.234
+172.31.45.106
+172.31.41.110
+172.31.42.238
+172.31.34.97
+172.31.35.97
+172.31.39.97
+172.31.34.99
+172.31.41.40
+172.31.44.168
+172.31.38.45
+172.31.40.48
+172.31.34.221
+172.31.33.222
+172.31.37.34
+172.31.34.34
+172.31.35.202
+172.31.37.85
+172.31.37.215
+172.31.36.217
+172.31.40.199
+172.31.36.201
+172.31.34.202
+172.31.41.202
+172.31.37.0
+172.31.46.129
+172.31.32.131
+172.31.46.5
+172.31.47.188
+172.31.34.189
+172.31.40.61
+172.31.47.190
+172.31.42.181
+172.31.37.56
+172.31.44.60
+172.31.42.60
+172.31.47.50
+172.31.36.51
+172.31.34.180
+172.31.42.180
+172.31.46.28
+172.31.46.156
+172.31.47.158
+172.31.34.22
+172.31.39.22
+172.31.38.153
+172.31.43.156
+172.31.44.9
+172.31.38.141
+172.31.40.17
+172.31.42.18
+172.31.40.134
+172.31.34.6
+172.31.33.135
+172.31.42.137
+172.31.44.19
+172.31.44.89
+172.31.46.181
+172.31.41.184
+172.31.46.177
+172.31.38.180
+172.31.35.64
+172.31.38.192
+172.31.32.187
+172.31.38.190
+172.31.32.133
+172.31.33.74
+172.31.33.1
+172.31.42.130
+172.31.35.144
+172.31.33.210
+172.31.39.11
+172.31.46.79
+172.31.40.224
+172.31.39.225
+172.31.47.227
+172.31.37.166
+172.31.46.98
+172.31.44.163
+172.31.32.231
+172.31.35.169
+172.31.40.166
+172.31.45.102
+172.31.44.172
+172.31.46.173
+172.31.45.171
+172.31.46.235
 )
 
 key=~/.ssh/junchao.pem
diff --git a/scripts/deploy/config/kv_performance_server_8.conf b/scripts/deploy/config/kv_performance_server_8.conf
index 0580f94f..3b5792e6 100644
--- a/scripts/deploy/config/kv_performance_server_8.conf
+++ b/scripts/deploy/config/kv_performance_server_8.conf
@@ -1,20 +1,20 @@
 iplist=(
-172.31.40.86
-172.31.33.214
-172.31.35.90
-172.31.40.93
-172.31.46.32
-172.31.37.33
-172.31.37.162
-172.31.44.169
-172.31.45.42
-172.31.33.42
-172.31.37.172
-172.31.37.44
-172.31.38.178
-172.31.35.180
-172.31.33.52
-172.31.47.184
+172.31.44.213
+172.31.34.55
+172.31.45.219
+172.31.32.91
+172.31.42.124
+172.31.43.254
+172.31.46.234
+172.31.40.203
+172.31.32.172
+172.31.42.236
+172.31.35.140
+172.31.34.241
+172.31.36.113
+172.31.37.19
+172.31.38.69
+172.31.34.136
 )
 
 key=~/.ssh/junchao.pem
diff --git a/scripts/deploy/config/multipaxos.config b/scripts/deploy/config/multipaxos.config
index 1ac9fe46..bfd9b15b 100644
--- a/scripts/deploy/config/multipaxos.config
+++ b/scripts/deploy/config/multipaxos.config
@@ -3,8 +3,9 @@
   "enable_viewchange": false,
   "recovery_enabled": false,
   "max_client_complaint_num":10,
-  "max_process_txn": 256,
-  "worker_num": 30,
+  "max_process_txn": 64,
+  "worker_num": 20,
   "input_worker_num": 1,
-  "output_worker_num": 10
+  "output_worker_num": 10,
+  "failure_mode" :true 
 }
diff --git a/scripts/deploy/performance/cassandra_cft_performance.sh b/scripts/deploy/performance/cassandra_cft_performance.sh
new file mode 100755
index 00000000..bc1b6a21
--- /dev/null
+++ b/scripts/deploy/performance/cassandra_cft_performance.sh
@@ -0,0 +1,5 @@
+export server=//benchmark/protocols/cassandra_cft:kv_server_performance
+export TEMPLATE_PATH=$PWD/config/cassandra_cft.config
+
+./performance/run_performance.sh $*
+echo $0
diff --git a/scripts/deploy/performance/run_performance.sh b/scripts/deploy/performance/run_performance.sh
index 1caa2c6e..ece3816f 100755
--- a/scripts/deploy/performance/run_performance.sh
+++ b/scripts/deploy/performance/run_performance.sh
@@ -27,6 +27,7 @@ echo "benchmark done"
 count=1
 for ip in ${iplist[@]};
 do
+echo "$ip"
 `ssh -i ${key} -n -o BatchMode=yes -o StrictHostKeyChecking=no ubuntu@${ip} "killall -9 ${server_bin}"` 
 ((count++))
 done
@@ -40,9 +41,11 @@ echo "getting results"
 for ip in ${iplist[@]};
 do
   echo "scp -i ${key} ubuntu@${ip}:/home/ubuntu/${server_bin}.log ./${ip}_log"
-  `scp -i ${key} ubuntu@${ip}:/home/ubuntu/${server_bin}.log result_${ip}_log` 
+  `scp -i ${key} ubuntu@${ip}:/home/ubuntu/${server_bin}.log result_${ip}_log` &
 done
 
+wait
+
 python3 performance/calculate_result.py `ls result_*_log` > results.log
 
 rm -rf result_*_log
diff --git a/scripts/deploy/script/deploy.sh b/scripts/deploy/script/deploy.sh
index 4d220c7b..d57869ce 100755
--- a/scripts/deploy/script/deploy.sh
+++ b/scripts/deploy/script/deploy.sh
@@ -79,6 +79,7 @@ function run_cmd(){
 }
 
 function run_one_cmd(){
+  echo " $1"
   ssh -i ${key} -n -o BatchMode=yes -o StrictHostKeyChecking=no ubuntu@${ip} "$1" 
 }
 

Reply via email to