This is an automated email from the ASF dual-hosted git repository.

junchao pushed a commit to branch poe_merge
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git


The following commit(s) were added to refs/heads/poe_merge by this push:
     new 3b587004 add poe
3b587004 is described below

commit 3b5870042b541b3fe71954118a9c383ccea46c68
Author: cjcchen <[email protected]>
AuthorDate: Mon Apr 8 11:57:12 2024 +0000

    add poe
---
 benchmark/protocols/poe/BUILD                      |  16 ++
 benchmark/protocols/poe/kv_server_performance.cpp  |  89 +++++++
 .../consensus/execution/transaction_executor.cpp   |   1 +
 platform/consensus/ordering/common/algorithm/BUILD |  12 +
 .../ordering/common/algorithm/protocol_base.cpp    |  53 ++++
 .../ordering/common/algorithm/protocol_base.h      |  64 +++++
 platform/consensus/ordering/common/framework/BUILD |  49 ++++
 .../ordering/common/framework/consensus.cpp        | 181 +++++++++++++
 .../ordering/common/framework/consensus.h          |  83 ++++++
 .../common/framework/performance_manager.cpp       | 289 +++++++++++++++++++++
 .../common/framework/performance_manager.h         | 103 ++++++++
 .../ordering/common/framework/response_manager.cpp | 242 +++++++++++++++++
 .../ordering/common/framework/response_manager.h   |  85 ++++++
 .../common/framework/transaction_utils.cpp         |  49 ++++
 .../ordering/common/framework/transaction_utils.h  |  45 ++++
 platform/consensus/ordering/poe/algorithm/BUILD    |  15 ++
 platform/consensus/ordering/poe/algorithm/poe.cpp  |  75 ++++++
 platform/consensus/ordering/poe/algorithm/poe.h    |  40 +++
 platform/consensus/ordering/poe/framework/BUILD    |  16 ++
 .../consensus/ordering/poe/framework/consensus.cpp | 105 ++++++++
 .../consensus/ordering/poe/framework/consensus.h   |  59 +++++
 .../ordering/poe/framework/consensus_test.cpp      | 179 +++++++++++++
 platform/consensus/ordering/poe/proto/BUILD        |  16 ++
 .../consensus/ordering/poe/proto/proposal.proto    |  28 ++
 platform/networkstrate/replica_communicator.cpp    |   2 +-
 platform/proto/resdb.proto                         |   9 +-
 scripts/deploy/config/kv_performance_server.conf   |  10 +-
 scripts/deploy/config/poe.config                   |  10 +
 scripts/deploy/performance/pbft_performance.sh     |   2 +
 scripts/deploy/performance/poe_performance.sh      |   4 +
 scripts/deploy/performance/run_performance.sh      |   2 -
 31 files changed, 1924 insertions(+), 9 deletions(-)

diff --git a/benchmark/protocols/poe/BUILD b/benchmark/protocols/poe/BUILD
new file mode 100644
index 00000000..e7bbde3b
--- /dev/null
+++ b/benchmark/protocols/poe/BUILD
@@ -0,0 +1,16 @@
+package(default_visibility = ["//visibility:private"])
+
+load("@bazel_skylib//rules:common_settings.bzl", "bool_flag")
+
+cc_binary(
+    name = "kv_server_performance",
+    srcs = ["kv_server_performance.cpp"],
+    deps = [
+        "//chain/storage:memory_db",
+        "//executor/kv:kv_executor",
+        "//platform/config:resdb_config_utils",
+        "//platform/consensus/ordering/poe/framework:consensus",
+        "//service/utils:server_factory",
+    ],
+)
+
diff --git a/benchmark/protocols/poe/kv_server_performance.cpp b/benchmark/protocols/poe/kv_server_performance.cpp
new file mode 100644
index 00000000..fa13e9a9
--- /dev/null
+++ b/benchmark/protocols/poe/kv_server_performance.cpp
@@ -0,0 +1,89 @@
+/*
+ * Copyright (c) 2019-2022 ExpoLab, UC Davis
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ */
+
+#include <glog/logging.h>
+
+#include "chain/storage/memory_db.h"
+#include "executor/kv/kv_executor.h"
+#include "platform/config/resdb_config_utils.h"
+#include "platform/consensus/ordering/poe/framework/consensus.h"
+#include "platform/networkstrate/service_network.h"
+#include "platform/statistic/stats.h"
+#include "proto/kv/kv.pb.h"
+
+using namespace resdb;
+using namespace resdb::poe;
+using namespace resdb::storage;
+
+void ShowUsage() {
+  printf("<config> <private_key> <cert_file> [logging_dir]\n");
+}
+
+std::string GetRandomKey() {
+  int num1 = rand() % 10;
+  int num2 = rand() % 10;
+  return std::to_string(num1) + std::to_string(num2);
+}
+
+int main(int argc, char** argv) {
+  if (argc < 4) {  // <config>, <private_key> and <cert_file> are all required.
+    ShowUsage();
+    exit(0);
+  }
+
+  // google::InitGoogleLogging(argv[0]);
+  // FLAGS_minloglevel = google::GLOG_WARNING;
+
+  char* config_file = argv[1];
+  char* private_key_file = argv[2];
+  char* cert_file = argv[3];
+
+  if (argc >= 5) {  // The optional 4th argument is handed to Stats::SetPrometheus.
+    auto monitor_port = Stats::GetGlobalStats(5);
+    monitor_port->SetPrometheus(argv[4]);
+  }
+
+  std::unique_ptr<ResDBConfig> config =
+      GenerateResDBConfig(config_file, private_key_file, cert_file);
+
+  config->RunningPerformance(true);
+  ResConfigData config_data = config->GetConfigData();
+
+  auto performance_consens = std::make_unique<Consensus>(
+      *config, std::make_unique<KVExecutor>(std::make_unique<MemoryDB>()));
+  performance_consens->SetupPerformanceDataFunc([]() {  // Builds one serialized SET request.
+    KVRequest request;
+    request.set_cmd(KVRequest::SET);
+    request.set_key(GetRandomKey());
+    request.set_value("helloword");
+    std::string request_data;
+    request.SerializeToString(&request_data);
+    return request_data;
+  });
+
+  auto server =
+      std::make_unique<ServiceNetwork>(*config, std::move(performance_consens));
+  server->Run();
+}
diff --git a/platform/consensus/execution/transaction_executor.cpp b/platform/consensus/execution/transaction_executor.cpp
index 59779b22..fd24da3a 100644
--- a/platform/consensus/execution/transaction_executor.cpp
+++ b/platform/consensus/execution/transaction_executor.cpp
@@ -231,6 +231,7 @@ void TransactionExecutor::Execute(std::unique_ptr<Request> request,
     response = std::make_unique<BatchUserResponse>();
   }
 
+  response->set_proxy_id(batch_request.proxy_id());
   response->set_createtime(batch_request.createtime());
   response->set_local_id(batch_request.local_id());
   response->set_hash(batch_request.hash());
