This is an automated email from the ASF dual-hosted git repository. junchao pushed a commit to branch cassandra in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git
commit 52f4525919da35671cc26b6601f6648f16fb6d1d Author: cjcchen <[email protected]> AuthorDate: Fri Jan 10 16:54:14 2025 +0000 commit --- benchmark/protocols/cassandra_cft/BUILD | 16 + .../cassandra_cft/kv_server_performance.cpp | 89 ++++ .../protocols/cassandra_cft/kv_service_tools.cpp | 60 +-- .../ordering/cassandra_cft/algorithm/BUILD | 76 ++++ .../ordering/cassandra_cft/algorithm/cassandra.cpp | 274 ++++++++++++ .../ordering/cassandra_cft/algorithm/cassandra.h | 100 +++++ .../cassandra_cft/algorithm/proposal_graph.cpp | 477 +++++++++++++++++++++ .../cassandra_cft/algorithm/proposal_graph.h | 85 ++++ .../cassandra_cft/algorithm/proposal_manager.cpp | 238 ++++++++++ .../cassandra_cft/algorithm/proposal_manager.h | 64 +++ .../cassandra_cft/algorithm/proposal_state.h | 16 + .../ordering/cassandra_cft/algorithm/ranking.cpp | 10 + .../ordering/cassandra_cft/algorithm/ranking.h | 12 + .../ordering/cassandra_cft/framework/BUILD | 16 + .../framework/consensus.cpp | 84 ++-- .../framework/consensus.h | 22 +- .../cassandra_cft/framework/consensus_test.cpp | 179 ++++++++ .../consensus/ordering/cassandra_cft/proto/BUILD | 16 + .../ordering/cassandra_cft/proto/proposal.proto | 124 ++++++ .../common/framework/performance_manager.cpp | 3 + .../ordering/multipaxos/algorithm/multipaxos.cpp | 66 ++- .../ordering/multipaxos/algorithm/multipaxos.h | 6 +- .../multipaxos/algorithm/proposal_manager.cpp | 1 - .../ordering/multipaxos/framework/consensus.cpp | 15 + .../ordering/multipaxos/framework/consensus.h | 1 + .../ordering/multipaxos/proto/proposal.proto | 1 + platform/networkstrate/consensus_manager.cpp | 2 +- platform/networkstrate/replica_communicator.cpp | 79 +++- platform/networkstrate/replica_communicator.h | 13 + platform/proto/replica_info.proto | 1 + .../{multipaxos.config => cassandra_cft.config} | 7 +- .../deploy/config/kv_performance_server_16.conf | 64 +-- .../deploy/config/kv_performance_server_32.conf | 128 +++--- scripts/deploy/config/kv_performance_server_4.conf | 
13 + .../deploy/config/kv_performance_server_48.conf | 101 +++++ .../deploy/config/kv_performance_server_64.conf | 256 +++++------ scripts/deploy/config/kv_performance_server_8.conf | 32 +- scripts/deploy/config/multipaxos.config | 7 +- .../performance/cassandra_cft_performance.sh | 5 + scripts/deploy/performance/run_performance.sh | 5 +- scripts/deploy/script/deploy.sh | 1 + 41 files changed, 2421 insertions(+), 344 deletions(-) diff --git a/benchmark/protocols/cassandra_cft/BUILD b/benchmark/protocols/cassandra_cft/BUILD new file mode 100644 index 00000000..cef949a3 --- /dev/null +++ b/benchmark/protocols/cassandra_cft/BUILD @@ -0,0 +1,16 @@ +package(default_visibility = ["//visibility:private"]) + +load("@bazel_skylib//rules:common_settings.bzl", "bool_flag") + +cc_binary( + name = "kv_server_performance", + srcs = ["kv_server_performance.cpp"], + deps = [ + "//chain/storage:memory_db", + "//executor/kv:kv_executor", + "//platform/config:resdb_config_utils", + "//platform/consensus/ordering/cassandra_cft/framework:consensus", + "//service/utils:server_factory", + ], +) + diff --git a/benchmark/protocols/cassandra_cft/kv_server_performance.cpp b/benchmark/protocols/cassandra_cft/kv_server_performance.cpp new file mode 100644 index 00000000..ed4746e4 --- /dev/null +++ b/benchmark/protocols/cassandra_cft/kv_server_performance.cpp @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2019-2022 ExpoLab, UC Davis + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial 
portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + */ + +#include <glog/logging.h> + +#include "chain/storage/memory_db.h" +#include "executor/kv/kv_executor.h" +#include "platform/config/resdb_config_utils.h" +#include "platform/consensus/ordering/cassandra_cft/framework/consensus.h" +#include "platform/networkstrate/service_network.h" +#include "platform/statistic/stats.h" +#include "proto/kv/kv.pb.h" + +using namespace resdb; +using namespace resdb::cassandra_cft; +using namespace resdb::storage; + +void ShowUsage() { + printf("<config> <private_key> <cert_file> [logging_dir]\n"); +} + +std::string GetRandomKey() { + int num1 = rand() % 10; + int num2 = rand() % 10; + return std::to_string(num1) + std::to_string(num2); +} + +int main(int argc, char** argv) { + if (argc < 3) { + ShowUsage(); + exit(0); + } + + // google::InitGoogleLogging(argv[0]); + // FLAGS_minloglevel = google::GLOG_WARNING; + + char* config_file = argv[1]; + char* private_key_file = argv[2]; + char* cert_file = argv[3]; + + if (argc >= 5) { + auto monitor_port = Stats::GetGlobalStats(5); + monitor_port->SetPrometheus(argv[4]); + } + + std::unique_ptr<ResDBConfig> config = + GenerateResDBConfig(config_file, private_key_file, cert_file); + + config->RunningPerformance(true); + ResConfigData config_data = config->GetConfigData(); + + auto performance_consens = std::make_unique<Consensus>( + *config, std::make_unique<KVExecutor>(std::make_unique<MemoryDB>())); + performance_consens->SetupPerformanceDataFunc([]() { + KVRequest 
request; + request.set_cmd(KVRequest::SET); + request.set_key(GetRandomKey()); + request.set_value("helloworld"); + std::string request_data; + request.SerializeToString(&request_data); + return request_data; + }); + + auto server = + std::make_unique<ServiceNetwork>(*config, std::move(performance_consens)); + server->Run(); +} diff --git a/platform/consensus/ordering/multipaxos/framework/consensus.h b/benchmark/protocols/cassandra_cft/kv_service_tools.cpp similarity index 56% copy from platform/consensus/ordering/multipaxos/framework/consensus.h copy to benchmark/protocols/cassandra_cft/kv_service_tools.cpp index 4d08c3ac..17858ef2 100644 --- a/platform/consensus/ordering/multipaxos/framework/consensus.h +++ b/benchmark/protocols/cassandra_cft/kv_service_tools.cpp @@ -23,31 +23,35 @@ * */ -#pragma once - -#include "executor/common/transaction_manager.h" -#include "platform/consensus/execution/transaction_executor.h" -#include "platform/consensus/ordering/multipaxos/algorithm/multipaxos.h" -#include "platform/consensus/ordering/common/framework/consensus.h" -#include "platform/networkstrate/consensus_manager.h" - -namespace resdb { -namespace multipaxos { - -class Consensus : public common::Consensus{ - public: - Consensus(const ResDBConfig& config, - std::unique_ptr<TransactionManager> transaction_manager); - - protected: - int ProcessCustomConsensus(std::unique_ptr<Request> request) override; - int ProcessNewTransaction(std::unique_ptr<Request> request) override; - int CommitMsg(const google::protobuf::Message& msg) override; - int CommitMsgInternal(const Transaction& txn); - - private: - std::unique_ptr<MultiPaxos> multipaxos_; -}; - -} // namespace tusk -} // namespace resdb +#include <fcntl.h> +#include <sys/stat.h> +#include <sys/types.h> +#include <unistd.h> + +#include <fstream> + +#include "common/proto/signature_info.pb.h" +#include "interface/kv/kv_client.h" +#include "platform/config/resdb_config_utils.h" + +using resdb::GenerateReplicaInfo; +using 
resdb::GenerateResDBConfig; +using resdb::KVClient; +using resdb::ReplicaInfo; +using resdb::ResDBConfig; + +int main(int argc, char** argv) { + if (argc < 2) { + printf("<config path>\n"); + return 0; + } + std::string client_config_file = argv[1]; + ResDBConfig config = GenerateResDBConfig(client_config_file); + + config.SetClientTimeoutMs(100000); + + KVClient client(config); + + client.Set("start", "value"); + printf("start benchmark\n"); +} diff --git a/platform/consensus/ordering/cassandra_cft/algorithm/BUILD b/platform/consensus/ordering/cassandra_cft/algorithm/BUILD new file mode 100644 index 00000000..a3429088 --- /dev/null +++ b/platform/consensus/ordering/cassandra_cft/algorithm/BUILD @@ -0,0 +1,76 @@ +package(default_visibility = ["//platform/consensus/ordering/cassandra_cft:__subpackages__"]) + +cc_library( + name = "proposal_state", + hdrs = ["proposal_state.h"], +) + +cc_library( + name = "proposal_manager", + srcs = ["proposal_manager.cpp"], + hdrs = ["proposal_manager.h"], + deps = [ + ":proposal_graph", + "//common:comm", + "//platform/statistic:stats", + "//common/crypto:signature_verifier", + "//common/utils", + "//platform/consensus/ordering/cassandra_cft/proto:proposal_cc_proto", + ], +) + +cc_library( + name = "ranking", + srcs = ["ranking.cpp"], + hdrs = ["ranking.h"], + deps = [ + "//common:comm", + ], +) + +cc_library( + name = "proposal_graph", + srcs = ["proposal_graph.cpp"], + hdrs = ["proposal_graph.h"], + deps = [ + ":ranking", + ":proposal_state", + "//platform/statistic:stats", + "//common:comm", + "//common/utils", + "//platform/consensus/ordering/cassandra_cft/proto:proposal_cc_proto", + ], +) + +cc_library( + name = "cassandra", + srcs = ["cassandra.cpp"], + hdrs = ["cassandra.h"], + deps = [ + ":proposal_graph", + ":proposal_manager", + "//platform/statistic:stats", + "//common:comm", + "//common/crypto:signature_verifier", + "//platform/consensus/ordering/common/algorithm:protocol_base", + 
"//platform/common/queue:lock_free_queue", + ], +) + +cc_test( + name = "proposal_graph_test", + srcs = ["proposal_graph_test.cpp"], + deps = [ + ":proposal_graph", + "//common/test:test_main", + ], +) + +cc_test( + name = "cassandra_cft_test", + srcs = ["cassandra_cft_test.cpp"], + deps = [ + ":cassandra", + "//common/test:test_main", + ], +) diff --git a/platform/consensus/ordering/cassandra_cft/algorithm/cassandra.cpp b/platform/consensus/ordering/cassandra_cft/algorithm/cassandra.cpp new file mode 100644 index 00000000..e4d257a2 --- /dev/null +++ b/platform/consensus/ordering/cassandra_cft/algorithm/cassandra.cpp @@ -0,0 +1,274 @@ +#include "platform/consensus/ordering/cassandra_cft/algorithm/cassandra.h" + +#include <glog/logging.h> + +#include "common/crypto/signature_verifier.h" +#include "common/utils/utils.h" + +#define Fail + +namespace resdb { +namespace cassandra_cft { + +Cassandra::Cassandra(int id, int f, int total_num, bool failure_mode) + : ProtocolBase(id, f, total_num), failure_mode_(failure_mode) { + + LOG(ERROR) << "get proposal graph"; + id_ = id; + total_num_ = total_num; + f_ = f; + is_stop_ = false; + //timeout_ms_ = 10000; + timeout_ms_ = 60000; + local_txn_id_ = 1; + local_proposal_id_ = 1; + batch_size_ = 15; + + recv_num_ = 0; + execute_num_ = 0; + executed_ = 0; + committed_num_ = 0; + precommitted_num_ = 0; + execute_id_ = 1; + + graph_ = std::make_unique<ProposalGraph>(f_); + proposal_manager_ = std::make_unique<ProposalManager>(id, f, total_num, graph_.get()); + + graph_->SetCommitCallBack( + [&](const Proposal& proposal) { + //CommitProposal(proposal); + }); + + proposal_manager_->SetCommitCallBack( + [&](std::unique_ptr<Proposal> proposal) { + CommitProposal(std::move(proposal)); + }); + + //Reset(); + + //consensus_thread_ = std::thread(&Cassandra::AsyncConsensus, this); + + block_thread_ = std::thread(&Cassandra::BroadcastTxn, this); + sbc_thread_ = std::thread(&Cassandra::SBC, this); + + commit_thread_ = 
std::thread(&Cassandra::AsyncCommit, this); + + global_stats_ = Stats::GetGlobalStats(); + + //prepare_thread_ = std::thread(&Cassandra::AsyncPrepare, this); +} + +Cassandra::~Cassandra() { + is_stop_ = true; + if (consensus_thread_.joinable()) { + consensus_thread_.join(); + } + if (commit_thread_.joinable()) { + commit_thread_.join(); + } + if (prepare_thread_.joinable()) { + prepare_thread_.join(); + } +} + +void Cassandra::SetPrepareFunction(std::function<int(const Transaction&)> prepare){ + prepare_ = prepare; +} + +bool Cassandra::IsStop() { return is_stop_; } + +bool Cassandra::ReceiveTransaction(std::unique_ptr<Transaction> txn) { + // LOG(ERROR)<<"recv txn:"; + txn->set_create_time(GetCurrentTime()); + txns_.Push(std::move(txn)); + recv_num_++; + return true; +} + +void Cassandra::Notify(int round) { + std::unique_lock<std::mutex> lk(mutex_); + SetSendNext(round); + vote_cv_.notify_one(); +} + +void Cassandra::BroadcastTxn() { + std::vector<std::unique_ptr<Transaction>> txns; + while (!IsStop()) { + std::unique_ptr<Transaction> txn = txns_.Pop(); + if (txn == nullptr) { + continue; + } + txns.push_back(std::move(txn)); + + while (!IsStop()) { + //int current_round = proposal_manager_->CurrentRound(); + std::unique_lock<std::mutex> lk(mutex_); + vote_cv_.wait_for(lk, std::chrono::microseconds(10000), + [&] { return CanSendNext(proposal_manager_->CurrentRound()); }); + + if (CanSendNext(proposal_manager_->CurrentRound())) { + start_ = 1; + break; + } + } + + for(int i = 1; i < batch_size_; ++i){ + std::unique_ptr<Transaction> txn = txns_.Pop(0); + if(txn == nullptr){ + break; + } + txns.push_back(std::move(txn)); + } + + //LOG(ERROR)<<" make block"; + std::unique_ptr<Proposal> proposal = proposal_manager_->MakeBlock(txns); + //LOG(ERROR)<<" send new proposal :"<<txns.size()<<" height:"<<proposal->header().height(); + Broadcast(MessageType::NewProposal, *proposal); + txns.clear(); + SetSBCSendReady(proposal->header().height()); + //LOG(ERROR)<<" make block 
done"; + } +} + +void Cassandra::SetSBCSendReady(int round) { + std::unique_lock<std::mutex> lk(send_ready_mutex_); + has_sent_ = round; + send_ready_cv_.notify_one(); + //LOG(ERROR)<<" set send ready round:"<<round; +} + +bool Cassandra::SBCSendReady(int round) { + return has_sent_ == round; +} + +void Cassandra::SetSendNext(int round) { + //LOG(ERROR)<<" set send next:"<<round; + sent_next_ = round; +} + +bool Cassandra::CanSendNext(int round) { + //LOG(ERROR)<<" can send next:"<<round; + return sent_next_ == round; +} + + +bool Cassandra::SBCRecvReady(int round) { + return proposal_manager_->Ready(round); +} + +void Cassandra::NotifyRecvReady() { + std::unique_lock<std::mutex> lk(recv_ready_mutex_); + recv_ready_cv_.notify_one(); +} + +void Cassandra::SBC() { + std::vector<std::unique_ptr<Transaction>> txns; + int round = 1; + while (!IsStop()) { + while (!IsStop()) { + //int current_round = proposal_manager_->CurrentRound(); + std::unique_lock<std::mutex> lk(send_ready_mutex_); + send_ready_cv_.wait_for(lk, std::chrono::microseconds(10000), + [&] { return SBCSendReady(round); }); + + if (SBCSendReady(round)) { + break; + } + } + //LOG(ERROR)<<" send ready round ready:"<<round; + + for(int i = 0; i < 10; ++i) { + //int current_round = proposal_manager_->CurrentRound(); + std::unique_lock<std::mutex> lk(recv_ready_mutex_); + recv_ready_cv_.wait_for(lk, std::chrono::microseconds(100000), + [&] { return SBCRecvReady(round); }); + //if(SBCRecvReady(round)){ + if(proposal_manager_->MayReady(round)){ + break; + } + } + //LOG(ERROR)<<" cehck recv round ready:"<<round; + proposal_manager_->CheckVote(round-1); + //LOG(ERROR)<<" notify"; + Notify(proposal_manager_->CurrentRound()); + round++; + } +} + + +bool Cassandra::ReceiveProposal(std::unique_ptr<Proposal> proposal) { + int round = proposal->header().height(); + int proposer = proposal->header().proposer_id(); + //LOG(ERROR)<<" receive proposal round:"<<round<<" proposer:"<<proposer<<" fail mode:"<<failure_mode_; + 
if(failure_mode_){ + if(round > 10 && proposer == 1) { + //LOG(ERROR)<<" reject"; + return true; + } + } + + proposal_manager_->AddProposal(std::move(proposal)); + NotifyRecvReady(); + return true; +} + +void Cassandra::CommitProposal(std::unique_ptr<Proposal> proposal) { + //LOG(ERROR)<<" commit proposal, proposer:"<<proposal->header().proposer_id()<<" round:"<<proposal->header().height(); + commit_q_.Push(std::move(proposal)); +} + +void Cassandra::AsyncCommit() { + int seq = 1; + while (!IsStop()) { + std::unique_ptr<Proposal> proposal = commit_q_.Pop(); + if (proposal == nullptr) { + continue; + } + //LOG(ERROR)<<" proposer commit:"<<proposal->header().proposer_id()<<" round:"<<proposal->header().height()<<" node_info:"<<proposal->node_info_size(); + if(proposal->header().height()>1){ + assert(proposal->node_info_size() > 0); + } + + { + int round = proposal->header().height(); + int proposer = proposal->header().proposer_id(); + assert(committed_.find(std::make_pair(proposer, round)) == committed_.end()); + committed_.insert(std::make_pair(proposer, round)); + } + + std::queue<std::unique_ptr<Proposal>> q; + q.push(std::move(proposal)); + std::vector<std::unique_ptr<Proposal>> list; + while(!q.empty()){ + std::unique_ptr<Proposal> p = std::move(q.front()); + q.pop(); + //LOG(ERROR)<<" commit queue proposal, proposer:"<<p->header().proposer_id()<<" round:"<<p->header().height(); + for(auto node_info : p->node_info()) { + int round = node_info.round(); + int proposer = node_info.proposer(); + if(committed_.find(std::make_pair(proposer, round)) != committed_.end()){ + continue; + } + committed_.insert(std::make_pair(proposer, round)); + //LOG(ERROR)<<" commit sub proposal:"<<proposer<<" round:"<<round; + std::unique_ptr<Proposal> np = proposal_manager_->FetchProposal(node_info); + assert(np != nullptr); + q.push(std::move(np)); + } + list.push_back(std::move(p)); + } + + for(auto& p : list) { + int round = p->header().height(); + int proposer = 
p->header().proposer_id(); + for (Transaction& txn : *p->mutable_transactions()) { + txn.set_id(seq++); + commit_(txn); + } + } + } +} + + +} // namespace cassandra_cft +} // namespace resdb diff --git a/platform/consensus/ordering/cassandra_cft/algorithm/cassandra.h b/platform/consensus/ordering/cassandra_cft/algorithm/cassandra.h new file mode 100644 index 00000000..b7d1b85f --- /dev/null +++ b/platform/consensus/ordering/cassandra_cft/algorithm/cassandra.h @@ -0,0 +1,100 @@ +#pragma once + +#include <deque> +#include <map> +#include <queue> +#include <thread> + +#include "platform/common/queue/lock_free_queue.h" +#include "platform/consensus/ordering/common/algorithm/protocol_base.h" +#include "platform/consensus/ordering/cassandra_cft/algorithm/proposal_graph.h" +#include "platform/consensus/ordering/cassandra_cft/algorithm/proposal_manager.h" +#include "platform/consensus/ordering/cassandra_cft/proto/proposal.pb.h" +#include "platform/statistic/stats.h" + +namespace resdb { +namespace cassandra_cft { + +class Cassandra: public common::ProtocolBase { + public: + Cassandra(int id, int f, int total_num, bool failure_mode); + ~Cassandra(); + + bool ReceiveProposal(std::unique_ptr<Proposal> proposal); + + void SetPrepareFunction(std::function<int(const Transaction&)> prepare); + bool ReceiveTransaction(std::unique_ptr<Transaction> txn); + +void BroadcastTxn(); + private: + bool IsStop(); + + int SendTxn(int round); + +void Notify(int round); +void CommitProposal(std::unique_ptr<Proposal> proposal); +void AsyncCommit(); + + +void SetSBCSendReady(int round); +bool SBCSendReady(int round); +void RecvSBCSendReady(int round); +bool SBCRecvReady(int round); +void NotifyRecvReady(); + +void SBC(); +void SetSendNext(int round); +bool CanSendNext(int round); + + + private: + std::unique_ptr<ProposalGraph> graph_; + LockFreeQueue<Transaction> txns_; + std::unique_ptr<ProposalManager> proposal_manager_; + SignatureVerifier* verifier_; + std::mutex mutex_, g_mutex_; + 
std::map<int, std::set<int>> received_num_; + // int state_; + int id_, total_num_, f_, batch_size_; + bool failure_mode_ = false; + std::atomic<int> is_stop_; + int timeout_ms_; + int local_txn_id_, local_proposal_id_; + LockFreeQueue<Proposal> commit_q_, execute_queue_, prepare_queue_; + std::thread commit_thread_, consensus_thread_, block_thread_, prepare_thread_, sbc_thread_; + std::condition_variable vote_cv_; + std::map<int, bool> can_vote_; + std::atomic<int> committed_num_; + int voting_, start_ = false; + std::map<int, std::vector<std::unique_ptr<Transaction>>> uncommitted_txn_; + + bool use_linear_ = false; + int recv_num_ = 0; + int execute_num_ = 0; + int pending_num_ = 0; + std::atomic<int> executed_; + std::atomic<int> precommitted_num_; + int last_vote_ = 0; + int execute_id_; + + std::mutex block_mutex_; + std::set<int> received_; + std::map<int, std::set<int>> block_ack_; + std::map<int, std::vector<std::unique_ptr<Proposal>>> future_proposal_; + std::set<std::pair<int,int> > committed_; + + std::function<int(const Transaction&)> prepare_; + + int current_round_; + + std::mutex send_ready_mutex_, recv_ready_mutex_; + std::condition_variable send_ready_cv_, recv_ready_cv_; + int has_sent_ = 0; + int sent_next_ = 1; + + Stats* global_stats_; + +}; + +} // namespace cassandra_cft +} // namespace resdb diff --git a/platform/consensus/ordering/cassandra_cft/algorithm/proposal_graph.cpp b/platform/consensus/ordering/cassandra_cft/algorithm/proposal_graph.cpp new file mode 100644 index 00000000..dcfa032e --- /dev/null +++ b/platform/consensus/ordering/cassandra_cft/algorithm/proposal_graph.cpp @@ -0,0 +1,477 @@ +#include "platform/consensus/ordering/cassandra_cft/algorithm/proposal_graph.h" + +#include <glog/logging.h> + +#include <queue> +#include <stack> + +#include "common/utils/utils.h" + +namespace resdb { +namespace cassandra_cft { + +std::vector<ProposalState> GetStates() { + return std::vector<ProposalState>{ProposalState::New, 
ProposalState::Prepared, + ProposalState::PreCommit}; +} + +ProposalGraph::ProposalGraph(int fault_num) : f_(fault_num) { + ranking_ = std::make_unique<Ranking>(); + current_height_ = 0; + global_stats_ = Stats::GetGlobalStats(); +} + +void ProposalGraph::IncreaseHeight() { + //LOG(ERROR) << "increase height:" << current_height_; + current_height_++; +} + +void ProposalGraph::TryUpgradeHeight(int height) { + if (last_node_[height].size() > 0) { + // LOG(ERROR) << "upgrade height:" << height; + current_height_ = height; + } else { + // assert(1 == 0); + LOG(ERROR) << "need to recovery"; + } +} + +std::string Encode(const std::string& hash) { + std::string ret; + for (int i = 0; i < hash.size(); ++i) { + int x = hash[i]; + ret += std::to_string(x); + } + return ret; +} + +void ProposalGraph::AddProposalOnly(const Proposal& proposal) { + auto it = node_info_.find(proposal.header().hash()); + if (it == node_info_.end()) { + auto np = std::make_unique<NodeInfo>(proposal); + node_info_[proposal.header().hash()] = std::move(np); + //LOG(ERROR) << "add proposal proposer:" << proposal.header().proposer_id() + // << " id:" << proposal.header().proposal_id() + // << " hash:" << Encode(proposal.header().hash()); + } +} + +int ProposalGraph::AddProposal(const Proposal& proposal) { + //LOG(ERROR) << "add proposal height:" << proposal.header().height() + // << " current height:" << current_height_<<" from:"<<proposal.header().proposer_id()<<" proposal id:"<<proposal.header().proposal_id(); + assert(current_height_ >= latest_commit_.header().height()); + /* + if (proposal.header().height() < current_height_) { + LOG(ERROR) << "height not match:" << current_height_ + << " proposal height:" << proposal.header().height(); + return false; + } + */ + + if (proposal.header().height() > current_height_) { + pending_header_[proposal.header().height()].insert( + proposal.header().proposer_id()); + } else { + while (!pending_header_.empty()) { + if (pending_header_.begin()->first <= 
current_height_) { + pending_header_.erase(pending_header_.begin()); + } else { + break; + } + } + } + + if (proposal.header().height() > current_height_ + 1) { + LOG(ERROR) << "height not match:" << current_height_ + << " proposal height:" << proposal.header().height(); + if (pending_header_[proposal.header().height()].size() >= f_ + 1) { + TryUpgradeHeight(proposal.header().height()); + } + return 1; + } + + if (!VerifyParent(proposal)) { + LOG(ERROR) << "verify parent fail:" << proposal.header().proposer_id() + << " id:" << proposal.header().proposal_id(); + // assert(1==0); + return 2; + } + + // LOG(ERROR)<<"history size:"<<proposal.history_size(); + for (const auto& history : proposal.history()) { + std::string hash = history.hash(); + auto node_it = node_info_.find(hash); + assert(node_it != node_info_.end()); + /* + if (node_it == node_info_.end()) { + LOG(ERROR) << " history node not found"; + return false; + } + else { + LOG(ERROR)<<"find history"; + } + */ + } + + for (const auto& history : proposal.history()) { + std::string hash = history.hash(); + auto node_it = node_info_.find(hash); + for (auto s : GetStates()) { + if (s >= node_it->second->state && s <= history.state()) { + if (s != history.state()) { + LOG(ERROR) << "update from higher state"; + } + node_it->second->votes[s].insert(proposal.header().proposer_id()); + } + } + // LOG(ERROR)<<"proposal:("<<node_it->second->proposal.header().proposer_id() + //<<","<<node_it->second->proposal.header().proposal_id()<<")" + //<<" history state:"<<history.state()<<" + //num:"<<node_it->second->votes[history.state()].size(); + CheckState(node_it->second.get(), + static_cast<resdb::cassandra_cft::ProposalState>(history.state())); + if (node_it->second->state == ProposalState::PreCommit) { + Commit(hash); + } + //LOG(ERROR)<<"history proposal:("<<node_it->second->proposal.header().proposer_id() + //<<","<<node_it->second->proposal.header().proposal_id() + //<<" state:"<<node_it->second->state; + } + + if 
(proposal.header().height() < current_height_) { + LOG(ERROR) << "height not match:" << current_height_ + << " proposal height:" << proposal.header().height(); + + // LOG(ERROR)<<"add proposal proposer:"<<proposal.header().proposer_id()<<" + // id:"<<proposal.header().proposal_id(); + // g_[proposal.header().prehash()].push_back(proposal.header().hash()); + auto np = std::make_unique<NodeInfo>(proposal); + new_proposals_[proposal.header().hash()] = &np->proposal; + node_info_[proposal.header().hash()] = std::move(np); + last_node_[proposal.header().height()].insert(proposal.header().hash()); + + return 1; + } else { + g_[proposal.header().prehash()].push_back(proposal.header().hash()); + auto np = std::make_unique<NodeInfo>(proposal); + new_proposals_[proposal.header().hash()] = &np->proposal; + // LOG(ERROR)<<"add proposal proposer:"<<proposal.header().proposer_id()<<" + // id:"<<proposal.header().proposal_id()<<" + // hash:"<<Encode(proposal.header().hash()); + node_info_[proposal.header().hash()] = std::move(np); + last_node_[proposal.header().height()].insert(proposal.header().hash()); + } + return 0; +} + +void ProposalGraph::UpgradeState(ProposalState& state) { + switch (state) { + case None: + case New: + state = Prepared; + break; + case Prepared: + state = PreCommit; + break; + default: + break; + } +} + +int ProposalGraph::CheckState(NodeInfo* node_info, ProposalState state) { + // LOG(ERROR) << "node: (" << node_info->proposal.header().proposer_id() << + // "," + // << node_info->proposal.header().proposal_id() + // << ") state:" << node_info->state + // << " vote num:" << node_info->votes[node_info->state].size(); + + while (node_info->votes[node_info->state].size() >= 2 * f_ + 1) { + UpgradeState(node_info->state); + // LOG(ERROR) << "========== proposal:(" + // << node_info->proposal.header().proposer_id() << "," + // << node_info->proposal.header().proposal_id() << ")" + // << " upgrate state:" << node_info->state << (GetCurrentTime() - + // 
node_info->proposal.create_time()); + } + return true; +} + +void ProposalGraph::Commit(const std::string& hash) { +/* + auto it = node_info_.find(hash); + if (it == node_info_.end()) { + LOG(ERROR) << "node not found, hash:" << hash; + assert(1 == 0); + return; + } + + if (it->second->state != ProposalState::PreCommit) { + LOG(ERROR) << "hash not committed:" << hash; + assert(1 == 0); + return; + } + + std::set<std::string> is_main_hash; + is_main_hash.insert(hash); + + int from_proposer = it->second->proposal.header().proposer_id(); + int from_proposal_id = it->second->proposal.header().proposal_id(); + //LOG(ERROR)<<"commit :"<<it->second->proposal.header().proposer_id()<<" id:"<<it->second->proposal.header().proposal_id(); + + std::vector<std::vector<Proposal*>> commit_p; + auto bfs = [&]() { + std::queue<std::string> q; + q.push(hash); + while (!q.empty()) { + std::string c_hash = q.front(); + q.pop(); + + auto it = node_info_.find(c_hash); + if (it == node_info_.end()) { + LOG(ERROR) << "node not found, hash:"; + assert(1 == 0); + } + + Proposal* p = &it->second->proposal; + if (it->second->state == ProposalState::Committed) { + // LOG(ERROR)<<" try commit proposal, + // sender:"<<p->header().proposer_id() + //<<" proposal id:"<<p->header().proposal_id()<<" has been committed"; + continue; + } + + it->second->state = ProposalState::Committed; + if (is_main_hash.find(c_hash) != is_main_hash.end()) { + commit_num_[p->header().proposer_id()]++; + // LOG(ERROR)<<"commit main node:"<<p->header().proposer_id(); + is_main_hash.insert(p->header().prehash()); + commit_p.push_back(std::vector<Proposal*>()); + } + + commit_p.back().push_back(p); + //LOG(ERROR)<<"commit node:"<<p->header().proposer_id()<<" id:"<<p->header().proposal_id() + //<<" weak proposal size:"<<p->weak_proposals().hash_size(); + //LOG(ERROR)<<"push p:"<<p->header().proposer_id(); + for (const std::string& w_hash : p->weak_proposals().hash()) { + auto it = node_info_.find(w_hash); + if (it == 
node_info_.end()) { + LOG(ERROR) << "node not found, hash:"; + assert(1 == 0); + } + + // LOG(ERROR)<<"add weak + // proposal:"<<it->second->proposal.header().proposer_id()<<" + // id:"<<it->second->proposal.header().proposal_id(); + q.push(w_hash); + } + if (!p->header().prehash().empty()) { + q.push(p->header().prehash()); + } + } + }; + + bfs(); + if (commit_p.size() > 1) { + LOG(ERROR) << "commit more hash"; + } + int block_num = 0; + for (int i = commit_p.size() - 1; i >= 0; i--) { + for (int j = 0; j < commit_p[i].size(); ++j) { + block_num += commit_p[i][j]->block_size(); + if (commit_callback_) { + commit_callback_(*commit_p[i][j]); + } + } + } + //global_stats_->AddCommitBlock(block_num); + */ + + // TODO clean + //last_node_[it->second->proposal.header().height()].clear(); + //latest_commit_ = it->second->proposal; + //it->second->state = ProposalState::Committed; + // Clear(latest_commit_.header().hash()); +} + +std::vector<std::unique_ptr<Proposal>> ProposalGraph::GetNotFound( + int height, const std::string& hash) { + auto it = not_found_proposal_.find(height); + if (it == not_found_proposal_.end()) { + return std::vector<std::unique_ptr<Proposal>>(); + } + auto pre_it = it->second.find(hash); + std::vector<std::unique_ptr<Proposal>> ret; + if (pre_it != it->second.end()) { + ret = std::move(pre_it->second); + it->second.erase(pre_it); + LOG(ERROR) << "found future height:" << height; + } + return ret; +} + +bool ProposalGraph::VerifyParent(const Proposal& proposal) { + // LOG(ERROR) << "last commit:" << latest_commit_.header().proposal_id() + // << " current :" << proposal.header().proposal_id() + // << " height:" << proposal.header().height(); + + if (proposal.header().prehash() == latest_commit_.header().hash()) { + return true; + } + + std::string prehash = proposal.header().prehash(); + // LOG(ERROR)<<"prehash:"<<prehash; + + auto it = node_info_.find(prehash); + if (it == node_info_.end()) { + LOG(ERROR) << "prehash not here"; + 
not_found_proposal_[proposal.header().height()][proposal.header().prehash()] + .push_back(std::make_unique<Proposal>(proposal)); + return false; + } else { + if (proposal.header().height() != + it->second->proposal.header().height() + 1) { + LOG(ERROR) << "link to invalid proposal, height:" + << proposal.header().height() + << " pre height:" << it->second->proposal.header().height(); + return false; + } + } + return true; +} + +void ProposalGraph::UpdateHistory(Proposal* proposal) { + proposal->mutable_history()->Clear(); + std::string hash = proposal->header().hash(); + + for (int i = 0; i < 3 && !hash.empty(); ++i) { + auto node_it = node_info_.find(hash); + auto his = proposal->add_history(); + his->set_hash(hash); + his->set_state(node_it->second->state); + his->set_sender(node_it->second->proposal.header().proposer_id()); + his->set_id(node_it->second->proposal.header().proposal_id()); + hash = node_it->second->proposal.header().prehash(); + } +} + +Proposal* ProposalGraph::GetStrongestProposal() { + // LOG(ERROR) << "get strong proposal from height:" << current_height_; + if (last_node_.find(current_height_) == last_node_.end()) { + LOG(ERROR) << "no data:" << current_height_; + return nullptr; + } + + NodeInfo* sp = nullptr; + for (const auto& last_hash : last_node_[current_height_]) { + NodeInfo* node_info = node_info_[last_hash].get(); + if (sp == nullptr || Compare(*sp, *node_info)) { + sp = node_info; + } + } + + UpdateHistory(&sp->proposal); + //LOG(ERROR) << "get strong proposal from height:" << current_height_ << " ->(" + // << sp->proposal.header().proposer_id() << "," + // << sp->proposal.header().proposal_id() << ")"; + return &sp->proposal; +} + +bool ProposalGraph::Cmp(int id1, int id2) { + // LOG(ERROR) << "commit commit num:" << id1 << " " << id2 + // << " commit time:" << commit_num_[id1] << " " << + // commit_num_[id2]; + if (commit_num_[id1] + 1 < commit_num_[id2]) { + return false; + } + + if (commit_num_[id1] > commit_num_[id2] + 1) { + 
return true; + } + return id1 < id2; +} + +int ProposalGraph::StateScore(const ProposalState& state) { + // return state == ProposalState::Prepared? 1:0; + return state; +} + +int ProposalGraph::CompareState(const ProposalState& state1, + const ProposalState& state2) { + // LOG(ERROR) << "check state:" << state1 << " " << state2; + return StateScore(state1) - StateScore(state2); +} + +// p1 < p2 +bool ProposalGraph::Compare(const NodeInfo& p1, const NodeInfo& p2) { + // LOG(ERROR) << "proposer:" << p1.proposal.header().proposer_id() << " " + // << p2.proposal.header().proposer_id() + // << "height:" << p1.proposal.header().height() << " " + // << p2.proposal.header().height(); + if (p1.proposal.header().height() != p2.proposal.header().height()) { + return p1.proposal.header().height() < p2.proposal.header().height(); + } + // LOG(ERROR)<<"proposer:"<<p1.proposal.header().proposer_id()<<" + // "<<p2.proposal.header().proposer_id(); + if (CompareState(p1.state, p2.state) != 0) { + return CompareState(p1.state, p2.state) < 0; + } + + if (p1.proposal.header().proposer_id() == + p2.proposal.header().proposer_id()) { + return p1.proposal.header().proposal_id() < + p2.proposal.header().proposal_id(); + } + + return Cmp(p1.proposal.header().proposer_id(), + p2.proposal.header().proposer_id()); +} + +Proposal* ProposalGraph::GetLatestStrongestProposal() { + Proposal* sp = GetStrongestProposal(); + if (sp == nullptr) { + if (current_height_ > 0) { + assert(1 == 0); + } + return &latest_commit_; + } + // LOG(ERROR) << "====== get strong proposal from:" << + // sp->header().proposer_id() + // << " id:" << sp->header().proposal_id(); + return sp; +} + +ProposalState ProposalGraph::GetProposalState(const std::string& hash) const { + auto node_it = node_info_.find(hash); + if (node_it == node_info_.end()) { + return ProposalState::None; + } + return node_it->second->state; +} + +const Proposal* ProposalGraph::GetProposalInfo(const std::string& hash) const { + auto it = 
node_info_.find(hash); + if (it == node_info_.end()) { + LOG(ERROR) << "hash not found:" << Encode(hash); + return nullptr; + } + return &it->second->proposal; +} + +int ProposalGraph::GetCurrentHeight() { return current_height_; } + +std::vector<Proposal*> ProposalGraph::GetNewProposals(int height) { + std::vector<Proposal*> ps; + for (auto it : new_proposals_) { + if (it.second->header().height() >= height) { + continue; + } + ps.push_back(it.second); + } + for (Proposal* p : ps) { + new_proposals_.erase(new_proposals_.find(p->header().hash())); + } + return ps; +} + +} // namespace cassandra_cft +} // namespace resdb diff --git a/platform/consensus/ordering/cassandra_cft/algorithm/proposal_graph.h b/platform/consensus/ordering/cassandra_cft/algorithm/proposal_graph.h new file mode 100644 index 00000000..a0917b54 --- /dev/null +++ b/platform/consensus/ordering/cassandra_cft/algorithm/proposal_graph.h @@ -0,0 +1,85 @@ +#pragma once + +#include <map> + +#include "platform/consensus/ordering/cassandra_cft/algorithm/proposal_state.h" +#include "platform/consensus/ordering/cassandra_cft/algorithm/ranking.h" +#include "platform/consensus/ordering/cassandra_cft/proto/proposal.pb.h" +#include "platform/statistic/stats.h" + +namespace resdb { +namespace cassandra_cft { + +class ProposalGraph { + public: + ProposalGraph(int fault_num); + inline void SetCommitCallBack(std::function<void(const Proposal&)> func) { + commit_callback_ = func; + } + + int AddProposal(const Proposal& proposal); + void AddProposalOnly(const Proposal& proposal); + + Proposal* GetLatestStrongestProposal(); + const Proposal* GetProposalInfo(const std::string& hash) const; + + int GetCurrentHeight(); + + void Clear(const std::string& hash); + void IncreaseHeight(); + ProposalState GetProposalState(const std::string& hash) const; + + std::vector<std::unique_ptr<Proposal>> GetNotFound(int height, + const std::string& hash); + + std::vector<Proposal*> GetNewProposals(int height); + + private: + struct 
NodeInfo { + Proposal proposal; + ProposalState state; + int score; + int is_main; + // std::set<int> received_num[5]; + std::map<int, std::set<int>> votes; + + NodeInfo(const Proposal& proposal) + : proposal(proposal), state(ProposalState::New), score(0), is_main(0) {} + }; + + bool VerifyParent(const Proposal& proposal); + + bool Compare(const NodeInfo& p1, const NodeInfo& p2); + bool Cmp(int id1, int id2); + int StateScore(const ProposalState& state); + int CompareState(const ProposalState& state1, const ProposalState& state2); + + Proposal* GetStrongestProposal(); + + void UpdateHistory(Proposal* proposal); + int CheckState(NodeInfo* node_info, ProposalState state); + void UpgradeState(ProposalState& state); + void TryUpgradeHeight(int height); + + void Commit(const std::string& hash); + + private: + Proposal latest_commit_; + std::map<std::string, std::vector<std::string>> g_; + std::map<std::string, std::unique_ptr<NodeInfo>> node_info_; + std::map<std::string, std::vector<VoteMessage>> not_found_; + std::unique_ptr<Ranking> ranking_; + std::map<int, int> commit_num_; + std::map<int, std::set<std::string>> last_node_; + int current_height_; + uint32_t f_; + std::function<void(const Proposal&)> commit_callback_; + std::map<int, std::set<int>> pending_header_; + std::map<int, std::map<std::string, std::vector<std::unique_ptr<Proposal>>>> + not_found_proposal_; + std::map<std::string, Proposal*> new_proposals_; + Stats* global_stats_; +}; + +} // namespace cassandra_cft +} // namespace resdb diff --git a/platform/consensus/ordering/cassandra_cft/algorithm/proposal_manager.cpp b/platform/consensus/ordering/cassandra_cft/algorithm/proposal_manager.cpp new file mode 100644 index 00000000..ad163356 --- /dev/null +++ b/platform/consensus/ordering/cassandra_cft/algorithm/proposal_manager.cpp @@ -0,0 +1,238 @@ +#include "platform/consensus/ordering/cassandra_cft/algorithm/proposal_manager.h" + +#include <glog/logging.h> + +#include "common/crypto/signature_verifier.h" 
+#include "common/utils/utils.h" + +namespace resdb { +namespace cassandra_cft { + +ProposalManager::ProposalManager(int32_t id, int f, int total_num, ProposalGraph* graph) + : id_(id), f_(f), total_num_(total_num), graph_(graph) { + local_proposal_id_ = 1; + round_ = 1; + global_stats_ = Stats::GetGlobalStats(); +} + +int ProposalManager::CurrentRound() { + return round_; +} + +std::unique_ptr<Proposal> ProposalManager::MakeBlock( + std::vector<std::unique_ptr<Transaction>>& txns) { + auto proposal = std::make_unique<Proposal>(); + for (const auto& txn : txns) { + *proposal->add_transactions() = *txn; + } + + proposal->mutable_header()->set_height(round_++); + proposal->mutable_header()->set_proposer_id(id_); + proposal->set_sender(id_); + //LOG(ERROR)<<" make proposal round:"<<proposal->header().height(); + //proposal->set_local_id(local_block_id_++); + GeneateGraph(proposal.get()); + return proposal; +} + +void ProposalManager::GeneateGraph(Proposal * proposal) { + int round = proposal->header().height(); + std::unique_lock<std::mutex> lk(mutex_); + for(auto it : proposals_) { + int proposer = it.first; + if(it.second.round() < round) { + //LOG(ERROR)<<" add graph proposal round:"<<it.second.round()<<" proposer:"<<proposer; + *proposal->add_node_info() = it.second; + } + else { + if(round>1) { + //LOG(ERROR)<<" add graph pre proposal round:"<<it.second.round()<<" proposer:"<<proposer; + assert(proposals_rounds_[round-1][proposer]->proposal !=nullptr); + *proposal->add_node_info() = GetNodeInfo(*proposals_rounds_[round-1][proposer]->proposal); + } + } + } + if(round > 1) { + GenerateVoteInfo(proposal); + } + //LOG(ERROR)<<" ====== "<<round; +} + +void ProposalManager::GenerateVoteInfo(Proposal * proposal) { + NodeInfo * vote_info = proposal->mutable_vote(); + int proposer = 1; + int vote_round = -1; + int round = proposal->header().height()-1; + for(const auto& p : proposal->node_info()) { + //LOG(ERROR)<<" node info proposer:"<<p.proposer()<<" 
round:"<<p.round()<<" vote record:"<<vote_record_[p.proposer()]; + if(p.round() != round) { + continue; + } + if(vote_record_[p.proposer()] > vote_round) { + vote_round = vote_record_[p.proposer()]; + proposer = p.proposer(); + } + } + //LOG(ERROR)<<" generate vote:"<<proposer<<" vote round:"<<vote_round<<" round:"<<round; + assert(vote_round>=0); + vote_info->set_proposer(proposer); + vote_info->set_round(round); +} + +void ProposalManager::AddProposal(std::unique_ptr<Proposal> proposal) { + int round = proposal->header().height(); + int proposer = proposal->header().proposer_id(); + //LOG(ERROR)<<" add proposal round:"<<round<<" from proposer:"<<proposer; + NodeInfo node_info = GetNodeInfo(*proposal); + { + std::unique_lock<std::mutex> lk(mutex_); + /* + if(round > 1) { + AddVote(*proposal); + } + */ + proposals_[proposer] = node_info; + LOG(ERROR)<<" add proposal round:"<<round<<" from proposer:"<<proposer; + proposals_rounds_[round][proposer] = std::make_unique<ProposalInfo>(std::move(proposal)); + //LOG(ERROR)<<" add proposal round:"<<round<<" from proposer:"<<proposer; + assert(proposals_rounds_[round][proposer]->proposal != nullptr); + } +} + +std::unique_ptr<Proposal> ProposalManager::FetchProposal(const NodeInfo& info) { + int round = info.round(); + int proposer = info.proposer(); + std::unique_lock<std::mutex> lk(mutex_); + LOG(ERROR)<<" fetch proposer:"<<proposer<<" round:"<<round; + assert(proposals_rounds_[round][proposer]->proposal !=nullptr); + return std::move(proposals_rounds_[round][proposer]->proposal); +} + +void ProposalManager::AddVote(const Proposal& proposal) { + NodeInfo vote_info = proposal.vote(); + int round = vote_info.round(); + int proposer = vote_info.proposer(); + //LOG(ERROR)<<" add vote:"<<round<<" proposer:"<<proposer; + ProposalInfo *pi = proposals_rounds_[round][proposer].get(); + assert(pi != nullptr); + pi->vote.insert(proposal.header().proposer_id()); + if(pi->vote.size() == f_+1) { + //LOG(ERROR)<<" vote done:"<<round<<" 
proposer:"<<proposer; + assert(round > vote_record_[proposer]); + vote_record_[proposer] = round; + vote_proposer_[round] = proposer; + } +} + + +void ProposalManager::CheckVote(int round) { + + if(round < 1) { + return; + } + + int leader = -1; + std::map<int,int> num; + { + std::unique_lock<std::mutex> lk(mutex_); + for(auto& it : proposals_rounds_[round+1]) { + ProposalInfo *rpi = it.second.get(); + Proposal * p = rpi->proposal.get(); + { + NodeInfo vote_info = p->vote(); + int round = vote_info.round(); + int proposer = vote_info.proposer(); + num[proposer]++; + if(num[proposer] == f_+1) { + leader = proposer; + } + } + } + } + +/* + //LOG(ERROR)<<" add vote:"<<round<<" proposer:"<<proposer<<" from round:"<<round+1<<" proposer:"<<p->header().proposer_id(); + ProposalInfo *pi = proposals_rounds_[round][proposer].get(); + + assert(pi != nullptr); + pi->vote.insert(p->header().proposer_id()); + if(pi->vote.size() == f_+1) { + //LOG(ERROR)<<" vote done:"<<round<<" proposer:"<<proposer; + assert(round > vote_record_[proposer]); + vote_record_[proposer] = round; + leader = proposer; + break; + } + } + } + */ + + //LOG(ERROR)<<" check vote:"<<round; + int proposer = leader; + //LOG(ERROR)<<" check vote:"<<round<<" proposer:"<<proposer; + + std::unique_ptr<Proposal> commit_p = nullptr; + while(commit_p == nullptr){ + std::unique_lock<std::mutex> lk(mutex_); + ProposalInfo *pi = proposals_rounds_[round][proposer].get(); + //LOG(ERROR)<<" get pi:"<<(pi == nullptr); + if(pi == nullptr) { + usleep(100); + continue; + } + commit_p = std::move(pi->proposal); + break; + } + //assert(pi != nullptr); + //assert(pi->vote.size() >= f_+1); + //LOG(ERROR)<<" commit vote:"<<round<<" proposer:"<<proposer; + CommitProposal(std::move(commit_p)); + //CommitProposal(std::move(pi->proposal)); + //vote_proposer_.erase(vote_proposer_.find(round)); + //LOG(ERROR)<<" check vote done"; +} + +NodeInfo ProposalManager::GetNodeInfo(const Proposal& proposal) { + int round = 
proposal.header().height(); + int proposer = proposal.header().proposer_id(); + NodeInfo node_info; + node_info.set_round(round); + node_info.set_proposer(proposer); + return node_info; +} + +void ProposalManager::CommitProposal(std::unique_ptr<Proposal> proposal) { + commit_func_(std::move(proposal)); +} + +bool ProposalManager::Ready() { + if(round_ == 1){ + return true; + } + std::unique_lock<std::mutex> lk(mutex_); + //LOG(ERROR)<<" round size:"<<proposals_rounds_[round_-1].size()<<" total:"<<total_num_<<" round:"<<round_; + return proposals_rounds_[round_-1].size() == total_num_; +} + +bool ProposalManager::Ready(int round) { + if(round < 1){ + return true; + } + std::unique_lock<std::mutex> lk(mutex_); + //LOG(ERROR)<<" round size:"<<proposals_rounds_[round].size()<<" total:"<<total_num_<<" round:"<<round; + return proposals_rounds_[round].size() == total_num_; +} + +bool ProposalManager::MayReady(int round){ + if(round < 1){ + return true; + } + std::unique_lock<std::mutex> lk(mutex_); + //LOG(ERROR)<<" round size:"<<proposals_rounds_[round].size()<<" total:"<<total_num_<<" round:"<<round; + return proposals_rounds_[round].size() >= f_+1; + +} + + +} // namespace cassandra_cft +} // namespace resdb diff --git a/platform/consensus/ordering/cassandra_cft/algorithm/proposal_manager.h b/platform/consensus/ordering/cassandra_cft/algorithm/proposal_manager.h new file mode 100644 index 00000000..05f7a2af --- /dev/null +++ b/platform/consensus/ordering/cassandra_cft/algorithm/proposal_manager.h @@ -0,0 +1,64 @@ +#pragma once + +#include <condition_variable> +#include <list> + +#include "platform/consensus/ordering/cassandra_cft/algorithm/proposal_graph.h" +#include "platform/consensus/ordering/cassandra_cft/proto/proposal.pb.h" +#include "platform/statistic/stats.h" + +namespace resdb { +namespace cassandra_cft { + +struct ProposalInfo { + std::unique_ptr<Proposal> proposal; + std::set<int> vote; + ProposalInfo(std::unique_ptr<Proposal> proposal) : 
proposal(std::move(proposal)){} +}; + +class ProposalManager { + public: + ProposalManager(int32_t id, int f, int total_num, ProposalGraph* graph); + + std::unique_ptr<Proposal> MakeBlock( + std::vector<std::unique_ptr<Transaction>>& txns); + + void GeneateGraph(Proposal * proposal); + NodeInfo GetNodeInfo(const Proposal& proposal); + std::unique_ptr<Proposal> FetchProposal(const NodeInfo& info); + bool Ready() ; + void AddProposal(std::unique_ptr<Proposal> proposal); + void AddVote(const Proposal& proposal); + void GenerateVoteInfo(Proposal * proposal); +int CurrentRound(); +bool MayReady(int round); + + void CommitProposal(std::unique_ptr<Proposal> proposal); + void SetCommitCallBack(std::function<void(std::unique_ptr<Proposal>)> func) { + commit_func_ = func; + } + + bool Ready(int round); + void CheckVote(int round); + + private: + int32_t id_; + int f_; + int total_num_; + ProposalGraph* graph_; + int64_t local_proposal_id_ = 1, local_block_id_ = 1; + int round_; + + std::map<int, NodeInfo > proposals_; + std::map<int, std::map<int, std::unique_ptr<ProposalInfo>> > proposals_rounds_; + std::map<int, int> vote_record_; + std::function<void(std::unique_ptr<Proposal>)> commit_func_; + std::map<int, int> vote_proposer_; + + std::mutex mutex_; + + Stats* global_stats_; +}; + +} // namespace cassandra_cft +} // namespace resdb diff --git a/platform/consensus/ordering/cassandra_cft/algorithm/proposal_state.h b/platform/consensus/ordering/cassandra_cft/algorithm/proposal_state.h new file mode 100644 index 00000000..ba33525e --- /dev/null +++ b/platform/consensus/ordering/cassandra_cft/algorithm/proposal_state.h @@ -0,0 +1,16 @@ +#pragma once + +namespace resdb { +namespace cassandra_cft { + +enum ProposalState { + None = 0, + New = 1, + Voted = 2, + Prepared = 3, + PreCommit = 4, + Committed = 5, +}; + +} +} // namespace resdb diff --git a/platform/consensus/ordering/cassandra_cft/algorithm/ranking.cpp b/platform/consensus/ordering/cassandra_cft/algorithm/ranking.cpp 
// Returns the rank of a proposer. The rank is the proposer id itself, i.e.
// this is an identity mapping.
int Ranking::GetRank(int proposer_id) { return proposer_id; }
*/ -#include "platform/consensus/ordering/multipaxos/framework/consensus.h" +#include "platform/consensus/ordering/cassandra_cft/framework/consensus.h" #include <glog/logging.h> #include <unistd.h> @@ -31,57 +31,49 @@ #include "common/utils/utils.h" namespace resdb { -namespace multipaxos { +namespace cassandra_cft { Consensus::Consensus(const ResDBConfig& config, std::unique_ptr<TransactionManager> executor) - : common::Consensus(config, std::move(executor)) { - - Init(); - + : common::Consensus(config, std::move(executor)){ int total_replicas = config_.GetReplicaNum(); int f = (total_replicas - 1) / 3; + Init(); + + start_ = 0; if (config_.GetPublicKeyCertificateInfo() .public_key() .public_key_info() .type() != CertificateKeyInfo::CLIENT) { - multipaxos_= std::make_unique<MultiPaxos>(config_.GetSelfInfo().id(), f, total_replicas, GetSignatureVerifier()); - InitProtocol(multipaxos_.get()); + cassandra_cft_ = std::make_unique<Cassandra>( + config_.GetSelfInfo().id(), f, + total_replicas, config.GetConfigData().failure_mode()); + + InitProtocol(cassandra_cft_.get()); + + + cassandra_cft_->SetPrepareFunction([&](const Transaction& msg) { + return Prepare(msg); + }); } } int Consensus::ProcessCustomConsensus(std::unique_ptr<Request> request) { - //LOG(ERROR)<<"recv request:"<<MessageType_Name(request->user_type()); - //int64_t current_time = GetCurrentTime(); - - if(request->user_type() == MessageType::Propose) { - std::unique_ptr<Proposal> p = std::make_unique<Proposal>(); - if (!p->ParseFromString(request->data())) { + //LOG(ERROR)<<"receive commit:"<<request->type()<<" "<<MessageType_Name(request->user_type()); + if (request->user_type() == MessageType::NewProposal) { + // LOG(ERROR)<<"receive proposal:"; + std::unique_ptr<Proposal> proposal = std::make_unique<Proposal>(); + if (!proposal->ParseFromString(request->data())) { LOG(ERROR) << "parse proposal fail"; assert(1 == 0); return -1; } - multipaxos_->ReceiveProposal(std::move(p)); - } - else 
if(request->user_type() == MessageType::Query) { - std::unique_ptr<Proposal> p = std::make_unique<Proposal>(); - if (!p->ParseFromString(request->data())) { - LOG(ERROR) << "parse proposal fail"; - assert(1 == 0); + if (!cassandra_cft_->ReceiveProposal(std::move(proposal))) { return -1; } - //multipaxos_->ReceivePropose(std::move(p)); - } - else if(request->user_type() == MessageType::Learn) { - std::unique_ptr<Proposal> p = std::make_unique<Proposal>(); - if (!p->ParseFromString(request->data())) { - LOG(ERROR) << "parse proposal fail"; - assert(1 == 0); - return -1; - } - multipaxos_->ReceiveLearn(std::move(p)); - } + return 0; + } return 0; } @@ -90,8 +82,8 @@ int Consensus::ProcessNewTransaction(std::unique_ptr<Request> request) { txn->set_data(request->data()); txn->set_hash(request->hash()); txn->set_proxy_id(request->proxy_id()); - txn->set_user_seq(request->user_seq()); - return multipaxos_->ReceiveTransaction(std::move(txn)); + //LOG(ERROR)<<"receive txn"; + return cassandra_cft_->ReceiveTransaction(std::move(txn)); } int Consensus::CommitMsg(const google::protobuf::Message& msg) { @@ -99,15 +91,33 @@ int Consensus::CommitMsg(const google::protobuf::Message& msg) { } int Consensus::CommitMsgInternal(const Transaction& txn) { - //LOG(ERROR)<<"commit txn:"<<txn.id()<<" proxy id:"<<txn.proxy_id(); + //LOG(ERROR)<<"commit txn:"<<txn.id()<<" proxy id:"<<txn.proxy_id()<<" uid:"<<txn.uid(); std::unique_ptr<Request> request = std::make_unique<Request>(); + request->set_queuing_time(txn.queuing_time()); request->set_data(txn.data()); request->set_seq(txn.id()); - request->set_proxy_id(txn.proxy_id()); + request->set_uid(txn.uid()); + //if (txn.proposer_id() == config_.GetSelfInfo().id()) { + request->set_proxy_id(txn.proxy_id()); + // LOG(ERROR)<<"commit txn:"<<txn.id()<<" proxy id:"<<request->uid(); + //assert(request->uid()>0); + //} + transaction_executor_->AddExecuteMessage(std::move(request)); return 0; } -} // namespace fairdag +int Consensus::Prepare(const 
Transaction& txn) { + // LOG(ERROR)<<"prepare txn:"<<txn.id()<<" proxy id:"<<txn.proxy_id()<<" + // uid:"<<txn.uid(); + std::unique_ptr<Request> request = std::make_unique<Request>(); + request->set_data(txn.data()); + request->set_uid(txn.uid()); + transaction_executor_->Prepare(std::move(request)); + return 0; +} + + +} // namespace cassandra_cft } // namespace resdb diff --git a/platform/consensus/ordering/multipaxos/framework/consensus.h b/platform/consensus/ordering/cassandra_cft/framework/consensus.h similarity index 81% copy from platform/consensus/ordering/multipaxos/framework/consensus.h copy to platform/consensus/ordering/cassandra_cft/framework/consensus.h index 4d08c3ac..956bda2b 100644 --- a/platform/consensus/ordering/multipaxos/framework/consensus.h +++ b/platform/consensus/ordering/cassandra_cft/framework/consensus.h @@ -26,28 +26,34 @@ #pragma once #include "executor/common/transaction_manager.h" -#include "platform/consensus/execution/transaction_executor.h" -#include "platform/consensus/ordering/multipaxos/algorithm/multipaxos.h" #include "platform/consensus/ordering/common/framework/consensus.h" +#include "platform/consensus/ordering/cassandra_cft/algorithm/cassandra.h" #include "platform/networkstrate/consensus_manager.h" namespace resdb { -namespace multipaxos { +namespace cassandra_cft { -class Consensus : public common::Consensus{ +class Consensus : public common::Consensus { public: Consensus(const ResDBConfig& config, std::unique_ptr<TransactionManager> transaction_manager); + virtual ~Consensus() = default; - protected: + private: int ProcessCustomConsensus(std::unique_ptr<Request> request) override; int ProcessNewTransaction(std::unique_ptr<Request> request) override; int CommitMsg(const google::protobuf::Message& msg) override; int CommitMsgInternal(const Transaction& txn); - private: - std::unique_ptr<MultiPaxos> multipaxos_; + int Prepare(const Transaction& txn); + + protected: + std::unique_ptr<Cassandra> cassandra_cft_; + Stats* 
global_stats_; + int64_t start_; + std::mutex mutex_; + int send_num_[200]; }; -} // namespace tusk +} // namespace cassandra_cft } // namespace resdb diff --git a/platform/consensus/ordering/cassandra_cft/framework/consensus_test.cpp b/platform/consensus/ordering/cassandra_cft/framework/consensus_test.cpp new file mode 100644 index 00000000..2c8834a8 --- /dev/null +++ b/platform/consensus/ordering/cassandra_cft/framework/consensus_test.cpp @@ -0,0 +1,179 @@ +/* + * Copyright (c) 2019-2022 ExpoLab, UC Davis + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. 
+ * + */ + +#include "platform/consensus/ordering/cassandra/framework/consensus.h" + +#include <glog/logging.h> +#include <gmock/gmock.h> +#include <gtest/gtest.h> + +#include <future> + +#include "common/test/test_macros.h" +#include "executor/common/mock_transaction_manager.h" +#include "platform/config/resdb_config_utils.h" +#include "platform/networkstrate/mock_replica_communicator.h" + +namespace resdb { +namespace cassandra { +namespace { + +using ::resdb::testing::EqualsProto; +using ::testing::_; +using ::testing::Invoke; +using ::testing::Test; + +ResDBConfig GetConfig() { + ResDBConfig config({GenerateReplicaInfo(1, "127.0.0.1", 1234), + GenerateReplicaInfo(2, "127.0.0.1", 1235), + GenerateReplicaInfo(3, "127.0.0.1", 1236), + GenerateReplicaInfo(4, "127.0.0.1", 1237)}, + GenerateReplicaInfo(1, "127.0.0.1", 1234)); + return config; +} + +class ConsensusTest : public Test { + public: + ConsensusTest() : config_(GetConfig()) { + auto transaction_manager = + std::make_unique<MockTransactionExecutorDataImpl>(); + mock_transaction_manager_ = transaction_manager.get(); + consensus_ = + std::make_unique<Consensus>(config_, std::move(transaction_manager)); + consensus_->SetCommunicator(&replica_communicator_); + } + + void AddTransaction(const std::string& data) { + auto request = std::make_unique<Request>(); + request->set_type(Request::TYPE_NEW_TXNS); + + Transaction txn; + + BatchUserRequest batch_request; + auto req = batch_request.add_user_requests(); + req->mutable_request()->set_data(data); + + batch_request.set_local_id(1); + batch_request.SerializeToString(txn.mutable_data()); + + txn.SerializeToString(request->mutable_data()); + + EXPECT_EQ(consensus_->ConsensusCommit(nullptr, std::move(request)), 0); + } + + protected: + ResDBConfig config_; + MockTransactionExecutorDataImpl* mock_transaction_manager_; + MockReplicaCommunicator replica_communicator_; + std::unique_ptr<TransactionManager> transaction_manager_; + std::unique_ptr<Consensus> consensus_; +}; 
+ +TEST_F(ConsensusTest, NormalCase) { + std::promise<bool> commit_done; + std::future<bool> commit_done_future = commit_done.get_future(); + + EXPECT_CALL(replica_communicator_, BroadCast) + .WillRepeatedly(Invoke([&](const google::protobuf::Message& msg) { + Request request = *dynamic_cast<const Request*>(&msg); + + if (request.user_type() == MessageType::NewProposal) { + LOG(ERROR) << "bc new proposal"; + consensus_->ConsensusCommit(nullptr, + std::make_unique<Request>(request)); + LOG(ERROR) << "recv proposal done"; + } + if (request.user_type() == MessageType::Vote) { + LOG(ERROR) << "bc vote"; + + VoteMessage ack_msg; + assert(ack_msg.ParseFromString(request.data())); + for (int i = 1; i <= 3; ++i) { + ack_msg.set_proposer_id(i); + auto new_req = std::make_unique<Request>(request); + ack_msg.SerializeToString(new_req->mutable_data()); + + consensus_->ConsensusCommit(nullptr, std::move(new_req)); + } + } + // LOG(ERROR)<<"bc type:"<<request->type()<<" user + // type:"<<request->user_type(); + if (request.user_type() == MessageType::Prepare) { + LOG(ERROR) << "bc prepare"; + + VoteMessage ack_msg; + assert(ack_msg.ParseFromString(request.data())); + for (int i = 1; i <= 3; ++i) { + ack_msg.set_proposer_id(i); + auto new_req = std::make_unique<Request>(request); + ack_msg.SerializeToString(new_req->mutable_data()); + + consensus_->ConsensusCommit(nullptr, std::move(new_req)); + } + } + if (request.user_type() == MessageType::Voteprep) { + LOG(ERROR) << "bc voterep:"; + + VoteMessage ack_msg; + assert(ack_msg.ParseFromString(request.data())); + for (int i = 1; i <= 3; ++i) { + ack_msg.set_proposer_id(i); + auto new_req = std::make_unique<Request>(request); + ack_msg.SerializeToString(new_req->mutable_data()); + LOG(ERROR) << "new request type:" << new_req->user_type(); + + consensus_->ConsensusCommit(nullptr, std::move(new_req)); + } + } + LOG(ERROR) << "done"; + return 0; + })); + + EXPECT_CALL(*mock_transaction_manager_, ExecuteData) + 
.WillOnce(Invoke([&](const std::string& msg) { + LOG(ERROR) << "execute txn:" << msg; + EXPECT_EQ(msg, "transaction1"); + return nullptr; + })); + + EXPECT_CALL(replica_communicator_, SendMessage(_, 0)) + .WillRepeatedly( + Invoke([&](const google::protobuf::Message& msg, int64_t) { + Request request = *dynamic_cast<const Request*>(&msg); + if (request.type() == Request::TYPE_RESPONSE) { + LOG(ERROR) << "get response"; + commit_done.set_value(true); + } + return; + })); + + AddTransaction("transaction1"); + + commit_done_future.get(); +} + +} // namespace +} // namespace cassandra +} // namespace resdb diff --git a/platform/consensus/ordering/cassandra_cft/proto/BUILD b/platform/consensus/ordering/cassandra_cft/proto/BUILD new file mode 100644 index 00000000..87992193 --- /dev/null +++ b/platform/consensus/ordering/cassandra_cft/proto/BUILD @@ -0,0 +1,16 @@ +package(default_visibility = ["//platform/consensus/ordering/cassandra_cft:__subpackages__"]) + +load("@rules_cc//cc:defs.bzl", "cc_proto_library") +load("@rules_proto//proto:defs.bzl", "proto_library") +load("@rules_proto_grpc//python:defs.bzl", "python_proto_library") + +proto_library( + name = "proposal_proto", + srcs = ["proposal.proto"], + #visibility = ["//visibility:public"], +) + +cc_proto_library( + name = "proposal_cc_proto", + deps = [":proposal_proto"], +) diff --git a/platform/consensus/ordering/cassandra_cft/proto/proposal.proto b/platform/consensus/ordering/cassandra_cft/proto/proposal.proto new file mode 100644 index 00000000..81e2ce79 --- /dev/null +++ b/platform/consensus/ordering/cassandra_cft/proto/proposal.proto @@ -0,0 +1,124 @@ + +syntax = "proto3"; + +package resdb.cassandra_cft; + +message Transaction{ + int32 id = 1; + bytes data = 2; + bytes hash = 3; + int32 proxy_id = 4; + int32 proposer_id = 5; + int64 uid = 6; + int64 create_time = 7; + int64 queuing_time = 8; +} + +message Header { + bytes hash = 1; + int32 height = 2; + int32 proposer_id = 3; + int32 proposal_id = 4; + bytes 
prehash = 5; +} + +message History { + bytes hash = 1; + int32 state = 2; + int32 sender = 3; + int32 id = 4; +} + +message WeakProposal { + repeated bytes hash = 1; +} + +message Proposal { + Header header = 1; + repeated Transaction transactions = 2; + uint64 create_time = 3; + repeated NodeInfo node_info = 4; + repeated History history = 5; + int32 sender = 6; + NodeInfo vote = 7; +}; + +enum MessageType { + NewProposal = 0; + Vote = 1; + Prepare = 2; + VoteAck = 3; + Commit = 4; + Recovery = 5; + NewBlocks = 6; + ProposalAck = 7; + CMD_BlockACK = 8; + CMD_BlockQuery = 9; + CMD_SingleBlock = 10; + CMD_ProposalQuery = 11; + CMD_ProposalQueryResponse = 12; +}; + +message VoteMessage { + bytes hash = 1; + int32 proposer_id = 2; + MessageType type = 3; + int32 sender_id = 4; + int32 proposal_id = 5; +}; + +message CommittedProposals{ + repeated Proposal proposals = 1; + int32 sender_id = 2; +}; + +message HashValue{ + repeated uint64 bits = 1; +}; + +message Block { + message BlockData { + repeated Transaction transaction = 1; + } + BlockData data = 1; + bytes hash = 2; + int32 sender_id = 3; + int32 local_id = 4; + int64 create_time = 5; +} + +message BlockACK { + bytes hash = 2; + int32 sender_id = 3; + int32 local_id = 4; + int32 responder = 5; +} + +message BlockQuery { + bytes hash = 2; + int32 proposer = 3; + int32 local_id = 4; + int32 sender = 5; +} + +message ProposalQuery { + bytes hash = 2; + int32 proposer = 3; + int32 id = 4; + int32 sender = 5; +} + +message ProposalQueryResp { + repeated Proposal proposal = 1; +} + +message ProposalResponse { + Header header = 1; + int32 leader = 2; +} + + +message NodeInfo { + int32 round = 1; + int32 proposer = 2; +} diff --git a/platform/consensus/ordering/common/framework/performance_manager.cpp b/platform/consensus/ordering/common/framework/performance_manager.cpp index 883c6ea7..a5fc7ca8 100644 --- a/platform/consensus/ordering/common/framework/performance_manager.cpp +++ 
b/platform/consensus/ordering/common/framework/performance_manager.cpp @@ -44,6 +44,7 @@ PerformanceManager::PerformanceManager( stop_ = false; eval_started_ = false; eval_ready_future_ = eval_ready_promise_.get_future(); + LOG(ERROR)<<"?????"; if (config_.GetPublicKeyCertificateInfo() .public_key() .public_key_info() @@ -53,6 +54,7 @@ PerformanceManager::PerformanceManager( std::thread(&PerformanceManager::BatchProposeMsg, this); } } + LOG(ERROR)<<"?????"; global_stats_ = Stats::GetGlobalStats(); send_num_ = 0; total_num_ = 0; @@ -62,6 +64,7 @@ PerformanceManager::PerformanceManager( if (primary_ == 0) primary_ = replica_num_; local_id_ = 1; sum_ = 0; + LOG(ERROR)<<"?????"; } PerformanceManager::~PerformanceManager() { diff --git a/platform/consensus/ordering/multipaxos/algorithm/multipaxos.cpp b/platform/consensus/ordering/multipaxos/algorithm/multipaxos.cpp index a7966898..aab869e5 100644 --- a/platform/consensus/ordering/multipaxos/algorithm/multipaxos.cpp +++ b/platform/consensus/ordering/multipaxos/algorithm/multipaxos.cpp @@ -16,9 +16,11 @@ MultiPaxos::MultiPaxos(int id, int f, int total_num, SignatureVerifier * verifie send_thread_ = std::thread(&MultiPaxos::AsyncSend, this); commit_thread_ = std::thread(&MultiPaxos::AsyncCommit, this); commit_seq_thread_ = std::thread(&MultiPaxos::AsyncCommitSeq, this); - batch_size_ = 10; + learn_thread_ = std::thread(&MultiPaxos::AsyncLearn, this); + batch_size_ = 15; start_seq_ = 1; - } + master_ = 1; +} MultiPaxos::~MultiPaxos() { } @@ -91,15 +93,23 @@ void MultiPaxos::AsyncCommit() { int proposer = p->header().proposer(); int round = p->header().view(); std::unique_ptr<Proposal> data = nullptr; - { + while(data == nullptr) { std::unique_lock<std::mutex> lk(mutex_); auto it = receive_.find(round); + if(it == receive_.end()){ + //LOG(ERROR)<<" proposer:"<<proposer<<" round:"<<round<<" not exist"; + usleep(100); + continue; + } assert(it != receive_.end()); auto dit = it->second.find(proposer); + if(dit == 
it->second.end()){ + continue; + } assert(dit != it->second.end()); data = std::move(dit->second); - assert(data != nullptr); } + assert(data != nullptr); AddCommitData(std::move(data)); } } @@ -111,10 +121,10 @@ void MultiPaxos::AsyncCommitSeq() { break; } std::unique_ptr<Proposal> data = GetCommitData(); - int proposer = data->header().proposer(); - int round = data->header().view(); - int proposal_seq = data->header().proposal_id(); - LOG(ERROR)<<" commit proposer:"<<proposer<< " round:"<<round<<" seq:"<<proposal_seq; + //int proposer = data->header().proposer(); + //int round = data->header().view(); + //int proposal_seq = data->header().proposal_id(); + //LOG(ERROR)<<" commit proposer:"<<proposer<< " round:"<<round<<" seq:"<<proposal_seq; for(Transaction& txn : *data->mutable_transactions()){ txn.set_id(seq++); Commit(txn); @@ -122,7 +132,34 @@ void MultiPaxos::AsyncCommitSeq() { } } +void MultiPaxos::AsyncLearn() { + int seq = 1; + while (!IsStop()) { + auto proposal = learn_q_.Pop(); + if(proposal == nullptr){ + continue; + } + + int round = proposal->header().view(); + int sender = proposal->sender(); + int proposer = proposal->header().proposer(); + + std::unique_lock<std::mutex> lk(learn_mutex_); + learn_receive_[round].insert(sender); + //LOG(ERROR)<<"RECEIVE proposer learn:"<<round<<" size:"<<learn_receive_[round].size()<<" proposer:"<<proposer<<" sender:"<<sender; + if(learn_receive_[round].size() == f_+1){ + CommitProposal(std::move(proposal)); + } + + } +} + + bool MultiPaxos::ReceiveTransaction(std::unique_ptr<Transaction> txn) { + if(id_ != master_) { + SendMessage(MessageType::Redirect, *txn, master_); + return true; + } txn->set_proposer(id_); txns_.Push(std::move(txn)); return true; @@ -135,8 +172,8 @@ bool MultiPaxos::ReceiveProposal(std::unique_ptr<Proposal> proposal) { bool done = false; { - //LOG(ERROR)<<"recv proposal from:"<<proposer<<" round:"<<round<<" seq:"<<seq; std::unique_lock<std::mutex> lk(mutex_); + //LOG(ERROR)<<"recv proposal 
from:"<<proposer<<" round:"<<round<<" seq:"<<seq; receive_[round][proposer] = std::move(proposal); done = true; } @@ -152,16 +189,7 @@ bool MultiPaxos::ReceiveProposal(std::unique_ptr<Proposal> proposal) { } bool MultiPaxos::ReceiveLearn(std::unique_ptr<Proposal> proposal) { - int round = proposal->header().view(); - int sender = proposal->sender(); - int proposer = proposal->header().proposer(); - - std::unique_lock<std::mutex> lk(learn_mutex_); - learn_receive_[round].insert(sender); - LOG(ERROR)<<"RECEIVE proposer learn:"<<round<<" size:"<<learn_receive_[round].size()<<" proposer:"<<proposer; - if(learn_receive_[round].size() == f_+1){ - CommitProposal(std::move(proposal)); - } + learn_q_.Push(std::move(proposal)); return true; } diff --git a/platform/consensus/ordering/multipaxos/algorithm/multipaxos.h b/platform/consensus/ordering/multipaxos/algorithm/multipaxos.h index 6ade5312..e9838593 100644 --- a/platform/consensus/ordering/multipaxos/algorithm/multipaxos.h +++ b/platform/consensus/ordering/multipaxos/algorithm/multipaxos.h @@ -26,6 +26,7 @@ class MultiPaxos: public common::ProtocolBase { void AsyncSend(); void AsyncCommit(); void AsyncCommitSeq(); + void AsyncLearn(); void CommitProposal(std::unique_ptr<Proposal> p); @@ -36,11 +37,11 @@ class MultiPaxos: public common::ProtocolBase { private: LockFreeQueue<Transaction> txns_; - LockFreeQueue<Proposal> commit_q_; + LockFreeQueue<Proposal> commit_q_, learn_q_; std::unique_ptr<ProposalManager> proposal_manager_; - std::thread send_thread_, commit_thread_, commit_seq_thread_; + std::thread send_thread_, commit_thread_, commit_seq_thread_, learn_thread_; int batch_size_; @@ -54,6 +55,7 @@ class MultiPaxos: public common::ProtocolBase { std::map<int, std::unique_ptr<Proposal> > commit_data_; std::condition_variable vote_cv_; int start_seq_; + int master_; }; } // namespace tusk diff --git a/platform/consensus/ordering/multipaxos/algorithm/proposal_manager.cpp 
b/platform/consensus/ordering/multipaxos/algorithm/proposal_manager.cpp index 1a55f8db..2dd3e410 100644 --- a/platform/consensus/ordering/multipaxos/algorithm/proposal_manager.cpp +++ b/platform/consensus/ordering/multipaxos/algorithm/proposal_manager.cpp @@ -11,7 +11,6 @@ ProposalManager::ProposalManager(int32_t id, int limit_count, SignatureVerifier* : id_(id) { round_ = 1; seq_ = 1; - assert(verifier_ != nullptr); } std::unique_ptr<Proposal> ProposalManager::GenerateProposal( diff --git a/platform/consensus/ordering/multipaxos/framework/consensus.cpp b/platform/consensus/ordering/multipaxos/framework/consensus.cpp index b20d4e5e..f2442587 100644 --- a/platform/consensus/ordering/multipaxos/framework/consensus.cpp +++ b/platform/consensus/ordering/multipaxos/framework/consensus.cpp @@ -38,6 +38,7 @@ Consensus::Consensus(const ResDBConfig& config, : common::Consensus(config, std::move(executor)) { Init(); + failure_mode_ = config.GetConfigData().failure_mode(); int total_replicas = config_.GetReplicaNum(); int f = (total_replicas - 1) / 3; @@ -54,6 +55,10 @@ Consensus::Consensus(const ResDBConfig& config, int Consensus::ProcessCustomConsensus(std::unique_ptr<Request> request) { //LOG(ERROR)<<"recv request:"<<MessageType_Name(request->user_type()); //int64_t current_time = GetCurrentTime(); + //LOG(ERROR)<<" failure more:"<<failure_mode_; + if(failure_mode_ && config_.GetSelfInfo().id() == 2) { + return 0; + } if(request->user_type() == MessageType::Propose) { std::unique_ptr<Proposal> p = std::make_unique<Proposal>(); @@ -64,6 +69,16 @@ int Consensus::ProcessCustomConsensus(std::unique_ptr<Request> request) { } multipaxos_->ReceiveProposal(std::move(p)); } + else if(request->user_type() == MessageType::Redirect) { + std::unique_ptr<Transaction> p = std::make_unique<Transaction>(); + if (!p->ParseFromString(request->data())) { + LOG(ERROR) << "parse proposal fail"; + assert(1 == 0); + return -1; + } + multipaxos_->ReceiveTransaction(std::move(p)); + + } else 
if(request->user_type() == MessageType::Query) { std::unique_ptr<Proposal> p = std::make_unique<Proposal>(); if (!p->ParseFromString(request->data())) { diff --git a/platform/consensus/ordering/multipaxos/framework/consensus.h b/platform/consensus/ordering/multipaxos/framework/consensus.h index 4d08c3ac..f47a275c 100644 --- a/platform/consensus/ordering/multipaxos/framework/consensus.h +++ b/platform/consensus/ordering/multipaxos/framework/consensus.h @@ -47,6 +47,7 @@ class Consensus : public common::Consensus{ private: std::unique_ptr<MultiPaxos> multipaxos_; + bool failure_mode_ = false; }; } // namespace tusk diff --git a/platform/consensus/ordering/multipaxos/proto/proposal.proto b/platform/consensus/ordering/multipaxos/proto/proposal.proto index 83875513..1b924e7f 100644 --- a/platform/consensus/ordering/multipaxos/proto/proposal.proto +++ b/platform/consensus/ordering/multipaxos/proto/proposal.proto @@ -31,5 +31,6 @@ enum MessageType { Propose = 1; Learn = 2; Query = 3; + Redirect = 4; }; diff --git a/platform/networkstrate/consensus_manager.cpp b/platform/networkstrate/consensus_manager.cpp index 26725cbf..fa9acbab 100644 --- a/platform/networkstrate/consensus_manager.cpp +++ b/platform/networkstrate/consensus_manager.cpp @@ -138,7 +138,7 @@ void ConsensusManager::HeartBeat() { if (config_.IsTestMode()) { sleep_time = 1; } else { - sleep_time = 30; + sleep_time = 60 * 2; } } } diff --git a/platform/networkstrate/replica_communicator.cpp b/platform/networkstrate/replica_communicator.cpp index 6b7f06e6..056e82e2 100644 --- a/platform/networkstrate/replica_communicator.cpp +++ b/platform/networkstrate/replica_communicator.cpp @@ -34,7 +34,8 @@ ReplicaCommunicator::ReplicaCommunicator( verifier_(verifier), is_running_(false), batch_queue_("bc_batch", tcp_batch), - is_use_long_conn_(is_use_long_conn) { + is_use_long_conn_(is_use_long_conn), + tcp_batch_(tcp_batch) { global_stats_ = Stats::GetGlobalStats(); if (is_use_long_conn_) { worker_ = 
std::make_unique<boost::asio::io_service::work>(io_service_); @@ -45,6 +46,7 @@ ReplicaCommunicator::ReplicaCommunicator( LOG(ERROR)<<" tcp batch:"<<tcp_batch; StartBroadcastInBackGround(); + } ReplicaCommunicator::~ReplicaCommunicator() { @@ -122,6 +124,78 @@ void ReplicaCommunicator::StartBroadcastInBackGround() { }); } +void ReplicaCommunicator::StartSingleInBackGround(const std::string& ip, int port) { + single_bq_[std::make_pair(ip,port)] = std::make_unique<BatchQueue<std::unique_ptr<QueueItem>>>("s_batch", tcp_batch_); + + ReplicaInfo replica_info; + for (const auto& replica : replicas_) { + if (replica.ip() == ip) { + replica_info = replica; + break; + } + } + + if (replica_info.ip().empty()) { + for (const auto& replica : GetClientReplicas()) { + if (replica.ip() == ip) { + replica_info = replica; + break; + } + } + } + + + single_thread_.push_back(std::thread([&](BatchQueue<std::unique_ptr<QueueItem>> *bq, ReplicaInfo replica_info) { + while (IsRunning()) { + std::vector<std::unique_ptr<QueueItem>> batch_req = + bq->Pop(50000); + if (batch_req.empty()) { + continue; + } + BroadcastData broadcast_data; + for (auto& queue_item : batch_req) { + broadcast_data.add_data()->swap(queue_item->data); + } + + global_stats_->SendBroadCastMsg(broadcast_data.data_size()); + //LOG(ERROR)<<" send to ip:"<<replica_info.ip()<<" port:"<<replica_info.port()<<" bq size:"<<batch_req.size(); + int ret = SendMessageFromPool(broadcast_data, {replica_info}); + if (ret < 0) { + LOG(ERROR) << "broadcast request fail:"; + } + //LOG(ERROR)<<" send to ip:"<<replica_info.ip()<<" port:"<<replica_info.port()<<" bq size:"<<batch_req.size()<<" done"; + } + }, single_bq_[std::make_pair(ip,port)].get(), replica_info)); +} + + +int ReplicaCommunicator::SendSingleMessage(const google::protobuf::Message& message, +const ReplicaInfo& replica_info) { + + std::string ip = replica_info.ip(); + int port = replica_info.port(); + + + + + //LOG(ERROR)<<" send msg ip:"<<ip<<" port:"<<port; + 
global_stats_->BroadCastMsg(); + if (is_use_long_conn_) { + auto item = std::make_unique<QueueItem>(); + item->data = NetChannel::GetRawMessageString(message, verifier_); + std::lock_guard<std::mutex> lk(smutex_); + if(single_bq_.find(std::make_pair(ip, port)) == single_bq_.end()){ + StartSingleInBackGround(ip, port); + } + assert(single_bq_[std::make_pair(ip, port)] != nullptr); + single_bq_[std::make_pair(ip, port)]->Push(std::move(item)); + return 0; + } else { + LOG(ERROR) << "send internal"; + return SendMessageInternal(message, replicas_); + } +} + int ReplicaCommunicator::SendMessage(const google::protobuf::Message& message) { global_stats_->BroadCastMsg(); if (is_use_long_conn_) { @@ -137,6 +211,8 @@ int ReplicaCommunicator::SendMessage(const google::protobuf::Message& message) { int ReplicaCommunicator::SendMessage(const google::protobuf::Message& message, const ReplicaInfo& replica_info) { + return SendSingleMessage(message, replica_info); + if (is_use_long_conn_) { std::string data = NetChannel::GetRawMessageString(message, verifier_); BroadcastData broadcast_data; @@ -215,6 +291,7 @@ AsyncReplicaClient* ReplicaCommunicator::GetClientFromPool( auto client = std::make_unique<AsyncReplicaClient>( &io_service_, ip, port + (is_use_long_conn_ ? 
10000 : 0), true); client_pools_[std::make_pair(ip, port)] = std::move(client); + //StartSingleInBackGround(ip, port); } return client_pools_[std::make_pair(ip, port)].get(); } diff --git a/platform/networkstrate/replica_communicator.h b/platform/networkstrate/replica_communicator.h index 87994770..239e4d21 100644 --- a/platform/networkstrate/replica_communicator.h +++ b/platform/networkstrate/replica_communicator.h @@ -73,6 +73,11 @@ class ReplicaCommunicator { bool IsRunning() const; bool IsInPool(const ReplicaInfo& replica_info); + void StartSingleInBackGround(const std::string& ip, int port); + + int SendSingleMessage(const google::protobuf::Message& message, + const ReplicaInfo& replica_info); + private: std::vector<ReplicaInfo> replicas_; SignatureVerifier* verifier_; @@ -93,6 +98,14 @@ class ReplicaCommunicator { std::vector<std::thread> worker_threads_; std::vector<ReplicaInfo> clients_; std::mutex mutex_; + + + + std::map<std::pair<std::string, int>, + std::unique_ptr<BatchQueue<std::unique_ptr<QueueItem>>>> single_bq_; + std::vector<std::thread> single_thread_; + int tcp_batch_; + std::mutex smutex_; }; } // namespace resdb diff --git a/platform/proto/replica_info.proto b/platform/proto/replica_info.proto index 9c81dade..aeda4643 100644 --- a/platform/proto/replica_info.proto +++ b/platform/proto/replica_info.proto @@ -45,6 +45,7 @@ message ResConfigData{ optional int32 max_client_complaint_num = 21; optional int32 duplicate_check_frequency_useconds = 22; + optional bool failure_mode = 23; } message ReplicaStates { diff --git a/scripts/deploy/config/multipaxos.config b/scripts/deploy/config/cassandra_cft.config similarity index 60% copy from scripts/deploy/config/multipaxos.config copy to scripts/deploy/config/cassandra_cft.config index 1ac9fe46..c63d58a7 100644 --- a/scripts/deploy/config/multipaxos.config +++ b/scripts/deploy/config/cassandra_cft.config @@ -3,8 +3,9 @@ "enable_viewchange": false, "recovery_enabled": false, "max_client_complaint_num":10, 
- "max_process_txn": 256, - "worker_num": 30, + "max_process_txn": 64, + "worker_num": 20, "input_worker_num": 1, - "output_worker_num": 10 + "output_worker_num": 5, + "failure_mode" : false } diff --git a/scripts/deploy/config/kv_performance_server_16.conf b/scripts/deploy/config/kv_performance_server_16.conf index 981ff4eb..1f6a31a0 100644 --- a/scripts/deploy/config/kv_performance_server_16.conf +++ b/scripts/deploy/config/kv_performance_server_16.conf @@ -1,36 +1,36 @@ iplist=( -172.31.29.163 -172.31.22.39 -172.31.16.32 -172.31.27.98 -172.31.23.49 -172.31.18.178 -172.31.25.109 -172.31.17.109 -172.31.18.116 -172.31.25.117 -172.31.28.50 -172.31.24.115 -172.31.17.251 -172.31.17.252 -172.31.27.122 -172.31.26.123 -172.31.20.132 -172.31.20.70 -172.31.24.255 -172.31.25.132 -172.31.24.15 -172.31.20.16 -172.31.26.76 -172.31.18.15 -172.31.29.23 -172.31.19.215 -172.31.19.17 -172.31.31.149 -172.31.17.92 -172.31.31.159 -172.31.26.155 -172.31.24.155 +172.31.45.163 +172.31.32.226 +172.31.41.104 +172.31.47.100 +172.31.42.174 +172.31.36.105 +172.31.47.113 +172.31.45.112 +172.31.42.121 +172.31.47.116 +172.31.45.187 +172.31.45.250 +172.31.47.192 +172.31.40.62 +172.31.41.198 +172.31.45.196 +172.31.37.160 +172.31.33.9 +172.31.37.136 +172.31.44.139 +172.31.36.138 +172.31.40.211 +172.31.36.209 +172.31.36.21 +172.31.38.85 +172.31.38.151 +172.31.32.87 +172.31.39.153 +172.31.42.217 +172.31.47.218 +172.31.34.90 +172.31.36.92 ) key=~/.ssh/junchao.pem diff --git a/scripts/deploy/config/kv_performance_server_32.conf b/scripts/deploy/config/kv_performance_server_32.conf index aab16fae..7a3ee111 100644 --- a/scripts/deploy/config/kv_performance_server_32.conf +++ b/scripts/deploy/config/kv_performance_server_32.conf @@ -1,68 +1,68 @@ iplist=( -172.31.31.59 -172.31.19.187 -172.31.28.180 -172.31.27.51 -172.31.20.3 -172.31.24.2 -172.31.29.63 -172.31.18.59 -172.31.26.174 -172.31.28.43 -172.31.27.40 -172.31.27.36 -172.31.20.179 -172.31.17.176 -172.31.27.48 -172.31.31.174 -172.31.20.20 
-172.31.27.19 -172.31.17.17 -172.31.16.16 -172.31.23.31 -172.31.21.158 -172.31.24.152 -172.31.18.23 -172.31.16.11 -172.31.27.137 -172.31.26.6 -172.31.24.133 -172.31.20.14 -172.31.17.13 -172.31.18.141 -172.31.24.11 -172.31.30.254 -172.31.16.252 -172.31.29.120 -172.31.22.248 -172.31.28.200 -172.31.26.71 -172.31.24.197 -172.31.26.255 -172.31.24.233 -172.31.19.227 -172.31.28.98 -172.31.18.224 -172.31.19.119 -172.31.31.113 -172.31.25.111 -172.31.26.233 -172.31.17.218 -172.31.26.89 -172.31.16.87 -172.31.18.87 -172.31.26.34 -172.31.27.160 -172.31.24.93 -172.31.24.92 -172.31.18.83 -172.31.29.210 -172.31.30.76 -172.31.19.73 -172.31.21.87 -172.31.28.213 -172.31.23.211 -172.31.16.211 +172.31.45.163 +172.31.32.226 +172.31.41.104 +172.31.47.100 +172.31.42.174 +172.31.36.105 +172.31.47.113 +172.31.45.112 +172.31.42.121 +172.31.47.116 +172.31.45.187 +172.31.45.250 +172.31.47.192 +172.31.40.62 +172.31.41.198 +172.31.45.196 +172.31.37.160 +172.31.33.9 +172.31.37.136 +172.31.44.139 +172.31.36.138 +172.31.40.211 +172.31.36.209 +172.31.36.21 +172.31.38.85 +172.31.38.151 +172.31.32.87 +172.31.39.153 +172.31.42.217 +172.31.47.218 +172.31.34.90 +172.31.36.92 +172.31.44.19 +172.31.44.89 +172.31.46.181 +172.31.41.184 +172.31.46.177 +172.31.38.180 +172.31.35.64 +172.31.38.192 +172.31.32.187 +172.31.38.190 +172.31.32.133 +172.31.33.74 +172.31.33.1 +172.31.42.130 +172.31.35.144 +172.31.33.210 +172.31.39.11 +172.31.46.79 +172.31.40.224 +172.31.39.225 +172.31.47.227 +172.31.37.166 +172.31.46.98 +172.31.44.163 +172.31.32.231 +172.31.35.169 +172.31.40.166 +172.31.45.102 +172.31.44.172 +172.31.46.173 +172.31.45.171 +172.31.46.235 ) key=~/.ssh/junchao.pem diff --git a/scripts/deploy/config/kv_performance_server_4.conf b/scripts/deploy/config/kv_performance_server_4.conf new file mode 100644 index 00000000..c03f863c --- /dev/null +++ b/scripts/deploy/config/kv_performance_server_4.conf @@ -0,0 +1,13 @@ +iplist=( +172.31.33.62 +172.31.35.77 +172.31.40.74 +172.31.39.30 +172.31.44.157 +172.31.34.48 
+172.31.39.186 +172.31.46.4 +) + +key=~/.ssh/junchao.pem +client_num=4 diff --git a/scripts/deploy/config/kv_performance_server_48.conf b/scripts/deploy/config/kv_performance_server_48.conf new file mode 100644 index 00000000..335ea701 --- /dev/null +++ b/scripts/deploy/config/kv_performance_server_48.conf @@ -0,0 +1,101 @@ +iplist=( +172.31.45.163 +172.31.32.226 +172.31.41.104 +172.31.47.100 +172.31.42.174 +172.31.36.105 +172.31.47.113 +172.31.45.112 +172.31.42.121 +172.31.47.116 +172.31.45.187 +172.31.45.250 +172.31.47.192 +172.31.40.62 +172.31.41.198 +172.31.45.196 +172.31.37.160 +172.31.33.9 +172.31.37.136 +172.31.44.139 +172.31.36.138 +172.31.40.211 +172.31.36.209 +172.31.36.21 +172.31.38.85 +172.31.38.151 +172.31.32.87 +172.31.39.153 +172.31.42.217 +172.31.47.218 +172.31.34.90 +172.31.36.92 +172.31.44.96 +172.31.47.254 +172.31.40.126 +172.31.39.65 +172.31.45.195 +172.31.35.112 +172.31.47.242 +172.31.45.117 +172.31.32.119 +172.31.35.234 +172.31.45.106 +172.31.41.110 +172.31.42.238 +172.31.34.97 +172.31.35.97 +172.31.39.97 +172.31.34.99 +172.31.41.40 +172.31.44.168 +172.31.38.45 +172.31.40.48 +172.31.34.221 +172.31.33.222 +172.31.37.34 +172.31.34.34 +172.31.35.202 +172.31.37.85 +172.31.37.215 +172.31.36.217 +172.31.40.199 +172.31.36.201 +172.31.34.202 +172.31.41.202 +172.31.37.0 +172.31.46.129 +172.31.32.131 +172.31.46.5 +172.31.47.188 +172.31.34.189 +172.31.40.61 +172.31.47.190 +172.31.42.181 +172.31.37.56 +172.31.44.60 +172.31.42.60 +172.31.47.50 +172.31.36.51 +172.31.34.180 +172.31.42.180 +172.31.46.28 +172.31.46.156 +172.31.47.158 +172.31.34.22 +172.31.39.22 +172.31.38.153 +172.31.43.156 +172.31.44.9 +172.31.38.141 +172.31.40.17 +172.31.42.18 +172.31.40.134 +172.31.34.6 +172.31.33.135 +172.31.42.137 +) + +key=~/.ssh/junchao.pem +client_num=48 diff --git a/scripts/deploy/config/kv_performance_server_64.conf b/scripts/deploy/config/kv_performance_server_64.conf index 7e8be748..24237ad3 100644 --- a/scripts/deploy/config/kv_performance_server_64.conf +++ 
b/scripts/deploy/config/kv_performance_server_64.conf @@ -1,132 +1,132 @@ iplist=( -172.31.29.160 -172.31.22.239 -172.31.23.112 -172.31.21.177 -172.31.23.242 -172.31.26.233 -172.31.23.235 -172.31.16.108 -172.31.22.174 -172.31.23.56 -172.31.30.57 -172.31.22.58 -172.31.16.127 -172.31.16.245 -172.31.17.246 -172.31.17.118 -172.31.24.247 -172.31.24.204 -172.31.23.12 -172.31.29.140 -172.31.28.141 -172.31.31.3 -172.31.30.199 -172.31.29.73 -172.31.27.75 -172.31.19.87 -172.31.25.31 -172.31.18.95 -172.31.23.208 -172.31.22.83 -172.31.20.214 -172.31.16.87 -172.31.26.116 -172.31.27.244 -172.31.18.245 -172.31.25.121 -172.31.29.121 -172.31.27.254 -172.31.21.255 -172.31.27.255 -172.31.28.96 -172.31.21.231 -172.31.21.236 -172.31.24.237 -172.31.30.240 -172.31.22.242 -172.31.18.49 -172.31.18.178 -172.31.17.179 -172.31.26.181 -172.31.31.54 -172.31.29.55 -172.31.30.188 -172.31.27.190 -172.31.17.162 -172.31.31.168 -172.31.20.169 -172.31.22.171 -172.31.28.172 -172.31.24.173 -172.31.28.45 -172.31.19.174 -172.31.31.215 -172.31.22.216 -172.31.19.217 -172.31.28.217 -172.31.19.220 -172.31.18.221 -172.31.21.32 -172.31.17.34 -172.31.29.69 -172.31.23.202 -172.31.16.205 -172.31.30.208 -172.31.20.82 -172.31.28.211 -172.31.30.83 -172.31.30.84 -172.31.25.28 -172.31.18.30 -172.31.28.14 -172.31.19.14 -172.31.30.144 -172.31.31.149 -172.31.28.22 -172.31.31.22 -172.31.20.23 -172.31.22.155 -172.31.26.191 -172.31.21.1 -172.31.17.130 -172.31.17.6 -172.31.29.9 -172.31.29.12 -172.31.18.142 -172.31.26.142 -172.31.29.220 -172.31.24.157 -172.31.27.64 -172.31.30.5 -172.31.30.139 -172.31.27.13 -172.31.22.60 -172.31.22.189 -172.31.31.62 -172.31.24.192 -172.31.23.82 -172.31.23.83 -172.31.29.25 -172.31.23.154 -172.31.23.143 -172.31.29.16 -172.31.29.144 -172.31.22.144 -172.31.20.35 -172.31.25.228 -172.31.24.164 -172.31.16.165 -172.31.22.33 -172.31.29.161 -172.31.26.168 -172.31.31.173 -172.31.31.53 -172.31.20.54 -172.31.24.165 -172.31.26.230 -172.31.18.230 -172.31.22.104 +172.31.45.163 +172.31.32.226 +172.31.41.104 
+172.31.47.100 +172.31.42.174 +172.31.36.105 +172.31.47.113 +172.31.45.112 +172.31.42.121 +172.31.47.116 +172.31.45.187 +172.31.45.250 +172.31.47.192 +172.31.40.62 +172.31.41.198 +172.31.45.196 +172.31.37.160 +172.31.33.9 +172.31.37.136 +172.31.44.139 +172.31.36.138 +172.31.40.211 +172.31.36.209 +172.31.36.21 +172.31.38.85 +172.31.38.151 +172.31.32.87 +172.31.39.153 +172.31.42.217 +172.31.47.218 +172.31.34.90 +172.31.36.92 +172.31.44.96 +172.31.47.254 +172.31.40.126 +172.31.39.65 +172.31.45.195 +172.31.35.112 +172.31.47.242 +172.31.45.117 +172.31.32.119 +172.31.35.234 +172.31.45.106 +172.31.41.110 +172.31.42.238 +172.31.34.97 +172.31.35.97 +172.31.39.97 +172.31.34.99 +172.31.41.40 +172.31.44.168 +172.31.38.45 +172.31.40.48 +172.31.34.221 +172.31.33.222 +172.31.37.34 +172.31.34.34 +172.31.35.202 +172.31.37.85 +172.31.37.215 +172.31.36.217 +172.31.40.199 +172.31.36.201 +172.31.34.202 +172.31.41.202 +172.31.37.0 +172.31.46.129 +172.31.32.131 +172.31.46.5 +172.31.47.188 +172.31.34.189 +172.31.40.61 +172.31.47.190 +172.31.42.181 +172.31.37.56 +172.31.44.60 +172.31.42.60 +172.31.47.50 +172.31.36.51 +172.31.34.180 +172.31.42.180 +172.31.46.28 +172.31.46.156 +172.31.47.158 +172.31.34.22 +172.31.39.22 +172.31.38.153 +172.31.43.156 +172.31.44.9 +172.31.38.141 +172.31.40.17 +172.31.42.18 +172.31.40.134 +172.31.34.6 +172.31.33.135 +172.31.42.137 +172.31.44.19 +172.31.44.89 +172.31.46.181 +172.31.41.184 +172.31.46.177 +172.31.38.180 +172.31.35.64 +172.31.38.192 +172.31.32.187 +172.31.38.190 +172.31.32.133 +172.31.33.74 +172.31.33.1 +172.31.42.130 +172.31.35.144 +172.31.33.210 +172.31.39.11 +172.31.46.79 +172.31.40.224 +172.31.39.225 +172.31.47.227 +172.31.37.166 +172.31.46.98 +172.31.44.163 +172.31.32.231 +172.31.35.169 +172.31.40.166 +172.31.45.102 +172.31.44.172 +172.31.46.173 +172.31.45.171 +172.31.46.235 ) key=~/.ssh/junchao.pem diff --git a/scripts/deploy/config/kv_performance_server_8.conf b/scripts/deploy/config/kv_performance_server_8.conf index 0580f94f..3b5792e6 
100644 --- a/scripts/deploy/config/kv_performance_server_8.conf +++ b/scripts/deploy/config/kv_performance_server_8.conf @@ -1,20 +1,20 @@ iplist=( -172.31.40.86 -172.31.33.214 -172.31.35.90 -172.31.40.93 -172.31.46.32 -172.31.37.33 -172.31.37.162 -172.31.44.169 -172.31.45.42 -172.31.33.42 -172.31.37.172 -172.31.37.44 -172.31.38.178 -172.31.35.180 -172.31.33.52 -172.31.47.184 +172.31.44.213 +172.31.34.55 +172.31.45.219 +172.31.32.91 +172.31.42.124 +172.31.43.254 +172.31.46.234 +172.31.40.203 +172.31.32.172 +172.31.42.236 +172.31.35.140 +172.31.34.241 +172.31.36.113 +172.31.37.19 +172.31.38.69 +172.31.34.136 ) key=~/.ssh/junchao.pem diff --git a/scripts/deploy/config/multipaxos.config b/scripts/deploy/config/multipaxos.config index 1ac9fe46..bfd9b15b 100644 --- a/scripts/deploy/config/multipaxos.config +++ b/scripts/deploy/config/multipaxos.config @@ -3,8 +3,9 @@ "enable_viewchange": false, "recovery_enabled": false, "max_client_complaint_num":10, - "max_process_txn": 256, - "worker_num": 30, + "max_process_txn": 64, + "worker_num": 20, "input_worker_num": 1, - "output_worker_num": 10 + "output_worker_num": 10, + "failure_mode" :true } diff --git a/scripts/deploy/performance/cassandra_cft_performance.sh b/scripts/deploy/performance/cassandra_cft_performance.sh new file mode 100755 index 00000000..bc1b6a21 --- /dev/null +++ b/scripts/deploy/performance/cassandra_cft_performance.sh @@ -0,0 +1,5 @@ +export server=//benchmark/protocols/cassandra_cft:kv_server_performance +export TEMPLATE_PATH=$PWD/config/cassandra_cft.config + +./performance/run_performance.sh $* +echo $0 diff --git a/scripts/deploy/performance/run_performance.sh b/scripts/deploy/performance/run_performance.sh index 1caa2c6e..ece3816f 100755 --- a/scripts/deploy/performance/run_performance.sh +++ b/scripts/deploy/performance/run_performance.sh @@ -27,6 +27,7 @@ echo "benchmark done" count=1 for ip in ${iplist[@]}; do +echo "$ip" `ssh -i ${key} -n -o BatchMode=yes -o StrictHostKeyChecking=no ubuntu@${ip} 
"killall -9 ${server_bin}"` ((count++)) done @@ -40,9 +41,11 @@ echo "getting results" for ip in ${iplist[@]}; do echo "scp -i ${key} ubuntu@${ip}:/home/ubuntu/${server_bin}.log ./${ip}_log" - `scp -i ${key} ubuntu@${ip}:/home/ubuntu/${server_bin}.log result_${ip}_log` + `scp -i ${key} ubuntu@${ip}:/home/ubuntu/${server_bin}.log result_${ip}_log` & done +wait + python3 performance/calculate_result.py `ls result_*_log` > results.log rm -rf result_*_log diff --git a/scripts/deploy/script/deploy.sh b/scripts/deploy/script/deploy.sh index 4d220c7b..d57869ce 100755 --- a/scripts/deploy/script/deploy.sh +++ b/scripts/deploy/script/deploy.sh @@ -79,6 +79,7 @@ function run_cmd(){ } function run_one_cmd(){ + echo " $1" ssh -i ${key} -n -o BatchMode=yes -o StrictHostKeyChecking=no ubuntu@${ip} "$1" }
