This is an automated email from the ASF dual-hosted git repository.

junchao pushed a commit to branch cassandra
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git

commit 6ab4f5fa9cca32cd1d400cc1e10d8971886c9206
Author: cjcchen <[email protected]>
AuthorDate: Thu Feb 26 14:29:42 2026 +0000

    add autobahn
---
 benchmark/protocols/autobahn/BUILD                 |  16 +
 .../protocols/autobahn/kv_server_performance.cpp   |  89 +++++
 benchmark/protocols/autobahn/kv_service_tools.cpp  |  57 ++++
 .../consensus/ordering/autobahn/algorithm/BUILD    |  77 +++++
 .../ordering/autobahn/algorithm/autobahn.cpp       | 369 +++++++++++++++++++++
 .../ordering/autobahn/algorithm/autobahn.h         |  74 +++++
 .../algorithm/proposal_graph.cpp                   |  72 +---
 .../ordering/autobahn/algorithm/proposal_graph.h   | 101 ++++++
 .../autobahn/algorithm/proposal_manager.cpp        | 257 ++++++++++++++
 .../ordering/autobahn/algorithm/proposal_manager.h |  71 ++++
 .../algorithm/proposal_state.h                     |   4 +-
 .../ordering/autobahn/algorithm/ranking.cpp        |  12 +
 .../ordering/autobahn/algorithm/ranking.h          |  14 +
 .../consensus/ordering/autobahn/framework/BUILD    |  16 +
 .../ordering/autobahn/framework/consensus.cpp      | 207 ++++++++++++
 .../ordering/autobahn/framework/consensus.h        |  59 ++++
 .../ordering/autobahn/framework/consensus_test.cpp | 179 ++++++++++
 platform/consensus/ordering/autobahn/proto/BUILD   |  16 +
 .../ordering/autobahn/proto/proposal.proto         | 133 ++++++++
 .../ordering/cassandra/algorithm/cassandra.cpp     |  13 +-
 .../cassandra/algorithm/proposal_graph.cpp         |   2 +-
 .../cassandra/algorithm/proposal_manager.cpp       |  14 +-
 .../cassandra/algorithm/proposal_manager.h         |   3 +-
 .../ordering/cassandra/algorithm/proposal_state.h  |   2 +-
 .../config/{cassandra.config => autobahn.config}   |   2 +-
 scripts/deploy/config/cassandra.config             |   2 +-
 scripts/deploy/config/kv_performance_server.conf   |   6 +-
 .../deploy/config/kv_performance_server_32.conf    |   4 +-
 scripts/deploy/performance/autobahn_performance.sh |   5 +
 scripts/deploy/performance/run_performance.sh      |   2 +-
 scripts/null                                       |   2 +-
 31 files changed, 1794 insertions(+), 86 deletions(-)