diff --git a/platform/consensus/ordering/common/algorithm/BUILD b/platform/consensus/ordering/common/algorithm/BUILD
new file mode 100644
index 00000000..9abd8715
--- /dev/null
+++ b/platform/consensus/ordering/common/algorithm/BUILD
@@ -0,0 +1,12 @@
+package(default_visibility = ["//platform/consensus/ordering:__subpackages__"])
+
+cc_library(
+    name = "protocol_base",
+    srcs = ["protocol_base.cpp"],
+    hdrs = ["protocol_base.h"],
+    deps = [
+        "//common:comm",
+        "//common/crypto:signature_verifier",
+    ],
+)
+
diff --git a/platform/consensus/ordering/common/algorithm/protocol_base.cpp b/platform/consensus/ordering/common/algorithm/protocol_base.cpp
new file mode 100644
index 00000000..3c6c2fc3
--- /dev/null
+++ b/platform/consensus/ordering/common/algorithm/protocol_base.cpp
@@ -0,0 +1,53 @@
+#include "platform/consensus/ordering/common/algorithm/protocol_base.h"
+
+#include <glog/logging.h>
+
+namespace resdb {
+namespace common {
+
+ProtocolBase::ProtocolBase(
+    int id, 
+    int f,
+    int total_num,
+    SingleCallFuncType single_call,
+    BroadcastCallFuncType broadcast_call,
+    CommitFuncType commit) : 
+      id_(id), 
+      f_(f),
+      total_num_(total_num),
+      single_call_(single_call), 
+      broadcast_call_(broadcast_call), 
+      commit_(commit) {
+      stop_ = false;
+}
+
+ProtocolBase::ProtocolBase(int id, int f, int total_num) : ProtocolBase(id, f, total_num, nullptr, nullptr, nullptr) {
+  // Delegates with empty callbacks; install them with the Set*Func setters before sending.
+}
+
+ProtocolBase::~ProtocolBase() {
+  Stop();
+}
+
+void ProtocolBase::Stop(){
+  stop_ = true;
+}
+
+bool ProtocolBase::IsStop(){
+  return stop_;
+}
+    
+int ProtocolBase::SendMessage(int msg_type, const google::protobuf::Message& msg, int node_id) {
+  return single_call_(msg_type, msg, node_id);  // Forwards to the installed single-call callback.
+}
+
+int ProtocolBase::Broadcast(int msg_type, const google::protobuf::Message& msg) {
+  return broadcast_call_(msg_type, msg);  // Forwards to the installed broadcast callback.
+}
+
+int ProtocolBase::Commit(const google::protobuf::Message& msg) {
+  return commit_(msg);
+}
+
+}  // namespace common
+}  // namespace resdb
diff --git a/platform/consensus/ordering/common/algorithm/protocol_base.h b/platform/consensus/ordering/common/algorithm/protocol_base.h
new file mode 100644
index 00000000..a93be822
--- /dev/null
+++ b/platform/consensus/ordering/common/algorithm/protocol_base.h
@@ -0,0 +1,64 @@
+#pragma once
+
+#include <functional>
+#include <google/protobuf/message.h>
+#include "common/crypto/signature_verifier.h"
+
+namespace resdb {
+namespace common {
+
+class ProtocolBase {  // Base class for ordering protocols: node parameters plus send/commit callbacks.
+ public:
+  typedef std::function<int(int, const google::protobuf::Message& msg, int)> SingleCallFuncType;
+  typedef std::function<int(int, const google::protobuf::Message& msg)> BroadcastCallFuncType;
+  typedef std::function<int(const google::protobuf::Message& msg)> CommitFuncType;
+
+  ProtocolBase(
+      int id,
+      int f,  // presumably the tolerated number of faulty replicas — TODO confirm
+      int total_num,
+      SingleCallFuncType single_call,
+      BroadcastCallFuncType broadcast_call,
+      CommitFuncType commit
+      );
+
+  ProtocolBase(int id, int f, int total_num);  // Same as above with all callbacks empty.
+
+  // The destructor calls Stop().
+  virtual ~ProtocolBase();
+
+  void Stop();  // Sets the stop flag reported by IsStop().
+
+  inline
+  void SetSingleCallFunc(SingleCallFuncType single_call) { single_call_ = single_call; }
+
+  inline
+  void SetBroadcastCallFunc(BroadcastCallFuncType broadcast_call) { broadcast_call_ = broadcast_call; }
+
+  inline
+  void SetCommitFunc(CommitFuncType commit_func) { commit_ = commit_func; }
+
+  inline
+  void SetSignatureVerifier(SignatureVerifier* verifier) { verifier_ = verifier; }  // Pointer is stored as-is.
+
+  protected:
+    int SendMessage(int msg_type, const google::protobuf::Message& msg, int node_id);
+    int Broadcast(int msg_type, const google::protobuf::Message& msg);
+    int Commit(const google::protobuf::Message& msg);
+
+    bool IsStop();
+
+ protected:
+  int id_;
+  int f_;
+  int total_num_;
+  SingleCallFuncType single_call_;
+  BroadcastCallFuncType broadcast_call_;
+  CommitFuncType commit_;
+  std::atomic<bool> stop_;
+
+  SignatureVerifier* verifier_ = nullptr;  // Stays null until SetSignatureVerifier() is called.
+};
+
+}  // namespace common
+}  // namespace resdb
diff --git a/platform/consensus/ordering/common/framework/BUILD b/platform/consensus/ordering/common/framework/BUILD
new file mode 100644
index 00000000..82e03a0f
--- /dev/null
+++ b/platform/consensus/ordering/common/framework/BUILD
@@ -0,0 +1,49 @@
+package(default_visibility = ["//platform/consensus/ordering:__subpackages__"])
+
+cc_library(
+    name = "consensus",
+    srcs = ["consensus.cpp"],
+    hdrs = ["consensus.h"],
+    deps = [
+        ":performance_manager",
+        ":response_manager",
+        "//common/utils",
+        "//executor/common:transaction_manager",
+        "//platform/consensus/execution:transaction_executor",
+        "//platform/consensus/ordering/common/algorithm:protocol_base",
+        "//platform/networkstrate:consensus_manager",
+    ],
+)
+
+cc_library(
+    name = "performance_manager",
+    srcs = ["performance_manager.cpp"],
+    hdrs = ["performance_manager.h"],
+    deps = [
+        ":transaction_utils",
+        "//platform/networkstrate:replica_communicator",
+        "//platform/networkstrate:server_comm",
+    ],
+)
+
+
+cc_library(
+    name = "response_manager",
+    srcs = ["response_manager.cpp"],
+    hdrs = ["response_manager.h"],
+    deps = [
+        ":transaction_utils",
+        "//platform/networkstrate:replica_communicator",
+        "//platform/networkstrate:server_comm",
+    ],
+)
+
+cc_library(
+    name = "transaction_utils",
+    srcs = ["transaction_utils.cpp"],
+    hdrs = ["transaction_utils.h"],
+    visibility = ["//visibility:public"],
+    deps = [
+        "//platform/proto:resdb_cc_proto",
+    ],
+)
diff --git a/platform/consensus/ordering/common/framework/consensus.cpp b/platform/consensus/ordering/common/framework/consensus.cpp
new file mode 100644
index 00000000..683bce29
--- /dev/null
+++ b/platform/consensus/ordering/common/framework/consensus.cpp
@@ -0,0 +1,181 @@
+/*
+ * Copyright (c) 2019-2022 ExpoLab, UC Davis
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ */
+
+#include "platform/consensus/ordering/common/framework/consensus.h"
+
+#include <glog/logging.h>
+#include <unistd.h>
+
+#include "common/utils/utils.h"
+
+namespace resdb {
+namespace common {
+
+Consensus::Consensus(const ResDBConfig& config,
+                     std::unique_ptr<TransactionManager> executor)
+    : ConsensusManager(config),
+      replica_communicator_(GetBroadCastClient()),
+      transaction_executor_(std::make_unique<TransactionExecutor>(
+          config,
+          [&](std::unique_ptr<Request> request,
+              std::unique_ptr<BatchUserResponse> resp_msg) {
+            ResponseMsg(*resp_msg);
+          },
+          nullptr, std::move(executor))) {
+  LOG(INFO) << "is running is performance mode:"
+            << config_.IsPerformanceRunning();
+            is_stop_ = false;
+  global_stats_ = Stats::GetGlobalStats();
+}
+
+void Consensus::Init(){
+  if(performance_manager_ == nullptr){
+    performance_manager_ = 
+      config_.IsPerformanceRunning()
+      ? std::make_unique<PerformanceManager>(
+          config_, GetBroadCastClient(), GetSignatureVerifier())
+      : nullptr;
+  }
+
+  if(response_manager_ == nullptr){
+    response_manager_ = 
+      !config_.IsPerformanceRunning()
+      ? std::make_unique<ResponseManager>(config_, GetBroadCastClient(),
+          GetSignatureVerifier())
+      : nullptr;
+  }
+}
+
+void Consensus::InitProtocol(ProtocolBase * protocol){
+  protocol->SetSingleCallFunc(
+      [&](int type, const google::protobuf::Message& msg, int node_id) {
+      return SendMsg(type, msg, node_id);
+      });
+
+  protocol->SetBroadcastCallFunc(
+      [&](int type, const google::protobuf::Message& msg) {
+      return Broadcast(type, msg);
+      });
+
+  protocol->SetCommitFunc(
+      [&](const google::protobuf::Message& msg) { 
+      return CommitMsg(msg); 
+      });
+}
+
+Consensus::~Consensus(){
+  is_stop_ = true;
+}
+
+void Consensus::SetPerformanceManager(std::unique_ptr<PerformanceManager> performance_manager) {
+  performance_manager_ = std::move(performance_manager);  // Takes ownership, replacing any current manager.
+}
+
+bool Consensus::IsStop(){
+  return is_stop_;
+}
+
+void Consensus::SetupPerformanceDataFunc(std::function<std::string()> func) {
+  performance_manager_->SetDataFunc(func);
+}
+
+void Consensus::SetCommunicator(ReplicaCommunicator* replica_communicator) {
+  replica_communicator_ = replica_communicator;
+}
+
+int Consensus::Broadcast(int type, const google::protobuf::Message& msg) {
+  Request request;
+  msg.SerializeToString(request.mutable_data());
+  request.set_type(Request::TYPE_CUSTOM_CONSENSUS);
+  request.set_user_type(type);
+  request.set_sender_id(config_.GetSelfInfo().id());
+
+  replica_communicator_->BroadCast(request);
+  return 0;
+}
+
+int Consensus::SendMsg(int type, const google::protobuf::Message& msg,
+                       int node_id) {
+  Request request;
+  msg.SerializeToString(request.mutable_data());
+  request.set_type(Request::TYPE_CUSTOM_CONSENSUS);
+  request.set_user_type(type);
+  request.set_sender_id(config_.GetSelfInfo().id());
+  replica_communicator_->SendMessage(request, node_id);
+  return 0;
+}
+
+std::vector<ReplicaInfo> Consensus::GetReplicas() {
+  return config_.GetReplicaInfos();
+}
+
+int Consensus::CommitMsg(const google::protobuf::Message &txn) {
+  return 0;
+}
+
+// Dispatches an incoming request to its handler based on request->type().
+int Consensus::ConsensusCommit(std::unique_ptr<Context> context,
+                               std::unique_ptr<Request> request) {
+  switch (request->type()) {
+    case Request::TYPE_CLIENT_REQUEST:
+      if (config_.IsPerformanceRunning()) {
+        return performance_manager_->StartEval();
+      }  // No break: outside performance mode control falls into the next case.
+    case Request::TYPE_RESPONSE:
+      if (config_.IsPerformanceRunning()) {
+        return performance_manager_->ProcessResponseMsg(std::move(context),
+                                                        std::move(request));
+      }  // No break here either; falls into TYPE_NEW_TXNS. NOTE(review): confirm this is intended.
+    case Request::TYPE_NEW_TXNS: {
+      return ProcessNewTransaction(std::move(request));
+    }
+    case Request::TYPE_CUSTOM_CONSENSUS: {
+      return ProcessCustomConsensus(std::move(request));
+    }
+  }
+  return 0;  // Request types not listed above are ignored.
+}
+
+int Consensus::ProcessCustomConsensus(std::unique_ptr<Request> request) {
+  return 0;
+}
+
+int Consensus::ProcessNewTransaction(std::unique_ptr<Request> request) {
+  return 0;
+}
+
+int Consensus::ResponseMsg(const BatchUserResponse& batch_resp) {
+  Request request;
+  request.set_seq(batch_resp.seq());
+  request.set_type(Request::TYPE_RESPONSE);
+  request.set_sender_id(config_.GetSelfInfo().id());
+  request.set_proxy_id(batch_resp.proxy_id());
+  batch_resp.SerializeToString(request.mutable_data());
+  replica_communicator_->SendMessage(request, request.proxy_id());
+  return 0;
+}
+
+}  // namespace common
+}  // namespace resdb
diff --git a/platform/consensus/ordering/common/framework/consensus.h b/platform/consensus/ordering/common/framework/consensus.h
new file mode 100644
index 00000000..881dc72b
--- /dev/null
+++ b/platform/consensus/ordering/common/framework/consensus.h
@@ -0,0 +1,83 @@
+/*
+ * Copyright (c) 2019-2022 ExpoLab, UC Davis
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ */
+
+#pragma once
+
+#include "executor/common/transaction_manager.h"
+#include "platform/consensus/execution/transaction_executor.h"
+#include "platform/consensus/ordering/common/algorithm/protocol_base.h"
+#include "platform/consensus/ordering/common/framework/performance_manager.h"
+#include "platform/consensus/ordering/common/framework/response_manager.h"
+#include "platform/networkstrate/consensus_manager.h"
+
+namespace resdb {
+namespace common {
+
+class Consensus : public ConsensusManager {  // Shared consensus front end; protocols override the Process*/CommitMsg hooks.
+ public:
+  Consensus(const ResDBConfig& config,
+            std::unique_ptr<TransactionManager> transaction_manager);
+  virtual ~Consensus();
+
+  int ConsensusCommit(std::unique_ptr<Context> context,
+                      std::unique_ptr<Request> request) override;
+  std::vector<ReplicaInfo> GetReplicas() override;
+
+  void SetupPerformanceDataFunc(std::function<std::string()> func);  // Forwards to the performance manager.
+
+  void SetCommunicator(ReplicaCommunicator* replica_communicator);
+
+  void InitProtocol(ProtocolBase* protocol);  // Installs the send/broadcast/commit callbacks on protocol.
+
+ protected:
+  virtual int ProcessCustomConsensus(std::unique_ptr<Request> request);
+  virtual int ProcessNewTransaction(std::unique_ptr<Request> request);
+  virtual int CommitMsg(const google::protobuf::Message& msg);
+
+ protected:
+  int SendMsg(int type, const google::protobuf::Message& msg, int node_id);
+  int Broadcast(int type, const google::protobuf::Message& msg);
+  int ResponseMsg(const BatchUserResponse& batch_resp);
+  void AsyncSend();
+  bool IsStop();
+
+ protected:
+  void Init();  // Creates the performance or response manager if none has been set.
+  void SetPerformanceManager(std::unique_ptr<PerformanceManager> performance_manager);
+
+ protected:
+  ReplicaCommunicator* replica_communicator_;
+  std::unique_ptr<PerformanceManager> performance_manager_;
+  std::unique_ptr<ResponseManager> response_manager_;
+  std::unique_ptr<TransactionExecutor> transaction_executor_;
+  Stats* global_stats_;
+
+  LockFreeQueue<BatchUserResponse> resp_queue_;
+  std::vector<std::thread> send_thread_;
+  bool is_stop_;
+};
+
+}  // namespace common
+}  // namespace resdb
diff --git a/platform/consensus/ordering/common/framework/performance_manager.cpp b/platform/consensus/ordering/common/framework/performance_manager.cpp
new file mode 100644
index 00000000..089f3bd5
--- /dev/null
+++ b/platform/consensus/ordering/common/framework/performance_manager.cpp
@@ -0,0 +1,289 @@
+/*
+ * Copyright (c) 2019-2022 ExpoLab, UC Davis
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ */
+
+#include "platform/consensus/ordering/common/framework/performance_manager.h"
+
+#include <glog/logging.h>
+
+#include "common/utils/utils.h"
+
+namespace resdb {
+namespace common {
+
+using comm::CollectorResultCode;
+
+PerformanceManager::PerformanceManager(
+    const ResDBConfig& config, ReplicaCommunicator* replica_communicator,
+    SignatureVerifier* verifier)
+     : config_(config),
+      replica_communicator_(replica_communicator),
+      batch_queue_("user request"),
+      verifier_(verifier) {
+  stop_ = false;
+  eval_started_ = false;
+  eval_ready_future_ = eval_ready_promise_.get_future();
+  if (config_.GetPublicKeyCertificateInfo()
+          .public_key()
+          .public_key_info()
+          .type() == CertificateKeyInfo::CLIENT) {
+    for (int i = 0; i < 1; ++i) {
+      user_req_thread_[i] =
+          std::thread(&PerformanceManager::BatchProposeMsg, this);
+    }
+  }
+  global_stats_ = Stats::GetGlobalStats();
+  send_num_ = 0;
+  total_num_ = 0;
+  replica_num_ = config_.GetReplicaNum();
+  id_ = config_.GetSelfInfo().id();
+  primary_ = id_ % replica_num_;
+  if (primary_ == 0) primary_ = replica_num_;
+  local_id_ = 1;
+  sum_ = 0;
+}
+
+PerformanceManager::~PerformanceManager() {
+  stop_ = true;
+  for (int i = 0; i < 16; ++i) {
+    if (user_req_thread_[i].joinable()) {
+      user_req_thread_[i].join();
+    }
+  }
+}
+
+int PerformanceManager::GetPrimary() { return primary_; }
+
+std::unique_ptr<Request> PerformanceManager::GenerateUserRequest() {
+  std::unique_ptr<Request> request = std::make_unique<Request>();
+  request->set_data(data_func_());
+  return request;
+}
+
+void PerformanceManager::SetDataFunc(std::function<std::string()> func) {
+  data_func_ = std::move(func);
+}
+
+int PerformanceManager::StartEval() {
+  if (eval_started_) {
+    return 0;
+  }
+  eval_started_ = true;
+  for (int i = 0; i < 100000000; ++i) {
+    // for (int i = 0; i < 60000000000; ++i) {
+    std::unique_ptr<QueueItem> queue_item = std::make_unique<QueueItem>();
+    queue_item->context = nullptr;
+    queue_item->user_request = GenerateUserRequest();
+    batch_queue_.Push(std::move(queue_item));
+    if (i == 2000000) {
+      eval_ready_promise_.set_value(true);
+    }
+  }
+  LOG(WARNING) << "start eval done";
+  return 0;
+}
+
+// =================== response ========================
+// Handles one response message. Once AddResponseMsg reports that enough
+// responses for the batch have arrived, the batch goes to SendResponseToClient.
+int PerformanceManager::ProcessResponseMsg(std::unique_ptr<Context> context,
+                                           std::unique_ptr<Request> request) {
+  // A null request is invalid; reject it before it is dereferenced below.
+  if (request == nullptr) {
+    return -2;
+  }
+  if (request->ret() == -2) {
+    // Failure response: only give back the in-flight slot.
+    send_num_--;
+    return 0;
+  }
+
+  //LOG(INFO) << "get response:" << request->seq() << " sender:"<<request->sender_id();
+  std::unique_ptr<BatchUserResponse> batch_response = nullptr;
+  CollectorResultCode ret =
+      AddResponseMsg(std::move(request), [&](std::unique_ptr<BatchUserResponse> resp) {
+        batch_response = std::move(resp);
+        return;
+      });
+
+  if (ret == CollectorResultCode::STATE_CHANGED) {
+    assert(batch_response);
+    SendResponseToClient(*batch_response);
+  }
+  return ret == CollectorResultCode::INVALID ? -2 : 0;
+}
+
+CollectorResultCode PerformanceManager::AddResponseMsg(
+    std::unique_ptr<Request> request,
+    std::function<void(std::unique_ptr<BatchUserResponse>)> response_call_back) {
+  if (request == nullptr) {
+    return CollectorResultCode::INVALID;
+  }
+
+  // The payload is a serialized BatchUserResponse; reject it if it does not parse.
+  auto batch_response = std::make_unique<BatchUserResponse>();
+  if (!batch_response->ParseFromString(request->data())) {
+    LOG(ERROR) << "parse response fail:"<<request->data().size()
+               <<" seq:"<<request->seq();
+    return CollectorResultCode::INVALID;
+  }
+
+  // Responses are counted per local id of the batch, not per request seq.
+  uint64_t seq = batch_response->local_id();
+
+  bool done = false;
+  {
+    int idx = seq % response_set_size_;
+    std::unique_lock<std::mutex> lk(response_lock_[idx]);
+    auto it = response_[idx].find(seq);
+    if (it == response_[idx].end()) {
+      // No pending entry for this id (it is erased once the threshold is reached).
+      return CollectorResultCode::OK;
+    }
+    // NOTE(review): DoBatch creates this counter at 1 — confirm the threshold accounts for that.
+    if (++it->second >= config_.GetMinClientReceiveNum()) {
+      response_[idx].erase(it);
+      done = true;
+    }
+  }
+  // The callback runs after the per-bucket lock has been released.
+  if (done) {
+    response_call_back(std::move(batch_response));
+    return CollectorResultCode::STATE_CHANGED;
+  }
+  return CollectorResultCode::OK;
+}
+
+void PerformanceManager::SendResponseToClient(
+    const BatchUserResponse& batch_response) {
+  uint64_t create_time = batch_response.createtime();
+  if (create_time > 0) {  // Latency is only recorded when a creation time is set.
+    uint64_t run_time = GetCurrentTime() - create_time;
+    LOG(ERROR)<<"receive current:"<<GetCurrentTime()<<" create time:"<<create_time
+              <<" run time:"<<run_time<<" local id:"<<batch_response.local_id();
+    global_stats_->AddLatency(run_time);
+  }
+  // This batch is finished: decrement the in-flight counter that DoBatch increments.
+  send_num_--;
+}
+
+// =================== request ========================
+int PerformanceManager::BatchProposeMsg() {
+  LOG(WARNING) << "batch wait time:" << config_.ClientBatchWaitTimeMS()
+               << " batch num:" << config_.ClientBatchNum()
+               << " max txn:" << config_.GetMaxProcessTxn();
+  std::vector<std::unique_ptr<QueueItem>> batch_req;
+  eval_ready_future_.get();
+  bool start = false;
+  while (!stop_) {
+    if (send_num_ > config_.GetMaxProcessTxn()) {
+      // LOG(ERROR)<<"wait send num:"<<send_num_;
+      usleep(100000);
+      continue;
+    }
+    if (batch_req.size() < config_.ClientBatchNum()) {
+      std::unique_ptr<QueueItem> item =
+          batch_queue_.Pop(config_.ClientBatchWaitTimeMS());
+      if (item == nullptr) {
+        if(start){
+          LOG(ERROR)<<"no data";
+        }
+        continue;
+      }
+      batch_req.push_back(std::move(item));
+      if (batch_req.size() < config_.ClientBatchNum()) {
+        continue;
+      }
+    }
+    start = true;
+    for(int i = 0; i < 1;++i){
+      int ret = DoBatch(batch_req);
+    }
+    batch_req.clear();
+  }
+  return 0;
+}
+
+int PerformanceManager::DoBatch(
+    const std::vector<std::unique_ptr<QueueItem>>& batch_req) {
+  auto new_request = comm::NewRequest(Request::TYPE_NEW_TXNS, Request(),
+                                      config_.GetSelfInfo().id());
+  if (new_request == nullptr) {
+    return -2;
+  }
+
+  BatchUserRequest batch_request;
+  for (size_t i = 0; i < batch_req.size(); ++i) {
+    BatchUserRequest::UserRequest* req = batch_request.add_user_requests();
+    *req->mutable_request() = *batch_req[i]->user_request.get();
+    req->set_id(i);
+  }
+
+  batch_request.set_local_id(local_id_++);
+
+  {
+    int idx = batch_request.local_id() % response_set_size_;
+    std::unique_lock<std::mutex> lk(response_lock_[idx]);
+    response_[idx][batch_request.local_id()]++;
+  }
+
+  batch_request.set_proxy_id(config_.GetSelfInfo().id());
+  batch_request.set_createtime(GetCurrentTime());
+  batch_request.SerializeToString(new_request->mutable_data());
+  if (verifier_) {
+    auto signature_or = verifier_->SignMessage(new_request->data());
+    if (!signature_or.ok()) {
+      LOG(ERROR) << "Sign message fail";
+      return -2;
+    }
+    *new_request->mutable_data_signature() = *signature_or;
+  }
+
+  new_request->set_hash(SignatureVerifier::CalculateHash(new_request->data()));
+  new_request->set_proxy_id(config_.GetSelfInfo().id());
+  new_request->set_user_seq(batch_request.local_id());
+
+  SendMessage(*new_request);
+
+  global_stats_->BroadCastMsg();
+  send_num_++;
+  sum_ += batch_req.size();
+  //LOG(ERROR)<<"send num:"<<send_num_<<" total num:"<<total_num_<<" sum:"<<sum_<<" to:"<<GetPrimary();
+  if (total_num_++ == 1000000) {
+    stop_ = true;
+    LOG(WARNING) << "total num is done:" << total_num_;
+  }
+  if (total_num_ % 1000 == 0) {
+    LOG(WARNING) << "total num is :" << total_num_;
+  }
+  global_stats_->IncClientCall();
+  return 0;
+}
+
+void PerformanceManager::SendMessage(const Request& request){
+  replica_communicator_->SendMessage(request, GetPrimary());
+}
+
+}  // namespace common
+}  // namespace resdb
diff --git a/platform/consensus/ordering/common/framework/performance_manager.h b/platform/consensus/ordering/common/framework/performance_manager.h
new file mode 100644
index 00000000..5a874baa
--- /dev/null
+++ b/platform/consensus/ordering/common/framework/performance_manager.h
@@ -0,0 +1,103 @@
+/*
+ * Copyright (c) 2019-2022 ExpoLab, UC Davis
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ */
+
+#pragma once
+
+#include <future>
+
+#include "platform/config/resdb_config.h"
+#include "platform/consensus/ordering/common/framework/transaction_utils.h"
+#include "platform/networkstrate/replica_communicator.h"
+#include "platform/networkstrate/server_comm.h"
+#include "platform/statistic/stats.h"
+
+namespace resdb {
+namespace common {
+
+// Builds batches of user requests, signs them when a verifier is present, and
+// sends them out through SendMessage(). Per-batch counters are kept in a table
+// sharded by batch local id (see response_ / response_lock_ below).
+class PerformanceManager {
+ public:
+  // `replica_communicator` and `verifier` are raw pointers.
+  // NOTE(review): presumably non-owning and required to outlive this object;
+  // confirm against the constructor in the .cpp.
+  PerformanceManager(const ResDBConfig& config,
+                     ReplicaCommunicator* replica_communicator,
+                     SignatureVerifier* verifier);
+
+  virtual ~PerformanceManager();
+
+  int StartEval();
+
+  int ProcessResponseMsg(std::unique_ptr<Context> context,
+                         std::unique_ptr<Request> request);
+  // NOTE(review): presumably installs the generator stored in data_func_;
+  // confirm against the .cpp.
+  void SetDataFunc(std::function<std::string()> func);
+
+  protected:
+  // Transport hook used by DoBatch(). The base implementation forwards the
+  // request to GetPrimary() via replica_communicator_; being virtual, it can
+  // be overridden by subclasses.
+  virtual void SendMessage(const Request& request);
+
+ private:
+  // Add response messages which will be sent back to the caller
+  // if there are f+1 same messages.
+  comm::CollectorResultCode AddResponseMsg(
+      std::unique_ptr<Request> request,
+      std::function<void(std::unique_ptr<BatchUserResponse>)> call_back);
+  void SendResponseToClient(const BatchUserResponse& batch_response);
+
+  // One queued user request together with its originating context.
+  struct QueueItem {
+    std::unique_ptr<Context> context;
+    std::unique_ptr<Request> user_request;
+  };
+  // Packs `batch_req` into one BatchUserRequest, signs it when verifier_ is
+  // set, and sends it with SendMessage(). Returns 0 on success and -2 when
+  // signing fails.
+  int DoBatch(const std::vector<std::unique_ptr<QueueItem>>& batch_req);
+  int BatchProposeMsg();
+  int GetPrimary();
+  std::unique_ptr<Request> GenerateUserRequest();
+
+ protected:
+  ResDBConfig config_;
+  ReplicaCommunicator* replica_communicator_;
+
+ private:
+  LockFreeQueue<QueueItem> batch_queue_;
+  std::thread user_req_thread_[16];
+  // DoBatch() sets this once total_num_ reaches 1000000.
+  std::atomic<bool> stop_;
+  Stats* global_stats_;
+  // Incremented by DoBatch() for every batch sent.
+  std::atomic<int> send_num_;
+  std::mutex mutex_;
+  // Number of DoBatch() calls that reached the send step.
+  std::atomic<int> total_num_;
+  // Optional signer; DoBatch() skips signing when this is null.
+  SignatureVerifier* verifier_;
+  SignatureInfo sig_;
+  std::function<std::string()> data_func_;
+  std::future<bool> eval_ready_future_;
+  std::promise<bool> eval_ready_promise_;
+  std::atomic<bool> eval_started_;
+  std::atomic<int> fail_num_;
+  // Number of shards in response_ / response_lock_. A batch with local id L
+  // uses shard L % response_set_size_.
+  // NOTE(review): both arrays are direct members, so every instance embeds
+  // 6,000,000 maps and 6,000,000 mutexes; confirm this footprint is intended.
+  static const int response_set_size_ = 6000000;
+  // Per-shard counters keyed by batch local id; DoBatch() increments the entry
+  // for a batch while holding the matching response_lock_ shard.
+  std::map<int64_t, int> response_[response_set_size_];
+  std::mutex response_lock_[response_set_size_];
+  int replica_num_;
+  int id_;
+  int primary_;
+  // Source of batch local ids; DoBatch() post-increments it for each batch.
+  std::atomic<int> local_id_;
+  // Running total of user requests packed into sent batches.
+  std::atomic<int> sum_;
+};
+
+}  // namespace common
+}  // namespace resdb
diff --git a/platform/consensus/ordering/common/framework/response_manager.cpp b/platform/consensus/ordering/common/framework/response_manager.cpp
new file mode 100644
index 00000000..ae6b55cc
--- /dev/null
+++ b/platform/consensus/ordering/common/framework/response_manager.cpp
@@ -0,0 +1,242 @@
+/*
+ * Copyright (c) 2019-2022 ExpoLab, UC Davis
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ */
+
+#include "platform/consensus/ordering/common/framework/response_manager.h"
+
+#include <glog/logging.h>
+
+#include "common/utils/utils.h"
+
+namespace resdb {
+namespace common {
+
+using namespace resdb::comm;
+
+// Stores the configuration and collaborators, then starts the batching thread
+// when this node holds a CLIENT certificate or the config is in test mode.
+//
+// `replica_communicator` and `verifier` are kept as raw pointers; `verifier`
+// may be null, in which case DoBatch() sends batches unsigned.
+ResponseManager::ResponseManager(const ResDBConfig& config,
+                                 ReplicaCommunicator* replica_communicator,
+                                 SignatureVerifier* verifier)
+    : config_(config),
+      replica_communicator_(replica_communicator),
+      batch_queue_("user request"),
+      verifier_(verifier) {
+  // Every field read by BatchProposeMsg() must hold a defined value before the
+  // worker thread is launched; send_num_ in particular is read on the first
+  // loop iteration.
+  stop_ = false;
+  local_id_ = 1;
+  global_stats_ = Stats::GetGlobalStats();
+  send_num_ = 0;
+
+  if (config_.GetPublicKeyCertificateInfo()
+              .public_key()
+              .public_key_info()
+              .type() == CertificateKeyInfo::CLIENT ||
+      config_.IsTestMode()) {
+    user_req_thread_ = std::thread(&ResponseManager::BatchProposeMsg, this);
+  }
+}
+
+// Asks the batching loop to exit and waits for the worker thread, if one was
+// started by the constructor.
+ResponseManager::~ResponseManager() {
+  stop_ = true;
+  if (!user_req_thread_.joinable()) {
+    return;
+  }
+  user_req_thread_.join();
+}
+
+// Returns the replica id that batches are sent to. The value is fixed at 1.
+// TODO: use system info
+int ResponseManager::GetPrimary() {
+  const int primary_id = 1;
+  return primary_id;
+}
+
+// Queues one user request for batching. The context's client handle is
+// cleared before the item is pushed onto batch_queue_. Always returns 0.
+int ResponseManager::NewUserRequest(std::unique_ptr<Context> context,
+                                    std::unique_ptr<Request> user_request) {
+  context->client = nullptr;
+
+  auto pending = std::make_unique<QueueItem>();
+  pending->user_request = std::move(user_request);
+  pending->context = std::move(context);
+
+  batch_queue_.Push(std::move(pending));
+  return 0;
+}
+
+// =================== response ========================
+// Handles one response message from a replica. Once
+// config_.GetMinClientReceiveNum() responses with the same seq have been
+// recorded, the batch is treated as complete and its latency is reported.
+//
+// `context` is unused. Returns -2 when `request` is null and 0 otherwise.
+int ResponseManager::ProcessResponseMsg(std::unique_ptr<Context> context,
+                                        std::unique_ptr<Request> request) {
+  // Reject a null request up front: it is dereferenced below, before
+  // AddResponseMsg() gets a chance to run its own null check.
+  if (request == nullptr) {
+    return -2;
+  }
+  if (request->ret() == -2) {
+    LOG(ERROR) << "get response fail:" << request->ret();
+    send_num_--;
+    return 0;
+  }
+
+  // Filled in by the callback when this message completes the quorum.
+  std::unique_ptr<Request> response;
+  CollectorResultCode ret = AddResponseMsg(
+      std::move(request), [&](const Request& quorum_request) {
+        response = std::make_unique<Request>(quorum_request);
+      });
+
+  if (ret == CollectorResultCode::STATE_CHANGED) {
+    BatchUserResponse batch_response;
+    if (batch_response.ParseFromString(response->data())) {
+      SendResponseToClient(batch_response);
+    } else {
+      LOG(ERROR) << "parse response fail:";
+      // The seq is already marked complete, so no later response can release
+      // this batch; drop it from the in-flight count here to avoid stalling
+      // BatchProposeMsg().
+      send_num_--;
+    }
+  }
+  return ret == CollectorResultCode::INVALID ? -2 : 0;
+}
+
+// Records one response for the request's seq and invokes `response_call_back`
+// with the request when the count reaches config_.GetMinClientReceiveNum().
+//
+// Returns INVALID for a null request, STATE_CHANGED when this call completed
+// the quorum, and OK otherwise (including responses that arrive after the seq
+// has already completed).
+CollectorResultCode ResponseManager::AddResponseMsg(
+    std::unique_ptr<Request> request,
+    std::function<void(const Request&)> response_call_back) {
+  if (request == nullptr) {
+    return CollectorResultCode::INVALID;
+  }
+
+  const uint64_t seq = request->seq();
+  bool done = false;
+  {
+    const int idx = seq % response_set_size_;
+    std::unique_lock<std::mutex> lk(response_lock_[idx]);
+    // One lookup; -1 marks a seq whose quorum was already reached.
+    int& count = response_[idx][seq];
+    if (count == -1) {
+      return CollectorResultCode::OK;
+    }
+    ++count;
+    if (count >= config_.GetMinClientReceiveNum()) {
+      count = -1;
+      done = true;
+    }
+  }
+  // The callback runs outside the shard lock.
+  if (done) {
+    response_call_back(*request);
+    return CollectorResultCode::STATE_CHANGED;
+  }
+  return CollectorResultCode::OK;
+}
+
+// Finishes a completed batch: reports its latency to the global stats when the
+// response carries a creation time, logs otherwise, and releases one in-flight
+// slot.
+void ResponseManager::SendResponseToClient(
+    const BatchUserResponse& batch_response) {
+  const uint64_t created_at = batch_response.createtime();
+  const uint64_t batch_id = batch_response.local_id();
+  if (created_at == 0) {
+    LOG(ERROR) << "seq:" << batch_id << " no resp";
+  } else {
+    const uint64_t elapsed = GetCurrentTime() - created_at;
+    global_stats_->AddLatency(elapsed);
+  }
+  send_num_--;
+}
+
+// =================== request ========================
+// Batching loop run on user_req_thread_ until stop_ is set. Items are popped
+// from batch_queue_ until config_.ClientBatchNum() are gathered or a pop
+// returns nothing, then handed to DoBatch(). Sending pauses while the number
+// of in-flight batches exceeds config_.GetMaxProcessTxn(). Always returns 0.
+int ResponseManager::BatchProposeMsg() {
+  LOG(INFO) << "batch wait time:" << config_.ClientBatchWaitTimeMS()
+            << " batch num:" << config_.ClientBatchNum();
+  std::vector<std::unique_ptr<QueueItem>> batch_req;
+  while (!stop_) {
+    if (send_num_ > config_.GetMaxProcessTxn()) {
+      LOG(ERROR) << "send num too high, wait:" << send_num_;
+      usleep(100);
+      continue;
+    }
+    if (batch_req.size() < config_.ClientBatchNum()) {
+      std::unique_ptr<QueueItem> item =
+          batch_queue_.Pop(config_.ClientBatchWaitTimeMS());
+      if (item != nullptr) {
+        batch_req.push_back(std::move(item));
+        if (batch_req.size() < config_.ClientBatchNum()) {
+          continue;
+        }
+      }
+    }
+    if (batch_req.empty()) {
+      continue;
+    }
+    const int ret = DoBatch(batch_req);
+    if (ret != 0) {
+      // Report the failure while the batch is still populated; clearing it
+      // first would leave nothing to iterate over.
+      Response response;
+      response.set_result(Response::ERROR);
+      for (const auto& item : batch_req) {
+        if (item->context && item->context->client) {
+          const int send_ret = item->context->client->SendRawMessage(response);
+          if (send_ret) {
+            LOG(ERROR) << "send resp" << response.DebugString()
+                       << " fail ret:" << send_ret;
+          }
+        }
+      }
+    }
+    batch_req.clear();
+  }
+  return 0;
+}
+
+// Packs `batch_req` into a single TYPE_NEW_TXNS request, signs the serialized
+// payload when a verifier is configured, and sends it to GetPrimary().
+//
+// Each item's context is moved out of `batch_req` and released when this
+// function returns. Returns 0 on success and -2 when the request cannot be
+// created or signing fails.
+int ResponseManager::DoBatch(
+    const std::vector<std::unique_ptr<QueueItem>>& batch_req) {
+  auto new_request = NewRequest(Request::TYPE_NEW_TXNS, Request(),
+                                config_.GetSelfInfo().id());
+  if (new_request == nullptr) {
+    return -2;
+  }
+  std::vector<std::unique_ptr<Context>> context_list;
+
+  BatchUserRequest batch_request;
+  for (size_t i = 0; i < batch_req.size(); ++i) {
+    BatchUserRequest::UserRequest* req = batch_request.add_user_requests();
+    *req->mutable_request() = *batch_req[i]->user_request.get();
+    *req->mutable_signature() = batch_req[i]->context->signature;
+    req->set_id(i);
+    context_list.push_back(std::move(batch_req[i]->context));
+  }
+
+  if (!config_.IsPerformanceRunning()) {
+    LOG(ERROR) << "add context list:" << new_request->seq()
+               << " list size:" << context_list.size();
+    // NOTE(review): local_id_ is never advanced in this file, so every batch
+    // carries the same local id - confirm whether an increment is missing.
+    batch_request.set_local_id(local_id_);
+  }
+  batch_request.set_createtime(GetCurrentTime());
+
+  // Serialize once, straight into the outgoing request, and sign exactly the
+  // bytes that will be sent.
+  batch_request.SerializeToString(new_request->mutable_data());
+  if (verifier_) {
+    auto signature_or = verifier_->SignMessage(new_request->data());
+    if (!signature_or.ok()) {
+      LOG(ERROR) << "Sign message fail";
+      return -2;
+    }
+    *new_request->mutable_data_signature() = *signature_or;
+  }
+
+  new_request->set_hash(SignatureVerifier::CalculateHash(new_request->data()));
+  new_request->set_proxy_id(config_.GetSelfInfo().id());
+  replica_communicator_->SendMessage(*new_request, GetPrimary());
+  send_num_++;
+  LOG(INFO) << "send msg to primary:" << GetPrimary()
+            << " batch size:" << batch_req.size();
+  return 0;
+}
+
+} // namespace common
+}  // namespace resdb
diff --git a/platform/consensus/ordering/common/framework/response_manager.h b/platform/consensus/ordering/common/framework/response_manager.h
new file mode 100644
index 00000000..1c704396
--- /dev/null
+++ b/platform/consensus/ordering/common/framework/response_manager.h
@@ -0,0 +1,85 @@
+/*
+ * Copyright (c) 2019-2022 ExpoLab, UC Davis
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ */
+
+#pragma once
+
+#include "platform/config/resdb_config.h"
+#include "platform/consensus/ordering/common/framework/transaction_utils.h"
+#include "platform/networkstrate/replica_communicator.h"
+#include "platform/networkstrate/server_comm.h"
+#include "platform/statistic/stats.h"
+
+namespace resdb {
+namespace common  {
+
+// Client-facing proxy logic: queues user requests, batches and sends them to
+// the primary on a worker thread, and counts replica responses per sequence
+// number until enough have arrived to treat a batch as complete.
+class ResponseManager {
+ public:
+  // Starts the batching thread when the node holds a CLIENT certificate or
+  // the config is in test mode. `replica_communicator` and `verifier` are
+  // stored as raw pointers; a null `verifier` disables signing.
+  ResponseManager(const ResDBConfig& config,
+                  ReplicaCommunicator* replica_communicator,
+                  SignatureVerifier* verifier);
+
+  // Stops the batching loop and joins the worker thread.
+  ~ResponseManager();
+
+  // NOTE(review): declared here, but response_manager.cpp provides no
+  // definition; calling it would fail at link time. Confirm whether it should
+  // be removed.
+  std::vector<std::unique_ptr<Context>> FetchContextList(uint64_t id);
+
+  // Queues a user request for batching. Always returns 0.
+  int NewUserRequest(std::unique_ptr<Context> context,
+                     std::unique_ptr<Request> user_request);
+
+  // Records a replica response; returns -2 when AddResponseMsg() reports an
+  // invalid (null) request and 0 otherwise.
+  int ProcessResponseMsg(std::unique_ptr<Context> context,
+                         std::unique_ptr<Request> request);
+
+ private:
+  // Add response messages which will be sent back to the caller
+  // if there are f+1 same messages.
+  comm::CollectorResultCode AddResponseMsg(
+      std::unique_ptr<Request> request,
+      std::function<void(const Request&)> call_back);
+  // Reports the batch latency to global_stats_ and decrements send_num_.
+  void SendResponseToClient(const BatchUserResponse& batch_response);
+
+  // One queued user request together with its originating context.
+  struct QueueItem {
+    std::unique_ptr<Context> context;
+    std::unique_ptr<Request> user_request;
+  };
+  // Packs, signs and sends one batch; returns 0 on success, -2 on failure.
+  int DoBatch(const std::vector<std::unique_ptr<QueueItem>>& batch_req);
+  // Worker-thread loop that drains batch_queue_ into DoBatch().
+  int BatchProposeMsg();
+  // Returns the replica id batches are sent to (currently always 1).
+  int GetPrimary();
+
+ private:
+  ResDBConfig config_;
+  ReplicaCommunicator* replica_communicator_;
+  LockFreeQueue<QueueItem> batch_queue_;
+  std::thread user_req_thread_;
+  // Set by the destructor to end BatchProposeMsg().
+  std::atomic<bool> stop_;
+  // Written into each batch when the config is not in performance mode; the
+  // constructor sets it to 1.
+  uint64_t local_id_ = 0;
+  Stats* global_stats_;
+  // Batches sent but not yet completed; BatchProposeMsg() pauses while this
+  // exceeds config_.GetMaxProcessTxn().
+  std::atomic<int> send_num_;
+  SignatureVerifier* verifier_;
+  // Number of shards in response_ / response_lock_; seq S uses shard
+  // S % response_set_size_.
+  // NOTE(review): both arrays are direct members, so every instance embeds
+  // 6,000,000 maps and 6,000,000 mutexes; confirm this footprint is intended.
+  static const int response_set_size_ = 6000000;
+  // Per-shard response counts keyed by seq; -1 marks a completed seq.
+  std::map<int64_t, int> response_[response_set_size_];
+  std::mutex response_lock_[response_set_size_];
+};
+
+}  // namespace common
+}  // namespace resdb
diff --git a/platform/consensus/ordering/common/framework/transaction_utils.cpp b/platform/consensus/ordering/common/framework/transaction_utils.cpp
new file mode 100644
index 00000000..08a8e544
--- /dev/null
+++ b/platform/consensus/ordering/common/framework/transaction_utils.cpp
@@ -0,0 +1,49 @@
+/*
+ * Copyright (c) 2019-2022 ExpoLab, UC Davis
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ */
+
+#include "platform/consensus/ordering/common/framework/transaction_utils.h"
+
+namespace resdb {
+namespace comm {
+
+// Returns a heap-allocated copy of `request` whose type and sender id are
+// overwritten with the given values.
+std::unique_ptr<Request> NewRequest(Request::Type type, const Request& request,
+                                    int sender_id) {
+  std::unique_ptr<Request> copy = std::make_unique<Request>(request);
+  copy->set_sender_id(sender_id);
+  copy->set_type(type);
+  return copy;
+}
+
+// Same as the three-argument overload, and additionally stamps `region_id`
+// into the copy's region info.
+std::unique_ptr<Request> NewRequest(Request::Type type, const Request& request,
+                                    int sender_id, int region_id) {
+  std::unique_ptr<Request> copy = NewRequest(type, request, sender_id);
+  copy->mutable_region_info()->set_region_id(region_id);
+  return copy;
+}
+
+}  // namespace comm
+}  // namespace resdb
diff --git a/platform/consensus/ordering/common/framework/transaction_utils.h b/platform/consensus/ordering/common/framework/transaction_utils.h
new file mode 100644
index 00000000..3055cf44
--- /dev/null
+++ b/platform/consensus/ordering/common/framework/transaction_utils.h
@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) 2019-2022 ExpoLab, UC Davis
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ */
+
+#pragma once
+#include "platform/proto/replica_info.pb.h"
+#include "platform/proto/resdb.pb.h"
+
+namespace resdb {
+namespace comm {
+
+// Outcome of adding a message to a response collector.
+enum CollectorResultCode {
+  // The message was rejected (e.g. a null request).
+  INVALID = -2,
+  // The message was accepted without completing a quorum.
+  OK = 0,
+  // The message completed the quorum for its sequence number.
+  STATE_CHANGED = 1,
+};
+
+// Returns a copy of `request` with its type and sender id replaced.
+std::unique_ptr<Request> NewRequest(Request::Type type, const Request& request,
+                                    int sender_id);
+
+// As above, and also sets the region id in the copy's region info. The
+// parameter is named `region_id` to match the definition.
+std::unique_ptr<Request> NewRequest(Request::Type type, const Request& request,
+                                    int sender_id, int region_id);
+}  // namespace comm
+}  // namespace resdb
diff --git a/platform/consensus/ordering/poe/algorithm/BUILD b/platform/consensus/ordering/poe/algorithm/BUILD
new file mode 100644
index 00000000..357f56d8
--- /dev/null
+++ b/platform/consensus/ordering/poe/algorithm/BUILD
@@ -0,0 +1,15 @@
+package(default_visibility = ["//platform/consensus/ordering/poe:__subpackages__"])
+
+# PoE protocol implementation (poe.cpp / poe.h).
+cc_library(
+    name = "poe",
+    srcs = ["poe.cpp"],
+    hdrs = ["poe.h"],
+    deps = [
+        "//common:comm",
+        "//common/crypto:signature_verifier",
+        "//platform/common/queue:lock_free_queue",
+        "//platform/consensus/ordering/common/algorithm:protocol_base",
+        "//platform/consensus/ordering/poe/proto:proposal_cc_proto",
+        "//platform/statistic:stats",
+    ],
+)
diff --git a/platform/consensus/ordering/poe/algorithm/poe.cpp b/platform/consensus/ordering/poe/algorithm/poe.cpp
new file mode 100644
index 00000000..8a9b3a31
--- /dev/null
+++ b/platform/consensus/ordering/poe/algorithm/poe.cpp
@@ -0,0 +1,75 @@
+#include "platform/consensus/ordering/poe/algorithm/poe.h"
+
+#include <glog/logging.h>
+
+#include "common/crypto/signature_verifier.h"
+#include "common/utils/utils.h"
+
+namespace resdb {
+namespace poe {
+
+// Creates the PoE protocol instance for replica `id`, with `f` as the fault
+// threshold used for the 2f+1 prepare quorum and `total_num` as the replica
+// count. `verifier` is stored as a raw pointer.
+PoE::PoE(int id, int f, int total_num, SignatureVerifier* verifier)
+    : ProtocolBase(id, f, total_num),
+      seq_(0),
+      is_stop_(false),
+      verifier_(verifier),
+      // Previously left uninitialized; bind it to the process-wide stats.
+      global_stats_(Stats::GetGlobalStats()) {
+  LOG(ERROR) << "get proposal graph";
+  id_ = id;
+  total_num_ = total_num;
+  f_ = f;
+}
+
+// Flags the protocol as stopped.
+PoE::~PoE() { is_stop_ = true; }
+
+// Reports whether the stop flag has been set.
+bool PoE::IsStop() {
+  return is_stop_;
+}
+
+// Entry point for a new client transaction: stamps it with the current time,
+// the next local sequence number and this replica's id as proposer, then
+// broadcasts it as a Propose message. Always returns true.
+bool PoE::ReceiveTransaction(std::unique_ptr<Transaction> txn) {
+  const auto created_at = GetCurrentTime();
+  const int64_t assigned_seq = seq_++;
+
+  txn->set_create_time(created_at);
+  txn->set_seq(assigned_seq);
+  txn->set_proposer(id_);
+
+  Broadcast(MessageType::Propose, *txn);
+  return true;
+}
+
+// Handles a Propose message: stores the transaction under its hash and
+// broadcasts a Prepare vote for it carrying this replica's id. Always returns
+// true.
+bool PoE::ReceivePropose(std::unique_ptr<Transaction> txn) {
+  // Copy what the Prepare message needs before ownership of `txn` moves into
+  // data_.
+  const std::string hash = txn->hash();
+  const int64_t seq = txn->seq();
+  {
+    std::unique_lock<std::mutex> lk(mutex_);
+    // Key on the copied hash so the lookup never reads through `txn` in the
+    // same expression that moves it.
+    data_[hash] = std::move(txn);
+  }
+
+  Proposal proposal;
+  proposal.set_hash(hash);
+  proposal.set_seq(seq);
+  proposal.set_proposer(id_);
+  Broadcast(MessageType::Prepare, proposal);
+  return true;
+}
+
+// Handles a Prepare vote: records the voter for the proposal's hash and, once
+// 2f+1 distinct voters are known and the transaction itself has been stored by
+// ReceivePropose(), commits the transaction. Always returns true.
+//
+// NOTE(review): entries in received_ are never erased, so the map grows with
+// every hash seen - confirm whether cleanup is handled elsewhere.
+bool PoE::ReceivePrepare(std::unique_ptr<Proposal> proposal) {
+  std::unique_ptr<Transaction> txn = nullptr;
+  {
+    std::unique_lock<std::mutex> lk(mutex_);
+    const std::string& hash = proposal->hash();
+    std::set<int32_t>& voters = received_[hash];
+    voters.insert(proposal->proposer());
+
+    auto it = data_.find(hash);
+    if (it != data_.end() &&
+        voters.size() >= static_cast<size_t>(2 * f_ + 1)) {
+      txn = std::move(it->second);
+      data_.erase(it);
+    }
+  }
+  // Commit outside the lock so the callback cannot block other messages.
+  if (txn != nullptr) {
+    commit_(*txn);
+  }
+  return true;
+}
+
+}  // namespace poe
+}  // namespace resdb
diff --git a/platform/consensus/ordering/poe/algorithm/poe.h b/platform/consensus/ordering/poe/algorithm/poe.h
new file mode 100644
index 00000000..20cc71a9
--- /dev/null
+++ b/platform/consensus/ordering/poe/algorithm/poe.h
@@ -0,0 +1,40 @@
+#pragma once
+
+#include <deque>
+#include <map>
+#include <queue>
+#include <thread>
+
+#include "platform/common/queue/lock_free_queue.h"
+#include "platform/consensus/ordering/common/algorithm/protocol_base.h"
+#include "platform/consensus/ordering/poe/proto/proposal.pb.h"
+#include "platform/statistic/stats.h"
+
+namespace resdb {
+namespace poe {
+
+// PoE ordering protocol built on common::ProtocolBase. A transaction is
+// broadcast as a Propose message, every replica answers with a Prepare vote,
+// and the transaction is committed once 2f+1 distinct votes are recorded.
+class PoE: public common::ProtocolBase {
+ public:
+  PoE(int id, int f, int total_num, SignatureVerifier* verifier);
+  ~PoE();
+
+  // Stamps a new transaction (time, sequence number, proposer) and broadcasts
+  // it as a Propose message.
+  bool ReceiveTransaction(std::unique_ptr<Transaction> txn);
+  // Stores a proposed transaction by hash and broadcasts a Prepare vote.
+  bool ReceivePropose(std::unique_ptr<Transaction> txn);
+  // Records a Prepare vote and commits the transaction on a 2f+1 quorum.
+  bool ReceivePrepare(std::unique_ptr<Proposal> proposal);
+
+ private:
+  bool IsStop();
+
+ private:
+  // Guards received_ and data_.
+  std::mutex mutex_;
+  // Proposal hash -> ids of the replicas whose Prepare vote was received.
+  std::map<std::string, std::set<int32_t> > received_;
+  // Proposal hash -> proposed transaction awaiting its quorum.
+  std::map<std::string, std::unique_ptr<Transaction> > data_;
+
+  // Next sequence number handed out by ReceiveTransaction().
+  // NOTE(review): incremented without holding mutex_ - confirm that
+  // ReceiveTransaction() is only called from one thread.
+  int64_t seq_;
+ bool is_stop_;
+  SignatureVerifier* verifier_;
+  // NOTE(review): not referenced by any method in poe.cpp.
+  Stats* global_stats_;
+};
+
+}  // namespace poe
+}  // namespace resdb
diff --git a/platform/consensus/ordering/poe/framework/BUILD b/platform/consensus/ordering/poe/framework/BUILD
new file mode 100644
index 00000000..7030d2a0
--- /dev/null
+++ b/platform/consensus/ordering/poe/framework/BUILD
@@ -0,0 +1,16 @@
+package(default_visibility = ["//visibility:private"])
+
+# PoE consensus layer (consensus.cpp / consensus.h); publicly visible and built
+# on the common consensus framework and the PoE algorithm library.
+cc_library(
+    name = "consensus",
+    srcs = ["consensus.cpp"],
+    hdrs = ["consensus.h"],
+    visibility = [
+        "//visibility:public",
+    ],
+    deps = [
+        "//common/utils",
+        "//platform/consensus/ordering/common/framework:consensus",
+        "//platform/consensus/ordering/poe/algorithm:poe",
+    ],
+)
+
diff --git a/platform/consensus/ordering/poe/framework/consensus.cpp b/platform/consensus/ordering/poe/framework/consensus.cpp
new file mode 100644
index 00000000..b401adaf
--- /dev/null
+++ b/platform/consensus/ordering/poe/framework/consensus.cpp
@@ -0,0 +1,105 @@
+/*
+ * Copyright (c) 2019-2022 ExpoLab, UC Davis
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ */
+
+#include "platform/consensus/ordering/poe/framework/consensus.h"
+
+#include <glog/logging.h>
+#include <unistd.h>
+
+#include "common/utils/utils.h"
+
+namespace resdb {
+namespace poe {
+
+// Builds the PoE consensus layer on top of common::Consensus. On nodes whose
+// certificate type is not CLIENT, a PoE instance is created with
+// f = (replicas - 1) / 3 and registered through InitProtocol(); on client
+// nodes poe_ stays null.
+Consensus::Consensus(const ResDBConfig& config,
+                     std::unique_ptr<TransactionManager> executor)
+    : common::Consensus(config, std::move(executor)){
+  // Give every data member a defined value; global_stats_ and send_num_ were
+  // previously left uninitialized.
+  start_ = 0;
+  global_stats_ = Stats::GetGlobalStats();
+  for (int& count : send_num_) {
+    count = 0;
+  }
+
+  const int total_replicas = config_.GetReplicaNum();
+  const int f = (total_replicas - 1) / 3;
+
+  // NOTE(review): Init() runs before poe_ exists - confirm it cannot deliver
+  // messages to ProcessCustomConsensus() before the constructor finishes.
+  Init();
+
+  if (config_.GetPublicKeyCertificateInfo()
+          .public_key()
+          .public_key_info()
+          .type() != CertificateKeyInfo::CLIENT) {
+    poe_ = std::make_unique<PoE>(config_.GetSelfInfo().id(), f, total_replicas,
+                                 GetSignatureVerifier());
+    InitProtocol(poe_.get());
+  }
+}
+
+// Dispatches a protocol message to the PoE instance according to its
+// user_type: Propose carries a Transaction, Prepare carries a Proposal.
+// Other message types are ignored.
+//
+// Returns 0 when the message was handled or ignored, and -1 when the payload
+// cannot be parsed or no PoE instance exists on this node.
+int Consensus::ProcessCustomConsensus(std::unique_ptr<Request> request) {
+  // poe_ is only created on non-client nodes.
+  if (poe_ == nullptr) {
+    LOG(ERROR) << "poe protocol is not initialized";
+    return -1;
+  }
+
+  // The payload arrives from the network, so a parse failure is reported and
+  // rejected rather than asserted on: a malformed message must not be able to
+  // abort the process.
+  if (request->user_type() == MessageType::Propose) {
+    std::unique_ptr<Transaction> txn = std::make_unique<Transaction>();
+    if (!txn->ParseFromString(request->data())) {
+      LOG(ERROR) << "parse proposal fail";
+      return -1;
+    }
+    poe_->ReceivePropose(std::move(txn));
+    return 0;
+  }
+  if (request->user_type() == MessageType::Prepare) {
+    std::unique_ptr<Proposal> proposal = std::make_unique<Proposal>();
+    if (!proposal->ParseFromString(request->data())) {
+      LOG(ERROR) << "parse proposal fail";
+      return -1;
+    }
+    poe_->ReceivePrepare(std::move(proposal));
+    return 0;
+  }
+  return 0;
+}
+
+// Wraps a client request in a Transaction (data, hash, proxy id, uid) and
+// hands it to the PoE instance. The bool result of ReceiveTransaction() is
+// returned converted to int.
+int Consensus::ProcessNewTransaction(std::unique_ptr<Request> request) {
+  auto txn = std::make_unique<Transaction>();
+  txn->set_uid(request->uid());
+  txn->set_proxy_id(request->proxy_id());
+  txn->set_hash(request->hash());
+  txn->set_data(request->data());
+
+  const bool accepted = poe_->ReceiveTransaction(std::move(txn));
+  return accepted;
+}
+
+// Commit hook: downcasts the generic message to a Transaction and forwards it
+// to CommitMsgInternal(). The reference dynamic_cast throws std::bad_cast if
+// `msg` is not a Transaction.
+int Consensus::CommitMsg(const google::protobuf::Message& msg) {
+  const Transaction& txn = dynamic_cast<const Transaction&>(msg);
+  return CommitMsgInternal(txn);
+}
+
+// Converts a committed transaction back into a Request (data, seq, uid, proxy
+// id) and passes it to the transaction executor. Always returns 0.
+int Consensus::CommitMsgInternal(const Transaction& txn) {
+  auto committed = std::make_unique<Request>();
+  committed->set_proxy_id(txn.proxy_id());
+  committed->set_uid(txn.uid());
+  committed->set_seq(txn.seq());
+  committed->set_data(txn.data());
+
+  transaction_executor_->Commit(std::move(committed));
+  return 0;
+}
+
+}  // namespace poe
+}  // namespace resdb
diff --git a/platform/consensus/ordering/poe/framework/consensus.h b/platform/consensus/ordering/poe/framework/consensus.h
new file mode 100644
index 00000000..72e56e18
--- /dev/null
+++ b/platform/consensus/ordering/poe/framework/consensus.h
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2019-2022 ExpoLab, UC Davis
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ */
+
+#pragma once
+
+#include "executor/common/transaction_manager.h"
+#include "platform/consensus/ordering/common/framework/consensus.h"
+#include "platform/consensus/ordering/poe/algorithm/poe.h"
+#include "platform/networkstrate/consensus_manager.h"
+
+namespace resdb {
+namespace poe {
+
+// PoE binding for the common consensus framework: routes incoming protocol
+// messages and new client transactions to a PoE instance and forwards
+// committed transactions to the executor.
+class Consensus : public common::Consensus {
+ public:
+  Consensus(const ResDBConfig& config,
+            std::unique_ptr<TransactionManager> transaction_manager);
+  virtual ~Consensus() = default;
+
+ private:
+  // Handles Propose / Prepare messages; returns -1 on a parse failure.
+  int ProcessCustomConsensus(std::unique_ptr<Request> request) override;
+  // Wraps a client request in a Transaction and submits it to poe_.
+  int ProcessNewTransaction(std::unique_ptr<Request> request) override;
+  // Commit hook; expects `msg` to be a Transaction.
+  int CommitMsg(const google::protobuf::Message& msg) override;
+  // Hands a committed transaction to the transaction executor.
+  int CommitMsgInternal(const Transaction& txn);
+
+  // NOTE(review): declared but not defined in consensus.cpp - confirm whether
+  // it can be removed.
+  int Prepare(const Transaction& txn);
+
+ protected:
+  // Protocol instance; only created on nodes whose certificate is not CLIENT.
+  std::unique_ptr<PoE> poe_;
+  // NOTE(review): global_stats_, mutex_ and send_num_ are not referenced in
+  // consensus.cpp, and global_stats_ / send_num_ are never initialized there.
+  Stats* global_stats_;
+  int64_t start_;
+  std::mutex mutex_;
+  int send_num_[200];
+};
+
+}  // namespace poe
+}  // namespace resdb
diff --git a/platform/consensus/ordering/poe/framework/consensus_test.cpp b/platform/consensus/ordering/poe/framework/consensus_test.cpp
new file mode 100644
index 00000000..2c8834a8
--- /dev/null
+++ b/platform/consensus/ordering/poe/framework/consensus_test.cpp
@@ -0,0 +1,179 @@
+/*
+ * Copyright (c) 2019-2022 ExpoLab, UC Davis
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ */
+
+#include "platform/consensus/ordering/cassandra/framework/consensus.h"
+
+#include <glog/logging.h>
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include <future>
+
+#include "common/test/test_macros.h"
+#include "executor/common/mock_transaction_manager.h"
+#include "platform/config/resdb_config_utils.h"
+#include "platform/networkstrate/mock_replica_communicator.h"
+
+namespace resdb {
+namespace cassandra {
+namespace {
+
+using ::resdb::testing::EqualsProto;
+using ::testing::_;
+using ::testing::Invoke;
+using ::testing::Test;
+
+ResDBConfig GetConfig() {
+  // Four loopback replicas on ports 1234-1237; the last argument repeats #1.
+  return ResDBConfig({GenerateReplicaInfo(1, "127.0.0.1", 1234),
+                      GenerateReplicaInfo(2, "127.0.0.1", 1235),
+                      GenerateReplicaInfo(3, "127.0.0.1", 1236),
+                      GenerateReplicaInfo(4, "127.0.0.1", 1237)},
+                     GenerateReplicaInfo(1, "127.0.0.1", 1234));
+}
+
+// Fixture that wires a Consensus instance to a mock executor and a mock
+// replica communicator. NOTE(review): this file lives under poe/ but pulls in
+// the cassandra header and namespace -- confirm which Consensus is intended.
+class ConsensusTest : public Test {
+ public:
+  ConsensusTest() : config_(GetConfig()) {
+    auto transaction_manager =
+        std::make_unique<MockTransactionExecutorDataImpl>();
+    mock_transaction_manager_ = transaction_manager.get();
+    consensus_ =
+        std::make_unique<Consensus>(config_, std::move(transaction_manager));
+    consensus_->SetCommunicator(&replica_communicator_);
+  }
+
+  // Wraps `data` as the only user request of a batch, serializes the batch
+  // into a Transaction and submits that as a TYPE_NEW_TXNS request.
+  void AddTransaction(const std::string& data) {
+    BatchUserRequest batch_request;
+    batch_request.add_user_requests()->mutable_request()->set_data(data);
+    batch_request.set_local_id(1);
+    Transaction txn;
+    batch_request.SerializeToString(txn.mutable_data());
+
+    auto request = std::make_unique<Request>();
+    request->set_type(Request::TYPE_NEW_TXNS);
+    txn.SerializeToString(request->mutable_data());
+
+    EXPECT_EQ(consensus_->ConsensusCommit(nullptr, std::move(request)), 0);
+  }
+
+ protected:
+  ResDBConfig config_;
+  MockTransactionExecutorDataImpl* mock_transaction_manager_ = nullptr;
+  MockReplicaCommunicator replica_communicator_;
+  std::unique_ptr<Consensus> consensus_;
+};
+
+// Drives one transaction through the consensus on a single node, with the
+// mocked communicator feeding messages back, and waits for the response.
+TEST_F(ConsensusTest, NormalCase) {
+  // Fulfilled by the SendMessage mock when a TYPE_RESPONSE request shows up.
+  std::promise<bool> commit_done;
+  std::future<bool> commit_done_future = commit_done.get_future();
+
+  // Re-delivers `request` to the local consensus three times, rewriting the
+  // proposer id of its payload to 1, 2 and 3. When `log_type` is set, the
+  // user type of every re-delivered request is logged as well.
+  auto replay_as_peers = [&](const Request& request, bool log_type) {
+    VoteMessage ack_msg;
+    // CHECK, unlike assert(), still performs the parse in NDEBUG builds.
+    CHECK(ack_msg.ParseFromString(request.data()));
+    for (int i = 1; i <= 3; ++i) {
+      ack_msg.set_proposer_id(i);
+      auto new_req = std::make_unique<Request>(request);
+      ack_msg.SerializeToString(new_req->mutable_data());
+      if (log_type) {
+        LOG(ERROR) << "new request type:" << new_req->user_type();
+      }
+      consensus_->ConsensusCommit(nullptr, std::move(new_req));
+    }
+  };
+
+  // Every broadcast is looped straight back into the consensus under test;
+  // Vote, Prepare and Voteprep messages are replayed once per proposer id.
+  EXPECT_CALL(replica_communicator_, BroadCast)
+      .WillRepeatedly(Invoke([&](const google::protobuf::Message& msg) {
+        // Reference cast: a message that is not a Request raises
+        // std::bad_cast instead of dereferencing a null pointer.
+        Request request = dynamic_cast<const Request&>(msg);
+
+        if (request.user_type() == MessageType::NewProposal) {
+          LOG(ERROR) << "bc new proposal";
+          consensus_->ConsensusCommit(nullptr,
+                                      std::make_unique<Request>(request));
+          LOG(ERROR) << "recv proposal done";
+        }
+        if (request.user_type() == MessageType::Vote) {
+          LOG(ERROR) << "bc vote";
+          replay_as_peers(request, /*log_type=*/false);
+        }
+        if (request.user_type() == MessageType::Prepare) {
+          LOG(ERROR) << "bc prepare";
+          replay_as_peers(request, /*log_type=*/false);
+        }
+        if (request.user_type() == MessageType::Voteprep) {
+          LOG(ERROR) << "bc voterep:";
+          replay_as_peers(request, /*log_type=*/true);
+        }
+        LOG(ERROR) << "done";
+        return 0;
+      }));
+
+  // A single WillOnce action: the executor is expected to run exactly once,
+  // on the payload submitted below.
+  EXPECT_CALL(*mock_transaction_manager_, ExecuteData)
+      .WillOnce(Invoke([&](const std::string& msg) {
+        LOG(ERROR) << "execute txn:" << msg;
+        EXPECT_EQ(msg, "transaction1");
+        return nullptr;
+      }));
+
+  // NOTE(review): std::promise::set_value throws if called twice, so this
+  // relies on at most one TYPE_RESPONSE being sent -- confirm.
+  EXPECT_CALL(replica_communicator_, SendMessage(_, 0))
+      .WillRepeatedly(
+          Invoke([&](const google::protobuf::Message& msg, int64_t) {
+            Request request = dynamic_cast<const Request&>(msg);
+            if (request.type() == Request::TYPE_RESPONSE) {
+              LOG(ERROR) << "get response";
+              commit_done.set_value(true);
+            }
+            return;
+          }));
+
+  AddTransaction("transaction1");
+
+  // Blocks until the response above has been observed.
+  commit_done_future.get();
+}
+
+}  // namespace
+}  // namespace cassandra
+}  // namespace resdb
diff --git a/platform/consensus/ordering/poe/proto/BUILD 
b/platform/consensus/ordering/poe/proto/BUILD
new file mode 100644
index 00000000..8088db09
--- /dev/null
+++ b/platform/consensus/ordering/poe/proto/BUILD
@@ -0,0 +1,16 @@
+package(default_visibility = 
["//platform/consensus/ordering/poe:__subpackages__"])
+
+load("@rules_cc//cc:defs.bzl", "cc_proto_library")
+load("@rules_proto//proto:defs.bzl", "proto_library")
+load("@rules_proto_grpc//python:defs.bzl", "python_proto_library")
+
+proto_library(  # Schema target built from proposal.proto.
+    name = "proposal_proto",
+    srcs = ["proposal.proto"],
+    #visibility = ["//visibility:public"],
+)
+
+cc_proto_library(  # C++ bindings for the :proposal_proto schema target.
+    name = "proposal_cc_proto",
+    deps = [":proposal_proto"],
+)
diff --git a/platform/consensus/ordering/poe/proto/proposal.proto 
b/platform/consensus/ordering/poe/proto/proposal.proto
new file mode 100644
index 00000000..8302752a
--- /dev/null
+++ b/platform/consensus/ordering/poe/proto/proposal.proto
@@ -0,0 +1,28 @@
+
+syntax = "proto3";
+
+package resdb.poe;
+
+message Transaction{  // Transaction record of package resdb.poe.
+  int32 id = 1;
+  bytes data = 2;
+  bytes hash = 3;  // presumably a digest of `data` -- confirm with the writer
+  int32 proxy_id = 4;
+  int32 proposer = 5;
+  int64 uid = 6;
+  int64 create_time = 7;  // time unit is not stated here -- TODO confirm
+  int64 seq = 9;  // field number 8 is not declared in this message
+}
+
+message Proposal {  // Carries a hash, a proposer and a sequence number.
+  bytes hash = 1;
+  int32 proposer = 2;
+  int64 seq =3 ;
+}
+
+enum MessageType {  // Message kinds declared for package resdb.poe.
+  None = 0;  // proto3 requires the first enum value to be zero
+  Propose = 1;
+  Prepare = 2;
+}
+
diff --git a/platform/networkstrate/replica_communicator.cpp 
b/platform/networkstrate/replica_communicator.cpp
index 4057e27d..7083fe6e 100644
--- a/platform/networkstrate/replica_communicator.cpp
+++ b/platform/networkstrate/replica_communicator.cpp
@@ -257,7 +257,7 @@ void ReplicaCommunicator::SendMessage(const 
google::protobuf::Message& message,
   }
 
   if (target_replica.ip().empty()) {
-    LOG(ERROR) << "no replica info";
+    LOG(ERROR) << "no replica info node:"<<node_id;
     return;
   }
 
