This is an automated email from the ASF dual-hosted git repository.
saipranav pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git
The following commit(s) were added to refs/heads/master by this push:
new 69ef5830 ResView Branch (#138)
69ef5830 is described below
commit 69ef58307738cc2922ae4b80b686171fcee49d54
Author: Saipranav-Kotamreddy
<[email protected]>
AuthorDate: Sat Mar 30 23:34:47 2024 -0700
ResView Branch (#138)
* "Added basic data collection, pointed out spots for web socket"
* "Added data gathering points and prints for testing"
* "Fixed bugs with compiling stats"
* "Changed message collection to be per message rather than queried
periodically"
* "Added conversion of stats values to JSON, need to fix replica id setting"
* "Added more precision to timestamps in JSON"
* "Added new data to the summary view"
* "Got transaction detail collection working"
* "Changed to GETALLVALUES based on main repo"
* Changed Produced JSON to include txn_number
* "Added websocket to send to front end, slightly inconsistent"
* Add files via upload
* "Added ability to receive messages from front end"
* "Viewchange Update"
* "Removed possible infinite loop in sending summary"
* "Fixing file inclusion issue"
* "Added 2 new apis in same thread to save resources"
* "Adjusted make faulty endpoint"
* "Removed vestigial variables, turned off faulty switch for PR"
* "Fixed failing response manager test"
---
.bazelrc | 2 +-
WORKSPACE | 21 +++
common/BUILD | 7 +
monitoring/prometheus/prometheus.yml | 10 +-
.../consensus/execution/transaction_executor.cpp | 3 +-
platform/consensus/ordering/pbft/BUILD | 1 +
platform/consensus/ordering/pbft/commitment.cpp | 17 +-
.../ordering/pbft/consensus_manager_pbft.cpp | 1 +
.../consensus/ordering/pbft/response_manager.cpp | 96 +++++++++++
.../consensus/ordering/pbft/response_manager.h | 27 +++
.../consensus/ordering/pbft/viewchange_manager.cpp | 6 +
.../consensus/ordering/pbft/viewchange_manager.h | 2 +
platform/proto/replica_info.proto | 2 +
platform/statistic/BUILD | 7 +
platform/statistic/stats.cpp | 191 ++++++++++++++++++++-
platform/statistic/stats.h | 56 +++++-
script.js | 103 +++++++++++
scripts/deploy/config/template.config | 2 +-
service/tools/config/server/server.config | 5 +-
third_party/BUILD | 7 +
20 files changed, 552 insertions(+), 14 deletions(-)
diff --git a/.bazelrc b/.bazelrc
index 0566a7c9..99c69f64 100644
--- a/.bazelrc
+++ b/.bazelrc
@@ -1,4 +1,4 @@
build --cxxopt='-std=c++17' --copt=-O3 --jobs=40
-#build --action_env=PYTHON_BIN_PATH="/usr/bin/python3.10"
+build --action_env=PYTHON_BIN_PATH="/usr/bin/python3.10"
#build --action_env=PYTHON_LIB_PATH="/usr/include/python3.10"
diff --git a/WORKSPACE b/WORKSPACE
index f285e83f..6a7883f7 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -224,3 +224,24 @@ http_archive(
strip_prefix = "json-3.9.1",
urls = ["https://github.com/nlohmann/json/archive/v3.9.1.tar.gz"],
)
+
+http_archive(
+ name = "com_crowcpp_crow",
+ build_file = "//third_party:crow.BUILD",
+ sha256 =
"f95128a8976fae6f2922823e07da59edae277a460776572a556a4b663ff5ee4b",
+ strip_prefix = "Crow-1.0-5",
+ url = "https://github.com/CrowCpp/Crow/archive/refs/tags/v1.0+5.zip",
+)
+
+bind(
+ name = "asio",
+ actual = "@com_chriskohlhoff_asio//:asio",
+)
+
+http_archive(
+ name = "com_chriskohlhoff_asio",
+ build_file = "//third_party:asio.BUILD",
+ sha256 =
"babcdfd2c744905a73d20de211b51367bda0d5200f11d654c4314b909d8c963c",
+ strip_prefix = "asio-asio-1-26-0",
+ url =
"https://github.com/chriskohlhoff/asio/archive/refs/tags/asio-1-26-0.zip",
+)
\ No newline at end of file
diff --git a/common/BUILD b/common/BUILD
index 62d32413..84acee11 100644
--- a/common/BUILD
+++ b/common/BUILD
@@ -15,6 +15,13 @@ cc_library(
],
)
+cc_library(
+ name= "beast",
+ deps = [
+ "@boost//:beast"
+ ]
+)
+
cc_library(
name = "boost_lockfree",
deps = [
diff --git a/monitoring/prometheus/prometheus.yml
b/monitoring/prometheus/prometheus.yml
index c0046d38..8463c696 100644
--- a/monitoring/prometheus/prometheus.yml
+++ b/monitoring/prometheus/prometheus.yml
@@ -25,19 +25,19 @@ scrape_configs:
- targets: ["localhost:9090"]
- job_name: "node_exporter1"
static_configs:
- - targets: ["172.31.52.247:9100"]
+ - targets: ["localhost:9100"]
- job_name: "node_exporter2"
static_configs:
- - targets: ["172.31.54.193:9100"]
+ - targets: ["localhost:9100"]
- job_name: "node_exporter3"
static_configs:
- - targets: ["172.31.55.48:9100"]
+ - targets: ["localhost:9100"]
- job_name: "node_exporter4"
static_configs:
- - targets: ["172.31.53.140:9100"]
+ - targets: ["localhost:9100"]
- job_name: "node_exporter5"
static_configs:
- - targets: ["172.31.57.186:9100"]
+ - targets: ["localhost:9100"]
- job_name: "cpp_client1"
static_configs:
- targets: ["172.31.52.247:8090"]
diff --git a/platform/consensus/execution/transaction_executor.cpp
b/platform/consensus/execution/transaction_executor.cpp
index 2a8ca4f4..59779b22 100644
--- a/platform/consensus/execution/transaction_executor.cpp
+++ b/platform/consensus/execution/transaction_executor.cpp
@@ -186,8 +186,8 @@ void
TransactionExecutor::OnlyExecute(std::unique_ptr<Request> request) {
// id:"<<request->proxy_id();
// std::unique_ptr<BatchUserResponse> batch_response =
// std::make_unique<BatchUserResponse>();
-
std::unique_ptr<BatchUserResponse> response;
+ global_stats_->GetTransactionDetails(batch_request);
if (transaction_manager_) {
response = transaction_manager_->ExecuteBatch(batch_request);
}
@@ -217,6 +217,7 @@ void TransactionExecutor::Execute(std::unique_ptr<Request>
request,
// std::make_unique<BatchUserResponse>();
std::unique_ptr<BatchUserResponse> response;
+ global_stats_->GetTransactionDetails(batch_request);
if (transaction_manager_ && need_execute) {
response = transaction_manager_->ExecuteBatch(batch_request);
}
diff --git a/platform/consensus/ordering/pbft/BUILD
b/platform/consensus/ordering/pbft/BUILD
index ce59cb9f..a1bf1755 100644
--- a/platform/consensus/ordering/pbft/BUILD
+++ b/platform/consensus/ordering/pbft/BUILD
@@ -151,6 +151,7 @@ cc_library(
"//platform/consensus/execution:system_info",
"//platform/networkstrate:replica_communicator",
"//platform/proto:viewchange_message_cc_proto",
+ "//platform/statistic:stats",
],
)
diff --git a/platform/consensus/ordering/pbft/commitment.cpp
b/platform/consensus/ordering/pbft/commitment.cpp
index c2e84b14..081c11d2 100644
--- a/platform/consensus/ordering/pbft/commitment.cpp
+++ b/platform/consensus/ordering/pbft/commitment.cpp
@@ -40,6 +40,9 @@ Commitment::Commitment(const ResDBConfig& config,
global_stats_ = Stats::GetGlobalStats();
duplicate_manager_ = std::make_unique<DuplicateManager>(config);
message_manager_->SetDuplicateManager(duplicate_manager_.get());
+
+ global_stats_->SetProps(config_.GetSelfInfo().id(),
config_.GetSelfInfo().ip(), config_.GetSelfInfo().port(),
config_.GetConfigData().enable_resview(),
config_.GetConfigData().enable_faulty_switch());
+ global_stats_->SetPrimaryId(message_manager_->GetCurrentPrimary());
}
Commitment::~Commitment() {
@@ -78,6 +81,7 @@ int Commitment::ProcessNewRequest(std::unique_ptr<Context>
context,
// << message_manager_->GetCurrentPrimary()
// << " seq:" << user_request->seq()
// << " hash:" << user_request->hash();
+ LOG(ERROR)<<"NOT PRIMARY, Primary is
"<<message_manager_->GetCurrentPrimary();
replica_communicator_->SendMessage(*user_request,
message_manager_->GetCurrentPrimary());
{
@@ -134,6 +138,8 @@ int Commitment::ProcessNewRequest(std::unique_ptr<Context>
context,
return -2;
}
+ global_stats_->RecordStateTime("request");
+
user_request->set_type(Request::TYPE_PRE_PREPARE);
user_request->set_current_view(message_manager_->GetCurrentView());
user_request->set_seq(*seq);
@@ -149,7 +155,7 @@ int Commitment::ProcessNewRequest(std::unique_ptr<Context>
context,
// TODO check whether the sender is the primary.
int Commitment::ProcessProposeMsg(std::unique_ptr<Context> context,
std::unique_ptr<Request> request) {
- if (context == nullptr || context->signature.signature().empty()) {
+ if (global_stats_->IsFaulty() || context == nullptr ||
context->signature.signature().empty()) {
LOG(ERROR) << "user request doesn't contain signature, reject";
return -2;
}
@@ -181,6 +187,7 @@ int Commitment::ProcessProposeMsg(std::unique_ptr<Context>
context,
LOG(ERROR) << " check by the user func fail";
return -2;
}
+ //global_stats_->GetTransactionDetails(std::move(request));
BatchUserRequest batch_request;
batch_request.ParseFromString(request->data());
batch_request.clear_createtime();
@@ -202,6 +209,7 @@ int Commitment::ProcessProposeMsg(std::unique_ptr<Context>
context,
}
global_stats_->IncPropose();
+ global_stats_->RecordStateTime("pre-prepare");
std::unique_ptr<Request> prepare_request = resdb::NewRequest(
Request::TYPE_PREPARE, *request, config_.GetSelfInfo().id());
prepare_request->clear_data();
@@ -253,6 +261,7 @@ int Commitment::ProcessPrepareMsg(std::unique_ptr<Context>
context,
// LOG(ERROR) << "sign hash"
// << commit_request->data_signature().DebugString();
}
+ global_stats_->RecordStateTime("prepare");
replica_communicator_->BroadCast(*commit_request);
}
return ret == CollectorResultCode::INVALID ? -2 : 0;
@@ -276,6 +285,11 @@ int Commitment::ProcessCommitMsg(std::unique_ptr<Context>
context,
// commit the request.
CollectorResultCode ret =
message_manager_->AddConsensusMsg(context->signature,
std::move(request));
+ if (ret == CollectorResultCode::STATE_CHANGED) {
+ //LOG(ERROR)<<request->data().size();
+ //global_stats_->GetTransactionDetails(request->data());
+ global_stats_->RecordStateTime("commit");
+ }
return ret == CollectorResultCode::INVALID ? -2 : 0;
}
@@ -287,6 +301,7 @@ int Commitment::PostProcessExecutedMsg() {
if (batch_resp == nullptr) {
continue;
}
+ global_stats_->SendSummary();
Request request;
request.set_hash(batch_resp->hash());
request.set_seq(batch_resp->seq());
diff --git a/platform/consensus/ordering/pbft/consensus_manager_pbft.cpp
b/platform/consensus/ordering/pbft/consensus_manager_pbft.cpp
index 5131445a..09feae40 100644
--- a/platform/consensus/ordering/pbft/consensus_manager_pbft.cpp
+++ b/platform/consensus/ordering/pbft/consensus_manager_pbft.cpp
@@ -208,6 +208,7 @@ int ConsensusManagerPBFT::InternalConsensusCommit(
int ret = commitment_->ProcessNewRequest(std::move(context),
std::move(request));
if (ret == -3) {
+ LOG(ERROR)<<"BAD RETURN";
std::pair<std::unique_ptr<Context>, std::unique_ptr<Request>>
request_complained;
{
diff --git a/platform/consensus/ordering/pbft/response_manager.cpp
b/platform/consensus/ordering/pbft/response_manager.cpp
index 9d212f61..6a7162d8 100644
--- a/platform/consensus/ordering/pbft/response_manager.cpp
+++ b/platform/consensus/ordering/pbft/response_manager.cpp
@@ -24,6 +24,24 @@
#include "common/utils/utils.h"
namespace resdb {
+
+ResponseClientTimeout::ResponseClientTimeout(std::string hash_,
+ uint64_t time_) {
+ this->hash = hash_;
+ this->timeout_time = time_;
+}
+
+ResponseClientTimeout::ResponseClientTimeout(
+ const ResponseClientTimeout& other) {
+ this->hash = other.hash;
+ this->timeout_time = other.timeout_time;
+}
+
+bool ResponseClientTimeout::operator<(
+ const ResponseClientTimeout& other) const {
+ return timeout_time > other.timeout_time;
+}
+
ResponseManager::ResponseManager(const ResDBConfig& config,
ReplicaCommunicator* replica_communicator,
SystemInfo* system_info,
@@ -47,6 +65,10 @@ ResponseManager::ResponseManager(const ResDBConfig& config,
config_.IsTestMode()) {
user_req_thread_ = std::thread(&ResponseManager::BatchProposeMsg, this);
}
+ if(config_.GetConfigData().enable_viewchange()){
+ checking_timeout_thread_ =
+ std::thread(&ResponseManager::MonitoringClientTimeOut, this);
+ }
global_stats_ = Stats::GetGlobalStats();
send_num_ = 0;
}
@@ -56,6 +78,9 @@ ResponseManager::~ResponseManager() {
if (user_req_thread_.joinable()) {
user_req_thread_.join();
}
+ if(checking_timeout_thread_.joinable()){
+ checking_timeout_thread_.join();
+ }
}
// use system info
@@ -293,11 +318,82 @@ int ResponseManager::DoBatch(
batch_request.SerializeToString(new_request->mutable_data());
new_request->set_hash(SignatureVerifier::CalculateHash(new_request->data()));
new_request->set_proxy_id(config_.GetSelfInfo().id());
+ /*for(int i=1; i<=4; i++){
+ replica_communicator_->SendMessage(*new_request, i);
+ }*/
replica_communicator_->SendMessage(*new_request, GetPrimary());
send_num_++;
LOG(INFO) << "send msg to primary:" << GetPrimary()
<< " batch size:" << batch_req.size();
+ AddWaitingResponseRequest(std::move(new_request));
return 0;
}
+void ResponseManager::AddWaitingResponseRequest(
+ std::unique_ptr<Request> request) {
+ if (!config_.GetConfigData().enable_viewchange()) {
+ return;
+ }
+ pm_lock_.lock();
+ uint64_t time = GetCurrentTime() + this->timeout_length_;
+ client_timeout_min_heap_.push(
+ ResponseClientTimeout(request->hash(), time));
+ waiting_response_batches_.insert(
+ make_pair(request->hash(), std::move(request)));
+ pm_lock_.unlock();
+ sem_post(&request_sent_signal_);
+}
+
+void ResponseManager::RemoveWaitingResponseRequest(std::string hash) {
+ if (!config_.GetConfigData().enable_viewchange()) {
+ return;
+ }
+ pm_lock_.lock();
+ if (waiting_response_batches_.find(hash) != waiting_response_batches_.end())
{
+ waiting_response_batches_.erase(waiting_response_batches_.find(hash));
+ }
+ pm_lock_.unlock();
+}
+
+bool ResponseManager::CheckTimeOut(std::string hash) {
+ pm_lock_.lock();
+ bool value =
+ (waiting_response_batches_.find(hash) !=
waiting_response_batches_.end());
+ pm_lock_.unlock();
+ return value;
+}
+
+std::unique_ptr<Request> ResponseManager::GetTimeOutRequest(
+ std::string hash) {
+ pm_lock_.lock();
+ auto value = std::move(waiting_response_batches_.find(hash)->second);
+ pm_lock_.unlock();
+ return value;
+}
+
+void ResponseManager::MonitoringClientTimeOut() {
+ while (!stop_) {
+ sem_wait(&request_sent_signal_);
+ pm_lock_.lock();
+ if (client_timeout_min_heap_.empty()) {
+ pm_lock_.unlock();
+ continue;
+ }
+ auto client_timeout = client_timeout_min_heap_.top();
+ client_timeout_min_heap_.pop();
+ pm_lock_.unlock();
+
+ if (client_timeout.timeout_time > GetCurrentTime()) {
+ usleep(client_timeout.timeout_time - GetCurrentTime());
+ }
+
+ if (CheckTimeOut(client_timeout.hash)) {
+ auto request = GetTimeOutRequest(client_timeout.hash);
+ if (request) {
+ LOG(ERROR) << "Client Request Timeout " << client_timeout.hash;
+ replica_communicator_->BroadCast(*request);
+ }
+ }
+ }
+}
} // namespace resdb
diff --git a/platform/consensus/ordering/pbft/response_manager.h
b/platform/consensus/ordering/pbft/response_manager.h
index fdf89b75..b238e0df 100644
--- a/platform/consensus/ordering/pbft/response_manager.h
+++ b/platform/consensus/ordering/pbft/response_manager.h
@@ -18,6 +18,7 @@
*/
#pragma once
+#include <semaphore.h>
#include "platform/config/resdb_config.h"
#include "platform/consensus/ordering/pbft/lock_free_collector_pool.h"
@@ -27,6 +28,16 @@
namespace resdb {
+class ResponseClientTimeout {
+ public:
+ ResponseClientTimeout(std::string hash_, uint64_t time_);
+ ResponseClientTimeout(const ResponseClientTimeout& other);
+ bool operator<(const ResponseClientTimeout& other) const;
+
+ std::string hash;
+ uint64_t timeout_time;
+};
+
class ResponseManager {
public:
ResponseManager(const ResDBConfig& config,
@@ -65,6 +76,13 @@ class ResponseManager {
int BatchProposeMsg();
int GetPrimary();
+ void AddWaitingResponseRequest(std::unique_ptr<Request> request);
+ void RemoveWaitingResponseRequest(std::string hash);
+ bool CheckTimeOut(std::string hash);
+ void ResponseTimer(std::string hash);
+ void MonitoringClientTimeOut();
+ std::unique_ptr<Request> GetTimeOutRequest(std::string hash);
+
private:
ResDBConfig config_;
ReplicaCommunicator* replica_communicator_;
@@ -77,6 +95,15 @@ class ResponseManager {
SystemInfo* system_info_;
std::atomic<int> send_num_;
SignatureVerifier* verifier_;
+
+ std::thread checking_timeout_thread_;
+ std::map<std::string, std::unique_ptr<Request>> waiting_response_batches_;
+ std::priority_queue<ResponseClientTimeout> client_timeout_min_heap_;
+ std::mutex pm_lock_;
+ uint64_t timeout_length_;
+ sem_t request_sent_signal_;
+ uint64_t highest_seq_;
+ uint64_t highest_seq_primary_id_;
};
} // namespace resdb
diff --git a/platform/consensus/ordering/pbft/viewchange_manager.cpp
b/platform/consensus/ordering/pbft/viewchange_manager.cpp
index 7033801b..53060c20 100644
--- a/platform/consensus/ordering/pbft/viewchange_manager.cpp
+++ b/platform/consensus/ordering/pbft/viewchange_manager.cpp
@@ -83,6 +83,7 @@ ViewChangeManager::ViewChangeManager(const ResDBConfig&
config,
started_(false),
stop_(false) {
view_change_counter_ = 1;
+ global_stats_ = Stats::GetGlobalStats();
if (config_.GetConfigData().enable_viewchange()) {
collector_pool_ = message_manager->GetCollectorPool();
sem_init(&viewchange_timer_signal_, 0, 0);
@@ -108,6 +109,7 @@ void ViewChangeManager::MayStart() {
return;
}
started_ = true;
+ LOG(ERROR)<<"MAYSTART";
if (config_.GetPublicKeyCertificateInfo()
.public_key()
@@ -147,6 +149,7 @@ void ViewChangeManager::MayStart() {
bool ViewChangeManager::ChangeStatue(ViewChangeStatus status) {
if (status == ViewChangeStatus::READY_VIEW_CHANGE) {
if (status_ != ViewChangeStatus::READY_VIEW_CHANGE) {
+ LOG(ERROR)<<"CHANGE STATUS";
status_ = status;
}
} else {
@@ -224,6 +227,8 @@ void
ViewChangeManager::SetCurrentViewAndNewPrimary(uint64_t view_number) {
uint32_t id =
config_.GetReplicaInfos()[(view_number - 1) % replicas.size()].id();
system_info_->SetPrimary(id);
+ global_stats_->ChangePrimary(id);
+ LOG(ERROR)<<"View Change Happened";
}
std::vector<std::unique_ptr<Request>> ViewChangeManager::GetPrepareMsg(
@@ -504,6 +509,7 @@ void ViewChangeManager::SendViewChangeMsg() {
}
void ViewChangeManager::AddComplaintTimer(uint64_t proxy_id, std::string hash)
{
+ LOG(ERROR)<<"ADDING COMPLAINT";
std::lock_guard<std::mutex> lk(vc_mutex_);
if (complaining_clients_.count(proxy_id) == 0) {
complaining_clients_[proxy_id].set_proxy_id(proxy_id);
diff --git a/platform/consensus/ordering/pbft/viewchange_manager.h
b/platform/consensus/ordering/pbft/viewchange_manager.h
index a6e085ed..4cc2b7cd 100644
--- a/platform/consensus/ordering/pbft/viewchange_manager.h
+++ b/platform/consensus/ordering/pbft/viewchange_manager.h
@@ -28,6 +28,7 @@
#include "platform/consensus/ordering/pbft/message_manager.h"
#include "platform/networkstrate/replica_communicator.h"
#include "platform/proto/viewchange_message.pb.h"
+#include "platform/statistic/stats.h"
namespace resdb {
@@ -128,6 +129,7 @@ class ViewChangeManager {
ResDBConfig config_;
CheckPointManager* checkpoint_manager_;
MessageManager* message_manager_;
+ Stats* global_stats_;
SystemInfo* system_info_;
ReplicaCommunicator* replica_communicator_;
SignatureVerifier* verifier_;
diff --git a/platform/proto/replica_info.proto
b/platform/proto/replica_info.proto
index 9c81dade..658bf532 100644
--- a/platform/proto/replica_info.proto
+++ b/platform/proto/replica_info.proto
@@ -38,6 +38,8 @@ message ResConfigData{
optional string recovery_path = 18;
optional int32 recovery_buffer_size = 19;
optional int32 recovery_ckpt_time_s = 20;
+ optional bool enable_resview = 23;
+ optional bool enable_faulty_switch = 24;
// for hotstuff.
optional bool use_chain_hotstuff = 9;
diff --git a/platform/statistic/BUILD b/platform/statistic/BUILD
index 4d3b5c68..e20fd916 100644
--- a/platform/statistic/BUILD
+++ b/platform/statistic/BUILD
@@ -8,10 +8,17 @@ cc_library(
srcs = ["stats.cpp"],
hdrs = ["stats.h"],
deps = [
+ "//proto/kv:kv_cc_proto",
+ "//platform/proto:resdb_cc_proto",
+ "//common:json",
":prometheus_handler",
"//common:comm",
"//common/utils",
"//third_party:prometheus",
+ "//platform/common/network:tcp_socket",
+ "//common:asio",
+ "//common:beast",
+ "//third_party:crow",
],
)
diff --git a/platform/statistic/stats.cpp b/platform/statistic/stats.cpp
index 7ce79310..33de785c 100644
--- a/platform/statistic/stats.cpp
+++ b/platform/statistic/stats.cpp
@@ -20,8 +20,13 @@
#include "platform/statistic/stats.h"
#include <glog/logging.h>
-
+#include <ctime>
#include "common/utils/utils.h"
+#include "proto/kv/kv.pb.h"
+
+namespace asio = boost::asio;
+namespace beast = boost::beast;
+using tcp = asio::ip::tcp;
namespace resdb {
@@ -59,9 +64,19 @@ Stats::Stats(int sleep_time) {
send_broad_cast_msg_ = 0;
prometheus_ = nullptr;
-
global_thread_ =
std::thread(&Stats::MonitorGlobal, this); // pass by reference
+
+ transaction_summary_.port=-1;
+
+ //Setup websocket here
+ make_faulty_.store(false);
+
transaction_summary_.request_pre_prepare_state_time=std::chrono::system_clock::time_point::min();
+
transaction_summary_.prepare_state_time=std::chrono::system_clock::time_point::min();
+
transaction_summary_.commit_state_time=std::chrono::system_clock::time_point::min();
+
transaction_summary_.execution_time=std::chrono::system_clock::time_point::min();
+ transaction_summary_.txn_number=0;
+
}
void Stats::Stop() { stop_ = true; }
@@ -71,6 +86,172 @@ Stats::~Stats() {
if (global_thread_.joinable()) {
global_thread_.join();
}
+ if(enable_resview && crow_thread_.joinable()){
+ crow_thread_.join();
+ }
+}
+
+void Stats::CrowRoute(){
+ crow::SimpleApp app;
+ while(!stop_){
+ try{
+ CROW_ROUTE(app, "/consensus_data").methods("GET"_method)([this](const
crow::request& req, crow::response& res){
+ LOG(ERROR)<<"API 1";
+ res.set_header("Access-Control-Allow-Origin", "*"); // Allow requests
from any origin
+ res.set_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS");
// Specify allowed methods
+ res.set_header("Access-Control-Allow-Headers", "Content-Type,
Authorization"); // Specify allowed headers
+
+ // Send your response
+ res.body=consensus_history_.dump();
+ res.end();
+ });
+ CROW_ROUTE(app, "/get_status").methods("GET"_method)([this](const
crow::request& req, crow::response& res){
+ LOG(ERROR)<<"API 2";
+ res.set_header("Access-Control-Allow-Origin", "*"); // Allow requests
from any origin
+ res.set_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS");
// Specify allowed methods
+ res.set_header("Access-Control-Allow-Headers", "Content-Type,
Authorization"); // Specify allowed headers
+
+ // Send your response
+ res.body= IsFaulty() ? "Faulty" : "Not Faulty";
+ res.end();
+ });
+ CROW_ROUTE(app, "/make_faulty").methods("GET"_method)([this](const
crow::request& req, crow::response& res){
+ LOG(ERROR)<<"API 3";
+ res.set_header("Access-Control-Allow-Origin", "*"); // Allow requests
from any origin
+ res.set_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS");
// Specify allowed methods
+ res.set_header("Access-Control-Allow-Headers", "Content-Type,
Authorization"); // Specify allowed headers
+
+ // Send your response
+ if(enable_faulty_switch_){
+ make_faulty_.store(!make_faulty_.load());
+ }
+ res.body= "Success";
+ res.end();
+ });
+ app.port(8500+transaction_summary_.port).multithreaded().run();
+ sleep(1);
+ }
+ catch( const std::exception& e){
+ }
+ }
+ app.stop();
+}
+
+bool Stats::IsFaulty(){
+ return make_faulty_.load();
+}
+
+void Stats::ChangePrimary(int primary_id){
+ transaction_summary_.primary_id=primary_id;
+ make_faulty_.store(false);
+}
+
+void Stats::SetProps(int replica_id, std::string ip, int port, bool
resview_flag, bool faulty_flag){
+ transaction_summary_.replica_id=replica_id;
+ transaction_summary_.ip=ip;
+ transaction_summary_.port=port;
+ enable_resview=resview_flag;
+ enable_faulty_switch_=faulty_flag;
+ if(resview_flag){
+ crow_thread_ = std::thread(&Stats::CrowRoute, this);
+ }
+}
+
+void Stats::SetPrimaryId(int primary_id){
+ transaction_summary_.primary_id=primary_id;
+}
+
+void Stats::RecordStateTime(std::string state){
+ if(!enable_resview){
+ return;
+ }
+ if(state=="request" || state=="pre-prepare"){
+
transaction_summary_.request_pre_prepare_state_time=std::chrono::system_clock::now();
+ }
+ else if(state=="prepare"){
+ transaction_summary_.prepare_state_time=std::chrono::system_clock::now();
+ }
+ else if(state=="commit"){
+ transaction_summary_.commit_state_time=std::chrono::system_clock::now();
+ }
+}
+
+void Stats::GetTransactionDetails(BatchUserRequest batch_request){
+ if(!enable_resview){
+ return;
+ }
+ transaction_summary_.txn_number=batch_request.seq();
+ transaction_summary_.txn_command.clear();
+ transaction_summary_.txn_key.clear();
+ transaction_summary_.txn_value.clear();
+ for (auto& sub_request : batch_request.user_requests()) {
+ KVRequest kv_request;
+ if(!kv_request.ParseFromString(sub_request.request().data())){
+ break;
+ }
+ if (kv_request.cmd() == KVRequest::SET) {
+ transaction_summary_.txn_command.push_back("SET");
+ transaction_summary_.txn_key.push_back(kv_request.key());
+ transaction_summary_.txn_value.push_back(kv_request.value());
+ } else if (kv_request.cmd() == KVRequest::GET) {
+ transaction_summary_.txn_command.push_back("GET");
+ transaction_summary_.txn_key.push_back(kv_request.key());
+ transaction_summary_.txn_value.push_back("");
+ } else if (kv_request.cmd() == KVRequest::GETALLVALUES) {
+ transaction_summary_.txn_command.push_back("GETALLVALUES");
+ transaction_summary_.txn_key.push_back(kv_request.key());
+ transaction_summary_.txn_value.push_back("");
+ } else if (kv_request.cmd() == KVRequest::GETRANGE) {
+ transaction_summary_.txn_command.push_back("GETRANGE");
+ transaction_summary_.txn_key.push_back(kv_request.key());
+ transaction_summary_.txn_value.push_back(kv_request.value());
+ }
+ }
+}
+
+void Stats::SendSummary(){
+ if(!enable_resview){
+ return;
+ }
+ transaction_summary_.execution_time=std::chrono::system_clock::now();
+
+ //Convert Transaction Summary to JSON
+ summary_json_["replica_id"]=transaction_summary_.replica_id;
+ summary_json_["ip"]=transaction_summary_.ip;
+ summary_json_["port"]=transaction_summary_.port;
+ summary_json_["primary_id"]=transaction_summary_.primary_id;
+
summary_json_["propose_pre_prepare_time"]=transaction_summary_.request_pre_prepare_state_time.time_since_epoch().count();
+
summary_json_["prepare_time"]=transaction_summary_.prepare_state_time.time_since_epoch().count();
+
summary_json_["commit_time"]=transaction_summary_.commit_state_time.time_since_epoch().count();
+
summary_json_["execution_time"]=transaction_summary_.execution_time.time_since_epoch().count();
+ for(size_t i=0;
i<transaction_summary_.prepare_message_count_times_list.size(); i++){
+
summary_json_["prepare_message_timestamps"].push_back(transaction_summary_.prepare_message_count_times_list[i].time_since_epoch().count());
+ }
+ for(size_t i=0;
i<transaction_summary_.commit_message_count_times_list.size(); i++){
+
summary_json_["commit_message_timestamps"].push_back(transaction_summary_.commit_message_count_times_list[i].time_since_epoch().count());
+ }
+ summary_json_["txn_number"]=transaction_summary_.txn_number;
+ for(size_t i=0; i<transaction_summary_.txn_command.size(); i++){
+
summary_json_["txn_commands"].push_back(transaction_summary_.txn_command[i]);
+ }
+ for(size_t i=0; i<transaction_summary_.txn_key.size(); i++){
+ summary_json_["txn_keys"].push_back(transaction_summary_.txn_key[i]);
+ }
+ for(size_t i=0; i<transaction_summary_.txn_value.size(); i++){
+ summary_json_["txn_values"].push_back(transaction_summary_.txn_value[i]);
+ }
+
+
consensus_history_[std::to_string(transaction_summary_.txn_number)]=summary_json_;
+
+ LOG(ERROR)<<summary_json_.dump();
+
+ //Reset Transaction Summary Parameters
+
transaction_summary_.request_pre_prepare_state_time=std::chrono::system_clock::time_point::min();
+
transaction_summary_.prepare_state_time=std::chrono::system_clock::time_point::min();
+
transaction_summary_.commit_state_time=std::chrono::system_clock::time_point::min();
+
transaction_summary_.execution_time=std::chrono::system_clock::time_point::min();
+ transaction_summary_.prepare_message_count_times_list.clear();
+ transaction_summary_.commit_message_count_times_list.clear();
}
void Stats::MonitorGlobal() {
@@ -238,6 +419,7 @@ void Stats::IncPrepare() {
prometheus_->Inc(PREPARE, 1);
}
num_prepare_++;
+
transaction_summary_.prepare_message_count_times_list.push_back(std::chrono::system_clock::now());
}
void Stats::IncCommit() {
@@ -245,9 +427,12 @@ void Stats::IncCommit() {
prometheus_->Inc(COMMIT, 1);
}
num_commit_++;
+
transaction_summary_.commit_message_count_times_list.push_back(std::chrono::system_clock::now());
}
-void Stats::IncPendingExecute() { pending_execute_++; }
+void Stats::IncPendingExecute() {
+ pending_execute_++;
+}
void Stats::IncExecute() { execute_++; }
diff --git a/platform/statistic/stats.h b/platform/statistic/stats.h
index 436f942e..621aaf84 100644
--- a/platform/statistic/stats.h
+++ b/platform/statistic/stats.h
@@ -23,15 +23,59 @@
#include <future>
#include "platform/statistic/prometheus_handler.h"
+#include "platform/proto/resdb.pb.h"
+#include "proto/kv/kv.pb.h"
+#include "platform/common/network/tcp_socket.h"
+#include <nlohmann/json.hpp>
+#include "boost/asio.hpp"
+#include "boost/beast.hpp"
+#include <crow.h>
+
+namespace asio = boost::asio;
+namespace beast = boost::beast;
+using tcp = asio::ip::tcp;
namespace resdb {
-class Stats {
+struct VisualData{
+ //Set when initializing
+ int replica_id;
+ int primary_id;
+ std::string ip;
+ int port;
+
+ //Set when new txn is received
+ int txn_number;
+ std::vector<std::string> txn_command;
+ std::vector<std::string> txn_key;
+ std::vector<std::string> txn_value;
+
+ //Request state if primary_id==replica_id, pre_prepare state otherwise
+ std::chrono::system_clock::time_point request_pre_prepare_state_time;
+ std::chrono::system_clock::time_point prepare_state_time;
+ std::vector<std::chrono::system_clock::time_point>
prepare_message_count_times_list;
+ std::chrono::system_clock::time_point commit_state_time;
+ std::vector<std::chrono::system_clock::time_point>
commit_message_count_times_list;
+ std::chrono::system_clock::time_point execution_time;
+};
+
+class Stats{
public:
static Stats* GetGlobalStats(int sleep_seconds = 5);
void Stop();
+ void RetrieveProgress();
+ void SetProps(int replica_id, std::string ip, int port, bool resview_flag,
bool faulty_flag);
+ void SetPrimaryId(int primary_id);
+ void RecordStateTime(std::string state);
+ void GetTransactionDetails(BatchUserRequest batch_request);
+ void SendSummary();
+ void CrowRoute();
+ bool IsFaulty();
+ void ChangePrimary(int primary_id);
+
+
void AddLatency(uint64_t run_time);
void Monitor();
@@ -92,6 +136,16 @@ class Stats {
std::atomic<uint64_t> total_request_, total_geo_request_, geo_request_;
int monitor_sleep_time_ = 5; // default 5s.
+ std::thread crow_thread_;
+ bool enable_resview;
+ bool enable_faulty_switch_;
+ VisualData transaction_summary_;
+ std::atomic<bool> make_faulty_;
+ std::atomic<uint64_t> prev_num_prepare_;
+ std::atomic<uint64_t> prev_num_commit_;
+ nlohmann::json summary_json_;
+ nlohmann::json consensus_history_;
+
std::unique_ptr<PrometheusHandler> prometheus_;
};
diff --git a/script.js b/script.js
new file mode 100644
index 00000000..b9b183ee
--- /dev/null
+++ b/script.js
@@ -0,0 +1,103 @@
+import ResilientSDK from 'https://cdn.resilientdb.com/resilient-sdk.js';
+
+const sdk = new ResilientSDK();
+
+// Add a message listener
+sdk.addMessageListener((event) => {
+ const message = event.data.data;
+ alert(JSON.stringify(message)); // Set the message
+});
+
+var commit = document.querySelector('[data-nexres="commit-page-script"]');
+var fetcher = document.querySelector('[data-nexres="get-page-script"]');
+var update = document.querySelector('[data-nexres="update-page-script"]');
+var updateMulti =
document.querySelector('[data-nexres="update-multi-page-script"]');
+var filter = document.querySelector('[data-nexres="filter-page-script"]');
+var account = document.querySelector('[data-nexres="account-page-script"]');
+var data = document.querySelector('[data-nexres="get-data"]');
+var amount = document.querySelector('[data-nexres="get-amount"]');
+var address = document.querySelector('[data-nexres="get-address"]');
+var id = document.querySelector('[data-nexres="get-id"]');
+var updateId = document.querySelector('[data-nexres="update-id"]');
+var updateData = document.querySelector('[data-nexres="update-data"]');
+var updateAmount = document.querySelector('[data-nexres="update-amount"]');
+var updateAddress = document.querySelector('[data-nexres="update-address"]');
+var ownerPublicKey =
document.querySelector('[data-nexres="filter-owner-key"]');
+var recipientPublicKey =
document.querySelector('[data-nexres="filter-recipient-key"]');
+var updateMultiId1 =
document.querySelector('[data-nexres="update-multi-id1"]');
+var updateMultiData1 =
document.querySelector('[data-nexres="update-multi-data1"]');
+var updateMultiAmount1 =
document.querySelector('[data-nexres="update-multi-amount1"]');
+var updateMultiAddress1 =
document.querySelector('[data-nexres="update-multi-address1"]');
+var updateMultiId2 =
document.querySelector('[data-nexres="update-multi-id2"]');
+var updateMultiData2 =
document.querySelector('[data-nexres="update-multi-data2"]');
+var updateMultiAmount2 =
document.querySelector('[data-nexres="update-multi-amount2"]');
+var updateMultiAddress2 =
document.querySelector('[data-nexres="update-multi-address2"]');
+
+commit.addEventListener("click", commitContentScript);
+fetcher.addEventListener("click", fetchContentScript);
+update.addEventListener("click", updateContentScript);
+updateMulti.addEventListener("click", updateMultiContentScript);
+filter.addEventListener("click", filterContentScript);
+account.addEventListener("click", accountContentScript);
+
+function commitContentScript() {
+ sdk.sendMessage({
+ direction: "commit-page-script",
+ message: data.value,
+ amount: amount.value,
+ address: address.value
+ });
+}
+
+function fetchContentScript() {
+ sdk.sendMessage({
+ direction: "get-page-script",
+ id: id.value
+ });
+}
+
+function updateContentScript() {
+ sdk.sendMessage({
+ direction: "update-page-script",
+ id: updateId.value,
+ message: updateData.value,
+ amount: updateAmount.value,
+ address: updateAddress.value
+ });
+}
+
+function updateMultiContentScript() {
+ const valuesList = [
+ {
+ id: updateMultiId1.value,
+ message: updateMultiData1.value,
+ amount: updateMultiAmount1.value,
+ address: updateMultiAddress1.value,
+ },
+ {
+ id: updateMultiId2.value,
+ message: updateMultiData2.value,
+ amount: updateMultiAmount2.value,
+ address: updateMultiAddress2.value,
+ }
+ ];
+
+ sdk.sendMessage({
+ direction: "update-multi-page-script",
+ values: valuesList
+ });
+}
+
+function filterContentScript() {
+ sdk.sendMessage({
+ direction: "filter-page-script",
+ owner: ownerPublicKey.value,
+ recipient: recipientPublicKey.value,
+ });
+}
+
+function accountContentScript() {
+ sdk.sendMessage({
+ direction: "account-page-script",
+ });
+}
diff --git a/scripts/deploy/config/template.config
b/scripts/deploy/config/template.config
index a68d5dc0..7962f6b0 100644
--- a/scripts/deploy/config/template.config
+++ b/scripts/deploy/config/template.config
@@ -1,6 +1,6 @@
{
"clientBatchNum": 100,
- "enable_viewchange": false,
+ "enable_viewchange": true,
"recovery_enabled":true,
"max_client_complaint_num":10
}
diff --git a/service/tools/config/server/server.config
b/service/tools/config/server/server.config
index 20a8b771..1dc5b0de 100644
--- a/service/tools/config/server/server.config
+++ b/service/tools/config/server/server.config
@@ -32,7 +32,10 @@
write_buffer_size_mb:128,
write_batch_size:1,
},
- require_txn_validation:false,
+ require_txn_validation:true,
+ enable_viewchange:true,
+ enable_resview:true,
+ enable_faulty_switch:false
}
diff --git a/third_party/BUILD b/third_party/BUILD
index 24eb61ae..b43e9230 100644
--- a/third_party/BUILD
+++ b/third_party/BUILD
@@ -45,3 +45,10 @@ cc_library(
"@eEVM",
],
)
+
+cc_library(
+ name = "crow",
+ deps = [
+ "@com_crowcpp_crow//:crow",
+ ],
+)
\ No newline at end of file