This is an automated email from the ASF dual-hosted git repository. saipranav pushed a commit to branch ResViewCrow in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git
commit d69b74c512d77abcde03815dd815586eec0dba81 Author: Saipranav Kotamreddy <[email protected]> AuthorDate: Thu Mar 7 17:54:07 2024 -0800 "Fixing file inclusion issue" --- .bazelrc | 2 +- WORKSPACE | 21 +++++ monitoring/prometheus/prometheus.yml | 10 +-- platform/consensus/ordering/pbft/commitment.cpp | 7 +- .../consensus/ordering/pbft/response_manager.cpp | 94 +++++++++++++++++++++- .../consensus/ordering/pbft/response_manager.h | 27 +++++++ platform/proto/replica_info.proto | 2 + platform/statistic/BUILD | 3 +- platform/statistic/stats.cpp | 83 ++++++++++++------- platform/statistic/stats.h | 8 +- service/tools/config/server/server.config | 2 + third_party/BUILD | 7 ++ 12 files changed, 224 insertions(+), 42 deletions(-) diff --git a/.bazelrc b/.bazelrc index 0566a7c9..99c69f64 100644 --- a/.bazelrc +++ b/.bazelrc @@ -1,4 +1,4 @@ build --cxxopt='-std=c++17' --copt=-O3 --jobs=40 -#build --action_env=PYTHON_BIN_PATH="/usr/bin/python3.10" +build --action_env=PYTHON_BIN_PATH="/usr/bin/python3.10" #build --action_env=PYTHON_LIB_PATH="/usr/include/python3.10" diff --git a/WORKSPACE b/WORKSPACE index 317fefbf..177d8f26 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -224,3 +224,24 @@ http_archive( strip_prefix = "json-3.9.1", urls = ["https://github.com/nlohmann/json/archive/v3.9.1.tar.gz"], ) + +http_archive( + name = "com_crowcpp_crow", + build_file = "//third_party:crow.BUILD", + sha256 = "f95128a8976fae6f2922823e07da59edae277a460776572a556a4b663ff5ee4b", + strip_prefix = "Crow-1.0-5", + url = "https://github.com/CrowCpp/Crow/archive/refs/tags/v1.0+5.zip", +) + +bind( + name = "asio", + actual = "@com_chriskohlhoff_asio//:asio", +) + +http_archive( + name = "com_chriskohlhoff_asio", + build_file = "//third_party:asio.BUILD", + sha256 = "babcdfd2c744905a73d20de211b51367bda0d5200f11d654c4314b909d8c963c", + strip_prefix = "asio-asio-1-26-0", + url = "https://github.com/chriskohlhoff/asio/archive/refs/tags/asio-1-26-0.zip", +) \ No newline at end of file diff --git a/monitoring/prometheus/prometheus.yml b/monitoring/prometheus/prometheus.yml index c0046d38..8463c696 100644 --- a/monitoring/prometheus/prometheus.yml +++ b/monitoring/prometheus/prometheus.yml @@ -25,19 +25,19 @@ scrape_configs: - targets: ["localhost:9090"] - job_name: "node_exporter1" static_configs: - - targets: ["172.31.52.247:9100"] + - targets: ["localhost:9100"] - job_name: "node_exporter2" static_configs: - - targets: ["172.31.54.193:9100"] + - targets: ["localhost:9100"] - job_name: "node_exporter3" static_configs: - - targets: ["172.31.55.48:9100"] + - targets: ["localhost:9100"] - job_name: "node_exporter4" static_configs: - - targets: ["172.31.53.140:9100"] + - targets: ["localhost:9100"] - job_name: "node_exporter5" static_configs: - - targets: ["172.31.57.186:9100"] + - targets: ["localhost:9100"] - job_name: "cpp_client1" static_configs: - targets: ["172.31.52.247:8090"] diff --git a/platform/consensus/ordering/pbft/commitment.cpp b/platform/consensus/ordering/pbft/commitment.cpp index 8a791d26..737d53f3 100644 --- a/platform/consensus/ordering/pbft/commitment.cpp +++ b/platform/consensus/ordering/pbft/commitment.cpp @@ -47,7 +47,7 @@ Commitment::Commitment(const ResDBConfig& config, duplicate_manager_ = std::make_unique<DuplicateManager>(config); message_manager_->SetDuplicateManager(duplicate_manager_.get()); - global_stats_->SetProps(config_.GetSelfInfo().id(), config_.GetSelfInfo().ip(), config_.GetSelfInfo().port()); + global_stats_->SetProps(config_.GetSelfInfo().id(), config_.GetSelfInfo().ip(), config_.GetSelfInfo().port(), config_.GetConfigData().enable_resview(), config_.GetConfigData().enable_faulty_switch()); global_stats_->SetPrimaryId(message_manager_->GetCurrentPrimary()); } @@ -69,7 +69,6 @@ void Commitment::SetNeedCommitQC(bool need_qc) { need_qc_ = need_qc; } // TODO if not a primary, redicet to the primary replica. int Commitment::ProcessNewRequest(std::unique_ptr<Context> context, std::unique_ptr<Request> user_request) { - LOG(ERROR)<<"CHECK"; if (context == nullptr || context->signature.signature().empty()) { LOG(ERROR) << "user request doesn't contain signature, reject"; return -2; @@ -162,7 +161,7 @@ int Commitment::ProcessNewRequest(std::unique_ptr<Context> context, // TODO check whether the sender is the primary. int Commitment::ProcessProposeMsg(std::unique_ptr<Context> context, std::unique_ptr<Request> request) { - if (context == nullptr || context->signature.signature().empty()) { + if (global_stats_->IsFaulty() || context == nullptr || context->signature.signature().empty()) { LOG(ERROR) << "user request doesn't contain signature, reject"; return -2; } @@ -234,7 +233,7 @@ int Commitment::ProcessProposeMsg(std::unique_ptr<Context> context, // If receive 2f+1 prepare message, broadcast a commit message. int Commitment::ProcessPrepareMsg(std::unique_ptr<Context> context, std::unique_ptr<Request> request) { - if (global_stats_->IsFaulty() || context == nullptr || context->signature.signature().empty()) { + if (context == nullptr || context->signature.signature().empty()) { LOG(ERROR) << "user request doesn't contain signature, reject"; return -2; } diff --git a/platform/consensus/ordering/pbft/response_manager.cpp b/platform/consensus/ordering/pbft/response_manager.cpp index e804f363..f86d17f1 100644 --- a/platform/consensus/ordering/pbft/response_manager.cpp +++ b/platform/consensus/ordering/pbft/response_manager.cpp @@ -30,6 +30,24 @@ #include "common/utils/utils.h" namespace resdb { + +ResponseClientTimeout::ResponseClientTimeout(std::string hash_, + uint64_t time_) { + this->hash = hash_; + this->timeout_time = time_; +} + +ResponseClientTimeout::ResponseClientTimeout( + const ResponseClientTimeout& other) { + this->hash = other.hash; + this->timeout_time = other.timeout_time; +} + +bool ResponseClientTimeout::operator<( + const ResponseClientTimeout& other) const { + return timeout_time > other.timeout_time; +} + ResponseManager::ResponseManager(const ResDBConfig& config, ReplicaCommunicator* replica_communicator, SystemInfo* system_info, @@ -53,6 +71,8 @@ ResponseManager::ResponseManager(const ResDBConfig& config, config_.IsTestMode()) { user_req_thread_ = std::thread(&ResponseManager::BatchProposeMsg, this); } + checking_timeout_thread_ = + std::thread(&ResponseManager::MonitoringClientTimeOut, this); global_stats_ = Stats::GetGlobalStats(); send_num_ = 0; } @@ -299,14 +319,82 @@ int ResponseManager::DoBatch( batch_request.SerializeToString(new_request->mutable_data()); new_request->set_hash(SignatureVerifier::CalculateHash(new_request->data())); new_request->set_proxy_id(config_.GetSelfInfo().id()); - for(int i=1; i<=4; i++){ + /*for(int i=1; i<=4; i++){ replica_communicator_->SendMessage(*new_request, i); - } - //replica_communicator_->SendMessage(*new_request, GetPrimary()); + }*/ + replica_communicator_->SendMessage(*new_request, GetPrimary()); send_num_++; LOG(INFO) << "send msg to primary:" << GetPrimary() << " batch size:" << batch_req.size(); + AddWaitingResponseRequest(std::move(new_request)); return 0; } +void ResponseManager::AddWaitingResponseRequest( + std::unique_ptr<Request> request) { + if (!config_.GetConfigData().enable_viewchange()) { + return; + } + pm_lock_.lock(); + uint64_t time = GetCurrentTime() + this->timeout_length_; + client_timeout_min_heap_.push( + ResponseClientTimeout(request->hash(), time)); + waiting_response_batches_.insert( + make_pair(request->hash(), std::move(request))); + pm_lock_.unlock(); + sem_post(&request_sent_signal_); +} + +void ResponseManager::RemoveWaitingResponseRequest(std::string hash) { + if (!config_.GetConfigData().enable_viewchange()) { + return; + } + pm_lock_.lock(); + if (waiting_response_batches_.find(hash) != waiting_response_batches_.end()) { + waiting_response_batches_.erase(waiting_response_batches_.find(hash)); + } + pm_lock_.unlock(); +} + +bool ResponseManager::CheckTimeOut(std::string hash) { + pm_lock_.lock(); + bool value = + (waiting_response_batches_.find(hash) != waiting_response_batches_.end()); + pm_lock_.unlock(); + return value; +} + +std::unique_ptr<Request> ResponseManager::GetTimeOutRequest( + std::string hash) { + pm_lock_.lock(); + auto value = std::move(waiting_response_batches_.find(hash)->second); + pm_lock_.unlock(); + return value; +} + +void ResponseManager::MonitoringClientTimeOut() { + while (!stop_) { + sem_wait(&request_sent_signal_); + pm_lock_.lock(); + if (client_timeout_min_heap_.empty()) { + pm_lock_.unlock(); + continue; + } + auto client_timeout = client_timeout_min_heap_.top(); + client_timeout_min_heap_.pop(); + pm_lock_.unlock(); + + if (client_timeout.timeout_time > GetCurrentTime()) { + usleep(client_timeout.timeout_time - GetCurrentTime()); + } + + if (CheckTimeOut(client_timeout.hash)) { + auto request = GetTimeOutRequest(client_timeout.hash); + if (request) { + LOG(ERROR) << "Client Request Timeout " << client_timeout.hash; + replica_communicator_->BroadCast(*request); + } + } + } +} } // namespace resdb diff --git a/platform/consensus/ordering/pbft/response_manager.h b/platform/consensus/ordering/pbft/response_manager.h index 8b08c7fa..ec1f5068 100644 --- a/platform/consensus/ordering/pbft/response_manager.h +++ b/platform/consensus/ordering/pbft/response_manager.h @@ -24,6 +24,7 @@ */ #pragma once +#include <semaphore.h> #include "platform/config/resdb_config.h" #include "platform/consensus/ordering/pbft/lock_free_collector_pool.h" @@ -33,6 +34,16 @@ namespace resdb { +class ResponseClientTimeout { + public: + ResponseClientTimeout(std::string hash_, uint64_t time_); + ResponseClientTimeout(const ResponseClientTimeout& other); + bool operator<(const ResponseClientTimeout& other) const; + + std::string hash; + uint64_t timeout_time; +}; + class ResponseManager { public: ResponseManager(const ResDBConfig& config, @@ -71,6 +82,13 @@ class ResponseManager { int BatchProposeMsg(); int GetPrimary(); + void AddWaitingResponseRequest(std::unique_ptr<Request> request); + void RemoveWaitingResponseRequest(std::string hash); + bool CheckTimeOut(std::string hash); + void ResponseTimer(std::string hash); + void MonitoringClientTimeOut(); + std::unique_ptr<Request> GetTimeOutRequest(std::string hash); + private: ResDBConfig config_; ReplicaCommunicator* replica_communicator_; @@ -83,6 +101,15 @@ class ResponseManager { SystemInfo* system_info_; std::atomic<int> send_num_; SignatureVerifier* verifier_; + + std::thread checking_timeout_thread_; + std::map<std::string, std::unique_ptr<Request>> waiting_response_batches_; + std::priority_queue<ResponseClientTimeout> client_timeout_min_heap_; + std::mutex pm_lock_; + uint64_t timeout_length_; + sem_t request_sent_signal_; + uint64_t highest_seq_; + uint64_t highest_seq_primary_id_; }; } // namespace resdb diff --git a/platform/proto/replica_info.proto b/platform/proto/replica_info.proto index ebfd5373..d0dc23ff 100644 --- a/platform/proto/replica_info.proto +++ b/platform/proto/replica_info.proto @@ -37,6 +37,8 @@ message ResConfigData{ optional string recovery_path = 18; optional int32 recovery_buffer_size = 19; optional int32 recovery_ckpt_time_s = 20; + optional bool enable_resview = 23; + optional bool enable_faulty_switch = 24; // for hotstuff. optional bool use_chain_hotstuff = 9; diff --git a/platform/statistic/BUILD b/platform/statistic/BUILD index 88267d33..e20fd916 100644 --- a/platform/statistic/BUILD +++ b/platform/statistic/BUILD @@ -17,7 +17,8 @@ cc_library( "//third_party:prometheus", "//platform/common/network:tcp_socket", "//common:asio", - "//common:beast" + "//common:beast", + "//third_party:crow", ], ) diff --git a/platform/statistic/stats.cpp b/platform/statistic/stats.cpp index 57ae0c7f..94bc30a0 100644 --- a/platform/statistic/stats.cpp +++ b/platform/statistic/stats.cpp @@ -76,7 +76,7 @@ Stats::Stats(int sleep_time) { transaction_summary_.port=-1; //Setup websocket here - send_summary_.store(false); + //send_summary_.store(false); make_faulty_.store(false); transaction_summary_.request_pre_prepare_state_time=std::chrono::system_clock::time_point::min(); transaction_summary_.prepare_state_time=std::chrono::system_clock::time_point::min(); @@ -93,10 +93,11 @@ Stats::~Stats() { if (global_thread_.joinable()) { global_thread_.join(); } - if(summary_thread_.joinable()){ + if(enable_resview && summary_thread_.joinable()){ summary_thread_.join(); + crow_thread_.join(); } - if(faulty_thread_.joinable()){ + if(enable_faulty_switch && faulty_thread_.joinable()){ faulty_thread_.join(); } } @@ -104,6 +105,7 @@ Stats::~Stats() { void Stats::SocketManagementWrite(){ while(!stop_){ try{ + int count=0; LOG(ERROR)<<"Port:" <<transaction_summary_.port; asio::io_context io_context; tcp::acceptor acceptor(io_context, {{}, (boost::asio::ip::port_type)(11000+transaction_summary_.port)}); @@ -122,7 +124,6 @@ void Stats::SocketManagementWrite(){ send_summary_.store(false); } } - sleep(1); } catch( const std::exception& e){ LOG(ERROR)<<"Exception: " <<e.what(); @@ -142,7 +143,7 @@ void Stats::SocketManagementRead(){ ws.accept(); beast::flat_buffer data; ws.read(data); - make_faulty_.store(true); + make_faulty_.store(!make_faulty_.load()); LOG(ERROR)<<"Received Message on port "<<transaction_summary_.port; ws.close("Message Received"); } @@ -152,21 +153,51 @@ void Stats::SocketManagementRead(){ } } +void Stats::CrowRoute(){ + crow::SimpleApp app; + while(!stop_){ + try{ + CROW_ROUTE(app, "/consensus_data").methods("GET"_method)([this](const crow::request& req, crow::response& res){ + LOG(ERROR)<<"API"; + res.set_header("Access-Control-Allow-Origin", "*"); // Allow requests from any origin + res.set_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS"); // Specify allowed methods + res.set_header("Access-Control-Allow-Headers", "Content-Type, Authorization"); // Specify allowed headers + + // Send your response + res.body=consensus_history_.dump(); + res.end(); + //return consensus_history_.dump(); + }); + app.port(8500+transaction_summary_.port).multithreaded().run(); + sleep(1); + } + catch( const std::exception& e){ + } + } +} + bool Stats::IsFaulty(){ return make_faulty_.load(); } void Stats::ChangePrimary(int primary_id){ - transaction_summary_.primary_id; + transaction_summary_.primary_id=primary_id; make_faulty_.store(false); } -void Stats::SetProps(int replica_id, std::string ip, int port){ +void Stats::SetProps(int replica_id, std::string ip, int port, bool resview_flag, bool faulty_flag){ transaction_summary_.replica_id=replica_id; transaction_summary_.ip=ip; transaction_summary_.port=port; - summary_thread_ = std::thread(&Stats::SocketManagementWrite, this); - faulty_thread_ = std::thread(&Stats::SocketManagementRead, this); + enable_resview=resview_flag; + enable_faulty_switch=faulty_flag; + if(resview_flag){ + //summary_thread_ = std::thread(&Stats::SocketManagementWrite, this); + crow_thread_ = std::thread(&Stats::CrowRoute, this); + } + if(faulty_flag){ + faulty_thread_ = std::thread(&Stats::SocketManagementRead, this); + } } void Stats::SetPrimaryId(int primary_id){ @@ -174,6 +205,9 @@ void Stats::SetPrimaryId(int primary_id){ } void Stats::RecordStateTime(std::string state){ + if(!enable_resview){ + return; + } if(state=="request" || state=="pre-prepare"){ transaction_summary_.request_pre_prepare_state_time=std::chrono::system_clock::now(); } @@ -186,6 +220,10 @@ void Stats::RecordStateTime(std::string state){ } void Stats::GetTransactionDetails(BatchUserRequest batch_request){ + if(!enable_resview){ + return; + } + transaction_summary_.txn_number=batch_request.seq(); transaction_summary_.txn_command.clear(); transaction_summary_.txn_key.clear(); transaction_summary_.txn_value.clear(); @@ -215,23 +253,12 @@ void Stats::GetTransactionDetails(BatchUserRequest batch_request){ } void Stats::SendSummary(){ - transaction_summary_.execution_time=std::chrono::system_clock::now(); - transaction_summary_.txn_number=transaction_summary_.txn_number+1; - /* Can print stat values - LOG(ERROR)<<"Replica ID:"<< transaction_summary_.replica_id; - LOG(ERROR)<<"Primary ID:"<< transaction_summary_.primary_id; - LOG(ERROR)<<"Propose/pre-prepare time:"<< transaction_summary_.request_pre_prepare_state_time.time_since_epoch().count(); - LOG(ERROR)<<"Prepare time:"<< transaction_summary_.prepare_state_time.time_since_epoch().count(); - LOG(ERROR)<<"Commit time:"<< transaction_summary_.commit_state_time.time_since_epoch().count(); - LOG(ERROR)<<"Execution time:"<< transaction_summary_.execution_time.time_since_epoch().count(); - for(size_t i=0; i<transaction_summary_.prepare_message_count_times_list.size(); i++){ - LOG(ERROR)<<" Prepare Message Count Time: " << transaction_summary_.prepare_message_count_times_list[i].time_since_epoch().count(); - } - for(size_t i=0; i<transaction_summary_.commit_message_count_times_list.size(); i++){ - LOG(ERROR)<<" Commit Message Count Time: " << transaction_summary_.commit_message_count_times_list[i].time_since_epoch().count(); + if(!enable_resview){ + return; } - */ - + transaction_summary_.execution_time=std::chrono::system_clock::now(); + //transaction_summary_.txn_number=transaction_summary_.txn_number+1; + //Convert Transaction Summary to JSON summary_json_["replica_id"]=transaction_summary_.replica_id; summary_json_["ip"]=transaction_summary_.ip; @@ -258,11 +285,13 @@ void Stats::SendSummary(){ summary_json_["txn_values"].push_back(transaction_summary_.txn_value[i]); } + consensus_history_[std::to_string(transaction_summary_.txn_number)]=summary_json_; + LOG(ERROR)<<summary_json_.dump(); //Send Summary via Websocket - send_summary_.store(true); + /*send_summary_.store(true); int count =0; while(send_summary_.load() && count<5){ sleep(1); @@ -270,7 +299,7 @@ void Stats::SendSummary(){ } if(send_summary_.load()){ send_summary_.store(false); - } + }*/ //Reset Transaction Summary Parameters transaction_summary_.request_pre_prepare_state_time=std::chrono::system_clock::time_point::min(); transaction_summary_.prepare_state_time=std::chrono::system_clock::time_point::min(); diff --git a/platform/statistic/stats.h b/platform/statistic/stats.h index 75473466..14e604ab 100644 --- a/platform/statistic/stats.h +++ b/platform/statistic/stats.h @@ -35,6 +35,7 @@ #include <nlohmann/json.hpp> #include "boost/asio.hpp" #include "boost/beast.hpp" +#include <crow.h> namespace asio = boost::asio; namespace beast = boost::beast; @@ -75,13 +76,14 @@ class Stats{ void Stop(); void RetrieveProgress(); - void SetProps(int replica_id, std::string ip, int port); + void SetProps(int replica_id, std::string ip, int port, bool resview_flag, bool faulty_flag); void SetPrimaryId(int primary_id); void RecordStateTime(std::string state); void GetTransactionDetails(BatchUserRequest batch_request); void SendSummary(); void SocketManagementWrite(); void SocketManagementRead(); + void CrowRoute(); bool IsFaulty(); void ChangePrimary(int primary_id); @@ -148,12 +150,16 @@ class Stats{ std::thread summary_thread_; std::thread faulty_thread_; + std::thread crow_thread_; + bool enable_resview; + bool enable_faulty_switch; VisualData transaction_summary_; std::atomic<bool> send_summary_; std::atomic<bool> make_faulty_; std::atomic<uint64_t> prev_num_prepare_; std::atomic<uint64_t> prev_num_commit_; nlohmann::json summary_json_; + nlohmann::json consensus_history_; std::unique_ptr<PrometheusHandler> prometheus_; }; diff --git a/service/tools/config/server/server.config b/service/tools/config/server/server.config index c16a0d44..26f12a02 100644 --- a/service/tools/config/server/server.config +++ b/service/tools/config/server/server.config @@ -36,6 +36,8 @@ }, require_txn_validation:true, enable_viewchange:true, + enable_resview:true, + enable_faulty_switch:true } diff --git a/third_party/BUILD b/third_party/BUILD index 24eb61ae..b43e9230 100644 --- a/third_party/BUILD +++ b/third_party/BUILD @@ -45,3 +45,10 @@ cc_library( "@eEVM", ], ) + +cc_library( + name = "crow", + deps = [ + "@com_crowcpp_crow//:crow", + ], +) \ No newline at end of file