diff --git a/platform/proto/resdb.proto b/platform/proto/resdb.proto
index 14ad976a..71ed4591 100644
--- a/platform/proto/resdb.proto
+++ b/platform/proto/resdb.proto
@@ -37,8 +37,9 @@ message Request {
         TYPE_VIEWCHANGE = 16;
         TYPE_NEWVIEW= 17;
         TYPE_CUSTOM_QUERY = 18;
+        TYPE_CUSTOM_CONSENSUS = 19;
 
-        NUM_OF_TYPE = 19; // the total number of types.
+        NUM_OF_TYPE = 20; // the total number of types.
                        // Used to create the collector.
     };
     int32 type = 1;
@@ -62,6 +63,12 @@ message Request {
     int32 primary_id = 17;
     repeated bytes hashs = 18;
     repeated uint64 seqs = 19;
+    int32 user_type = 20;
+    int64 user_seq = 21;
+    int64 queuing_time = 22;
+    int64 uid = 23;
+    int64 create_time = 24;
+    int64 commit_time = 25;
 }
 
 // The response message containing response
diff --git a/scripts/deploy/config/kv_performance_server.conf 
b/scripts/deploy/config/kv_performance_server.conf
index 498f5a98..af36aef8 100644
--- a/scripts/deploy/config/kv_performance_server.conf
+++ b/scripts/deploy/config/kv_performance_server.conf
@@ -1,8 +1,8 @@
 iplist=(
-172.31.23.110
-172.31.31.183
-172.31.22.246
-172.31.26.117
-172.31.21.196
+172.31.25.224
+172.31.20.228
+172.31.18.224
+172.31.16.230
+172.31.31.229
 )
 
