This is an automated email from the ASF dual-hosted git repository.

junchao pushed a commit to branch cassandra
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git

commit 481de7fc13fa7cc9530dd4e2264274560a9abac3
Author: cjcchen <[email protected]>
AuthorDate: Mon Apr 8 03:39:19 2024 +0000

    add cpu info for rcc
---
 .../consensus/execution/transaction_executor.cpp   |   5 +-
 .../ordering/rcc/algorithm/{rcc.cpp => ;}          | 127 ++++++++++++++++---
 platform/consensus/ordering/rcc/algorithm/rcc.cpp  | 140 ++++++++++++++++++---
 platform/consensus/ordering/rcc/algorithm/rcc.h    |   3 +
 .../consensus/ordering/rcc/framework/consensus.cpp |  13 ++
 .../consensus/ordering/rcc/proto/proposal.proto    |   2 +
 platform/statistic/BUILD                           |  20 +++
 platform/statistic/cpu_info.cpp                    |  87 +++++++++++++
 platform/statistic/cpu_info.h                      |  53 ++++++++
 platform/statistic/cpu_info_test.cpp               |  40 ++++++
 platform/statistic/stats.cpp                       |   3 +
 platform/statistic/stats.h                         |   3 +
 scripts/deploy/a.sh                                |  45 +++++++
 scripts/deploy/config/rcc.config                   |   2 +-
 14 files changed, 511 insertions(+), 32 deletions(-)

diff --git a/platform/consensus/execution/transaction_executor.cpp b/platform/consensus/execution/transaction_executor.cpp
index a4afb856..95984b32 100644
--- a/platform/consensus/execution/transaction_executor.cpp
+++ b/platform/consensus/execution/transaction_executor.cpp
@@ -379,10 +379,9 @@ void TransactionExecutor::Execute(std::unique_ptr<Request> request,
       response_v = transaction_manager_->ExecuteBatchData(*data_p);
       FinishExecute(request->seq());
 
-      std::unique_ptr<BatchUserResponse> batch_response =
-          std::make_unique<BatchUserResponse>();
+      response = std::make_unique<BatchUserResponse>();
       for (auto& s : response_v) {
-        batch_response->add_response()->swap(*s);
+        response->add_response()->swap(*s);
       }
     }
   }
diff --git a/platform/consensus/ordering/rcc/algorithm/rcc.cpp b/platform/consensus/ordering/rcc/algorithm/;
similarity index 70%
copy from platform/consensus/ordering/rcc/algorithm/rcc.cpp
copy to platform/consensus/ordering/rcc/algorithm/;
index b2d6269d..6ff5ae41 100644
--- a/platform/consensus/ordering/rcc/algorithm/rcc.cpp
+++ b/platform/consensus/ordering/rcc/algorithm/;
@@ -3,6 +3,9 @@
 #include <glog/logging.h>
 #include "common/utils/utils.h"
 
