This is an automated email from the ASF dual-hosted git repository.

junchao pushed a commit to branch smart_contract_merge
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git


The following commit(s) were added to refs/heads/smart_contract_merge by this push:
     new e5695291 merge master
e5695291 is described below

commit e56952911aa098a391392f744503bfdd8cd04c30
Author: cjcchen <[email protected]>
AuthorDate: Fri Feb 14 03:06:10 2025 +0800

    merge master
---
 platform/config/resdb_config.cpp                   |   5 +-
 platform/config/resdb_config.h                     |  15 +-
 .../consensus/execution/transaction_executor.cpp   | 394 +++++++++++++--
 .../consensus/execution/transaction_executor.h     |  51 +-
 .../consensus/ordering/cassandra/algorithm/BUILD   |  76 ---
 .../ordering/cassandra/algorithm/cassandra.cpp     | 562 ---------------------
 .../ordering/cassandra/algorithm/cassandra.h       | 114 -----
 .../cassandra/algorithm/proposal_graph.cpp         | 491 ------------------
 .../ordering/cassandra/algorithm/proposal_graph.h  |  87 ----
 .../cassandra/algorithm/proposal_manager.cpp       | 470 -----------------
 .../cassandra/algorithm/proposal_manager.h         |  73 ---
 .../ordering/cassandra/algorithm/proposal_state.h  |  16 -
 .../ordering/cassandra/algorithm/ranking.cpp       |  12 -
 .../ordering/cassandra/algorithm/ranking.h         |  14 -
 .../consensus/ordering/cassandra/framework/BUILD   |  16 -
 .../ordering/cassandra/framework/consensus.cpp     | 171 -------
 .../ordering/cassandra/framework/consensus.h       |  59 ---
 .../cassandra/framework/consensus_test.cpp         | 179 -------
 platform/consensus/ordering/cassandra/proto/BUILD  |  16 -
 .../ordering/cassandra/proto/proposal.proto        | 119 -----
 20 files changed, 427 insertions(+), 2513 deletions(-)