diff --git a/benchmark/protocols/autobahn/BUILD 
b/benchmark/protocols/autobahn/BUILD
new file mode 100644
index 00000000..fdf1cc4e
--- /dev/null
+++ b/benchmark/protocols/autobahn/BUILD
@@ -0,0 +1,16 @@
+package(default_visibility = ["//visibility:private"])
+
+load("@bazel_skylib//rules:common_settings.bzl", "bool_flag")  # NOTE(review): bool_flag is not referenced by any target shown here - confirm the load is needed.
+
+cc_binary(  # Benchmark server binary built from kv_server_performance.cpp.
+    name = "kv_server_performance",
+    srcs = ["kv_server_performance.cpp"],
+    deps = [
+        "//chain/storage:memory_db",
+        "//executor/kv:kv_executor",
+        "//platform/config:resdb_config_utils",
+        "//platform/consensus/ordering/autobahn/framework:consensus",
+        "//service/utils:server_factory",
+    ],
+)  # NOTE(review): no target here builds kv_service_tools.cpp, which this commit also adds - confirm that is intended.
+
diff --git a/benchmark/protocols/autobahn/kv_server_performance.cpp 
b/benchmark/protocols/autobahn/kv_server_performance.cpp
new file mode 100644
index 00000000..3b209b82
--- /dev/null
+++ b/benchmark/protocols/autobahn/kv_server_performance.cpp
@@ -0,0 +1,89 @@
+/*
+ * Copyright (c) 2019-2022 ExpoLab, UC Davis
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ */
+
+#include <glog/logging.h>
+
+#include "chain/storage/memory_db.h"
+#include "executor/kv/kv_executor.h"
+#include "platform/config/resdb_config_utils.h"
+#include "platform/consensus/ordering/autobahn/framework/consensus.h"
+#include "platform/networkstrate/service_network.h"
+#include "platform/statistic/stats.h"
+#include "proto/kv/kv.pb.h"
+
+using namespace resdb;
+using namespace resdb::autobahn;
+using namespace resdb::storage;
+
+// Prints the expected command-line arguments to stdout.
+void ShowUsage() {
+  static const char kUsage[] =
+      "<config> <private_key> <cert_file> [logging_dir]\n";
+  printf("%s", kUsage);
+}
+
+// Builds a two-character key out of two rand() draws, each reduced to a
+// single decimal digit, so the result ranges over "00".."99".
+std::string GetRandomKey() {
+  std::string key = std::to_string(rand() % 10);
+  key += std::to_string(rand() % 10);
+  return key;
+}
+
+// Entry point of the AutoBahn KV performance-benchmark server.
+//
+// Usage: <config> <private_key> <cert_file> [logging_dir]
+//   argv[1]  configuration file
+//   argv[2]  private key file
+//   argv[3]  certificate file
+//   argv[4]  optional; forwarded to Stats::SetPrometheus()
+//
+// Builds the ResDB config in performance mode, wires a Consensus instance
+// to a KVExecutor backed by MemoryDB, installs a generator producing
+// serialized SET requests with random keys, and runs the service network.
+int main(int argc, char** argv) {
+  // All three positional arguments are mandatory, so argc must be >= 4.
+  // The earlier `argc < 3` test let a two-argument invocation through and
+  // then read argv[3], which is a null pointer in that case.
+  if (argc < 4) {
+    ShowUsage();
+    exit(0);
+  }
+
+  // google::InitGoogleLogging(argv[0]);
+  // FLAGS_minloglevel = google::GLOG_WARNING;
+
+  char* config_file = argv[1];
+  char* private_key_file = argv[2];
+  char* cert_file = argv[3];
+
+  // The optional fifth entry is handed to the global stats object.
+  if (argc >= 5) {
+    auto monitor_port = Stats::GetGlobalStats(5);
+    monitor_port->SetPrometheus(argv[4]);
+  }
+
+  std::unique_ptr<ResDBConfig> config =
+      GenerateResDBConfig(config_file, private_key_file, cert_file);
+
+  config->RunningPerformance(true);
+
+  auto performance_consens = std::make_unique<Consensus>(
+      *config, std::make_unique<KVExecutor>(std::make_unique<MemoryDB>()));
+  // Each generated request is a serialized KVRequest SET on a random key.
+  performance_consens->SetupPerformanceDataFunc([]() {
+    KVRequest request;
+    request.set_cmd(KVRequest::SET);
+    request.set_key(GetRandomKey());
+    request.set_value("helloworld");
+    std::string request_data;
+    request.SerializeToString(&request_data);
+    return request_data;
+  });
+
+  auto server = std::make_unique<ServiceNetwork>(
+      *config, std::move(performance_consens));
+  server->Run();
+}
diff --git a/benchmark/protocols/autobahn/kv_service_tools.cpp 
b/benchmark/protocols/autobahn/kv_service_tools.cpp
new file mode 100644
index 00000000..17858ef2
--- /dev/null
+++ b/benchmark/protocols/autobahn/kv_service_tools.cpp
@@ -0,0 +1,57 @@
+/*
+ * Copyright (c) 2019-2022 ExpoLab, UC Davis
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ */
+
+#include <fcntl.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include <fstream>
+
+#include "common/proto/signature_info.pb.h"
+#include "interface/kv/kv_client.h"
+#include "platform/config/resdb_config_utils.h"
+
+using resdb::GenerateReplicaInfo;
+using resdb::GenerateResDBConfig;
+using resdb::KVClient;
+using resdb::ReplicaInfo;
+using resdb::ResDBConfig;
+
+// Sends a single Set("start", "value") request through a KVClient built
+// from the client config named by argv[1], then prints a start banner.
+// With no config argument it prints the usage line instead.
+int main(int argc, char** argv) {
+  if (argc >= 2) {
+    std::string client_config_file = argv[1];
+    ResDBConfig config = GenerateResDBConfig(client_config_file);
+    config.SetClientTimeoutMs(100000);
+
+    KVClient client(config);
+    client.Set("start", "value");
+    printf("start benchmark\n");
+    return 0;
+  }
+  printf("<config path>\n");
+  return 0;
+}
diff --git a/platform/consensus/ordering/autobahn/algorithm/BUILD 
b/platform/consensus/ordering/autobahn/algorithm/BUILD
new file mode 100644
index 00000000..0f7e42a1
--- /dev/null
+++ b/platform/consensus/ordering/autobahn/algorithm/BUILD
@@ -0,0 +1,77 @@
+package(default_visibility = 
["//platform/consensus/ordering/autobahn:__subpackages__"])  # Targets are visible to every package under the autobahn directory.
+
+cc_library(
+    name = "proposal_state",  # Header-only target.
+    hdrs = ["proposal_state.h"],
+)
+
+cc_library(
+    name = "proposal_manager",
+    srcs = ["proposal_manager.cpp"],
+    hdrs = ["proposal_manager.h"],
+    deps = [
+        ":proposal_graph",
+        "//common:comm",
+        "//platform/statistic:stats",
+        "//common/crypto:signature_verifier",
+        "//common/utils",
+        "//platform/consensus/ordering/autobahn/proto:proposal_cc_proto",
+    ],
+)
+
+cc_library(
+    name = "ranking",  # Not depended on by any target in this file (see the commented-out dep in proposal_graph).
+    srcs = ["ranking.cpp"],
+    hdrs = ["ranking.h"],
+    deps = [
+        "//common:comm",
+    ],
+)
+
+cc_library(
+    name = "proposal_graph",
+    srcs = ["proposal_graph.cpp"],
+    hdrs = ["proposal_graph.h"],
+    deps = [
+        #":ranking",
+        ":proposal_state",
+        "//platform/statistic:stats",
+        "//common:comm",
+        "//common/utils",
+        "//platform/consensus/ordering/autobahn/proto:proposal_cc_proto",
+    ],
+)
+
+cc_library(
+    name = "autobahn",  # The protocol implementation (autobahn.cpp / autobahn.h).
+    srcs = ["autobahn.cpp"],
+    hdrs = ["autobahn.h"],
+    deps = [
+        ":proposal_graph",
+        ":proposal_manager",
+        "//platform/statistic:stats",
+        "//common:comm",
+        "//common/crypto:signature_verifier",
+        "//platform/consensus/ordering/common/algorithm:protocol_base",
+        "//platform/consensus/ordering/autobahn/proto:proposal_cc_proto",
+        "//platform/common/queue:lock_free_queue",
+    ],
+)
+
+cc_test(
+    name = "proposal_graph_test",
+    srcs = ["proposal_graph_test.cpp"],  # NOTE(review): this source is not listed in the commit's diffstat - confirm it exists.
+    deps = [
+        ":proposal_graph",
+        "//common/test:test_main",
+    ],
+)
+
+cc_test(
+    name = "autobahn_test",
+    srcs = ["autobahn_test.cpp"],  # NOTE(review): this source is not listed in the commit's diffstat - confirm it exists.
+    deps = [
+        ":autobahn",
+        "//common/test:test_main",
+    ],
+)
diff --git a/platform/consensus/ordering/autobahn/algorithm/autobahn.cpp 
b/platform/consensus/ordering/autobahn/algorithm/autobahn.cpp
new file mode 100644
index 00000000..8580d74e
--- /dev/null
+++ b/platform/consensus/ordering/autobahn/algorithm/autobahn.cpp
@@ -0,0 +1,369 @@
+#include "platform/consensus/ordering/autobahn/algorithm/autobahn.h"
+
+#include <glog/logging.h>
+
+#include "common/crypto/signature_verifier.h"
+#include "common/utils/utils.h"
+
+
+namespace resdb {
+namespace autobahn {
+
+// Builds the protocol instance and starts its five worker threads.
+//
+// id, f and total_num are forwarded to ProtocolBase and also cached in the
+// local members; block_size becomes the per-block transaction batch limit;
+// verifier is stored as a raw pointer and also handed to the
+// ProposalManager created here.
+AutoBahn::AutoBahn(int id, int f, int total_num, int block_size,
+                   SignatureVerifier* verifier)
+    : ProtocolBase(id, f, total_num), verifier_(verifier) {
+  LOG(ERROR) << "get proposal graph";
+  id_ = id;
+  total_num_ = total_num;
+  f_ = f;
+  is_stop_ = false;
+  //timeout_ms_ = 100;
+  timeout_ms_ = 60000;
+  batch_size_ = block_size;
+  execute_id_ = 1;
+  // This member was previously left uninitialized; give it a defined value
+  // so an accidental read cannot touch an indeterminate pointer.
+  global_stats_ = nullptr;
+
+  proposal_manager_ =
+      std::make_unique<ProposalManager>(id, total_num_, f_, verifier);
+
+  // Workers are started last so every member they read is already set.
+  block_thread_ = std::thread(&AutoBahn::GenerateBlocks, this);
+  dissemi_thread_ = std::thread(&AutoBahn::AsyncDissemination, this);
+  consensus_thread_ = std::thread(&AutoBahn::AsyncConsensus, this);
+  prepare_thread_ = std::thread(&AutoBahn::AsyncPrepare, this);
+  commit_thread_ = std::thread(&AutoBahn::AsyncCommit, this);
+}
+
+// Requests shutdown and waits for every worker thread to finish.
+//
+// All five std::thread members started by the constructor must be joined:
+// destroying a std::thread that is still joinable calls std::terminate().
+// Previously only block_thread_ and dissemi_thread_ were joined.
+AutoBahn::~AutoBahn() {
+  is_stop_ = true;
+  // Workers re-check IsStop() only after their current queue pop or
+  // condition-variable wait returns, so a join may take up to that wait.
+  for (std::thread* worker :
+       {&block_thread_, &dissemi_thread_, &consensus_thread_,
+        &prepare_thread_, &commit_thread_}) {
+    if (worker->joinable()) {
+      worker->join();
+    }
+  }
+}
+
+// Reports whether shutdown has been requested (is_stop_ is non-zero).
+bool AutoBahn::IsStop() { return is_stop_.load() != 0; }
+
+// Stamps the transaction with the current time and queues it for batching.
+// Always returns true.
+bool AutoBahn::ReceiveTransaction(std::unique_ptr<Transaction> txn) {
+  const auto now = GetCurrentTime();
+  txn->set_create_time(now);
+  txns_.Push(std::move(txn));
+  return true;
+}
+
+// Thread body: groups queued transactions into batches of at most
+// batch_size_ and hands each batch to ProposalManager::MakeBlock().
+void AutoBahn::GenerateBlocks() {
+  std::vector<std::unique_ptr<Transaction>> batch;
+  while (!IsStop()) {
+    // A batch is only started once at least one transaction is available.
+    std::unique_ptr<Transaction> first = txns_.Pop();
+    if (first == nullptr) {
+      continue;
+    }
+    batch.push_back(std::move(first));
+
+    // Top the batch up; stop early as soon as a pop comes back empty.
+    int filled = 1;
+    while (filled < batch_size_) {
+      std::unique_ptr<Transaction> next = txns_.Pop(100);
+      if (next == nullptr) {
+        break;
+      }
+      batch.push_back(std::move(next));
+      ++filled;
+    }
+
+    proposal_manager_->MakeBlock(batch);
+    batch.clear();
+  }
+}
+
+// Blocks on bc_block_cv_ until the proposal manager's current block id has
+// reached block_id or the timeout elapses. Returns true when it has.
+bool AutoBahn::WaitForResponse(int64_t block_id) {
+  const auto reached = [&] {
+    return block_id <= proposal_manager_->GetCurrentBlockId();
+  };
+  std::unique_lock<std::mutex> lk(bc_mutex_);
+  //LOG(ERROR)<<"wait for block id :"<<block_id;
+  bc_block_cv_.wait_for(lk, std::chrono::microseconds(timeout_ms_ * 1000),
+                        reached);
+  return reached();
+}
+
+// Wakes every thread waiting on bc_block_cv_ (see WaitForResponse()).
+void AutoBahn::BlockDone() { bc_block_cv_.notify_all(); }
+
+void AutoBahn::AsyncDissemination() {  // Thread body: broadcasts this replica's local blocks, one at a time, as NewBlocks messages.
+  int next_block = 1;  // One past the id of the block to look up; the block fetched below is next_block-1.
+  while (!IsStop()) {
+    if(WaitForResponse(next_block-1)){  // Advance only when the current block id has reached next_block-1 before the timeout.
+      next_block++;
+    }
+    if(next_block == 1) {  // Block id 0 is never looked up; the first lookup is for block 1.
+      next_block++;
+    }
+    LOG(ERROR)<<" get block :"<<next_block-1;
+    const Block* block = proposal_manager_->GetLocalBlock(next_block-1);
+    if(block == nullptr) {  // Block not available yet: step back and retry on the next iteration.
+      next_block--;
+      continue;
+    }
+    LOG(ERROR)<<" broadcast block :"<<next_block-1<<" id:"<<block->local_id();
+    Broadcast(MessageType::NewBlocks, *block);
+    LOG(ERROR)<<" broadcast block :"<<next_block-1<<" 
id:"<<block->local_id()<<" done";
+  }
+}
+
+// Wakes every thread waiting on view_cv_, holding view_mutex_ while doing
+// so.
+void AutoBahn::NotifyView() {
+  std::lock_guard<std::mutex> guard(view_mutex_);
+  view_cv_.notify_all();
+}
+
+// Blocks on view_cv_ until ProposalManager::ReadyView(view) holds or the
+// timeout elapses. Returns the value of ReadyView(view) after the wait.
+bool AutoBahn::WaitForNextView(int view) {
+  const auto ready = [&] { return proposal_manager_->ReadyView(view); };
+  std::unique_lock<std::mutex> lk(view_mutex_);
+  //LOG(ERROR)<<"wait for next view:"<<view;
+  view_cv_.wait_for(lk, std::chrono::microseconds(timeout_ms_ * 1000),
+                    ready);
+  return ready();
+}
+
+// Thread body: only the replica whose id is 1 proposes. For each view that
+// becomes ready it takes the current cut from the proposal manager, turns
+// it into a proposal and broadcasts it as NewProposal. Any other replica
+// leaves the loop immediately.
+void AutoBahn::AsyncConsensus() {
+  while (!IsStop() && id_ == 1) {
+    int view = proposal_manager_->GetCurrentView();
+    if (WaitForNextView(view)) {
+      std::pair<int, std::map<int, int64_t>> cut =
+          proposal_manager_->GetCut();
+      auto proposal =
+          proposal_manager_->GenerateProposal(cut.first, cut.second);
+      Broadcast(MessageType::NewProposal, *proposal);
+    }
+  }
+}
+
+
+// Handles a block received from a peer: signs it, stores it in the
+// proposal manager, sends a BlockACK back to the block's sender, records
+// the sender's progress via UpdateView() and wakes view waiters.
+void AutoBahn::ReceiveBlock(std::unique_ptr<Block> block) {
+  LOG(ERROR) << "recv block from:" << block->sender_id()
+             << " block id:" << block->local_id();
+
+  // Build the acknowledgement before ownership of the block is given away.
+  BlockACK block_ack;
+  block_ack.set_responder(id_);
+  block_ack.set_local_id(block->local_id());
+  block_ack.set_sender_id(block->sender_id());
+  block_ack.set_hash(block->hash());
+  *block_ack.mutable_sign_info() = proposal_manager_->SignBlock(*block);
+
+  proposal_manager_->AddBlock(std::move(block));
+  SendMessage(MessageType::CMD_BlockACK, block_ack, block_ack.sender_id());
+
+  proposal_manager_->UpdateView(block_ack.sender_id(),
+                                block_ack.local_id());
+  NotifyView();
+  LOG(ERROR) << "send block ack to:" << block_ack.sender_id()
+             << " block id:" << block_ack.local_id();
+}
+
+// Records a BlockACK for one of our blocks. Once at least f+1 ACKs have
+// been collected and our own ACK is among them, the block is reported as
+// ready to the proposal manager and waiters on the block cv are woken.
+void AutoBahn::ReceiveBlockACK(std::unique_ptr<BlockACK> block) {
+  LOG(ERROR) << "recv block ack:" << block->local_id()
+             << " from:" << block->responder()
+             << " block sign info:" << block->sign_info().sender_id();
+  std::unique_lock<std::mutex> lk(block_mutex_);
+
+  // All ACKs collected so far for this block id, keyed by responder.
+  auto& acks = block_ack_[block->local_id()];
+  acks.insert(std::make_pair(block->responder(), block->sign_info()));
+  LOG(ERROR) << "recv block ack:" << block->local_id()
+             << " from:" << block->responder() << " num:" << acks.size();
+
+  const bool enough_acks = acks.size() >= f_ + 1;
+  if (enough_acks && acks.find(id_) != acks.end()) {
+    std::unique_lock<std::mutex> bc_lk(bc_mutex_);
+    proposal_manager_->BlockReady(acks, block->local_id());
+    BlockDone();
+  }
+  LOG(ERROR) << "recv block ack:" << block->local_id() << " done";
+}
+
+// Handles a proposal: signs its hash, stores the proposal in the proposal
+// manager and sends the signed vote back to the proposal's sender.
+// Returns false (without storing the proposal) when signing fails.
+bool AutoBahn::ReceiveProposal(std::unique_ptr<Proposal> proposal) {
+  LOG(ERROR) << " receive proposal from:" << proposal->sender_id()
+             << " slot:" << proposal->slot_id()
+             << " block size:" << proposal->block_size();
+
+  Proposal vote;
+  vote.set_slot_id(proposal->slot_id());
+  vote.set_sender_id(id_);
+  vote.set_hash(proposal->hash());
+
+  auto signature_or = verifier_->SignMessage(vote.hash());
+  if (signature_or.ok()) {
+    *vote.mutable_sign() = *signature_or;
+
+    // Remember the reply target before ownership of the proposal moves.
+    int reply_to = proposal->sender_id();
+    proposal_manager_->AddProposalData(std::move(proposal));
+    //Broadcast(MessageType::ProposalAck, vote);
+    SendMessage(MessageType::ProposalAck, vote, reply_to);
+    return true;
+  }
+
+  LOG(ERROR) << "Sign message fail";
+  return false;
+}
+
+// Records a vote for a slot. Whenever the slot holds at least 2f+1 votes
+// after the insertion, a copy of this vote is queued via PrepareDone().
+// Always returns true.
+bool AutoBahn::ReceiveVote(std::unique_ptr<Proposal> vote) {
+  LOG(ERROR) << "recv vote ack:" << vote->slot_id()
+             << " from:" << vote->sender_id();
+
+  // Copy taken up front: the original is moved into vote_ack_ below.
+  auto vote_copy = std::make_unique<Proposal>(*vote);
+
+  std::unique_lock<std::mutex> lk(vote_mutex_);
+  const int slot_id = vote->slot_id();
+  const int sender = vote->sender_id();
+  auto& slot_votes = vote_ack_[slot_id];
+  slot_votes.insert(std::make_pair(sender, std::move(vote)));
+
+  LOG(ERROR) << "recv vote ack:" << slot_id << " from:" << sender
+             << " num:" << slot_votes.size();
+
+  if (slot_votes.size() >= 2 * f_ + 1) {
+    PrepareDone(std::move(vote_copy));
+  }
+  LOG(ERROR) << "recv vote ack done";
+  return true;
+}
+
+
+// Returns true only when the proposal's certificate carries exactly
+// total_num_ signatures and every one of them verifies against the
+// proposal hash.
+bool AutoBahn::IsFastCommit(const Proposal& proposal) {
+  const auto& cert = proposal.cert();
+  if (cert.sign_size() == total_num_) {
+    for (const auto& signature : cert.sign()) {
+      if (!verifier_->VerifyMessage(proposal.hash(), signature)) {
+        LOG(ERROR) << " sign info sign fail";
+        return false;
+      }
+    }
+    return true;
+  }
+  return false;
+}
+
+// Handles a Prepare message. A proposal that passes IsFastCommit() is
+// queued for commit directly; otherwise it is re-stamped with our id and
+// broadcast as a Commit message. Always returns true.
+bool AutoBahn::ReceivePrepare(std::unique_ptr<Proposal> proposal) {
+  const bool fast_path = IsFastCommit(*proposal);
+  if (!fast_path) {
+    proposal->set_sender_id(id_);
+    Broadcast(MessageType::Commit, *proposal);
+  } else {
+    CommitDone(std::move(proposal));
+  }
+  LOG(ERROR) << "recv vote ack done";
+  return true;
+}
+
+// Records the sender of a Commit message for its slot. Whenever the slot
+// holds at least 2f+1 distinct senders after the insertion, the proposal
+// is queued via CommitDone(). Always returns true.
+bool AutoBahn::ReceiveCommit(std::unique_ptr<Proposal> proposal) {
+  std::unique_lock<std::mutex> lk(commit_mutex_);
+  auto& senders = commit_ack_[proposal->slot_id()];
+  senders.insert(proposal->sender_id());
+  LOG(ERROR) << "recv commit ack:" << proposal->slot_id()
+             << " from:" << proposal->sender_id()
+             << " num:" << senders.size();
+  if (senders.size() >= 2 * f_ + 1) {
+    CommitDone(std::move(proposal));
+  }
+  LOG(ERROR) << "recv vote ack done";
+  return true;
+}
+
+
+// Logs the slot and hands the vote to the prepare worker's queue.
+void AutoBahn::PrepareDone(std::unique_ptr<Proposal> vote) {
+  const auto slot = vote->slot_id();
+  LOG(ERROR) << " vote prepare done:" << slot;
+  prepare_queue_.Push(std::move(vote));
+}
+
+// Hands the proposal to the commit worker's queue.
+void AutoBahn::CommitDone(std::unique_ptr<Proposal> proposal) {
+  auto ready = std::move(proposal);
+  commit_queue_.Push(std::move(ready));
+}
+
+void AutoBahn::AsyncPrepare() {  // Thread body: takes votes from prepare_queue_ and calls Prepare() for slots in increasing order, starting at slot 1.
+  int view = 1;  // Next slot id that may be passed to Prepare().
+  std::map<int, std::pair<int64_t,std::unique_ptr<Proposal>> > votes;  // slot id -> (GetCurrentTime() at arrival, vote); ordered so begin() is the lowest slot.
+  while (!IsStop()) {
+    std::unique_ptr<Proposal> p = prepare_queue_.Pop(timeout_ms_ * 1000);
+    if(p== nullptr) {  // Nothing popped: loop again so IsStop() is re-checked.
+      continue;
+    }
+    assert(p != nullptr);
+    //LOG(ERROR)<<" obtain slot vote:"<<p->slot_id();
+    int slot_id = p->slot_id();
+    votes[slot_id] = std::make_pair(GetCurrentTime(), std::move(p));  // A later vote for the same slot replaces the stored one.
+    while(!votes.empty() && votes.begin()->first <= view) {
+      if(votes.begin()->first < view) {  // Slot already handled: drop the stale entry.
+        votes.erase(votes.begin());
+        continue;
+      }
+      int delay = 1000;  // NOTE(review): compared against GetCurrentTime() deltas and passed to usleep(), so presumably microseconds - confirm.
+      int wait_time = GetCurrentTime() - votes.begin()->second.first;  // Time elapsed since this vote was queued here.
+      wait_time = delay - wait_time;  // Remaining part of the delay, if any.
+      LOG(ERROR)<<" view :"<<view<<" wait time:"<<wait_time;
+      if(wait_time> 0) {
+        usleep(wait_time);
+      }
+      Prepare(std::move(votes.begin()->second.second));
+      votes.erase(votes.begin());
+      view++;
+    }
+  }
+}
+
+// Thread body: drains commit_queue_ and commits slots strictly in
+// increasing slot order, starting at slot 1. Slots that arrive early are
+// parked until their turn; slots below the next expected one are dropped.
+void AutoBahn::AsyncCommit() {
+  int next_slot = 1;
+  std::map<int, std::unique_ptr<Proposal>> pending;
+  while (!IsStop()) {
+    std::unique_ptr<Proposal> item = commit_queue_.Pop(timeout_ms_ * 1000);
+    if (item == nullptr) {
+      continue;
+    }
+    //LOG(ERROR)<<" obtain comit slot vote:"<<item->slot_id();
+    const int slot_id = item->slot_id();
+    pending[slot_id] = std::move(item);
+
+    // Work through the lowest parked slots while they are not in the
+    // future: commit the expected one, discard anything older.
+    for (auto head = pending.begin();
+         head != pending.end() && head->first <= next_slot;
+         head = pending.begin()) {
+      if (head->first == next_slot) {
+        Commit(std::move(head->second));
+        ++next_slot;
+      }
+      pending.erase(head);
+    }
+  }
+}
+
+// Broadcasts a Prepare message for the vote's slot.
+//
+// When every replica (total_num_) has voted for the slot, the message is
+// marked fast_commit and carries all collected vote signatures as its
+// certificate. The vote is then stamped with our id and broadcast.
+void AutoBahn::Prepare(std::unique_ptr<Proposal> vote) {
+  {
+    // vote_ack_ is filled by ReceiveVote() under vote_mutex_, while this
+    // function runs on the prepare worker thread; take the same lock so the
+    // map is not read while it is being modified. The lock is released
+    // before broadcasting.
+    std::unique_lock<std::mutex> lk(vote_mutex_);
+    auto& slot_votes = vote_ack_[vote->slot_id()];
+    //LOG(ERROR)<<" prepare vote:"<<vote->slot_id()<<" num:"<<slot_votes.size();
+    if (slot_votes.size() == total_num_) {
+      // fast path
+      vote->set_fast_commit(true);
+      for (auto& it : slot_votes) {
+        *vote->mutable_cert()->add_sign() = it.second->sign();
+      }
+    }
+  }
+
+  // slow path (and fast path alike): announce the slot to all replicas.
+  vote->set_sender_id(id_);
+  Broadcast(MessageType::Prepare, *vote);
+}
+
+// Executes a committed slot: looks up the stored proposal for the slot,
+// then, for every block it references, assigns consecutive execution ids
+// to the block's transactions and passes each one to commit_.
+void AutoBahn::Commit(std::unique_ptr<Proposal> proposal) {
+  auto raw_proposal =
+      proposal_manager_->GetProposalData(proposal->slot_id());
+  assert(raw_proposal != nullptr);
+
+  for (const auto& block_ref : raw_proposal->block()) {
+    const int owner = block_ref.sender_id();
+    const int local_id = block_ref.local_id();
+
+    Block* data_block = proposal_manager_->GetBlock(owner, local_id);
+    assert(data_block != nullptr);
+
+    auto* transactions = data_block->mutable_data()->mutable_transaction();
+    for (Transaction& txn : *transactions) {
+      txn.set_id(execute_id_++);
+      commit_(txn);
+    }
+  }
+}
+
+
+}  // namespace autobahn
+}  // namespace resdb
diff --git a/platform/consensus/ordering/autobahn/algorithm/autobahn.h 
b/platform/consensus/ordering/autobahn/algorithm/autobahn.h
new file mode 100644
index 00000000..e4364daf
--- /dev/null
+++ b/platform/consensus/ordering/autobahn/algorithm/autobahn.h
@@ -0,0 +1,74 @@
+#pragma once
+
+#include <deque>
+#include <map>
+#include <queue>
+#include <thread>
+
+#include "platform/common/queue/lock_free_queue.h"
+#include "platform/consensus/ordering/common/algorithm/protocol_base.h"
+#include "platform/consensus/ordering/autobahn/algorithm/proposal_manager.h"
+#include "platform/consensus/ordering/autobahn/proto/proposal.pb.h"
+#include "platform/statistic/stats.h"
+
+namespace resdb {
+namespace autobahn {
+
+class AutoBahn: public common::ProtocolBase {  // AutoBahn ordering protocol: batches transactions into blocks, disseminates them, and orders them through proposal / vote / prepare / commit messages.
+ public:
+  AutoBahn(int id, int f, int total_num, int block_size, SignatureVerifier* 
verifier);  // Stores the parameters and starts the worker threads.
+  ~AutoBahn();  // Sets the stop flag and joins worker threads.
+
+  bool ReceiveTransaction(std::unique_ptr<Transaction> txn);  // Timestamps and queues a transaction for batching; always returns true.
+  void ReceiveBlock(std::unique_ptr<Block> block);  // Stores a received block and replies to its sender with a signed BlockACK.
+  void ReceiveBlockACK(std::unique_ptr<BlockACK> block);  // Records an ACK; at f+1 ACKs including our own the block is marked ready.
+  bool ReceiveVote(std::unique_ptr<Proposal>);  // Records a vote; at 2f+1 votes for the slot, queues it for Prepare.
+  bool ReceiveProposal(std::unique_ptr<Proposal> proposal);  // Stores the proposal and sends a signed vote to its sender; false if signing fails.
+  bool ReceiveCommit(std::unique_ptr<Proposal> proposal);  // Records a commit sender; at 2f+1 senders, queues the slot for commit.
+  bool ReceivePrepare(std::unique_ptr<Proposal> proposal);  // Queues for commit when the certificate passes IsFastCommit(), else broadcasts Commit.
+
+ private:
+  bool IsStop();  // True once is_stop_ is set.
+  void BroadcastTxn();  // NOTE(review): no definition appears in autobahn.cpp as shown - confirm it is needed.
+  void GenerateBlocks();  // block_thread_ body.
+  void AsyncDissemination();  // dissemi_thread_ body.
+  void AsyncConsensus();  // consensus_thread_ body.
+  void AsyncPrepare();  // prepare_thread_ body.
+  void AsyncCommit();  // commit_thread_ body.
+
+  bool WaitForResponse(int64_t block_id);  // Waits (with timeout) on bc_block_cv_ for the current block id to reach block_id.
+  void BlockDone();  // Notifies bc_block_cv_.
+  void PrepareDone(std::unique_ptr<Proposal> vote);  // Pushes onto prepare_queue_.
+  void CommitDone(std::unique_ptr<Proposal> proposal);  // Pushes onto commit_queue_.
+  
+  void NotifyView();  // Notifies view_cv_.
+  bool WaitForNextView(int view);  // Waits (with timeout) on view_cv_ for ProposalManager::ReadyView(view).
+
+  void Prepare(std::unique_ptr<Proposal> vote);  // Broadcasts a Prepare message for the slot.
+  void Commit(std::unique_ptr<Proposal> proposal);  // Executes the transactions of every block in the slot's proposal.
+
+  bool IsFastCommit(const Proposal& proposal);  // True when the certificate has total_num_ valid signatures.
+
+ private:
+  std::condition_variable bc_block_cv_, view_cv_;  // Used with bc_mutex_ and view_mutex_ respectively.
+  LockFreeQueue<Transaction> txns_;  // Incoming transactions awaiting batching.
+  LockFreeQueue<Proposal> prepare_queue_, commit_queue_;  // Feed AsyncPrepare() and AsyncCommit().
+  std::unique_ptr<ProposalManager> proposal_manager_;
+  SignatureVerifier* verifier_;  // Raw pointer supplied by the caller of the constructor.
+  int execute_id_;  // Next id assigned to a transaction in Commit(); starts at 1.
+
+  int id_, total_num_, f_, batch_size_;
+  std::atomic<int> is_stop_;  // NOTE(review): <atomic>, <condition_variable>, <mutex> and <set> are not included directly by this header.
+  int timeout_ms_;
+
+  std::thread block_thread_, dissemi_thread_, consensus_thread_, 
prepare_thread_, commit_thread_;
+
+  std::mutex block_mutex_, bc_mutex_, view_mutex_, vote_mutex_, commit_mutex_;
+  std::map<int, std::map<int, SignInfo>> block_ack_;  // block local id -> (responder id -> signature info).
+  std::map<int, std::map<int, std::unique_ptr<Proposal>>> vote_ack_ ;  // slot id -> (voter id -> vote).
+  std::map<int, std::set<int>>  commit_ack_;  // slot id -> ids of replicas that sent Commit.
+  Stats* global_stats_;  // NOTE(review): not read anywhere in autobahn.cpp as shown.
+};
+
+}  // namespace autobahn
+}  // namespace resdb
diff --git a/platform/consensus/ordering/cassandra/algorithm/proposal_graph.cpp 
b/platform/consensus/ordering/autobahn/algorithm/proposal_graph.cpp
similarity index 91%
copy from platform/consensus/ordering/cassandra/algorithm/proposal_graph.cpp
copy to platform/consensus/ordering/autobahn/algorithm/proposal_graph.cpp
index e5b91135..fce7718b 100644
--- a/platform/consensus/ordering/cassandra/algorithm/proposal_graph.cpp
+++ b/platform/consensus/ordering/autobahn/algorithm/proposal_graph.cpp
@@ -1,4 +1,4 @@
-#include "platform/consensus/ordering/cassandra/algorithm/proposal_graph.h"
+#include "platform/consensus/ordering/autobahn/algorithm/proposal_graph.h"
 
 #include <glog/logging.h>
 