+#define OP0
+#define OP2
+
 namespace resdb {
 namespace rcc {
 
@@ -55,6 +58,7 @@ void RCC::AsyncCommit() {
         //global_stats_->AddCommitWaitingLatency(commit_time - commit_time_[seq]);
     }
 
+    int last = next_seq_;
     while (!IsStop()) {
       if (seq_set_[next_seq_].size() == totoal_proposer_num_) {
         int64_t commit_time = GetCurrentTime();
@@ -77,17 +81,21 @@ void RCC::AsyncCommit() {
         seq_set_.erase(seq_set_.find(next_seq_));
         global_stats_->AddCommitTxn(num);
         global_stats_->AddCommitBlock(pro);
+        #ifdef OP1
         {
           std::unique_lock<std::mutex> lk(seq_mutex_);
           commited_seq_ = std::max(static_cast<int64_t>(next_seq_), commited_seq_);
           //LOG(ERROR)<<"update seq:"<<commited_seq_;
           vote_cv_.notify_all();
         }
+        #endif
         next_seq_++;
       } else {
         break;
       }
     }
+    int now_seq = next_seq_;
+    //LOG(ERROR)<<" execute seq interval:"<<(now_seq-last)<<" now:"<<now_seq<<" last:"<<last<<" next size:"<<seq_set_[next_seq_].size();
   }
 }
 
@@ -95,10 +103,19 @@ void RCC::CommitProposal(std::unique_ptr<Proposal> p) {
 
   {
     p->set_queuing_time(GetCurrentTime());
+    if(p->header().proposer_id() == id_) {
+    #ifdef OP0
+      std::unique_lock<std::mutex> lk(seq_mutex_);
+      commited_seq_ = std::max(static_cast<int64_t>(p->header().seq()), commited_seq_);
+      //LOG(ERROR)<<"update seq:"<<commited_seq_;
+      vote_cv_.notify_all();
+    #endif
+      global_stats_->AddCommitLatency(GetCurrentTime()- p->header().create_time());
+    }
     //std::unique_lock<std::mutex> lk(seq_mutex_);
     //commited_seq_ = std::max(static_cast<int64_t>(p->header().seq()), commited_seq_);
     //vote_cv_.notify_all();
-    global_stats_->AddCommitLatency(GetCurrentTime()- p->header().create_time());
+    //global_stats_->AddCommitLatency(GetCurrentTime()- p->header().create_time());
     //LOG(ERROR)<<" seq:"<<p->header().seq()<<" proposer:"<<p->header().proposer_id()<<" commit:"<<GetCurrentTime()<<" create time:"<<p->header().create_time()<<" commit time:"<<(GetCurrentTime() - p->header().create_time());
     //global_stats_->AddCommitWaitingLatency(GetCurrentTime() - last_commit_time_);
     //last_commit_time_ = GetCurrentTime();
@@ -120,7 +137,7 @@ bool RCC::ReceiveTransaction(std::unique_ptr<Transaction> txn) {
 
 void RCC::AsyncSend() {
 
-  int limit = 2;
+  int limit = 128;
   bool start = false;
   int64_t last_time = 0;
 
@@ -178,9 +195,16 @@ void RCC::AsyncSend() {
   return;
 }
 
+bool RCC::ReceiveProposalList(const Proposal& proposal){
+  for(const Proposal& p : proposal.proposals()){
+    ReceiveProposal(p);
+  }
+  return true;
+}
+
 bool RCC::ReceiveProposal(const Proposal& proposal) {
   int proposer = proposal.header().proposer_id();
-  int seq = proposal.header().seq();
+  int64_t seq = proposal.header().seq();
   int sender = proposal.header().sender();
   int status = proposal.header().status();
 
@@ -204,7 +228,7 @@ bool RCC::ReceiveProposal(const Proposal& proposal) {
 
     if (status != ProposalType::NewMsg) {
       if (is_commit_[proposer].find(seq) != is_commit_[proposer].end()) {
-        return 0;
+        return false;
       }
     }
 
@@ -256,17 +280,19 @@ bool RCC::ReceiveProposal(const Proposal& proposal) {
         });
   }
 
+  //LOG(ERROR)<<" changed:"<<changed;
   if (changed) {
-    Proposal new_proposal;
-    *new_proposal.mutable_header() = proposal.header();
-    new_proposal.mutable_header()->set_sender(id_);
-    UpgradeState(&new_proposal);
+    std::unique_ptr<Proposal> new_proposal = std::make_unique<Proposal>();
+    *new_proposal->mutable_header() = proposal.header();
+    new_proposal->mutable_header()->set_sender(id_);
+    UpgradeState(new_proposal.get());
     int64_t commit_time = GetCurrentTime();
-    if (new_proposal.header().status() == ProposalType::Ready_execute) {
-      //global_stats_->AddCommitLatency(commit_time - proposal.header().create_time());
+
+
+    
+
+    if (new_proposal->header().status() == ProposalType::Ready_execute) {
       std::unique_lock<std::mutex> lk(mutex_[0]);
-      //std::unique_lock<std::mutex> lk(mutex_[proposer]);
-      // LOG(ERROR)<<"obtaind ata proposer:"<<proposer<<" seq:"<<seq;
       auto it = collector_[proposer].find(hash);
       assert(it != collector_[proposer].end());
 
@@ -278,16 +304,89 @@ bool RCC::ReceiveProposal(const Proposal& proposal) {
 
       assert(raw_p != nullptr);
       is_commit_[proposer].insert(seq);
+      int max_s = 0, min_s = -1;
+      for(int i = 1; i <=total_num_;++i){
+        int s = 0;
+        if(is_commit_[i].empty()){
+          //LOG(ERROR)<<" proposer:"<<i<<" comit seq:"<<0;
+        }
+        else {
+          s = *(--is_commit_[i].end());
+          //LOG(ERROR)<<" proposer:"<<i<<" comit seq:"<<*(--is_commit_[i].end());
+        }
+        if(min_s == -1 || min_s > s) min_s = s;
+        max_s = max_s > s?max_s:s;
+        //LOG(ERROR)<<" proposer:"<<i<<" comit seq:"<<s;
+      }
+        //LOG(ERROR)<<" proposer gap:"<<max_s- min_s<<" max:"<<max_s<<" min:"<<min_s;
+        global_stats_->AddCommitRoundLatency(max_s - min_s);
 
       // LOG(ERROR)<<"commit type:"<<new_proposal.header().status()<<"
       // transaction size:"<<raw_p->transactions_size();
       CommitProposal(std::move(raw_p));
     } else {
+    #ifdef OP2
+    std::unique_lock<std::mutex> lk(mutex_[1]);
+    //LOG(ERROR)<<" seq:"<<seq<<" last num:"<<(seq==1?0:send_num_[status][seq-1])<<" status:"<<status;
+        assert(new_proposal != nullptr);
+    if(seq==1 || send_num_[status][seq-1] == total_num_){
       // LOG(ERROR)<<"bc type:"<<new_proposal.header().status();
-      Broadcast(MessageType::ConsensusMsg, new_proposal);
+      //LOG(ERROR)<<" bc seq:"<<seq<<" status:"<<status;
+      assert(new_proposal != nullptr);
+      Proposal proposal_list;
+      
+      Broadcast(MessageType::ConsensusMsg, *new_proposal);
+
+      send_num_[status][seq]++;
+      if(seq > 2){
+        if(send_num_[status].find(seq-2) != send_num_[status].end()){
+          send_num_[status].erase(send_num_[status].find(seq-2));
+        }
+      }
+      int64_t next_seq = seq+1;
+      while(send_num_[status][next_seq-1] == total_num_){
+        if(send_num_[status].find(next_seq-2) != send_num_[status].end()){
+          send_num_[status].erase(send_num_[status].find(next_seq-2));
+        }
+        //LOG(ERROR)<<" find next:"<<next_seq<<" send num:"<<send_num_[status][next_seq];
+        auto it = pending_msg_[status].find(next_seq);
+        if(it != pending_msg_[status].end()){
+          for(auto& p: it->second){
+            //LOG(ERROR)<<" bc next:"<<next_seq<<" status:"<<status;
+            assert(p != nullptr);
+            Broadcast(MessageType::ConsensusMsg, *p);
+            send_num_[status][next_seq]++;
+          }
+          pending_msg_[status].erase(it);
+          //LOG(ERROR)<<" bc next :"<<next_seq<<" status:"<<status<<" send num:"<<send_num_[status][next_seq];
+          next_seq++;
+        }
+        else {
+          break;
+        }
+      }
+      if(status == 0){
+      #ifdef OP3
+        {
+          std::unique_lock<std::mutex> lk(seq_mutex_);
+          commited_seq_ = std::max(static_cast<int64_t>(next_seq_), commited_seq_);
+          //LOG(ERROR)<<"update seq:"<<commited_seq_;
+          vote_cv_.notify_all();
+        }
+        #endif
+
+    }
+
+    }
+      else {
+        pending_msg_[status][seq].push_back(std::move(new_proposal));
+        //LOG(ERROR)<<" push pending, seq:"<<seq<<" size:"<< pending_msg_[status][seq].size();
+      }
+      #else
+        Broadcast(MessageType::ConsensusMsg, new_proposal);
+      #endif
     }
   }
-  // LOG(ERROR)<<"receive proposal done";
   return true;
 }
 
diff --git a/platform/consensus/ordering/rcc/algorithm/rcc.cpp b/platform/consensus/ordering/rcc/algorithm/rcc.cpp
index b2d6269d..a9eeec7f 100644
--- a/platform/consensus/ordering/rcc/algorithm/rcc.cpp
+++ b/platform/consensus/ordering/rcc/algorithm/rcc.cpp
@@ -3,6 +3,10 @@
 #include <glog/logging.h>
 #include "common/utils/utils.h"
 
+#define OP0
+#define OP2
+//#define OP4
+
 namespace resdb {
 namespace rcc {
 
@@ -13,7 +17,11 @@ RCC::RCC(int id, int f, int total_num, SignatureVerifier* verifier)
   local_txn_id_ = 1;
   execute_id_ = 1;
   totoal_proposer_num_ = total_num_;
+  #ifdef OP4
   batch_size_ = 5;
+  #else
+  batch_size_ = 1;
+  #endif
   queue_size_ = 0;
   global_stats_ = Stats::GetGlobalStats();
 
@@ -55,6 +63,7 @@ void RCC::AsyncCommit() {
         //global_stats_->AddCommitWaitingLatency(commit_time - commit_time_[seq]);
     }
 
+    int last = next_seq_;
     while (!IsStop()) {
       if (seq_set_[next_seq_].size() == totoal_proposer_num_) {
         int64_t commit_time = GetCurrentTime();
@@ -77,17 +86,21 @@ void RCC::AsyncCommit() {
         seq_set_.erase(seq_set_.find(next_seq_));
         global_stats_->AddCommitTxn(num);
         global_stats_->AddCommitBlock(pro);
+        #ifdef OP1
         {
           std::unique_lock<std::mutex> lk(seq_mutex_);
           commited_seq_ = std::max(static_cast<int64_t>(next_seq_), commited_seq_);
           //LOG(ERROR)<<"update seq:"<<commited_seq_;
           vote_cv_.notify_all();
         }
+        #endif
         next_seq_++;
       } else {
         break;
       }
     }
+    int now_seq = next_seq_;
+    //LOG(ERROR)<<" execute seq interval:"<<(now_seq-last)<<" now:"<<now_seq<<" last:"<<last<<" next size:"<<seq_set_[next_seq_].size();
   }
 }
 
@@ -95,10 +108,19 @@ void RCC::CommitProposal(std::unique_ptr<Proposal> p) {
 
   {
     p->set_queuing_time(GetCurrentTime());
+    if(p->header().proposer_id() == id_) {
+    #ifdef OP0
+      std::unique_lock<std::mutex> lk(seq_mutex_);
+      commited_seq_ = std::max(static_cast<int64_t>(p->header().seq()), commited_seq_);
+      //LOG(ERROR)<<"update seq:"<<commited_seq_;
+      vote_cv_.notify_all();
+    #endif
+      global_stats_->AddCommitLatency(GetCurrentTime()- p->header().create_time());
+    }
     //std::unique_lock<std::mutex> lk(seq_mutex_);
     //commited_seq_ = std::max(static_cast<int64_t>(p->header().seq()), commited_seq_);
     //vote_cv_.notify_all();
-    global_stats_->AddCommitLatency(GetCurrentTime()- p->header().create_time());
+    //global_stats_->AddCommitLatency(GetCurrentTime()- p->header().create_time());
     //LOG(ERROR)<<" seq:"<<p->header().seq()<<" proposer:"<<p->header().proposer_id()<<" commit:"<<GetCurrentTime()<<" create time:"<<p->header().create_time()<<" commit time:"<<(GetCurrentTime() - p->header().create_time());
     //global_stats_->AddCommitWaitingLatency(GetCurrentTime() - last_commit_time_);
     //last_commit_time_ = GetCurrentTime();
@@ -120,7 +142,7 @@ bool RCC::ReceiveTransaction(std::unique_ptr<Transaction> txn) {
 
 void RCC::AsyncSend() {
 
-  int limit = 2;
+  int limit = 128;
   bool start = false;
   int64_t last_time = 0;
 
@@ -154,8 +176,13 @@ void RCC::AsyncSend() {
     txns.push_back(std::move(txn));
     start = true;
 
+
     for (int i = 1; i < batch_size_; i++) {
+    #ifdef OP4
+      txn = txns_.Pop(10);
+    #else
       txn = txns_.Pop(0);
+      #endif
       if (txn == nullptr) {
         break;
       }
@@ -171,6 +198,7 @@ void RCC::AsyncSend() {
 
     std::unique_ptr<Proposal> proposal =
         proposal_manager_->GenerateProposal(txns);
+    global_stats_->AddBlockSize(txns.size());
 
     //LOG(ERROR) << "send proposal id:" << id_ << " seq:" << proposal->header().seq();
     broadcast_call_(MessageType::ConsensusMsg, *proposal);
@@ -178,9 +206,16 @@ void RCC::AsyncSend() {
   return;
 }
 
+bool RCC::ReceiveProposalList(const Proposal& proposal){
+  for(const Proposal& p : proposal.proposals()){
+    ReceiveProposal(p);
+  }
+  return true;
+}
+
 bool RCC::ReceiveProposal(const Proposal& proposal) {
   int proposer = proposal.header().proposer_id();
-  int seq = proposal.header().seq();
+  int64_t seq = proposal.header().seq();
   int sender = proposal.header().sender();
   int status = proposal.header().status();
 
@@ -204,7 +239,7 @@ bool RCC::ReceiveProposal(const Proposal& proposal) {
 
     if (status != ProposalType::NewMsg) {
       if (is_commit_[proposer].find(seq) != is_commit_[proposer].end()) {
-        return 0;
+        return false;
       }
     }
 
@@ -256,17 +291,19 @@ bool RCC::ReceiveProposal(const Proposal& proposal) {
         });
   }
 
+  //LOG(ERROR)<<" changed:"<<changed;
   if (changed) {
-    Proposal new_proposal;
-    *new_proposal.mutable_header() = proposal.header();
-    new_proposal.mutable_header()->set_sender(id_);
-    UpgradeState(&new_proposal);
+    std::unique_ptr<Proposal> new_proposal = std::make_unique<Proposal>();
+    *new_proposal->mutable_header() = proposal.header();
+    new_proposal->mutable_header()->set_sender(id_);
+    UpgradeState(new_proposal.get());
     int64_t commit_time = GetCurrentTime();
-    if (new_proposal.header().status() == ProposalType::Ready_execute) {
-      //global_stats_->AddCommitLatency(commit_time - proposal.header().create_time());
+
+
+    
+
+    if (new_proposal->header().status() == ProposalType::Ready_execute) {
       std::unique_lock<std::mutex> lk(mutex_[0]);
-      //std::unique_lock<std::mutex> lk(mutex_[proposer]);
-      // LOG(ERROR)<<"obtaind ata proposer:"<<proposer<<" seq:"<<seq;
       auto it = collector_[proposer].find(hash);
       assert(it != collector_[proposer].end());
 
@@ -278,16 +315,91 @@ bool RCC::ReceiveProposal(const Proposal& proposal) {
 
       assert(raw_p != nullptr);
       is_commit_[proposer].insert(seq);
+      int max_s = 0, min_s = -1;
+      for(int i = 1; i <=total_num_;++i){
+        int s = 0;
+        if(is_commit_[i].empty()){
+          //LOG(ERROR)<<" proposer:"<<i<<" comit seq:"<<0;
+        }
+        else {
+          s = *(--is_commit_[i].end());
+          //LOG(ERROR)<<" proposer:"<<i<<" comit seq:"<<*(--is_commit_[i].end());
+        }
+        if(min_s == -1 || min_s > s) min_s = s;
+        max_s = max_s > s?max_s:s;
+        //LOG(ERROR)<<" proposer:"<<i<<" comit seq:"<<s;
+      }
+        //LOG(ERROR)<<" proposer gap:"<<max_s- min_s<<" max:"<<max_s<<" min:"<<min_s;
+        global_stats_->AddCommitRoundLatency(max_s - min_s);
 
       // LOG(ERROR)<<"commit type:"<<new_proposal.header().status()<<"
       // transaction size:"<<raw_p->transactions_size();
       CommitProposal(std::move(raw_p));
     } else {
+    #ifdef OP2
+    std::unique_lock<std::mutex> lk(mutex_[1]);
+    //LOG(ERROR)<<" seq:"<<seq<<" last num:"<<(seq==1?0:send_num_[status][seq-1])<<" status:"<<status;
+        assert(new_proposal != nullptr);
+    if(seq==1 || send_num_[status][seq-1] == total_num_){
       // LOG(ERROR)<<"bc type:"<<new_proposal.header().status();
-      Broadcast(MessageType::ConsensusMsg, new_proposal);
+      //LOG(ERROR)<<" bc seq:"<<seq<<" status:"<<status;
+      assert(new_proposal != nullptr);
+      Proposal proposal_list;
+      
+      //Broadcast(MessageType::ConsensusMsg, *new_proposal);
+      *proposal_list.add_proposals() = *new_proposal;
+
+      send_num_[status][seq]++;
+      if(seq > 2){
+        if(send_num_[status].find(seq-2) != send_num_[status].end()){
+          send_num_[status].erase(send_num_[status].find(seq-2));
+        }
+      }
+      int64_t next_seq = seq+1;
+      while(send_num_[status][next_seq-1] == total_num_){
+        if(send_num_[status].find(next_seq-2) != send_num_[status].end()){
+          send_num_[status].erase(send_num_[status].find(next_seq-2));
+        }
+        //LOG(ERROR)<<" find next:"<<next_seq<<" send num:"<<send_num_[status][next_seq];
+        auto it = pending_msg_[status].find(next_seq);
+        if(it != pending_msg_[status].end()){
+          for(auto& p: it->second){
+            //LOG(ERROR)<<" bc next:"<<next_seq<<" status:"<<status;
+            assert(p != nullptr);
+            //Broadcast(MessageType::ConsensusMsg, *p);
+            *proposal_list.add_proposals() = *p;
+            send_num_[status][next_seq]++;
+          }
+          pending_msg_[status].erase(it);
+          //LOG(ERROR)<<" bc next :"<<next_seq<<" status:"<<status<<" send num:"<<send_num_[status][next_seq];
+          next_seq++;
+        }
+        else {
+          break;
+        }
+      }
+          Broadcast(MessageType::ConsensusMsgExt, proposal_list);
+      if(status == 0){
+      #ifdef OP3
+        {
+          std::unique_lock<std::mutex> lk(seq_mutex_);
+          commited_seq_ = std::max(static_cast<int64_t>(next_seq_), commited_seq_);
+          //LOG(ERROR)<<"update seq:"<<commited_seq_;
+          vote_cv_.notify_all();
+        }
+        #endif
+
+      }
+    }
+      else {
+        pending_msg_[status][seq].push_back(std::move(new_proposal));
+        //LOG(ERROR)<<" push pending, seq:"<<seq<<" size:"<< pending_msg_[status][seq].size();
+      }
+      #else
+        Broadcast(MessageType::ConsensusMsg, *new_proposal);
+      #endif
     }
   }
-  // LOG(ERROR)<<"receive proposal done";
   return true;
 }
 
diff --git a/platform/consensus/ordering/rcc/algorithm/rcc.h b/platform/consensus/ordering/rcc/algorithm/rcc.h
index 03bd7026..338baada 100644
--- a/platform/consensus/ordering/rcc/algorithm/rcc.h
+++ b/platform/consensus/ordering/rcc/algorithm/rcc.h
@@ -26,6 +26,7 @@ class RCC : public common::ProtocolBase {
   bool ReceiveTransaction(std::unique_ptr<Transaction> txn);
   void SendTxn();
   bool ReceiveProposal(const Proposal& proposal);
+  bool ReceiveProposalList(const Proposal& proposal);
 
  private:
   void UpgradeState(Proposal* proposal);
@@ -57,6 +58,8 @@ class RCC : public common::ProtocolBase {
   Stats* global_stats_;
   int64_t last_commit_time_ = 0;
   std::map<int,int64_t> commit_time_;
+  std::map<int64_t, int> send_num_[10];
+  std::map<int64_t, std::vector<std::unique_ptr<Proposal>> > pending_msg_[10];
 };
 
 }  // namespace rcc
diff --git a/platform/consensus/ordering/rcc/framework/consensus.cpp b/platform/consensus/ordering/rcc/framework/consensus.cpp
index e1eeb0fa..8ee13d6f 100644
--- a/platform/consensus/ordering/rcc/framework/consensus.cpp
+++ b/platform/consensus/ordering/rcc/framework/consensus.cpp
@@ -79,6 +79,19 @@ int Consensus::ProcessCustomConsensus(std::unique_ptr<Request> request) {
     }
     return 0;
   }
+  else if (request->user_type() == MessageType::ConsensusMsgExt) {
+    Proposal proposal;
+    if (!proposal.ParseFromString(request->data())) {
+      assert(1 == 0);
+      LOG(ERROR) << "parse proposal fail";
+      return -1;
+    }
+    if (!rcc_->ReceiveProposalList(proposal)) {
+      return -1;
+    }
+    return 0;
+  }
+
   return 0;
 }
 
diff --git a/platform/consensus/ordering/rcc/proto/proposal.proto b/platform/consensus/ordering/rcc/proto/proposal.proto
index 49acd85b..d2e4d732 100644
--- a/platform/consensus/ordering/rcc/proto/proposal.proto
+++ b/platform/consensus/ordering/rcc/proto/proposal.proto
@@ -34,10 +34,12 @@ message Proposal {
     Header header = 1;
   repeated Transaction transactions = 3;
   int64 queuing_time = 4;
+  repeated Proposal proposals = 5;
 };
 
 enum MessageType {
   NewProposal = 0;
   ConsensusMsg = 1;
+  ConsensusMsgExt = 2;
 };
 
diff --git a/platform/statistic/BUILD b/platform/statistic/BUILD
index 4d3b5c68..59a2d063 100644
--- a/platform/statistic/BUILD
+++ b/platform/statistic/BUILD
@@ -3,12 +3,32 @@ package(default_visibility = [
     "//service:__subpackages__",
 ])
 
+cc_library(
+    name = "cpu_info",
+    srcs = ["cpu_info.cpp"],
+    hdrs = ["cpu_info.h"],
+    deps = [
+        "//common:comm",
+        "//common/utils",
+    ],
+)
+
+cc_test(
+    name = "cpu_info_test",
+    srcs = ["cpu_info_test.cpp"],
+    deps = [
+      ":cpu_info",
+        "//common/test:test_main",
+    ],
+)
+
 cc_library(
     name = "stats",
     srcs = ["stats.cpp"],
     hdrs = ["stats.h"],
     deps = [
         ":prometheus_handler",
+        ":cpu_info",
         "//common:comm",
         "//common/utils",
         "//third_party:prometheus",
diff --git a/platform/statistic/cpu_info.cpp b/platform/statistic/cpu_info.cpp
new file mode 100644
index 00000000..23c051d2
--- /dev/null
+++ b/platform/statistic/cpu_info.cpp
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "platform/statistic/cpu_info.h"
+
+#include <glog/logging.h>
+#include <iostream>
+#include <thread>
+#include <chrono>
+#include <assert.h>
+#include <sys/sysinfo.h>
+#include <string.h>
+
+namespace resdb {
+
+namespace {
+
+void get_cpuoccupy (cpu_occupy_t *cpust)
+{
+    FILE *fd;
+    char buff[256];
+    cpu_occupy_t *cpu_occupy;
+    cpu_occupy=cpust;
+
+    fd = fopen ("/proc/stat", "r");
+    assert(fd != nullptr);
+
+    fgets (buff, sizeof(buff), fd);
+
+    sscanf (buff, "%s %u %u %u %u %u %u %u", cpu_occupy->name, &cpu_occupy->user, &cpu_occupy->nice,&cpu_occupy->system, &cpu_occupy->idle ,&cpu_occupy->iowait,&cpu_occupy->irq,&cpu_occupy->softirq);
+
+    LOG(ERROR)<<" name:"<< cpu_occupy->name<<" user:"<<cpu_occupy->user<<" nice:"<<cpu_occupy->nice<<" sys:"<<cpu_occupy->system<<" idle:"<<cpu_occupy->idle <<" iowait:"<<cpu_occupy->iowait<<" irq:"<<cpu_occupy->irq<<" soft irq:"<<cpu_occupy->softirq;
+
+    fclose(fd);
+}
+
+double cal_cpuoccupy (cpu_occupy_t *o, cpu_occupy_t *n)
+{
+    double od, nd;
+    double id, sd;
+    double cpu_use ;
+
+    od = (double) (o->user + o->nice + o->system +o->idle+o->softirq+o->iowait+o->irq);
+    nd = (double) (n->user + n->nice + n->system +n->idle+n->softirq+n->iowait+n->irq);
+
+    id = (double) (n->idle);    
+    sd = (double) (o->idle) ;  
+    if((nd-od) != 0)
+        cpu_use =100.0 - ((id-sd))/(nd-od)*100.00; 
+    else
+        cpu_use = 0;
+    return cpu_use;
+}
+
+
+}
+
+CPUInfo::CPUInfo() {
+  pid_ = getpid();
+  LOG(ERROR)<<" get pid:"<<pid_;
+}
+
+double CPUInfo::GetCPUUsage(){
+  cpu_occupy_t current_cpu_stat;
+  get_cpuoccupy(&current_cpu_stat);
+  double cpu = cal_cpuoccupy (&cpu_stat_, &current_cpu_stat);
+  cpu_stat_ = current_cpu_stat;
+  return cpu;
+}
+
+}
diff --git a/platform/statistic/cpu_info.h b/platform/statistic/cpu_info.h
new file mode 100644
index 00000000..7433435d
--- /dev/null
+++ b/platform/statistic/cpu_info.h
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <stdint.h>
+
+namespace resdb {
+
+typedef struct cpu_occupy_         
+{
+    char name[20];                
+    unsigned int user;           
+    unsigned int nice;          
+    unsigned int system;       
+    unsigned int idle;        
+    unsigned int iowait;
+    unsigned int irq;
+    unsigned int softirq;
+}cpu_occupy_t;
+
+class CPUInfo {
+    public:
+      CPUInfo();
+      double GetCPUUsage();
+
+    private:
+      int pid_;
+  int64_t last_totalcputime_ = 0;
+  int64_t last_procputime_  = 0;
+
+  cpu_occupy_t cpu_stat_;
+
+};
+
+
+}
diff --git a/platform/statistic/cpu_info_test.cpp b/platform/statistic/cpu_info_test.cpp
new file mode 100644
index 00000000..d6b0a305
--- /dev/null
+++ b/platform/statistic/cpu_info_test.cpp
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "platform/statistic/cpu_info.h"
+
+#include <glog/logging.h>
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+
+namespace resdb {
+namespace {
+
+TEST(CPUInfoTest, Test) {
+  CPUInfo info;
+  LOG(ERROR)<<" first get:"<<info.GetCPUUsage();
+  sleep(5);
+  LOG(ERROR)<<" second get:"<<info.GetCPUUsage();
+}
+
+
+}  // namespace
+
+}  // namespace resdb
diff --git a/platform/statistic/stats.cpp b/platform/statistic/stats.cpp
index f84bd37a..b3626a95 100644
--- a/platform/statistic/stats.cpp
+++ b/platform/statistic/stats.cpp
@@ -59,6 +59,7 @@ Stats::Stats(int sleep_time) {
   send_broad_cast_msg_ = 0;
 
   prometheus_ = nullptr;
+  cpu_info_ = std::make_unique<CPUInfo>(); 
 
   global_thread_ =
       std::thread(&Stats::MonitorGlobal, this);  // pass by reference
@@ -389,6 +390,8 @@ void Stats::MonitorGlobal() {
                  << static_cast<double>(commit_ratio_time -
                                         last_commit_ratio_time) /
                         (commit_ratio_num - last_commit_ratio_num) / 1000000.0
+              << " cpu usage:" 
+                 << cpu_info_->GetCPUUsage()
                << " "
                   "\n--------------- monitor ------------";
     if (run_req_num - last_run_req_num > 0) {
diff --git a/platform/statistic/stats.h b/platform/statistic/stats.h
index a07cbc20..c8675c4c 100644
--- a/platform/statistic/stats.h
+++ b/platform/statistic/stats.h
@@ -23,6 +23,7 @@
 #include <future>
 
 #include "platform/statistic/prometheus_handler.h"
+#include "platform/statistic/cpu_info.h"
 
 namespace resdb {
 
@@ -99,6 +100,8 @@ class Stats {
   std::condition_variable cv_;
   std::mutex mutex_;
 
+  std::unique_ptr<CPUInfo> cpu_info_; 
+
   std::thread global_thread_;
   std::atomic<uint64_t> num_client_req_, num_propose_, num_prepare_,
       num_commit_, pending_execute_, execute_, execute_done_;
diff --git a/scripts/deploy/a.sh b/scripts/deploy/a.sh
new file mode 100755
index 00000000..614e3902
--- /dev/null
+++ b/scripts/deploy/a.sh
@@ -0,0 +1,45 @@
+iplist=(
+172.31.25.224
+172.31.20.228
+172.31.18.224
+172.31.16.230
+172.31.31.229
+172.31.26.233
+172.31.29.231
+172.31.17.244
+172.31.17.236
+172.31.26.249
+172.31.18.247
+172.31.16.151
+172.31.31.151
+172.31.31.155
+172.31.17.155
+172.31.22.98
+172.31.19.156
+172.31.22.106
+172.31.24.98
+172.31.23.110
+172.31.27.109
+172.31.31.111
+172.31.28.110
+172.31.20.117
+172.31.30.116
+172.31.16.120
+172.31.28.119
+172.31.27.136
+172.31.20.133
+172.31.22.136
+172.31.26.136
+172.31.30.139
+)
+
+key=~/.ssh/junchao.pem
+
+for ip in ${iplist[@]}
+do
+  echo $ip
+  #cmd="sudo tc qdisc add dev ens5 root netem delay 50ms"
+  cmd="sudo tc qdisc add dev ens5 root netem delay 50ms 30ms"
+  ssh -i ${key} -n -o BatchMode=yes -o StrictHostKeyChecking=no ubuntu@${ip} "$cmd" 
+done
+
diff --git a/scripts/deploy/config/rcc.config b/scripts/deploy/config/rcc.config
index eec20b8c..0a234fb4 100644
--- a/scripts/deploy/config/rcc.config
+++ b/scripts/deploy/config/rcc.config
@@ -3,7 +3,7 @@
   "enable_viewchange": false,
   "recovery_enabled": false,
   "max_client_complaint_num":10,
-  "max_process_txn": 32,
+  "max_process_txn": 256,
   "worker_num": 30,
   "input_worker_num": 1,
   "output_worker_num": 10

Reply via email to