This is an automated email from the ASF dual-hosted git repository.
junchao pushed a commit to branch poe_merge
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git
The following commit(s) were added to refs/heads/poe_merge by this push:
new 3b587004 add poe
3b587004 is described below
commit 3b5870042b541b3fe71954118a9c383ccea46c68
Author: cjcchen <[email protected]>
AuthorDate: Mon Apr 8 11:57:12 2024 +0000
add poe
---
benchmark/protocols/poe/BUILD | 16 ++
benchmark/protocols/poe/kv_server_performance.cpp | 89 +++++++
.../consensus/execution/transaction_executor.cpp | 1 +
platform/consensus/ordering/common/algorithm/BUILD | 12 +
.../ordering/common/algorithm/protocol_base.cpp | 53 ++++
.../ordering/common/algorithm/protocol_base.h | 64 +++++
platform/consensus/ordering/common/framework/BUILD | 49 ++++
.../ordering/common/framework/consensus.cpp | 181 +++++++++++++
.../ordering/common/framework/consensus.h | 83 ++++++
.../common/framework/performance_manager.cpp | 289 +++++++++++++++++++++
.../common/framework/performance_manager.h | 103 ++++++++
.../ordering/common/framework/response_manager.cpp | 242 +++++++++++++++++
.../ordering/common/framework/response_manager.h | 85 ++++++
.../common/framework/transaction_utils.cpp | 49 ++++
.../ordering/common/framework/transaction_utils.h | 45 ++++
platform/consensus/ordering/poe/algorithm/BUILD | 15 ++
platform/consensus/ordering/poe/algorithm/poe.cpp | 75 ++++++
platform/consensus/ordering/poe/algorithm/poe.h | 40 +++
platform/consensus/ordering/poe/framework/BUILD | 16 ++
.../consensus/ordering/poe/framework/consensus.cpp | 105 ++++++++
.../consensus/ordering/poe/framework/consensus.h | 59 +++++
.../ordering/poe/framework/consensus_test.cpp | 179 +++++++++++++
platform/consensus/ordering/poe/proto/BUILD | 16 ++
.../consensus/ordering/poe/proto/proposal.proto | 28 ++
platform/networkstrate/replica_communicator.cpp | 2 +-
platform/proto/resdb.proto | 9 +-
scripts/deploy/config/kv_performance_server.conf | 10 +-
scripts/deploy/config/poe.config | 10 +
scripts/deploy/performance/pbft_performance.sh | 2 +
scripts/deploy/performance/poe_performance.sh | 4 +
scripts/deploy/performance/run_performance.sh | 2 -
31 files changed, 1924 insertions(+), 9 deletions(-)
diff --git a/benchmark/protocols/poe/BUILD b/benchmark/protocols/poe/BUILD
new file mode 100644
index 00000000..e7bbde3b
--- /dev/null
+++ b/benchmark/protocols/poe/BUILD
@@ -0,0 +1,16 @@
+package(default_visibility = ["//visibility:private"])
+
+load("@bazel_skylib//rules:common_settings.bzl", "bool_flag")
+
+cc_binary(
+ name = "kv_server_performance",
+ srcs = ["kv_server_performance.cpp"],
+ deps = [
+ "//chain/storage:memory_db",
+ "//executor/kv:kv_executor",
+ "//platform/config:resdb_config_utils",
+ "//platform/consensus/ordering/poe/framework:consensus",
+ "//service/utils:server_factory",
+ ],
+)
+
diff --git a/benchmark/protocols/poe/kv_server_performance.cpp
b/benchmark/protocols/poe/kv_server_performance.cpp
new file mode 100644
index 00000000..fa13e9a9
--- /dev/null
+++ b/benchmark/protocols/poe/kv_server_performance.cpp
@@ -0,0 +1,89 @@
+/*
+ * Copyright (c) 2019-2022 ExpoLab, UC Davis
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ */
+
+#include <glog/logging.h>
+
+#include "chain/storage/memory_db.h"
+#include "executor/kv/kv_executor.h"
+#include "platform/config/resdb_config_utils.h"
+#include "platform/consensus/ordering/poe/framework/consensus.h"
+#include "platform/networkstrate/service_network.h"
+#include "platform/statistic/stats.h"
+#include "proto/kv/kv.pb.h"
+
+using namespace resdb;
+using namespace resdb::poe;
+using namespace resdb::storage;
+
+void ShowUsage() {
+ printf("<config> <private_key> <cert_file> [logging_dir]\n");
+}
+
+std::string GetRandomKey() {
+ int num1 = rand() % 10;
+ int num2 = rand() % 10;
+ return std::to_string(num1) + std::to_string(num2);
+}
+
+int main(int argc, char** argv) {
+ if (argc < 3) {
+ ShowUsage();
+ exit(0);
+ }
+
+ // google::InitGoogleLogging(argv[0]);
+ // FLAGS_minloglevel = google::GLOG_WARNING;
+
+ char* config_file = argv[1];
+ char* private_key_file = argv[2];
+ char* cert_file = argv[3];
+
+ if (argc >= 5) {
+ auto monitor_port = Stats::GetGlobalStats(5);
+ monitor_port->SetPrometheus(argv[4]);
+ }
+
+ std::unique_ptr<ResDBConfig> config =
+ GenerateResDBConfig(config_file, private_key_file, cert_file);
+
+ config->RunningPerformance(true);
+ ResConfigData config_data = config->GetConfigData();
+
+ auto performance_consens = std::make_unique<Consensus>(
+ *config, std::make_unique<KVExecutor>(std::make_unique<MemoryDB>()));
+ performance_consens->SetupPerformanceDataFunc([]() {
+ KVRequest request;
+ request.set_cmd(KVRequest::SET);
+ request.set_key(GetRandomKey());
+ request.set_value("helloword");
+ std::string request_data;
+ request.SerializeToString(&request_data);
+ return request_data;
+ });
+
+ auto server =
+ std::make_unique<ServiceNetwork>(*config,
std::move(performance_consens));
+ server->Run();
+}
diff --git a/platform/consensus/execution/transaction_executor.cpp
b/platform/consensus/execution/transaction_executor.cpp
index 59779b22..fd24da3a 100644
--- a/platform/consensus/execution/transaction_executor.cpp
+++ b/platform/consensus/execution/transaction_executor.cpp
@@ -231,6 +231,7 @@ void TransactionExecutor::Execute(std::unique_ptr<Request>
request,
response = std::make_unique<BatchUserResponse>();
}
+ response->set_proxy_id(batch_request.proxy_id());
response->set_createtime(batch_request.createtime());
response->set_local_id(batch_request.local_id());
response->set_hash(batch_request.hash());
diff --git a/platform/consensus/ordering/common/algorithm/BUILD
b/platform/consensus/ordering/common/algorithm/BUILD
new file mode 100644
index 00000000..9abd8715
--- /dev/null
+++ b/platform/consensus/ordering/common/algorithm/BUILD
@@ -0,0 +1,12 @@
+package(default_visibility = ["//platform/consensus/ordering:__subpackages__"])
+
+cc_library(
+ name = "protocol_base",
+ srcs = ["protocol_base.cpp"],
+ hdrs = ["protocol_base.h"],
+ deps = [
+ "//common:comm",
+ "//common/crypto:signature_verifier",
+ ],
+)
+
diff --git a/platform/consensus/ordering/common/algorithm/protocol_base.cpp
b/platform/consensus/ordering/common/algorithm/protocol_base.cpp
new file mode 100644
index 00000000..3c6c2fc3
--- /dev/null
+++ b/platform/consensus/ordering/common/algorithm/protocol_base.cpp
@@ -0,0 +1,53 @@
+#include "platform/consensus/ordering/common/algorithm/protocol_base.h"
+
+#include <glog/logging.h>
+
+namespace resdb {
+namespace common {
+
+ProtocolBase::ProtocolBase(
+ int id,
+ int f,
+ int total_num,
+ SingleCallFuncType single_call,
+ BroadcastCallFuncType broadcast_call,
+ CommitFuncType commit) :
+ id_(id),
+ f_(f),
+ total_num_(total_num),
+ single_call_(single_call),
+ broadcast_call_(broadcast_call),
+ commit_(commit) {
+ stop_ = false;
+}
+
+ProtocolBase::ProtocolBase( int id, int f, int total_num) : ProtocolBase(id,
f, total_num, nullptr, nullptr, nullptr){
+
+}
+
+ProtocolBase::~ProtocolBase() {
+ Stop();
+}
+
+void ProtocolBase::Stop(){
+ stop_ = true;
+}
+
+bool ProtocolBase::IsStop(){
+ return stop_;
+}
+
+int ProtocolBase::SendMessage(int msg_type, const google::protobuf::Message&
msg, int node_id) {
+ return single_call_(msg_type, msg, node_id);
+}
+
+int ProtocolBase::Broadcast(int msg_type, const google::protobuf::Message&
msg) {
+ return broadcast_call_(msg_type, msg);
+}
+
+int ProtocolBase::Commit(const google::protobuf::Message& msg) {
+ return commit_(msg);
+}
+
+} // namespace protocol
+} // namespace resdb
diff --git a/platform/consensus/ordering/common/algorithm/protocol_base.h
b/platform/consensus/ordering/common/algorithm/protocol_base.h
new file mode 100644
index 00000000..a93be822
--- /dev/null
+++ b/platform/consensus/ordering/common/algorithm/protocol_base.h
@@ -0,0 +1,64 @@
+#pragma once
+
+#include <functional>
+#include <google/protobuf/message.h>
+#include "common/crypto/signature_verifier.h"
+
+namespace resdb {
+namespace common {
+
+class ProtocolBase {
+ public:
+ typedef std::function<int(int, const google::protobuf::Message& msg, int)>
SingleCallFuncType;
+ typedef std::function<int(int, const google::protobuf::Message& msg)>
BroadcastCallFuncType;
+ typedef std::function<int(const google::protobuf::Message& msg)>
CommitFuncType;
+
+ ProtocolBase(
+ int id,
+ int f,
+ int total_num,
+ SingleCallFuncType single_call,
+ BroadcastCallFuncType broadcast_call,
+ CommitFuncType commit
+ );
+
+ ProtocolBase( int id, int f, int total_num);
+
+
+ virtual ~ProtocolBase();
+
+ void Stop();
+
+ inline
+ void SetSingleCallFunc(SingleCallFuncType single_call) { single_call_ =
single_call; }
+
+ inline
+ void SetBroadcastCallFunc(BroadcastCallFuncType broadcast_call) {
broadcast_call_ = broadcast_call; }
+
+ inline
+ void SetCommitFunc(CommitFuncType commit_func) { commit_ = commit_func; }
+
+ inline
+ void SetSignatureVerifier(SignatureVerifier* verifier) { verifier_ =
verifier;}
+
+ protected:
+ int SendMessage(int msg_type, const google::protobuf::Message& msg, int
node_id);
+ int Broadcast(int msg_type, const google::protobuf::Message& msg);
+ int Commit(const google::protobuf::Message& msg);
+
+ bool IsStop();
+
+ protected:
+ int id_;
+ int f_;
+ int total_num_;
+ std::function<int(int, const google::protobuf::Message& msg, int)>
single_call_;
+ std::function<int(int, const google::protobuf::Message& msg)>
broadcast_call_;
+ std::function<int(const google::protobuf::Message& msg)> commit_;
+ std::atomic<bool> stop_;
+
+ SignatureVerifier* verifier_;
+};
+
+} // namespace protocol
+} // namespace resdb
diff --git a/platform/consensus/ordering/common/framework/BUILD
b/platform/consensus/ordering/common/framework/BUILD
new file mode 100644
index 00000000..82e03a0f
--- /dev/null
+++ b/platform/consensus/ordering/common/framework/BUILD
@@ -0,0 +1,49 @@
+package(default_visibility = ["//platform/consensus/ordering:__subpackages__"])
+
+cc_library(
+ name = "consensus",
+ srcs = ["consensus.cpp"],
+ hdrs = ["consensus.h"],
+ deps = [
+ ":performance_manager",
+ ":response_manager",
+ "//common/utils",
+ "//executor/common:transaction_manager",
+ "//platform/consensus/execution:transaction_executor",
+ "//platform/consensus/ordering/common/algorithm:protocol_base",
+ "//platform/networkstrate:consensus_manager",
+ ],
+)
+
+cc_library(
+ name = "performance_manager",
+ srcs = ["performance_manager.cpp"],
+ hdrs = ["performance_manager.h"],
+ deps = [
+ ":transaction_utils",
+ "//platform/networkstrate:replica_communicator",
+ "//platform/networkstrate:server_comm",
+ ],
+)
+
+
+cc_library(
+ name = "response_manager",
+ srcs = ["response_manager.cpp"],
+ hdrs = ["response_manager.h"],
+ deps = [
+ ":transaction_utils",
+ "//platform/networkstrate:replica_communicator",
+ "//platform/networkstrate:server_comm",
+ ],
+)
+
+cc_library(
+ name = "transaction_utils",
+ srcs = ["transaction_utils.cpp"],
+ hdrs = ["transaction_utils.h"],
+ visibility = ["//visibility:public"],
+ deps = [
+ "//platform/proto:resdb_cc_proto",
+ ],
+)
diff --git a/platform/consensus/ordering/common/framework/consensus.cpp
b/platform/consensus/ordering/common/framework/consensus.cpp
new file mode 100644
index 00000000..683bce29
--- /dev/null
+++ b/platform/consensus/ordering/common/framework/consensus.cpp
@@ -0,0 +1,181 @@
+/*
+ * Copyright (c) 2019-2022 ExpoLab, UC Davis
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ */
+
+#include "platform/consensus/ordering/common/framework/consensus.h"
+
+#include <glog/logging.h>
+#include <unistd.h>
+
+#include "common/utils/utils.h"
+
+namespace resdb {
+namespace common {
+
+Consensus::Consensus(const ResDBConfig& config,
+ std::unique_ptr<TransactionManager> executor)
+ : ConsensusManager(config),
+ replica_communicator_(GetBroadCastClient()),
+ transaction_executor_(std::make_unique<TransactionExecutor>(
+ config,
+ [&](std::unique_ptr<Request> request,
+ std::unique_ptr<BatchUserResponse> resp_msg) {
+ ResponseMsg(*resp_msg);
+ },
+ nullptr, std::move(executor))) {
+ LOG(INFO) << "is running is performance mode:"
+ << config_.IsPerformanceRunning();
+ is_stop_ = false;
+ global_stats_ = Stats::GetGlobalStats();
+}
+
+void Consensus::Init(){
+ if(performance_manager_ == nullptr){
+ performance_manager_ =
+ config_.IsPerformanceRunning()
+ ? std::make_unique<PerformanceManager>(
+ config_, GetBroadCastClient(), GetSignatureVerifier())
+ : nullptr;
+ }
+
+ if(response_manager_ == nullptr){
+ response_manager_ =
+ !config_.IsPerformanceRunning()
+ ? std::make_unique<ResponseManager>(config_, GetBroadCastClient(),
+ GetSignatureVerifier())
+ : nullptr;
+ }
+}
+
+void Consensus::InitProtocol(ProtocolBase * protocol){
+ protocol->SetSingleCallFunc(
+ [&](int type, const google::protobuf::Message& msg, int node_id) {
+ return SendMsg(type, msg, node_id);
+ });
+
+ protocol->SetBroadcastCallFunc(
+ [&](int type, const google::protobuf::Message& msg) {
+ return Broadcast(type, msg);
+ });
+
+ protocol->SetCommitFunc(
+ [&](const google::protobuf::Message& msg) {
+ return CommitMsg(msg);
+ });
+}
+
+Consensus::~Consensus(){
+ is_stop_ = true;
+}
+
+void Consensus::SetPerformanceManager(std::unique_ptr<PerformanceManager>
performance_manager){
+ performance_manager_ = std::move(performance_manager);
+}
+
+bool Consensus::IsStop(){
+ return is_stop_;
+}
+
+void Consensus::SetupPerformanceDataFunc(std::function<std::string()> func) {
+ performance_manager_->SetDataFunc(func);
+}
+
+void Consensus::SetCommunicator(ReplicaCommunicator* replica_communicator) {
+ replica_communicator_ = replica_communicator;
+}
+
+int Consensus::Broadcast(int type, const google::protobuf::Message& msg) {
+ Request request;
+ msg.SerializeToString(request.mutable_data());
+ request.set_type(Request::TYPE_CUSTOM_CONSENSUS);
+ request.set_user_type(type);
+ request.set_sender_id(config_.GetSelfInfo().id());
+
+ replica_communicator_->BroadCast(request);
+ return 0;
+}
+
+int Consensus::SendMsg(int type, const google::protobuf::Message& msg,
+ int node_id) {
+ Request request;
+ msg.SerializeToString(request.mutable_data());
+ request.set_type(Request::TYPE_CUSTOM_CONSENSUS);
+ request.set_user_type(type);
+ request.set_sender_id(config_.GetSelfInfo().id());
+ replica_communicator_->SendMessage(request, node_id);
+ return 0;
+}
+
+std::vector<ReplicaInfo> Consensus::GetReplicas() {
+ return config_.GetReplicaInfos();
+}
+
+int Consensus::CommitMsg(const google::protobuf::Message &txn) {
+ return 0;
+}
+
+// The implementation of PBFT.
+int Consensus::ConsensusCommit(std::unique_ptr<Context> context,
+ std::unique_ptr<Request> request) {
+ switch (request->type()) {
+ case Request::TYPE_CLIENT_REQUEST:
+ if (config_.IsPerformanceRunning()) {
+ return performance_manager_->StartEval();
+ }
+ case Request::TYPE_RESPONSE:
+ if (config_.IsPerformanceRunning()) {
+ return performance_manager_->ProcessResponseMsg(std::move(context),
+ std::move(request));
+ }
+ case Request::TYPE_NEW_TXNS: {
+ return ProcessNewTransaction(std::move(request));
+ }
+ case Request::TYPE_CUSTOM_CONSENSUS: {
+ return ProcessCustomConsensus(std::move(request));
+ }
+ }
+ return 0;
+}
+
+int Consensus::ProcessCustomConsensus(std::unique_ptr<Request> request) {
+ return 0;
+}
+
+int Consensus::ProcessNewTransaction(std::unique_ptr<Request> request) {
+ return 0;
+}
+
+int Consensus::ResponseMsg(const BatchUserResponse& batch_resp) {
+ Request request;
+ request.set_seq(batch_resp.seq());
+ request.set_type(Request::TYPE_RESPONSE);
+ request.set_sender_id(config_.GetSelfInfo().id());
+ request.set_proxy_id(batch_resp.proxy_id());
+ batch_resp.SerializeToString(request.mutable_data());
+ replica_communicator_->SendMessage(request, request.proxy_id());
+ return 0;
+}
+
+} // namespace common
+} // namespace resdb
diff --git a/platform/consensus/ordering/common/framework/consensus.h
b/platform/consensus/ordering/common/framework/consensus.h
new file mode 100644
index 00000000..881dc72b
--- /dev/null
+++ b/platform/consensus/ordering/common/framework/consensus.h
@@ -0,0 +1,83 @@
+/*
+ * Copyright (c) 2019-2022 ExpoLab, UC Davis
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ */
+
+#pragma once
+
+#include "executor/common/transaction_manager.h"
+#include "platform/consensus/execution/transaction_executor.h"
+#include "platform/consensus/ordering/common/algorithm/protocol_base.h"
+#include "platform/consensus/ordering/common/framework/performance_manager.h"
+#include "platform/consensus/ordering/common/framework/response_manager.h"
+#include "platform/networkstrate/consensus_manager.h"
+
+namespace resdb {
+namespace common {
+
+class Consensus : public ConsensusManager {
+ public:
+ Consensus(const ResDBConfig& config,
+ std::unique_ptr<TransactionManager> transaction_manager);
+ virtual ~Consensus();
+
+ int ConsensusCommit(std::unique_ptr<Context> context,
+ std::unique_ptr<Request> request) override;
+ std::vector<ReplicaInfo> GetReplicas() override;
+
+ void SetupPerformanceDataFunc(std::function<std::string()> func);
+
+ void SetCommunicator(ReplicaCommunicator* replica_communicator);
+
+ void InitProtocol(ProtocolBase * protocol);
+
+ protected:
+ virtual int ProcessCustomConsensus(std::unique_ptr<Request> request);
+ virtual int ProcessNewTransaction(std::unique_ptr<Request> request);
+ virtual int CommitMsg(const google::protobuf::Message& msg);
+
+ protected:
+ int SendMsg(int type, const google::protobuf::Message& msg, int node_id);
+ int Broadcast(int type, const google::protobuf::Message& msg);
+ int ResponseMsg(const BatchUserResponse& batch_resp);
+ void AsyncSend();
+ bool IsStop();
+
+ protected:
+ void Init();
+ void SetPerformanceManager(std::unique_ptr<PerformanceManager>
performance_manger);
+
+ protected:
+ ReplicaCommunicator* replica_communicator_;
+ std::unique_ptr<PerformanceManager> performance_manager_;
+ std::unique_ptr<ResponseManager> response_manager_;
+ std::unique_ptr<TransactionExecutor> transaction_executor_;
+ Stats* global_stats_;
+
+ LockFreeQueue<BatchUserResponse> resp_queue_;
+ std::vector<std::thread> send_thread_;
+ bool is_stop_;
+};
+
+} // namespace common
+} // namespace resdb
diff --git
a/platform/consensus/ordering/common/framework/performance_manager.cpp
b/platform/consensus/ordering/common/framework/performance_manager.cpp
new file mode 100644
index 00000000..089f3bd5
--- /dev/null
+++ b/platform/consensus/ordering/common/framework/performance_manager.cpp
@@ -0,0 +1,289 @@
+/*
+ * Copyright (c) 2019-2022 ExpoLab, UC Davis
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ */
+
+#include "platform/consensus/ordering/common/framework/performance_manager.h"
+
+#include <glog/logging.h>
+
+#include "common/utils/utils.h"
+
+namespace resdb {
+namespace common {
+
+using comm::CollectorResultCode;
+
+PerformanceManager::PerformanceManager(
+ const ResDBConfig& config, ReplicaCommunicator* replica_communicator,
+ SignatureVerifier* verifier)
+ : config_(config),
+ replica_communicator_(replica_communicator),
+ batch_queue_("user request"),
+ verifier_(verifier) {
+ stop_ = false;
+ eval_started_ = false;
+ eval_ready_future_ = eval_ready_promise_.get_future();
+ if (config_.GetPublicKeyCertificateInfo()
+ .public_key()
+ .public_key_info()
+ .type() == CertificateKeyInfo::CLIENT) {
+ for (int i = 0; i < 1; ++i) {
+ user_req_thread_[i] =
+ std::thread(&PerformanceManager::BatchProposeMsg, this);
+ }
+ }
+ global_stats_ = Stats::GetGlobalStats();
+ send_num_ = 0;
+ total_num_ = 0;
+ replica_num_ = config_.GetReplicaNum();
+ id_ = config_.GetSelfInfo().id();
+ primary_ = id_ % replica_num_;
+ if (primary_ == 0) primary_ = replica_num_;
+ local_id_ = 1;
+ sum_ = 0;
+}
+
+PerformanceManager::~PerformanceManager() {
+ stop_ = true;
+ for (int i = 0; i < 16; ++i) {
+ if (user_req_thread_[i].joinable()) {
+ user_req_thread_[i].join();
+ }
+ }
+}
+
+int PerformanceManager::GetPrimary() { return primary_; }
+
+std::unique_ptr<Request> PerformanceManager::GenerateUserRequest() {
+ std::unique_ptr<Request> request = std::make_unique<Request>();
+ request->set_data(data_func_());
+ return request;
+}
+
+void PerformanceManager::SetDataFunc(std::function<std::string()> func) {
+ data_func_ = std::move(func);
+}
+
+int PerformanceManager::StartEval() {
+ if (eval_started_) {
+ return 0;
+ }
+ eval_started_ = true;
+ for (int i = 0; i < 100000000; ++i) {
+ // for (int i = 0; i < 60000000000; ++i) {
+ std::unique_ptr<QueueItem> queue_item = std::make_unique<QueueItem>();
+ queue_item->context = nullptr;
+ queue_item->user_request = GenerateUserRequest();
+ batch_queue_.Push(std::move(queue_item));
+ if (i == 2000000) {
+ eval_ready_promise_.set_value(true);
+ }
+ }
+ LOG(WARNING) << "start eval done";
+ return 0;
+}
+
+// =================== response ========================
+// handle the response message. If receive f+1 commit messages, send back to
the
+// user.
+int PerformanceManager::ProcessResponseMsg(std::unique_ptr<Context> context,
+ std::unique_ptr<Request> request) {
+ std::unique_ptr<Request> response;
+ // Add the response message, and use the call back to collect the received
+ // messages.
+ // The callback will be triggered if it received f+1 messages.
+ if (request->ret() == -2) {
+ // LOG(INFO) << "get response fail:" << request->ret();
+ send_num_--;
+ return 0;
+ }
+
+ //LOG(INFO) << "get response:" << request->seq() << "
sender:"<<request->sender_id();
+ std::unique_ptr<BatchUserResponse> batch_response = nullptr;
+ CollectorResultCode ret =
+ AddResponseMsg(std::move(request),
[&](std::unique_ptr<BatchUserResponse> request) {
+ batch_response = std::move(request);
+ return;
+ });
+
+ if (ret == CollectorResultCode::STATE_CHANGED) {
+ assert(batch_response);
+ SendResponseToClient(*batch_response);
+ }
+ return ret == CollectorResultCode::INVALID ? -2 : 0;
+}
+
+CollectorResultCode PerformanceManager::AddResponseMsg(
+ std::unique_ptr<Request> request,
+ std::function<void(std::unique_ptr<BatchUserResponse>)>
response_call_back) {
+ if (request == nullptr) {
+ return CollectorResultCode::INVALID;
+ }
+
+ //uint64_t seq = request->seq();
+
+ std::unique_ptr<BatchUserResponse> batch_response =
std::make_unique<BatchUserResponse>();
+ if (!batch_response->ParseFromString(request->data())) {
+ LOG(ERROR) << "parse response fail:"<<request->data().size()
+ <<" seq:"<<request->seq(); return CollectorResultCode::INVALID;
+ }
+
+ uint64_t seq = batch_response->local_id();
+ //LOG(ERROR)<<"receive seq:"<<seq;
+
+ bool done = false;
+ {
+ int idx = seq % response_set_size_;
+ std::unique_lock<std::mutex> lk(response_lock_[idx]);
+ if (response_[idx].find(seq) == response_[idx].end()) {
+ //LOG(ERROR)<<"has done local seq:"<<seq<<" global seq:"<<request->seq();
+ return CollectorResultCode::OK;
+ }
+ response_[idx][seq]++;
+ //LOG(ERROR)<<"get seq :"<<request->seq()<<" local id:"<<seq<<"
num:"<<response_[idx][seq]<<" send:"<<send_num_;
+ if (response_[idx][seq] >= config_.GetMinClientReceiveNum()) {
+ //LOG(ERROR)<<"get seq :"<<request->seq()<<" local id:"<<seq<<"
num:"<<response_[idx][seq]<<" done:"<<send_num_;
+ response_[idx].erase(response_[idx].find(seq));
+ done = true;
+ }
+ }
+ if (done) {
+ response_call_back(std::move(batch_response));
+ return CollectorResultCode::STATE_CHANGED;
+ }
+ return CollectorResultCode::OK;
+}
+
+void PerformanceManager::SendResponseToClient(
+ const BatchUserResponse& batch_response) {
+ uint64_t create_time = batch_response.createtime();
+ if (create_time > 0) {
+ uint64_t run_time = GetCurrentTime() - create_time;
+ LOG(ERROR)<<"receive current:"<<GetCurrentTime()<<" create
time:"<<create_time<<" run time:"<<run_time<<" local
id:"<<batch_response.local_id();
+ global_stats_->AddLatency(run_time);
+ } else {
+ }
+ //send_num_-=10;
+ send_num_--;
+}
+
+// =================== request ========================
+int PerformanceManager::BatchProposeMsg() {
+ LOG(WARNING) << "batch wait time:" << config_.ClientBatchWaitTimeMS()
+ << " batch num:" << config_.ClientBatchNum()
+ << " max txn:" << config_.GetMaxProcessTxn();
+ std::vector<std::unique_ptr<QueueItem>> batch_req;
+ eval_ready_future_.get();
+ bool start = false;
+ while (!stop_) {
+ if (send_num_ > config_.GetMaxProcessTxn()) {
+ // LOG(ERROR)<<"wait send num:"<<send_num_;
+ usleep(100000);
+ continue;
+ }
+ if (batch_req.size() < config_.ClientBatchNum()) {
+ std::unique_ptr<QueueItem> item =
+ batch_queue_.Pop(config_.ClientBatchWaitTimeMS());
+ if (item == nullptr) {
+ if(start){
+ LOG(ERROR)<<"no data";
+ }
+ continue;
+ }
+ batch_req.push_back(std::move(item));
+ if (batch_req.size() < config_.ClientBatchNum()) {
+ continue;
+ }
+ }
+ start = true;
+ for(int i = 0; i < 1;++i){
+ int ret = DoBatch(batch_req);
+ }
+ batch_req.clear();
+ }
+ return 0;
+}
+
+int PerformanceManager::DoBatch(
+ const std::vector<std::unique_ptr<QueueItem>>& batch_req) {
+ auto new_request = comm::NewRequest(Request::TYPE_NEW_TXNS, Request(),
+ config_.GetSelfInfo().id());
+ if (new_request == nullptr) {
+ return -2;
+ }
+
+ BatchUserRequest batch_request;
+ for (size_t i = 0; i < batch_req.size(); ++i) {
+ BatchUserRequest::UserRequest* req = batch_request.add_user_requests();
+ *req->mutable_request() = *batch_req[i]->user_request.get();
+ req->set_id(i);
+ }
+
+ batch_request.set_local_id(local_id_++);
+
+ {
+ int idx = batch_request.local_id() % response_set_size_;
+ std::unique_lock<std::mutex> lk(response_lock_[idx]);
+ response_[idx][batch_request.local_id()]++;
+ }
+
+ batch_request.set_proxy_id(config_.GetSelfInfo().id());
+ batch_request.set_createtime(GetCurrentTime());
+ batch_request.SerializeToString(new_request->mutable_data());
+ if (verifier_) {
+ auto signature_or = verifier_->SignMessage(new_request->data());
+ if (!signature_or.ok()) {
+ LOG(ERROR) << "Sign message fail";
+ return -2;
+ }
+ *new_request->mutable_data_signature() = *signature_or;
+ }
+
+ new_request->set_hash(SignatureVerifier::CalculateHash(new_request->data()));
+ new_request->set_proxy_id(config_.GetSelfInfo().id());
+ new_request->set_user_seq(batch_request.local_id());
+
+ SendMessage(*new_request);
+
+ global_stats_->BroadCastMsg();
+ send_num_++;
+ sum_ += batch_req.size();
+ //LOG(ERROR)<<"send num:"<<send_num_<<" total num:"<<total_num_<<"
sum:"<<sum_<<" to:"<<GetPrimary();
+ if (total_num_++ == 1000000) {
+ stop_ = true;
+ LOG(WARNING) << "total num is done:" << total_num_;
+ }
+ if (total_num_ % 1000 == 0) {
+ LOG(WARNING) << "total num is :" << total_num_;
+ }
+ global_stats_->IncClientCall();
+ return 0;
+}
+
+void PerformanceManager::SendMessage(const Request& request){
+ replica_communicator_->SendMessage(request, GetPrimary());
+}
+
+} // namespace common
+} // namespace resdb
diff --git a/platform/consensus/ordering/common/framework/performance_manager.h
b/platform/consensus/ordering/common/framework/performance_manager.h
new file mode 100644
index 00000000..5a874baa
--- /dev/null
+++ b/platform/consensus/ordering/common/framework/performance_manager.h
@@ -0,0 +1,103 @@
+/*
+ * Copyright (c) 2019-2022 ExpoLab, UC Davis
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ */
+
+#pragma once
+
+#include <future>
+
+#include "platform/config/resdb_config.h"
+#include "platform/consensus/ordering/common/framework/transaction_utils.h"
+#include "platform/networkstrate/replica_communicator.h"
+#include "platform/networkstrate/server_comm.h"
+#include "platform/statistic/stats.h"
+
+namespace resdb {
+namespace common {
+
+class PerformanceManager {
+ public:
+ PerformanceManager(const ResDBConfig& config,
+ ReplicaCommunicator* replica_communicator,
+ SignatureVerifier* verifier);
+
+ virtual ~PerformanceManager();
+
+ int StartEval();
+
+ int ProcessResponseMsg(std::unique_ptr<Context> context,
+ std::unique_ptr<Request> request);
+ void SetDataFunc(std::function<std::string()> func);
+
+ protected:
+ virtual void SendMessage(const Request& request);
+
+ private:
+ // Add response messages which will be sent back to the caller
+ // if there are f+1 same messages.
+ comm::CollectorResultCode AddResponseMsg(
+ std::unique_ptr<Request> request,
+ std::function<void(std::unique_ptr<BatchUserResponse>)> call_back);
+ void SendResponseToClient(const BatchUserResponse& batch_response);
+
+ struct QueueItem {
+ std::unique_ptr<Context> context;
+ std::unique_ptr<Request> user_request;
+ };
+ int DoBatch(const std::vector<std::unique_ptr<QueueItem>>& batch_req);
+ int BatchProposeMsg();
+ int GetPrimary();
+ std::unique_ptr<Request> GenerateUserRequest();
+
+ protected:
+ ResDBConfig config_;
+ ReplicaCommunicator* replica_communicator_;
+
+ private:
+ LockFreeQueue<QueueItem> batch_queue_;
+ std::thread user_req_thread_[16];
+ std::atomic<bool> stop_;
+ Stats* global_stats_;
+ std::atomic<int> send_num_;
+ std::mutex mutex_;
+ std::atomic<int> total_num_;
+ SignatureVerifier* verifier_;
+ SignatureInfo sig_;
+ std::function<std::string()> data_func_;
+ std::future<bool> eval_ready_future_;
+ std::promise<bool> eval_ready_promise_;
+ std::atomic<bool> eval_started_;
+ std::atomic<int> fail_num_;
+ static const int response_set_size_ = 6000000;
+ std::map<int64_t, int> response_[response_set_size_];
+ std::mutex response_lock_[response_set_size_];
+ int replica_num_;
+ int id_;
+ int primary_;
+ std::atomic<int> local_id_;
+ std::atomic<int> sum_;
+};
+
+} // namespace common
+} // namespace resdb
diff --git a/platform/consensus/ordering/common/framework/response_manager.cpp
b/platform/consensus/ordering/common/framework/response_manager.cpp
new file mode 100644
index 00000000..ae6b55cc
--- /dev/null
+++ b/platform/consensus/ordering/common/framework/response_manager.cpp
@@ -0,0 +1,242 @@
+/*
+ * Copyright (c) 2019-2022 ExpoLab, UC Davis
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ */
+
+#include "platform/consensus/ordering/common/framework/response_manager.h"
+
+#include <glog/logging.h>
+
+#include "common/utils/utils.h"
+
+namespace resdb {
+namespace common {
+
+using namespace resdb::comm;
+
+ResponseManager::ResponseManager(const ResDBConfig& config,
+ ReplicaCommunicator* replica_communicator,
+ SignatureVerifier* verifier)
+ : config_(config),
+ replica_communicator_(replica_communicator),
+ batch_queue_("user request"),
+ verifier_(verifier) {
+ stop_ = false;
+ local_id_ = 1;
+
+ if (config_.GetPublicKeyCertificateInfo()
+ .public_key()
+ .public_key_info()
+ .type() == CertificateKeyInfo::CLIENT ||
+ config_.IsTestMode()) {
+ user_req_thread_ = std::thread(&ResponseManager::BatchProposeMsg, this);
+ }
+ global_stats_ = Stats::GetGlobalStats();
+ send_num_ = 0;
+}
+
+ResponseManager::~ResponseManager() {
+ stop_ = true;
+ if (user_req_thread_.joinable()) {
+ user_req_thread_.join();
+ }
+}
+
+// use system info
+int ResponseManager::GetPrimary() { return 1; }
+
+int ResponseManager::NewUserRequest(std::unique_ptr<Context> context,
+ std::unique_ptr<Request> user_request) {
+ context->client = nullptr;
+
+ std::unique_ptr<QueueItem> queue_item = std::make_unique<QueueItem>();
+ queue_item->context = std::move(context);
+ queue_item->user_request = std::move(user_request);
+
+ batch_queue_.Push(std::move(queue_item));
+ return 0;
+}
+
+// =================== response ========================
+// handle the response message. If receive f+1 commit messages, send back to
the
+// caller.
+int ResponseManager::ProcessResponseMsg(std::unique_ptr<Context> context,
+ std::unique_ptr<Request> request) {
+ std::unique_ptr<Request> response;
+ // Add the response message, and use the call back to collect the received
+ // messages.
+ // The callback will be triggered if it received f+1 messages.
+ if (request->ret() == -2) {
+ LOG(ERROR) << "get response fail:" << request->ret();
+ send_num_--;
+ return 0;
+ }
+ CollectorResultCode ret =
+ AddResponseMsg(std::move(request), [&](const Request& request) {
+ response = std::make_unique<Request>(request);
+ return;
+ });
+
+ if (ret == CollectorResultCode::STATE_CHANGED) {
+ BatchUserResponse batch_response;
+ if (batch_response.ParseFromString(response->data())) {
+ SendResponseToClient(batch_response);
+ } else {
+ LOG(ERROR) << "parse response fail:";
+ }
+ }
+ return ret == CollectorResultCode::INVALID ? -2 : 0;
+}
+
+CollectorResultCode ResponseManager::AddResponseMsg(
+ std::unique_ptr<Request> request,
+ std::function<void(const Request&)> response_call_back) {
+ if (request == nullptr) {
+ return CollectorResultCode::INVALID;
+ }
+
+ int type = request->type();
+ uint64_t seq = request->seq();
+ bool done = false;
+ {
+ int idx = seq % response_set_size_;
+ std::unique_lock<std::mutex> lk(response_lock_[idx]);
+ if (response_[idx][seq] == -1) {
+ return CollectorResultCode::OK;
+ }
+ response_[idx][seq]++;
+ if (response_[idx][seq] >= config_.GetMinClientReceiveNum()) {
+ response_[idx][seq] = -1;
+ done = true;
+ }
+ }
+ if (done) {
+ response_call_back(*request);
+ return CollectorResultCode::STATE_CHANGED;
+ }
+ return CollectorResultCode::OK;
+}
+
+void ResponseManager::SendResponseToClient(
+ const BatchUserResponse& batch_response) {
+ uint64_t create_time = batch_response.createtime();
+ uint64_t local_id = batch_response.local_id();
+ if (create_time > 0) {
+ uint64_t run_time = GetCurrentTime() - create_time;
+ global_stats_->AddLatency(run_time);
+ } else {
+ LOG(ERROR) << "seq:" << local_id << " no resp";
+ }
+ send_num_--;
+}
+
+// =================== request ========================
+int ResponseManager::BatchProposeMsg() {
+ LOG(INFO) << "batch wait time:" << config_.ClientBatchWaitTimeMS()
+ << " batch num:" << config_.ClientBatchNum();
+ std::vector<std::unique_ptr<QueueItem>> batch_req;
+ while (!stop_) {
+ if (send_num_ > config_.GetMaxProcessTxn()) {
+ LOG(ERROR) << "send num too high, wait:" << send_num_;
+ usleep(100);
+ continue;
+ }
+ if (batch_req.size() < config_.ClientBatchNum()) {
+ std::unique_ptr<QueueItem> item =
+ batch_queue_.Pop(config_.ClientBatchWaitTimeMS());
+ if (item != nullptr) {
+ batch_req.push_back(std::move(item));
+ if (batch_req.size() < config_.ClientBatchNum()) {
+ continue;
+ }
+ }
+ }
+ if (batch_req.empty()) {
+ continue;
+ }
+ int ret = DoBatch(batch_req);
+ batch_req.clear();
+ if (ret != 0) {
+ Response response;
+ response.set_result(Response::ERROR);
+ for (size_t i = 0; i < batch_req.size(); ++i) {
+ if (batch_req[i]->context && batch_req[i]->context->client) {
+ int ret = batch_req[i]->context->client->SendRawMessage(response);
+ if (ret) {
+ LOG(ERROR) << "send resp" << response.DebugString()
+ << " fail ret:" << ret;
+ }
+ }
+ }
+ }
+ }
+ return 0;
+}
+
+int ResponseManager::DoBatch(
+ const std::vector<std::unique_ptr<QueueItem>>& batch_req) {
+ auto new_request =
+ NewRequest(Request::TYPE_NEW_TXNS, Request(),
config_.GetSelfInfo().id());
+ if (new_request == nullptr) {
+ return -2;
+ }
+ std::vector<std::unique_ptr<Context>> context_list;
+
+ BatchUserRequest batch_request;
+ for (size_t i = 0; i < batch_req.size(); ++i) {
+ BatchUserRequest::UserRequest* req = batch_request.add_user_requests();
+ *req->mutable_request() = *batch_req[i]->user_request.get();
+ *req->mutable_signature() = batch_req[i]->context->signature;
+ req->set_id(i);
+ context_list.push_back(std::move(batch_req[i]->context));
+ }
+
+ if (!config_.IsPerformanceRunning()) {
+ LOG(ERROR) << "add context list:" << new_request->seq()
+ << " list size:" << context_list.size();
+ batch_request.set_local_id(local_id_);
+ }
+ batch_request.set_createtime(GetCurrentTime());
+ std::string data;
+ batch_request.SerializeToString(&data);
+ if (verifier_) {
+ auto signature_or = verifier_->SignMessage(data);
+ if (!signature_or.ok()) {
+ LOG(ERROR) << "Sign message fail";
+ return -2;
+ }
+ *new_request->mutable_data_signature() = *signature_or;
+ }
+
+ batch_request.SerializeToString(new_request->mutable_data());
+ new_request->set_hash(SignatureVerifier::CalculateHash(new_request->data()));
+ new_request->set_proxy_id(config_.GetSelfInfo().id());
+ replica_communicator_->SendMessage(*new_request, GetPrimary());
+ send_num_++;
+ LOG(INFO) << "send msg to primary:" << GetPrimary()
+ << " batch size:" << batch_req.size();
+ return 0;
+}
+
+} // namespace common
+} // namespace resdb
diff --git a/platform/consensus/ordering/common/framework/response_manager.h
b/platform/consensus/ordering/common/framework/response_manager.h
new file mode 100644
index 00000000..1c704396
--- /dev/null
+++ b/platform/consensus/ordering/common/framework/response_manager.h
@@ -0,0 +1,85 @@
+/*
+ * Copyright (c) 2019-2022 ExpoLab, UC Davis
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ */
+
+#pragma once
+
+#include "platform/config/resdb_config.h"
+#include "platform/consensus/ordering/common/framework/transaction_utils.h"
+#include "platform/networkstrate/replica_communicator.h"
+#include "platform/networkstrate/server_comm.h"
+#include "platform/statistic/stats.h"
+
+namespace resdb {
+namespace common {
+
+class ResponseManager {
+ public:
+ ResponseManager(const ResDBConfig& config,
+ ReplicaCommunicator* replica_communicator,
+ SignatureVerifier* verifier);
+
+ ~ResponseManager();
+
+ std::vector<std::unique_ptr<Context>> FetchContextList(uint64_t id);
+
+ int NewUserRequest(std::unique_ptr<Context> context,
+ std::unique_ptr<Request> user_request);
+
+ int ProcessResponseMsg(std::unique_ptr<Context> context,
+ std::unique_ptr<Request> request);
+
+ private:
+ // Add response messages which will be sent back to the caller
+ // if there are f+1 same messages.
+ comm::CollectorResultCode AddResponseMsg(
+ std::unique_ptr<Request> request,
+ std::function<void(const Request&)> call_back);
+ void SendResponseToClient(const BatchUserResponse& batch_response);
+
+ struct QueueItem {
+ std::unique_ptr<Context> context;
+ std::unique_ptr<Request> user_request;
+ };
+ int DoBatch(const std::vector<std::unique_ptr<QueueItem>>& batch_req);
+ int BatchProposeMsg();
+ int GetPrimary();
+
+ private:
+ ResDBConfig config_;
+ ReplicaCommunicator* replica_communicator_;
+ LockFreeQueue<QueueItem> batch_queue_;
+ std::thread user_req_thread_;
+ std::atomic<bool> stop_;
+ uint64_t local_id_ = 0;
+ Stats* global_stats_;
+ std::atomic<int> send_num_;
+ SignatureVerifier* verifier_;
+ static const int response_set_size_ = 6000000;
+ std::map<int64_t, int> response_[response_set_size_];
+ std::mutex response_lock_[response_set_size_];
+};
+
+} // common
+} // namespace resdb
diff --git a/platform/consensus/ordering/common/framework/transaction_utils.cpp
b/platform/consensus/ordering/common/framework/transaction_utils.cpp
new file mode 100644
index 00000000..08a8e544
--- /dev/null
+++ b/platform/consensus/ordering/common/framework/transaction_utils.cpp
@@ -0,0 +1,49 @@
+/*
+ * Copyright (c) 2019-2022 ExpoLab, UC Davis
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ */
+
+#include "platform/consensus/ordering/common/framework/transaction_utils.h"
+
+namespace resdb {
+namespace comm {
+
+std::unique_ptr<Request> NewRequest(Request::Type type, const Request& request,
+ int sender_id) {
+ auto new_request = std::make_unique<Request>(request);
+ new_request->set_type(type);
+ new_request->set_sender_id(sender_id);
+ return new_request;
+}
+
+std::unique_ptr<Request> NewRequest(Request::Type type, const Request& request,
+ int sender_id, int region_id) {
+ auto new_request = std::make_unique<Request>(request);
+ new_request->set_type(type);
+ new_request->set_sender_id(sender_id);
+ new_request->mutable_region_info()->set_region_id(region_id);
+ return new_request;
+}
+
+} // namespace comm
+} // namespace resdb
diff --git a/platform/consensus/ordering/common/framework/transaction_utils.h
b/platform/consensus/ordering/common/framework/transaction_utils.h
new file mode 100644
index 00000000..3055cf44
--- /dev/null
+++ b/platform/consensus/ordering/common/framework/transaction_utils.h
@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) 2019-2022 ExpoLab, UC Davis
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ */
+
+#pragma once
+#include "platform/proto/replica_info.pb.h"
+#include "platform/proto/resdb.pb.h"
+
+namespace resdb {
+namespace comm {
+
+enum CollectorResultCode {
+ INVALID = -2,
+ OK = 0,
+ STATE_CHANGED = 1,
+};
+
+std::unique_ptr<Request> NewRequest(Request::Type type, const Request& request,
+ int sender_id);
+
+std::unique_ptr<Request> NewRequest(Request::Type type, const Request& request,
+ int sender_id, int region_info);
+} // namespace comm
+} // namespace resdb
diff --git a/platform/consensus/ordering/poe/algorithm/BUILD
b/platform/consensus/ordering/poe/algorithm/BUILD
new file mode 100644
index 00000000..357f56d8
--- /dev/null
+++ b/platform/consensus/ordering/poe/algorithm/BUILD
@@ -0,0 +1,15 @@
+package(default_visibility =
["//platform/consensus/ordering/poe:__subpackages__"])
+
+cc_library(
+ name = "poe",
+ srcs = ["poe.cpp"],
+ hdrs = ["poe.h"],
+ deps = [
+ "//platform/statistic:stats",
+ "//common:comm",
+ "//platform/consensus/ordering/poe/proto:proposal_cc_proto",
+ "//common/crypto:signature_verifier",
+ "//platform/consensus/ordering/common/algorithm:protocol_base",
+ "//platform/common/queue:lock_free_queue",
+ ],
+)
diff --git a/platform/consensus/ordering/poe/algorithm/poe.cpp
b/platform/consensus/ordering/poe/algorithm/poe.cpp
new file mode 100644
index 00000000..8a9b3a31
--- /dev/null
+++ b/platform/consensus/ordering/poe/algorithm/poe.cpp
@@ -0,0 +1,75 @@
+#include "platform/consensus/ordering/poe/algorithm/poe.h"
+
+#include <glog/logging.h>
+
+#include "common/crypto/signature_verifier.h"
+#include "common/utils/utils.h"
+
+namespace resdb {
+namespace poe {
+
+PoE::PoE(int id, int f, int total_num, SignatureVerifier* verifier)
+ : ProtocolBase(id, f, total_num), verifier_(verifier) {
+
+ LOG(ERROR) << "get proposal graph";
+ id_ = id;
+ total_num_ = total_num;
+ f_ = f;
+ is_stop_ = false;
+ seq_ = 0;
+}
+
+PoE::~PoE() {
+ is_stop_ = true;
+}
+
+bool PoE::IsStop() { return is_stop_; }
+
+bool PoE::ReceiveTransaction(std::unique_ptr<Transaction> txn) {
+ // LOG(ERROR)<<"recv txn:";
+ txn->set_create_time(GetCurrentTime());
+ txn->set_seq(seq_++);
+ txn->set_proposer(id_);
+
+ Broadcast(MessageType::Propose, *txn);
+ return true;
+}
+
+bool PoE::ReceivePropose(std::unique_ptr<Transaction> txn) {
+ std::string hash = txn->hash();
+ int64_t seq = txn->seq();
+ int proposer = txn->proposer();
+ {
+ std::unique_lock<std::mutex> lk(mutex_);
+ data_[txn->hash()]=std::move(txn);
+ }
+
+ Proposal proposal;
+ proposal.set_hash(hash);
+ proposal.set_seq(seq);
+ proposal.set_proposer(id_);
+ Broadcast(MessageType::Prepare, proposal);
+ return true;
+}
+
+bool PoE::ReceivePrepare(std::unique_ptr<Proposal> proposal) {
+ std::unique_ptr<Transaction> txn = nullptr;
+ {
+ std::unique_lock<std::mutex> lk(mutex_);
+ received_[proposal->hash()].insert(proposal->proposer());
+ auto it = data_.find(proposal->hash());
+ if(it != data_.end()){
+ if(received_[proposal->hash()].size()>=2*f_+1){
+ txn = std::move(it->second);
+ data_.erase(it);
+ }
+ }
+ }
+ if(txn != nullptr){
+ commit_(*txn);
+ }
+ return true;
+}
+
+} // namespace poe
+} // namespace resdb
diff --git a/platform/consensus/ordering/poe/algorithm/poe.h
b/platform/consensus/ordering/poe/algorithm/poe.h
new file mode 100644
index 00000000..20cc71a9
--- /dev/null
+++ b/platform/consensus/ordering/poe/algorithm/poe.h
@@ -0,0 +1,40 @@
+#pragma once
+
+#include <deque>
+#include <map>
+#include <queue>
+#include <thread>
+
+#include "platform/common/queue/lock_free_queue.h"
+#include "platform/consensus/ordering/common/algorithm/protocol_base.h"
+#include "platform/consensus/ordering/poe/proto/proposal.pb.h"
+#include "platform/statistic/stats.h"
+
+namespace resdb {
+namespace poe {
+
+class PoE: public common::ProtocolBase {
+ public:
+ PoE(int id, int f, int total_num, SignatureVerifier* verifier);
+ ~PoE();
+
+ bool ReceiveTransaction(std::unique_ptr<Transaction> txn);
+ bool ReceivePropose(std::unique_ptr<Transaction> txn);
+ bool ReceivePrepare(std::unique_ptr<Proposal> proposal);
+
+ private:
+ bool IsStop();
+
+ private:
+ std::mutex mutex_;
+ std::map<std::string, std::set<int32_t> > received_;
+ std::map<std::string, std::unique_ptr<Transaction> > data_;
+
+ int64_t seq_;
+ bool is_stop_;
+ SignatureVerifier* verifier_;
+ Stats* global_stats_;
+};
+
+} // namespace cassandra
+} // namespace resdb
diff --git a/platform/consensus/ordering/poe/framework/BUILD
b/platform/consensus/ordering/poe/framework/BUILD
new file mode 100644
index 00000000..7030d2a0
--- /dev/null
+++ b/platform/consensus/ordering/poe/framework/BUILD
@@ -0,0 +1,16 @@
+package(default_visibility = ["//visibility:private"])
+
+cc_library(
+ name = "consensus",
+ srcs = ["consensus.cpp"],
+ hdrs = ["consensus.h"],
+ visibility = [
+ "//visibility:public",
+ ],
+ deps = [
+ "//common/utils",
+ "//platform/consensus/ordering/common/framework:consensus",
+ "//platform/consensus/ordering/poe/algorithm:poe",
+ ],
+)
+
diff --git a/platform/consensus/ordering/poe/framework/consensus.cpp
b/platform/consensus/ordering/poe/framework/consensus.cpp
new file mode 100644
index 00000000..b401adaf
--- /dev/null
+++ b/platform/consensus/ordering/poe/framework/consensus.cpp
@@ -0,0 +1,105 @@
+/*
+ * Copyright (c) 2019-2022 ExpoLab, UC Davis
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ */
+
+#include "platform/consensus/ordering/poe/framework/consensus.h"
+
+#include <glog/logging.h>
+#include <unistd.h>
+
+#include "common/utils/utils.h"
+
+namespace resdb {
+namespace poe {
+
+Consensus::Consensus(const ResDBConfig& config,
+ std::unique_ptr<TransactionManager> executor)
+ : common::Consensus(config, std::move(executor)){
+ int total_replicas = config_.GetReplicaNum();
+ int f = (total_replicas - 1) / 3;
+
+ Init();
+
+ start_ = 0;
+
+ if (config_.GetPublicKeyCertificateInfo()
+ .public_key()
+ .public_key_info()
+ .type() != CertificateKeyInfo::CLIENT) {
+ poe_ = std::make_unique<PoE>(
+ config_.GetSelfInfo().id(), f,
+ total_replicas, GetSignatureVerifier());
+ InitProtocol(poe_.get());
+ }
+}
+
+int Consensus::ProcessCustomConsensus(std::unique_ptr<Request> request) {
+ if (request->user_type() == MessageType::Propose) {
+ std::unique_ptr<Transaction> txn = std::make_unique<Transaction>();
+ if (!txn->ParseFromString(request->data())) {
+ assert(1 == 0);
+ LOG(ERROR) << "parse proposal fail";
+ return -1;
+ }
+ poe_->ReceivePropose(std::move(txn));
+ return 0;
+ } else if (request->user_type() == MessageType::Prepare) {
+ std::unique_ptr<Proposal> proposal = std::make_unique<Proposal>();
+ if (!proposal->ParseFromString(request->data())) {
+ LOG(ERROR) << "parse proposal fail";
+ assert(1 == 0);
+ return -1;
+ }
+ poe_->ReceivePrepare(std::move(proposal));
+ return 0;
+ }
+ return 0;
+}
+
+int Consensus::ProcessNewTransaction(std::unique_ptr<Request> request) {
+ std::unique_ptr<Transaction> txn = std::make_unique<Transaction>();
+ txn->set_data(request->data());
+ txn->set_hash(request->hash());
+ txn->set_proxy_id(request->proxy_id());
+ txn->set_uid(request->uid());
+ return poe_->ReceiveTransaction(std::move(txn));
+}
+
+int Consensus::CommitMsg(const google::protobuf::Message& msg) {
+ return CommitMsgInternal(dynamic_cast<const Transaction&>(msg));
+}
+
+int Consensus::CommitMsgInternal(const Transaction& txn) {
+ std::unique_ptr<Request> request = std::make_unique<Request>();
+ request->set_data(txn.data());
+ request->set_seq(txn.seq());
+ request->set_uid(txn.uid());
+ request->set_proxy_id(txn.proxy_id());
+
+ transaction_executor_->Commit(std::move(request));
+ return 0;
+}
+
+} // namespace poe
+} // namespace resdb
diff --git a/platform/consensus/ordering/poe/framework/consensus.h
b/platform/consensus/ordering/poe/framework/consensus.h
new file mode 100644
index 00000000..72e56e18
--- /dev/null
+++ b/platform/consensus/ordering/poe/framework/consensus.h
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2019-2022 ExpoLab, UC Davis
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ */
+
+#pragma once
+
+#include "executor/common/transaction_manager.h"
+#include "platform/consensus/ordering/common/framework/consensus.h"
+#include "platform/consensus/ordering/poe/algorithm/poe.h"
+#include "platform/networkstrate/consensus_manager.h"
+
+namespace resdb {
+namespace poe {
+
+class Consensus : public common::Consensus {
+ public:
+ Consensus(const ResDBConfig& config,
+ std::unique_ptr<TransactionManager> transaction_manager);
+ virtual ~Consensus() = default;
+
+ private:
+ int ProcessCustomConsensus(std::unique_ptr<Request> request) override;
+ int ProcessNewTransaction(std::unique_ptr<Request> request) override;
+ int CommitMsg(const google::protobuf::Message& msg) override;
+ int CommitMsgInternal(const Transaction& txn);
+
+ int Prepare(const Transaction& txn);
+
+ protected:
+ std::unique_ptr<PoE> poe_;
+ Stats* global_stats_;
+ int64_t start_;
+ std::mutex mutex_;
+ int send_num_[200];
+};
+
+} // namespace cassandra
+} // namespace resdb
diff --git a/platform/consensus/ordering/poe/framework/consensus_test.cpp
b/platform/consensus/ordering/poe/framework/consensus_test.cpp
new file mode 100644
index 00000000..2c8834a8
--- /dev/null
+++ b/platform/consensus/ordering/poe/framework/consensus_test.cpp
@@ -0,0 +1,179 @@
+/*
+ * Copyright (c) 2019-2022 ExpoLab, UC Davis
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ */
+
+#include "platform/consensus/ordering/cassandra/framework/consensus.h"
+
+#include <glog/logging.h>
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include <future>
+
+#include "common/test/test_macros.h"
+#include "executor/common/mock_transaction_manager.h"
+#include "platform/config/resdb_config_utils.h"
+#include "platform/networkstrate/mock_replica_communicator.h"
+
+namespace resdb {
+namespace cassandra {
+namespace {
+
+using ::resdb::testing::EqualsProto;
+using ::testing::_;
+using ::testing::Invoke;
+using ::testing::Test;
+
+ResDBConfig GetConfig() {
+ ResDBConfig config({GenerateReplicaInfo(1, "127.0.0.1", 1234),
+ GenerateReplicaInfo(2, "127.0.0.1", 1235),
+ GenerateReplicaInfo(3, "127.0.0.1", 1236),
+ GenerateReplicaInfo(4, "127.0.0.1", 1237)},
+ GenerateReplicaInfo(1, "127.0.0.1", 1234));
+ return config;
+}
+
+class ConsensusTest : public Test {
+ public:
+ ConsensusTest() : config_(GetConfig()) {
+ auto transaction_manager =
+ std::make_unique<MockTransactionExecutorDataImpl>();
+ mock_transaction_manager_ = transaction_manager.get();
+ consensus_ =
+ std::make_unique<Consensus>(config_, std::move(transaction_manager));
+ consensus_->SetCommunicator(&replica_communicator_);
+ }
+
+ void AddTransaction(const std::string& data) {
+ auto request = std::make_unique<Request>();
+ request->set_type(Request::TYPE_NEW_TXNS);
+
+ Transaction txn;
+
+ BatchUserRequest batch_request;
+ auto req = batch_request.add_user_requests();
+ req->mutable_request()->set_data(data);
+
+ batch_request.set_local_id(1);
+ batch_request.SerializeToString(txn.mutable_data());
+
+ txn.SerializeToString(request->mutable_data());
+
+ EXPECT_EQ(consensus_->ConsensusCommit(nullptr, std::move(request)), 0);
+ }
+
+ protected:
+ ResDBConfig config_;
+ MockTransactionExecutorDataImpl* mock_transaction_manager_;
+ MockReplicaCommunicator replica_communicator_;
+ std::unique_ptr<TransactionManager> transaction_manager_;
+ std::unique_ptr<Consensus> consensus_;
+};
+
+TEST_F(ConsensusTest, NormalCase) {
+ std::promise<bool> commit_done;
+ std::future<bool> commit_done_future = commit_done.get_future();
+
+ EXPECT_CALL(replica_communicator_, BroadCast)
+ .WillRepeatedly(Invoke([&](const google::protobuf::Message& msg) {
+ Request request = *dynamic_cast<const Request*>(&msg);
+
+ if (request.user_type() == MessageType::NewProposal) {
+ LOG(ERROR) << "bc new proposal";
+ consensus_->ConsensusCommit(nullptr,
+ std::make_unique<Request>(request));
+ LOG(ERROR) << "recv proposal done";
+ }
+ if (request.user_type() == MessageType::Vote) {
+ LOG(ERROR) << "bc vote";
+
+ VoteMessage ack_msg;
+ assert(ack_msg.ParseFromString(request.data()));
+ for (int i = 1; i <= 3; ++i) {
+ ack_msg.set_proposer_id(i);
+ auto new_req = std::make_unique<Request>(request);
+ ack_msg.SerializeToString(new_req->mutable_data());
+
+ consensus_->ConsensusCommit(nullptr, std::move(new_req));
+ }
+ }
+ // LOG(ERROR)<<"bc type:"<<request->type()<<" user
+ // type:"<<request->user_type();
+ if (request.user_type() == MessageType::Prepare) {
+ LOG(ERROR) << "bc prepare";
+
+ VoteMessage ack_msg;
+ assert(ack_msg.ParseFromString(request.data()));
+ for (int i = 1; i <= 3; ++i) {
+ ack_msg.set_proposer_id(i);
+ auto new_req = std::make_unique<Request>(request);
+ ack_msg.SerializeToString(new_req->mutable_data());
+
+ consensus_->ConsensusCommit(nullptr, std::move(new_req));
+ }
+ }
+ if (request.user_type() == MessageType::Voteprep) {
+ LOG(ERROR) << "bc voterep:";
+
+ VoteMessage ack_msg;
+ assert(ack_msg.ParseFromString(request.data()));
+ for (int i = 1; i <= 3; ++i) {
+ ack_msg.set_proposer_id(i);
+ auto new_req = std::make_unique<Request>(request);
+ ack_msg.SerializeToString(new_req->mutable_data());
+ LOG(ERROR) << "new request type:" << new_req->user_type();
+
+ consensus_->ConsensusCommit(nullptr, std::move(new_req));
+ }
+ }
+ LOG(ERROR) << "done";
+ return 0;
+ }));
+
+ EXPECT_CALL(*mock_transaction_manager_, ExecuteData)
+ .WillOnce(Invoke([&](const std::string& msg) {
+ LOG(ERROR) << "execute txn:" << msg;
+ EXPECT_EQ(msg, "transaction1");
+ return nullptr;
+ }));
+
+ EXPECT_CALL(replica_communicator_, SendMessage(_, 0))
+ .WillRepeatedly(
+ Invoke([&](const google::protobuf::Message& msg, int64_t) {
+ Request request = *dynamic_cast<const Request*>(&msg);
+ if (request.type() == Request::TYPE_RESPONSE) {
+ LOG(ERROR) << "get response";
+ commit_done.set_value(true);
+ }
+ return;
+ }));
+
+ AddTransaction("transaction1");
+
+ commit_done_future.get();
+}
+
+} // namespace
+} // namespace cassandra
+} // namespace resdb
diff --git a/platform/consensus/ordering/poe/proto/BUILD
b/platform/consensus/ordering/poe/proto/BUILD
new file mode 100644
index 00000000..8088db09
--- /dev/null
+++ b/platform/consensus/ordering/poe/proto/BUILD
@@ -0,0 +1,16 @@
+package(default_visibility =
["//platform/consensus/ordering/poe:__subpackages__"])
+
+load("@rules_cc//cc:defs.bzl", "cc_proto_library")
+load("@rules_proto//proto:defs.bzl", "proto_library")
+load("@rules_proto_grpc//python:defs.bzl", "python_proto_library")
+
+proto_library(
+ name = "proposal_proto",
+ srcs = ["proposal.proto"],
+ #visibility = ["//visibility:public"],
+)
+
+cc_proto_library(
+ name = "proposal_cc_proto",
+ deps = [":proposal_proto"],
+)
diff --git a/platform/consensus/ordering/poe/proto/proposal.proto
b/platform/consensus/ordering/poe/proto/proposal.proto
new file mode 100644
index 00000000..8302752a
--- /dev/null
+++ b/platform/consensus/ordering/poe/proto/proposal.proto
@@ -0,0 +1,28 @@
+
+syntax = "proto3";
+
+package resdb.poe;
+
+message Transaction{
+ int32 id = 1;
+ bytes data = 2;
+ bytes hash = 3;
+ int32 proxy_id = 4;
+ int32 proposer = 5;
+ int64 uid = 6;
+ int64 create_time = 7;
+ int64 seq = 9;
+}
+
+message Proposal {
+ bytes hash = 1;
+ int32 proposer = 2;
+ int64 seq =3 ;
+}
+
+enum MessageType {
+ None = 0;
+ Propose = 1;
+ Prepare = 2;
+}
+
diff --git a/platform/networkstrate/replica_communicator.cpp
b/platform/networkstrate/replica_communicator.cpp
index 4057e27d..7083fe6e 100644
--- a/platform/networkstrate/replica_communicator.cpp
+++ b/platform/networkstrate/replica_communicator.cpp
@@ -257,7 +257,7 @@ void ReplicaCommunicator::SendMessage(const
google::protobuf::Message& message,
}
if (target_replica.ip().empty()) {
- LOG(ERROR) << "no replica info";
+ LOG(ERROR) << "no replica info node:"<<node_id;
return;
}
diff --git a/platform/proto/resdb.proto b/platform/proto/resdb.proto
index 14ad976a..71ed4591 100644
--- a/platform/proto/resdb.proto
+++ b/platform/proto/resdb.proto
@@ -37,8 +37,9 @@ message Request {
TYPE_VIEWCHANGE = 16;
TYPE_NEWVIEW= 17;
TYPE_CUSTOM_QUERY = 18;
+ TYPE_CUSTOM_CONSENSUS = 19;
- NUM_OF_TYPE = 19; // the total number of types.
+ NUM_OF_TYPE = 20; // the total number of types.
// Used to create the collector.
};
int32 type = 1;
@@ -62,6 +63,12 @@ message Request {
int32 primary_id = 17;
repeated bytes hashs = 18;
repeated uint64 seqs = 19;
+ int32 user_type = 20;
+ int64 user_seq = 21;
+ int64 queuing_time = 22;
+ int64 uid = 23;
+ int64 create_time = 24;
+ int64 commit_time = 25;
}
// The response message containing response
diff --git a/scripts/deploy/config/kv_performance_server.conf
b/scripts/deploy/config/kv_performance_server.conf
index 498f5a98..af36aef8 100644
--- a/scripts/deploy/config/kv_performance_server.conf
+++ b/scripts/deploy/config/kv_performance_server.conf
@@ -1,8 +1,8 @@
iplist=(
-172.31.23.110
-172.31.31.183
-172.31.22.246
-172.31.26.117
-172.31.21.196
+172.31.25.224
+172.31.20.228
+172.31.18.224
+172.31.16.230
+172.31.31.229
)
diff --git a/scripts/deploy/config/poe.config b/scripts/deploy/config/poe.config
new file mode 100644
index 00000000..c5092a94
--- /dev/null
+++ b/scripts/deploy/config/poe.config
@@ -0,0 +1,10 @@
+{
+ "clientBatchNum": 100,
+ "enable_viewchange": false,
+ "recovery_enabled": false,
+ "max_client_complaint_num":10,
+ "max_process_txn": 32,
+ "worker_num": 2,
+ "input_worker_num": 1,
+ "output_worker_num": 10
+}
diff --git a/scripts/deploy/performance/pbft_performance.sh
b/scripts/deploy/performance/pbft_performance.sh
new file mode 100755
index 00000000..68a890a5
--- /dev/null
+++ b/scripts/deploy/performance/pbft_performance.sh
@@ -0,0 +1,2 @@
+export server=//benchmark/protocols/pbft:kv_server_performance
+./performance/run_performance.sh $*
diff --git a/scripts/deploy/performance/poe_performance.sh
b/scripts/deploy/performance/poe_performance.sh
new file mode 100755
index 00000000..02f76238
--- /dev/null
+++ b/scripts/deploy/performance/poe_performance.sh
@@ -0,0 +1,4 @@
+export server=//benchmark/protocols/poe:kv_server_performance
+export TEMPLATE_PATH=$PWD/config/poe.config
+
+./performance/run_performance.sh $*
diff --git a/scripts/deploy/performance/run_performance.sh
b/scripts/deploy/performance/run_performance.sh
index 68bc9c26..49afa373 100755
--- a/scripts/deploy/performance/run_performance.sh
+++ b/scripts/deploy/performance/run_performance.sh
@@ -1,5 +1,3 @@
-export server=//benchmark/protocols/pbft:kv_server_performance
-
./script/deploy.sh $1
. ./script/load_config.sh $1