@@ -8,22 +8,13 @@
 #include "common/utils/utils.h"
 
 namespace resdb {
-namespace cassandra {
-namespace cassandra_recv {
+namespace autobahn {
 
-/*
-std::vector<ProposalState> GetStates() {
-  return std::vector<ProposalState>{ProposalState::New, 
ProposalState::Prepared,
-                                    ProposalState::PreCommit};
-}
-*/
 
 ProposalGraph::ProposalGraph(int fault_num, int id, int total_num) : 
f_(fault_num),id_(id), total_num_(total_num) {
-  ranking_ = std::make_unique<Ranking>();
-  current_height_ = 0;
-  global_stats_ = Stats::GetGlobalStats();
 }
 
+/*
 int ProposalGraph::GetBlockNum(const std::string& hash, int local_id, int 
proposer_id) {
 if(num_callback_) {
   return num_callback_(hash, local_id, proposer_id);
@@ -72,15 +63,6 @@ int ProposalGraph::AddProposal(const Proposal& proposal) {
              << " current height:" << current_height_<<" 
from:"<<proposal.header().proposer_id()<<" proposal 
id:"<<proposal.header().proposal_id();
   assert(current_height_ >= latest_commit_.header().height());
 
-
-  /*
-  if (proposal.header().height() < current_height_) {
-    LOG(ERROR) << "height not match:" << current_height_
-               << " proposal height:" << proposal.header().height();
-    return false;
-  }
-  */
-
   if (proposal.header().height() > current_height_) {
     pending_header_[proposal.header().height()].insert(
         proposal.header().proposer_id());
@@ -124,13 +106,6 @@ int ProposalGraph::AddProposal(const Proposal& proposal) {
   }
 
   //LOG(ERROR)<<"history size:"<<proposal.history_size();
-   /*
-  for (const auto& history : proposal.history()) {
-    std::string hash = history.hash();
-    auto node_it = node_info_.find(hash);
-    assert(node_it != node_info_.end());
-  }
-  */
 
   //LOG(ERROR)<<" proposal history:"<<proposal.history_size();
   if(proposal.history_size()>0){
@@ -139,7 +114,7 @@ int ProposalGraph::AddProposal(const Proposal& proposal) {
     auto node_it = node_info_.find(hash);
     
node_it->second->votes[ProposalState::New].insert(proposal.header().proposer_id());
     CheckState(node_it->second.get(),
-        static_cast<resdb::cassandra::ProposalState>(history.state()));
+        static_cast<resdb::autobahn::ProposalState>(history.state()));
 
     if (node_it->second->state == ProposalState::PoR 
       && node_it->second->proposal.header().proposer_id() == id_){
@@ -289,11 +264,6 @@ void ProposalGraph::Commit(const std::string& hash) {
       }
 
       //LOG(ERROR)<<" bfs sub block size :"<<p->sub_block_size();
-      /*
-      for(auto block : p->sub_block()){
-        LOG(ERROR)<<" get sub block proposer:"<<p->header().proposer_id()<<" 
local id:"<<block.local_id();
-      }
-    */
       it->second->state = ProposalState::Committed;
       if (is_main_hash.find(c_hash) != is_main_hash.end()) {
         commit_num_[p->header().proposer_id()]++;
@@ -322,14 +292,6 @@ void ProposalGraph::Commit(const std::string& hash) {
   int p_num = 0;
   for (int i = commit_p.size() - 1; i >= 0; i--) {
     for (int j = 0; j < commit_p[i].size(); ++j) {
-      /*
-      if (j == 0) {
-        LOG(ERROR) << "commmit proposal lead from:"
-                   << commit_p[i][j]->header().proposer_id()
-                   << " height:" << commit_p[i][j]->header().height()
-                   << " size:" << commit_p[i].size();
-      }
-      */
       //LOG(ERROR) << "commmit proposal:"
        //          << commit_p[i][j]->header().proposer_id()
         //         << " height:" << commit_p[i][j]->header().height()
@@ -508,21 +470,6 @@ Proposal* ProposalGraph::GetStrongestProposal() {
       }
     }
 
-    /*
-    if(node_info->proposal.header().proposer_id() == 
sp->proposal.header().proposer_id()) {
-      continue;
-    }
-
-    if(sp == node_info) {
-      continue;
-    }
-    */
-
-        /*
-    if(node_info_.find(last_hash) != node_info_.end()){
-      node_info_.erase(node_info_.find(last_hash));
-    }
-    */
   }
 
 
@@ -589,7 +536,7 @@ bool ProposalGraph::Compare(const NodeInfo& p1, const 
NodeInfo& p2) {
 
   int h = (p1.proposal.header().height())%total_num_;
   if ( h == 0) h = total_num_;
-  //LOG(ERROR)<<" check height :"<<h<<" 
cmp:"<<abs(p1.proposal.header().proposer_id() - h )<<" 
"<<abs(p2.proposal.header().proposer_id() - h);
+  LOG(ERROR)<<" check height :"<<h<<" 
cmp:"<<abs(p1.proposal.header().proposer_id() - h )<<" 
"<<abs(p2.proposal.header().proposer_id() - h)<<" 
from:"<<p1.proposal.header().proposer_id();
   //if (p1.proposal.header().height() <= 120 && 220 <= 
proposal.header().height()) {
     return abs(p1.proposal.header().proposer_id() - h ) > 
abs(p2.proposal.header().proposer_id() - h);
   //}
@@ -645,11 +592,6 @@ int ProposalGraph::GetCurrentHeight() { return 
current_height_; }
 std::vector<Proposal*> ProposalGraph::GetNewProposals(int height) {
   std::vector<Proposal*> ps;
   for (auto it : new_proposals_) {
-  /*
-    if (it.second->header().height() >= height) {
-      continue;
-    }
-    */
     ps.push_back(it.second);
   }
   for (Proposal* p : ps) {
@@ -667,7 +609,7 @@ std::vector<Block> ProposalGraph::GetNewBlocks() {
   return ps;
 }
 
+*/
 
-}  // namespace cassandra_recv
-}  // namespace cassandra
+}  // namespace autobahn
 }  // namespace resdb
diff --git a/platform/consensus/ordering/autobahn/algorithm/proposal_graph.h 
b/platform/consensus/ordering/autobahn/algorithm/proposal_graph.h
new file mode 100644
index 00000000..7fff4a0b
--- /dev/null
+++ b/platform/consensus/ordering/autobahn/algorithm/proposal_graph.h
@@ -0,0 +1,101 @@
+#pragma once
+
+#include <map>
+
+#include "platform/consensus/ordering/autobahn/algorithm/proposal_state.h"
+//#include "platform/consensus/ordering/autobahn/algorithm/ranking.h"
+#include "platform/consensus/ordering/autobahn/proto/proposal.pb.h"
+#include "platform/statistic/stats.h"
+
+namespace resdb {
+namespace autobahn {
+
+// Bookkeeping container for proposals. In this revision only the constructor
+// and the data members are declared; the rest of the API is kept below as
+// commented-out declarations.
+class ProposalGraph {
+ public:
+  ProposalGraph(int fault_num, int id,int total_num);
+  /*
+  inline void SetCommitCallBack(std::function<void(const Proposal&)> func) {
+    commit_callback_ = func;
+  }
+
+  inline void SetBlockNumCallBack(
+    std::function<int(const std::string& hash, int id, int sender)> func) {
+    num_callback_ = func;
+  }
+
+  int AddProposal(const Proposal& proposal);
+  void AddProposalOnly(const Proposal& proposal);
+
+  Proposal* GetLatestStrongestProposal();
+  const Proposal* GetProposalInfo(const std::string& hash) const;
+
+  int GetCurrentHeight();
+
+  void Clear(const std::string& hash);
+  void IncreaseHeight();
+  ProposalState GetProposalState(const std::string& hash) const;
+
+  std::vector<std::unique_ptr<Proposal>> GetNotFound(int height,
+                                                     const std::string& hash);
+
+  std::vector<Proposal*> GetNewProposals(int height);
+  std::vector<Block> GetNewBlocks();
+  */
+
+ private:
+  // Per-proposal record: the proposal itself plus its state, score, main
+  // flag and a map of vote sets.
+  struct NodeInfo {
+    Proposal proposal;
+    ProposalState state;
+    int score;
+    int is_main;
+    // std::set<int> received_num[5];
+    std::map<int, std::set<int>> votes;
+
+    // explicit: a Proposal must never convert silently into a NodeInfo.
+    explicit NodeInfo(const Proposal& proposal)
+        : proposal(proposal),
+          state(ProposalState::New),
+          score(0),
+          is_main(0) {}
+  };
+
+  /*
+  bool VerifyParent(const Proposal& proposal);
+
+  bool Compare(const NodeInfo& p1, const NodeInfo& p2);
+  bool Cmp(int id1, int id2);
+  int StateScore(const ProposalState& state);
+  int CompareState(const ProposalState& state1, const ProposalState& state2);
+
+  Proposal* GetStrongestProposal();
+
+  void UpdateHistory(Proposal* proposal);
+  int CheckState(NodeInfo* node_info, ProposalState state);
+  void UpgradeState(ProposalState& state);
+  void TryUpgradeHeight(int height);
+
+  void Commit(const std::string& hash);
+  int GetBlockNum(const std::string& hash, int local_id, int proposer_id);
+  */
+
+ private:
+  // Scalars and raw pointers carry default initializers so that they never
+  // hold indeterminate values, whatever the constructor chooses to set.
+  Proposal latest_commit_;
+  std::map<std::string, std::vector<std::string>> g_;
+  std::map<std::string, std::unique_ptr<NodeInfo>> node_info_;
+  std::map<std::string, std::vector<VoteMessage>> not_found_;
+  //std::unique_ptr<Ranking> ranking_;
+  std::map<int, int> commit_num_;
+  std::map<int, std::set<std::string>> last_node_;
+  int current_height_ = 0;
+  uint32_t f_ = 0;
+  std::function<void(const Proposal&)> commit_callback_;
+  std::function<int(const std::string&id, int, int&)> num_callback_;
+  std::map<int, std::set<int>> pending_header_;
+  std::map<int, std::map<std::string, std::vector<std::unique_ptr<Proposal>>>>
+      not_found_proposal_;
+  std::map<std::string, Proposal*> new_proposals_;
+  Stats* global_stats_ = nullptr;
+  int id_ = 0;
+  std::map<std::string, Block> new_blocks_;
+  int total_num_ = 0;
+  std::set<std::pair<int,int> > check_;
+};
+
+}  // namespace autobahn
+}  // namespace resdb
diff --git 
a/platform/consensus/ordering/autobahn/algorithm/proposal_manager.cpp 
b/platform/consensus/ordering/autobahn/algorithm/proposal_manager.cpp
new file mode 100644
index 00000000..b34dcb79
--- /dev/null
+++ b/platform/consensus/ordering/autobahn/algorithm/proposal_manager.cpp
@@ -0,0 +1,257 @@
+#include "platform/consensus/ordering/autobahn/algorithm/proposal_manager.h"
+
+#include <glog/logging.h>
+
+#include "common/crypto/signature_verifier.h"
+#include "common/utils/utils.h"
+
+namespace resdb {
+namespace autobahn {
+
+namespace {
+// Renders each byte of `hash` as the decimal text of its `int` value and
+// concatenates the pieces without a separator.
+std::string Encode(const std::string& hash) {
+  std::string encoded;
+  for (const char byte : hash) {
+    const int value = byte;
+    encoded += std::to_string(value);
+  }
+  return encoded;
+}
+
+}
+
+// Stores the replica id, cluster size, fault parameter and verifier.
+// Local block ids and slots start at 1; current_height_ starts at 0.
+ProposalManager::ProposalManager(int32_t id, int total_num, int f,
+                                 SignatureVerifier* verifier)
+    : id_(id),
+      local_block_id_(1),
+      total_num_(total_num),
+      f_(f),
+      current_height_(0),
+      current_slot_(1),
+      verifier_(verifier) {}
+
+// Copies `txns` into a new Block, stamps it with the hash of the serialized
+// payload, this replica's id, the current time and the next local block id,
+// then hands it to AddLocalBlock().
+void ProposalManager::MakeBlock(
+    std::vector<std::unique_ptr<Transaction>>& txns) {
+  auto new_block = std::make_unique<Block>();
+  Block::BlockData* payload = new_block->mutable_data();
+  for (const auto& txn : txns) {
+    *payload->add_transaction() = *txn;
+  }
+
+  // The block hash is computed over the serialized payload only.
+  std::string serialized;
+  payload->SerializeToString(&serialized);
+  new_block->set_hash(SignatureVerifier::CalculateHash(serialized));
+
+  new_block->set_sender_id(id_);
+  new_block->set_create_time(GetCurrentTime());
+  new_block->set_local_id(local_block_id_++);
+  AddLocalBlock(std::move(new_block));
+}
+
+// Stores `block` in pending_blocks_ under (sender_id, local_id).
+//
+// For every block after a sender's first one, the block must pass
+// VerifyBlock() and its predecessor must already be stored; the block's
+// `last_sign_info` is then copied onto the predecessor as its `sign_info`.
+// A block failing any check is logged and dropped. The checks are ordinary
+// `if`s instead of assert() so that they (and the VerifyBlock() call) still
+// run when NDEBUG is defined.
+void ProposalManager::AddBlock(std::unique_ptr<Block> block) {
+  std::unique_lock<std::mutex> lk(mutex_);
+  const int sender = block->sender_id();
+  // Keep the id 64-bit: pending_blocks_ is keyed by int64_t.
+  const int64_t block_id = block->local_id();
+
+  // `sender` indexes a fixed-size array, so reject anything outside it.
+  // NOTE(review): presumably sender_id arrives from the network - confirm.
+  const int max_senders =
+      static_cast<int>(sizeof(pending_blocks_) / sizeof(pending_blocks_[0]));
+  if (sender < 0 || sender >= max_senders) {
+    LOG(ERROR) << "add block: sender out of range:" << sender;
+    return;
+  }
+
+  auto& sender_blocks = pending_blocks_[sender];
+  if (block_id > 1) {
+    // VerifyBlock() also enforces the f_+1 minimum on last_sign_info.
+    if (!VerifyBlock(*block)) {
+      LOG(ERROR) << "add block: verify fail, sender:" << sender
+                 << " id:" << block_id;
+      return;
+    }
+    const auto prev = sender_blocks.find(block_id - 1);
+    if (prev == sender_blocks.end() || prev->second == nullptr) {
+      LOG(ERROR) << "add block: previous block missing, sender:" << sender
+                 << " id:" << block_id;
+      return;
+    }
+    *prev->second->mutable_sign_info() = block->last_sign_info();
+  }
+  sender_blocks[block_id] = std::move(block);
+}
+      
+// Returns the stored block `block_id` of `sender`. The block is expected to
+// exist; this is only enforced by assert().
+Block* ProposalManager::GetBlock(int sender, int64_t block_id) {
+  std::unique_lock<std::mutex> lk(mutex_);
+  auto& sender_blocks = pending_blocks_[sender];
+  const auto found = sender_blocks.find(block_id);
+  assert(found != sender_blocks.end());
+  return found->second.get();
+}
+
+// Queues a locally created block in blocks_candidates_, keyed by its local
+// id.
+void ProposalManager::AddLocalBlock(std::unique_ptr<Block> block) {
+  std::unique_lock<std::mutex> lk(mutex_);
+  const auto local_id = block->local_id();
+  blocks_candidates_[local_id] = std::move(block);
+}
+
+// Returns the candidate block with id `block_id`, or nullptr when no such
+// candidate is queued. Candidates with a smaller id are discarded, and the
+// returned block gets its last_sign_info refreshed via UpdateLastSign().
+const Block* ProposalManager::GetLocalBlock(int64_t block_id) {
+  std::unique_lock<std::mutex> lk(mutex_);
+  const auto wanted = blocks_candidates_.find(block_id);
+  if (wanted == blocks_candidates_.end()) {
+    return nullptr;
+  }
+  // The map is ordered, so everything in front of `wanted` is older;
+  // erasing that range leaves `wanted` as the first element.
+  blocks_candidates_.erase(blocks_candidates_.begin(), wanted);
+  Block* block = wanted->second.get();
+  UpdateLastSign(block);
+  return block;
+}
+
+// Promotes the local candidate `local_id` to this replica's pending blocks.
+// Every entry of `sign_info` is appended to the block's sign_info first;
+// current_height_ is raised to `local_id` if that is larger. Does nothing
+// when the candidate is not queued.
+void ProposalManager::BlockReady(const std::map<int, SignInfo>& sign_info,
+                                 int64_t local_id) {
+  std::unique_lock<std::mutex> lk(mutex_);
+  const auto candidate = blocks_candidates_.find(local_id);
+  if (candidate == blocks_candidates_.end()) {
+    return;
+  }
+
+  Block* block = candidate->second.get();
+  for (const auto& entry : sign_info) {
+    const SignInfo& sign = entry.second;
+    assert(sign.hash() == block->hash());
+    *block->add_sign_info() = sign;
+    LOG(ERROR)<<" add last sign:"<<sign.sender_id();
+  }
+
+  assert(candidate->second != nullptr);
+  pending_blocks_[id_][local_id] = std::move(candidate->second);
+  blocks_candidates_.erase(candidate);
+  current_height_ = std::max(current_height_, local_id);
+}
+
+// Returns current_height_, read under mutex_.
+int64_t ProposalManager::GetCurrentBlockId() {
+  std::lock_guard<std::mutex> guard(mutex_);
+  return current_height_;
+}
+
+// Copies the sign_info of this replica's previous block into `block`'s
+// last_sign_info. The first block (id 1) has no predecessor and is left
+// untouched. Takes no lock itself; GetLocalBlock() calls it with mutex_ held.
+void ProposalManager:: UpdateLastSign(Block * block) {
+  const int block_id = block->local_id();
+  if (block_id <= 1) {
+    return;
+  }
+  auto& own_blocks = pending_blocks_[id_];
+  const auto prev = own_blocks.find(block_id - 1);
+  assert(prev != own_blocks.end());
+  *block->mutable_last_sign_info() = prev->second->sign_info();
+}
+
+// Checks the `last_sign_info` entries carried by `block`: there must be at
+// least f_+1 of them, all must share the hash and local id of the first
+// entry, each signature must verify, and at least f_+1 distinct sender ids
+// must be present.
+//
+// NOTE(review): distinct signers are counted by the sender_id field, while
+// the signature is checked through sign(); whether VerifyMessage() ties the
+// signature to that sender_id is not visible here - confirm.
+bool ProposalManager::VerifyBlock(const Block& block) {
+  if (block.last_sign_info_size() < f_+1) {
+    LOG(ERROR)<<" sign info size fail";
+    return false;
+  }
+
+  std::set<int> distinct_senders;
+  for (const auto& entry : block.last_sign_info()) {
+    const bool same_hash = entry.hash() == block.last_sign_info(0).hash();
+    if (!same_hash) {
+      LOG(ERROR)<<" sign info hash fail";
+      return false;
+    }
+    const bool same_id =
+        entry.local_id() == block.last_sign_info(0).local_id();
+    if (!same_id) {
+      LOG(ERROR)<<" sign info local id fail";
+      return false;
+    }
+    distinct_senders.insert(entry.sender_id());
+
+    if (!verifier_->VerifyMessage(entry.hash(), entry.sign())) {
+      LOG(ERROR)<<" sign info sign fail";
+      return false;
+    }
+  }
+  return distinct_senders.size() >= f_+1;
+}
+
+// Signs the hash of `block` and returns a SignInfo carrying the hash, this
+// replica's id, the block's local id and the signature. Returns an empty
+// SignInfo when signing fails.
+SignInfo ProposalManager::SignBlock(const Block& block) {
+  auto signature_or = verifier_->SignMessage(block.hash());
+  if (!signature_or.ok()) {
+    LOG(ERROR) << "Sign message fail";
+    return SignInfo();
+  }
+
+  SignInfo signed_info;
+  signed_info.set_hash(block.hash());
+  signed_info.set_sender_id(id_);
+  signed_info.set_local_id(block.local_id());
+  *signed_info.mutable_sign() = *signature_or;
+  return signed_info;
+}
+
+// Records that `sender` reached `block_id` in the current slot. The first
+// update of a sender within a slot also bumps that slot's counter in
+// new_blocks_.
+void ProposalManager::UpdateView(int sender, int64_t block_id) {
+  std::unique_lock<std::mutex> lk(slot_mutex_);
+  auto& state = slot_state_[sender];
+  if (state.first != current_slot_) {
+    ++new_blocks_[current_slot_];
+  }
+  state.first = current_slot_;
+  state.second = block_id;
+}
+
+// Returns true once at least 2*f_+1 senders have been counted for the
+// current slot.
+// NOTE(review): the `slot` argument is not used; the check always looks at
+// current_slot_ - confirm that this is intended.
+bool ProposalManager::ReadyView(int slot){
+  std::unique_lock<std::mutex> lk(slot_mutex_);
+  const int quorum = 2*f_+1;
+  return new_blocks_[current_slot_] >= quorum;
+}
+
+// Returns current_slot_, read under slot_mutex_.
+int ProposalManager::GetCurrentView() {
+  std::lock_guard<std::mutex> guard(slot_mutex_);
+  return current_slot_;
+}
+
+// Advances current_slot_ by one under slot_mutex_.
+void ProposalManager::IncreaseView() {
+  std::lock_guard<std::mutex> guard(slot_mutex_);
+  ++current_slot_;
+}
+
+// Collects, for the current slot, the latest block id recorded per sender,
+// then advances to the next slot. Returns {slot the cut belongs to, cut}.
+std::pair<int, std::map<int, int64_t>> ProposalManager::GetCut() {
+  std::map<int, int64_t> cut;
+  std::unique_lock<std::mutex> lk(slot_mutex_);
+  const int slot = current_slot_;
+  for (const auto& entry : slot_state_) {
+    const auto& state = entry.second;
+    if (state.first == slot) {
+      cut[entry.first] = state.second;
+    }
+  }
+  ++current_slot_;
+  return std::make_pair(slot, cut);
+}
+
+// Builds a Proposal for `slot` from a cut (`blocks` maps sender -> block id).
+// Each referenced block contributes an entry carrying only its sign_info,
+// local id and sender id. The proposal hash is the concatenation of the
+// referenced blocks' hashes in map order.
+std::unique_ptr<Proposal> ProposalManager::GenerateProposal(
+    int slot, const std::map<int, int64_t>& blocks) {
+  auto proposal = std::make_unique<Proposal>();
+  std::string concatenated_hashes;
+  for (const auto& entry : blocks) {
+    Block* summary = proposal->add_block();
+    const Block* stored = GetBlock(entry.first, entry.second);
+    concatenated_hashes += stored->hash();
+    *summary->mutable_sign_info() = stored->sign_info();
+    summary->set_local_id(stored->local_id());
+    summary->set_sender_id(stored->sender_id());
+  }
+  proposal->set_slot_id(slot);
+  proposal->set_sender_id(id_);
+  proposal->set_hash(concatenated_hashes);
+  return proposal;
+}
+
+// Hands over ownership of the proposal stored for `slot`, or returns nullptr
+// when nothing is stored. The map entry is erased on a hit and never created
+// on a miss, so pending_proposals_ does not accumulate empty slots.
+std::unique_ptr<Proposal> ProposalManager::GetProposalData(int slot) {
+  std::unique_lock<std::mutex> lk(p_mutex_);
+  const auto found = pending_proposals_.find(slot);
+  if (found == pending_proposals_.end()) {
+    return nullptr;
+  }
+  std::unique_ptr<Proposal> proposal = std::move(found->second);
+  pending_proposals_.erase(found);
+  return proposal;
+}
+
+// Stores `p` under its slot id, replacing any proposal already stored for
+// that slot.
+void ProposalManager::AddProposalData(std::unique_ptr<Proposal> p) {
+  std::unique_lock<std::mutex> lk(p_mutex_);
+  const int slot = p->slot_id();
+  pending_proposals_[slot] = std::move(p);
+}
+
+}  // namespace autobahn
+}  // namespace resdb
diff --git a/platform/consensus/ordering/autobahn/algorithm/proposal_manager.h 
b/platform/consensus/ordering/autobahn/algorithm/proposal_manager.h
new file mode 100644
index 00000000..638d6cff
--- /dev/null
+++ b/platform/consensus/ordering/autobahn/algorithm/proposal_manager.h
@@ -0,0 +1,71 @@
+#pragma once
+
+#include <condition_variable>
+#include <list>
+
+#include "platform/consensus/ordering/autobahn/algorithm/proposal_graph.h"
+#include "platform/consensus/ordering/autobahn/proto/proposal.pb.h"
+#include "platform/statistic/stats.h"
+#include "common/crypto/signature_verifier.h"
+
+namespace resdb {
+namespace autobahn {
+
+// Tracks blocks and proposals for one replica: locally created candidate
+// blocks, blocks stored per sender, per-slot progress of each sender, and
+// proposals keyed by slot. Separate mutexes guard the block, slot and
+// proposal state.
+class ProposalManager {
+ public:
+  ProposalManager(int32_t id, int total_num, int f,
+                  SignatureVerifier* verifier);
+
+  // Local block production.
+  void MakeBlock(
+      std::vector<std::unique_ptr<Transaction>>& txn);
+  void AddBlock(std::unique_ptr<Block> block);
+  void AddLocalBlock(std::unique_ptr<Block> block);
+  const Block* GetLocalBlock(int64_t block_id);
+  Block* GetBlock(int sender, int64_t block_id);
+  int64_t GetCurrentBlockId();
+
+  void BlockReady(const std::map<int, SignInfo>& sign_info, int64_t local_id);
+
+  SignInfo SignBlock(const Block& block);
+  bool VerifyBlock(const Block& block);
+
+  // Slot ("view") tracking.
+  bool ReadyView(int slot);
+  int GetCurrentView();
+  void IncreaseView();
+  void UpdateView(int sender, int64_t block_id);
+
+  std::pair<int, std::map<int, int64_t>> GetCut();
+  std::unique_ptr<Proposal> GenerateProposal(
+      int slot, const std::map<int, int64_t>& blocks);
+
+  std::unique_ptr<Proposal> GetProposalData(int slot);
+  void AddProposalData(std::unique_ptr<Proposal> p);
+
+
+ private:
+  void UpdateLastSign(Block * block);
+
+ private:
+  // Number of per-sender block maps; sender ids index pending_blocks_.
+  static constexpr int kMaxSenders = 512;
+
+  int32_t id_ = 0;
+  int64_t local_block_id_ = 1;
+
+  std::map<int64_t, std::unique_ptr<Block>> pending_blocks_[kMaxSenders];
+  std::mutex mutex_, slot_mutex_, p_mutex_;
+  // Keyed by int64_t like every block id passed in, so ids are not narrowed.
+  std::map<int64_t, std::unique_ptr<Block>> blocks_candidates_;
+
+  // sender -> (slot of the last update, block id of the last update).
+  std::map<int, std::pair<int, int64_t>> slot_state_;
+  // slot -> number of senders counted for that slot.
+  std::map<int,int> new_blocks_;
+
+  //std::mutex t_mutex_;
+  //std::map<std::string, std::unique_ptr<Proposal>> local_proposal_;
+  //Stats* global_stats_;
+  int total_num_ = 0;
+  int f_ = 0;
+  int64_t current_height_ = 0;
+  int current_slot_ = 1;
+
+  SignatureVerifier* verifier_ = nullptr;
+
+  std::map<int, std::unique_ptr<Proposal> > pending_proposals_;
+};
+
+}  // namespace autobahn
+}  // namespace resdb
diff --git a/platform/consensus/ordering/cassandra/algorithm/proposal_state.h 
b/platform/consensus/ordering/autobahn/algorithm/proposal_state.h
similarity index 88%
copy from platform/consensus/ordering/cassandra/algorithm/proposal_state.h
copy to platform/consensus/ordering/autobahn/algorithm/proposal_state.h
index 876b1be0..5ff98e16 100644
--- a/platform/consensus/ordering/cassandra/algorithm/proposal_state.h
+++ b/platform/consensus/ordering/autobahn/algorithm/proposal_state.h
@@ -2,12 +2,12 @@
 
 //#define GOPOA
 //#define GOTPOA
-#define GOTPOR
+//#define GOTPOR
 
 //#define NOPOA
 
 namespace resdb {
-namespace cassandra {
+namespace autobahn {
 
 /*
 enum ProposalState {
diff --git a/platform/consensus/ordering/autobahn/algorithm/ranking.cpp 
b/platform/consensus/ordering/autobahn/algorithm/ranking.cpp
new file mode 100644
index 00000000..d8454e43
--- /dev/null
+++ b/platform/consensus/ordering/autobahn/algorithm/ranking.cpp
@@ -0,0 +1,12 @@
+
+#include "platform/consensus/ordering/cassandra/algorithm/ranking.h"
+
+namespace resdb {
+namespace cassandra {
+namespace cassandra_recv {
+
+// The rank of a proposer is simply its id.
+// NOTE(review): this file lives under autobahn/ but includes the cassandra
+// header and defines resdb::cassandra::cassandra_recv::Ranking::GetRank;
+// confirm this will not clash with a cassandra definition at link time.
+int Ranking::GetRank(int proposer_id) {
+  const int rank = proposer_id;
+  return rank;
+}
+
+}  // namespace cassandra_recv
+}  // namespace cassandra
+}  // namespace resdb
diff --git a/platform/consensus/ordering/autobahn/algorithm/ranking.h 
b/platform/consensus/ordering/autobahn/algorithm/ranking.h
new file mode 100644
index 00000000..d3d2dfb6
--- /dev/null
+++ b/platform/consensus/ordering/autobahn/algorithm/ranking.h
@@ -0,0 +1,14 @@
+#pragma once
+
+namespace resdb {
+namespace cassandra {
+namespace cassandra_recv {
+
+class Ranking {  // NOTE(review): declared in resdb::cassandra::cassandra_recv although the file is under autobahn/ - confirm.
+ public:
+  int GetRank(int proposer_id);  // Maps a proposer id to its rank.
+};
+
+}  // namespace cassandra_recv
+}  // namespace cassandra
+}  // namespace resdb
diff --git a/platform/consensus/ordering/autobahn/framework/BUILD 
b/platform/consensus/ordering/autobahn/framework/BUILD
new file mode 100644
index 00000000..8bf19829
--- /dev/null
+++ b/platform/consensus/ordering/autobahn/framework/BUILD
@@ -0,0 +1,16 @@
+package(default_visibility = ["//visibility:private"])
+
+cc_library(
+    name = "consensus",
+    srcs = ["consensus.cpp"],
+    hdrs = ["consensus.h"],
+    visibility = [
+        "//visibility:public",
+    ],
+    deps = [
+        "//common/utils",
+        "//platform/consensus/ordering/common/framework:consensus",
+        "//platform/consensus/ordering/autobahn/algorithm:autobahn",
+    ],
+)
+
diff --git a/platform/consensus/ordering/autobahn/framework/consensus.cpp 
b/platform/consensus/ordering/autobahn/framework/consensus.cpp
new file mode 100644
index 00000000..0097a5c4
--- /dev/null
+++ b/platform/consensus/ordering/autobahn/framework/consensus.cpp
@@ -0,0 +1,207 @@
+/*
+ * Copyright (c) 2019-2022 ExpoLab, UC Davis
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ */
+
+#include "platform/consensus/ordering/autobahn/framework/consensus.h"
+
+#include <glog/logging.h>
+#include <unistd.h>
+
+#include "common/utils/utils.h"
+
+namespace resdb {
+namespace autobahn {
+
+// Sets up the common consensus base and, on every node whose certificate is
+// not of CLIENT type, creates the AutoBahn protocol instance and registers
+// it through InitProtocol().
+Consensus::Consensus(const ResDBConfig& config,
+                     std::unique_ptr<TransactionManager> executor)
+    : common::Consensus(config, std::move(executor)){
+  const int replica_count = config_.GetReplicaNum();
+  const int fault_count = (replica_count - 1) / 3;
+
+  Init();
+
+  start_ = 0;
+
+  const bool is_client = config_.GetPublicKeyCertificateInfo()
+                             .public_key()
+                             .public_key_info()
+                             .type() == CertificateKeyInfo::CLIENT;
+  if (is_client) {
+    // Client nodes run no protocol instance; autobahn_ stays null.
+    return;
+  }
+
+  autobahn_ = std::make_unique<AutoBahn>(
+      config_.GetSelfInfo().id(), fault_count, replica_count,
+      config_.GetConfigData().block_size(), GetSignatureVerifier());
+  InitProtocol(autobahn_.get());
+}
+
+namespace {
+
+// Parses request.data() into a newly allocated message of type T. Logs and
+// returns nullptr when the payload does not parse.
+template <typename T>
+std::unique_ptr<T> ParsePayload(const Request& request) {
+  auto message = std::make_unique<T>();
+  if (!message->ParseFromString(request.data())) {
+    LOG(ERROR) << "parse proposal fail";
+    return nullptr;
+  }
+  return message;
+}
+
+}  // namespace
+
+// Dispatches a protocol message to the AutoBahn instance according to
+// request->user_type().
+//
+// Returns 0 when the message was handled or its type is not handled here,
+// and -1 when the payload does not parse, the handler rejects it, or no
+// protocol instance exists (the constructor creates none on CLIENT nodes).
+// A malformed payload is reported through the return value only; it no
+// longer aborts the process via assert().
+int Consensus::ProcessCustomConsensus(std::unique_ptr<Request> request) {
+  if (autobahn_ == nullptr) {
+    LOG(ERROR) << "autobahn protocol is not initialized";
+    return -1;
+  }
+
+  const auto type = request->user_type();
+
+  if (type == MessageType::NewBlocks) {
+    auto block = ParsePayload<Block>(*request);
+    if (block == nullptr) {
+      return -1;
+    }
+    autobahn_->ReceiveBlock(std::move(block));
+    return 0;
+  }
+
+  if (type == MessageType::CMD_BlockACK) {
+    auto block_ack = ParsePayload<BlockACK>(*request);
+    if (block_ack == nullptr) {
+      return -1;
+    }
+    autobahn_->ReceiveBlockACK(std::move(block_ack));
+    return 0;
+  }
+
+  if (type == MessageType::NewProposal) {
+    auto proposal = ParsePayload<Proposal>(*request);
+    if (proposal == nullptr) {
+      return -1;
+    }
+    return autobahn_->ReceiveProposal(std::move(proposal)) ? 0 : -1;
+  }
+
+  if (type == MessageType::ProposalAck) {
+    auto proposal = ParsePayload<Proposal>(*request);
+    if (proposal == nullptr) {
+      return -1;
+    }
+    return autobahn_->ReceiveVote(std::move(proposal)) ? 0 : -1;
+  }
+
+  if (type == MessageType::Prepare) {
+    auto proposal = ParsePayload<Proposal>(*request);
+    if (proposal == nullptr) {
+      return -1;
+    }
+    return autobahn_->ReceivePrepare(std::move(proposal)) ? 0 : -1;
+  }
+
+  if (type == MessageType::Commit) {
+    auto proposal = ParsePayload<Proposal>(*request);
+    if (proposal == nullptr) {
+      return -1;
+    }
+    return autobahn_->ReceiveCommit(std::move(proposal)) ? 0 : -1;
+  }
+
+  // CMD_BlockQuery, CMD_ProposalQuery and CMD_ProposalQueryResponse have no
+  // handler here; they and every other type fall through as a no-op.
+  return 0;
+}
+
+// Wraps the data, hash and proxy id of a client request into a Transaction
+// and passes it to the protocol; returns the protocol's result.
+int Consensus::ProcessNewTransaction(std::unique_ptr<Request> request) {
+  auto txn = std::make_unique<Transaction>();
+  txn->set_proxy_id(request->proxy_id());
+  txn->set_hash(request->hash());
+  txn->set_data(request->data());
+  return autobahn_->ReceiveTransaction(std::move(txn));
+}
+
+// Commit entry point: `msg` is cast to a Transaction and forwarded to
+// CommitMsgInternal().
+int Consensus::CommitMsg(const google::protobuf::Message& msg) {
+  const auto& txn = dynamic_cast<const Transaction&>(msg);
+  return CommitMsgInternal(txn);
+}
+
+// Converts a committed Transaction into an execution Request (sequence
+// number, uid, proxy id, payload and queuing time) and queues it on the
+// transaction executor. Always returns 0.
+int Consensus::CommitMsgInternal(const Transaction& txn) {
+  auto exec_request = std::make_unique<Request>();
+  exec_request->set_seq(txn.id());
+  exec_request->set_uid(txn.uid());
+  exec_request->set_proxy_id(txn.proxy_id());
+  exec_request->set_data(txn.data());
+  exec_request->set_queuing_time(txn.queuing_time());
+
+  transaction_executor_->AddExecuteMessage(std::move(exec_request));
+  return 0;
+}
+
+
+// Passes the payload and uid of `txn` to the executor's Prepare() step.
+// Always returns 0.
+int Consensus::Prepare(const Transaction& txn) {
+  auto prepare_request = std::make_unique<Request>();
+  prepare_request->set_uid(txn.uid());
+  prepare_request->set_data(txn.data());
+  transaction_executor_->Prepare(std::move(prepare_request));
+  return 0;
+}
+
+
+}  // namespace autobahn
+}  // namespace resdb
diff --git a/platform/consensus/ordering/autobahn/framework/consensus.h 
b/platform/consensus/ordering/autobahn/framework/consensus.h
new file mode 100644
index 00000000..a13e3eff
--- /dev/null
+++ b/platform/consensus/ordering/autobahn/framework/consensus.h
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2019-2022 ExpoLab, UC Davis
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ */
+
+#pragma once
+
+#include "executor/common/transaction_manager.h"
+#include "platform/consensus/ordering/common/framework/consensus.h"
+#include "platform/consensus/ordering/autobahn/algorithm/autobahn.h"
+#include "platform/networkstrate/consensus_manager.h"
+
+namespace resdb {
+namespace autobahn {
+
+// Glue between the common consensus framework and the AutoBahn protocol:
+// parses incoming requests, feeds them to the protocol and turns committed
+// transactions into execution requests.
+class Consensus : public common::Consensus {
+ public:
+  Consensus(const ResDBConfig& config,
+            std::unique_ptr<TransactionManager> transaction_manager);
+  virtual ~Consensus() = default;
+
+ private:
+  int ProcessCustomConsensus(std::unique_ptr<Request> request) override;
+  int ProcessNewTransaction(std::unique_ptr<Request> request) override;
+  int CommitMsg(const google::protobuf::Message& msg) override;
+  int CommitMsgInternal(const Transaction& txn);
+
+  int Prepare(const Transaction& txn);
+
+ protected:
+  // Null on nodes for which the constructor creates no protocol instance.
+  std::unique_ptr<AutoBahn> autobahn_;
+  // Default initializers keep the members that the constructor does not
+  // assign from holding indeterminate values.
+  Stats* global_stats_ = nullptr;
+  int64_t start_ = 0;
+  std::mutex mutex_;
+  int send_num_[200] = {};
+};
+
+}  // namespace autobahn
+}  // namespace resdb
diff --git a/platform/consensus/ordering/autobahn/framework/consensus_test.cpp 
b/platform/consensus/ordering/autobahn/framework/consensus_test.cpp
new file mode 100644
index 00000000..2c8834a8
--- /dev/null
+++ b/platform/consensus/ordering/autobahn/framework/consensus_test.cpp
@@ -0,0 +1,179 @@
+/*
+ * Copyright (c) 2019-2022 ExpoLab, UC Davis
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ */
+
+#include "platform/consensus/ordering/cassandra/framework/consensus.h"
+
+#include <glog/logging.h>
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include <future>
+
+#include "common/test/test_macros.h"
+#include "executor/common/mock_transaction_manager.h"
+#include "platform/config/resdb_config_utils.h"
+#include "platform/networkstrate/mock_replica_communicator.h"
+
+namespace resdb {
+namespace cassandra {
+namespace {
+
+using ::resdb::testing::EqualsProto;
+using ::testing::_;
+using ::testing::Invoke;
+using ::testing::Test;
+
+// Four local replicas on ports 1234-1237; replica 1 is this node.
+ResDBConfig GetConfig() {
+  return ResDBConfig({GenerateReplicaInfo(1, "127.0.0.1", 1234),
+                      GenerateReplicaInfo(2, "127.0.0.1", 1235),
+                      GenerateReplicaInfo(3, "127.0.0.1", 1236),
+                      GenerateReplicaInfo(4, "127.0.0.1", 1237)},
+                     GenerateReplicaInfo(1, "127.0.0.1", 1234));
+}
+
+// Fixture wiring a Consensus instance to a mocked transaction manager and a
+// mocked replica communicator.
+class ConsensusTest : public Test {
+ public:
+  ConsensusTest() : config_(GetConfig()) {
+    auto mock_manager = std::make_unique<MockTransactionExecutorDataImpl>();
+    // Keep a raw pointer for expectations; ownership moves to consensus_.
+    mock_transaction_manager_ = mock_manager.get();
+    consensus_ =
+        std::make_unique<Consensus>(config_, std::move(mock_manager));
+    consensus_->SetCommunicator(&replica_communicator_);
+  }
+
+  // Submits `data` as a single-request batch wrapped in a new-transaction
+  // Request and expects ConsensusCommit() to return 0.
+  void AddTransaction(const std::string& data) {
+    BatchUserRequest batch_request;
+    batch_request.add_user_requests()->mutable_request()->set_data(data);
+    batch_request.set_local_id(1);
+
+    Transaction txn;
+    batch_request.SerializeToString(txn.mutable_data());
+
+    auto request = std::make_unique<Request>();
+    request->set_type(Request::TYPE_NEW_TXNS);
+    txn.SerializeToString(request->mutable_data());
+
+    EXPECT_EQ(consensus_->ConsensusCommit(nullptr, std::move(request)), 0);
+  }
+
+ protected:
+  ResDBConfig config_;
+  MockTransactionExecutorDataImpl* mock_transaction_manager_;
+  MockReplicaCommunicator replica_communicator_;
+  std::unique_ptr<TransactionManager> transaction_manager_;
+  std::unique_ptr<Consensus> consensus_;
+};
+
+// Drives one transaction through consensus with a looped-back communicator
+// and waits until a response is sent.
+//
+// NOTE(review): this file sits under autobahn/ but includes the cassandra
+// consensus header, uses namespace cassandra and refers to
+// MessageType::Voteprep - confirm which protocol it is meant to test.
+TEST_F(ConsensusTest, NormalCase) {
+  std::promise<bool> commit_done;
+  std::future<bool> commit_done_future = commit_done.get_future();
+
+  // Re-delivers a broadcast vote-style message to the local consensus once
+  // for each proposer id 1..3.
+  auto replay_as_proposers = [&](const Request& request, bool log_type) {
+    VoteMessage ack_msg;
+    // Parse outside of assert() so the call still happens under NDEBUG.
+    EXPECT_TRUE(ack_msg.ParseFromString(request.data()));
+    for (int i = 1; i <= 3; ++i) {
+      ack_msg.set_proposer_id(i);
+      auto new_req = std::make_unique<Request>(request);
+      ack_msg.SerializeToString(new_req->mutable_data());
+      if (log_type) {
+        LOG(ERROR) << "new request type:" << new_req->user_type();
+      }
+      consensus_->ConsensusCommit(nullptr, std::move(new_req));
+    }
+  };
+
+  EXPECT_CALL(replica_communicator_, BroadCast)
+      .WillRepeatedly(Invoke([&](const google::protobuf::Message& msg) {
+        Request request = *dynamic_cast<const Request*>(&msg);
+
+        if (request.user_type() == MessageType::NewProposal) {
+          LOG(ERROR) << "bc new proposal";
+          consensus_->ConsensusCommit(nullptr,
+                                      std::make_unique<Request>(request));
+          LOG(ERROR) << "recv proposal done";
+        }
+        if (request.user_type() == MessageType::Vote) {
+          LOG(ERROR) << "bc vote";
+          replay_as_proposers(request, false);
+        }
+        if (request.user_type() == MessageType::Prepare) {
+          LOG(ERROR) << "bc prepare";
+          replay_as_proposers(request, false);
+        }
+        if (request.user_type() == MessageType::Voteprep) {
+          LOG(ERROR) << "bc voterep:";
+          replay_as_proposers(request, true);
+        }
+        LOG(ERROR) << "done";
+        return 0;
+      }));
+
+  EXPECT_CALL(*mock_transaction_manager_, ExecuteData)
+      .WillOnce(Invoke([&](const std::string& msg) {
+        LOG(ERROR) << "execute txn:" << msg;
+        EXPECT_EQ(msg, "transaction1");
+        return nullptr;
+      }));
+
+  EXPECT_CALL(replica_communicator_, SendMessage(_, 0))
+      .WillRepeatedly(
+          Invoke([&](const google::protobuf::Message& msg, int64_t) {
+            Request request = *dynamic_cast<const Request*>(&msg);
+            if (request.type() == Request::TYPE_RESPONSE) {
+              LOG(ERROR) << "get response";
+              commit_done.set_value(true);
+            }
+            return;
+          }));
+
+  AddTransaction("transaction1");
+
+  commit_done_future.get();
+}
+
+}  // namespace
+}  // namespace cassandra
+}  // namespace resdb
diff --git a/platform/consensus/ordering/autobahn/proto/BUILD 
b/platform/consensus/ordering/autobahn/proto/BUILD
new file mode 100644
index 00000000..58e3ed37
--- /dev/null
+++ b/platform/consensus/ordering/autobahn/proto/BUILD
@@ -0,0 +1,16 @@
+package(default_visibility = ["//platform/consensus/ordering/autobahn:__subpackages__"])  # default visibility for targets in this package
+
+load("@rules_cc//cc:defs.bzl", "cc_proto_library")
+load("@rules_proto//proto:defs.bzl", "proto_library")
+load("@rules_proto_grpc//python:defs.bzl", "python_proto_library")
+
+proto_library(  # Schema target wrapping proposal.proto
+    name = "proposal_proto",
+    srcs = ["proposal.proto"],
+    deps = ["//common/proto:signature_info_proto"],  # proposal.proto imports common/proto/signature_info.proto
+)
+
+cc_proto_library(  # C++ proto library built from :proposal_proto
+    name = "proposal_cc_proto",
+    deps = [":proposal_proto"],
+)
diff --git a/platform/consensus/ordering/autobahn/proto/proposal.proto b/platform/consensus/ordering/autobahn/proto/proposal.proto
new file mode 100644
index 00000000..9afd7567
--- /dev/null
+++ b/platform/consensus/ordering/autobahn/proto/proposal.proto
@@ -0,0 +1,133 @@
+
+syntax = "proto3";
+import "common/proto/signature_info.proto";
+
+package resdb.autobahn;
+
+message Transaction{  // One transaction entry; Block.BlockData holds a repeated list of these.
+  int32 id = 1;
+  bytes data = 2;
+  bytes hash = 3;  // presumably a digest of `data` — TODO confirm
+  int32 proxy_id = 4;
+  int32 proposer_id = 5;
+  int64 uid = 6;
+  int64 create_time = 7;  // time unit not stated in this file
+  int64 queuing_time = 8;  // time unit not stated in this file
+}
+
+message Header {  // Header record; embedded in ProposalResponse.header.
+  bytes hash = 1;
+  int32 height = 2;
+  int32 proposer_id = 3;
+  int32 proposal_id = 4;
+  bytes prehash = 5;  // presumably the hash of the preceding header — TODO confirm
+}
+
+message History {  // Not referenced by any other message in this file.
+  bytes hash = 1;
+  int32 state = 2;  // meaning of the state codes is not defined in this file
+  int32 sender = 3;
+}
+
+message Cert {  // A list of signatures; attached to Proposal.cert.
+  repeated SignatureInfo sign = 1;  // SignatureInfo is not declared here; it comes from the imported signature_info.proto
+}
+
+message Proposal {  // Carries repeated Blocks plus slot/sender ids, a signature, a hash and a Cert.
+  int32 slot_id = 1;
+  int32 sender_id = 2;
+  repeated Block block = 3;  // Block is declared further down in this file
+  bool fast_commit = 4;
+  SignatureInfo sign = 5;
+  bytes hash = 6;
+  Cert cert = 7;
+};
+
+enum MessageType {  // Message kind tag; used as the type of VoteMessage.type.
+  NewProposal = 0;  // value 0 is the proto3 default when the field is unset
+  Vote = 1;
+  Prepare = 2;
+  VoteAck = 3;
+  Commit = 4;
+  Recovery = 5;
+  NewBlocks = 6;
+  ProposalAck = 7;
+  CMD_BlockACK = 8;  // presumably pairs with message BlockACK — TODO confirm
+  CMD_BlockQuery = 9;  // presumably pairs with message BlockQuery — TODO confirm
+  CMD_SingleBlock = 10;
+  CMD_ProposalQuery = 11;  // presumably pairs with message ProposalQuery — TODO confirm
+  CMD_ProposalQueryResponse = 12;  // presumably pairs with message ProposalQueryResp — TODO confirm
+};
+
+message VoteMessage {  // Vote record tagged with a MessageType; separate from the smaller VoteMsg below.
+  bytes hash = 1;
+  int32 proposer_id = 2;
+  MessageType type = 3;  // reads as NewProposal (0) when unset
+  int32 sender_id = 4;
+  int32 proposal_id = 5;
+};
+
+message CommittedProposals{  // A list of Proposal messages plus a sender_id.
+  repeated Proposal proposals = 1;
+  int32 sender_id = 2;
+};
+
+message HashValue{  // Not referenced by any other message in this file.
+       repeated uint64 bits = 1;  // how a hash maps onto these 64-bit words is not specified here
+};
+
+message SignInfo {  // A SignatureInfo plus hash/sender_id/local_id; used by Block and BlockACK.
+  bytes hash = 1;
+  int32 sender_id = 2;
+  int32 local_id = 3;
+  SignatureInfo sign = 4;
+}
+
+message Block {  // A batch of transactions with identifying fields and signature info; Proposal carries these.
+  message BlockData {  // Nested wrapper around the transaction list.
+    repeated Transaction transaction = 1;
+  }
+  BlockData data = 1;
+  bytes hash = 2;
+  int32 sender_id = 3;
+  int32 local_id = 4;
+  int64 create_time = 5;  // time unit not stated in this file
+  repeated SignInfo last_sign_info = 6;  // presumably signatures for the sender's previous block — TODO confirm
+  repeated SignInfo sign_info = 7;
+}
+
+message BlockACK {  // Field number 1 is unused; hash/sender_id/local_id reuse Block's numbers 2-4.
+  bytes hash = 2;
+  int32 sender_id = 3;
+  int32 local_id = 4;
+  int32 responder = 5;
+  SignInfo sign_info = 6;
+}
+
+message BlockQuery {  // Field number 1 is unused; numbering starts at 2.
+  bytes hash = 2;
+  int32 proposer = 3;
+  int32 local_id = 4;
+  int32 sender = 5;
+}
+
+message ProposalQuery {  // Field number 1 is unused; numbering starts at 2.
+  bytes hash = 2;
+  int32 proposer = 3;
+  int32 id = 4;
+  int32 sender = 5;
+}
+
+message ProposalQueryResp {  // A list of Proposal messages; presumably the reply to ProposalQuery — TODO confirm.
+  repeated Proposal proposal = 1;
+}
+
+message ProposalResponse {  // Pairs a Header with a leader value.
+  Header header = 1;
+  int32 leader = 2;  // presumably a node id — TODO confirm
+}
+
+message VoteMsg {  // Carries only slot_id and sender_id; a different message from VoteMessage.
+  int32 slot_id = 1;
+  int32 sender_id = 2;
+}
diff --git a/platform/consensus/ordering/cassandra/algorithm/cassandra.cpp b/platform/consensus/ordering/cassandra/algorithm/cassandra.cpp
index ddad256f..ba79e744 100644
--- a/platform/consensus/ordering/cassandra/algorithm/cassandra.cpp
+++ b/platform/consensus/ordering/cassandra/algorithm/cassandra.cpp
@@ -18,8 +18,8 @@ Cassandra::Cassandra(int id, int f, int total_num, int block_size, SignatureVeri
   total_num_ = total_num;
   f_ = f;
   is_stop_ = false;
-  timeout_ms_ = 100;
-  //timeout_ms_ = 60000;
+  //timeout_ms_ = 100;
+  timeout_ms_ = 60000;
   local_txn_id_ = 1;
   local_proposal_id_ = 1;
   batch_size_ = block_size;
@@ -32,7 +32,7 @@ Cassandra::Cassandra(int id, int f, int total_num, int block_size, SignatureVeri
   execute_id_ = 1;
 
   graph_ = std::make_unique<ProposalGraph>(f_, id, total_num);
-  proposal_manager_ = std::make_unique<ProposalManager>(id, graph_.get());
+  proposal_manager_ = std::make_unique<ProposalManager>(id, graph_.get(), 
total_num_);
 
   graph_->SetCommitCallBack(
       [&](const Proposal& proposal) { CommitProposal(proposal); });
@@ -617,13 +617,16 @@ bool Cassandra::AddProposal(const Proposal& proposal) {
              << " proposal height:" << proposal.header().height()
              << " num:" << received_num_[graph_->GetCurrentHeight()].size()
              << " from:" << proposal.header().proposer_id()
-             << " last vote:" << last_vote_;
+             << " last vote?:" << last_vote_
+             << " total num:"<<total_num_
+             <<" check:"<<(received_num_[graph_->GetCurrentHeight()].size() == 
total_num_);
   if (received_num_[graph_->GetCurrentHeight()].size() == total_num_) {
+    LOG(ERROR)<<" last vote:"<<last_vote_<<" 
CurrentHeight:"<<graph_->GetCurrentHeight();
     if (last_vote_ < graph_->GetCurrentHeight()) {
       last_vote_ = graph_->GetCurrentHeight();
       can_vote_[graph_->GetCurrentHeight()] = true;
       vote_cv_.notify_all();
-      //LOG(ERROR) << "can vote:";
+      LOG(ERROR) << "can vote:";
     }
   }
    //LOG(ERROR)<<"recv done";
diff --git a/platform/consensus/ordering/cassandra/algorithm/proposal_graph.cpp b/platform/consensus/ordering/cassandra/algorithm/proposal_graph.cpp
index e5b91135..fb7a85f0 100644
--- a/platform/consensus/ordering/cassandra/algorithm/proposal_graph.cpp
+++ b/platform/consensus/ordering/cassandra/algorithm/proposal_graph.cpp
@@ -589,7 +589,7 @@ bool ProposalGraph::Compare(const NodeInfo& p1, const NodeInfo& p2) {
 
   int h = (p1.proposal.header().height())%total_num_;
   if ( h == 0) h = total_num_;
-  //LOG(ERROR)<<" check height :"<<h<<" 
cmp:"<<abs(p1.proposal.header().proposer_id() - h )<<" 
"<<abs(p2.proposal.header().proposer_id() - h);
+  LOG(ERROR)<<" check height :"<<h<<" 
cmp:"<<abs(p1.proposal.header().proposer_id() - h )<<" 
"<<abs(p2.proposal.header().proposer_id() - h)<<" 
from:"<<p1.proposal.header().proposer_id();
   //if (p1.proposal.header().height() <= 120 && 220 <= 
proposal.header().height()) {
     return abs(p1.proposal.header().proposer_id() - h ) > 
abs(p2.proposal.header().proposer_id() - h);
   //}
diff --git a/platform/consensus/ordering/cassandra/algorithm/proposal_manager.cpp b/platform/consensus/ordering/cassandra/algorithm/proposal_manager.cpp
index 41c03cd2..708c4204 100644
--- a/platform/consensus/ordering/cassandra/algorithm/proposal_manager.cpp
+++ b/platform/consensus/ordering/cassandra/algorithm/proposal_manager.cpp
@@ -21,8 +21,8 @@ std::string Encode(const std::string& hash) {
 
 }
 
-ProposalManager::ProposalManager(int32_t id, ProposalGraph* graph)
-    : id_(id), graph_(graph) {
+ProposalManager::ProposalManager(int32_t id, ProposalGraph* graph, int 
total_num)
+    : id_(id), graph_(graph), total_num_(total_num) {
   local_proposal_id_ = 1;
   global_stats_ = Stats::GetGlobalStats();
 }
@@ -190,12 +190,22 @@ std::unique_ptr<Proposal> ProposalManager::GenerateProposal(int round,
       return nullptr;
       // LOG(ERROR) << "generate wait proposal block size:" << blocks_.size();
     }
+
     int max_block = 1;
+
+    int h = round % total_num_;
+    if ( h == 0) h = total_num_;
+    if (id_ != h ) {
+      max_block = 0;
+    }
+
+
     int num = 0;
     int64_t current_time = GetCurrentTime();
     proposal->set_create_time(current_time);
     //LOG(ERROR)<<"block size:"<<blocks_.size();
     for (auto& block : blocks_) {
+      if (max_block <= 0) break;
       data += block->hash();
       Block* ab = proposal->add_block();
       *ab = *block;
diff --git a/platform/consensus/ordering/cassandra/algorithm/proposal_manager.h b/platform/consensus/ordering/cassandra/algorithm/proposal_manager.h
index 0ba35bb0..75b13fc9 100644
--- a/platform/consensus/ordering/cassandra/algorithm/proposal_manager.h
+++ b/platform/consensus/ordering/cassandra/algorithm/proposal_manager.h
@@ -13,7 +13,7 @@ namespace cassandra_recv {
 
 class ProposalManager {
  public:
-  ProposalManager(int32_t id, ProposalGraph* graph);
+  ProposalManager(int32_t id, ProposalGraph* graph, int total_num);
 
   int VerifyProposal(const Proposal& proposal);
 
@@ -68,6 +68,7 @@ class ProposalManager {
   std::mutex t_mutex_;
   std::map<std::string, std::unique_ptr<Proposal>> local_proposal_;
   Stats* global_stats_;
+  int total_num_;
 };
 
 }  // namespace cassandra_recv
diff --git a/platform/consensus/ordering/cassandra/algorithm/proposal_state.h b/platform/consensus/ordering/cassandra/algorithm/proposal_state.h
index 876b1be0..ae497e7a 100644
--- a/platform/consensus/ordering/cassandra/algorithm/proposal_state.h
+++ b/platform/consensus/ordering/cassandra/algorithm/proposal_state.h
@@ -2,7 +2,7 @@
 
 //#define GOPOA
 //#define GOTPOA
-#define GOTPOR
+//#define GOTPOR
 
 //#define NOPOA
 
diff --git a/scripts/deploy/config/cassandra.config b/scripts/deploy/config/autobahn.config
similarity index 91%
copy from scripts/deploy/config/cassandra.config
copy to scripts/deploy/config/autobahn.config
index fb5ff19d..f445a70a 100644
--- a/scripts/deploy/config/cassandra.config
+++ b/scripts/deploy/config/autobahn.config
@@ -7,5 +7,5 @@
   "worker_num": 10,
   "input_worker_num": 1,
   "output_worker_num": 5,
-  "block_size":1000
+  "block_size":100
 }
diff --git a/scripts/deploy/config/cassandra.config b/scripts/deploy/config/cassandra.config
index fb5ff19d..f445a70a 100644
--- a/scripts/deploy/config/cassandra.config
+++ b/scripts/deploy/config/cassandra.config
@@ -7,5 +7,5 @@
   "worker_num": 10,
   "input_worker_num": 1,
   "output_worker_num": 5,
-  "block_size":1000
+  "block_size":100
 }
diff --git a/scripts/deploy/config/kv_performance_server.conf b/scripts/deploy/config/kv_performance_server.conf
index 823c66cb..c260a9bb 100644
--- a/scripts/deploy/config/kv_performance_server.conf
+++ b/scripts/deploy/config/kv_performance_server.conf
@@ -1,12 +1,12 @@
 iplist=(
 172.31.18.66
-172.31.27.193
 172.31.27.153
-172.31.16.133
 172.31.31.76
-172.31.17.121
 172.31.24.63
 172.31.20.159
+172.31.25.33
+172.31.30.103
+172.31.31.166
 )
 
 client_num=4
diff --git a/scripts/deploy/config/kv_performance_server_32.conf b/scripts/deploy/config/kv_performance_server_32.conf
index 338c7b0a..50b7fcc2 100644
--- a/scripts/deploy/config/kv_performance_server_32.conf
+++ b/scripts/deploy/config/kv_performance_server_32.conf
@@ -63,8 +63,8 @@ iplist=(
 172.31.21.148
 172.31.17.147
 172.31.21.207
-172.31.30.15
-172.31.26.29
+#172.31.30.15
+#172.31.26.29
 )
 
 key=~/.ssh/junchao.pem
diff --git a/scripts/deploy/performance/autobahn_performance.sh b/scripts/deploy/performance/autobahn_performance.sh
new file mode 100755
index 00000000..750be672
--- /dev/null
+++ b/scripts/deploy/performance/autobahn_performance.sh
@@ -0,0 +1,5 @@
+export server=//benchmark/protocols/autobahn:kv_server_performance
+export TEMPLATE_PATH="$PWD/config/autobahn.config"
+# "$@" forwards each argument intact, even if it contains spaces or glob characters.
+./performance/run_performance.sh "$@"
+echo "$0"
diff --git a/scripts/deploy/performance/run_performance.sh b/scripts/deploy/performance/run_performance.sh
index d1b9c22a..ece3816f 100755
--- a/scripts/deploy/performance/run_performance.sh
+++ b/scripts/deploy/performance/run_performance.sh
@@ -21,7 +21,7 @@ echo "get cofigfile:"$config_file
 ${BAZEL_WORKSPACE_PATH}/bazel-bin/benchmark/protocols/pbft/kv_service_tools 
$config_file
 done
 
-sleep 120
+sleep 60
 
 echo "benchmark done"
 count=1
diff --git a/scripts/null b/scripts/null
index 93ada22f..1afefe98 100644
--- a/scripts/null
+++ b/scripts/null
@@ -1 +1 @@
-/home/ubuntu/asf-resilientdb/service/contract/benchmark/data/smallbank.json: 
No such file or directory
+/home/ubuntu/asf_resilientdb/service/contract/benchmark/data/smallbank.json: 
No such file or directory

Reply via email to