This is an automated email from the ASF dual-hosted git repository.

saipranav pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 69ef5830 ResView Branch (#138)
69ef5830 is described below

commit 69ef58307738cc2922ae4b80b686171fcee49d54
Author: Saipranav-Kotamreddy 
<[email protected]>
AuthorDate: Sat Mar 30 23:34:47 2024 -0700

    ResView Branch (#138)
    
    * "Added basic data collection, pointed out spots for web socket"
    
    * "Added data gathering points and prints for testing"
    
    * "Fixed bugs with compiling stats"
    
    * "Changed message collection to be per message rather than queried periodically"
    
    * "Added conversion of stats values to JSON, need to fix replica id setting"
    
    * "Added more precision to timestamps in JSON"
    
    * "Added new data to the summary view"
    
    * "Got transaction detail collection working"
    
    * "Changed to GETALLVALUES based on main repo"
    
    * Changed Produced JSON to include txn_number
    
    * "Added websocket to send to front end, slightly inconsistent"
    
    * Add files via upload
    
    * "Added ability to receive messages from front end"
    
    * "Viewchange Update"
    
    * "Removed possible infinite loop in sending summary"
    
    * "Fixing file inclusion issue"
    
    * "Added 2 new apis in same thread to save resources"
    
    * "Adjusted make faulty endpoint"
    
    * "Removed vestigial variables, turned off faulty switch for PR"
    
    * "Fixed failing response manager test"
---
 .bazelrc                                           |   2 +-
 WORKSPACE                                          |  21 +++
 common/BUILD                                       |   7 +
 monitoring/prometheus/prometheus.yml               |  10 +-
 .../consensus/execution/transaction_executor.cpp   |   3 +-
 platform/consensus/ordering/pbft/BUILD             |   1 +
 platform/consensus/ordering/pbft/commitment.cpp    |  17 +-
 .../ordering/pbft/consensus_manager_pbft.cpp       |   1 +
 .../consensus/ordering/pbft/response_manager.cpp   |  96 +++++++++++
 .../consensus/ordering/pbft/response_manager.h     |  27 +++
 .../consensus/ordering/pbft/viewchange_manager.cpp |   6 +
 .../consensus/ordering/pbft/viewchange_manager.h   |   2 +
 platform/proto/replica_info.proto                  |   2 +
 platform/statistic/BUILD                           |   7 +
 platform/statistic/stats.cpp                       | 191 ++++++++++++++++++++-
 platform/statistic/stats.h                         |  56 +++++-
 script.js                                          | 103 +++++++++++
 scripts/deploy/config/template.config              |   2 +-
 service/tools/config/server/server.config          |   5 +-
 third_party/BUILD                                  |   7 +
 20 files changed, 552 insertions(+), 14 deletions(-)

diff --git a/.bazelrc b/.bazelrc
index 0566a7c9..99c69f64 100644
--- a/.bazelrc
+++ b/.bazelrc
@@ -1,4 +1,4 @@
 build --cxxopt='-std=c++17' --copt=-O3 --jobs=40 
-#build --action_env=PYTHON_BIN_PATH="/usr/bin/python3.10"
+build --action_env=PYTHON_BIN_PATH="/usr/bin/python3.10"
 #build --action_env=PYTHON_LIB_PATH="/usr/include/python3.10"
 
diff --git a/WORKSPACE b/WORKSPACE
index f285e83f..6a7883f7 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -224,3 +224,24 @@ http_archive(
     strip_prefix = "json-3.9.1",
     urls = ["https://github.com/nlohmann/json/archive/v3.9.1.tar.gz"],
 )
+
+http_archive(
+    name = "com_crowcpp_crow",
+    build_file = "//third_party:crow.BUILD",
+    sha256 = "f95128a8976fae6f2922823e07da59edae277a460776572a556a4b663ff5ee4b",
+    strip_prefix = "Crow-1.0-5",
+    url = "https://github.com/CrowCpp/Crow/archive/refs/tags/v1.0+5.zip",
+)
+
+bind(
+    name = "asio",
+    actual = "@com_chriskohlhoff_asio//:asio",
+)
+
+http_archive(
+    name = "com_chriskohlhoff_asio",
+    build_file = "//third_party:asio.BUILD",
+    sha256 = "babcdfd2c744905a73d20de211b51367bda0d5200f11d654c4314b909d8c963c",
+    strip_prefix = "asio-asio-1-26-0",
+    url = "https://github.com/chriskohlhoff/asio/archive/refs/tags/asio-1-26-0.zip",
+)
\ No newline at end of file
diff --git a/common/BUILD b/common/BUILD
index 62d32413..84acee11 100644
--- a/common/BUILD
+++ b/common/BUILD
@@ -15,6 +15,13 @@ cc_library(
     ],
 )
 
+cc_library(
+    name= "beast",
+    deps = [
+        "@boost//:beast"
+    ]
+)
+
 cc_library(
     name = "boost_lockfree",
     deps = [
diff --git a/monitoring/prometheus/prometheus.yml 
b/monitoring/prometheus/prometheus.yml
index c0046d38..8463c696 100644
--- a/monitoring/prometheus/prometheus.yml
+++ b/monitoring/prometheus/prometheus.yml
@@ -25,19 +25,19 @@ scrape_configs:
       - targets: ["localhost:9090"]
   - job_name: "node_exporter1"
     static_configs:
-      - targets: ["172.31.52.247:9100"]
+      - targets: ["localhost:9100"]
   - job_name: "node_exporter2"
     static_configs:
-      - targets: ["172.31.54.193:9100"]
+      - targets: ["localhost:9100"]
   - job_name: "node_exporter3"
     static_configs:
-      - targets: ["172.31.55.48:9100"]
+      - targets: ["localhost:9100"]
   - job_name: "node_exporter4"
     static_configs:
-      - targets: ["172.31.53.140:9100"]
+      - targets: ["localhost:9100"]
   - job_name: "node_exporter5"
     static_configs:
-      - targets: ["172.31.57.186:9100"]
+      - targets: ["localhost:9100"]
   - job_name: "cpp_client1"
     static_configs:
       - targets: ["172.31.52.247:8090"]
diff --git a/platform/consensus/execution/transaction_executor.cpp 
b/platform/consensus/execution/transaction_executor.cpp
index 2a8ca4f4..59779b22 100644
--- a/platform/consensus/execution/transaction_executor.cpp
+++ b/platform/consensus/execution/transaction_executor.cpp
@@ -186,8 +186,8 @@ void 
TransactionExecutor::OnlyExecute(std::unique_ptr<Request> request) {
   //          id:"<<request->proxy_id();
   // std::unique_ptr<BatchUserResponse> batch_response =
   //     std::make_unique<BatchUserResponse>();
-
   std::unique_ptr<BatchUserResponse> response;
+  global_stats_->GetTransactionDetails(batch_request);
   if (transaction_manager_) {
     response = transaction_manager_->ExecuteBatch(batch_request);
   }
@@ -217,6 +217,7 @@ void TransactionExecutor::Execute(std::unique_ptr<Request> 
request,
   //     std::make_unique<BatchUserResponse>();
 
   std::unique_ptr<BatchUserResponse> response;
+  global_stats_->GetTransactionDetails(batch_request);
   if (transaction_manager_ && need_execute) {
     response = transaction_manager_->ExecuteBatch(batch_request);
   }
diff --git a/platform/consensus/ordering/pbft/BUILD 
b/platform/consensus/ordering/pbft/BUILD
index ce59cb9f..a1bf1755 100644
--- a/platform/consensus/ordering/pbft/BUILD
+++ b/platform/consensus/ordering/pbft/BUILD
@@ -151,6 +151,7 @@ cc_library(
         "//platform/consensus/execution:system_info",
         "//platform/networkstrate:replica_communicator",
         "//platform/proto:viewchange_message_cc_proto",
+        "//platform/statistic:stats",
     ],
 )
 
diff --git a/platform/consensus/ordering/pbft/commitment.cpp 
b/platform/consensus/ordering/pbft/commitment.cpp
index c2e84b14..081c11d2 100644
--- a/platform/consensus/ordering/pbft/commitment.cpp
+++ b/platform/consensus/ordering/pbft/commitment.cpp
@@ -40,6 +40,9 @@ Commitment::Commitment(const ResDBConfig& config,
   global_stats_ = Stats::GetGlobalStats();
   duplicate_manager_ = std::make_unique<DuplicateManager>(config);
   message_manager_->SetDuplicateManager(duplicate_manager_.get());
+
+  global_stats_->SetProps(config_.GetSelfInfo().id(), 
config_.GetSelfInfo().ip(), config_.GetSelfInfo().port(), 
config_.GetConfigData().enable_resview(), 
config_.GetConfigData().enable_faulty_switch());
+  global_stats_->SetPrimaryId(message_manager_->GetCurrentPrimary());
 }
 
 Commitment::~Commitment() {
@@ -78,6 +81,7 @@ int Commitment::ProcessNewRequest(std::unique_ptr<Context> 
context,
     //            << message_manager_->GetCurrentPrimary()
     //            << " seq:" << user_request->seq()
     //            << " hash:" << user_request->hash();
+    LOG(ERROR)<<"NOT PRIMARY, Primary is "<<message_manager_->GetCurrentPrimary();
     replica_communicator_->SendMessage(*user_request,
                                        message_manager_->GetCurrentPrimary());
     {
@@ -134,6 +138,8 @@ int Commitment::ProcessNewRequest(std::unique_ptr<Context> 
context,
     return -2;
   }
 
+  global_stats_->RecordStateTime("request");
+
   user_request->set_type(Request::TYPE_PRE_PREPARE);
   user_request->set_current_view(message_manager_->GetCurrentView());
   user_request->set_seq(*seq);
@@ -149,7 +155,7 @@ int Commitment::ProcessNewRequest(std::unique_ptr<Context> 
context,
 // TODO check whether the sender is the primary.
 int Commitment::ProcessProposeMsg(std::unique_ptr<Context> context,
                                   std::unique_ptr<Request> request) {
-  if (context == nullptr || context->signature.signature().empty()) {
+  if (global_stats_->IsFaulty() || context == nullptr || 
context->signature.signature().empty()) {
     LOG(ERROR) << "user request doesn't contain signature, reject";
     return -2;
   }
@@ -181,6 +187,7 @@ int Commitment::ProcessProposeMsg(std::unique_ptr<Context> 
context,
       LOG(ERROR) << " check by the user func fail";
       return -2;
     }
+    //global_stats_->GetTransactionDetails(std::move(request));
     BatchUserRequest batch_request;
     batch_request.ParseFromString(request->data());
     batch_request.clear_createtime();
@@ -202,6 +209,7 @@ int Commitment::ProcessProposeMsg(std::unique_ptr<Context> 
context,
   }
 
   global_stats_->IncPropose();
+  global_stats_->RecordStateTime("pre-prepare");
   std::unique_ptr<Request> prepare_request = resdb::NewRequest(
       Request::TYPE_PREPARE, *request, config_.GetSelfInfo().id());
   prepare_request->clear_data();
@@ -253,6 +261,7 @@ int Commitment::ProcessPrepareMsg(std::unique_ptr<Context> 
context,
       // LOG(ERROR) << "sign hash"
       //           << commit_request->data_signature().DebugString();
     }
+    global_stats_->RecordStateTime("prepare");
     replica_communicator_->BroadCast(*commit_request);
   }
   return ret == CollectorResultCode::INVALID ? -2 : 0;
@@ -276,6 +285,11 @@ int Commitment::ProcessCommitMsg(std::unique_ptr<Context> 
context,
   // commit the request.
   CollectorResultCode ret =
       message_manager_->AddConsensusMsg(context->signature, 
std::move(request));
+  if (ret == CollectorResultCode::STATE_CHANGED) {
+    //LOG(ERROR)<<request->data().size();
+    //global_stats_->GetTransactionDetails(request->data());
+    global_stats_->RecordStateTime("commit");
+  }
   return ret == CollectorResultCode::INVALID ? -2 : 0;
 }
 
@@ -287,6 +301,7 @@ int Commitment::PostProcessExecutedMsg() {
     if (batch_resp == nullptr) {
       continue;
     }
+    global_stats_->SendSummary();
     Request request;
     request.set_hash(batch_resp->hash());
     request.set_seq(batch_resp->seq());
diff --git a/platform/consensus/ordering/pbft/consensus_manager_pbft.cpp 
b/platform/consensus/ordering/pbft/consensus_manager_pbft.cpp
index 5131445a..09feae40 100644
--- a/platform/consensus/ordering/pbft/consensus_manager_pbft.cpp
+++ b/platform/consensus/ordering/pbft/consensus_manager_pbft.cpp
@@ -208,6 +208,7 @@ int ConsensusManagerPBFT::InternalConsensusCommit(
       int ret = commitment_->ProcessNewRequest(std::move(context),
                                                std::move(request));
       if (ret == -3) {
+        LOG(ERROR)<<"BAD RETURN";
         std::pair<std::unique_ptr<Context>, std::unique_ptr<Request>>
             request_complained;
         {
diff --git a/platform/consensus/ordering/pbft/response_manager.cpp 
b/platform/consensus/ordering/pbft/response_manager.cpp
index 9d212f61..6a7162d8 100644
--- a/platform/consensus/ordering/pbft/response_manager.cpp
+++ b/platform/consensus/ordering/pbft/response_manager.cpp
@@ -24,6 +24,24 @@
 #include "common/utils/utils.h"
 
 namespace resdb {
+
+ResponseClientTimeout::ResponseClientTimeout(std::string hash_,
+                                                   uint64_t time_) {
+  this->hash = hash_;
+  this->timeout_time = time_;
+}
+
+ResponseClientTimeout::ResponseClientTimeout(
+    const ResponseClientTimeout& other) {
+  this->hash = other.hash;
+  this->timeout_time = other.timeout_time;
+}
+
+bool ResponseClientTimeout::operator<(
+    const ResponseClientTimeout& other) const {
+  return timeout_time > other.timeout_time;
+}
+
 ResponseManager::ResponseManager(const ResDBConfig& config,
                                  ReplicaCommunicator* replica_communicator,
                                  SystemInfo* system_info,
@@ -47,6 +65,10 @@ ResponseManager::ResponseManager(const ResDBConfig& config,
       config_.IsTestMode()) {
     user_req_thread_ = std::thread(&ResponseManager::BatchProposeMsg, this);
   }
+  if(config_.GetConfigData().enable_viewchange()){
+    checking_timeout_thread_ =
+      std::thread(&ResponseManager::MonitoringClientTimeOut, this);
+  }
   global_stats_ = Stats::GetGlobalStats();
   send_num_ = 0;
 }
@@ -56,6 +78,9 @@ ResponseManager::~ResponseManager() {
   if (user_req_thread_.joinable()) {
     user_req_thread_.join();
   }
+  if(checking_timeout_thread_.joinable()){
+    checking_timeout_thread_.join();
+  }
 }
 
 // use system info
@@ -293,11 +318,82 @@ int ResponseManager::DoBatch(
   batch_request.SerializeToString(new_request->mutable_data());
   new_request->set_hash(SignatureVerifier::CalculateHash(new_request->data()));
   new_request->set_proxy_id(config_.GetSelfInfo().id());
+  /*for(int i=1; i<=4; i++){
+      replica_communicator_->SendMessage(*new_request, i);
+  }*/
   replica_communicator_->SendMessage(*new_request, GetPrimary());
   send_num_++;
   LOG(INFO) << "send msg to primary:" << GetPrimary()
             << " batch size:" << batch_req.size();
+  AddWaitingResponseRequest(std::move(new_request));
   return 0;
 }
 
+void ResponseManager::AddWaitingResponseRequest(
+    std::unique_ptr<Request> request) {
+  if (!config_.GetConfigData().enable_viewchange()) {
+    return;
+  }
+  pm_lock_.lock();
+  uint64_t time = GetCurrentTime() + this->timeout_length_;
+  client_timeout_min_heap_.push(
+      ResponseClientTimeout(request->hash(), time));
+  waiting_response_batches_.insert(
+      make_pair(request->hash(), std::move(request)));
+  pm_lock_.unlock();
+  sem_post(&request_sent_signal_);
+}
+
+void ResponseManager::RemoveWaitingResponseRequest(std::string hash) {
+  if (!config_.GetConfigData().enable_viewchange()) {
+    return;
+  }
+  pm_lock_.lock();
+  if (waiting_response_batches_.find(hash) != waiting_response_batches_.end()) 
{
+    waiting_response_batches_.erase(waiting_response_batches_.find(hash));
+  }
+  pm_lock_.unlock();
+}
+
+bool ResponseManager::CheckTimeOut(std::string hash) {
+  pm_lock_.lock();
+  bool value =
+      (waiting_response_batches_.find(hash) != 
waiting_response_batches_.end());
+  pm_lock_.unlock();
+  return value;
+}
+
+std::unique_ptr<Request> ResponseManager::GetTimeOutRequest(
+    std::string hash) {
+  pm_lock_.lock();
+  auto value = std::move(waiting_response_batches_.find(hash)->second);
+  pm_lock_.unlock();
+  return value;
+}
+
+void ResponseManager::MonitoringClientTimeOut() {
+  while (!stop_) {
+    sem_wait(&request_sent_signal_);
+    pm_lock_.lock();
+    if (client_timeout_min_heap_.empty()) {
+      pm_lock_.unlock();
+      continue;
+    }
+    auto client_timeout = client_timeout_min_heap_.top();
+    client_timeout_min_heap_.pop();
+    pm_lock_.unlock();
+
+    if (client_timeout.timeout_time > GetCurrentTime()) {
+      usleep(client_timeout.timeout_time - GetCurrentTime());
+    }
+
+    if (CheckTimeOut(client_timeout.hash)) {
+      auto request = GetTimeOutRequest(client_timeout.hash);
+      if (request) {
+        LOG(ERROR) << "Client Request Timeout " << client_timeout.hash;
+        replica_communicator_->BroadCast(*request);
+      }
+    }
+  }
+}
 }  // namespace resdb
diff --git a/platform/consensus/ordering/pbft/response_manager.h 
b/platform/consensus/ordering/pbft/response_manager.h
index fdf89b75..b238e0df 100644
--- a/platform/consensus/ordering/pbft/response_manager.h
+++ b/platform/consensus/ordering/pbft/response_manager.h
@@ -18,6 +18,7 @@
  */
 
 #pragma once
+#include <semaphore.h>
 
 #include "platform/config/resdb_config.h"
 #include "platform/consensus/ordering/pbft/lock_free_collector_pool.h"
@@ -27,6 +28,16 @@
 
 namespace resdb {
 
+class ResponseClientTimeout {
+ public:
+  ResponseClientTimeout(std::string hash_, uint64_t time_);
+  ResponseClientTimeout(const ResponseClientTimeout& other);
+  bool operator<(const ResponseClientTimeout& other) const;
+
+  std::string hash;
+  uint64_t timeout_time;
+};
+
 class ResponseManager {
  public:
   ResponseManager(const ResDBConfig& config,
@@ -65,6 +76,13 @@ class ResponseManager {
   int BatchProposeMsg();
   int GetPrimary();
 
+  void AddWaitingResponseRequest(std::unique_ptr<Request> request);
+  void RemoveWaitingResponseRequest(std::string hash);
+  bool CheckTimeOut(std::string hash);
+  void ResponseTimer(std::string hash);
+  void MonitoringClientTimeOut();
+  std::unique_ptr<Request> GetTimeOutRequest(std::string hash);
+
  private:
   ResDBConfig config_;
   ReplicaCommunicator* replica_communicator_;
@@ -77,6 +95,15 @@ class ResponseManager {
   SystemInfo* system_info_;
   std::atomic<int> send_num_;
   SignatureVerifier* verifier_;
+
+  std::thread checking_timeout_thread_;
+  std::map<std::string, std::unique_ptr<Request>> waiting_response_batches_;
+  std::priority_queue<ResponseClientTimeout> client_timeout_min_heap_;
+  std::mutex pm_lock_;
+  uint64_t timeout_length_;
+  sem_t request_sent_signal_;
+  uint64_t highest_seq_;
+  uint64_t highest_seq_primary_id_;
 };
 
 }  // namespace resdb
diff --git a/platform/consensus/ordering/pbft/viewchange_manager.cpp 
b/platform/consensus/ordering/pbft/viewchange_manager.cpp
index 7033801b..53060c20 100644
--- a/platform/consensus/ordering/pbft/viewchange_manager.cpp
+++ b/platform/consensus/ordering/pbft/viewchange_manager.cpp
@@ -83,6 +83,7 @@ ViewChangeManager::ViewChangeManager(const ResDBConfig& 
config,
       started_(false),
       stop_(false) {
   view_change_counter_ = 1;
+  global_stats_ = Stats::GetGlobalStats();
   if (config_.GetConfigData().enable_viewchange()) {
     collector_pool_ = message_manager->GetCollectorPool();
     sem_init(&viewchange_timer_signal_, 0, 0);
@@ -108,6 +109,7 @@ void ViewChangeManager::MayStart() {
     return;
   }
   started_ = true;
+  LOG(ERROR)<<"MAYSTART";
 
   if (config_.GetPublicKeyCertificateInfo()
           .public_key()
@@ -147,6 +149,7 @@ void ViewChangeManager::MayStart() {
 bool ViewChangeManager::ChangeStatue(ViewChangeStatus status) {
   if (status == ViewChangeStatus::READY_VIEW_CHANGE) {
     if (status_ != ViewChangeStatus::READY_VIEW_CHANGE) {
+      LOG(ERROR)<<"CHANGE STATUS";
       status_ = status;
     }
   } else {
@@ -224,6 +227,8 @@ void 
ViewChangeManager::SetCurrentViewAndNewPrimary(uint64_t view_number) {
   uint32_t id =
       config_.GetReplicaInfos()[(view_number - 1) % replicas.size()].id();
   system_info_->SetPrimary(id);
+  global_stats_->ChangePrimary(id);
+  LOG(ERROR)<<"View Change Happened";
 }
 
 std::vector<std::unique_ptr<Request>> ViewChangeManager::GetPrepareMsg(
@@ -504,6 +509,7 @@ void ViewChangeManager::SendViewChangeMsg() {
 }
 
 void ViewChangeManager::AddComplaintTimer(uint64_t proxy_id, std::string hash) 
{
+  LOG(ERROR)<<"ADDING COMPLAINT";
   std::lock_guard<std::mutex> lk(vc_mutex_);
   if (complaining_clients_.count(proxy_id) == 0) {
     complaining_clients_[proxy_id].set_proxy_id(proxy_id);
diff --git a/platform/consensus/ordering/pbft/viewchange_manager.h 
b/platform/consensus/ordering/pbft/viewchange_manager.h
index a6e085ed..4cc2b7cd 100644
--- a/platform/consensus/ordering/pbft/viewchange_manager.h
+++ b/platform/consensus/ordering/pbft/viewchange_manager.h
@@ -28,6 +28,7 @@
 #include "platform/consensus/ordering/pbft/message_manager.h"
 #include "platform/networkstrate/replica_communicator.h"
 #include "platform/proto/viewchange_message.pb.h"
+#include "platform/statistic/stats.h"
 
 namespace resdb {
 
@@ -128,6 +129,7 @@ class ViewChangeManager {
   ResDBConfig config_;
   CheckPointManager* checkpoint_manager_;
   MessageManager* message_manager_;
+  Stats* global_stats_;
   SystemInfo* system_info_;
   ReplicaCommunicator* replica_communicator_;
   SignatureVerifier* verifier_;
diff --git a/platform/proto/replica_info.proto 
b/platform/proto/replica_info.proto
index 9c81dade..658bf532 100644
--- a/platform/proto/replica_info.proto
+++ b/platform/proto/replica_info.proto
@@ -38,6 +38,8 @@ message ResConfigData{
   optional string recovery_path = 18;
   optional int32 recovery_buffer_size = 19;
   optional int32 recovery_ckpt_time_s = 20;
+  optional bool enable_resview = 23;
+  optional bool enable_faulty_switch = 24;
 
 // for hotstuff.
   optional bool use_chain_hotstuff = 9;
diff --git a/platform/statistic/BUILD b/platform/statistic/BUILD
index 4d3b5c68..e20fd916 100644
--- a/platform/statistic/BUILD
+++ b/platform/statistic/BUILD
@@ -8,10 +8,17 @@ cc_library(
     srcs = ["stats.cpp"],
     hdrs = ["stats.h"],
     deps = [
+        "//proto/kv:kv_cc_proto",
+        "//platform/proto:resdb_cc_proto",
+        "//common:json",
         ":prometheus_handler",
         "//common:comm",
         "//common/utils",
         "//third_party:prometheus",
+        "//platform/common/network:tcp_socket",
+        "//common:asio",
+        "//common:beast",
+        "//third_party:crow",
     ],
 )
 
diff --git a/platform/statistic/stats.cpp b/platform/statistic/stats.cpp
index 7ce79310..33de785c 100644
--- a/platform/statistic/stats.cpp
+++ b/platform/statistic/stats.cpp
@@ -20,8 +20,13 @@
 #include "platform/statistic/stats.h"
 
 #include <glog/logging.h>
-
+#include <ctime>
 #include "common/utils/utils.h"
+#include "proto/kv/kv.pb.h"
+
+namespace asio = boost::asio;
+namespace beast = boost::beast;
+using tcp = asio::ip::tcp;
 
 namespace resdb {
 
@@ -59,9 +64,19 @@ Stats::Stats(int sleep_time) {
   send_broad_cast_msg_ = 0;
 
   prometheus_ = nullptr;
-
   global_thread_ =
       std::thread(&Stats::MonitorGlobal, this);  // pass by reference
+
+  transaction_summary_.port=-1;
+
+  //Setup websocket here
+  make_faulty_.store(false);
+  
transaction_summary_.request_pre_prepare_state_time=std::chrono::system_clock::time_point::min();
+  
transaction_summary_.prepare_state_time=std::chrono::system_clock::time_point::min();
+  
transaction_summary_.commit_state_time=std::chrono::system_clock::time_point::min();
+  
transaction_summary_.execution_time=std::chrono::system_clock::time_point::min();
+  transaction_summary_.txn_number=0;
+  
 }
 
 void Stats::Stop() { stop_ = true; }
@@ -71,6 +86,172 @@ Stats::~Stats() {
   if (global_thread_.joinable()) {
     global_thread_.join();
   }
+  if(enable_resview && crow_thread_.joinable()){
+    crow_thread_.join();
+  }
+}
+
+void Stats::CrowRoute(){
+  crow::SimpleApp app;
+  while(!stop_){
+    try{
+      CROW_ROUTE(app, "/consensus_data").methods("GET"_method)([this](const crow::request& req, crow::response& res){
+        LOG(ERROR)<<"API 1";
+        res.set_header("Access-Control-Allow-Origin", "*"); // Allow requests from any origin
+        res.set_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS"); // Specify allowed methods
+        res.set_header("Access-Control-Allow-Headers", "Content-Type, Authorization"); // Specify allowed headers
+
+        // Send your response
+        res.body=consensus_history_.dump();
+        res.end();
+      });
+      CROW_ROUTE(app, "/get_status").methods("GET"_method)([this](const crow::request& req, crow::response& res){
+        LOG(ERROR)<<"API 2";
+        res.set_header("Access-Control-Allow-Origin", "*"); // Allow requests from any origin
+        res.set_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS"); // Specify allowed methods
+        res.set_header("Access-Control-Allow-Headers", "Content-Type, Authorization"); // Specify allowed headers
+
+        // Send your response
+        res.body= IsFaulty() ? "Faulty" : "Not Faulty";
+        res.end();
+      });
+      CROW_ROUTE(app, "/make_faulty").methods("GET"_method)([this](const crow::request& req, crow::response& res){
+        LOG(ERROR)<<"API 3";
+        res.set_header("Access-Control-Allow-Origin", "*"); // Allow requests from any origin
+        res.set_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS"); // Specify allowed methods
+        res.set_header("Access-Control-Allow-Headers", "Content-Type, Authorization"); // Specify allowed headers
+
+        // Send your response
+        if(enable_faulty_switch_){
+          make_faulty_.store(!make_faulty_.load());
+        }
+        res.body= "Success";
+        res.end();
+      });
+      app.port(8500+transaction_summary_.port).multithreaded().run();
+      sleep(1);
+    }
+    catch( const std::exception& e){
+    }
+  }
+  app.stop();
+}
+
+bool Stats::IsFaulty(){
+  return make_faulty_.load();
+}
+
+void Stats::ChangePrimary(int primary_id){
+  transaction_summary_.primary_id=primary_id;
+  make_faulty_.store(false);
+}
+
+void Stats::SetProps(int replica_id, std::string ip, int port, bool 
resview_flag, bool faulty_flag){
+  transaction_summary_.replica_id=replica_id;
+  transaction_summary_.ip=ip;
+  transaction_summary_.port=port;
+  enable_resview=resview_flag;
+  enable_faulty_switch_=faulty_flag;
+  if(resview_flag){
+    crow_thread_ = std::thread(&Stats::CrowRoute, this);
+  }
+}
+
+void Stats::SetPrimaryId(int primary_id){
+  transaction_summary_.primary_id=primary_id;
+}
+
+void Stats::RecordStateTime(std::string state){
+  if(!enable_resview){
+    return;
+  }
+  if(state=="request" || state=="pre-prepare"){
+    
transaction_summary_.request_pre_prepare_state_time=std::chrono::system_clock::now();
+  }
+  else if(state=="prepare"){
+    transaction_summary_.prepare_state_time=std::chrono::system_clock::now();
+  }
+  else if(state=="commit"){
+    transaction_summary_.commit_state_time=std::chrono::system_clock::now();
+  }
+}
+
+void Stats::GetTransactionDetails(BatchUserRequest batch_request){
+  if(!enable_resview){
+    return;
+  }
+  transaction_summary_.txn_number=batch_request.seq();
+  transaction_summary_.txn_command.clear();
+  transaction_summary_.txn_key.clear();
+  transaction_summary_.txn_value.clear();
+  for (auto& sub_request : batch_request.user_requests()) {
+    KVRequest kv_request;
+    if(!kv_request.ParseFromString(sub_request.request().data())){
+      break;
+    }
+    if (kv_request.cmd() == KVRequest::SET) {
+      transaction_summary_.txn_command.push_back("SET");
+      transaction_summary_.txn_key.push_back(kv_request.key());
+      transaction_summary_.txn_value.push_back(kv_request.value());
+    } else if (kv_request.cmd() == KVRequest::GET) {
+      transaction_summary_.txn_command.push_back("GET");
+      transaction_summary_.txn_key.push_back(kv_request.key());
+      transaction_summary_.txn_value.push_back("");
+    } else if (kv_request.cmd() == KVRequest::GETALLVALUES) {
+      transaction_summary_.txn_command.push_back("GETALLVALUES");
+      transaction_summary_.txn_key.push_back(kv_request.key());
+      transaction_summary_.txn_value.push_back("");
+    } else if (kv_request.cmd() == KVRequest::GETRANGE) {
+      transaction_summary_.txn_command.push_back("GETRANGE");
+      transaction_summary_.txn_key.push_back(kv_request.key());
+      transaction_summary_.txn_value.push_back(kv_request.value());
+    }
+  }
+}
+
+void Stats::SendSummary(){
+  if(!enable_resview){
+    return;
+  }
+  transaction_summary_.execution_time=std::chrono::system_clock::now();
+
+  //Convert Transaction Summary to JSON
+  summary_json_["replica_id"]=transaction_summary_.replica_id;
+  summary_json_["ip"]=transaction_summary_.ip;
+  summary_json_["port"]=transaction_summary_.port;
+  summary_json_["primary_id"]=transaction_summary_.primary_id;
+  
summary_json_["propose_pre_prepare_time"]=transaction_summary_.request_pre_prepare_state_time.time_since_epoch().count();
+  
summary_json_["prepare_time"]=transaction_summary_.prepare_state_time.time_since_epoch().count();
+  
summary_json_["commit_time"]=transaction_summary_.commit_state_time.time_since_epoch().count();
+  
summary_json_["execution_time"]=transaction_summary_.execution_time.time_since_epoch().count();
+  for(size_t i=0; 
i<transaction_summary_.prepare_message_count_times_list.size(); i++){
+    
summary_json_["prepare_message_timestamps"].push_back(transaction_summary_.prepare_message_count_times_list[i].time_since_epoch().count());
+  }
+  for(size_t i=0; 
i<transaction_summary_.commit_message_count_times_list.size(); i++){
+    
summary_json_["commit_message_timestamps"].push_back(transaction_summary_.commit_message_count_times_list[i].time_since_epoch().count());
+  }
+  summary_json_["txn_number"]=transaction_summary_.txn_number;
+  for(size_t i=0; i<transaction_summary_.txn_command.size(); i++){
+    
summary_json_["txn_commands"].push_back(transaction_summary_.txn_command[i]);
+  }
+  for(size_t i=0; i<transaction_summary_.txn_key.size(); i++){
+    summary_json_["txn_keys"].push_back(transaction_summary_.txn_key[i]);
+  }
+  for(size_t i=0; i<transaction_summary_.txn_value.size(); i++){
+    summary_json_["txn_values"].push_back(transaction_summary_.txn_value[i]);
+  }
+
+  
consensus_history_[std::to_string(transaction_summary_.txn_number)]=summary_json_;
+
+  LOG(ERROR)<<summary_json_.dump();
+
+  //Reset Transaction Summary Parameters
+  
transaction_summary_.request_pre_prepare_state_time=std::chrono::system_clock::time_point::min();
+  
transaction_summary_.prepare_state_time=std::chrono::system_clock::time_point::min();
+  
transaction_summary_.commit_state_time=std::chrono::system_clock::time_point::min();
+  
transaction_summary_.execution_time=std::chrono::system_clock::time_point::min();
+  transaction_summary_.prepare_message_count_times_list.clear();
+  transaction_summary_.commit_message_count_times_list.clear();
 }
 
 void Stats::MonitorGlobal() {
@@ -238,6 +419,7 @@ void Stats::IncPrepare() {
     prometheus_->Inc(PREPARE, 1);
   }
   num_prepare_++;
+  
transaction_summary_.prepare_message_count_times_list.push_back(std::chrono::system_clock::now());
 }
 
 void Stats::IncCommit() {
@@ -245,9 +427,12 @@ void Stats::IncCommit() {
     prometheus_->Inc(COMMIT, 1);
   }
   num_commit_++;
+  
transaction_summary_.commit_message_count_times_list.push_back(std::chrono::system_clock::now());
 }
 
-void Stats::IncPendingExecute() { pending_execute_++; }
+void Stats::IncPendingExecute() {
+  pending_execute_++; 
+}
 
 void Stats::IncExecute() { execute_++; }
 
diff --git a/platform/statistic/stats.h b/platform/statistic/stats.h
index 436f942e..621aaf84 100644
--- a/platform/statistic/stats.h
+++ b/platform/statistic/stats.h
@@ -23,15 +23,59 @@
 #include <future>
 
 #include "platform/statistic/prometheus_handler.h"
+#include "platform/proto/resdb.pb.h"
+#include "proto/kv/kv.pb.h"
+#include "platform/common/network/tcp_socket.h"
+#include <nlohmann/json.hpp>
+#include "boost/asio.hpp"
+#include "boost/beast.hpp"
+#include <crow.h>
+
+namespace asio = boost::asio;
+namespace beast = boost::beast;
+using tcp = asio::ip::tcp;
 
 namespace resdb {
 
-class Stats {
+struct VisualData{
+    //Set when initializing
+    int replica_id;
+    int primary_id;
+    std::string ip;
+    int port;
+
+    //Set when new txn is received
+    int txn_number;
+    std::vector<std::string> txn_command;
+    std::vector<std::string> txn_key;
+    std::vector<std::string> txn_value;
+
+    //Request state if primary_id==replica_id, pre_prepare state otherwise
+    std::chrono::system_clock::time_point request_pre_prepare_state_time;
+    std::chrono::system_clock::time_point prepare_state_time;
+    std::vector<std::chrono::system_clock::time_point> 
prepare_message_count_times_list;
+    std::chrono::system_clock::time_point commit_state_time;
+    std::vector<std::chrono::system_clock::time_point> 
commit_message_count_times_list;
+    std::chrono::system_clock::time_point execution_time;
+};
+
+class Stats{
  public:
   static Stats* GetGlobalStats(int sleep_seconds = 5);
 
   void Stop();
 
+    void RetrieveProgress();
+    void SetProps(int replica_id, std::string ip, int port, bool resview_flag, bool faulty_flag);
+    void SetPrimaryId(int primary_id);
+    void RecordStateTime(std::string state);
+    void GetTransactionDetails(BatchUserRequest batch_request);
+    void SendSummary();
+    void CrowRoute();
+    bool IsFaulty();
+    void ChangePrimary(int primary_id);
+
+
   void AddLatency(uint64_t run_time);
 
   void Monitor();
@@ -92,6 +136,16 @@ class Stats {
   std::atomic<uint64_t> total_request_, total_geo_request_, geo_request_;
   int monitor_sleep_time_ = 5;  // default 5s.
 
+  std::thread crow_thread_;
+  bool enable_resview;
+  bool enable_faulty_switch_;
+  VisualData transaction_summary_;
+  std::atomic<bool> make_faulty_;
+  std::atomic<uint64_t> prev_num_prepare_;
+  std::atomic<uint64_t> prev_num_commit_;
+  nlohmann::json summary_json_;
+  nlohmann::json consensus_history_;
+
   std::unique_ptr<PrometheusHandler> prometheus_;
 };
 
diff --git a/script.js b/script.js
new file mode 100644
index 00000000..b9b183ee
--- /dev/null
+++ b/script.js
@@ -0,0 +1,103 @@
+import ResilientSDK from 'https://cdn.resilientdb.com/resilient-sdk.js';
+
+const sdk = new ResilientSDK();
+
+// Add a message listener
+sdk.addMessageListener((event) => {
+    const message = event.data.data;
+    alert(JSON.stringify(message));  // Set the message
+});
+
+var commit = document.querySelector('[data-nexres="commit-page-script"]');
+var fetcher = document.querySelector('[data-nexres="get-page-script"]');
+var update = document.querySelector('[data-nexres="update-page-script"]');
+var updateMulti = document.querySelector('[data-nexres="update-multi-page-script"]');
+var filter = document.querySelector('[data-nexres="filter-page-script"]');
+var account = document.querySelector('[data-nexres="account-page-script"]');
+var data = document.querySelector('[data-nexres="get-data"]');
+var amount = document.querySelector('[data-nexres="get-amount"]');
+var address = document.querySelector('[data-nexres="get-address"]');
+var id = document.querySelector('[data-nexres="get-id"]');
+var updateId = document.querySelector('[data-nexres="update-id"]');
+var updateData = document.querySelector('[data-nexres="update-data"]');
+var updateAmount = document.querySelector('[data-nexres="update-amount"]');
+var updateAddress = document.querySelector('[data-nexres="update-address"]');
+var ownerPublicKey = document.querySelector('[data-nexres="filter-owner-key"]');
+var recipientPublicKey = document.querySelector('[data-nexres="filter-recipient-key"]');
+var updateMultiId1 = document.querySelector('[data-nexres="update-multi-id1"]');
+var updateMultiData1 = document.querySelector('[data-nexres="update-multi-data1"]');
+var updateMultiAmount1 = document.querySelector('[data-nexres="update-multi-amount1"]');
+var updateMultiAddress1 = document.querySelector('[data-nexres="update-multi-address1"]');
+var updateMultiId2 = document.querySelector('[data-nexres="update-multi-id2"]');
+var updateMultiData2 = document.querySelector('[data-nexres="update-multi-data2"]');
+var updateMultiAmount2 = document.querySelector('[data-nexres="update-multi-amount2"]');
+var updateMultiAddress2 = document.querySelector('[data-nexres="update-multi-address2"]');
+
+commit.addEventListener("click", commitContentScript);
+fetcher.addEventListener("click", fetchContentScript);
+update.addEventListener("click", updateContentScript);
+updateMulti.addEventListener("click", updateMultiContentScript);
+filter.addEventListener("click", filterContentScript);
+account.addEventListener("click", accountContentScript);
+
+function commitContentScript() {
+    sdk.sendMessage({
+      direction: "commit-page-script",
+      message: data.value,
+      amount: amount.value,
+      address: address.value
+    });
+}
+
+function fetchContentScript() {
+    sdk.sendMessage({
+      direction: "get-page-script",
+      id: id.value
+    });
+}
+
+function updateContentScript() {
+    sdk.sendMessage({
+      direction: "update-page-script",
+      id: updateId.value,
+      message: updateData.value,
+      amount: updateAmount.value,
+      address: updateAddress.value
+    });
+}
+
+function updateMultiContentScript() {
+    const valuesList = [
+      {
+        id: updateMultiId1.value,
+        message: updateMultiData1.value,
+        amount: updateMultiAmount1.value,
+        address: updateMultiAddress1.value,
+      },
+      {
+        id: updateMultiId2.value,
+        message: updateMultiData2.value,
+        amount: updateMultiAmount2.value,
+        address: updateMultiAddress2.value,
+      }
+    ];
+
+    sdk.sendMessage({
+      direction: "update-multi-page-script",
+      values: valuesList
+    });
+}
+
+function filterContentScript() {
+    sdk.sendMessage({
+      direction: "filter-page-script",
+      owner: ownerPublicKey.value,
+      recipient: recipientPublicKey.value,
+    });
+}
+
+function accountContentScript() {
+    sdk.sendMessage({
+      direction: "account-page-script",
+    });
+}
diff --git a/scripts/deploy/config/template.config b/scripts/deploy/config/template.config
index a68d5dc0..7962f6b0 100644
--- a/scripts/deploy/config/template.config
+++ b/scripts/deploy/config/template.config
@@ -1,6 +1,6 @@
 {
   "clientBatchNum": 100,
-  "enable_viewchange": false,
+  "enable_viewchange": true,
   "recovery_enabled":true,
   "max_client_complaint_num":10
 }
diff --git a/service/tools/config/server/server.config b/service/tools/config/server/server.config
index 20a8b771..1dc5b0de 100644
--- a/service/tools/config/server/server.config
+++ b/service/tools/config/server/server.config
@@ -32,7 +32,10 @@
     write_buffer_size_mb:128,
     write_batch_size:1,
   },
-  require_txn_validation:false,
+  require_txn_validation:true,
+  enable_viewchange:true,
+  enable_resview:true,
+  enable_faulty_switch:false
 }
 
 
diff --git a/third_party/BUILD b/third_party/BUILD
index 24eb61ae..b43e9230 100644
--- a/third_party/BUILD
+++ b/third_party/BUILD
@@ -45,3 +45,10 @@ cc_library(
         "@eEVM",
     ],
 )
+
+cc_library(
+    name = "crow",
+    deps = [
+        "@com_crowcpp_crow//:crow",
+    ],
+)
\ No newline at end of file


Reply via email to