This is an automated email from the ASF dual-hosted git repository.
junchao pushed a commit to branch new_deploy
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git
The following commit(s) were added to refs/heads/new_deploy by this push:
new 320593b7 format code
320593b7 is described below
commit 320593b73ebb6bdf8796347b52ed1f3cd3ad60ee
Author: cjcchen <[email protected]>
AuthorDate: Fri May 3 03:10:45 2024 +0000
format code
---
WORKSPACE | 4 +-
api/BUILD | 7 +-
api/pybind_kv_service.cpp | 47 ++--
benchmark/protocols/poe/BUILD | 1 -
benchmark/protocols/poe/kv_server_performance.cpp | 2 +-
common/BUILD | 6 +-
interface/common/resdb_txn_accessor.cpp | 7 +-
interface/common/resdb_txn_accessor_test.cpp | 6 +-
platform/consensus/ordering/common/algorithm/BUILD | 1 -
.../ordering/common/algorithm/protocol_base.cpp | 45 ++--
.../ordering/common/algorithm/protocol_base.h | 64 ++---
platform/consensus/ordering/common/framework/BUILD | 1 -
.../ordering/common/framework/consensus.cpp | 53 ++---
.../ordering/common/framework/consensus.h | 13 +-
.../common/framework/performance_manager.cpp | 51 ++--
.../common/framework/performance_manager.h | 2 +-
.../ordering/common/framework/response_manager.cpp | 2 +-
.../ordering/common/framework/response_manager.h | 4 +-
.../consensus/ordering/pbft/checkpoint_manager.cpp | 5 +-
platform/consensus/ordering/pbft/commitment.cpp | 29 ++-
.../ordering/pbft/consensus_manager_pbft.cpp | 2 +-
.../ordering/pbft/performance_manager.cpp | 7 +-
platform/consensus/ordering/pbft/query.cpp | 32 ++-
.../consensus/ordering/pbft/response_manager.cpp | 28 +--
.../consensus/ordering/pbft/viewchange_manager.cpp | 8 +-
platform/consensus/ordering/poe/algorithm/BUILD | 6 +-
platform/consensus/ordering/poe/algorithm/poe.cpp | 13 +-
platform/consensus/ordering/poe/algorithm/poe.h | 6 +-
platform/consensus/ordering/poe/framework/BUILD | 1 -
.../consensus/ordering/poe/framework/consensus.cpp | 9 +-
.../consensus/ordering/poe/framework/consensus.h | 2 +-
platform/consensus/recovery/recovery.cpp | 3 +-
platform/networkstrate/consensus_manager.cpp | 120 +++++-----
platform/networkstrate/consensus_manager.h | 2 +-
platform/networkstrate/replica_communicator.cpp | 2 +-
platform/statistic/BUILD | 12 +-
platform/statistic/stats.cpp | 261 ++++++++++++---------
platform/statistic/stats.h | 77 +++---
third_party/BUILD | 2 +-
39 files changed, 493 insertions(+), 450 deletions(-)
diff --git a/WORKSPACE b/WORKSPACE
index 6a7883f7..ea67f3e5 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -199,7 +199,7 @@ http_archive(
http_archive(
name = "pybind11_bazel",
strip_prefix = "pybind11_bazel-2.11.1.bzl.1",
- urls =
["https://github.com/pybind/pybind11_bazel/archive/refs/tags/v2.11.1.bzl.1.zip"]
+ urls =
["https://github.com/pybind/pybind11_bazel/archive/refs/tags/v2.11.1.bzl.1.zip"],
)
http_archive(
@@ -244,4 +244,4 @@ http_archive(
sha256 =
"babcdfd2c744905a73d20de211b51367bda0d5200f11d654c4314b909d8c963c",
strip_prefix = "asio-asio-1-26-0",
url =
"https://github.com/chriskohlhoff/asio/archive/refs/tags/asio-1-26-0.zip",
-)
\ No newline at end of file
+)
diff --git a/api/BUILD b/api/BUILD
index d69120c1..93b83cdf 100644
--- a/api/BUILD
+++ b/api/BUILD
@@ -1,19 +1,20 @@
package(default_visibility = ["//visibility:public"])
+
cc_binary(
name = "pybind_kv.so",
srcs = ["pybind_kv_service.cpp"],
- linkshared =1,
+ linkshared = 1,
linkstatic = 1,
deps = [
"@//common/proto:signature_info_cc_proto",
"@//interface/kv:kv_client",
"@//platform/config:resdb_config_utils",
- "@pybind11//:pybind11",
+ "@pybind11",
],
)
+
py_library(
name = "pybind_kv_so",
data = [":pybind_kv.so"],
imports = ["."],
)
-
diff --git a/api/pybind_kv_service.cpp b/api/pybind_kv_service.cpp
index bad50dc7..40073d66 100644
--- a/api/pybind_kv_service.cpp
+++ b/api/pybind_kv_service.cpp
@@ -1,11 +1,12 @@
#include <fcntl.h>
#include <getopt.h>
+#include <pybind11/pybind11.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
-#include <pybind11/pybind11.h>
#include <fstream>
+
#include "common/proto/signature_info.pb.h"
#include "interface/kv/kv_client.h"
#include "platform/config/resdb_config_utils.h"
@@ -16,35 +17,31 @@ using resdb::KVClient;
using resdb::ReplicaInfo;
using resdb::ResDBConfig;
-
-
-
std::string get(std::string key, std::string config_path) {
- ResDBConfig config = GenerateResDBConfig(config_path);
- config.SetClientTimeoutMs(100000);
- KVClient client(config);
- auto result_ptr = client.Get(key);
- if (result_ptr) {
- return *result_ptr;
- } else {
- return "";
- }
+ ResDBConfig config = GenerateResDBConfig(config_path);
+ config.SetClientTimeoutMs(100000);
+ KVClient client(config);
+ auto result_ptr = client.Get(key);
+ if (result_ptr) {
+ return *result_ptr;
+ } else {
+ return "";
+ }
}
bool set(std::string key, std::string value, std::string config_path) {
- ResDBConfig config = GenerateResDBConfig(config_path);
- config.SetClientTimeoutMs(100000);
- KVClient client(config);
- int result = client.Set(key, value);
- if (result == 0) {
- return true;
- } else {
- return false;
- }
+ ResDBConfig config = GenerateResDBConfig(config_path);
+ config.SetClientTimeoutMs(100000);
+ KVClient client(config);
+ int result = client.Set(key, value);
+ if (result == 0) {
+ return true;
+ } else {
+ return false;
+ }
}
PYBIND11_MODULE(pybind_kv, m) {
- m.def("get", &get, "A function that gets a value from the key-value
store");
- m.def("set", &set, "A function that sets a value in the key-value store");
+ m.def("get", &get, "A function that gets a value from the key-value store");
+ m.def("set", &set, "A function that sets a value in the key-value store");
}
-
diff --git a/benchmark/protocols/poe/BUILD b/benchmark/protocols/poe/BUILD
index e7bbde3b..f63f6fb8 100644
--- a/benchmark/protocols/poe/BUILD
+++ b/benchmark/protocols/poe/BUILD
@@ -13,4 +13,3 @@ cc_binary(
"//service/utils:server_factory",
],
)
-
diff --git a/benchmark/protocols/poe/kv_server_performance.cpp
b/benchmark/protocols/poe/kv_server_performance.cpp
index fa13e9a9..b2eb62dd 100644
--- a/benchmark/protocols/poe/kv_server_performance.cpp
+++ b/benchmark/protocols/poe/kv_server_performance.cpp
@@ -82,7 +82,7 @@ int main(int argc, char** argv) {
request.SerializeToString(&request_data);
return request_data;
});
-
+
auto server =
std::make_unique<ServiceNetwork>(*config,
std::move(performance_consens));
server->Run();
diff --git a/common/BUILD b/common/BUILD
index 84acee11..4e6a18aa 100644
--- a/common/BUILD
+++ b/common/BUILD
@@ -16,10 +16,10 @@ cc_library(
)
cc_library(
- name= "beast",
+ name = "beast",
deps = [
- "@boost//:beast"
- ]
+ "@boost//:beast",
+ ],
)
cc_library(
diff --git a/interface/common/resdb_txn_accessor.cpp
b/interface/common/resdb_txn_accessor.cpp
index ca19753d..3b775191 100644
--- a/interface/common/resdb_txn_accessor.cpp
+++ b/interface/common/resdb_txn_accessor.cpp
@@ -159,9 +159,10 @@ absl::StatusOr<uint64_t>
ResDBTxnAccessor::GetBlockNumbers() {
std::condition_variable resp_cv;
bool success = false;
- std::unique_ptr<NetChannel> client = GetNetChannel(replicas_[0].ip(),
replicas_[0].port());
+ std::unique_ptr<NetChannel> client =
+ GetNetChannel(replicas_[0].ip(), replicas_[0].port());
- LOG(INFO)<<"ip:"<<replicas_[0].ip()<<" port:"<<replicas_[0].port();
+ LOG(INFO) << "ip:" << replicas_[0].ip() << " port:" << replicas_[0].port();
std::string response_str;
int ret = 0;
@@ -172,7 +173,7 @@ absl::StatusOr<uint64_t>
ResDBTxnAccessor::GetBlockNumbers() {
}
client->SetRecvTimeout(100000);
ret = client->RecvRawMessageStr(&response_str);
- LOG(INFO)<<"receive str:"<<ret<<" len:"<<response_str.size();
+ LOG(INFO) << "receive str:" << ret << " len:" << response_str.size();
if (ret != 0) {
continue;
}
diff --git a/interface/common/resdb_txn_accessor_test.cpp
b/interface/common/resdb_txn_accessor_test.cpp
index e0b4925e..39c407bc 100644
--- a/interface/common/resdb_txn_accessor_test.cpp
+++ b/interface/common/resdb_txn_accessor_test.cpp
@@ -62,9 +62,11 @@ TEST(ResDBTxnAccessorTest, GetTransactionsFail) {
auto client = std::make_unique<MockNetChannel>(ip, port);
EXPECT_CALL(*client,
SendRequest(EqualsProto(request), Request::TYPE_QUERY, _))
- .Times(AtLeast(1)).WillRepeatedly(Return(0));
+ .Times(AtLeast(1))
+ .WillRepeatedly(Return(0));
EXPECT_CALL(*client, RecvRawMessageStr)
- .Times(AtLeast(1)).WillRepeatedly(Invoke([&](std::string* resp) {
return -1; }));
+ .Times(AtLeast(1))
+ .WillRepeatedly(Invoke([&](std::string* resp) { return -1; }));
return client;
}));
absl::StatusOr<std::vector<std::pair<uint64_t, std::string>>> resp =
diff --git a/platform/consensus/ordering/common/algorithm/BUILD
b/platform/consensus/ordering/common/algorithm/BUILD
index 9abd8715..1ba4e6fa 100644
--- a/platform/consensus/ordering/common/algorithm/BUILD
+++ b/platform/consensus/ordering/common/algorithm/BUILD
@@ -9,4 +9,3 @@ cc_library(
"//common/crypto:signature_verifier",
],
)
-
diff --git a/platform/consensus/ordering/common/algorithm/protocol_base.cpp
b/platform/consensus/ordering/common/algorithm/protocol_base.cpp
index 3c6c2fc3..54194136 100644
--- a/platform/consensus/ordering/common/algorithm/protocol_base.cpp
+++ b/platform/consensus/ordering/common/algorithm/protocol_base.cpp
@@ -5,43 +5,36 @@
namespace resdb {
namespace common {
-ProtocolBase::ProtocolBase(
- int id,
- int f,
- int total_num,
- SingleCallFuncType single_call,
- BroadcastCallFuncType broadcast_call,
- CommitFuncType commit) :
- id_(id),
+ProtocolBase::ProtocolBase(int id, int f, int total_num,
+ SingleCallFuncType single_call,
+ BroadcastCallFuncType broadcast_call,
+ CommitFuncType commit)
+ : id_(id),
f_(f),
total_num_(total_num),
- single_call_(single_call),
- broadcast_call_(broadcast_call),
+ single_call_(single_call),
+ broadcast_call_(broadcast_call),
commit_(commit) {
- stop_ = false;
+ stop_ = false;
}
-ProtocolBase::ProtocolBase( int id, int f, int total_num) : ProtocolBase(id,
f, total_num, nullptr, nullptr, nullptr){
+ProtocolBase::ProtocolBase(int id, int f, int total_num)
+ : ProtocolBase(id, f, total_num, nullptr, nullptr, nullptr) {}
-}
+ProtocolBase::~ProtocolBase() { Stop(); }
-ProtocolBase::~ProtocolBase() {
- Stop();
-}
+void ProtocolBase::Stop() { stop_ = true; }
-void ProtocolBase::Stop(){
- stop_ = true;
-}
+bool ProtocolBase::IsStop() { return stop_; }
-bool ProtocolBase::IsStop(){
- return stop_;
-}
-
-int ProtocolBase::SendMessage(int msg_type, const google::protobuf::Message&
msg, int node_id) {
+int ProtocolBase::SendMessage(int msg_type,
+ const google::protobuf::Message& msg,
+ int node_id) {
return single_call_(msg_type, msg, node_id);
}
-int ProtocolBase::Broadcast(int msg_type, const google::protobuf::Message&
msg) {
+int ProtocolBase::Broadcast(int msg_type,
+ const google::protobuf::Message& msg) {
return broadcast_call_(msg_type, msg);
}
@@ -49,5 +42,5 @@ int ProtocolBase::Commit(const google::protobuf::Message&
msg) {
return commit_(msg);
}
-} // namespace protocol
+} // namespace common
} // namespace resdb
diff --git a/platform/consensus/ordering/common/algorithm/protocol_base.h
b/platform/consensus/ordering/common/algorithm/protocol_base.h
index a93be822..621ecd1c 100644
--- a/platform/consensus/ordering/common/algorithm/protocol_base.h
+++ b/platform/consensus/ordering/common/algorithm/protocol_base.h
@@ -1,7 +1,9 @@
#pragma once
-#include <functional>
#include <google/protobuf/message.h>
+
+#include <functional>
+
#include "common/crypto/signature_verifier.h"
namespace resdb {
@@ -9,50 +11,52 @@ namespace common {
class ProtocolBase {
public:
- typedef std::function<int(int, const google::protobuf::Message& msg, int)>
SingleCallFuncType;
- typedef std::function<int(int, const google::protobuf::Message& msg)>
BroadcastCallFuncType;
- typedef std::function<int(const google::protobuf::Message& msg)>
CommitFuncType;
+ typedef std::function<int(int, const google::protobuf::Message& msg, int)>
+ SingleCallFuncType;
+ typedef std::function<int(int, const google::protobuf::Message& msg)>
+ BroadcastCallFuncType;
+ typedef std::function<int(const google::protobuf::Message& msg)>
+ CommitFuncType;
- ProtocolBase(
- int id,
- int f,
- int total_num,
- SingleCallFuncType single_call,
- BroadcastCallFuncType broadcast_call,
- CommitFuncType commit
- );
-
- ProtocolBase( int id, int f, int total_num);
+ ProtocolBase(int id, int f, int total_num, SingleCallFuncType single_call,
+ BroadcastCallFuncType broadcast_call, CommitFuncType commit);
+ ProtocolBase(int id, int f, int total_num);
virtual ~ProtocolBase();
void Stop();
- inline
- void SetSingleCallFunc(SingleCallFuncType single_call) { single_call_ =
single_call; }
-
- inline
- void SetBroadcastCallFunc(BroadcastCallFuncType broadcast_call) {
broadcast_call_ = broadcast_call; }
+ inline void SetSingleCallFunc(SingleCallFuncType single_call) {
+ single_call_ = single_call;
+ }
+
+ inline void SetBroadcastCallFunc(BroadcastCallFuncType broadcast_call) {
+ broadcast_call_ = broadcast_call;
+ }
- inline
- void SetCommitFunc(CommitFuncType commit_func) { commit_ = commit_func; }
+ inline void SetCommitFunc(CommitFuncType commit_func) {
+ commit_ = commit_func;
+ }
- inline
- void SetSignatureVerifier(SignatureVerifier* verifier) { verifier_ =
verifier;}
+ inline void SetSignatureVerifier(SignatureVerifier* verifier) {
+ verifier_ = verifier;
+ }
- protected:
- int SendMessage(int msg_type, const google::protobuf::Message& msg, int
node_id);
- int Broadcast(int msg_type, const google::protobuf::Message& msg);
- int Commit(const google::protobuf::Message& msg);
+ protected:
+ int SendMessage(int msg_type, const google::protobuf::Message& msg,
+ int node_id);
+ int Broadcast(int msg_type, const google::protobuf::Message& msg);
+ int Commit(const google::protobuf::Message& msg);
- bool IsStop();
+ bool IsStop();
protected:
int id_;
int f_;
int total_num_;
- std::function<int(int, const google::protobuf::Message& msg, int)>
single_call_;
+ std::function<int(int, const google::protobuf::Message& msg, int)>
+ single_call_;
std::function<int(int, const google::protobuf::Message& msg)>
broadcast_call_;
std::function<int(const google::protobuf::Message& msg)> commit_;
std::atomic<bool> stop_;
@@ -60,5 +64,5 @@ class ProtocolBase {
SignatureVerifier* verifier_;
};
-} // namespace protocol
+} // namespace common
} // namespace resdb
diff --git a/platform/consensus/ordering/common/framework/BUILD
b/platform/consensus/ordering/common/framework/BUILD
index 82e03a0f..e4a5382d 100644
--- a/platform/consensus/ordering/common/framework/BUILD
+++ b/platform/consensus/ordering/common/framework/BUILD
@@ -26,7 +26,6 @@ cc_library(
],
)
-
cc_library(
name = "response_manager",
srcs = ["response_manager.cpp"],
diff --git a/platform/consensus/ordering/common/framework/consensus.cpp
b/platform/consensus/ordering/common/framework/consensus.cpp
index 683bce29..caea1a46 100644
--- a/platform/consensus/ordering/common/framework/consensus.cpp
+++ b/platform/consensus/ordering/common/framework/consensus.cpp
@@ -46,56 +46,51 @@ Consensus::Consensus(const ResDBConfig& config,
nullptr, std::move(executor))) {
LOG(INFO) << "is running is performance mode:"
<< config_.IsPerformanceRunning();
- is_stop_ = false;
+ is_stop_ = false;
global_stats_ = Stats::GetGlobalStats();
}
-void Consensus::Init(){
- if(performance_manager_ == nullptr){
- performance_manager_ =
- config_.IsPerformanceRunning()
- ? std::make_unique<PerformanceManager>(
- config_, GetBroadCastClient(), GetSignatureVerifier())
- : nullptr;
+void Consensus::Init() {
+ if (performance_manager_ == nullptr) {
+ performance_manager_ =
+ config_.IsPerformanceRunning()
+ ? std::make_unique<PerformanceManager>(
+ config_, GetBroadCastClient(), GetSignatureVerifier())
+ : nullptr;
}
- if(response_manager_ == nullptr){
- response_manager_ =
- !config_.IsPerformanceRunning()
- ? std::make_unique<ResponseManager>(config_, GetBroadCastClient(),
- GetSignatureVerifier())
- : nullptr;
+ if (response_manager_ == nullptr) {
+ response_manager_ =
+ !config_.IsPerformanceRunning()
+ ? std::make_unique<ResponseManager>(config_, GetBroadCastClient(),
+ GetSignatureVerifier())
+ : nullptr;
}
}
-void Consensus::InitProtocol(ProtocolBase * protocol){
+void Consensus::InitProtocol(ProtocolBase* protocol) {
protocol->SetSingleCallFunc(
[&](int type, const google::protobuf::Message& msg, int node_id) {
- return SendMsg(type, msg, node_id);
+ return SendMsg(type, msg, node_id);
});
protocol->SetBroadcastCallFunc(
[&](int type, const google::protobuf::Message& msg) {
- return Broadcast(type, msg);
+ return Broadcast(type, msg);
});
protocol->SetCommitFunc(
- [&](const google::protobuf::Message& msg) {
- return CommitMsg(msg);
- });
+ [&](const google::protobuf::Message& msg) { return CommitMsg(msg); });
}
-Consensus::~Consensus(){
- is_stop_ = true;
-}
+Consensus::~Consensus() { is_stop_ = true; }
-void Consensus::SetPerformanceManager(std::unique_ptr<PerformanceManager>
performance_manager){
+void Consensus::SetPerformanceManager(
+ std::unique_ptr<PerformanceManager> performance_manager) {
performance_manager_ = std::move(performance_manager);
}
-bool Consensus::IsStop(){
- return is_stop_;
-}
+bool Consensus::IsStop() { return is_stop_; }
void Consensus::SetupPerformanceDataFunc(std::function<std::string()> func) {
performance_manager_->SetDataFunc(func);
@@ -131,9 +126,7 @@ std::vector<ReplicaInfo> Consensus::GetReplicas() {
return config_.GetReplicaInfos();
}
-int Consensus::CommitMsg(const google::protobuf::Message &txn) {
- return 0;
-}
+int Consensus::CommitMsg(const google::protobuf::Message& txn) { return 0; }
// The implementation of PBFT.
int Consensus::ConsensusCommit(std::unique_ptr<Context> context,
diff --git a/platform/consensus/ordering/common/framework/consensus.h
b/platform/consensus/ordering/common/framework/consensus.h
index 881dc72b..bb065923 100644
--- a/platform/consensus/ordering/common/framework/consensus.h
+++ b/platform/consensus/ordering/common/framework/consensus.h
@@ -49,12 +49,12 @@ class Consensus : public ConsensusManager {
void SetCommunicator(ReplicaCommunicator* replica_communicator);
- void InitProtocol(ProtocolBase * protocol);
+ void InitProtocol(ProtocolBase* protocol);
- protected:
- virtual int ProcessCustomConsensus(std::unique_ptr<Request> request);
- virtual int ProcessNewTransaction(std::unique_ptr<Request> request);
- virtual int CommitMsg(const google::protobuf::Message& msg);
+ protected:
+ virtual int ProcessCustomConsensus(std::unique_ptr<Request> request);
+ virtual int ProcessNewTransaction(std::unique_ptr<Request> request);
+ virtual int CommitMsg(const google::protobuf::Message& msg);
protected:
int SendMsg(int type, const google::protobuf::Message& msg, int node_id);
@@ -65,7 +65,8 @@ class Consensus : public ConsensusManager {
protected:
void Init();
- void SetPerformanceManager(std::unique_ptr<PerformanceManager>
performance_manger);
+ void SetPerformanceManager(
+ std::unique_ptr<PerformanceManager> performance_manger);
protected:
ReplicaCommunicator* replica_communicator_;
diff --git
a/platform/consensus/ordering/common/framework/performance_manager.cpp
b/platform/consensus/ordering/common/framework/performance_manager.cpp
index 089f3bd5..6d03820a 100644
--- a/platform/consensus/ordering/common/framework/performance_manager.cpp
+++ b/platform/consensus/ordering/common/framework/performance_manager.cpp
@@ -37,7 +37,7 @@ using comm::CollectorResultCode;
PerformanceManager::PerformanceManager(
const ResDBConfig& config, ReplicaCommunicator* replica_communicator,
SignatureVerifier* verifier)
- : config_(config),
+ : config_(config),
replica_communicator_(replica_communicator),
batch_queue_("user request"),
verifier_(verifier) {
@@ -119,51 +119,57 @@ int
PerformanceManager::ProcessResponseMsg(std::unique_ptr<Context> context,
return 0;
}
- //LOG(INFO) << "get response:" << request->seq() << "
sender:"<<request->sender_id();
+ // LOG(INFO) << "get response:" << request->seq() << "
+ // sender:"<<request->sender_id();
std::unique_ptr<BatchUserResponse> batch_response = nullptr;
- CollectorResultCode ret =
- AddResponseMsg(std::move(request),
[&](std::unique_ptr<BatchUserResponse> request) {
+ CollectorResultCode ret = AddResponseMsg(
+ std::move(request), [&](std::unique_ptr<BatchUserResponse> request) {
batch_response = std::move(request);
return;
});
if (ret == CollectorResultCode::STATE_CHANGED) {
assert(batch_response);
- SendResponseToClient(*batch_response);
+ SendResponseToClient(*batch_response);
}
return ret == CollectorResultCode::INVALID ? -2 : 0;
}
CollectorResultCode PerformanceManager::AddResponseMsg(
std::unique_ptr<Request> request,
- std::function<void(std::unique_ptr<BatchUserResponse>)>
response_call_back) {
+ std::function<void(std::unique_ptr<BatchUserResponse>)>
+ response_call_back) {
if (request == nullptr) {
return CollectorResultCode::INVALID;
}
- //uint64_t seq = request->seq();
+ // uint64_t seq = request->seq();
- std::unique_ptr<BatchUserResponse> batch_response =
std::make_unique<BatchUserResponse>();
+ std::unique_ptr<BatchUserResponse> batch_response =
+ std::make_unique<BatchUserResponse>();
if (!batch_response->ParseFromString(request->data())) {
- LOG(ERROR) << "parse response fail:"<<request->data().size()
- <<" seq:"<<request->seq(); return CollectorResultCode::INVALID;
+ LOG(ERROR) << "parse response fail:" << request->data().size()
+ << " seq:" << request->seq();
+ return CollectorResultCode::INVALID;
}
uint64_t seq = batch_response->local_id();
- //LOG(ERROR)<<"receive seq:"<<seq;
+ // LOG(ERROR)<<"receive seq:"<<seq;
bool done = false;
{
int idx = seq % response_set_size_;
std::unique_lock<std::mutex> lk(response_lock_[idx]);
if (response_[idx].find(seq) == response_[idx].end()) {
- //LOG(ERROR)<<"has done local seq:"<<seq<<" global seq:"<<request->seq();
+ // LOG(ERROR)<<"has done local seq:"<<seq<<" global
seq:"<<request->seq();
return CollectorResultCode::OK;
}
response_[idx][seq]++;
- //LOG(ERROR)<<"get seq :"<<request->seq()<<" local id:"<<seq<<"
num:"<<response_[idx][seq]<<" send:"<<send_num_;
+ // LOG(ERROR)<<"get seq :"<<request->seq()<<" local id:"<<seq<<"
+ // num:"<<response_[idx][seq]<<" send:"<<send_num_;
if (response_[idx][seq] >= config_.GetMinClientReceiveNum()) {
- //LOG(ERROR)<<"get seq :"<<request->seq()<<" local id:"<<seq<<"
num:"<<response_[idx][seq]<<" done:"<<send_num_;
+ // LOG(ERROR)<<"get seq :"<<request->seq()<<" local id:"<<seq<<"
+ // num:"<<response_[idx][seq]<<" done:"<<send_num_;
response_[idx].erase(response_[idx].find(seq));
done = true;
}
@@ -180,11 +186,13 @@ void PerformanceManager::SendResponseToClient(
uint64_t create_time = batch_response.createtime();
if (create_time > 0) {
uint64_t run_time = GetCurrentTime() - create_time;
- LOG(ERROR)<<"receive current:"<<GetCurrentTime()<<" create
time:"<<create_time<<" run time:"<<run_time<<" local
id:"<<batch_response.local_id();
+ LOG(ERROR) << "receive current:" << GetCurrentTime()
+ << " create time:" << create_time << " run time:" << run_time
+ << " local id:" << batch_response.local_id();
global_stats_->AddLatency(run_time);
} else {
}
- //send_num_-=10;
+ // send_num_-=10;
send_num_--;
}
@@ -206,8 +214,8 @@ int PerformanceManager::BatchProposeMsg() {
std::unique_ptr<QueueItem> item =
batch_queue_.Pop(config_.ClientBatchWaitTimeMS());
if (item == nullptr) {
- if(start){
- LOG(ERROR)<<"no data";
+ if (start) {
+ LOG(ERROR) << "no data";
}
continue;
}
@@ -217,7 +225,7 @@ int PerformanceManager::BatchProposeMsg() {
}
}
start = true;
- for(int i = 0; i < 1;++i){
+ for (int i = 0; i < 1; ++i) {
int ret = DoBatch(batch_req);
}
batch_req.clear();
@@ -269,7 +277,8 @@ int PerformanceManager::DoBatch(
global_stats_->BroadCastMsg();
send_num_++;
sum_ += batch_req.size();
- //LOG(ERROR)<<"send num:"<<send_num_<<" total num:"<<total_num_<<"
sum:"<<sum_<<" to:"<<GetPrimary();
+ // LOG(ERROR)<<"send num:"<<send_num_<<" total num:"<<total_num_<<"
+ // sum:"<<sum_<<" to:"<<GetPrimary();
if (total_num_++ == 1000000) {
stop_ = true;
LOG(WARNING) << "total num is done:" << total_num_;
@@ -281,7 +290,7 @@ int PerformanceManager::DoBatch(
return 0;
}
-void PerformanceManager::SendMessage(const Request& request){
+void PerformanceManager::SendMessage(const Request& request) {
replica_communicator_->SendMessage(request, GetPrimary());
}
diff --git a/platform/consensus/ordering/common/framework/performance_manager.h
b/platform/consensus/ordering/common/framework/performance_manager.h
index 5a874baa..7cacc847 100644
--- a/platform/consensus/ordering/common/framework/performance_manager.h
+++ b/platform/consensus/ordering/common/framework/performance_manager.h
@@ -50,7 +50,7 @@ class PerformanceManager {
std::unique_ptr<Request> request);
void SetDataFunc(std::function<std::string()> func);
- protected:
+ protected:
virtual void SendMessage(const Request& request);
private:
diff --git a/platform/consensus/ordering/common/framework/response_manager.cpp
b/platform/consensus/ordering/common/framework/response_manager.cpp
index ae6b55cc..98385141 100644
--- a/platform/consensus/ordering/common/framework/response_manager.cpp
+++ b/platform/consensus/ordering/common/framework/response_manager.cpp
@@ -238,5 +238,5 @@ int ResponseManager::DoBatch(
return 0;
}
-} // namespace common
+} // namespace common
} // namespace resdb
diff --git a/platform/consensus/ordering/common/framework/response_manager.h
b/platform/consensus/ordering/common/framework/response_manager.h
index 1c704396..93f7ed9c 100644
--- a/platform/consensus/ordering/common/framework/response_manager.h
+++ b/platform/consensus/ordering/common/framework/response_manager.h
@@ -32,7 +32,7 @@
#include "platform/statistic/stats.h"
namespace resdb {
-namespace common {
+namespace common {
class ResponseManager {
public:
@@ -81,5 +81,5 @@ class ResponseManager {
std::mutex response_lock_[response_set_size_];
};
-} // common
+} // namespace common
} // namespace resdb
diff --git a/platform/consensus/ordering/pbft/checkpoint_manager.cpp
b/platform/consensus/ordering/pbft/checkpoint_manager.cpp
index db633c09..a5a24ca8 100644
--- a/platform/consensus/ordering/pbft/checkpoint_manager.cpp
+++ b/platform/consensus/ordering/pbft/checkpoint_manager.cpp
@@ -317,8 +317,9 @@ void CheckPointManager::UpdateCheckPointStatus() {
if (current_seq == last_ckpt_seq + water_mark) {
last_ckpt_seq = current_seq;
- if(!is_recovery){
- BroadcastCheckPoint(last_ckpt_seq, last_hash_, stable_hashs,
stable_seqs);
+ if (!is_recovery) {
+ BroadcastCheckPoint(last_ckpt_seq, last_hash_, stable_hashs,
+ stable_seqs);
}
}
}
diff --git a/platform/consensus/ordering/pbft/commitment.cpp
b/platform/consensus/ordering/pbft/commitment.cpp
index aeebe835..4a1aacc0 100644
--- a/platform/consensus/ordering/pbft/commitment.cpp
+++ b/platform/consensus/ordering/pbft/commitment.cpp
@@ -41,7 +41,10 @@ Commitment::Commitment(const ResDBConfig& config,
duplicate_manager_ = std::make_unique<DuplicateManager>(config);
message_manager_->SetDuplicateManager(duplicate_manager_.get());
- global_stats_->SetProps(config_.GetSelfInfo().id(),
config_.GetSelfInfo().ip(), config_.GetSelfInfo().port(),
config_.GetConfigData().enable_resview(),
config_.GetConfigData().enable_faulty_switch());
+ global_stats_->SetProps(
+ config_.GetSelfInfo().id(), config_.GetSelfInfo().ip(),
+ config_.GetSelfInfo().port(), config_.GetConfigData().enable_resview(),
+ config_.GetConfigData().enable_faulty_switch());
global_stats_->SetPrimaryId(message_manager_->GetCurrentPrimary());
}
@@ -81,7 +84,8 @@ int Commitment::ProcessNewRequest(std::unique_ptr<Context>
context,
// << message_manager_->GetCurrentPrimary()
// << " seq:" << user_request->seq()
// << " hash:" << user_request->hash();
- LOG(INFO)<<"NOT PRIMARY, Primary is
"<<message_manager_->GetCurrentPrimary();
+ LOG(INFO) << "NOT PRIMARY, Primary is "
+ << message_manager_->GetCurrentPrimary();
replica_communicator_->SendMessage(*user_request,
message_manager_->GetCurrentPrimary());
{
@@ -154,16 +158,19 @@ int
Commitment::ProcessNewRequest(std::unique_ptr<Context> context,
// TODO check whether the sender is the primary.
int Commitment::ProcessProposeMsg(std::unique_ptr<Context> context,
std::unique_ptr<Request> request) {
- if (global_stats_->IsFaulty() || context == nullptr ||
context->signature.signature().empty()) {
+ if (global_stats_->IsFaulty() || context == nullptr ||
+ context->signature.signature().empty()) {
LOG(ERROR) << "user request doesn't contain signature, reject";
return -2;
}
if (request->is_recovery()) {
- if (message_manager_->GetNextSeq() == 0 || request->seq() ==
message_manager_->GetNextSeq()) {
+ if (message_manager_->GetNextSeq() == 0 ||
+ request->seq() == message_manager_->GetNextSeq()) {
message_manager_->SetNextSeq(request->seq() + 1);
- }
- else {
- LOG(ERROR)<<" recovery request not valid:"<<" current
seq:"<<message_manager_->GetNextSeq()<<" data seq:"<<request->seq();
+ } else {
+ LOG(ERROR) << " recovery request not valid:"
+ << " current seq:" << message_manager_->GetNextSeq()
+ << " data seq:" << request->seq();
return 0;
}
return message_manager_->AddConsensusMsg(context->signature,
@@ -189,7 +196,7 @@ int Commitment::ProcessProposeMsg(std::unique_ptr<Context>
context,
LOG(ERROR) << " check by the user func fail";
return -2;
}
- //global_stats_->GetTransactionDetails(std::move(request));
+ // global_stats_->GetTransactionDetails(std::move(request));
BatchUserRequest batch_request;
batch_request.ParseFromString(request->data());
batch_request.clear_createtime();
@@ -288,8 +295,8 @@ int Commitment::ProcessCommitMsg(std::unique_ptr<Context>
context,
CollectorResultCode ret =
message_manager_->AddConsensusMsg(context->signature,
std::move(request));
if (ret == CollectorResultCode::STATE_CHANGED) {
- //LOG(ERROR)<<request->data().size();
- //global_stats_->GetTransactionDetails(request->data());
+ // LOG(ERROR)<<request->data().size();
+ // global_stats_->GetTransactionDetails(request->data());
global_stats_->RecordStateTime("commit");
}
return ret == CollectorResultCode::INVALID ? -2 : 0;
@@ -312,7 +319,7 @@ int Commitment::PostProcessExecutedMsg() {
request.set_current_view(batch_resp->current_view());
request.set_proxy_id(batch_resp->proxy_id());
request.set_primary_id(batch_resp->primary_id());
- //LOG(ERROR)<<"send back to proxy:"<<batch_resp->proxy_id();
+ // LOG(ERROR)<<"send back to proxy:"<<batch_resp->proxy_id();
batch_resp->SerializeToString(request.mutable_data());
replica_communicator_->SendMessage(request, request.proxy_id());
}
diff --git a/platform/consensus/ordering/pbft/consensus_manager_pbft.cpp
b/platform/consensus/ordering/pbft/consensus_manager_pbft.cpp
index 09feae40..72103eab 100644
--- a/platform/consensus/ordering/pbft/consensus_manager_pbft.cpp
+++ b/platform/consensus/ordering/pbft/consensus_manager_pbft.cpp
@@ -208,7 +208,7 @@ int ConsensusManagerPBFT::InternalConsensusCommit(
int ret = commitment_->ProcessNewRequest(std::move(context),
std::move(request));
if (ret == -3) {
- LOG(ERROR)<<"BAD RETURN";
+ LOG(ERROR) << "BAD RETURN";
std::pair<std::unique_ptr<Context>, std::unique_ptr<Request>>
request_complained;
{
diff --git a/platform/consensus/ordering/pbft/performance_manager.cpp
b/platform/consensus/ordering/pbft/performance_manager.cpp
index ebaf2ed4..26d0ebca 100644
--- a/platform/consensus/ordering/pbft/performance_manager.cpp
+++ b/platform/consensus/ordering/pbft/performance_manager.cpp
@@ -191,10 +191,11 @@ CollectorResultCode PerformanceManager::AddResponseMsg(
return CollectorResultCode::INVALID;
}
- std::unique_ptr<BatchUserResponse> batch_response =
std::make_unique<BatchUserResponse>();
+ std::unique_ptr<BatchUserResponse> batch_response =
+ std::make_unique<BatchUserResponse>();
if (!batch_response->ParseFromString(request->data())) {
- LOG(ERROR) << "parse response fail:"<<request->data().size()
- <<" seq:"<<request->seq();
+ LOG(ERROR) << "parse response fail:" << request->data().size()
+ << " seq:" << request->seq();
return CollectorResultCode::INVALID;
}
diff --git a/platform/consensus/ordering/pbft/query.cpp
b/platform/consensus/ordering/pbft/query.cpp
index 62a7d54f..c2d60276 100644
--- a/platform/consensus/ordering/pbft/query.cpp
+++ b/platform/consensus/ordering/pbft/query.cpp
@@ -50,17 +50,15 @@ int Query::ProcessGetReplicaState(std::unique_ptr<Context>
context,
int Query::ProcessQuery(std::unique_ptr<Context> context,
std::unique_ptr<Request> request) {
-
if (config_.GetPublicKeyCertificateInfo()
- .public_key()
- .public_key_info()
- .type() == CertificateKeyInfo::CLIENT) {
-
- auto find_primary = [&](){
+ .public_key()
+ .public_key_info()
+ .type() == CertificateKeyInfo::CLIENT) {
+ auto find_primary = [&]() {
auto config_data = config_.GetConfigData();
for (const auto& r : config_data.region()) {
for (const auto& replica : r.replica_info()) {
- if(replica.id() == 1){
+ if (replica.id() == 1) {
return replica;
}
}
@@ -70,20 +68,21 @@ int Query::ProcessQuery(std::unique_ptr<Context> context,
std::string ip = primary.ip();
int port = primary.port();
- LOG(ERROR)<<"redirect to primary:"<<ip<<" port:"<<port;
+ LOG(ERROR) << "redirect to primary:" << ip << " port:" << port;
auto client = std::make_unique<NetChannel>(ip, port);
if (client->SendRawMessage(*request) == 0) {
QueryResponse resp;
if (client->RecvRawMessage(&resp) == 0) {
if (context != nullptr && context->client != nullptr) {
- LOG(ERROR) << "send response from
primary:"<<resp.transactions_size();
+ LOG(ERROR) << "send response from primary:"
+ << resp.transactions_size();
int ret = context->client->SendRawMessage(resp);
if (ret) {
LOG(ERROR) << "send resp fail ret:" << ret;
}
}
- }
- }
+ }
+ }
return 0;
}
@@ -94,12 +93,11 @@ int Query::ProcessQuery(std::unique_ptr<Context> context,
}
QueryResponse response;
- if (query.max_seq() == 0 && query.min_seq() == 0){
+ if (query.max_seq() == 0 && query.min_seq() == 0) {
uint64_t mseq = message_manager_->GetNextSeq();
- response.set_max_seq(mseq-1);
- LOG(ERROR)<<"get max seq:"<<mseq;
- }
- else {
+ response.set_max_seq(mseq - 1);
+ LOG(ERROR) << "get max seq:" << mseq;
+ } else {
for (uint64_t i = query.min_seq(); i <= query.max_seq(); ++i) {
Request* ret_request = message_manager_->GetRequest(i);
if (ret_request == nullptr) {
@@ -114,7 +112,7 @@ int Query::ProcessQuery(std::unique_ptr<Context> context,
}
if (context != nullptr && context->client != nullptr) {
- //LOG(ERROR) << "send response:" << response.DebugString();
+ // LOG(ERROR) << "send response:" << response.DebugString();
int ret = context->client->SendRawMessage(response);
if (ret) {
LOG(ERROR) << "send resp fail ret:" << ret;
diff --git a/platform/consensus/ordering/pbft/response_manager.cpp
b/platform/consensus/ordering/pbft/response_manager.cpp
index 091a75cc..f490c003 100644
--- a/platform/consensus/ordering/pbft/response_manager.cpp
+++ b/platform/consensus/ordering/pbft/response_manager.cpp
@@ -26,7 +26,7 @@
namespace resdb {
ResponseClientTimeout::ResponseClientTimeout(std::string hash_,
- uint64_t time_) {
+ uint64_t time_) {
this->hash = hash_;
this->timeout_time = time_;
}
@@ -66,9 +66,9 @@ ResponseManager::ResponseManager(const ResDBConfig& config,
config_.IsTestMode()) {
user_req_thread_ = std::thread(&ResponseManager::BatchProposeMsg, this);
}
- if(config_.GetConfigData().enable_viewchange()){
+ if (config_.GetConfigData().enable_viewchange()) {
checking_timeout_thread_ =
- std::thread(&ResponseManager::MonitoringClientTimeOut, this);
+ std::thread(&ResponseManager::MonitoringClientTimeOut, this);
}
global_stats_ = Stats::GetGlobalStats();
send_num_ = 0;
@@ -79,7 +79,7 @@ ResponseManager::~ResponseManager() {
if (user_req_thread_.joinable()) {
user_req_thread_.join();
}
- if(checking_timeout_thread_.joinable()){
+ if (checking_timeout_thread_.joinable()) {
checking_timeout_thread_.join();
}
}
@@ -175,10 +175,11 @@ CollectorResultCode ResponseManager::AddResponseMsg(
std::string hash = request->hash();
- std::unique_ptr<BatchUserResponse> batch_response =
std::make_unique<BatchUserResponse>();
+ std::unique_ptr<BatchUserResponse> batch_response =
+ std::make_unique<BatchUserResponse>();
if (!batch_response->ParseFromString(request->data())) {
- LOG(ERROR) << "parse response fail:"<<request->data().size()
- <<" seq:"<<request->seq();
+ LOG(ERROR) << "parse response fail:" << request->data().size()
+ << " seq:" << request->seq();
RemoveWaitingResponseRequest(hash);
return CollectorResultCode::INVALID;
}
@@ -309,7 +310,8 @@ int ResponseManager::DoBatch(
if (!config_.IsPerformanceRunning()) {
LOG(ERROR) << "add context list:" << new_request->seq()
- << " list size:" << context_list.size()<<"
local_id:"<<local_id_;
+ << " list size:" << context_list.size()
+ << " local_id:" << local_id_;
batch_request.set_local_id(local_id_);
int ret = AddContextList(std::move(context_list), local_id_++);
if (ret != 0) {
@@ -334,7 +336,7 @@ int ResponseManager::DoBatch(
new_request->set_proxy_id(config_.GetSelfInfo().id());
replica_communicator_->SendMessage(*new_request, GetPrimary());
send_num_++;
- //LOG(INFO) << "send msg to primary:" << GetPrimary()
+ // LOG(INFO) << "send msg to primary:" << GetPrimary()
// << " batch size:" << batch_req.size();
AddWaitingResponseRequest(std::move(new_request));
return 0;
@@ -346,10 +348,9 @@ void ResponseManager::AddWaitingResponseRequest(
return;
}
pm_lock_.lock();
- assert(timeout_length_>0);
+ assert(timeout_length_ > 0);
uint64_t time = GetCurrentTime() + timeout_length_;
- client_timeout_min_heap_.push(
- ResponseClientTimeout(request->hash(), time));
+ client_timeout_min_heap_.push(ResponseClientTimeout(request->hash(), time));
waiting_response_batches_.insert(
make_pair(request->hash(), std::move(request)));
pm_lock_.unlock();
@@ -375,8 +376,7 @@ bool ResponseManager::CheckTimeOut(std::string hash) {
return value;
}
-std::unique_ptr<Request> ResponseManager::GetTimeOutRequest(
- std::string hash) {
+std::unique_ptr<Request> ResponseManager::GetTimeOutRequest(std::string hash) {
pm_lock_.lock();
auto value = std::move(waiting_response_batches_.find(hash)->second);
pm_lock_.unlock();
diff --git a/platform/consensus/ordering/pbft/viewchange_manager.cpp
b/platform/consensus/ordering/pbft/viewchange_manager.cpp
index 53060c20..3084c7a1 100644
--- a/platform/consensus/ordering/pbft/viewchange_manager.cpp
+++ b/platform/consensus/ordering/pbft/viewchange_manager.cpp
@@ -109,7 +109,7 @@ void ViewChangeManager::MayStart() {
return;
}
started_ = true;
- LOG(ERROR)<<"MAYSTART";
+ LOG(ERROR) << "MAYSTART";
if (config_.GetPublicKeyCertificateInfo()
.public_key()
@@ -149,7 +149,7 @@ void ViewChangeManager::MayStart() {
bool ViewChangeManager::ChangeStatue(ViewChangeStatus status) {
if (status == ViewChangeStatus::READY_VIEW_CHANGE) {
if (status_ != ViewChangeStatus::READY_VIEW_CHANGE) {
- LOG(ERROR)<<"CHANGE STATUS";
+ LOG(ERROR) << "CHANGE STATUS";
status_ = status;
}
} else {
@@ -228,7 +228,7 @@ void
ViewChangeManager::SetCurrentViewAndNewPrimary(uint64_t view_number) {
config_.GetReplicaInfos()[(view_number - 1) % replicas.size()].id();
system_info_->SetPrimary(id);
global_stats_->ChangePrimary(id);
- LOG(ERROR)<<"View Change Happened";
+ LOG(ERROR) << "View Change Happened";
}
std::vector<std::unique_ptr<Request>> ViewChangeManager::GetPrepareMsg(
@@ -509,7 +509,7 @@ void ViewChangeManager::SendViewChangeMsg() {
}
void ViewChangeManager::AddComplaintTimer(uint64_t proxy_id, std::string hash)
{
- LOG(ERROR)<<"ADDING COMPLAINT";
+ LOG(ERROR) << "ADDING COMPLAINT";
std::lock_guard<std::mutex> lk(vc_mutex_);
if (complaining_clients_.count(proxy_id) == 0) {
complaining_clients_[proxy_id].set_proxy_id(proxy_id);
diff --git a/platform/consensus/ordering/poe/algorithm/BUILD
b/platform/consensus/ordering/poe/algorithm/BUILD
index 357f56d8..335cbca0 100644
--- a/platform/consensus/ordering/poe/algorithm/BUILD
+++ b/platform/consensus/ordering/poe/algorithm/BUILD
@@ -5,11 +5,11 @@ cc_library(
srcs = ["poe.cpp"],
hdrs = ["poe.h"],
deps = [
- "//platform/statistic:stats",
"//common:comm",
- "//platform/consensus/ordering/poe/proto:proposal_cc_proto",
"//common/crypto:signature_verifier",
- "//platform/consensus/ordering/common/algorithm:protocol_base",
"//platform/common/queue:lock_free_queue",
+ "//platform/consensus/ordering/common/algorithm:protocol_base",
+ "//platform/consensus/ordering/poe/proto:proposal_cc_proto",
+ "//platform/statistic:stats",
],
)
diff --git a/platform/consensus/ordering/poe/algorithm/poe.cpp
b/platform/consensus/ordering/poe/algorithm/poe.cpp
index 8a9b3a31..6aa9d694 100644
--- a/platform/consensus/ordering/poe/algorithm/poe.cpp
+++ b/platform/consensus/ordering/poe/algorithm/poe.cpp
@@ -10,7 +10,6 @@ namespace poe {
PoE::PoE(int id, int f, int total_num, SignatureVerifier* verifier)
: ProtocolBase(id, f, total_num), verifier_(verifier) {
-
LOG(ERROR) << "get proposal graph";
id_ = id;
total_num_ = total_num;
@@ -19,9 +18,7 @@ PoE::PoE(int id, int f, int total_num, SignatureVerifier*
verifier)
seq_ = 0;
}
-PoE::~PoE() {
- is_stop_ = true;
-}
+PoE::~PoE() { is_stop_ = true; }
bool PoE::IsStop() { return is_stop_; }
@@ -41,7 +38,7 @@ bool PoE::ReceivePropose(std::unique_ptr<Transaction> txn) {
int proposer = txn->proposer();
{
std::unique_lock<std::mutex> lk(mutex_);
- data_[txn->hash()]=std::move(txn);
+ data_[txn->hash()] = std::move(txn);
}
Proposal proposal;
@@ -58,14 +55,14 @@ bool PoE::ReceivePrepare(std::unique_ptr<Proposal>
proposal) {
std::unique_lock<std::mutex> lk(mutex_);
received_[proposal->hash()].insert(proposal->proposer());
auto it = data_.find(proposal->hash());
- if(it != data_.end()){
- if(received_[proposal->hash()].size()>=2*f_+1){
+ if (it != data_.end()) {
+ if (received_[proposal->hash()].size() >= 2 * f_ + 1) {
txn = std::move(it->second);
data_.erase(it);
}
}
}
- if(txn != nullptr){
+ if (txn != nullptr) {
commit_(*txn);
}
return true;
diff --git a/platform/consensus/ordering/poe/algorithm/poe.h
b/platform/consensus/ordering/poe/algorithm/poe.h
index 20cc71a9..65f7d018 100644
--- a/platform/consensus/ordering/poe/algorithm/poe.h
+++ b/platform/consensus/ordering/poe/algorithm/poe.h
@@ -13,7 +13,7 @@
namespace resdb {
namespace poe {
-class PoE: public common::ProtocolBase {
+class PoE : public common::ProtocolBase {
public:
PoE(int id, int f, int total_num, SignatureVerifier* verifier);
~PoE();
@@ -31,10 +31,10 @@ class PoE: public common::ProtocolBase {
std::map<std::string, std::unique_ptr<Transaction> > data_;
int64_t seq_;
- bool is_stop_;
+ bool is_stop_;
SignatureVerifier* verifier_;
Stats* global_stats_;
};
-} // namespace cassandra
+} // namespace poe
} // namespace resdb
diff --git a/platform/consensus/ordering/poe/framework/BUILD
b/platform/consensus/ordering/poe/framework/BUILD
index 7030d2a0..833a121b 100644
--- a/platform/consensus/ordering/poe/framework/BUILD
+++ b/platform/consensus/ordering/poe/framework/BUILD
@@ -13,4 +13,3 @@ cc_library(
"//platform/consensus/ordering/poe/algorithm:poe",
],
)
-
diff --git a/platform/consensus/ordering/poe/framework/consensus.cpp
b/platform/consensus/ordering/poe/framework/consensus.cpp
index b401adaf..21763d46 100644
--- a/platform/consensus/ordering/poe/framework/consensus.cpp
+++ b/platform/consensus/ordering/poe/framework/consensus.cpp
@@ -35,7 +35,7 @@ namespace poe {
Consensus::Consensus(const ResDBConfig& config,
std::unique_ptr<TransactionManager> executor)
- : common::Consensus(config, std::move(executor)){
+ : common::Consensus(config, std::move(executor)) {
int total_replicas = config_.GetReplicaNum();
int f = (total_replicas - 1) / 3;
@@ -47,9 +47,8 @@ Consensus::Consensus(const ResDBConfig& config,
.public_key()
.public_key_info()
.type() != CertificateKeyInfo::CLIENT) {
- poe_ = std::make_unique<PoE>(
- config_.GetSelfInfo().id(), f,
- total_replicas, GetSignatureVerifier());
+ poe_ = std::make_unique<PoE>(config_.GetSelfInfo().id(), f, total_replicas,
+ GetSignatureVerifier());
InitProtocol(poe_.get());
}
}
@@ -73,7 +72,7 @@ int
Consensus::ProcessCustomConsensus(std::unique_ptr<Request> request) {
}
poe_->ReceivePrepare(std::move(proposal));
return 0;
- }
+ }
return 0;
}
diff --git a/platform/consensus/ordering/poe/framework/consensus.h
b/platform/consensus/ordering/poe/framework/consensus.h
index 72e56e18..df51be37 100644
--- a/platform/consensus/ordering/poe/framework/consensus.h
+++ b/platform/consensus/ordering/poe/framework/consensus.h
@@ -55,5 +55,5 @@ class Consensus : public common::Consensus {
int send_num_[200];
};
-} // namespace cassandra
+} // namespace poe
} // namespace resdb
diff --git a/platform/consensus/recovery/recovery.cpp
b/platform/consensus/recovery/recovery.cpp
index d106075a..51faad5c 100644
--- a/platform/consensus/recovery/recovery.cpp
+++ b/platform/consensus/recovery/recovery.cpp
@@ -521,7 +521,8 @@ void Recovery::ReadLogsFromFiles(
}
}
- LOG(ERROR) << "read log from files:" << path << " done"<<" recovery max
seq:"<<max_seq;
+ LOG(ERROR) << "read log from files:" << path << " done"
+ << " recovery max seq:" << max_seq;
close(fd);
}
diff --git a/platform/networkstrate/consensus_manager.cpp
b/platform/networkstrate/consensus_manager.cpp
index 813ad1fc..b3fb1062 100644
--- a/platform/networkstrate/consensus_manager.cpp
+++ b/platform/networkstrate/consensus_manager.cpp
@@ -108,52 +108,52 @@ void ConsensusManager::HeartBeat() {
}
void ConsensusManager::SendHeartBeat() {
- auto keys = verifier_->GetAllPublicKeys();
-
- std::vector<ReplicaInfo> replicas = GetAllReplicas();
- LOG(ERROR) << "all replicas:" << replicas.size();
- std::vector<ReplicaInfo> client_replicas = GetClientReplicas();
- HeartBeatInfo hb_info;
- hb_info.set_sender(config_.GetSelfInfo().id());
- hb_info.set_ip(config_.GetSelfInfo().ip());
- hb_info.set_port(config_.GetSelfInfo().port());
- hb_info.set_hb_version(version_);
- for (const auto& key : keys) {
- *hb_info.add_public_keys() = key;
- hb_info.add_node_version(hb_[key.public_key_info().node_id()]);
- }
- for (const auto& client : client_replicas) {
- replicas.push_back(client);
- }
- auto client = GetReplicaClient(replicas, false);
- if (client == nullptr) {
- return;
- }
+ auto keys = verifier_->GetAllPublicKeys();
- // If it is not a client node, broadcost the current primary to the client.
- if (config_.GetPublicKeyCertificateInfo()
- .public_key()
- .public_key_info()
- .type() == CertificateKeyInfo::REPLICA) {
- hb_info.set_primary(GetPrimary());
- hb_info.set_version(GetVersion());
- }
- LOG(ERROR) << " server:" << config_.GetSelfInfo().id() << " sends HB"
- << " is ready:" << is_ready_
- << " client size:" << client_replicas.size()
- << " svr size:" << replicas.size();
-
- Request request;
- request.set_type(Request::TYPE_HEART_BEAT);
- request.mutable_region_info()->set_region_id(
- config_.GetConfigData().self_region_id());
- hb_info.SerializeToString(request.mutable_data());
-
- int ret = client->SendHeartBeat(request);
- if (ret <= 0) {
- LOG(ERROR) << " server:" << config_.GetSelfInfo().id()
- << " sends HB fail:" << ret;
- }
+ std::vector<ReplicaInfo> replicas = GetAllReplicas();
+ LOG(ERROR) << "all replicas:" << replicas.size();
+ std::vector<ReplicaInfo> client_replicas = GetClientReplicas();
+ HeartBeatInfo hb_info;
+ hb_info.set_sender(config_.GetSelfInfo().id());
+ hb_info.set_ip(config_.GetSelfInfo().ip());
+ hb_info.set_port(config_.GetSelfInfo().port());
+ hb_info.set_hb_version(version_);
+ for (const auto& key : keys) {
+ *hb_info.add_public_keys() = key;
+ hb_info.add_node_version(hb_[key.public_key_info().node_id()]);
+ }
+ for (const auto& client : client_replicas) {
+ replicas.push_back(client);
+ }
+ auto client = GetReplicaClient(replicas, false);
+ if (client == nullptr) {
+ return;
+ }
+
+ // If it is not a client node, broadcost the current primary to the client.
+ if (config_.GetPublicKeyCertificateInfo()
+ .public_key()
+ .public_key_info()
+ .type() == CertificateKeyInfo::REPLICA) {
+ hb_info.set_primary(GetPrimary());
+ hb_info.set_version(GetVersion());
+ }
+ LOG(ERROR) << " server:" << config_.GetSelfInfo().id() << " sends HB"
+ << " is ready:" << is_ready_
+ << " client size:" << client_replicas.size()
+ << " svr size:" << replicas.size();
+
+ Request request;
+ request.set_type(Request::TYPE_HEART_BEAT);
+ request.mutable_region_info()->set_region_id(
+ config_.GetConfigData().self_region_id());
+ hb_info.SerializeToString(request.mutable_data());
+
+ int ret = client->SendHeartBeat(request);
+ if (ret <= 0) {
+ LOG(ERROR) << " server:" << config_.GetSelfInfo().id()
+ << " sends HB fail:" << ret;
+ }
}
// Porcess the packages received from the network.
@@ -185,19 +185,20 @@ int ConsensusManager::Process(std::unique_ptr<Context>
context,
bool valid = verifier_->VerifyMessage(message.data(), message.signature());
if (!valid) {
LOG(ERROR) << "request is not valid:"
- << message.signature().DebugString();
- LOG(ERROR) << " msg:" << message.data().size()<<" is
recovery:"<<request->is_recovery();
+ << message.signature().DebugString();
+ LOG(ERROR) << " msg:" << message.data().size()
+ << " is recovery:" << request->is_recovery();
return -2;
}
} else {
}
- // forward the signature to the request so that it can be included in the
- // request/response set if needed.
- context->signature = message.signature();
- // LOG(ERROR) << "======= server:" << config_.GetSelfInfo().id()
- // << " get request type:" << request->type()
- // << " from:" << request->sender_id();
+ // forward the signature to the request so that it can be included in the
+ // request/response set if needed.
+ context->signature = message.signature();
+ // LOG(ERROR) << "======= server:" << config_.GetSelfInfo().id()
+ // << " get request type:" << request->type()
+ // << " from:" << request->sender_id();
return Dispatch(std::move(context), std::move(request));
}
@@ -225,10 +226,10 @@ int
ConsensusManager::ProcessHeartBeat(std::unique_ptr<Context> context,
LOG(ERROR) << "receive public size:" << hb_info.public_keys().size()
<< " primary:" << hb_info.primary()
<< " version:" << hb_info.version()
- << " from region:" << request->region_info().region_id()
- << " sender:"<<hb_info.sender()
- << " last send:"<< hb_info.hb_version()
- << " current v:"<<hb_[hb_info.sender()];
+ << " from region:" << request->region_info().region_id()
+ << " sender:" << hb_info.sender()
+ << " last send:" << hb_info.hb_version()
+ << " current v:" << hb_[hb_info.sender()];
if (request->region_info().region_id() ==
config_.GetConfigData().self_region_id()) {
@@ -278,12 +279,13 @@ int
ConsensusManager::ProcessHeartBeat(std::unique_ptr<Context> context,
}
}
- if(!hb_info.ip().empty() && hb_info.hb_version() > 0&&hb_[hb_info.sender()]
!= hb_info.hb_version()) {
+ if (!hb_info.ip().empty() && hb_info.hb_version() > 0 &&
+ hb_[hb_info.sender()] != hb_info.hb_version()) {
ReplicaInfo info;
info.set_ip(hb_info.ip());
info.set_port(hb_info.port());
info.set_id(hb_info.sender());
- //bc_client_->Flush(info);
+ // bc_client_->Flush(info);
hb_[hb_info.sender()] = hb_info.hb_version();
SendHeartBeat();
}
diff --git a/platform/networkstrate/consensus_manager.h
b/platform/networkstrate/consensus_manager.h
index d775f9d8..57ecc836 100644
--- a/platform/networkstrate/consensus_manager.h
+++ b/platform/networkstrate/consensus_manager.h
@@ -107,7 +107,7 @@ class ConsensusManager : public ServiceInterface {
std::vector<ReplicaInfo> clients_;
Stats* global_stats_;
uint64_t version_;
- std::map<int,uint64_t> hb_;
+ std::map<int, uint64_t> hb_;
std::mutex hb_mutex_;
};
diff --git a/platform/networkstrate/replica_communicator.cpp
b/platform/networkstrate/replica_communicator.cpp
index 7083fe6e..de096125 100644
--- a/platform/networkstrate/replica_communicator.cpp
+++ b/platform/networkstrate/replica_communicator.cpp
@@ -257,7 +257,7 @@ void ReplicaCommunicator::SendMessage(const
google::protobuf::Message& message,
}
if (target_replica.ip().empty()) {
- LOG(ERROR) << "no replica info node:"<<node_id;
+ LOG(ERROR) << "no replica info node:" << node_id;
return;
}
diff --git a/platform/statistic/BUILD b/platform/statistic/BUILD
index e20fd916..a0618d3d 100644
--- a/platform/statistic/BUILD
+++ b/platform/statistic/BUILD
@@ -8,17 +8,17 @@ cc_library(
srcs = ["stats.cpp"],
hdrs = ["stats.h"],
deps = [
- "//proto/kv:kv_cc_proto",
- "//platform/proto:resdb_cc_proto",
- "//common:json",
":prometheus_handler",
+ "//common:asio",
+ "//common:beast",
"//common:comm",
+ "//common:json",
"//common/utils",
- "//third_party:prometheus",
"//platform/common/network:tcp_socket",
- "//common:asio",
- "//common:beast",
+ "//platform/proto:resdb_cc_proto",
+ "//proto/kv:kv_cc_proto",
"//third_party:crow",
+ "//third_party:prometheus",
],
)
diff --git a/platform/statistic/stats.cpp b/platform/statistic/stats.cpp
index d8de42a7..200ea527 100644
--- a/platform/statistic/stats.cpp
+++ b/platform/statistic/stats.cpp
@@ -20,7 +20,9 @@
#include "platform/statistic/stats.h"
#include <glog/logging.h>
+
#include <ctime>
+
#include "common/utils/utils.h"
#include "proto/kv/kv.pb.h"
@@ -67,16 +69,19 @@ Stats::Stats(int sleep_time) {
global_thread_ =
std::thread(&Stats::MonitorGlobal, this); // pass by reference
- transaction_summary_.port=-1;
+ transaction_summary_.port = -1;
- //Setup websocket here
+ // Setup websocket here
make_faulty_.store(false);
-
transaction_summary_.request_pre_prepare_state_time=std::chrono::system_clock::time_point::min();
-
transaction_summary_.prepare_state_time=std::chrono::system_clock::time_point::min();
-
transaction_summary_.commit_state_time=std::chrono::system_clock::time_point::min();
-
transaction_summary_.execution_time=std::chrono::system_clock::time_point::min();
- transaction_summary_.txn_number=0;
-
+ transaction_summary_.request_pre_prepare_state_time =
+ std::chrono::system_clock::time_point::min();
+ transaction_summary_.prepare_state_time =
+ std::chrono::system_clock::time_point::min();
+ transaction_summary_.commit_state_time =
+ std::chrono::system_clock::time_point::min();
+ transaction_summary_.execution_time =
+ std::chrono::system_clock::time_point::min();
+ transaction_summary_.txn_number = 0;
}
void Stats::Stop() { stop_ = true; }
@@ -86,107 +91,122 @@ Stats::~Stats() {
if (global_thread_.joinable()) {
global_thread_.join();
}
- if(enable_resview && crow_thread_.joinable()){
+ if (enable_resview && crow_thread_.joinable()) {
crow_thread_.join();
}
}
-void Stats::CrowRoute(){
+void Stats::CrowRoute() {
crow::SimpleApp app;
- while(!stop_){
- try{
- CROW_ROUTE(app, "/consensus_data").methods("GET"_method)([this](const
crow::request& req, crow::response& res){
- LOG(ERROR)<<"API 1";
- res.set_header("Access-Control-Allow-Origin", "*"); // Allow requests
from any origin
- res.set_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS");
// Specify allowed methods
- res.set_header("Access-Control-Allow-Headers", "Content-Type,
Authorization"); // Specify allowed headers
-
- // Send your response
- res.body=consensus_history_.dump();
- res.end();
- });
- CROW_ROUTE(app, "/get_status").methods("GET"_method)([this](const
crow::request& req, crow::response& res){
- LOG(ERROR)<<"API 2";
- res.set_header("Access-Control-Allow-Origin", "*"); // Allow requests
from any origin
- res.set_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS");
// Specify allowed methods
- res.set_header("Access-Control-Allow-Headers", "Content-Type,
Authorization"); // Specify allowed headers
-
- // Send your response
- res.body= IsFaulty() ? "Faulty" : "Not Faulty";
- res.end();
- });
- CROW_ROUTE(app, "/make_faulty").methods("GET"_method)([this](const
crow::request& req, crow::response& res){
- LOG(ERROR)<<"API 3";
- res.set_header("Access-Control-Allow-Origin", "*"); // Allow requests
from any origin
- res.set_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS");
// Specify allowed methods
- res.set_header("Access-Control-Allow-Headers", "Content-Type,
Authorization"); // Specify allowed headers
-
- // Send your response
- if(enable_faulty_switch_){
- make_faulty_.store(!make_faulty_.load());
- }
- res.body= "Success";
- res.end();
- });
- app.port(8500+transaction_summary_.port).multithreaded().run();
+ while (!stop_) {
+ try {
+ CROW_ROUTE(app, "/consensus_data")
+ .methods("GET"_method)([this](const crow::request& req,
+ crow::response& res) {
+ LOG(ERROR) << "API 1";
+ res.set_header("Access-Control-Allow-Origin",
+ "*"); // Allow requests from any origin
+ res.set_header("Access-Control-Allow-Methods",
+ "GET, POST, OPTIONS"); // Specify allowed methods
+ res.set_header(
+ "Access-Control-Allow-Headers",
+ "Content-Type, Authorization"); // Specify allowed headers
+
+ // Send your response
+ res.body = consensus_history_.dump();
+ res.end();
+ });
+ CROW_ROUTE(app, "/get_status")
+ .methods("GET"_method)([this](const crow::request& req,
+ crow::response& res) {
+ LOG(ERROR) << "API 2";
+ res.set_header("Access-Control-Allow-Origin",
+ "*"); // Allow requests from any origin
+ res.set_header("Access-Control-Allow-Methods",
+ "GET, POST, OPTIONS"); // Specify allowed methods
+ res.set_header(
+ "Access-Control-Allow-Headers",
+ "Content-Type, Authorization"); // Specify allowed headers
+
+ // Send your response
+ res.body = IsFaulty() ? "Faulty" : "Not Faulty";
+ res.end();
+ });
+ CROW_ROUTE(app, "/make_faulty")
+ .methods("GET"_method)([this](const crow::request& req,
+ crow::response& res) {
+ LOG(ERROR) << "API 3";
+ res.set_header("Access-Control-Allow-Origin",
+ "*"); // Allow requests from any origin
+ res.set_header("Access-Control-Allow-Methods",
+ "GET, POST, OPTIONS"); // Specify allowed methods
+ res.set_header(
+ "Access-Control-Allow-Headers",
+ "Content-Type, Authorization"); // Specify allowed headers
+
+ // Send your response
+ if (enable_faulty_switch_) {
+ make_faulty_.store(!make_faulty_.load());
+ }
+ res.body = "Success";
+ res.end();
+ });
+ app.port(8500 + transaction_summary_.port).multithreaded().run();
sleep(1);
- }
- catch( const std::exception& e){
+ } catch (const std::exception& e) {
}
}
app.stop();
}
-bool Stats::IsFaulty(){
- return make_faulty_.load();
-}
+bool Stats::IsFaulty() { return make_faulty_.load(); }
-void Stats::ChangePrimary(int primary_id){
- transaction_summary_.primary_id=primary_id;
+void Stats::ChangePrimary(int primary_id) {
+ transaction_summary_.primary_id = primary_id;
make_faulty_.store(false);
}
-void Stats::SetProps(int replica_id, std::string ip, int port, bool
resview_flag, bool faulty_flag){
- transaction_summary_.replica_id=replica_id;
- transaction_summary_.ip=ip;
- transaction_summary_.port=port;
- enable_resview=resview_flag;
- enable_faulty_switch_=faulty_flag;
- if(resview_flag){
+void Stats::SetProps(int replica_id, std::string ip, int port,
+ bool resview_flag, bool faulty_flag) {
+ transaction_summary_.replica_id = replica_id;
+ transaction_summary_.ip = ip;
+ transaction_summary_.port = port;
+ enable_resview = resview_flag;
+ enable_faulty_switch_ = faulty_flag;
+ if (resview_flag) {
crow_thread_ = std::thread(&Stats::CrowRoute, this);
}
}
-void Stats::SetPrimaryId(int primary_id){
- transaction_summary_.primary_id=primary_id;
+void Stats::SetPrimaryId(int primary_id) {
+ transaction_summary_.primary_id = primary_id;
}
-void Stats::RecordStateTime(std::string state){
- if(!enable_resview){
+void Stats::RecordStateTime(std::string state) {
+ if (!enable_resview) {
return;
}
- if(state=="request" || state=="pre-prepare"){
-
transaction_summary_.request_pre_prepare_state_time=std::chrono::system_clock::now();
- }
- else if(state=="prepare"){
- transaction_summary_.prepare_state_time=std::chrono::system_clock::now();
- }
- else if(state=="commit"){
- transaction_summary_.commit_state_time=std::chrono::system_clock::now();
+ if (state == "request" || state == "pre-prepare") {
+ transaction_summary_.request_pre_prepare_state_time =
+ std::chrono::system_clock::now();
+ } else if (state == "prepare") {
+ transaction_summary_.prepare_state_time = std::chrono::system_clock::now();
+ } else if (state == "commit") {
+ transaction_summary_.commit_state_time = std::chrono::system_clock::now();
}
}
-void Stats::GetTransactionDetails(BatchUserRequest batch_request){
- if(!enable_resview){
+void Stats::GetTransactionDetails(BatchUserRequest batch_request) {
+ if (!enable_resview) {
return;
}
- transaction_summary_.txn_number=batch_request.seq();
+ transaction_summary_.txn_number = batch_request.seq();
transaction_summary_.txn_command.clear();
transaction_summary_.txn_key.clear();
transaction_summary_.txn_value.clear();
for (auto& sub_request : batch_request.user_requests()) {
KVRequest kv_request;
- if(!kv_request.ParseFromString(sub_request.request().data())){
+ if (!kv_request.ParseFromString(sub_request.request().data())) {
break;
}
if (kv_request.cmd() == KVRequest::SET) {
@@ -209,53 +229,70 @@ void Stats::GetTransactionDetails(BatchUserRequest
batch_request){
}
}
-void Stats::SendSummary(){
- if(!enable_resview){
+void Stats::SendSummary() {
+ if (!enable_resview) {
return;
}
- transaction_summary_.execution_time=std::chrono::system_clock::now();
-
- //Convert Transaction Summary to JSON
- summary_json_["replica_id"]=transaction_summary_.replica_id;
- summary_json_["ip"]=transaction_summary_.ip;
- summary_json_["port"]=transaction_summary_.port;
- summary_json_["primary_id"]=transaction_summary_.primary_id;
-
summary_json_["propose_pre_prepare_time"]=transaction_summary_.request_pre_prepare_state_time.time_since_epoch().count();
-
summary_json_["prepare_time"]=transaction_summary_.prepare_state_time.time_since_epoch().count();
-
summary_json_["commit_time"]=transaction_summary_.commit_state_time.time_since_epoch().count();
-
summary_json_["execution_time"]=transaction_summary_.execution_time.time_since_epoch().count();
- for(size_t i=0;
i<transaction_summary_.prepare_message_count_times_list.size(); i++){
-
summary_json_["prepare_message_timestamps"].push_back(transaction_summary_.prepare_message_count_times_list[i].time_since_epoch().count());
+ transaction_summary_.execution_time = std::chrono::system_clock::now();
+
+ // Convert Transaction Summary to JSON
+ summary_json_["replica_id"] = transaction_summary_.replica_id;
+ summary_json_["ip"] = transaction_summary_.ip;
+ summary_json_["port"] = transaction_summary_.port;
+ summary_json_["primary_id"] = transaction_summary_.primary_id;
+ summary_json_["propose_pre_prepare_time"] =
+ transaction_summary_.request_pre_prepare_state_time.time_since_epoch()
+ .count();
+ summary_json_["prepare_time"] =
+ transaction_summary_.prepare_state_time.time_since_epoch().count();
+ summary_json_["commit_time"] =
+ transaction_summary_.commit_state_time.time_since_epoch().count();
+ summary_json_["execution_time"] =
+ transaction_summary_.execution_time.time_since_epoch().count();
+ for (size_t i = 0;
+ i < transaction_summary_.prepare_message_count_times_list.size(); i++) {
+ summary_json_["prepare_message_timestamps"].push_back(
+ transaction_summary_.prepare_message_count_times_list[i]
+ .time_since_epoch()
+ .count());
}
- for(size_t i=0;
i<transaction_summary_.commit_message_count_times_list.size(); i++){
-
summary_json_["commit_message_timestamps"].push_back(transaction_summary_.commit_message_count_times_list[i].time_since_epoch().count());
+ for (size_t i = 0;
+ i < transaction_summary_.commit_message_count_times_list.size(); i++) {
+ summary_json_["commit_message_timestamps"].push_back(
+ transaction_summary_.commit_message_count_times_list[i]
+ .time_since_epoch()
+ .count());
}
- summary_json_["txn_number"]=transaction_summary_.txn_number;
- for(size_t i=0; i<transaction_summary_.txn_command.size(); i++){
-
summary_json_["txn_commands"].push_back(transaction_summary_.txn_command[i]);
+ summary_json_["txn_number"] = transaction_summary_.txn_number;
+ for (size_t i = 0; i < transaction_summary_.txn_command.size(); i++) {
+ summary_json_["txn_commands"].push_back(
+ transaction_summary_.txn_command[i]);
}
- for(size_t i=0; i<transaction_summary_.txn_key.size(); i++){
+ for (size_t i = 0; i < transaction_summary_.txn_key.size(); i++) {
summary_json_["txn_keys"].push_back(transaction_summary_.txn_key[i]);
}
- for(size_t i=0; i<transaction_summary_.txn_value.size(); i++){
+ for (size_t i = 0; i < transaction_summary_.txn_value.size(); i++) {
summary_json_["txn_values"].push_back(transaction_summary_.txn_value[i]);
}
-
consensus_history_[std::to_string(transaction_summary_.txn_number)]=summary_json_;
+ consensus_history_[std::to_string(transaction_summary_.txn_number)] =
+ summary_json_;
+ LOG(ERROR) << summary_json_.dump();
- LOG(ERROR)<<summary_json_.dump();
-
- //Reset Transaction Summary Parameters
-
transaction_summary_.request_pre_prepare_state_time=std::chrono::system_clock::time_point::min();
-
transaction_summary_.prepare_state_time=std::chrono::system_clock::time_point::min();
-
transaction_summary_.commit_state_time=std::chrono::system_clock::time_point::min();
-
transaction_summary_.execution_time=std::chrono::system_clock::time_point::min();
+ // Reset Transaction Summary Parameters
+ transaction_summary_.request_pre_prepare_state_time =
+ std::chrono::system_clock::time_point::min();
+ transaction_summary_.prepare_state_time =
+ std::chrono::system_clock::time_point::min();
+ transaction_summary_.commit_state_time =
+ std::chrono::system_clock::time_point::min();
+ transaction_summary_.execution_time =
+ std::chrono::system_clock::time_point::min();
transaction_summary_.prepare_message_count_times_list.clear();
transaction_summary_.commit_message_count_times_list.clear();
summary_json_.clear();
-
}
void Stats::MonitorGlobal() {
@@ -423,7 +460,8 @@ void Stats::IncPrepare() {
prometheus_->Inc(PREPARE, 1);
}
num_prepare_++;
-
transaction_summary_.prepare_message_count_times_list.push_back(std::chrono::system_clock::now());
+ transaction_summary_.prepare_message_count_times_list.push_back(
+ std::chrono::system_clock::now());
}
void Stats::IncCommit() {
@@ -431,12 +469,11 @@ void Stats::IncCommit() {
prometheus_->Inc(COMMIT, 1);
}
num_commit_++;
-
transaction_summary_.commit_message_count_times_list.push_back(std::chrono::system_clock::now());
+ transaction_summary_.commit_message_count_times_list.push_back(
+ std::chrono::system_clock::now());
}
-void Stats::IncPendingExecute() {
- pending_execute_++;
-}
+void Stats::IncPendingExecute() { pending_execute_++; }
void Stats::IncExecute() { execute_++; }
diff --git a/platform/statistic/stats.h b/platform/statistic/stats.h
index 621aaf84..0ca8dd1e 100644
--- a/platform/statistic/stats.h
+++ b/platform/statistic/stats.h
@@ -19,17 +19,18 @@
#pragma once
+#include <crow.h>
+
#include <chrono>
#include <future>
-
-#include "platform/statistic/prometheus_handler.h"
-#include "platform/proto/resdb.pb.h"
-#include "proto/kv/kv.pb.h"
-#include "platform/common/network/tcp_socket.h"
#include <nlohmann/json.hpp>
+
#include "boost/asio.hpp"
#include "boost/beast.hpp"
-#include <crow.h>
+#include "platform/common/network/tcp_socket.h"
+#include "platform/proto/resdb.pb.h"
+#include "platform/statistic/prometheus_handler.h"
+#include "proto/kv/kv.pb.h"
namespace asio = boost::asio;
namespace beast = boost::beast;
@@ -37,44 +38,46 @@ using tcp = asio::ip::tcp;
namespace resdb {
-struct VisualData{
- //Set when initializing
- int replica_id;
- int primary_id;
- std::string ip;
- int port;
-
- //Set when new txn is received
- int txn_number;
- std::vector<std::string> txn_command;
- std::vector<std::string> txn_key;
- std::vector<std::string> txn_value;
-
- //Request state if primary_id==replica_id, pre_prepare state otherwise
- std::chrono::system_clock::time_point request_pre_prepare_state_time;
- std::chrono::system_clock::time_point prepare_state_time;
- std::vector<std::chrono::system_clock::time_point>
prepare_message_count_times_list;
- std::chrono::system_clock::time_point commit_state_time;
- std::vector<std::chrono::system_clock::time_point>
commit_message_count_times_list;
- std::chrono::system_clock::time_point execution_time;
+struct VisualData {
+ // Set when initializing
+ int replica_id;
+ int primary_id;
+ std::string ip;
+ int port;
+
+ // Set when new txn is received
+ int txn_number;
+ std::vector<std::string> txn_command;
+ std::vector<std::string> txn_key;
+ std::vector<std::string> txn_value;
+
+ // Request state if primary_id==replica_id, pre_prepare state otherwise
+ std::chrono::system_clock::time_point request_pre_prepare_state_time;
+ std::chrono::system_clock::time_point prepare_state_time;
+ std::vector<std::chrono::system_clock::time_point>
+ prepare_message_count_times_list;
+ std::chrono::system_clock::time_point commit_state_time;
+ std::vector<std::chrono::system_clock::time_point>
+ commit_message_count_times_list;
+ std::chrono::system_clock::time_point execution_time;
};
-class Stats{
+class Stats {
public:
static Stats* GetGlobalStats(int sleep_seconds = 5);
void Stop();
- void RetrieveProgress();
- void SetProps(int replica_id, std::string ip, int port, bool resview_flag,
bool faulty_flag);
- void SetPrimaryId(int primary_id);
- void RecordStateTime(std::string state);
- void GetTransactionDetails(BatchUserRequest batch_request);
- void SendSummary();
- void CrowRoute();
- bool IsFaulty();
- void ChangePrimary(int primary_id);
-
+ void RetrieveProgress();
+ void SetProps(int replica_id, std::string ip, int port, bool resview_flag,
+ bool faulty_flag);
+ void SetPrimaryId(int primary_id);
+ void RecordStateTime(std::string state);
+ void GetTransactionDetails(BatchUserRequest batch_request);
+ void SendSummary();
+ void CrowRoute();
+ bool IsFaulty();
+ void ChangePrimary(int primary_id);
void AddLatency(uint64_t run_time);
diff --git a/third_party/BUILD b/third_party/BUILD
index b43e9230..0ec7614b 100644
--- a/third_party/BUILD
+++ b/third_party/BUILD
@@ -51,4 +51,4 @@ cc_library(
deps = [
"@com_crowcpp_crow//:crow",
],
-)
\ No newline at end of file
+)