This is an automated email from the ASF dual-hosted git repository. junchao pushed a commit to branch cassandra in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git
commit 481de7fc13fa7cc9530dd4e2264274560a9abac3 Author: cjcchen <[email protected]> AuthorDate: Mon Apr 8 03:39:19 2024 +0000 add cpu info for rcc --- .../consensus/execution/transaction_executor.cpp | 5 +- .../ordering/rcc/algorithm/{rcc.cpp => ;} | 127 ++++++++++++++++--- platform/consensus/ordering/rcc/algorithm/rcc.cpp | 140 ++++++++++++++++++--- platform/consensus/ordering/rcc/algorithm/rcc.h | 3 + .../consensus/ordering/rcc/framework/consensus.cpp | 13 ++ .../consensus/ordering/rcc/proto/proposal.proto | 2 + platform/statistic/BUILD | 20 +++ platform/statistic/cpu_info.cpp | 87 +++++++++++++ platform/statistic/cpu_info.h | 53 ++++++++ platform/statistic/cpu_info_test.cpp | 40 ++++++ platform/statistic/stats.cpp | 3 + platform/statistic/stats.h | 3 + scripts/deploy/a.sh | 45 +++++++ scripts/deploy/config/rcc.config | 2 +- 14 files changed, 511 insertions(+), 32 deletions(-) diff --git a/platform/consensus/execution/transaction_executor.cpp b/platform/consensus/execution/transaction_executor.cpp index a4afb856..95984b32 100644 --- a/platform/consensus/execution/transaction_executor.cpp +++ b/platform/consensus/execution/transaction_executor.cpp @@ -379,10 +379,9 @@ void TransactionExecutor::Execute(std::unique_ptr<Request> request, response_v = transaction_manager_->ExecuteBatchData(*data_p); FinishExecute(request->seq()); - std::unique_ptr<BatchUserResponse> batch_response = - std::make_unique<BatchUserResponse>(); + response = std::make_unique<BatchUserResponse>(); for (auto& s : response_v) { - batch_response->add_response()->swap(*s); + response->add_response()->swap(*s); } } } diff --git a/platform/consensus/ordering/rcc/algorithm/rcc.cpp b/platform/consensus/ordering/rcc/algorithm/; similarity index 70% copy from platform/consensus/ordering/rcc/algorithm/rcc.cpp copy to platform/consensus/ordering/rcc/algorithm/; index b2d6269d..6ff5ae41 100644 --- a/platform/consensus/ordering/rcc/algorithm/rcc.cpp +++ b/platform/consensus/ordering/rcc/algorithm/; @@ -3,6 +3,9 @@ #include <glog/logging.h> #include "common/utils/utils.h" +#define OP0 +#define OP2 + namespace resdb { namespace rcc { @@ -55,6 +58,7 @@ void RCC::AsyncCommit() { //global_stats_->AddCommitWaitingLatency(commit_time - commit_time_[seq]); } + int last = next_seq_; while (!IsStop()) { if (seq_set_[next_seq_].size() == totoal_proposer_num_) { int64_t commit_time = GetCurrentTime(); @@ -77,17 +81,21 @@ void RCC::AsyncCommit() { seq_set_.erase(seq_set_.find(next_seq_)); global_stats_->AddCommitTxn(num); global_stats_->AddCommitBlock(pro); + #ifdef OP1 { std::unique_lock<std::mutex> lk(seq_mutex_); commited_seq_ = std::max(static_cast<int64_t>(next_seq_), commited_seq_); //LOG(ERROR)<<"update seq:"<<commited_seq_; vote_cv_.notify_all(); } + #endif next_seq_++; } else { break; } } + int now_seq = next_seq_; + //LOG(ERROR)<<" execute seq interval:"<<(now_seq-last)<<" now:"<<now_seq<<" last:"<<last<<" next size:"<<seq_set_[next_seq_].size(); } } @@ -95,10 +103,19 @@ void RCC::CommitProposal(std::unique_ptr<Proposal> p) { { p->set_queuing_time(GetCurrentTime()); + if(p->header().proposer_id() == id_) { + #ifdef OP0 + std::unique_lock<std::mutex> lk(seq_mutex_); + commited_seq_ = std::max(static_cast<int64_t>(p->header().seq()), commited_seq_); + //LOG(ERROR)<<"update seq:"<<commited_seq_; + vote_cv_.notify_all(); + #endif + global_stats_->AddCommitLatency(GetCurrentTime()- p->header().create_time()); + } //std::unique_lock<std::mutex> lk(seq_mutex_); //commited_seq_ = std::max(static_cast<int64_t>(p->header().seq()), commited_seq_); //vote_cv_.notify_all(); - global_stats_->AddCommitLatency(GetCurrentTime()- p->header().create_time()); + //global_stats_->AddCommitLatency(GetCurrentTime()- p->header().create_time()); //LOG(ERROR)<<" seq:"<<p->header().seq()<<" proposer:"<<p->header().proposer_id()<<" commit:"<<GetCurrentTime()<<" create time:"<<p->header().create_time()<<" commit time:"<<(GetCurrentTime() - p->header().create_time()); //global_stats_->AddCommitWaitingLatency(GetCurrentTime() - last_commit_time_); //last_commit_time_ = GetCurrentTime(); @@ -120,7 +137,7 @@ bool RCC::ReceiveTransaction(std::unique_ptr<Transaction> txn) { void RCC::AsyncSend() { - int limit = 2; + int limit = 128; bool start = false; int64_t last_time = 0; @@ -178,9 +195,16 @@ void RCC::AsyncSend() { return; } +bool RCC::ReceiveProposalList(const Proposal& proposal){ + for(const Proposal& p : proposal.proposals()){ + ReceiveProposal(p); + } + return true; +} + bool RCC::ReceiveProposal(const Proposal& proposal) { int proposer = proposal.header().proposer_id(); - int seq = proposal.header().seq(); + int64_t seq = proposal.header().seq(); int sender = proposal.header().sender(); int status = proposal.header().status(); @@ -204,7 +228,7 @@ bool RCC::ReceiveProposal(const Proposal& proposal) { if (status != ProposalType::NewMsg) { if (is_commit_[proposer].find(seq) != is_commit_[proposer].end()) { - return 0; + return false; } } @@ -256,17 +280,19 @@ bool RCC::ReceiveProposal(const Proposal& proposal) { }); } + //LOG(ERROR)<<" changed:"<<changed; if (changed) { - Proposal new_proposal; - *new_proposal.mutable_header() = proposal.header(); - new_proposal.mutable_header()->set_sender(id_); - UpgradeState(&new_proposal); + std::unique_ptr<Proposal> new_proposal = std::make_unique<Proposal>(); + *new_proposal->mutable_header() = proposal.header(); + new_proposal->mutable_header()->set_sender(id_); + UpgradeState(new_proposal.get()); int64_t commit_time = GetCurrentTime(); - if (new_proposal.header().status() == ProposalType::Ready_execute) { - //global_stats_->AddCommitLatency(commit_time - proposal.header().create_time()); + + + + + if (new_proposal->header().status() == ProposalType::Ready_execute) { std::unique_lock<std::mutex> lk(mutex_[0]); - //std::unique_lock<std::mutex> lk(mutex_[proposer]); - // LOG(ERROR)<<"obtaind ata proposer:"<<proposer<<" seq:"<<seq; auto it = collector_[proposer].find(hash); assert(it != collector_[proposer].end()); @@ -278,16 +304,89 @@ bool RCC::ReceiveProposal(const Proposal& proposal) { assert(raw_p != nullptr); is_commit_[proposer].insert(seq); + int max_s = 0, min_s = -1; + for(int i = 1; i <=total_num_;++i){ + int s = 0; + if(is_commit_[i].empty()){ + //LOG(ERROR)<<" proposer:"<<i<<" comit seq:"<<0; + } + else { + s = *(--is_commit_[i].end()); + //LOG(ERROR)<<" proposer:"<<i<<" comit seq:"<<*(--is_commit_[i].end()); + } + if(min_s == -1 || min_s > s) min_s = s; + max_s = max_s > s?max_s:s; + //LOG(ERROR)<<" proposer:"<<i<<" comit seq:"<<s; + } + //LOG(ERROR)<<" proposer gap:"<<max_s- min_s<<" max:"<<max_s<<" min:"<<min_s; + global_stats_->AddCommitRoundLatency(max_s - min_s); // LOG(ERROR)<<"commit type:"<<new_proposal.header().status()<<" // transaction size:"<<raw_p->transactions_size(); CommitProposal(std::move(raw_p)); } else { + #ifdef OP2 + std::unique_lock<std::mutex> lk(mutex_[1]); + //LOG(ERROR)<<" seq:"<<seq<<" last num:"<<(seq==1?0:send_num_[status][seq-1])<<" status:"<<status; + assert(new_proposal != nullptr); + if(seq==1 || send_num_[status][seq-1] == total_num_){ // LOG(ERROR)<<"bc type:"<<new_proposal.header().status(); - Broadcast(MessageType::ConsensusMsg, new_proposal); + //LOG(ERROR)<<" bc seq:"<<seq<<" status:"<<status; + assert(new_proposal != nullptr); + Proposal proposal_list; + + Broadcast(MessageType::ConsensusMsg, *new_proposal); + + send_num_[status][seq]++; + if(seq > 2){ + if(send_num_[status].find(seq-2) != send_num_[status].end()){ + send_num_[status].erase(send_num_[status].find(seq-2)); + } + } + int64_t next_seq = seq+1; + while(send_num_[status][next_seq-1] == total_num_){ + if(send_num_[status].find(next_seq-2) != send_num_[status].end()){ + send_num_[status].erase(send_num_[status].find(next_seq-2)); + } + //LOG(ERROR)<<" find next:"<<next_seq<<" send num:"<<send_num_[status][next_seq]; + auto it = pending_msg_[status].find(next_seq); + if(it != pending_msg_[status].end()){ + for(auto& p: it->second){ + //LOG(ERROR)<<" bc next:"<<next_seq<<" status:"<<status; + assert(p != nullptr); + Broadcast(MessageType::ConsensusMsg, *p); + send_num_[status][next_seq]++; + } + pending_msg_[status].erase(it); + //LOG(ERROR)<<" bc next :"<<next_seq<<" status:"<<status<<" send num:"<<send_num_[status][next_seq]; + next_seq++; + } + else { + break; + } + } + if(status == 0){ + #ifdef OP3 + { + std::unique_lock<std::mutex> lk(seq_mutex_); + commited_seq_ = std::max(static_cast<int64_t>(next_seq_), commited_seq_); + //LOG(ERROR)<<"update seq:"<<commited_seq_; + vote_cv_.notify_all(); + } + #endif + + } + + } + else { + pending_msg_[status][seq].push_back(std::move(new_proposal)); + //LOG(ERROR)<<" push pending, seq:"<<seq<<" size:"<< pending_msg_[status][seq].size(); + } + #else + Broadcast(MessageType::ConsensusMsg, new_proposal); + #endif } } - // LOG(ERROR)<<"receive proposal done"; return true; } diff --git a/platform/consensus/ordering/rcc/algorithm/rcc.cpp b/platform/consensus/ordering/rcc/algorithm/rcc.cpp index b2d6269d..a9eeec7f 100644 --- a/platform/consensus/ordering/rcc/algorithm/rcc.cpp +++ b/platform/consensus/ordering/rcc/algorithm/rcc.cpp @@ -3,6 +3,10 @@ #include <glog/logging.h> #include "common/utils/utils.h" +#define OP0 +#define OP2 +//#define OP4 + namespace resdb { namespace rcc { @@ -13,7 +17,11 @@ RCC::RCC(int id, int f, int total_num, SignatureVerifier* verifier) local_txn_id_ = 1; execute_id_ = 1; totoal_proposer_num_ = total_num_; + #ifdef OP4 batch_size_ = 5; + #else + batch_size_ = 1; + #endif queue_size_ = 0; global_stats_ = Stats::GetGlobalStats(); @@ -55,6 +63,7 @@ void RCC::AsyncCommit() { //global_stats_->AddCommitWaitingLatency(commit_time - commit_time_[seq]); } + int last = next_seq_; while (!IsStop()) { if (seq_set_[next_seq_].size() == totoal_proposer_num_) { int64_t commit_time = GetCurrentTime(); @@ -77,17 +86,21 @@ void RCC::AsyncCommit() { seq_set_.erase(seq_set_.find(next_seq_)); global_stats_->AddCommitTxn(num); global_stats_->AddCommitBlock(pro); + #ifdef OP1 { std::unique_lock<std::mutex> lk(seq_mutex_); commited_seq_ = std::max(static_cast<int64_t>(next_seq_), commited_seq_); //LOG(ERROR)<<"update seq:"<<commited_seq_; vote_cv_.notify_all(); } + #endif next_seq_++; } else { break; } } + int now_seq = next_seq_; + //LOG(ERROR)<<" execute seq interval:"<<(now_seq-last)<<" now:"<<now_seq<<" last:"<<last<<" next size:"<<seq_set_[next_seq_].size(); } } @@ -95,10 +108,19 @@ void RCC::CommitProposal(std::unique_ptr<Proposal> p) { { p->set_queuing_time(GetCurrentTime()); + if(p->header().proposer_id() == id_) { + #ifdef OP0 + std::unique_lock<std::mutex> lk(seq_mutex_); + commited_seq_ = std::max(static_cast<int64_t>(p->header().seq()), commited_seq_); + //LOG(ERROR)<<"update seq:"<<commited_seq_; + vote_cv_.notify_all(); + #endif + global_stats_->AddCommitLatency(GetCurrentTime()- p->header().create_time()); + } //std::unique_lock<std::mutex> lk(seq_mutex_); //commited_seq_ = std::max(static_cast<int64_t>(p->header().seq()), commited_seq_); //vote_cv_.notify_all(); - global_stats_->AddCommitLatency(GetCurrentTime()- p->header().create_time()); + //global_stats_->AddCommitLatency(GetCurrentTime()- p->header().create_time()); //LOG(ERROR)<<" seq:"<<p->header().seq()<<" proposer:"<<p->header().proposer_id()<<" commit:"<<GetCurrentTime()<<" create time:"<<p->header().create_time()<<" commit time:"<<(GetCurrentTime() - p->header().create_time()); //global_stats_->AddCommitWaitingLatency(GetCurrentTime() - last_commit_time_); //last_commit_time_ = GetCurrentTime(); @@ -120,7 +142,7 @@ bool RCC::ReceiveTransaction(std::unique_ptr<Transaction> txn) { void RCC::AsyncSend() { - int limit = 2; + int limit = 128; bool start = false; int64_t last_time = 0; @@ -154,8 +176,13 @@ void RCC::AsyncSend() { txns.push_back(std::move(txn)); start = true; + for (int i = 1; i < batch_size_; i++) { + #ifdef OP4 + txn = txns_.Pop(10); + #else txn = txns_.Pop(0); + #endif if (txn == nullptr) { break; } @@ -171,6 +198,7 @@ void RCC::AsyncSend() { std::unique_ptr<Proposal> proposal = proposal_manager_->GenerateProposal(txns); + global_stats_->AddBlockSize(txns.size()); //LOG(ERROR) << "send proposal id:" << id_ << " seq:" << proposal->header().seq(); broadcast_call_(MessageType::ConsensusMsg, *proposal); @@ -178,9 +206,16 @@ void RCC::AsyncSend() { return; } +bool RCC::ReceiveProposalList(const Proposal& proposal){ + for(const Proposal& p : proposal.proposals()){ + ReceiveProposal(p); + } + return true; +} + bool RCC::ReceiveProposal(const Proposal& proposal) { int proposer = proposal.header().proposer_id(); - int seq = proposal.header().seq(); + int64_t seq = proposal.header().seq(); int sender = proposal.header().sender(); int status = proposal.header().status(); @@ -204,7 +239,7 @@ bool RCC::ReceiveProposal(const Proposal& proposal) { if (status != ProposalType::NewMsg) { if (is_commit_[proposer].find(seq) != is_commit_[proposer].end()) { - return 0; + return false; } } @@ -256,17 +291,19 @@ bool RCC::ReceiveProposal(const Proposal& proposal) { }); } + //LOG(ERROR)<<" changed:"<<changed; if (changed) { - Proposal new_proposal; - *new_proposal.mutable_header() = proposal.header(); - new_proposal.mutable_header()->set_sender(id_); - UpgradeState(&new_proposal); + std::unique_ptr<Proposal> new_proposal = std::make_unique<Proposal>(); + *new_proposal->mutable_header() = proposal.header(); + new_proposal->mutable_header()->set_sender(id_); + UpgradeState(new_proposal.get()); int64_t commit_time = GetCurrentTime(); - if (new_proposal.header().status() == ProposalType::Ready_execute) { - //global_stats_->AddCommitLatency(commit_time - proposal.header().create_time()); + + + + + if (new_proposal->header().status() == ProposalType::Ready_execute) { std::unique_lock<std::mutex> lk(mutex_[0]); - //std::unique_lock<std::mutex> lk(mutex_[proposer]); - // LOG(ERROR)<<"obtaind ata proposer:"<<proposer<<" seq:"<<seq; auto it = collector_[proposer].find(hash); assert(it != collector_[proposer].end()); @@ -278,16 +315,91 @@ bool RCC::ReceiveProposal(const Proposal& proposal) { assert(raw_p != nullptr); is_commit_[proposer].insert(seq); + int max_s = 0, min_s = -1; + for(int i = 1; i <=total_num_;++i){ + int s = 0; + if(is_commit_[i].empty()){ + //LOG(ERROR)<<" proposer:"<<i<<" comit seq:"<<0; + } + else { + s = *(--is_commit_[i].end()); + //LOG(ERROR)<<" proposer:"<<i<<" comit seq:"<<*(--is_commit_[i].end()); + } + if(min_s == -1 || min_s > s) min_s = s; + max_s = max_s > s?max_s:s; + //LOG(ERROR)<<" proposer:"<<i<<" comit seq:"<<s; + } + //LOG(ERROR)<<" proposer gap:"<<max_s- min_s<<" max:"<<max_s<<" min:"<<min_s; + global_stats_->AddCommitRoundLatency(max_s - min_s); // LOG(ERROR)<<"commit type:"<<new_proposal.header().status()<<" // transaction size:"<<raw_p->transactions_size(); CommitProposal(std::move(raw_p)); } else { + #ifdef OP2 + std::unique_lock<std::mutex> lk(mutex_[1]); + //LOG(ERROR)<<" seq:"<<seq<<" last num:"<<(seq==1?0:send_num_[status][seq-1])<<" status:"<<status; + assert(new_proposal != nullptr); + if(seq==1 || send_num_[status][seq-1] == total_num_){ // LOG(ERROR)<<"bc type:"<<new_proposal.header().status(); - Broadcast(MessageType::ConsensusMsg, new_proposal); + //LOG(ERROR)<<" bc seq:"<<seq<<" status:"<<status; + assert(new_proposal != nullptr); + Proposal proposal_list; + + //Broadcast(MessageType::ConsensusMsg, *new_proposal); + *proposal_list.add_proposals() = *new_proposal; + + send_num_[status][seq]++; + if(seq > 2){ + if(send_num_[status].find(seq-2) != send_num_[status].end()){ + send_num_[status].erase(send_num_[status].find(seq-2)); + } + } + int64_t next_seq = seq+1; + while(send_num_[status][next_seq-1] == total_num_){ + if(send_num_[status].find(next_seq-2) != send_num_[status].end()){ + send_num_[status].erase(send_num_[status].find(next_seq-2)); + } + //LOG(ERROR)<<" find next:"<<next_seq<<" send num:"<<send_num_[status][next_seq]; + auto it = pending_msg_[status].find(next_seq); + if(it != pending_msg_[status].end()){ + for(auto& p: it->second){ + //LOG(ERROR)<<" bc next:"<<next_seq<<" status:"<<status; + assert(p != nullptr); + //Broadcast(MessageType::ConsensusMsg, *p); + *proposal_list.add_proposals() = *p; + send_num_[status][next_seq]++; + } + pending_msg_[status].erase(it); + //LOG(ERROR)<<" bc next :"<<next_seq<<" status:"<<status<<" send num:"<<send_num_[status][next_seq]; + next_seq++; + } + else { + break; + } + } + Broadcast(MessageType::ConsensusMsgExt, proposal_list); + if(status == 0){ + #ifdef OP3 + { + std::unique_lock<std::mutex> lk(seq_mutex_); + commited_seq_ = std::max(static_cast<int64_t>(next_seq_), commited_seq_); + //LOG(ERROR)<<"update seq:"<<commited_seq_; + vote_cv_.notify_all(); + } + #endif + + } + } + else { + pending_msg_[status][seq].push_back(std::move(new_proposal)); + //LOG(ERROR)<<" push pending, seq:"<<seq<<" size:"<< pending_msg_[status][seq].size(); + } + #else + Broadcast(MessageType::ConsensusMsg, *new_proposal); + #endif } } - // LOG(ERROR)<<"receive proposal done"; return true; } diff --git a/platform/consensus/ordering/rcc/algorithm/rcc.h b/platform/consensus/ordering/rcc/algorithm/rcc.h index 03bd7026..338baada 100644 --- a/platform/consensus/ordering/rcc/algorithm/rcc.h +++ b/platform/consensus/ordering/rcc/algorithm/rcc.h @@ -26,6 +26,7 @@ class RCC : public common::ProtocolBase { bool ReceiveTransaction(std::unique_ptr<Transaction> txn); void SendTxn(); bool ReceiveProposal(const Proposal& proposal); + bool ReceiveProposalList(const Proposal& proposal); private: void UpgradeState(Proposal* proposal); @@ -57,6 +58,8 @@ class RCC : public common::ProtocolBase { Stats* global_stats_; int64_t last_commit_time_ = 0; std::map<int,int64_t> commit_time_; + std::map<int64_t, int> send_num_[10]; + std::map<int64_t, std::vector<std::unique_ptr<Proposal>> > pending_msg_[10]; }; } // namespace rcc diff --git a/platform/consensus/ordering/rcc/framework/consensus.cpp b/platform/consensus/ordering/rcc/framework/consensus.cpp index e1eeb0fa..8ee13d6f 100644 --- a/platform/consensus/ordering/rcc/framework/consensus.cpp +++ b/platform/consensus/ordering/rcc/framework/consensus.cpp @@ -79,6 +79,19 @@ int Consensus::ProcessCustomConsensus(std::unique_ptr<Request> request) { } return 0; } + else if (request->user_type() == MessageType::ConsensusMsgExt) { + Proposal proposal; + if (!proposal.ParseFromString(request->data())) { + assert(1 == 0); + LOG(ERROR) << "parse proposal fail"; + return -1; + } + if (!rcc_->ReceiveProposalList(proposal)) { + return -1; + } + return 0; + } + return 0; } diff --git a/platform/consensus/ordering/rcc/proto/proposal.proto b/platform/consensus/ordering/rcc/proto/proposal.proto index 49acd85b..d2e4d732 100644 --- a/platform/consensus/ordering/rcc/proto/proposal.proto +++ b/platform/consensus/ordering/rcc/proto/proposal.proto @@ -34,10 +34,12 @@ message Proposal { Header header = 1; repeated Transaction transactions = 3; int64 queuing_time = 4; + repeated Proposal proposals = 5; }; enum MessageType { NewProposal = 0; ConsensusMsg = 1; + ConsensusMsgExt = 2; }; diff --git a/platform/statistic/BUILD b/platform/statistic/BUILD index 4d3b5c68..59a2d063 100644 --- a/platform/statistic/BUILD +++ b/platform/statistic/BUILD @@ -3,12 +3,32 @@ package(default_visibility = [ "//service:__subpackages__", ]) +cc_library( + name = "cpu_info", + srcs = ["cpu_info.cpp"], + hdrs = ["cpu_info.h"], + deps = [ + "//common:comm", + "//common/utils", + ], +) + +cc_test( + name = "cpu_info_test", + srcs = ["cpu_info_test.cpp"], + deps = [ + ":cpu_info", + "//common/test:test_main", + ], +) + cc_library( name = "stats", srcs = ["stats.cpp"], hdrs = ["stats.h"], deps = [ ":prometheus_handler", + ":cpu_info", "//common:comm", "//common/utils", "//third_party:prometheus", diff --git a/platform/statistic/cpu_info.cpp b/platform/statistic/cpu_info.cpp new file mode 100644 index 00000000..23c051d2 --- /dev/null +++ b/platform/statistic/cpu_info.cpp @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "platform/statistic/cpu_info.h" + +#include <glog/logging.h> +#include <iostream> +#include <thread> +#include <chrono> +#include <assert.h> +#include <sys/sysinfo.h> +#include <string.h> + +namespace resdb { + +namespace { + +void get_cpuoccupy (cpu_occupy_t *cpust) +{ + FILE *fd; + char buff[256]; + cpu_occupy_t *cpu_occupy; + cpu_occupy=cpust; + + fd = fopen ("/proc/stat", "r"); + assert(fd != nullptr); + + fgets (buff, sizeof(buff), fd); + + sscanf (buff, "%s %u %u %u %u %u %u %u", cpu_occupy->name, &cpu_occupy->user, &cpu_occupy->nice,&cpu_occupy->system, &cpu_occupy->idle ,&cpu_occupy->iowait,&cpu_occupy->irq,&cpu_occupy->softirq); + + LOG(ERROR)<<" name:"<< cpu_occupy->name<<" user:"<<cpu_occupy->user<<" nice:"<<cpu_occupy->nice<<" sys:"<<cpu_occupy->system<<" idle:"<<cpu_occupy->idle <<" iowait:"<<cpu_occupy->iowait<<" irq:"<<cpu_occupy->irq<<" soft irq:"<<cpu_occupy->softirq; + + fclose(fd); +} + +double cal_cpuoccupy (cpu_occupy_t *o, cpu_occupy_t *n) +{ + double od, nd; + double id, sd; + double cpu_use ; + + od = (double) (o->user + o->nice + o->system +o->idle+o->softirq+o->iowait+o->irq); + nd = (double) (n->user + n->nice + n->system +n->idle+n->softirq+n->iowait+n->irq); + + id = (double) (n->idle); + sd = (double) (o->idle) ; + if((nd-od) != 0) + cpu_use =100.0 - ((id-sd))/(nd-od)*100.00; + else + cpu_use = 0; + return cpu_use; +} + + +} + +CPUInfo::CPUInfo() { + pid_ = getpid(); + LOG(ERROR)<<" get pid:"<<pid_; +} + +double CPUInfo::GetCPUUsage(){ + cpu_occupy_t current_cpu_stat; + get_cpuoccupy(¤t_cpu_stat); + double cpu = cal_cpuoccupy (&cpu_stat_, ¤t_cpu_stat); + cpu_stat_ = current_cpu_stat; + return cpu; +} + +} diff --git a/platform/statistic/cpu_info.h b/platform/statistic/cpu_info.h new file mode 100644 index 00000000..7433435d --- /dev/null +++ b/platform/statistic/cpu_info.h @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include <stdint.h> + +namespace resdb { + +typedef struct cpu_occupy_ +{ + char name[20]; + unsigned int user; + unsigned int nice; + unsigned int system; + unsigned int idle; + unsigned int iowait; + unsigned int irq; + unsigned int softirq; +}cpu_occupy_t; + +class CPUInfo { + public: + CPUInfo(); + double GetCPUUsage(); + + private: + int pid_; + int64_t last_totalcputime_ = 0; + int64_t last_procputime_ = 0; + + cpu_occupy_t cpu_stat_; + +}; + + +} diff --git a/platform/statistic/cpu_info_test.cpp b/platform/statistic/cpu_info_test.cpp new file mode 100644 index 00000000..d6b0a305 --- /dev/null +++ b/platform/statistic/cpu_info_test.cpp @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "platform/statistic/cpu_info.h" + +#include <glog/logging.h> +#include <gmock/gmock.h> +#include <gtest/gtest.h> + + +namespace resdb { +namespace { + +TEST(CPUInfoTest, Test) { + CPUInfo info; + LOG(ERROR)<<" first get:"<<info.GetCPUUsage(); + sleep(5); + LOG(ERROR)<<" second get:"<<info.GetCPUUsage(); +} + + +} // namespace + +} // namespace resdb diff --git a/platform/statistic/stats.cpp b/platform/statistic/stats.cpp index f84bd37a..b3626a95 100644 --- a/platform/statistic/stats.cpp +++ b/platform/statistic/stats.cpp @@ -59,6 +59,7 @@ Stats::Stats(int sleep_time) { send_broad_cast_msg_ = 0; prometheus_ = nullptr; + cpu_info_ = std::make_unique<CPUInfo>(); global_thread_ = std::thread(&Stats::MonitorGlobal, this); // pass by reference @@ -389,6 +390,8 @@ void Stats::MonitorGlobal() { << static_cast<double>(commit_ratio_time - last_commit_ratio_time) / (commit_ratio_num - last_commit_ratio_num) / 1000000.0 + << " cpu usage:" + << cpu_info_->GetCPUUsage() << " " "\n--------------- monitor ------------"; if (run_req_num - last_run_req_num > 0) { diff --git a/platform/statistic/stats.h b/platform/statistic/stats.h index a07cbc20..c8675c4c 100644 --- a/platform/statistic/stats.h +++ b/platform/statistic/stats.h @@ -23,6 +23,7 @@ #include <future> #include "platform/statistic/prometheus_handler.h" +#include "platform/statistic/cpu_info.h" namespace resdb { @@ -99,6 +100,8 @@ class Stats { std::condition_variable cv_; std::mutex mutex_; + std::unique_ptr<CPUInfo> cpu_info_; + std::thread global_thread_; std::atomic<uint64_t> num_client_req_, num_propose_, num_prepare_, num_commit_, pending_execute_, execute_, execute_done_; diff --git a/scripts/deploy/a.sh b/scripts/deploy/a.sh new file mode 100755 index 00000000..614e3902 --- /dev/null +++ b/scripts/deploy/a.sh @@ -0,0 +1,45 @@ +iplist=( +172.31.25.224 +172.31.20.228 +172.31.18.224 +172.31.16.230 +172.31.31.229 +172.31.26.233 +172.31.29.231 +172.31.17.244 +172.31.17.236 +172.31.26.249 +172.31.18.247 +172.31.16.151 +172.31.31.151 +172.31.31.155 +172.31.17.155 +172.31.22.98 +172.31.19.156 +172.31.22.106 +172.31.24.98 +172.31.23.110 +172.31.27.109 +172.31.31.111 +172.31.28.110 +172.31.20.117 +172.31.30.116 +172.31.16.120 +172.31.28.119 +172.31.27.136 +172.31.20.133 +172.31.22.136 +172.31.26.136 +172.31.30.139 +) + +key=~/.ssh/junchao.pem + +for ip in ${iplist[@]} +do + echo $ip + #cmd="sudo tc qdisc add dev ens5 root netem delay 50ms" + cmd="sudo tc qdisc add dev ens5 root netem delay 50ms 30ms" + ssh -i ${key} -n -o BatchMode=yes -o StrictHostKeyChecking=no ubuntu@${ip} "$cmd" +done + diff --git a/scripts/deploy/config/rcc.config b/scripts/deploy/config/rcc.config index eec20b8c..0a234fb4 100644 --- a/scripts/deploy/config/rcc.config +++ b/scripts/deploy/config/rcc.config @@ -3,7 +3,7 @@ "enable_viewchange": false, "recovery_enabled": false, "max_client_complaint_num":10, - "max_process_txn": 32, + "max_process_txn": 256, "worker_num": 30, "input_worker_num": 1, "output_worker_num": 10