diff --git a/scripts/deploy/config/poe.config b/scripts/deploy/config/poe.config
new file mode 100644
index 00000000..c5092a94
--- /dev/null
+++ b/scripts/deploy/config/poe.config
@@ -0,0 +1,10 @@
+{
+  "clientBatchNum": 100,
+  "enable_viewchange": false,
+  "recovery_enabled": false,
+  "max_client_complaint_num":10,
+  "max_process_txn": 32,
+  "worker_num": 2,
+  "input_worker_num": 1,
+  "output_worker_num": 10
+}
diff --git a/scripts/deploy/performance/pbft_performance.sh 
b/scripts/deploy/performance/pbft_performance.sh
new file mode 100755
index 00000000..68a890a5
--- /dev/null
+++ b/scripts/deploy/performance/pbft_performance.sh
@@ -0,0 +1,2 @@
+export server=//benchmark/protocols/pbft:kv_server_performance  # server target
+./performance/run_performance.sh "$@"  # "$@" forwards each argument unsplit
diff --git a/scripts/deploy/performance/poe_performance.sh 
b/scripts/deploy/performance/poe_performance.sh
new file mode 100755
index 00000000..02f76238
--- /dev/null
+++ b/scripts/deploy/performance/poe_performance.sh
@@ -0,0 +1,4 @@
+export server=//benchmark/protocols/poe:kv_server_performance  # server target
+export TEMPLATE_PATH="$PWD/config/poe.config"  # quoted: $PWD may hold spaces
+
+./performance/run_performance.sh "$@"  # "$@" forwards each argument unsplit
diff --git a/scripts/deploy/performance/run_performance.sh 
b/scripts/deploy/performance/run_performance.sh
index 68bc9c26..49afa373 100755
--- a/scripts/deploy/performance/run_performance.sh
+++ b/scripts/deploy/performance/run_performance.sh
@@ -1,5 +1,3 @@
-export server=//benchmark/protocols/pbft:kv_server_performance
-
 ./script/deploy.sh $1
 
 . ./script/load_config.sh $1

Reply via email to