diff --git a/platform/config/resdb_config.cpp b/platform/config/resdb_config.cpp
index 4c3ff954..23129a25 100644
--- a/platform/config/resdb_config.cpp
+++ b/platform/config/resdb_config.cpp
@@ -76,6 +76,9 @@ ResDBConfig::ResDBConfig(const ResConfigData& config_data,
   if (config_data_.tcp_batch_num() == 0) {
     config_data_.set_tcp_batch_num(100);
   }
+  if (config_data_.max_process_txn() == 0) {
+    config_data_.set_max_process_txn(64);
+  }
 }
 
 void ResDBConfig::SetConfigData(const ResConfigData& config_data) {
@@ -177,7 +180,7 @@ void ResDBConfig::SetSignatureVerifierEnabled(bool enable_sv) {
 }
 
 // Performance setting
-bool ResDBConfig::IsPerformanceRunning() {
+bool ResDBConfig::IsPerformanceRunning() const {
   return is_performance_running_ || GetConfigData().is_performance_running();
 }
 
diff --git a/platform/config/resdb_config.h b/platform/config/resdb_config.h
index fb56a9dd..9867c6d5 100644
--- a/platform/config/resdb_config.h
+++ b/platform/config/resdb_config.h
@@ -94,7 +94,7 @@ class ResDBConfig {
   void SetSignatureVerifierEnabled(bool enable_sv);
 
   // Performance setting
-  bool IsPerformanceRunning();
+  bool IsPerformanceRunning() const;
   void RunningPerformance(bool);
 
   bool IsTestMode() const;
@@ -135,15 +135,18 @@ class ResDBConfig {
   bool signature_verifier_enabled_ = true;
   bool is_performance_running_ = false;
   bool is_test_mode_ = false;
-  uint32_t max_process_txn_ = 2048;
   uint32_t client_batch_wait_time_ms_ = 100;  // milliseconds, 0.1s
-  uint32_t client_batch_num_ = 100;
   uint64_t viewchange_commit_timeout_ms_ =
       60000;  // default 60s to change viewchange
 
-  uint32_t worker_num_ = 64;
-  uint32_t input_worker_num_ = 1;
-  uint32_t output_worker_num_ = 1;
+
+  // This is the default settings.
+  // change these parameters in the configuration.
+  uint32_t max_process_txn_ = 64;
+  uint32_t worker_num_ = 16;
+  uint32_t input_worker_num_ = 5;
+  uint32_t output_worker_num_ = 5;
+  uint32_t client_batch_num_ = 100;
 };
 
 }  // namespace resdb
diff --git a/platform/consensus/execution/transaction_executor.cpp b/platform/consensus/execution/transaction_executor.cpp
index fd24da3a..1f98babd 100644
--- a/platform/consensus/execution/transaction_executor.cpp
+++ b/platform/consensus/execution/transaction_executor.cpp
@@ -20,6 +20,7 @@
 #include "platform/consensus/execution/transaction_executor.h"
 
 #include <glog/logging.h>
+#include "common/utils/utils.h"
 
 namespace resdb {
 
@@ -35,30 +36,86 @@ TransactionExecutor::TransactionExecutor(
       execute_queue_("execute"),
       stop_(false),
       duplicate_manager_(nullptr) {
+
+  memset(blucket_, 0, sizeof(blucket_));
   global_stats_ = Stats::GetGlobalStats();
   ordering_thread_ = std::thread(&TransactionExecutor::OrderMessage, this);
-  execute_thread_ = std::thread(&TransactionExecutor::ExecuteMessage, this);
+  for (int i = 0; i < execute_thread_num_; ++i) {
+    execute_thread_.push_back(
+        std::thread(&TransactionExecutor::ExecuteMessage, this));
+  }
+
+  for (int i = 0; i < 1; ++i) {
+    prepare_thread_.push_back(
+        std::thread(&TransactionExecutor::PrepareMessage, this));
+  }
 
   if (transaction_manager_ && transaction_manager_->IsOutOfOrder()) {
     execute_OOO_thread_ =
         std::thread(&TransactionExecutor::ExecuteMessageOutOfOrder, this);
     LOG(ERROR) << " is out of order:" << transaction_manager_->IsOutOfOrder();
   }
+  gc_thread_ = std::thread(&TransactionExecutor::GCProcess, this);
 }
 
 TransactionExecutor::~TransactionExecutor() { Stop(); }
 
+void TransactionExecutor::RegisterExecute(int64_t seq) {
+  if (execute_thread_num_ == 1) return;
+  int idx = seq % blucket_num_;
+  std::unique_lock<std::mutex> lk(mutex_);
+  // LOG(ERROR)<<"register seq:"<<seq<<" bluck:"<<blucket_[idx];
+  assert(!blucket_[idx] || !(blucket_[idx] ^ 3));
+  blucket_[idx] = 1;
+  // LOG(ERROR)<<"register seq:"<<seq;
+}
+
+void TransactionExecutor::WaitForExecute(int64_t seq) {
+  if (execute_thread_num_ == 1) return;
+  int pre_idx = (seq - 1 + blucket_num_) % blucket_num_;
+
+  while (!IsStop()) {
+    std::unique_lock<std::mutex> lk(mutex_);
+    cv_.wait_for(lk, std::chrono::milliseconds(10000), [&] {
+      return ((blucket_[pre_idx] & 2) || !blucket_[pre_idx]);
+    });
+    if ((blucket_[pre_idx] & 2) || !blucket_[pre_idx]) {
+      break;
+    }
+  }
+  // LOG(ERROR)<<"wait for :"<<seq<<" done";
+}
+
+void TransactionExecutor::FinishExecute(int64_t seq) {
+  if (execute_thread_num_ == 1) return;
+  int idx = seq % blucket_num_;
+  std::unique_lock<std::mutex> lk(mutex_);
+  // LOG(ERROR)<<"finish :"<<seq<<" done";
+  blucket_[idx] = 3;
+  cv_.notify_all();
+}
+
 void TransactionExecutor::Stop() {
   stop_ = true;
   if (ordering_thread_.joinable()) {
     ordering_thread_.join();
   }
-  if (execute_thread_.joinable()) {
-    execute_thread_.join();
+  for (auto& th : execute_thread_) {
+    if (th.joinable()) {
+      th.join();
+    }
+  }
+  for (auto& th : prepare_thread_) {
+    if (th.joinable()) {
+      th.join();
+    }
   }
   if (execute_OOO_thread_.joinable()) {
     execute_OOO_thread_.join();
   }
+  if (gc_thread_.joinable()) {
+    gc_thread_.join();
+  }
 }
 
 Storage* TransactionExecutor::GetStorage() {
@@ -144,6 +201,12 @@ void TransactionExecutor::OrderMessage() {
   return;
 }
 
+void TransactionExecutor::AddExecuteMessage(std::unique_ptr<Request> message) {
+    global_stats_->IncCommit();
+    message->set_commit_time(GetCurrentTime());
+    execute_queue_.Push(std::move(message));
+}
+
 void TransactionExecutor::ExecuteMessage() {
   while (!IsStop()) {
     auto message = execute_queue_.Pop();
@@ -154,7 +217,11 @@ void TransactionExecutor::ExecuteMessage() {
     if (transaction_manager_ && transaction_manager_->IsOutOfOrder()) {
       need_execute = false;
     }
+    int64_t start_time = GetCurrentTime();
+    global_stats_->AddExecuteQueuingLatency(start_time - message->commit_time());
     Execute(std::move(message), need_execute);
+    int64_t end_time = GetCurrentTime();
+    global_stats_->AddExecuteLatency(end_time-start_time);
   }
 }
 
@@ -184,10 +251,7 @@ void TransactionExecutor::OnlyExecute(std::unique_ptr<Request> request) {
   // LOG(INFO) << " get request batch size:"
   //          << batch_request.user_requests_size()<<" proxy
   //          id:"<<request->proxy_id();
-  // std::unique_ptr<BatchUserResponse> batch_response =
-  //     std::make_unique<BatchUserResponse>();
   std::unique_ptr<BatchUserResponse> response;
-  global_stats_->GetTransactionDetails(batch_request);
   if (transaction_manager_) {
     response = transaction_manager_->ExecuteBatch(batch_request);
   }
@@ -198,51 +262,321 @@ void TransactionExecutor::OnlyExecute(std::unique_ptr<Request> request) {
 
 void TransactionExecutor::Execute(std::unique_ptr<Request> request,
                                   bool need_execute) {
-  // Execute the request, then send the response back to the user.
-  BatchUserRequest batch_request;
-  if (!batch_request.ParseFromString(request->data())) {
-    LOG(ERROR) << "parse data fail";
+  uint64_t uid = request->uid();
+  int64_t seq = request->seq();
+  RegisterExecute(request->seq());
+  std::unique_ptr<BatchUserRequest> batch_request = nullptr;
+  std::unique_ptr<std::vector<std::unique_ptr<google::protobuf::Message>>> data;
+  std::vector<std::unique_ptr<google::protobuf::Message>> * data_p = nullptr;
+  BatchUserRequest* batch_request_p = nullptr;
+
+  bool need_gc = false;
+
+  if (request->uid() > 0) {
+    bool current_f = SetFlag(uid, Start_Execute);
+    if (!current_f) {
+      global_stats_->ConsumeTransactions(1);
+      std::unique_ptr<std::future<int>> data_f = GetFuture(uid);
+      //LOG(ERROR)<<"wait prepare:"<<uid;
+      {
+        data_f->get();
+      }
+      //LOG(ERROR)<<"wait prepare done:"<<uid;
+
+      // prepare is done
+      //LOG(ERROR)<<"prepare is done:"<<uid;
+      {
+        int64_t start_time = GetCurrentTime();
+        std::unique_lock<std::mutex> lk(fd_mutex_[uid % mod]);
+        if(req_[uid % mod][uid] == nullptr){
+          LOG(ERROR)<<"data is empty:"<<uid;
+        }
+        assert(req_[uid % mod][uid] != nullptr);
+        //batch_request = std::move(req_[uid % mod][uid]);
+        batch_request_p = req_[uid % mod][uid].get();
+        auto it = data_[uid % mod].find(uid);
+        if (it != data_[uid % mod].end()) {
+          assert(it->second!=nullptr);
+          data_p = it->second.get();
+          //data = std::move(it->second);
+        }
+        int64_t end_time = GetCurrentTime();
+        if (end_time - start_time > 1000) {
+          LOG(ERROR) << "get data done:" << uid
+                     << " wait time:" << (end_time - start_time);
+        }
+      }
+      ClearPromise(uid);
+      need_gc = true;
+    } else {
+      global_stats_->AddNewTransactions(1);
+      //LOG(ERROR)<<"commit start:"<<uid;
+      // LOG(ERROR)<<" prepare not start:"<<uid;
+    }
   }
-  batch_request.set_seq(request->seq());
-  batch_request.set_hash(request->hash());
-  batch_request.set_proxy_id(request->proxy_id());
-  if (request->has_committed_certs()) {
-    *batch_request.mutable_committed_certs() = request->committed_certs();
+
+  // Execute the request, then send the response back to the user.
+  if (batch_request_p == nullptr) {
+    batch_request = std::make_unique<BatchUserRequest>();
+    if (!batch_request->ParseFromString(request->data())) {
+      LOG(ERROR) << "parse data fail";
+    }
+    batch_request->set_hash(request->hash());
+    if (request->has_committed_certs()) {
+      *batch_request->mutable_committed_certs() = request->committed_certs();
+    }
+    batch_request->set_seq(request->seq());
+    batch_request->set_proxy_id(request->proxy_id());
+    batch_request_p = batch_request.get();
+    // LOG(ERROR)<<"get data from req:";
+  } else {
+  assert(batch_request_p);
+    batch_request_p->set_seq(request->seq());
+    batch_request_p->set_proxy_id(request->proxy_id());
+    // LOG(ERROR)<<" get from cache:"<<uid;
   }
+  assert(batch_request_p);
 
   // LOG(INFO) << " get request batch size:"
-  //         << batch_request.user_requests_size()<<" proxy
-  //         id:"<<request->proxy_id()<<" need execute:"<<need_execute;
-  // std::unique_ptr<BatchUserResponse> batch_response =
-  //     std::make_unique<BatchUserResponse>();
+  // << batch_request.user_requests_size()<<" proxy id:"
+  //  <<request->proxy_id()<<" need execute:"<<need_execute;
 
   std::unique_ptr<BatchUserResponse> response;
-  global_stats_->GetTransactionDetails(batch_request);
+  // need_execute = false;
   if (transaction_manager_ && need_execute) {
-    response = transaction_manager_->ExecuteBatch(batch_request);
-  }
+    if (execute_thread_num_ == 1) {
+      response = transaction_manager_->ExecuteBatch(*batch_request_p);
+    } else {
+      std::vector<std::unique_ptr<std::string>> response_v;
+
+      if(data_p == nullptr) {
+        int64_t start_time = GetCurrentTime();
+        data = std::move(transaction_manager_->Prepare(*batch_request_p));
+        int64_t end_time = GetCurrentTime();
+        if (end_time - start_time > 10) {
+          // LOG(ERROR)<<"exec data done:"<<uid<<" wait
+          // time:"<<(end_time-start_time);
+        }
+        data_p = data.get();
+      }
 
-  if (duplicate_manager_) {
-    duplicate_manager_->AddExecuted(batch_request.hash(), batch_request.seq());
+      WaitForExecute(request->seq());
+           if(data_p->empty() || (*data_p)[0] == nullptr){
+                   response = transaction_manager_->ExecuteBatch(*batch_request_p);
+           }
+           else {
+                   response_v = transaction_manager_->ExecuteBatchData(*data_p);
+           }
+      FinishExecute(request->seq());
+
+      if(response == nullptr){
+             response = std::make_unique<BatchUserResponse>();
+             for (auto& s : response_v) {
+                     response->add_response()->swap(*s);
+             }
+      }
+    }
   }
+  // LOG(ERROR)<<" CF = :"<<(cf==1)<<" uid:"<<uid;
 
-  global_stats_->IncTotalRequest(batch_request.user_requests_size());
   if (response == nullptr) {
     response = std::make_unique<BatchUserResponse>();
   }
+  global_stats_->IncTotalRequest(batch_request_p->user_requests_size());
+  response->set_proxy_id(batch_request_p->proxy_id());
+  response->set_createtime(batch_request_p->createtime() + request->queuing_time());
+  response->set_local_id(batch_request_p->local_id());
+  global_stats_->AddCommitDelay(GetCurrentTime()- response->createtime());
 
-  response->set_proxy_id(batch_request.proxy_id());
-  response->set_createtime(batch_request.createtime());
-  response->set_local_id(batch_request.local_id());
-  response->set_hash(batch_request.hash());
+  response->set_seq(request->seq());
 
-  post_exec_func_(std::move(request), std::move(response));
+  if (post_exec_func_) {
+    post_exec_func_(std::move(request), std::move(response));
+  }
 
   global_stats_->IncExecuteDone();
+  if(need_gc){
+    gc_queue_.Push(std::make_unique<int64_t>(uid));
+  }
 }
 
 void TransactionExecutor::SetDuplicateManager(DuplicateManager* manager) {
   duplicate_manager_ = manager;
 }
 
+
+bool TransactionExecutor::SetFlag(uint64_t uid, int f) {
+  std::unique_lock<std::mutex> lk(f_mutex_[uid % mod]);
+  auto it = flag_[uid % mod].find(uid);
+  if (it == flag_[uid % mod].end()) {
+    flag_[uid % mod][uid] |= f;
+    // LOG(ERROR)<<"NO FUTURE uid:"<<uid;
+    return true;
+  }
+  assert(it != flag_[uid % mod].end());
+  if (f == Start_Prepare) {
+    if (flag_[uid % mod][uid] & Start_Execute) {
+      return false;
+    }
+  } else if(f == Start_Execute){
+    if (flag_[uid % mod][uid] & End_Prepare) {
+    //if (flag_[uid % mod][uid] & Start_Prepare) {
+      return false;
+    }
+  }
+  flag_[uid % mod][uid] |= f;
+  return true;
+}
+
+void TransactionExecutor::ClearPromise(uint64_t uid) {
+  std::unique_lock<std::mutex> lk(f_mutex_[uid % mod]);
+  auto it = pre_[uid % mod].find(uid);
+  if (it == pre_[uid % mod].end()) {
+    return;
+  }
+  // LOG(ERROR)<<"CLEAR UID:"<<uid;
+  assert(it != pre_[uid % mod].end());
+  assert(flag_[uid % mod].find(uid) != flag_[uid % mod].end());
+  //assert(data_[uid%mod].find(uid) != data_[uid%mod].end());
+  //assert(req_[uid%mod].find(uid) != req_[uid%mod].end());
+  //data_[uid%mod].erase(data_[uid%mod].find(uid));
+  //req_[uid%mod].erase(req_[uid%mod].find(uid));
+  pre_[uid % mod].erase(it);
+  flag_[uid % mod].erase(flag_[uid % mod].find(uid));
+}
+
+std::promise<int>* TransactionExecutor::GetPromise(uint64_t uid) {
+  std::unique_lock<std::mutex> lk(f_mutex_[uid % mod]);
+  auto it = pre_[uid % mod].find(uid);
+  if (it == pre_[uid % mod].end()) {
+    return nullptr;
+  }
+  return it->second.get();
+}
+
+std::unique_ptr<std::future<int>> TransactionExecutor::GetFuture(uint64_t uid) {
+  std::unique_lock<std::mutex> lk(f_mutex_[uid % mod]);
+  auto it = pre_[uid % mod].find(uid);
+  if (it == pre_[uid % mod].end()) {
+    return nullptr;
+  }
+  //return std::move(it->second);
+  // LOG(ERROR)<<"add future:"<<uid;
+  return std::make_unique<std::future<int>>(it->second->get_future());
+}
+
+bool TransactionExecutor::AddFuture(uint64_t uid) {
+  std::unique_lock<std::mutex> lk(f_mutex_[uid % mod]);
+  auto it = pre_[uid % mod].find(uid);
+  if (it == pre_[uid % mod].end()) {
+    // LOG(ERROR)<<"add future:"<<uid;
+    std::unique_ptr<std::promise<int>> p =
+        std::make_unique<std::promise<int>>();
+    //auto f = std::make_unique<std::future<int>>(p->get_future());
+    pre_[uid % mod][uid] = std::move(p);
+    //pre_f_[uid % mod][uid] = std::move(f);
+    flag_[uid % mod][uid] = 0;
+    return true;
+  }
+  return false;
+}
+
+void TransactionExecutor::Prepare(std::unique_ptr<Request> request) {
+  if (AddFuture(request->uid())) {
+    prepare_queue_.Push(std::move(request));
+  }
+}
+
+void TransactionExecutor::GCProcess() {
+  while (!IsStop()) {
+    std::unique_ptr<int64_t> uid_or = gc_queue_.Pop();
+    if (uid_or== nullptr) {
+      continue;
+    }
+    int64_t uid = *uid_or;
+  
+    std::vector<std::unique_ptr<google::protobuf::Message>> * data_p = nullptr;
+    {
+      std::unique_lock<std::mutex> lk(fd_mutex_[uid % mod]);
+      assert(data_[uid%mod].find(uid) != data_[uid%mod].end());
+      data_p = data_[uid%mod][uid].get();
+    }
+
+    for(int i = 0; i < data_p->size(); ++i){
+      (*data_p)[i].release();
+    }
+    (*data_p).clear();
+    {
+      std::unique_lock<std::mutex> lk(fd_mutex_[uid % mod]);
+      assert(req_[uid%mod].find(uid) != req_[uid%mod].end());
+      assert(data_[uid%mod].find(uid) != data_[uid%mod].end());
+      data_[uid%mod].erase(data_[uid%mod].find(uid));
+      req_[uid%mod].erase(req_[uid%mod].find(uid));
+    }
+  }
+}
+
+void TransactionExecutor::PrepareMessage() {
+  while (!IsStop()) {
+    std::unique_ptr<Request> request = prepare_queue_.Pop();
+    if (request == nullptr) {
+      continue;
+    }
+
+    uint64_t uid = request->uid();
+    int current_f = SetFlag(uid, Start_Prepare);
+    if (current_f == 0) {
+      // commit has done
+      // LOG(ERROR)<<" want prepare, commit started:"<<uid;
+//      ClearPromise(uid);
+      continue;
+    }
+
+    std::promise<int>* p = GetPromise(uid) ;
+    assert(p);
+    //LOG(ERROR)<<" prepare started:"<<uid;
+
+    // LOG(ERROR)<<" prepare uid:"<<uid;
+
+    // Execute the request, then send the response back to the user.
+    std::unique_ptr<BatchUserRequest> batch_request =
+        std::make_unique<BatchUserRequest>();
+    if (!batch_request->ParseFromString(request->data())) {
+      LOG(ERROR) << "parse data fail";
+    }
+    // batch_request = std::make_unique<BatchUserRequest>();
+    batch_request->set_seq(request->seq());
+    batch_request->set_hash(request->hash());
+    batch_request->set_proxy_id(request->proxy_id());
+    if (request->has_committed_certs()) {
+      *batch_request->mutable_committed_certs() = request->committed_certs();
+    }
+
+    // LOG(ERROR)<<"prepare seq:"<<batch_request->seq()<<" proxy
+    // id:"<<request->proxy_id()<<" local id:"<<batch_request->local_id();
+
+    std::unique_ptr<std::vector<std::unique_ptr<google::protobuf::Message>>>
+     request_v = transaction_manager_->Prepare(*batch_request);
+    {
+      std::unique_lock<std::mutex> lk(fd_mutex_[uid % mod]);
+   //   assert(request_v);
+      // assert(data_[uid%mod].find(uid) == data_[uid%mod].end());
+      data_[uid%mod][uid] = std::move(request_v);
+      req_[uid % mod][uid] = std::move(batch_request);
+    }
+    //LOG(ERROR)<<"set promise:"<<uid;
+    p->set_value(1);
+    {
+      int set_ret = SetFlag(uid, End_Prepare);
+      if (set_ret == 0) {
+        // LOG(ERROR)<<"commit interrupt:"<<uid;
+        //ClearPromise(uid);
+      } else {
+        //LOG(ERROR)<<"prepare done:"<<uid;
+      }
+    }
+  }
+}
+
+
 }  // namespace resdb
diff --git a/platform/consensus/execution/transaction_executor.h b/platform/consensus/execution/transaction_executor.h
index 2b111164..cbd31c60 100644
--- a/platform/consensus/execution/transaction_executor.h
+++ b/platform/consensus/execution/transaction_executor.h
@@ -62,8 +62,16 @@ class TransactionExecutor {
 
   void SetDuplicateManager(DuplicateManager* manager);
 
+  void AddExecuteMessage(std::unique_ptr<Request> message);
+
   Storage* GetStorage();
 
+  void RegisterExecute(int64_t seq);
+  void WaitForExecute(int64_t seq);
+  void FinishExecute(int64_t seq);
+
+  void Prepare(std::unique_ptr<Request> request);
+
  private:
   void Execute(std::unique_ptr<Request> request, bool need_execute = true);
   void OnlyExecute(std::unique_ptr<Request> request);
@@ -80,6 +88,15 @@ class TransactionExecutor {
 
   void UpdateMaxExecutedSeq(uint64_t seq);
 
+  bool SetFlag(uint64_t uid, int f);
+  void ClearPromise(uint64_t uid);
+  void PrepareMessage();
+  void GCProcess();
+
+  bool AddFuture(uint64_t uid);
+  std::unique_ptr<std::future<int>> GetFuture(uint64_t uid);
+  std::promise<int>* GetPromise(uint64_t uid);
+
  protected:
   ResDBConfig config_;
 
@@ -91,11 +108,43 @@ class TransactionExecutor {
   SystemInfo* system_info_ = nullptr;
   std::unique_ptr<TransactionManager> transaction_manager_ = nullptr;
   std::map<uint64_t, std::unique_ptr<Request>> candidates_;
-  std::thread ordering_thread_, execute_thread_, execute_OOO_thread_;
+  std::thread ordering_thread_, execute_OOO_thread_;
+  std::vector<std::thread> execute_thread_;
   LockFreeQueue<Request> commit_queue_, execute_queue_, execute_OOO_queue_;
   std::atomic<bool> stop_;
   Stats* global_stats_ = nullptr;
   DuplicateManager* duplicate_manager_;
+  int execute_thread_num_ = 10;
+  static const int blucket_num_ = 1024;
+  int blucket_[blucket_num_];
+  std::condition_variable cv_;
+  std::mutex mutex_;
+
+  enum PrepareType {
+    Start_Prepare = 1,
+    Start_Execute = 2,
+    End_Prepare = 4,
+  };
+
+
+  std::vector<std::thread> prepare_thread_;
+  std::thread gc_thread_;
+  static const int mod = 2048;
+  std::mutex f_mutex_[mod], fd_mutex_[mod];
+  LockFreeQueue<Request> prepare_queue_;
+  LockFreeQueue<int64_t> gc_queue_;
+  typedef std::unique_ptr<std::promise<int>> PromiseType;
+  std::map<uint64_t, PromiseType> pre_[mod];
+
+  std::map<uint64_t, std::unique_ptr<std::future<int>>> pre_f_[mod];
+  std::map<uint64_t, int> flag_[mod];
+
+  std::map<uint64_t, std::unique_ptr<BatchUserRequest>> req_[mod];
+  std::unordered_map<
+      uint64_t,
+      std::unique_ptr<std::vector<std::unique_ptr<google::protobuf::Message>>>>
+      data_[mod];
+
 };
 
 }  // namespace resdb
diff --git a/platform/consensus/ordering/cassandra/algorithm/BUILD b/platform/consensus/ordering/cassandra/algorithm/BUILD
deleted file mode 100644
index 1d86e0fe..00000000
--- a/platform/consensus/ordering/cassandra/algorithm/BUILD
+++ /dev/null
@@ -1,76 +0,0 @@
-package(default_visibility = ["//platform/consensus/ordering/cassandra:__subpackages__"])
-
-cc_library(
-    name = "proposal_state",
-    hdrs = ["proposal_state.h"],
-)
-
-cc_library(
-    name = "proposal_manager",
-    srcs = ["proposal_manager.cpp"],
-    hdrs = ["proposal_manager.h"],
-    deps = [
-        ":proposal_graph",
-        "//common:comm",
-        "//platform/statistic:stats",
-        "//common/crypto:signature_verifier",
-        "//common/utils",
-        "//platform/consensus/ordering/cassandra/proto:proposal_cc_proto",
-    ],
-)
-
-cc_library(
-    name = "ranking",
-    srcs = ["ranking.cpp"],
-    hdrs = ["ranking.h"],
-    deps = [
-        "//common:comm",
-    ],
-)
-
-cc_library(
-    name = "proposal_graph",
-    srcs = ["proposal_graph.cpp"],
-    hdrs = ["proposal_graph.h"],
-    deps = [
-        ":ranking",
-        ":proposal_state",
-        "//platform/statistic:stats",
-        "//common:comm",
-        "//common/utils",
-        "//platform/consensus/ordering/cassandra/proto:proposal_cc_proto",
-    ],
-)
-
-cc_library(
-    name = "cassandra",
-    srcs = ["cassandra.cpp"],
-    hdrs = ["cassandra.h"],
-    deps = [
-        ":proposal_graph",
-        ":proposal_manager",
-        "//platform/statistic:stats",
-        "//common:comm",
-        "//common/crypto:signature_verifier",
-        "//platform/consensus/ordering/common/algorithm:protocol_base",
-        "//platform/common/queue:lock_free_queue",
-    ],
-)
-
-cc_test(
-    name = "proposal_graph_test",
-    srcs = ["proposal_graph_test.cpp"],
-    deps = [
-        ":proposal_graph",
-        "//common/test:test_main",
-    ],
-)
-
-cc_test(
-    name = "cassandra_test",
-    srcs = ["cassandra_test.cpp"],
-    deps = [
-        ":cassandra",
-        "//common/test:test_main",
-    ],
-)
diff --git a/platform/consensus/ordering/cassandra/algorithm/cassandra.cpp b/platform/consensus/ordering/cassandra/algorithm/cassandra.cpp
deleted file mode 100644
index 04a81bb2..00000000
--- a/platform/consensus/ordering/cassandra/algorithm/cassandra.cpp
+++ /dev/null
@@ -1,562 +0,0 @@
-#include "platform/consensus/ordering/cassandra/algorithm/cassandra.h"
-
-#include <glog/logging.h>
-
-#include "common/crypto/signature_verifier.h"
-#include "common/utils/utils.h"
-
-namespace resdb {
-namespace cassandra {
-namespace cassandra_recv {
-
-Cassandra::Cassandra(int id, int f, int total_num, SignatureVerifier* verifier)
-    : ProtocolBase(id, f, total_num), verifier_(verifier) {
-
-  LOG(ERROR) << "get proposal graph";
-  id_ = id;
-  total_num_ = total_num;
-  f_ = f;
-  is_stop_ = false;
-  timeout_ms_ = 60000;
-  local_txn_id_ = 1;
-  local_proposal_id_ = 1;
-  batch_size_ = 10;
-
-  recv_num_ = 0;
-  execute_num_ = 0;
-  executed_ = 0;
-  committed_num_ = 0;
-  precommitted_num_ = 0;
-  execute_id_ = 1;
-
-  graph_ = std::make_unique<ProposalGraph>(f_);
-  proposal_manager_ = std::make_unique<ProposalManager>(id, graph_.get());
-
-  graph_->SetCommitCallBack(
-      [&](const Proposal& proposal) { CommitProposal(proposal); });
-
-  Reset();
-
-  consensus_thread_ = std::thread(&Cassandra::AsyncConsensus, this);
-
-  block_thread_ = std::thread(&Cassandra::BroadcastTxn, this);
-
-  commit_thread_ = std::thread(&Cassandra::AsyncCommit, this);
-
-  global_stats_ = Stats::GetGlobalStats();
-
-  prepare_thread_ = std::thread(&Cassandra::AsyncPrepare, this);
-}
-
-Cassandra::~Cassandra() {
-  is_stop_ = true;
-  if (consensus_thread_.joinable()) {
-    consensus_thread_.join();
-  }
-  if (commit_thread_.joinable()) {
-    commit_thread_.join();
-  }
-  if (prepare_thread_.joinable()) {
-    prepare_thread_.join();
-  }
-}
-
-void Cassandra::SetPrepareFunction(std::function<int(const Transaction&)> prepare){
-  prepare_ = prepare;
-}
-
-bool Cassandra::IsStop() { return is_stop_; }
-
-void Cassandra::Reset() {
-  // received_num_.clear();
-  // state_ = State::NewProposal;
-}
-
-void Cassandra::AsyncConsensus() {
-  int height = 0;
-  int64_t start_time = GetCurrentTime();
-  while (!is_stop_) {
-    int next_height = SendTxn(height);
-    if (next_height == -1) {
-      // usleep(10000);
-      proposal_manager_->WaitBlock();
-      continue;
-    }
-    int64_t end_time = GetCurrentTime();
-    global_stats_->AddRoundLatency(end_time - start_time);
-    start_time = end_time;
-    height = next_height;
-    WaitVote(height);
-  }
-}
-
-bool Cassandra::WaitVote(int height) {
-  std::unique_lock<std::mutex> lk(mutex_);
-  vote_cv_.wait_for(lk, std::chrono::microseconds(timeout_ms_ * 1000),
-                    [&] { return can_vote_[height]; });
-  if (!can_vote_[height]) {
-    LOG(ERROR) << "wait vote time out"
-               << " can vote:" << can_vote_[height] << " height:" << height;
-  }
-  return true;
-}
-
-void Cassandra::AsyncCommit() {
-  while (!is_stop_) {
-    std::unique_ptr<Proposal> p = execute_queue_.Pop(timeout_ms_ * 1000);
-    if (p == nullptr) {
-      LOG(ERROR) << "execu timeout";
-      continue;
-    }
-     //LOG(ERROR) << "execute proposal from proposer:" <<
-     //        p->header().proposer_id()
-     //          << " id:" << p->header().proposal_id()
-     //          << " height:" << p->header().height()
-     //          << " block size:" << p->block_size();
-
-    int txn_num = 0;
-    for (const Block& block : p->block()) {
-      std::unique_lock<std::mutex> lk(mutex_);
-      std::unique_ptr<Block> data_block =
-          proposal_manager_->GetBlock(block.hash(), p->header().proposer_id());
-       //LOG(ERROR)<<"!!!!!!!!! commit proposal from:"
-      //<<p->header().proposer_id()<<" txn size:"
-      //<<data_block->data().transaction_size()<<" height:"<<p->header().height();
-      if (p->header().proposer_id() == id_) {
-        execute_num_ += data_block->data().transaction_size();
-        // LOG(ERROR) << "recv num:" << recv_num_
-        //           << " execute num:" << execute_num_
-        //           << " block id:" << data_block->local_id()
-        //           << " block delay:" << (GetCurrentTime() -
-        //           data_block->create_time());
-      }
-
-      for (Transaction& txn :
-           *data_block->mutable_data()->mutable_transaction()) {
-        txn.set_id(execute_id_++);
-        txn_num++;
-        commit_(txn);
-      }
-    }
-    global_stats_->AddCommitTxn(txn_num);
-  }
-}
-
-void Cassandra::CommitProposal(const Proposal& p) {
-   //LOG(ERROR) << "commit proposal from proposer:" << p.header().proposer_id()
-   //          << " id:" << p.header().proposal_id()
-   //          << " height:" << p.header().height()
-   //          << " block size:" << p.block_size();
-  if (p.block_size() == 0) {
-    return;
-  }
-  // proposal_manager_->ClearProposal(p);
-  committed_num_++;
-  int64_t commit_time = GetCurrentTime() - p.create_time();
-  global_stats_->AddCommitLatency(commit_time);
-  // LOG(ERROR) << "commit num:" << committed_num_
-  //           << " commit delay:" << GetCurrentTime() - p.create_time();
-  execute_queue_.Push(std::make_unique<Proposal>(p));
-}
-
-void Cassandra::AsyncPrepare() {
-  while (!is_stop_) {
-    std::unique_ptr<Proposal> p = prepare_queue_.Pop();
-    if (p == nullptr) {
-      //LOG(ERROR) << "execu timeout";
-      continue;
-    }
-    //LOG(ERROR)<<"prepare block from:"<<p->header().proposer_id() <<" id:"<<p->header().proposal_id();
-    int id = 0;
-    for (const Block& block : p->block()) {
-      std::unique_lock<std::mutex> lkx(mutex_);
-      Block* data_block = proposal_manager_->GetBlockSnap(
-          block.hash(), p->header().proposer_id());
-      if (data_block == nullptr) {
-        continue;
-      }
-
-      for (Transaction& txn :
-           *data_block->mutable_data()->mutable_transaction()) {
-        long long uid = (((long long)p->header().proposer_id() << 50) |
-                         (long long)(p->header().proposal_id()) << 20 | id++);
-        txn.set_uid(uid);
-        prepare_(txn);
-      }
-    }
-
-    // LOG(ERROR)<<" weak proposal:"<<p->weak_proposals().hash_size();
-    for (const std::string& hash : p->weak_proposals().hash()) {
-      const Proposal* pre_p = graph_->GetProposalInfo(hash);
-      if (pre_p == nullptr) {
-        continue;
-      }
-       //LOG(ERROR)<<"prepare weak block from:"<<pre_p->header().proposer_id()
-       //<<" id:"<<pre_p->header().proposal_id();
-      for (const Block& block : pre_p->block()) {
-        std::unique_lock<std::mutex> lkx(mutex_);
-        Block* data_block = proposal_manager_->GetBlockSnap(
-            block.hash(), pre_p->header().proposer_id());
-        if (data_block == nullptr) {
-          continue;
-        }
-
-        for (Transaction& txn :
-             *data_block->mutable_data()->mutable_transaction()) {
-          long long uid = (((long long)pre_p->header().proposer_id() << 50) |
-                           (long long)(pre_p->header().proposal_id()) << 20 | id++);
-          txn.set_uid(uid);
-          prepare_(txn);
-        }
-      }
-    }
-    //LOG(ERROR) << "prepare done";
-  }
-}
-
-void Cassandra::PrepareProposal(const Proposal& p) {
-  if (p.block_size() == 0) {
-    return;
-  }
-  prepare_queue_.Push(std::make_unique<Proposal>(p));
-}
-
-bool Cassandra::ReceiveTransaction(std::unique_ptr<Transaction> txn) {
-  // LOG(ERROR)<<"recv txn:";
-  txn->set_create_time(GetCurrentTime());
-  txns_.Push(std::move(txn));
-  recv_num_++;
-  return true;
-}
-
-void Cassandra::BroadcastTxn() {
-  std::vector<std::unique_ptr<Transaction>> txns;
-  int num = 0;
-  while (!IsStop()) {
-    std::unique_ptr<Transaction> txn = txns_.Pop();
-    if (txn == nullptr) {
-      continue;
-    }
-    txn->set_queuing_time(GetCurrentTime()-txn->create_time());
-    global_stats_->AddQueuingLatency(GetCurrentTime()-txn->create_time());
-    //LOG(ERROR)<<"get txn, proxy id:"<<txn->proxy_id()<<" hash:"<<txn->hash();
-    txns.push_back(std::move(txn));
-    /*
-    if (txns.size() < batch_size_) {
-      continue;
-    }
-    */
-
-    for(int i = 1; i < batch_size_; ++i){
-      std::unique_ptr<Transaction> txn = txns_.Pop(0);
-      if(txn == nullptr){
-        break;
-      }
-      //LOG(ERROR)<<"get txn, proxy id:"<<txn->proxy_id()<<" hash:"<<txn->hash();
-      txn->set_queuing_time(GetCurrentTime()-txn->create_time());
-      global_stats_->AddQueuingLatency(GetCurrentTime()-txn->create_time());
-      txns.push_back(std::move(txn));
-    }
-
-    std::unique_ptr<Block> block = proposal_manager_->MakeBlock(txns);
-    assert(block != nullptr);
-    //LOG(ERROR)<<" send block:"<<block->local_id();
-    Broadcast(MessageType::NewBlocks, *block);
-    proposal_manager_->AddLocalBlock(std::move(block));
-    txns.clear();
-  }
-}
-
-void Cassandra::ReceiveBlock(std::unique_ptr<Block> block) {
-  // std::unique_lock<std::mutex> lk(g_mutex_);
-  //LOG(ERROR)<<"recv block from:"<<block->sender_id()<<" block id:"<<block->local_id();
-  BlockACK block_ack;
-  block_ack.set_hash(block->hash());
-  block_ack.set_sender_id(block->sender_id());
-  block_ack.set_local_id(block->local_id());
-  block_ack.set_responder(id_);
-
-  // std::unique_lock<std::mutex> lk(mutex_);
-  proposal_manager_->AddBlock(std::move(block));
-  //LOG(ERROR)<<" send block to:"<<block_ack.sender_id();
-  SendMessage(MessageType::CMD_BlockACK, block_ack, block_ack.sender_id());
-}
-
-void Cassandra::ReceiveBlockACK(std::unique_ptr<BlockACK> block) {
-  //LOG(ERROR)<<"recv block ack:"<<block->local_id()<<" from:"<<block->responder();
-  assert(block->sender_id() == id_);
-  // std::unique_lock<std::mutex> lkx(g_mutex_);
-  std::unique_lock<std::mutex> lk(block_mutex_);
-  if (received_.find(block->local_id()) != received_.end()) {
-    return;
-  }
-  block_ack_[block->local_id()].insert(block->responder());
-  //LOG(ERROR)<<"recv block ack:"<<block->local_id()
-  //<<" from:"<<block->responder()<< " num:"<<block_ack_[block->local_id()].size();
-  if (block_ack_[block->local_id()].size() >= 2 * f_ + 1 &&
-      block_ack_[block->local_id()].find(id_) !=
-          block_ack_[block->local_id()].end()) {
-    // std::unique_lock<std::mutex> lk(mutex_);
-    proposal_manager_->BlockReady(block->hash(), block->local_id());
-    received_.insert(block->local_id());
-  }
-}
-
-int Cassandra::SendTxn(int round) {
-  std::unique_ptr<Proposal> proposal = nullptr;
-  // LOG(ERROR)<<"send:"<<round;
-  {
-    // std::unique_lock<std::mutex> lkx(g_mutex_);
-    round++;
-    std::unique_lock<std::mutex> lk(mutex_);
-    int current_round = proposal_manager_->CurrentRound();
-    //LOG(ERROR)<<"current round:"<<current_round<<" send round:"<<round;
-    assert(current_round < round);
-
-    proposal = proposal_manager_->GenerateProposal(round, start_);
-    if (proposal == nullptr) {
-      LOG(ERROR) << "no transactions";
-      if (start_ == false) {
-        return -1;
-      }
-    }
-
-    if(!proposal->header().prehash().empty()){
-      const Proposal* pre_p =
-        graph_->GetProposalInfo(proposal->header().prehash());
-        assert(pre_p != nullptr);
-      PrepareProposal(*pre_p);
-    }
-  }
-
-  //LOG(ERROR)<<"his size:"<<proposal->history_size();
-  for (const auto& his : proposal->history()) {
-    int sender = his.sender();
-    int id = his.id();
-    const std::string& hash = his.hash();
-    int state = his.state();
-    if (state != ProposalState::New) {
-      continue;
-    }
-    const Proposal* p = graph_->GetProposalInfo(hash);
-    assert(p);
-    for (const auto& b : p->block()) {
-      bool ret = proposal_manager_->ContainBlock(b.hash(), b.sender_id());
-      //LOG(ERROR) << "sender:" << sender << " id:" << his.id()
-      //           << " block id:" << b.local_id()
-      //           << " block sender:" << b.sender_id() << " ret:" << ret
-      //           << " state:" << state;
-      assert(ret);
-    }
-  }
-  proposal_manager_->AddLocalProposal(*proposal);
-
-  //LOG(ERROR) << "====== bc proposal block size:" << proposal->block_size()
-             //<< " round:" << round
-             //<< " id:" << proposal->header().proposal_id()
-             //<< " weak links:"<< proposal->weak_proposals().hash_size();
-
-  Broadcast(MessageType::NewProposal, *proposal);
-
-  assert(proposal->header().height() == round);
-  //current_round_ = round;
-  return proposal->header().height();
-}
-
-void Cassandra::SendBlock(const BlockQuery& block) {
-  // std::unique_lock<std::mutex> lk(g_mutex_);
-  // std::unique_lock<std::mutex> lk(mutex_);
-  //LOG(ERROR)<<" query block from:"<<block.sender()<<" block id:"<<block.local_id();
-  const std::string& hash = block.hash();
-  const Block* block_resp = proposal_manager_->QueryBlock(hash);
-  assert(block_resp != nullptr);
-  //LOG(ERROR) << "send block :" << block.local_id() << " to :" << block.sender();
-  SendMessage(MessageType::NewBlocks, *block_resp, block.sender());
-}
-
-void Cassandra::AskBlock(const Block& block) {
-  BlockQuery query;
-  query.set_hash(block.hash());
-  query.set_proposer(block.sender_id());
-  query.set_local_id(block.local_id());
-  query.set_sender(id_);
-  //LOG(ERROR) << "ask block from:" << block.sender_id();
-  SendMessage(MessageType::CMD_BlockQuery, query, block.sender_id());
-}
-
-void Cassandra::AskProposal(const Proposal& proposal) {
-  ProposalQuery query;
-  query.set_hash(proposal.header().hash());
-  query.set_proposer(proposal.header().proposer_id());
-  query.set_id(proposal.header().proposal_id());
-  query.set_sender(id_);
-  //LOG(ERROR) << "ask proposal from:" << proposal.header().proposer_id();
-  SendMessage(MessageType::CMD_ProposalQuery, query, proposal.header().proposer_id());
-}
-
-void Cassandra::SendProposal(const ProposalQuery& query) {
-  std::unique_lock<std::mutex> lk(g_mutex_);
-  // std::unique_lock<std::mutex> plk(mutex_);
-  // std::unique_lock<std::mutex> lk(mutex_);
-  //LOG(ERROR) << "!!!!! recv query proposal:" << query.id()
-  //           << " from :" << query.sender();
-  const std::string& hash = query.hash();
-  std::unique_ptr<ProposalQueryResp> resp =
-      proposal_manager_->QueryProposal(hash);
-  assert(resp != nullptr);
-  //LOG(ERROR) << "!!!!! send proposal:" << query.id()
-  //           << " to :" << query.sender();
-  SendMessage(MessageType::CMD_ProposalQueryResponse, *resp, query.sender());
-}
-
-void Cassandra::ReceiveProposalQueryResp(const ProposalQueryResp& resp) {
-  // std::unique_lock<std::mutex> lkx(g_mutex_);
-  std::unique_lock<std::mutex> lk(mutex_);
-  //LOG(ERROR) << "!!!!! recv proposal query resp";
-  proposal_manager_->VerifyProposal(resp);
-}
-
-bool Cassandra::ReceiveProposal(std::unique_ptr<Proposal> proposal) {
-  // std::unique_lock<std::mutex> lk(g_mutex_);
-  {
-    // LOG(ERROR)<<"recv proposal";
-    std::unique_lock<std::mutex> lk(mutex_);
-    const Proposal* pre_p =
-        graph_->GetProposalInfo(proposal->header().prehash());
-    if (pre_p == nullptr) {
-      //LOG(ERROR) << "receive proposal from :"
-      //           << proposal->header().proposer_id()
-      //           << " id:" << proposal->header().proposal_id() << "no pre:";
-       if(!proposal->header().prehash().empty()) {
-         if (proposal->header().height() > graph_->GetCurrentHeight()) {
-           future_proposal_[proposal->header().height()].push_back(
-               std::move(proposal));
-           return true;
-         }
-       }
-      assert(proposal->header().prehash().empty());
-    } else {
-      //LOG(ERROR) << "receive proposal from :"
-      //           << proposal->header().proposer_id()
-      //           << " id:" << proposal->header().proposal_id()
-      //           << " pre:" << pre_p->header().proposer_id()
-      //           << " pre id:" << pre_p->header().proposal_id();
-    }
-
-    /*
-      if(proposal->header().height() >=200 && proposal->header().height() <205){
-        if(!((proposal->header().proposer_id() <= 23 && id_ <=23)
-        || (proposal->header().proposer_id() > 23 && id_ >23))) {
-        //if(proposal->header().proposer_id() % 2 != id_%2){
-          return true;
-        }
-      }
-      */
-
-    if (proposal->header().height() > graph_->GetCurrentHeight()) {
-      future_proposal_[proposal->header().height()].push_back(
-          std::move(proposal));
-      return true;
-    }
-    AddProposal(*proposal);
-
-    auto it = future_proposal_.find(graph_->GetCurrentHeight());
-    if (it != future_proposal_.end()) {
-      for (auto& p : it->second) {
-        AddProposal(*p);
-      }
-      future_proposal_.erase(it);
-    }
-  }
-  // LOG(ERROR)<<"receive proposal done";
-  return true;
-}
-
-bool Cassandra::AddProposal(const Proposal& proposal) {
-  int ret = proposal_manager_->VerifyProposal(proposal);
-  if (ret != 0) {
-    if (ret == 2) {
-      LOG(ERROR) << "verify proposal fail";
-      AskProposal(proposal);
-    }
-    return false;
-  }
-
-  for (const Block& block : proposal.block()) {
-    bool ret = false;;
-    for(int i = 0; i< 5; ++i){
-      ret = proposal_manager_->ContainBlock(block);
-      if (ret == false) {
-        LOG(ERROR) << "======== block from:" << block.sender_id()
-          << " block id:" << block.local_id() << " not exist";
-         usleep(1000);
-        //AskBlock(block);
-        continue;
-      }
-      else {
-        break;
-      }
-    }
-    assert(ret);
-  }
-
-  {
-    std::unique_lock<std::mutex> lk(g_mutex_);
-    // LOG(ERROR) << "add proposal";
-    int v_ret = graph_->AddProposal(proposal);
-    if (v_ret != 0) {
-      LOG(ERROR) << "add proposal fail, ret:" << v_ret;
-      if (v_ret == 2) {
-        // miss history
-        // AskProposal(proposal);
-      }
-      // TrySendRecoveery(proposal);
-      return false;
-    }
-
-    if (proposal.header().proposer_id() == id_) {
-      proposal_manager_->RemoveLocalProposal(proposal.header().hash());
-    }
-  }
-
-  received_num_[proposal.header().height()].insert(
-      proposal.header().proposer_id());
-  //LOG(ERROR) << "received current height:" << graph_->GetCurrentHeight()
-  //           << " proposal height:" << proposal.header().height()
-  //           << " num:" << received_num_[graph_->GetCurrentHeight()].size()
-  //           << " from:" << proposal.header().proposer_id()
-  //           << " last vote:" << last_vote_;
-  if (received_num_[graph_->GetCurrentHeight()].size() == total_num_) {
-    if (last_vote_ < graph_->GetCurrentHeight()) {
-      last_vote_ = graph_->GetCurrentHeight();
-      can_vote_[graph_->GetCurrentHeight()] = true;
-      vote_cv_.notify_all();
-      //LOG(ERROR) << "can vote:";
-    }
-  }
-  // LOG(ERROR)<<"recv done";
-
-  std::vector<std::unique_ptr<Proposal>> future_g = graph_->GetNotFound(
-      proposal.header().height() + 1, proposal.header().hash());
-  if (future_g.size() > 0) {
-    // LOG(ERROR) << "get future size:" << future_g.size();
-    for (auto& it : future_g) {
-      if (!graph_->AddProposal(*it)) {
-        LOG(ERROR) << "add future proposal fail";
-        // TrySendRecoveery(proposal);
-        continue;
-      }
-
-      received_num_[it->header().height()].insert(it->header().proposer_id());
-      //LOG(ERROR) << "received current height:" << graph_->GetCurrentHeight()
-      //           << " num:" << received_num_[graph_->GetCurrentHeight()].size()
-      //           << " from:" << it->header().proposer_id()
-      //           << " last vote:" << last_vote_;
-    }
-  }
-  return true;
-}
-
-}  // namespace cassandra_recv
-}  // namespace cassandra
-}  // namespace resdb
diff --git a/platform/consensus/ordering/cassandra/algorithm/cassandra.h b/platform/consensus/ordering/cassandra/algorithm/cassandra.h
deleted file mode 100644
index b4a5e221..00000000
--- a/platform/consensus/ordering/cassandra/algorithm/cassandra.h
+++ /dev/null
@@ -1,114 +0,0 @@
-#pragma once
-
-#include <deque>
-#include <map>
-#include <queue>
-#include <thread>
-
-#include "platform/common/queue/lock_free_queue.h"
-#include "platform/consensus/ordering/common/algorithm/protocol_base.h"
-#include "platform/consensus/ordering/cassandra/algorithm/proposal_graph.h"
-#include "platform/consensus/ordering/cassandra/algorithm/proposal_manager.h"
-#include "platform/consensus/ordering/cassandra/proto/proposal.pb.h"
-#include "platform/statistic/stats.h"
-
-namespace resdb {
-namespace cassandra {
-namespace cassandra_recv {
-
-class Cassandra: public common::ProtocolBase {
- public:
-  Cassandra(int id, int f, int total_num, SignatureVerifier* verifier);
-  ~Cassandra();
-
-  void CheckBlock(const std::string& hash, int local_id);
-  void ReceiveBlock(std::unique_ptr<Block> block);
-  void ReceiveBlockACK(std::unique_ptr<BlockACK> block);
-  bool ReceiveTransaction(std::unique_ptr<Transaction> txn);
-  bool ReceiveProposal(std::unique_ptr<Proposal> proposal);
-  bool ReceiveVote(const VoteMessage& msg);
-  bool ReceivePrepare(const VoteMessage& msg);
-  int ReceiveRecovery(const CommittedProposals& proposals);
-
-  bool ReceiveVoteACK(const VoteMessage& msg);
-  bool ReceiveCommit(const VoteMessage& msg);
-
-  void AskBlock(const Block& block);
-  void SendBlock(const BlockQuery& block);
-
-  void AskProposal(const Proposal& proposal);
-  void SendProposal(const ProposalQuery& query);
-  void ReceiveProposalQueryResp(const ProposalQueryResp& resp);
-  void PrepareProposal(const Proposal& p);
-
-
-  void SetPrepareFunction(std::function<int(const Transaction&)> prepare);
-
- private:
-  bool IsStop();
-
-  int SendTxn(int round);
-
-  void Commit(const VoteMessage& msg);
-  void CommitProposal(const Proposal& p);
-
-  void AsyncConsensus();
-  void AsyncCommit();
-  void AsyncPrepare();
-  bool WaitVote(int);
-  void WaitCommit();
-
-  bool CheckHistory(const Proposal& proposal);
-
-  void Reset();
-  bool CheckState(MessageType type, ProposalState state);
-
-  void TrySendRecoveery(const Proposal& proposal);
-  void BroadcastTxn();
-  bool AddProposal(const Proposal& proposal);
-
-  bool ProcessProposal(std::unique_ptr<Proposal> proposal);
-
- private:
-  std::unique_ptr<ProposalGraph> graph_;
-  LockFreeQueue<Transaction> txns_;
-  std::unique_ptr<ProposalManager> proposal_manager_;
-  SignatureVerifier* verifier_;
-  std::mutex mutex_, g_mutex_;
-  std::map<int, std::set<int>> received_num_;
-  // int state_;
-  int id_, total_num_, f_, batch_size_;
-  std::atomic<int> is_stop_;
-  int timeout_ms_;
-  int local_txn_id_, local_proposal_id_;
-  LockFreeQueue<Proposal> commit_queue_, execute_queue_, prepare_queue_;
-  std::thread commit_thread_, consensus_thread_, block_thread_, prepare_thread_;
-  std::condition_variable vote_cv_;
-  std::map<int, bool> can_vote_;
-  std::atomic<int> committed_num_;
-  int voting_, start_ = false;
-  std::map<int, std::vector<std::unique_ptr<Transaction>>> uncommitted_txn_;
-
-  bool use_linear_ = false;
-  int recv_num_ = 0;
-  int execute_num_ = 0;
-  int pending_num_ = 0;
-  std::atomic<int> executed_;
-  std::atomic<int> precommitted_num_;
-  int last_vote_ = 0;
-  int execute_id_;
-
-  std::mutex block_mutex_;
-  std::set<int> received_;
-  std::map<int, std::set<int>> block_ack_;
-  std::map<int, std::vector<std::unique_ptr<Proposal>>> future_proposal_;
-
-  std::function<int(const Transaction&)> prepare_;
-  int current_round_;
-
-  Stats* global_stats_;
-};
-
-}  // namespace cassandra_recv
-}  // namespace cassandra
-}  // namespace resdb
diff --git a/platform/consensus/ordering/cassandra/algorithm/proposal_graph.cpp b/platform/consensus/ordering/cassandra/algorithm/proposal_graph.cpp
deleted file mode 100644
index a923da84..00000000
--- a/platform/consensus/ordering/cassandra/algorithm/proposal_graph.cpp
+++ /dev/null
@@ -1,491 +0,0 @@
-#include "platform/consensus/ordering/cassandra/algorithm/proposal_graph.h"
-
-#include <glog/logging.h>
-
-#include <queue>
-#include <stack>
-
-#include "common/utils/utils.h"
-
-namespace resdb {
-namespace cassandra {
-namespace cassandra_recv {
-
-std::vector<ProposalState> GetStates() {
-  return std::vector<ProposalState>{ProposalState::New, ProposalState::Prepared,
-                                    ProposalState::PreCommit};
-}
-
-ProposalGraph::ProposalGraph(int fault_num) : f_(fault_num) {
-  ranking_ = std::make_unique<Ranking>();
-  current_height_ = 0;
-  global_stats_ = Stats::GetGlobalStats();
-}
-
-void ProposalGraph::IncreaseHeight() {
-  //LOG(ERROR) << "increase height:" << current_height_;
-  current_height_++;
-}
-
-void ProposalGraph::TryUpgradeHeight(int height) {
-  if (last_node_[height].size() > 0) {
-    // LOG(ERROR) << "upgrade height:" << height;
-    current_height_ = height;
-  } else {
-    // assert(1 == 0);
-    LOG(ERROR) << "need to recovery";
-  }
-}
-
-std::string Encode(const std::string& hash) {
-  std::string ret;
-  for (int i = 0; i < hash.size(); ++i) {
-    int x = hash[i];
-    ret += std::to_string(x);
-  }
-  return ret;
-}
-
-void ProposalGraph::AddProposalOnly(const Proposal& proposal) {
-  auto it = node_info_.find(proposal.header().hash());
-  if (it == node_info_.end()) {
-    auto np = std::make_unique<NodeInfo>(proposal);
-    node_info_[proposal.header().hash()] = std::move(np);
-    //LOG(ERROR) << "add proposal proposer:" << proposal.header().proposer_id()
-    //           << " id:" << proposal.header().proposal_id()
-    //           << " hash:" << Encode(proposal.header().hash());
-  }
-}
-
-int ProposalGraph::AddProposal(const Proposal& proposal) {
-   //LOG(ERROR) << "add proposal height:" << proposal.header().height()
-   //          << " current height:" << current_height_<<" from:"<<proposal.header().proposer_id()<<" proposal id:"<<proposal.header().proposal_id();
-  assert(current_height_ >= latest_commit_.header().height());
-  /*
-  if (proposal.header().height() < current_height_) {
-    LOG(ERROR) << "height not match:" << current_height_
-               << " proposal height:" << proposal.header().height();
-    return false;
-  }
-  */
-
-  if (proposal.header().height() > current_height_) {
-    pending_header_[proposal.header().height()].insert(
-        proposal.header().proposer_id());
-  } else {
-    while (!pending_header_.empty()) {
-      if (pending_header_.begin()->first <= current_height_) {
-        pending_header_.erase(pending_header_.begin());
-      } else {
-        break;
-      }
-    }
-  }
-
-  if (proposal.header().height() > current_height_ + 1) {
-    LOG(ERROR) << "height not match:" << current_height_
-               << " proposal height:" << proposal.header().height();
-    if (pending_header_[proposal.header().height()].size() >= f_ + 1) {
-      TryUpgradeHeight(proposal.header().height());
-    }
-    return 1;
-  }
-
-  if (!VerifyParent(proposal)) {
-    LOG(ERROR) << "verify parent fail:" << proposal.header().proposer_id()
-               << " id:" << proposal.header().proposal_id();
-    // assert(1==0);
-    return 2;
-  }
-
-  // LOG(ERROR)<<"history size:"<<proposal.history_size();
-  for (const auto& history : proposal.history()) {
-    std::string hash = history.hash();
-    auto node_it = node_info_.find(hash);
-    assert(node_it != node_info_.end());
-    /*
-    if (node_it == node_info_.end()) {
-      LOG(ERROR) << " history node not found";
-      return false;
-    }
-    else {
-      LOG(ERROR)<<"find history";
-    }
-    */
-  }
-
-  for (const auto& history : proposal.history()) {
-    std::string hash = history.hash();
-    auto node_it = node_info_.find(hash);
-    for (auto s : GetStates()) {
-      if (s >= node_it->second->state && s <= history.state()) {
-        if (s != history.state()) {
-          LOG(ERROR) << "update from higher state";
-        }
-        node_it->second->votes[s].insert(proposal.header().proposer_id());
-      }
-    }
-    // LOG(ERROR)<<"proposal:("<<node_it->second->proposal.header().proposer_id()
-    //<<","<<node_it->second->proposal.header().proposal_id()<<")"
-    //<<" history state:"<<history.state()<<"
-    //num:"<<node_it->second->votes[history.state()].size();
-    CheckState(node_it->second.get(),
-               static_cast<resdb::cassandra::ProposalState>(history.state()));
-    if (node_it->second->state == ProposalState::PreCommit) {
-      Commit(hash);
-    }
-    //LOG(ERROR)<<"history proposal:("<<node_it->second->proposal.header().proposer_id()
-    //<<","<<node_it->second->proposal.header().proposal_id()
-    //<<" state:"<<node_it->second->state;
-  }
-
-  if (proposal.header().height() < current_height_) {
-    LOG(ERROR) << "height not match:" << current_height_
-               << " proposal height:" << proposal.header().height();
-
-    // LOG(ERROR)<<"add proposal proposer:"<<proposal.header().proposer_id()<<"
-    // id:"<<proposal.header().proposal_id();
-    // g_[proposal.header().prehash()].push_back(proposal.header().hash());
-    auto np = std::make_unique<NodeInfo>(proposal);
-    new_proposals_[proposal.header().hash()] = &np->proposal;
-    node_info_[proposal.header().hash()] = std::move(np);
-    last_node_[proposal.header().height()].insert(proposal.header().hash());
-
-    return 1;
-  } else {
-    g_[proposal.header().prehash()].push_back(proposal.header().hash());
-    auto np = std::make_unique<NodeInfo>(proposal);
-    new_proposals_[proposal.header().hash()] = &np->proposal;
-    // LOG(ERROR)<<"add proposal proposer:"<<proposal.header().proposer_id()<<"
-    // id:"<<proposal.header().proposal_id()<<"
-    // hash:"<<Encode(proposal.header().hash());
-    node_info_[proposal.header().hash()] = std::move(np);
-    last_node_[proposal.header().height()].insert(proposal.header().hash());
-  }
-  return 0;
-}
-
-void ProposalGraph::UpgradeState(ProposalState& state) {
-  switch (state) {
-    case None:
-    case New:
-      state = Prepared;
-      break;
-    case Prepared:
-      state = PreCommit;
-      break;
-    default:
-      break;
-  }
-}
-
-int ProposalGraph::CheckState(NodeInfo* node_info, ProposalState state) {
-  // LOG(ERROR) << "node: (" << node_info->proposal.header().proposer_id() <<
-  // ","
-  //           << node_info->proposal.header().proposal_id()
-  //           << ") state:" << node_info->state
-  //           << " vote num:" << node_info->votes[node_info->state].size();
-
-  while (node_info->votes[node_info->state].size() >= 2 * f_ + 1) {
-    UpgradeState(node_info->state);
-    // LOG(ERROR) << "==========  proposal:("
-    //           << node_info->proposal.header().proposer_id() << ","
-    //           << node_info->proposal.header().proposal_id() << ")"
-    //           << " upgrate state:" << node_info->state << (GetCurrentTime() -
-    //           node_info->proposal.create_time());
-  }
-  return true;
-}
-
-void ProposalGraph::Commit(const std::string& hash) {
-  auto it = node_info_.find(hash);
-  if (it == node_info_.end()) {
-    LOG(ERROR) << "node not found, hash:" << hash;
-    assert(1 == 0);
-    return;
-  }
-
-  if (it->second->state != ProposalState::PreCommit) {
-    LOG(ERROR) << "hash not committed:" << hash;
-    assert(1 == 0);
-    return;
-  }
-
-  std::set<std::string> is_main_hash;
-  is_main_hash.insert(hash);
-  
-  int from_proposer = it->second->proposal.header().proposer_id();
-  int from_proposal_id = it->second->proposal.header().proposal_id();
-   //LOG(ERROR)<<"commit :"<<it->second->proposal.header().proposer_id()<<" id:"<<it->second->proposal.header().proposal_id();
-
-  std::vector<std::vector<Proposal*>> commit_p;
-  auto bfs = [&]() {
-    std::queue<std::string> q;
-    q.push(hash);
-    while (!q.empty()) {
-      std::string c_hash = q.front();
-      q.pop();
-
-      auto it = node_info_.find(c_hash);
-      if (it == node_info_.end()) {
-        LOG(ERROR) << "node not found, hash:";
-        assert(1 == 0);
-      }
-
-      Proposal* p = &it->second->proposal;
-      if (it->second->state == ProposalState::Committed) {
-        // LOG(ERROR)<<" try commit proposal,
-        // sender:"<<p->header().proposer_id()
-        //<<" proposal id:"<<p->header().proposal_id()<<" has been committed";
-        continue;
-      }
-
-      it->second->state = ProposalState::Committed;
-      if (is_main_hash.find(c_hash) != is_main_hash.end()) {
-        commit_num_[p->header().proposer_id()]++;
-        // LOG(ERROR)<<"commit main node:"<<p->header().proposer_id();
-        is_main_hash.insert(p->header().prehash());
-        commit_p.push_back(std::vector<Proposal*>());
-      }
-
-      commit_p.back().push_back(p);
-       //LOG(ERROR)<<"commit node:"<<p->header().proposer_id()<<" id:"<<p->header().proposal_id()
-      //<<" weak proposal size:"<<p->weak_proposals().hash_size();
-       //LOG(ERROR)<<"push p:"<<p->header().proposer_id();
-      for (const std::string& w_hash : p->weak_proposals().hash()) {
-        auto it = node_info_.find(w_hash);
-        if (it == node_info_.end()) {
-          LOG(ERROR) << "node not found, hash:";
-          assert(1 == 0);
-        }
-
-        // LOG(ERROR)<<"add weak
-        // proposal:"<<it->second->proposal.header().proposer_id()<<"
-        // id:"<<it->second->proposal.header().proposal_id();
-        q.push(w_hash);
-      }
-      if (!p->header().prehash().empty()) {
-        q.push(p->header().prehash());
-      }
-    }
-  };
-
-  bfs();
-  if (commit_p.size() > 1) {
-    LOG(ERROR) << "commit more hash";
-  }
-  int block_num = 0;
-  for (int i = commit_p.size() - 1; i >= 0; i--) {
-    for (int j = 0; j < commit_p[i].size(); ++j) {
-      /*
-      if (j == 0) {
-        LOG(ERROR) << "commmit proposal lead from:"
-                   << commit_p[i][j]->header().proposer_id()
-                   << " height:" << commit_p[i][j]->header().height()
-                   << " size:" << commit_p[i].size();
-      }
-      */
-      //LOG(ERROR) << "commmit proposal:"
-      //           << commit_p[i][j]->header().proposer_id()
-      //           << " height:" << commit_p[i][j]->header().height()
-      //           << " idx:" << j 
-      //           << " delay:" << (GetCurrentTime() - commit_p[i][j]->create_time()) 
-      //           << " commit from:"<< from_proposer<<" id:"<<from_proposal_id;
-      block_num += commit_p[i][j]->block_size();
-      if (commit_callback_) {
-        commit_callback_(*commit_p[i][j]);
-      }
-    }
-  }
-  global_stats_->AddCommitBlock(block_num);
-
-  // TODO clean
-  last_node_[it->second->proposal.header().height()].clear();
-  latest_commit_ = it->second->proposal;
-  it->second->state = ProposalState::Committed;
-  // Clear(latest_commit_.header().hash());
-}
-
-std::vector<std::unique_ptr<Proposal>> ProposalGraph::GetNotFound(
-    int height, const std::string& hash) {
-  auto it = not_found_proposal_.find(height);
-  if (it == not_found_proposal_.end()) {
-    return std::vector<std::unique_ptr<Proposal>>();
-  }
-  auto pre_it = it->second.find(hash);
-  std::vector<std::unique_ptr<Proposal>> ret;
-  if (pre_it != it->second.end()) {
-    ret = std::move(pre_it->second);
-    it->second.erase(pre_it);
-    LOG(ERROR) << "found future height:" << height;
-  }
-  return ret;
-}
-
-bool ProposalGraph::VerifyParent(const Proposal& proposal) {
-  // LOG(ERROR) << "last commit:" << latest_commit_.header().proposal_id()
-  //           << " current :" << proposal.header().proposal_id()
-  //           << " height:" << proposal.header().height();
-
-  if (proposal.header().prehash() == latest_commit_.header().hash()) {
-    return true;
-  }
-
-  std::string prehash = proposal.header().prehash();
-  // LOG(ERROR)<<"prehash:"<<prehash;
-
-  auto it = node_info_.find(prehash);
-  if (it == node_info_.end()) {
-    LOG(ERROR) << "prehash not here";
-    not_found_proposal_[proposal.header().height()][proposal.header().prehash()]
-        .push_back(std::make_unique<Proposal>(proposal));
-    return false;
-  } else {
-    if (proposal.header().height() !=
-        it->second->proposal.header().height() + 1) {
-      LOG(ERROR) << "link to invalid proposal, height:"
-                 << proposal.header().height()
-                 << " pre height:" << it->second->proposal.header().height();
-      return false;
-    }
-  }
-  return true;
-}
-
-void ProposalGraph::UpdateHistory(Proposal* proposal) {
-  proposal->mutable_history()->Clear();
-  std::string hash = proposal->header().hash();
-
-  for (int i = 0; i < 3 && !hash.empty(); ++i) {
-    auto node_it = node_info_.find(hash);
-    auto his = proposal->add_history();
-    his->set_hash(hash);
-    his->set_state(node_it->second->state);
-    his->set_sender(node_it->second->proposal.header().proposer_id());
-    his->set_id(node_it->second->proposal.header().proposal_id());
-    hash = node_it->second->proposal.header().prehash();
-  }
-}
-
-Proposal* ProposalGraph::GetStrongestProposal() {
-  // LOG(ERROR) << "get strong proposal from height:" << current_height_;
-  if (last_node_.find(current_height_) == last_node_.end()) {
-    LOG(ERROR) << "no data:" << current_height_;
-    return nullptr;
-  }
-
-  NodeInfo* sp = nullptr;
-  for (const auto& last_hash : last_node_[current_height_]) {
-    NodeInfo* node_info = node_info_[last_hash].get();
-    if (sp == nullptr || Compare(*sp, *node_info)) {
-      sp = node_info;
-    }
-  }
-
-  UpdateHistory(&sp->proposal);
-   //LOG(ERROR) << "get strong proposal from height:" << current_height_ << " ->("
-   //          << sp->proposal.header().proposer_id() << ","
-   //          << sp->proposal.header().proposal_id() << ")";
-  return &sp->proposal;
-}
-
-bool ProposalGraph::Cmp(int id1, int id2) {
-  // LOG(ERROR) << "commit commit num:" << id1 << " " << id2
-  //           << " commit  time:" << commit_num_[id1] << " " <<
-  //           commit_num_[id2];
-  if (commit_num_[id1] + 1 < commit_num_[id2]) {
-    return false;
-  }
-
-  if (commit_num_[id1] > commit_num_[id2] + 1) {
-    return true;
-  }
-  return id1 < id2;
-}
-
-int ProposalGraph::StateScore(const ProposalState& state) {
-  // return state == ProposalState::Prepared? 1:0;
-  return state;
-}
-
-int ProposalGraph::CompareState(const ProposalState& state1,
-                                const ProposalState& state2) {
-  // LOG(ERROR) << "check state:" << state1 << " " << state2;
-  return StateScore(state1) - StateScore(state2);
-}
-
-// p1 < p2
-bool ProposalGraph::Compare(const NodeInfo& p1, const NodeInfo& p2) {
-  // LOG(ERROR) << "proposer:" << p1.proposal.header().proposer_id() << " "
-  //           << p2.proposal.header().proposer_id()
-  //          << "height:" << p1.proposal.header().height() << " "
-  //          << p2.proposal.header().height();
-  if (p1.proposal.header().height() != p2.proposal.header().height()) {
-    return p1.proposal.header().height() < p2.proposal.header().height();
-  }
-  // LOG(ERROR)<<"proposer:"<<p1.proposal.header().proposer_id()<<"
-  // "<<p2.proposal.header().proposer_id();
-  if (CompareState(p1.state, p2.state) != 0) {
-    return CompareState(p1.state, p2.state) < 0;
-  }
-
-  if (p1.proposal.header().proposer_id() ==
-      p2.proposal.header().proposer_id()) {
-    return p1.proposal.header().proposal_id() <
-           p2.proposal.header().proposal_id();
-  }
-
-  return Cmp(p1.proposal.header().proposer_id(),
-             p2.proposal.header().proposer_id());
-}
-
-Proposal* ProposalGraph::GetLatestStrongestProposal() {
-  Proposal* sp = GetStrongestProposal();
-  if (sp == nullptr) {
-    if (current_height_ > 0) {
-      assert(1 == 0);
-    }
-    return &latest_commit_;
-  }
-  // LOG(ERROR) << "====== get strong proposal from:" <<
-  // sp->header().proposer_id()
-  //           << " id:" << sp->header().proposal_id();
-  return sp;
-}
-
-ProposalState ProposalGraph::GetProposalState(const std::string& hash) const {
-  auto node_it = node_info_.find(hash);
-  if (node_it == node_info_.end()) {
-    return ProposalState::None;
-  }
-  return node_it->second->state;
-}
-
-const Proposal* ProposalGraph::GetProposalInfo(const std::string& hash) const {
-  auto it = node_info_.find(hash);
-  if (it == node_info_.end()) {
-    LOG(ERROR) << "hash not found:" << Encode(hash);
-    return nullptr;
-  }
-  return &it->second->proposal;
-}
-
-int ProposalGraph::GetCurrentHeight() { return current_height_; }
-
-std::vector<Proposal*> ProposalGraph::GetNewProposals(int height) {
-  std::vector<Proposal*> ps;
-  for (auto it : new_proposals_) {
-    if (it.second->header().height() >= height) {
-      continue;
-    }
-    ps.push_back(it.second);
-  }
-  for (Proposal* p : ps) {
-    new_proposals_.erase(new_proposals_.find(p->header().hash()));
-  }
-  return ps;
-}
-
-}  // namespace cassandra_recv
-}  // namespace cassandra
-}  // namespace resdb
diff --git a/platform/consensus/ordering/cassandra/algorithm/proposal_graph.h b/platform/consensus/ordering/cassandra/algorithm/proposal_graph.h
deleted file mode 100644
index ee21834c..00000000
--- a/platform/consensus/ordering/cassandra/algorithm/proposal_graph.h
+++ /dev/null
@@ -1,87 +0,0 @@
-#pragma once
-
-#include <map>
-
-#include "platform/consensus/ordering/cassandra/algorithm/proposal_state.h"
-#include "platform/consensus/ordering/cassandra/algorithm/ranking.h"
-#include "platform/consensus/ordering/cassandra/proto/proposal.pb.h"
-#include "platform/statistic/stats.h"
-
-namespace resdb {
-namespace cassandra {
-namespace cassandra_recv {
-
-class ProposalGraph {
- public:
-  ProposalGraph(int fault_num);
-  inline void SetCommitCallBack(std::function<void(const Proposal&)> func) {
-    commit_callback_ = func;
-  }
-
-  int AddProposal(const Proposal& proposal);
-  void AddProposalOnly(const Proposal& proposal);
-
-  Proposal* GetLatestStrongestProposal();
-  const Proposal* GetProposalInfo(const std::string& hash) const;
-
-  int GetCurrentHeight();
-
-  void Clear(const std::string& hash);
-  void IncreaseHeight();
-  ProposalState GetProposalState(const std::string& hash) const;
-
-  std::vector<std::unique_ptr<Proposal>> GetNotFound(int height,
-                                                     const std::string& hash);
-
-  std::vector<Proposal*> GetNewProposals(int height);
-
- private:
-  struct NodeInfo {
-    Proposal proposal;
-    ProposalState state;
-    int score;
-    int is_main;
-    // std::set<int> received_num[5];
-    std::map<int, std::set<int>> votes;
-
-    NodeInfo(const Proposal& proposal)
-        : proposal(proposal), state(ProposalState::New), score(0), is_main(0) 
{}
-  };
-
-  bool VerifyParent(const Proposal& proposal);
-
-  bool Compare(const NodeInfo& p1, const NodeInfo& p2);
-  bool Cmp(int id1, int id2);
-  int StateScore(const ProposalState& state);
-  int CompareState(const ProposalState& state1, const ProposalState& state2);
-
-  Proposal* GetStrongestProposal();
-
-  void UpdateHistory(Proposal* proposal);
-  int CheckState(NodeInfo* node_info, ProposalState state);
-  void UpgradeState(ProposalState& state);
-  void TryUpgradeHeight(int height);
-
-  void Commit(const std::string& hash);
-
- private:
-  Proposal latest_commit_;
-  std::map<std::string, std::vector<std::string>> g_;
-  std::map<std::string, std::unique_ptr<NodeInfo>> node_info_;
-  std::map<std::string, std::vector<VoteMessage>> not_found_;
-  std::unique_ptr<Ranking> ranking_;
-  std::map<int, int> commit_num_;
-  std::map<int, std::set<std::string>> last_node_;
-  int current_height_;
-  uint32_t f_;
-  std::function<void(const Proposal&)> commit_callback_;
-  std::map<int, std::set<int>> pending_header_;
-  std::map<int, std::map<std::string, std::vector<std::unique_ptr<Proposal>>>>
-      not_found_proposal_;
-  std::map<std::string, Proposal*> new_proposals_;
-  Stats* global_stats_;
-};
-
-}  // namespace cassandra_recv
-}  // namespace cassandra
-}  // namespace resdb
diff --git 
a/platform/consensus/ordering/cassandra/algorithm/proposal_manager.cpp 
b/platform/consensus/ordering/cassandra/algorithm/proposal_manager.cpp
deleted file mode 100644
index bcca39d9..00000000
--- a/platform/consensus/ordering/cassandra/algorithm/proposal_manager.cpp
+++ /dev/null
@@ -1,470 +0,0 @@
-#include "platform/consensus/ordering/cassandra/algorithm/proposal_manager.h"
-
-#include <glog/logging.h>
-
-#include "common/crypto/signature_verifier.h"
-#include "common/utils/utils.h"
-
-namespace resdb {
-namespace cassandra {
-namespace cassandra_recv {
-
-namespace {
-std::string Encode(const std::string& hash) {
-  std::string ret;
-  for (int i = 0; i < hash.size(); ++i) {
-    int x = hash[i];
-    ret += std::to_string(x);
-  }
-  return ret;
-}
-
-}
-
-ProposalManager::ProposalManager(int32_t id, ProposalGraph* graph)
-    : id_(id), graph_(graph) {
-  local_proposal_id_ = 1;
-  global_stats_ = Stats::GetGlobalStats();
-}
-
-int ProposalManager::CurrentRound() { return graph_->GetCurrentHeight(); }
-
-std::unique_ptr<Block> ProposalManager::MakeBlock(
-    std::vector<std::unique_ptr<Transaction>>& txns) {
-  auto block = std::make_unique<Block>();
-  Block::BlockData* data = block->mutable_data();
-  for (const auto& txn : txns) {
-    *data->add_transaction() = *txn;
-  }
-
-  std::string data_str;
-  data->SerializeToString(&data_str);
-  std::string hash = SignatureVerifier::CalculateHash(data_str);
-  block->set_hash(hash);
-  block->set_sender_id(id_);
-  block->set_create_time(GetCurrentTime());
-  block->set_local_id(local_block_id_++);
-  // LOG(ERROR)<<"make block time:"<<block->create_time();
-  return block;
-}
-
-void ProposalManager::AddLocalBlock(std::unique_ptr<Block> block) {
-  std::unique_lock<std::mutex> lk(mutex_);
-  //LOG(ERROR)<<"add local block :"<<block->local_id();
-  blocks_candidates_[block->local_id()] = std::move(block);
-}
-
-void ProposalManager::BlockReady(const std::string& hash, int local_id) {
-  std::unique_lock<std::mutex> lk(mutex_);
-  //LOG(ERROR)<<"ready block:"<<local_id;
-  auto it = blocks_candidates_.find(local_id);
-  assert(it != blocks_candidates_.end());
-  assert(it->second->hash() == hash);
-  blocks_.push_back(std::move(it->second));
-  blocks_candidates_.erase(it);
-  //  Notify();
-  notify_.notify_all();
-}
-
-void ProposalManager::AddBlock(std::unique_ptr<Block> block) {
-  std::unique_lock<std::mutex> lk(p_mutex_);
-  int sender = block->sender_id();
-  int block_id = block->local_id();
-  //LOG(ERROR)<<"add block from sender:"<<sender<<" id:"<<block_id<<" 
blocksize:"<<pending_blocks_[sender].size()<<" hash:"<<Encode(block->hash());
-  pending_blocks_[sender][block->hash()] = std::move(block);
-}
-
-bool ProposalManager::ContainBlock(const std::string& hash, int sender) {
-  std::unique_lock<std::mutex> lk(p_mutex_);
-  auto bit = pending_blocks_[sender].find(hash);
-  return bit != pending_blocks_[sender].end();
-}
-
-bool ProposalManager::ContainBlock(const Block& block) {
-  std::unique_lock<std::mutex> lk(p_mutex_);
-  int sender = block.sender_id();
-  const std::string& hash = block.hash();
-  auto bit = pending_blocks_[sender].find(hash);
-  return bit != pending_blocks_[sender].end();
-}
-
-const Block* ProposalManager::QueryBlock(const std::string& hash) {
-  std::unique_lock<std::mutex> lk(p_mutex_);
-  auto bit = pending_blocks_[id_].find(hash);
-  assert(bit != pending_blocks_[id_].end());
-  return bit->second.get();
-}
-
-Block* ProposalManager::GetBlockSnap(const std::string& hash, int sender) {
-  std::unique_lock<std::mutex> lk(p_mutex_);
-
-  //LOG(ERROR)<<" sender:"<<sender<<" block 
size:"<<pending_blocks_[sender].size();
-  auto it = pending_blocks_[sender].find(hash);
-  if (it == pending_blocks_[sender].end()) {
-    LOG(ERROR) << "block from sender:" << sender << " not found"<<" pending 
size:"<<pending_blocks_[sender].size();
-    return nullptr;
-  }
-  assert(it != pending_blocks_[sender].end());
-  // LOG(ERROR)<<"get block: sender:"<<sender<<" id:"<<block->local_id();
-  return it->second.get();
-}
-
-std::unique_ptr<Block> ProposalManager::GetBlock(const std::string& hash,
-                                                 int sender) {
-  bool need_wait = false;
-  while (true) {
-    if (need_wait) {
-      usleep(1000);
-    }
-    std::unique_lock<std::mutex> lk(p_mutex_);
-    auto it = pending_blocks_[sender].find(hash);
-    if (it == pending_blocks_[sender].end()) {
-      LOG(ERROR) << "block from sender:" << sender << " not found";
-      need_wait = true;
-      continue;
-    }
-    assert(it != pending_blocks_[sender].end());
-    auto block = std::move(it->second);
-    //LOG(ERROR)<<"get block: sender:"<<sender<<" id:"<<block->local_id()<<" 
removed";
-    pending_blocks_[sender].erase(it);
-    return block;
-  }
-}
-
-void ProposalManager::ClearProposal(const Proposal& p) {}
-
-/*
-void ProposalManager::Notify(){
-  //std::unique_lock<std::mutex> lk(notify_mutex_);
-  notify_.notify_all();
-}
-
-void ProposalManager::Wait(){
-  //std::unique_lock<std::mutex> lk(notify_mutex_);
-  notify_.wait_for(lk, std::chrono::microseconds(100000),
-      [&] { return !blocks_.empty(); });
-}
-*/
-
-bool ProposalManager::WaitBlock() {
-  std::unique_lock<std::mutex> lk(mutex_);
-  if (blocks_.empty()) {
-    notify_.wait_for(lk, std::chrono::microseconds(1000000),
-                     [&] { return !blocks_.empty(); });
-    //LOG(ERROR) << "wait proposal block size:" << blocks_.size();
-  }
-  return !blocks_.empty();
-}
-
-std::unique_ptr<Proposal> ProposalManager::GenerateProposal(int round,
-                                                            bool need_empty) {
-  auto proposal = std::make_unique<Proposal>();
-  std::string data;
-  {
-    //LOG(ERROR) << "generate proposal pending block size:" << blocks_.size();
-    std::unique_lock<std::mutex> lk(mutex_);
-    if (blocks_.empty() && !need_empty) {
-      // return nullptr;
-      //    Wait();
-      // notify_.wait_for(lk, std::chrono::microseconds(100000),
-      //[&] { return !blocks_.empty(); });
-      // if(blocks_.empty()){
-      // return nullptr;
-      //}
-      return nullptr;
-      // LOG(ERROR) << "generate wait proposal block size:" << blocks_.size();
-    }
-    int max_block = 5;
-    int num = 0;
-    int64_t current_time = GetCurrentTime();
-    proposal->set_create_time(current_time);
-    for (auto& block : blocks_) {
-      data += block->hash();
-      Block* ab = proposal->add_block();
-      ab->set_hash(block->hash());
-      ab->set_sender_id(block->sender_id());
-      ab->set_local_id(block->local_id());
-      ab->set_create_time(block->create_time());
-      //LOG(ERROR) << " add block:" << block->local_id()
-      //           << " block delay:" << (current_time - block->create_time())
-      //           << " block size:" << blocks_.size()
-      //           << " txn:" << block->data().transaction_size();
-      max_block--;
-      num++;
-      // break;
-      if (max_block <= 0) break;
-      // blocks_.pop_front();
-      // break;
-    }
-    //LOG(ERROR) << "block num:" << num;
-    while (num > 0) {
-      blocks_.pop_front();
-      num--;
-    }
-    /*
-    if(!blocks_.empty()){
-      blocks_.pop_front();
-    }
-    */
-    // blocks_.clear();
-  }
-
-  Proposal* last = graph_->GetLatestStrongestProposal();
-  proposal->mutable_header()->set_height(round);
-  //LOG(ERROR) << "get last proposal, proposer:" << 
last->header().proposer_id()
-  //           << " id:" << last->header().proposal_id();
-
-  graph_->IncreaseHeight();
-  if (last != nullptr) {
-    assert(last->header().height() + 1 == round);
-
-    proposal->mutable_header()->set_prehash(last->header().hash());
-    *proposal->mutable_history() = last->history();
-  }
-
-  {
-    std::vector<Proposal*> ps = graph_->GetNewProposals(round);
-    // LOG(ERROR)<<"get weak p from round:"<<round<<" size:"<<ps.size();
-    for (Proposal* p : ps) {
-      if (p->header().height() >= round) {
-        LOG(ERROR) << "round invalid:" << round
-                   << " header:" << p->header().height();
-      }
-      assert(p->header().height() < round);
-
-      if (p->header().hash() == last->header().hash()) {
-        continue;
-      }
-      // LOG(ERROR)<<"add weak p:"<<p->header().height()<<"
-      // proposer:"<<p->header().proposer_id();
-      *proposal->mutable_weak_proposals()->add_hash() = p->header().hash();
-      data += p->header().hash();
-    }
-  }
-
-  proposal->mutable_header()->set_proposer_id(id_);
-  proposal->mutable_header()->set_proposal_id(local_proposal_id_++);
-  data += std::to_string(proposal->header().proposal_id()) +
-          std::to_string(proposal->header().proposer_id()) +
-          std::to_string(proposal->header().height());
-
-  std::string hash = SignatureVerifier::CalculateHash(data);
-  proposal->mutable_header()->set_hash(hash);
-  // proposal->set_create_time(GetCurrentTime());
-  return proposal;
-}
-
-void ProposalManager::ObtainHistoryProposal(const Proposal* p,
-                                            std::set<std::pair<int, int>>& v,
-                                            std::vector<const Proposal*>& resp,
-                                            int current_height) {
-  // LOG(ERROR)<<"obtain proposal history:"<<p->header().proposer_id()<<"
-  // id:"<<p->header().proposal_id();
-  const std::string& pre_hash = p->header().prehash();
-  if (!pre_hash.empty()) {
-    const Proposal* next_p = graph_->GetProposalInfo(pre_hash);
-    assert(next_p != nullptr);
-    int height = next_p->header().height();
-    // LOG(ERROR)<<"Obtain next height:"<<height<<"
-    // proposer:"<<next_p->header().proposer_id();
-    //  LOG(ERROR)<<"ask height:"<<current_height<<" current height:"<<height;
-    if (current_height - height > 5) {
-      return;
-    }
-    if (v.find(std::make_pair(next_p->header().proposer_id(),
-                              next_p->header().proposal_id())) != v.end()) {
-      return;
-    }
-    v.insert(std::make_pair(next_p->header().proposer_id(),
-                            next_p->header().proposal_id()));
-    // LOG(ERROR)<<"Obtain height:"<<height<<"
-    // proposer:"<<next_p->header().proposer_id()<<"
-    // id:"<<next_p->header().proposal_id();
-    ObtainHistoryProposal(next_p, v, resp, current_height);
-    resp.push_back(next_p);
-  }
-
-  for (const std::string& hash : p->weak_proposals().hash()) {
-    const Proposal* next_p = graph_->GetProposalInfo(hash);
-    assert(next_p != nullptr);
-    int height = next_p->header().height();
-    // LOG(ERROR)<<"ask height:"<<current_height<<" current height:"<<height;
-    if (current_height - height > 5) {
-      return;
-    }
-    if (v.find(std::make_pair(next_p->header().proposer_id(),
-                              next_p->header().proposal_id())) != v.end()) {
-      return;
-    }
-    v.insert(std::make_pair(next_p->header().proposer_id(),
-                            next_p->header().proposal_id()));
-    // LOG(ERROR)<<"Obtain height:"<<height<<"
-    // proposer:"<<next_p->header().proposer_id()<<"
-    // id:"<<next_p->header().proposal_id();
-    ObtainHistoryProposal(next_p, v, resp, current_height);
-    resp.push_back(next_p);
-  }
-}
-
-int ProposalManager::VerifyProposalHistory(const Proposal* p) {
-  // LOG(ERROR)<<"verify proposal, proposer:"<<p->header().proposer_id()<<"
-  // id:"<<p->header().proposal_id();
-  int ret = 0;
-  const std::string& pre_hash = p->header().prehash();
-  if (!pre_hash.empty()) {
-    const Proposal* next_p = graph_->GetProposalInfo(pre_hash);
-    if (next_p == nullptr) {
-      LOG(ERROR) << "no prehash:";
-      if (tmp_proposal_.find(pre_hash) == tmp_proposal_.end()) {
-        LOG(ERROR) << " prehash not found";
-        return 2;
-      }
-      ret = 1;
-      auto& it = tmp_proposal_[pre_hash];
-      assert(it != nullptr);
-      LOG(ERROR) << "Find pre hash in tmp:" << it->header().proposer_id()
-                 << " id:" << it->header().proposal_id();
-    }
-  }
-
-  for (const std::string& hash : p->weak_proposals().hash()) {
-    const Proposal* next_p = graph_->GetProposalInfo(hash);
-    if (next_p != nullptr) {
-      continue;
-    }
-    if (tmp_proposal_.find(hash) == tmp_proposal_.end()) {
-      LOG(ERROR) << "weak not found";
-      return 3;
-    }
-    ret = 1;
-    auto& it = tmp_proposal_[hash];
-    assert(it != nullptr);
-    LOG(ERROR) << "Find weak in tmp:" << it->header().proposer_id()
-               << " id:" << it->header().proposal_id();
-  }
-  return ret;
-}
-
-std::unique_ptr<ProposalQueryResp> ProposalManager::QueryProposal(
-    const std::string& hash) {
-  const Proposal* p = graph_->GetProposalInfo(hash);
-  if (p == nullptr) {
-    p = GetLocalProposal(hash);
-    LOG(ERROR) << "get from local:" << (p == nullptr);
-  }
-  assert(p != nullptr);
-  LOG(ERROR) << "query proposal id:" << p->header().proposal_id();
-  int current_height = p->header().height();
-
-  std::set<std::pair<int, int>> v;
-  std::vector<const Proposal*> list;
-
-  {
-    std::unique_lock<std::mutex> lk(q_mutex_);
-    ObtainHistoryProposal(p, v, list, current_height);
-  }
-
-  std::unique_ptr<ProposalQueryResp> resp =
-      std::make_unique<ProposalQueryResp>();
-  for (const Proposal* p : list) {
-    *resp->add_proposal() = *p;
-  }
-  return resp;
-}
-
-int ProposalManager::VerifyProposal(const Proposal& proposal) {
-  std::unique_lock<std::mutex> lk(q_mutex_);
-  int ret = VerifyProposalHistory(&proposal);
-  if (ret != 0) {
-    LOG(ERROR) << "add to temp proposer:" << proposal.header().proposer_id()
-               << " id:" << proposal.header().proposal_id();
-    tmp_proposal_[proposal.header().hash()] =
-        std::make_unique<Proposal>(proposal);
-  }
-  return ret;
-}
-
-void ProposalManager::ReleaseTmpProposal(const Proposal& proposal) {
-  std::unique_lock<std::mutex> lk(q_mutex_);
-  auto it = tmp_proposal_.find(proposal.header().hash());
-  if (it != tmp_proposal_.end()) {
-    //LOG(ERROR) << "release proposal:" << proposal.header().proposer_id()
-    //           << " id:" << proposal.header().proposal_id();
-    tmp_proposal_.erase(it);
-  }
-  graph_->AddProposalOnly(proposal);
-}
-
-void ProposalManager::AddLocalProposal(const Proposal& proposal) {
-  std::unique_lock<std::mutex> lk(t_mutex_);
-  local_proposal_[proposal.header().hash()] =
-      std::make_unique<Proposal>(proposal);
-  //LOG(ERROR) << "add local id:" << proposal.header().proposal_id();
-}
-
-Proposal* ProposalManager::GetLocalProposal(const std::string& hash) {
-  std::unique_lock<std::mutex> lk(t_mutex_);
-  auto it = local_proposal_.find(hash);
-  if (it == local_proposal_.end()) return nullptr;
-  return it->second.get();
-}
-
-void ProposalManager::RemoveLocalProposal(const std::string& hash) {
-  std::unique_lock<std::mutex> lk(t_mutex_);
-  auto it = local_proposal_.find(hash);
-  if (it == local_proposal_.end()) return;
-  //LOG(ERROR) << "remove local id:" << it->second->header().proposal_id();
-  local_proposal_.erase(it);
-}
-
-int ProposalManager::VerifyProposal(const ProposalQueryResp& resp) {
-  std::map<std::pair<int, int>, std::unique_ptr<Proposal>> list;
-  //LOG(ERROR) << "verify resp proposal size:" << resp.proposal_size();
-  for (auto& it : tmp_proposal_) {
-    std::unique_ptr<Proposal> p = std::move(it.second);
-    list[std::make_pair(p->header().height(), p->header().proposer_id())] =
-        std::move(p);
-  }
-
-  tmp_proposal_.clear();
-
-  for (const Proposal& p : resp.proposal()) {
-    // LOG(ERROR)<<"verify resp proposal 
proposer:"<<p.header().proposer_id()<<"
-    // id:"<<p.header().proposal_id();
-    if (list.find(std::make_pair(p.header().height(),
-                                 p.header().proposer_id())) != list.end()) {
-      continue;
-    }
-    list[std::make_pair(p.header().height(), p.header().proposer_id())] =
-        std::make_unique<Proposal>(p);
-  }
-
-  std::vector<std::unique_ptr<Proposal>> fail_list;
-  while (!list.empty()) {
-    auto it = list.begin();
-    int ret = VerifyProposalHistory(it->second.get());
-    // LOG(ERROR)<<"verify propser:"<<it->second->header().proposer_id()<<"
-    // height:"<<it->second->header().height()<<" ret:"<<ret;
-    if (ret == 0) {
-      ReleaseTmpProposal(*it->second);
-      //  LOG(ERROR)<<"proposal from:"<<it->second->header().proposer_id()
-      // <<" id:"<<it->second->header().proposal_id()<<" activate";
-    } else {
-      fail_list.push_back(std::move(it->second));
-    }
-    list.erase(it);
-    // assert(ret==0);
-  }
-  // tmp_proposal_.clear();
-  for (auto& p : fail_list) {
-    LOG(ERROR) << "proposal from:" << p->header().proposer_id()
-               << " id:" << p->header().proposal_id() << " fail";
-    assert(p != nullptr);
-    tmp_proposal_[p->header().hash()] = std::move(p);
-  }
-  return 0;
-}
-
-}  // namespace cassandra_recv
-}  // namespace cassandra
-}  // namespace resdb
diff --git a/platform/consensus/ordering/cassandra/algorithm/proposal_manager.h 
b/platform/consensus/ordering/cassandra/algorithm/proposal_manager.h
deleted file mode 100644
index 4a937470..00000000
--- a/platform/consensus/ordering/cassandra/algorithm/proposal_manager.h
+++ /dev/null
@@ -1,73 +0,0 @@
-#pragma once
-
-#include <condition_variable>
-#include <list>
-
-#include "platform/consensus/ordering/cassandra/algorithm/proposal_graph.h"
-#include "platform/consensus/ordering/cassandra/proto/proposal.pb.h"
-#include "platform/statistic/stats.h"
-
-namespace resdb {
-namespace cassandra {
-namespace cassandra_recv {
-
-class ProposalManager {
- public:
-  ProposalManager(int32_t id, ProposalGraph* graph);
-
-  int VerifyProposal(const Proposal& proposal);
-
-  void AddLocalBlock(std::unique_ptr<Block> block);
-  void AddBlock(std::unique_ptr<Block> block);
-  std::unique_ptr<Block> MakeBlock(
-      std::vector<std::unique_ptr<Transaction>>& txn);
-
-  std::unique_ptr<Proposal> GenerateProposal(int round, bool need_empty);
-  int CurrentRound();
-
-  void ClearProposal(const Proposal& p);
-  std::unique_ptr<Block> GetBlock(const std::string& hash, int sender);
-  Block* GetBlockSnap(const std::string& hash, int sender);
-
-  bool ContainBlock(const std::string& hash, int sender);
-  bool ContainBlock(const Block& block);
-  bool WaitBlock();
-  void BlockReady(const std::string& hash, int local_id);
-  const Block* QueryBlock(const std::string& hash);
-  std::unique_ptr<ProposalQueryResp> QueryProposal(const std::string& hash);
-  bool VerifyProposal(const Proposal* proposal);
-  void AddTmpProposal(std::unique_ptr<Proposal> proposal);
-  void ReleaseTmpProposal(const Proposal& proposal);
-  int VerifyProposalHistory(const Proposal* p);
-  void AddLocalProposal(const Proposal& proposal);
-  void RemoveLocalProposal(const std::string& hash);
-
-  int VerifyProposal(const ProposalQueryResp& resp);
-
- private:
-  void ObtainHistoryProposal(const Proposal* p,
-                             std::set<std::pair<int, int>>& v,
-                             std::vector<const Proposal*>& resp,
-                             int current_height);
-  Proposal* GetLocalProposal(const std::string& hash);
-
- private:
-  int32_t id_;
-  ProposalGraph* graph_;
-  int64_t local_proposal_id_ = 1, local_block_id_ = 1;
-
-  std::map<std::string, std::unique_ptr<Block>> pending_blocks_[512];
-  std::list<std::unique_ptr<Block>> blocks_;
-  std::mutex mutex_, p_mutex_, q_mutex_;
-  std::condition_variable notify_;
-  std::map<int, std::unique_ptr<Block>> blocks_candidates_;
-  std::map<std::string, std::unique_ptr<Proposal>> tmp_proposal_;
-
-  std::mutex t_mutex_;
-  std::map<std::string, std::unique_ptr<Proposal>> local_proposal_;
-  Stats* global_stats_;
-};
-
-}  // namespace cassandra_recv
-}  // namespace cassandra
-}  // namespace resdb
diff --git a/platform/consensus/ordering/cassandra/algorithm/proposal_state.h 
b/platform/consensus/ordering/cassandra/algorithm/proposal_state.h
deleted file mode 100644
index b25bc293..00000000
--- a/platform/consensus/ordering/cassandra/algorithm/proposal_state.h
+++ /dev/null
@@ -1,16 +0,0 @@
-#pragma once
-
-namespace resdb {
-namespace cassandra {
-
-enum ProposalState {
-  None = 0,
-  New = 1,
-  Voted = 2,
-  Prepared = 3,
-  PreCommit = 4,
-  Committed = 5,
-};
-
-}
-}  // namespace resdb
diff --git a/platform/consensus/ordering/cassandra/algorithm/ranking.cpp 
b/platform/consensus/ordering/cassandra/algorithm/ranking.cpp
deleted file mode 100644
index d8454e43..00000000
--- a/platform/consensus/ordering/cassandra/algorithm/ranking.cpp
+++ /dev/null
@@ -1,12 +0,0 @@
-
-#include "platform/consensus/ordering/cassandra/algorithm/ranking.h"
-
-namespace resdb {
-namespace cassandra {
-namespace cassandra_recv {
-
-int Ranking::GetRank(int proposer_id) { return proposer_id; }
-
-}  // namespace cassandra_recv
-}  // namespace cassandra
-}  // namespace resdb
diff --git a/platform/consensus/ordering/cassandra/algorithm/ranking.h 
b/platform/consensus/ordering/cassandra/algorithm/ranking.h
deleted file mode 100644
index d3d2dfb6..00000000
--- a/platform/consensus/ordering/cassandra/algorithm/ranking.h
+++ /dev/null
@@ -1,14 +0,0 @@
-#pragma once
-
-namespace resdb {
-namespace cassandra {
-namespace cassandra_recv {
-
-class Ranking {
- public:
-  int GetRank(int proposer_id);
-};
-
-}  // namespace cassandra_recv
-}  // namespace cassandra
-}  // namespace resdb
diff --git a/platform/consensus/ordering/cassandra/framework/BUILD 
b/platform/consensus/ordering/cassandra/framework/BUILD
deleted file mode 100644
index a764ecff..00000000
--- a/platform/consensus/ordering/cassandra/framework/BUILD
+++ /dev/null
@@ -1,16 +0,0 @@
-package(default_visibility = ["//visibility:private"])
-
-cc_library(
-    name = "consensus",
-    srcs = ["consensus.cpp"],
-    hdrs = ["consensus.h"],
-    visibility = [
-        "//visibility:public",
-    ],
-    deps = [
-        "//common/utils",
-        "//platform/consensus/ordering/common/framework:consensus",
-        "//platform/consensus/ordering/cassandra/algorithm:cassandra",
-    ],
-)
-
diff --git a/platform/consensus/ordering/cassandra/framework/consensus.cpp 
b/platform/consensus/ordering/cassandra/framework/consensus.cpp
deleted file mode 100644
index f8ad64fb..00000000
--- a/platform/consensus/ordering/cassandra/framework/consensus.cpp
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Copyright (c) 2019-2022 ExpoLab, UC Davis
- *
- * Permission is hereby granted, free of charge, to any person
- * obtaining a copy of this software and associated documentation
- * files (the "Software"), to deal in the Software without
- * restriction, including without limitation the rights to use,
- * copy, modify, merge, publish, distribute, sublicense, and/or
- * sell copies of the Software, and to permit persons to whom the
- * Software is furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be
- * included in all copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
- * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
- * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
- * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
- * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
- * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
- * DEALINGS IN THE SOFTWARE.
- *
- */
-
-#include "platform/consensus/ordering/cassandra/framework/consensus.h"
-
-#include <glog/logging.h>
-#include <unistd.h>
-
-#include "common/utils/utils.h"
-
-namespace resdb {
-namespace cassandra {
-
-Consensus::Consensus(const ResDBConfig& config,
-                     std::unique_ptr<TransactionManager> executor)
-    : common::Consensus(config, std::move(executor)){
-  int total_replicas = config_.GetReplicaNum();
-  int f = (total_replicas - 1) / 3;
-
-  Init();
-
-  start_ = 0;
-
-  if (config_.GetPublicKeyCertificateInfo()
-          .public_key()
-          .public_key_info()
-          .type() != CertificateKeyInfo::CLIENT) {
-    cassandra_ = std::make_unique<cassandra_recv::Cassandra>(
-        config_.GetSelfInfo().id(), f,
-                                   total_replicas, GetSignatureVerifier());
-
-    InitProtocol(cassandra_.get());
-
-
-    cassandra_->SetPrepareFunction([&](const Transaction& msg) { 
-      return Prepare(msg); 
-    });
-  }
-}
-
-int Consensus::ProcessCustomConsensus(std::unique_ptr<Request> request) {
-  //LOG(ERROR)<<"receive commit:"<<request->type()<<" 
"<<MessageType_Name(request->user_type());
-  if (request->user_type() == MessageType::NewBlocks) {
-    std::unique_ptr<Block> block = std::make_unique<Block>();
-    if (!block->ParseFromString(request->data())) {
-      assert(1 == 0);
-      LOG(ERROR) << "parse proposal fail";
-      return -1;
-    }
-    cassandra_->ReceiveBlock(std::move(block));
-    return 0;
-  } else if (request->user_type() == MessageType::CMD_BlockACK) {
-    std::unique_ptr<BlockACK> block_ack = std::make_unique<BlockACK>();
-    if (!block_ack->ParseFromString(request->data())) {
-      LOG(ERROR) << "parse proposal fail";
-      assert(1 == 0);
-      return -1;
-    }
-    cassandra_->ReceiveBlockACK(std::move(block_ack));
-    return 0;
-
-  } else if (request->user_type() == MessageType::NewProposal) {
-    // LOG(ERROR)<<"receive proposal:";
-    std::unique_ptr<Proposal> proposal = std::make_unique<Proposal>();
-    if (!proposal->ParseFromString(request->data())) {
-      LOG(ERROR) << "parse proposal fail";
-      assert(1 == 0);
-      return -1;
-    }
-    if (!cassandra_->ReceiveProposal(std::move(proposal))) {
-      return -1;
-    }
-    return 0;
-  } else if (request->user_type() == MessageType::CMD_BlockQuery) {
-    std::unique_ptr<BlockQuery> block = std::make_unique<BlockQuery>();
-    if (!block->ParseFromString(request->data())) {
-      assert(1 == 0);
-      LOG(ERROR) << "parse proposal fail";
-      return -1;
-    }
-    cassandra_->SendBlock(*block);
-    return 0;
-  } else if (request->user_type() == MessageType::CMD_ProposalQuery) {
-    std::unique_ptr<ProposalQuery> query =
-      std::make_unique<ProposalQuery>();
-    if (!query->ParseFromString(request->data())) {
-      assert(1 == 0);
-      LOG(ERROR) << "parse proposal fail";
-      return -1;
-    }
-    cassandra_->SendProposal(*query);
-  } else if (request->user_type() ==
-      MessageType::CMD_ProposalQueryResponse) {
-    std::unique_ptr<ProposalQueryResp> resp =
-      std::make_unique<ProposalQueryResp>();
-    if (!resp->ParseFromString(request->data())) {
-      assert(1 == 0);
-      LOG(ERROR) << "parse proposal fail";
-      return -1;
-    }
-    cassandra_->ReceiveProposalQueryResp(*resp);
-  } 
-  return 0;
-}
-
-int Consensus::ProcessNewTransaction(std::unique_ptr<Request> request) {
-  std::unique_ptr<Transaction> txn = std::make_unique<Transaction>();
-  txn->set_data(request->data());
-  txn->set_hash(request->hash());
-  txn->set_proxy_id(request->proxy_id());
-  //LOG(ERROR)<<"receive txn";
-  return cassandra_->ReceiveTransaction(std::move(txn));
-}
-
-int Consensus::CommitMsg(const google::protobuf::Message& msg) {
-  return CommitMsgInternal(dynamic_cast<const Transaction&>(msg));
-}
-
-int Consensus::CommitMsgInternal(const Transaction& txn) {
-  //LOG(ERROR)<<"commit txn:"<<txn.id()<<" proxy id:"<<txn.proxy_id()<<" 
uid:"<<txn.uid();
-  std::unique_ptr<Request> request = std::make_unique<Request>();
-  request->set_queuing_time(txn.queuing_time());
-  request->set_data(txn.data());
-  request->set_seq(txn.id());
-  request->set_uid(txn.uid());
-  //if (txn.proposer_id() == config_.GetSelfInfo().id()) {
-    request->set_proxy_id(txn.proxy_id());
-   // LOG(ERROR)<<"commit txn:"<<txn.id()<<" proxy id:"<<request->uid();
-    assert(request->uid()>0);
-  //}
-
-  transaction_executor_->AddExecuteMessage(std::move(request));
-  return 0;
-}
-
-
-int Consensus::Prepare(const Transaction& txn) {
-  // LOG(ERROR)<<"prepare txn:"<<txn.id()<<" proxy id:"<<txn.proxy_id()<<"
-  // uid:"<<txn.uid();
-  std::unique_ptr<Request> request = std::make_unique<Request>();
-  request->set_data(txn.data());
-  request->set_uid(txn.uid());
-  transaction_executor_->Prepare(std::move(request));
-  return 0;
-}
-
-
-}  // namespace cassandra
-}  // namespace resdb
diff --git a/platform/consensus/ordering/cassandra/framework/consensus.h 
b/platform/consensus/ordering/cassandra/framework/consensus.h
deleted file mode 100644
index a0069c9d..00000000
--- a/platform/consensus/ordering/cassandra/framework/consensus.h
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Copyright (c) 2019-2022 ExpoLab, UC Davis
- *
- * Permission is hereby granted, free of charge, to any person
- * obtaining a copy of this software and associated documentation
- * files (the "Software"), to deal in the Software without
- * restriction, including without limitation the rights to use,
- * copy, modify, merge, publish, distribute, sublicense, and/or
- * sell copies of the Software, and to permit persons to whom the
- * Software is furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be
- * included in all copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
- * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
- * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
- * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
- * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
- * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
- * DEALINGS IN THE SOFTWARE.
- *
- */
-
-#pragma once
-
-#include "executor/common/transaction_manager.h"
-#include "platform/consensus/ordering/common/framework/consensus.h"
-#include "platform/consensus/ordering/cassandra/algorithm/cassandra.h"
-#include "platform/networkstrate/consensus_manager.h"
-
-namespace resdb {
-namespace cassandra {
-
-class Consensus : public common::Consensus {
- public:
-  Consensus(const ResDBConfig& config,
-            std::unique_ptr<TransactionManager> transaction_manager);
-  virtual ~Consensus() = default;
-
- private:
-  int ProcessCustomConsensus(std::unique_ptr<Request> request) override;
-  int ProcessNewTransaction(std::unique_ptr<Request> request) override;
-  int CommitMsg(const google::protobuf::Message& msg) override;
-  int CommitMsgInternal(const Transaction& txn);
-
-  int Prepare(const Transaction& txn);
-
- protected:
-  std::unique_ptr<cassandra_recv::Cassandra> cassandra_;
-  Stats* global_stats_;
-  int64_t start_;
-  std::mutex mutex_;
-  int send_num_[200];
-};
-
-}  // namespace cassandra
-}  // namespace resdb
diff --git a/platform/consensus/ordering/cassandra/framework/consensus_test.cpp b/platform/consensus/ordering/cassandra/framework/consensus_test.cpp
deleted file mode 100644
index 2c8834a8..00000000
--- a/platform/consensus/ordering/cassandra/framework/consensus_test.cpp
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Copyright (c) 2019-2022 ExpoLab, UC Davis
- *
- * Permission is hereby granted, free of charge, to any person
- * obtaining a copy of this software and associated documentation
- * files (the "Software"), to deal in the Software without
- * restriction, including without limitation the rights to use,
- * copy, modify, merge, publish, distribute, sublicense, and/or
- * sell copies of the Software, and to permit persons to whom the
- * Software is furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be
- * included in all copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
- * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
- * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
- * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
- * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
- * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
- * DEALINGS IN THE SOFTWARE.
- *
- */
-
-#include "platform/consensus/ordering/cassandra/framework/consensus.h"
-
-#include <glog/logging.h>
-#include <gmock/gmock.h>
-#include <gtest/gtest.h>
-
-#include <future>
-
-#include "common/test/test_macros.h"
-#include "executor/common/mock_transaction_manager.h"
-#include "platform/config/resdb_config_utils.h"
-#include "platform/networkstrate/mock_replica_communicator.h"
-
-namespace resdb {
-namespace cassandra {
-namespace {
-
-using ::resdb::testing::EqualsProto;
-using ::testing::_;
-using ::testing::Invoke;
-using ::testing::Test;
-
-ResDBConfig GetConfig() {
-  ResDBConfig config({GenerateReplicaInfo(1, "127.0.0.1", 1234),
-                      GenerateReplicaInfo(2, "127.0.0.1", 1235),
-                      GenerateReplicaInfo(3, "127.0.0.1", 1236),
-                      GenerateReplicaInfo(4, "127.0.0.1", 1237)},
-                     GenerateReplicaInfo(1, "127.0.0.1", 1234));
-  return config;
-}
-
-class ConsensusTest : public Test {
- public:
-  ConsensusTest() : config_(GetConfig()) {
-    auto transaction_manager =
-        std::make_unique<MockTransactionExecutorDataImpl>();
-    mock_transaction_manager_ = transaction_manager.get();
-    consensus_ =
-        std::make_unique<Consensus>(config_, std::move(transaction_manager));
-    consensus_->SetCommunicator(&replica_communicator_);
-  }
-
-  void AddTransaction(const std::string& data) {
-    auto request = std::make_unique<Request>();
-    request->set_type(Request::TYPE_NEW_TXNS);
-
-    Transaction txn;
-
-    BatchUserRequest batch_request;
-    auto req = batch_request.add_user_requests();
-    req->mutable_request()->set_data(data);
-
-    batch_request.set_local_id(1);
-    batch_request.SerializeToString(txn.mutable_data());
-
-    txn.SerializeToString(request->mutable_data());
-
-    EXPECT_EQ(consensus_->ConsensusCommit(nullptr, std::move(request)), 0);
-  }
-
- protected:
-  ResDBConfig config_;
-  MockTransactionExecutorDataImpl* mock_transaction_manager_;
-  MockReplicaCommunicator replica_communicator_;
-  std::unique_ptr<TransactionManager> transaction_manager_;
-  std::unique_ptr<Consensus> consensus_;
-};
-
-TEST_F(ConsensusTest, NormalCase) {
-  std::promise<bool> commit_done;
-  std::future<bool> commit_done_future = commit_done.get_future();
-
-  EXPECT_CALL(replica_communicator_, BroadCast)
-      .WillRepeatedly(Invoke([&](const google::protobuf::Message& msg) {
-        Request request = *dynamic_cast<const Request*>(&msg);
-
-        if (request.user_type() == MessageType::NewProposal) {
-          LOG(ERROR) << "bc new proposal";
-          consensus_->ConsensusCommit(nullptr,
-                                      std::make_unique<Request>(request));
-          LOG(ERROR) << "recv proposal done";
-        }
-        if (request.user_type() == MessageType::Vote) {
-          LOG(ERROR) << "bc vote";
-
-          VoteMessage ack_msg;
-          assert(ack_msg.ParseFromString(request.data()));
-          for (int i = 1; i <= 3; ++i) {
-            ack_msg.set_proposer_id(i);
-            auto new_req = std::make_unique<Request>(request);
-            ack_msg.SerializeToString(new_req->mutable_data());
-
-            consensus_->ConsensusCommit(nullptr, std::move(new_req));
-          }
-        }
-        // LOG(ERROR)<<"bc type:"<<request->type()<<" user
-        // type:"<<request->user_type();
-        if (request.user_type() == MessageType::Prepare) {
-          LOG(ERROR) << "bc prepare";
-
-          VoteMessage ack_msg;
-          assert(ack_msg.ParseFromString(request.data()));
-          for (int i = 1; i <= 3; ++i) {
-            ack_msg.set_proposer_id(i);
-            auto new_req = std::make_unique<Request>(request);
-            ack_msg.SerializeToString(new_req->mutable_data());
-
-            consensus_->ConsensusCommit(nullptr, std::move(new_req));
-          }
-        }
-        if (request.user_type() == MessageType::Voteprep) {
-          LOG(ERROR) << "bc voterep:";
-
-          VoteMessage ack_msg;
-          assert(ack_msg.ParseFromString(request.data()));
-          for (int i = 1; i <= 3; ++i) {
-            ack_msg.set_proposer_id(i);
-            auto new_req = std::make_unique<Request>(request);
-            ack_msg.SerializeToString(new_req->mutable_data());
-            LOG(ERROR) << "new request type:" << new_req->user_type();
-
-            consensus_->ConsensusCommit(nullptr, std::move(new_req));
-          }
-        }
-        LOG(ERROR) << "done";
-        return 0;
-      }));
-
-  EXPECT_CALL(*mock_transaction_manager_, ExecuteData)
-      .WillOnce(Invoke([&](const std::string& msg) {
-        LOG(ERROR) << "execute txn:" << msg;
-        EXPECT_EQ(msg, "transaction1");
-        return nullptr;
-      }));
-
-  EXPECT_CALL(replica_communicator_, SendMessage(_, 0))
-      .WillRepeatedly(
-          Invoke([&](const google::protobuf::Message& msg, int64_t) {
-            Request request = *dynamic_cast<const Request*>(&msg);
-            if (request.type() == Request::TYPE_RESPONSE) {
-              LOG(ERROR) << "get response";
-              commit_done.set_value(true);
-            }
-            return;
-          }));
-
-  AddTransaction("transaction1");
-
-  commit_done_future.get();
-}
-
-}  // namespace
-}  // namespace cassandra
-}  // namespace resdb
diff --git a/platform/consensus/ordering/cassandra/proto/BUILD b/platform/consensus/ordering/cassandra/proto/BUILD
deleted file mode 100644
index 558db374..00000000
--- a/platform/consensus/ordering/cassandra/proto/BUILD
+++ /dev/null
@@ -1,16 +0,0 @@
-package(default_visibility = ["//platform/consensus/ordering/cassandra:__subpackages__"])
-
-load("@rules_cc//cc:defs.bzl", "cc_proto_library")
-load("@rules_proto//proto:defs.bzl", "proto_library")
-load("@rules_proto_grpc//python:defs.bzl", "python_proto_library")
-
-proto_library(
-    name = "proposal_proto",
-    srcs = ["proposal.proto"],
-    #visibility = ["//visibility:public"],
-)
-
-cc_proto_library(
-    name = "proposal_cc_proto",
-    deps = [":proposal_proto"],
-)
diff --git a/platform/consensus/ordering/cassandra/proto/proposal.proto b/platform/consensus/ordering/cassandra/proto/proposal.proto
deleted file mode 100644
index 1fe96ba1..00000000
--- a/platform/consensus/ordering/cassandra/proto/proposal.proto
+++ /dev/null
@@ -1,119 +0,0 @@
-
-syntax = "proto3";
-
-package resdb.cassandra;
-
-message Transaction{
-  int32 id = 1;
-  bytes data = 2;
-  bytes hash = 3;
-  int32 proxy_id = 4;
-  int32 proposer_id = 5;
-  int64 uid = 6;
-  int64 create_time = 7;
-  int64 queuing_time = 8;
-}
-
-message Header {
-  bytes hash = 1;
-  int32 height = 2;
-  int32 proposer_id = 3;
-  int32 proposal_id = 4;
-  bytes prehash = 5;
-}
-
-message History {
-  bytes hash = 1;
-  int32 state = 2;
-  int32 sender = 3;
-  int32 id = 4;
-}
-
-message WeakProposal {
-  repeated bytes hash = 1;
-}
-
-message Proposal {
-  Header header = 1;
-  repeated History history = 2;
-  int32 nonce = 3;
-  repeated Transaction transactions = 4;
-  uint64 create_time = 5;
-  repeated Block block = 6;
-  WeakProposal weak_proposals = 7;
-};
-
-enum MessageType {
-  NewProposal = 0;
-  Vote = 1;
-  Prepare = 2;
-  VoteAck = 3;
-  Commit = 4;
-  Recovery = 5;
-  NewBlocks = 6;
-  ProposalAck = 7;
-  CMD_BlockACK = 8;
-  CMD_BlockQuery = 9;
-  CMD_SingleBlock = 10;
-  CMD_ProposalQuery = 11;
-  CMD_ProposalQueryResponse = 12;
-};
-
-message VoteMessage {
-  bytes hash = 1;
-  int32 proposer_id = 2;
-  MessageType type = 3;
-  int32 sender_id = 4;
-  int32 proposal_id = 5;
-};
-
-message CommittedProposals{
-  repeated Proposal proposals = 1;
-  int32 sender_id = 2;
-};
-
-message HashValue{
-       repeated uint64 bits = 1;
-};
-
-message Block {
-  message BlockData { 
-    repeated Transaction transaction = 1;
-  }
-  BlockData data = 1;
-  bytes hash = 2;
-  int32 sender_id = 3;
-  int32 local_id = 4;
-  int64 create_time = 5;
-}
-
-message BlockACK {
-  bytes hash = 2;
-  int32 sender_id = 3;
-  int32 local_id = 4;
-  int32 responder = 5;
-}
-
-message BlockQuery {
-  bytes hash = 2;
-  int32 proposer = 3;
-  int32 local_id = 4;
-  int32 sender = 5;
-}
-
-message ProposalQuery {
-  bytes hash = 2;
-  int32 proposer = 3;
-  int32 id = 4;
-  int32 sender = 5;
-}
-
-message ProposalQueryResp {
-  repeated Proposal proposal = 1;
-}
-
-message ProposalResponse {
-  Header header = 1;
-  int32 leader = 2;
-}
-

Reply via email to