This is an automated email from the ASF dual-hosted git repository. junchao pushed a commit to branch cassandra in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git
commit 6ab4f5fa9cca32cd1d400cc1e10d8971886c9206 Author: cjcchen <[email protected]> AuthorDate: Thu Feb 26 14:29:42 2026 +0000 add autobahn --- benchmark/protocols/autobahn/BUILD | 16 + .../protocols/autobahn/kv_server_performance.cpp | 89 +++++ benchmark/protocols/autobahn/kv_service_tools.cpp | 57 ++++ .../consensus/ordering/autobahn/algorithm/BUILD | 77 +++++ .../ordering/autobahn/algorithm/autobahn.cpp | 369 +++++++++++++++++++++ .../ordering/autobahn/algorithm/autobahn.h | 74 +++++ .../algorithm/proposal_graph.cpp | 72 +--- .../ordering/autobahn/algorithm/proposal_graph.h | 101 ++++++ .../autobahn/algorithm/proposal_manager.cpp | 257 ++++++++++++++ .../ordering/autobahn/algorithm/proposal_manager.h | 71 ++++ .../algorithm/proposal_state.h | 4 +- .../ordering/autobahn/algorithm/ranking.cpp | 12 + .../ordering/autobahn/algorithm/ranking.h | 14 + .../consensus/ordering/autobahn/framework/BUILD | 16 + .../ordering/autobahn/framework/consensus.cpp | 207 ++++++++++++ .../ordering/autobahn/framework/consensus.h | 59 ++++ .../ordering/autobahn/framework/consensus_test.cpp | 179 ++++++++++ platform/consensus/ordering/autobahn/proto/BUILD | 16 + .../ordering/autobahn/proto/proposal.proto | 133 ++++++++ .../ordering/cassandra/algorithm/cassandra.cpp | 13 +- .../cassandra/algorithm/proposal_graph.cpp | 2 +- .../cassandra/algorithm/proposal_manager.cpp | 14 +- .../cassandra/algorithm/proposal_manager.h | 3 +- .../ordering/cassandra/algorithm/proposal_state.h | 2 +- .../config/{cassandra.config => autobahn.config} | 2 +- scripts/deploy/config/cassandra.config | 2 +- scripts/deploy/config/kv_performance_server.conf | 6 +- .../deploy/config/kv_performance_server_32.conf | 4 +- scripts/deploy/performance/autobahn_performance.sh | 5 + scripts/deploy/performance/run_performance.sh | 2 +- scripts/null | 2 +- 31 files changed, 1794 insertions(+), 86 deletions(-) diff --git a/benchmark/protocols/autobahn/BUILD b/benchmark/protocols/autobahn/BUILD new file mode 100644 index 00000000..fdf1cc4e --- /dev/null +++ b/benchmark/protocols/autobahn/BUILD @@ -0,0 +1,16 @@ +package(default_visibility = ["//visibility:private"]) + +load("@bazel_skylib//rules:common_settings.bzl", "bool_flag") + +cc_binary( + name = "kv_server_performance", + srcs = ["kv_server_performance.cpp"], + deps = [ + "//chain/storage:memory_db", + "//executor/kv:kv_executor", + "//platform/config:resdb_config_utils", + "//platform/consensus/ordering/autobahn/framework:consensus", + "//service/utils:server_factory", + ], +) + diff --git a/benchmark/protocols/autobahn/kv_server_performance.cpp b/benchmark/protocols/autobahn/kv_server_performance.cpp new file mode 100644 index 00000000..3b209b82 --- /dev/null +++ b/benchmark/protocols/autobahn/kv_server_performance.cpp @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2019-2022 ExpoLab, UC Davis + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + */ + +#include <glog/logging.h> + +#include "chain/storage/memory_db.h" +#include "executor/kv/kv_executor.h" +#include "platform/config/resdb_config_utils.h" +#include "platform/consensus/ordering/autobahn/framework/consensus.h" +#include "platform/networkstrate/service_network.h" +#include "platform/statistic/stats.h" +#include "proto/kv/kv.pb.h" + +using namespace resdb; +using namespace resdb::autobahn; +using namespace resdb::storage; + +void ShowUsage() { + printf("<config> <private_key> <cert_file> [logging_dir]\n"); +} + +std::string GetRandomKey() { + int num1 = rand() % 10; + int num2 = rand() % 10; + return std::to_string(num1) + std::to_string(num2); +} + +int main(int argc, char** argv) { + if (argc < 3) { + ShowUsage(); + exit(0); + } + + // google::InitGoogleLogging(argv[0]); + // FLAGS_minloglevel = google::GLOG_WARNING; + + char* config_file = argv[1]; + char* private_key_file = argv[2]; + char* cert_file = argv[3]; + + if (argc >= 5) { + auto monitor_port = Stats::GetGlobalStats(5); + monitor_port->SetPrometheus(argv[4]); + } + + std::unique_ptr<ResDBConfig> config = + GenerateResDBConfig(config_file, private_key_file, cert_file); + + config->RunningPerformance(true); + ResConfigData config_data = config->GetConfigData(); + + auto performance_consens = std::make_unique<Consensus>( + *config, std::make_unique<KVExecutor>(std::make_unique<MemoryDB>())); + performance_consens->SetupPerformanceDataFunc([]() { + KVRequest request; + request.set_cmd(KVRequest::SET); + request.set_key(GetRandomKey()); + request.set_value("helloworld"); + std::string request_data; + request.SerializeToString(&request_data); + return request_data; + }); + + auto server = + std::make_unique<ServiceNetwork>(*config, std::move(performance_consens)); + server->Run(); +} diff --git a/benchmark/protocols/autobahn/kv_service_tools.cpp b/benchmark/protocols/autobahn/kv_service_tools.cpp new file mode 100644 index 00000000..17858ef2 --- /dev/null +++ b/benchmark/protocols/autobahn/kv_service_tools.cpp @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2019-2022 ExpoLab, UC Davis + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + */ + +#include <fcntl.h> +#include <sys/stat.h> +#include <sys/types.h> +#include <unistd.h> + +#include <fstream> + +#include "common/proto/signature_info.pb.h" +#include "interface/kv/kv_client.h" +#include "platform/config/resdb_config_utils.h" + +using resdb::GenerateReplicaInfo; +using resdb::GenerateResDBConfig; +using resdb::KVClient; +using resdb::ReplicaInfo; +using resdb::ResDBConfig; + +int main(int argc, char** argv) { + if (argc < 2) { + printf("<config path>\n"); + return 0; + } + std::string client_config_file = argv[1]; + ResDBConfig config = GenerateResDBConfig(client_config_file); + + config.SetClientTimeoutMs(100000); + + KVClient client(config); + + client.Set("start", "value"); + printf("start benchmark\n"); +} diff --git a/platform/consensus/ordering/autobahn/algorithm/BUILD b/platform/consensus/ordering/autobahn/algorithm/BUILD new file mode 100644 index 00000000..0f7e42a1 --- /dev/null +++ b/platform/consensus/ordering/autobahn/algorithm/BUILD @@ -0,0 +1,77 @@ +package(default_visibility = ["//platform/consensus/ordering/autobahn:__subpackages__"]) + +cc_library( + name = "proposal_state", + hdrs = ["proposal_state.h"], +) + +cc_library( + name = "proposal_manager", + srcs = ["proposal_manager.cpp"], + hdrs = ["proposal_manager.h"], + deps = [ + ":proposal_graph", + "//common:comm", + "//platform/statistic:stats", + "//common/crypto:signature_verifier", + "//common/utils", + "//platform/consensus/ordering/autobahn/proto:proposal_cc_proto", + ], +) + +cc_library( + name = "ranking", + srcs = ["ranking.cpp"], + hdrs = ["ranking.h"], + deps = [ + "//common:comm", + ], +) + +cc_library( + name = "proposal_graph", + srcs = ["proposal_graph.cpp"], + hdrs = ["proposal_graph.h"], + deps = [ + #":ranking", + ":proposal_state", + "//platform/statistic:stats", + "//common:comm", + "//common/utils", + "//platform/consensus/ordering/autobahn/proto:proposal_cc_proto", + ], +) + +cc_library( + name = "autobahn", + srcs = ["autobahn.cpp"], + hdrs = ["autobahn.h"], + deps = [ + ":proposal_graph", + ":proposal_manager", + "//platform/statistic:stats", + "//common:comm", + "//common/crypto:signature_verifier", + "//platform/consensus/ordering/common/algorithm:protocol_base", + "//platform/consensus/ordering/autobahn/proto:proposal_cc_proto", + "//platform/common/queue:lock_free_queue", + ], +) + +cc_test( + name = "proposal_graph_test", + srcs = ["proposal_graph_test.cpp"], + deps = [ + ":proposal_graph", + "//common/test:test_main", + ], +) + +cc_test( + name = "autobahn_test", + srcs = ["autobahn_test.cpp"], + deps = [ + ":autobahn", + "//common/test:test_main", + ], +) diff --git a/platform/consensus/ordering/autobahn/algorithm/autobahn.cpp b/platform/consensus/ordering/autobahn/algorithm/autobahn.cpp new file mode 100644 index 00000000..8580d74e --- /dev/null +++ b/platform/consensus/ordering/autobahn/algorithm/autobahn.cpp @@ -0,0 +1,369 @@ +#include "platform/consensus/ordering/autobahn/algorithm/autobahn.h" + +#include <glog/logging.h> + +#include "common/crypto/signature_verifier.h" +#include "common/utils/utils.h" + + +namespace resdb { +namespace autobahn { + +AutoBahn::AutoBahn(int id, int f, int total_num, int block_size, SignatureVerifier* verifier) + : ProtocolBase(id, f, total_num), verifier_(verifier) { + + LOG(ERROR) << "get proposal graph"; + id_ = id; + total_num_ = total_num; + f_ = f; + is_stop_ = false; + //timeout_ms_ = 100; + timeout_ms_ = 60000; + batch_size_ = block_size; + execute_id_ = 1; + + proposal_manager_ = std::make_unique<ProposalManager>(id, total_num_, f_, verifier); + + block_thread_ = std::thread(&AutoBahn::GenerateBlocks, this); + dissemi_thread_ = std::thread(&AutoBahn::AsyncDissemination, this); + consensus_thread_ = std::thread(&AutoBahn::AsyncConsensus, this); + prepare_thread_ = std::thread(&AutoBahn::AsyncPrepare, this); + commit_thread_ = std::thread(&AutoBahn::AsyncCommit, this); +} + +AutoBahn::~AutoBahn() { + is_stop_ = true; + if (block_thread_.joinable()) { + block_thread_.join(); + } + if (dissemi_thread_.joinable()) { + dissemi_thread_.join(); + } +} + +bool AutoBahn::IsStop() { + return is_stop_; +} + +bool AutoBahn::ReceiveTransaction(std::unique_ptr<Transaction> txn) { + // LOG(ERROR)<<"recv txn:"; + txn->set_create_time(GetCurrentTime()); + txns_.Push(std::move(txn)); + return true; +} + +void AutoBahn::GenerateBlocks() { + std::vector<std::unique_ptr<Transaction>> txns; + while (!IsStop()) { + std::unique_ptr<Transaction> txn = txns_.Pop(); + if (txn == nullptr) { + continue; + } + txns.push_back(std::move(txn)); + for(int i = 1; i < batch_size_; ++i){ + std::unique_ptr<Transaction> txn = txns_.Pop(100); + if(txn == nullptr){ + break; + } + txns.push_back(std::move(txn)); + } + + proposal_manager_->MakeBlock(txns); + txns.clear(); + } +} + +bool AutoBahn::WaitForResponse(int64_t block_id) { + std::unique_lock<std::mutex> lk(bc_mutex_); + //LOG(ERROR)<<"wait for block id :"<<block_id; + bc_block_cv_.wait_for(lk, std::chrono::microseconds(timeout_ms_ * 1000), + [&] { return block_id<= proposal_manager_->GetCurrentBlockId(); }); + if (block_id > proposal_manager_->GetCurrentBlockId()) { + return false; + } + return true; +} + +void AutoBahn::BlockDone() { + bc_block_cv_.notify_all(); +} + +void AutoBahn::AsyncDissemination() { + int next_block = 1; + while (!IsStop()) { + if(WaitForResponse(next_block-1)){ + next_block++; + } + if(next_block == 1) { + next_block++; + } + LOG(ERROR)<<" get block :"<<next_block-1; + const Block* block = proposal_manager_->GetLocalBlock(next_block-1); + if(block == nullptr) { + next_block--; + continue; + } + LOG(ERROR)<<" broadcast block :"<<next_block-1<<" id:"<<block->local_id(); + Broadcast(MessageType::NewBlocks, *block); + LOG(ERROR)<<" broadcast block :"<<next_block-1<<" id:"<<block->local_id()<<" done"; + } +} + +void AutoBahn::NotifyView() { + std::unique_lock<std::mutex> lk(view_mutex_); + view_cv_.notify_all(); +} + +bool AutoBahn::WaitForNextView(int view) { + std::unique_lock<std::mutex> lk(view_mutex_); + //LOG(ERROR)<<"wait for next view:"<<view; + view_cv_.wait_for(lk, std::chrono::microseconds(timeout_ms_ * 1000), + [&] { return proposal_manager_->ReadyView(view); }); + return proposal_manager_->ReadyView(view); +} + +void AutoBahn::AsyncConsensus() { + while (!IsStop()) { + if (id_ != 1) { + break; + } + int view = proposal_manager_->GetCurrentView(); + if(!WaitForNextView(view)) { + continue; + } + std::pair<int, std::map<int, int64_t>> blocks = proposal_manager_->GetCut(); + auto proposal = proposal_manager_->GenerateProposal(blocks.first, blocks.second); + Broadcast(MessageType::NewProposal, *proposal); + } +} + + +void AutoBahn::ReceiveBlock(std::unique_ptr<Block> block) { + LOG(ERROR)<<"recv block from:"<<block->sender_id()<<" block id:"<<block->local_id(); + BlockACK block_ack; + block_ack.set_hash(block->hash()); + block_ack.set_sender_id(block->sender_id()); + block_ack.set_local_id(block->local_id()); + block_ack.set_responder(id_); + *block_ack.mutable_sign_info() = proposal_manager_->SignBlock(*block); + + proposal_manager_->AddBlock(std::move(block)); + SendMessage(MessageType::CMD_BlockACK, block_ack, block_ack.sender_id()); + + proposal_manager_->UpdateView(block_ack.sender_id(), block_ack.local_id()); + NotifyView(); + LOG(ERROR)<<"send block ack to:"<<block_ack.sender_id()<<" block id:"<<block_ack.local_id(); +} + +void AutoBahn::ReceiveBlockACK(std::unique_ptr<BlockACK> block) { + LOG(ERROR)<<"recv block ack:"<<block->local_id()<<" from:"<<block->responder()<<" block sign info:"<<block->sign_info().sender_id(); + std::unique_lock<std::mutex> lk(block_mutex_); + block_ack_[block->local_id()].insert(std::make_pair(block->responder(), block->sign_info())); + LOG(ERROR)<<"recv block ack:"<<block->local_id() + <<" from:"<<block->responder()<< " num:"<<block_ack_[block->local_id()].size(); + if (block_ack_[block->local_id()].size() >= f_ + 1 && + block_ack_[block->local_id()].find(id_) != + block_ack_[block->local_id()].end()) { + std::unique_lock<std::mutex> lk(bc_mutex_); + proposal_manager_->BlockReady(block_ack_[block->local_id()], block->local_id()); + BlockDone(); + } + LOG(ERROR)<<"recv block ack:"<<block->local_id()<<" done"; +} + +bool AutoBahn::ReceiveProposal(std::unique_ptr<Proposal> proposal) { + LOG(ERROR)<<" receive proposal from:"<<proposal->sender_id()<<" slot:"<<proposal->slot_id()<<" block size:"<<proposal->block_size(); + Proposal vote; + vote.set_slot_id(proposal->slot_id()); + vote.set_sender_id(id_); + vote.set_hash(proposal->hash()); + + auto hash_signature_or = verifier_->SignMessage(vote.hash()); + if (!hash_signature_or.ok()) { + LOG(ERROR) << "Sign message fail"; + return false; + } + *vote.mutable_sign()=*hash_signature_or; + + int sender_id = proposal->sender_id(); + proposal_manager_->AddProposalData(std::move(proposal)); + //Broadcast(MessageType::ProposalAck, vote); + SendMessage(MessageType::ProposalAck, vote, sender_id); + + return true; +} + +bool AutoBahn::ReceiveVote(std::unique_ptr<Proposal> vote) { + LOG(ERROR)<<"recv vote ack:"<<vote->slot_id()<<" from:"<<vote->sender_id(); + + std::unique_ptr<Proposal> vote_cpy = std::make_unique<Proposal>(*vote); + + std::unique_lock<std::mutex> lk(vote_mutex_); + int slot_id = vote->slot_id(); + int sender = vote->sender_id(); + vote_ack_[vote->slot_id()].insert(std::make_pair(vote->sender_id(), std::move(vote))); + + LOG(ERROR)<<"recv vote ack:"<<slot_id<<" from:"<<sender + << " num:"<<vote_ack_[slot_id].size(); + + if (vote_ack_[slot_id].size() >= 2*f_ + 1){ + PrepareDone(std::move(vote_cpy)); + } + LOG(ERROR)<<"recv vote ack done"; + return true; +} + + +bool AutoBahn::IsFastCommit(const Proposal& proposal) { + + //LOG(ERROR)<<" is fast commit slot:"<<proposal.slot_id()<<" sign size:"<<proposal.cert().sign_size(); + if(proposal.cert().sign_size() != total_num_) { + return false; + } + + for(auto& sign : proposal.cert().sign()){ + bool valid = verifier_->VerifyMessage(proposal.hash(), sign); + if (!valid) { + LOG(ERROR)<<" sign info sign fail"; + return false; + } + } + return true; +} + +bool AutoBahn::ReceivePrepare(std::unique_ptr<Proposal> proposal) { + //LOG(ERROR)<<"recv prepare:"<<proposal->slot_id()<<" from:"<<proposal->sender_id() + // <<" is fast commit:"<<proposal->fast_commit(); + // verify + if (IsFastCommit(*proposal)){ + // proposal->fast_commit() ){ + CommitDone(std::move(proposal)); + } + else { + proposal->set_sender_id(id_); + Broadcast(MessageType::Commit, *proposal); + } + LOG(ERROR)<<"recv vote ack done"; + return true; +} + +bool AutoBahn::ReceiveCommit(std::unique_ptr<Proposal> proposal) { + //LOG(ERROR)<<"recv commit:"<<proposal->slot_id()<<" from:"<<proposal->sender_id(); + + std::unique_lock<std::mutex> lk(commit_mutex_); + commit_ack_[proposal->slot_id()].insert(proposal->sender_id()); + LOG(ERROR)<<"recv commit ack:"<<proposal->slot_id()<<" from:"<<proposal->sender_id() + << " num:"<<commit_ack_[proposal->slot_id()].size(); + if (commit_ack_[proposal->slot_id()].size() >= 2*f_ + 1){ + CommitDone(std::move(proposal)); + } + LOG(ERROR)<<"recv vote ack done"; + return true; +} + + +void AutoBahn::PrepareDone(std::unique_ptr<Proposal> vote) { + LOG(ERROR)<<" vote prepare done:"<<vote->slot_id(); + prepare_queue_.Push(std::move(vote)); +} + +void AutoBahn::CommitDone(std::unique_ptr<Proposal> proposal) { + commit_queue_.Push(std::move(proposal)); +} + +void AutoBahn::AsyncPrepare() { + int view = 1; + std::map<int, std::pair<int64_t,std::unique_ptr<Proposal>> > votes; + while (!IsStop()) { + std::unique_ptr<Proposal> p = prepare_queue_.Pop(timeout_ms_ * 1000); + if(p== nullptr) { + continue; + } + assert(p != nullptr); + //LOG(ERROR)<<" obtain slot vote:"<<p->slot_id(); + int slot_id = p->slot_id(); + votes[slot_id] = std::make_pair(GetCurrentTime(), std::move(p)); + while(!votes.empty() && votes.begin()->first <= view) { + if(votes.begin()->first < view) { + votes.erase(votes.begin()); + continue; + } + int delay = 1000; + int wait_time = GetCurrentTime() - votes.begin()->second.first; + wait_time = delay - wait_time; + LOG(ERROR)<<" view :"<<view<<" wait time:"<<wait_time; + if(wait_time> 0) { + usleep(wait_time); + } + Prepare(std::move(votes.begin()->second.second)); + votes.erase(votes.begin()); + view++; + } + } +} + +void AutoBahn::AsyncCommit() { + int view = 1; + std::map<int, std::unique_ptr<Proposal> > proposals; + while (!IsStop()) { + std::unique_ptr<Proposal> p = commit_queue_.Pop(timeout_ms_ * 1000); + if(p== nullptr) { + continue; + } + assert(p != nullptr); + //LOG(ERROR)<<" obtain comit slot vote:"<<p->slot_id(); + int slot_id = p->slot_id(); + proposals[slot_id] = std::move(p); + while(!proposals.empty() && proposals.begin()->first <= view) { + if(proposals.begin()->first < view) { + proposals.erase(proposals.begin()); + continue; + } + Commit(std::move(proposals.begin()->second)); + proposals.erase(proposals.begin()); + view++; + } + } +} + +void AutoBahn::Prepare(std::unique_ptr<Proposal> vote) { + //LOG(ERROR)<<" prepare vote:"<<vote->slot_id()<< " num:"<<vote_ack_[vote->slot_id()].size(); + if (vote_ack_[vote->slot_id()].size() == total_num_){ + // fast path + //Commit(std::move(vote)); + vote->set_fast_commit(true); + for(auto& it : vote_ack_[vote->slot_id()]){ + *vote->mutable_cert()->add_sign() = it.second->sign(); + } + } + + // slot path + //LOG(ERROR)<<" broadcast commit:"<<vote->slot_id()<<" is fast:"<<vote->fast_commit(); + vote->set_sender_id(id_); + Broadcast(MessageType::Prepare, *vote); +} + +void AutoBahn::Commit(std::unique_ptr<Proposal> proposal) { + auto raw_proposal = proposal_manager_->GetProposalData(proposal->slot_id()); + assert(raw_proposal != nullptr); + //LOG(ERROR)<<" proposal proposal slot id:"<<proposal->slot_id(); + for(const auto& block : raw_proposal->block()) { + int block_owner = block.sender_id(); + int block_id = block.local_id(); + //LOG(ERROR)<<" commit :"<<block_owner<<" block id :"<<block_id; + + Block * data_block = proposal_manager_->GetBlock(block_owner, block_id); + assert(data_block != nullptr); + + //LOG(ERROR)<<" txn size:"<<data_block->mutable_data()->transaction_size(); + for (Transaction& txn : + *data_block->mutable_data()->mutable_transaction()) { + txn.set_id(execute_id_++); + commit_(txn); + } + } +} + + +} // namespace autobahn +} // namespace resdb diff --git a/platform/consensus/ordering/autobahn/algorithm/autobahn.h b/platform/consensus/ordering/autobahn/algorithm/autobahn.h new file mode 100644 index 00000000..e4364daf --- /dev/null +++ b/platform/consensus/ordering/autobahn/algorithm/autobahn.h @@ -0,0 +1,74 @@ +#pragma once + +#include <deque> +#include <map> +#include <queue> +#include <thread> + +#include "platform/common/queue/lock_free_queue.h" +#include "platform/consensus/ordering/common/algorithm/protocol_base.h" +#include "platform/consensus/ordering/autobahn/algorithm/proposal_manager.h" +#include "platform/consensus/ordering/autobahn/proto/proposal.pb.h" +#include "platform/statistic/stats.h" + +namespace resdb { +namespace autobahn { + +class AutoBahn: public common::ProtocolBase { + public: + AutoBahn(int id, int f, int total_num, int block_size, SignatureVerifier* verifier); + ~AutoBahn(); + + bool ReceiveTransaction(std::unique_ptr<Transaction> txn); + void ReceiveBlock(std::unique_ptr<Block> block); + void ReceiveBlockACK(std::unique_ptr<BlockACK> block); + bool ReceiveVote(std::unique_ptr<Proposal>); + bool ReceiveProposal(std::unique_ptr<Proposal> proposal); + bool ReceiveCommit(std::unique_ptr<Proposal> proposal); + bool ReceivePrepare(std::unique_ptr<Proposal> proposal); + + private: + bool IsStop(); + void BroadcastTxn(); + void GenerateBlocks(); + void AsyncDissemination(); + void AsyncConsensus(); + void AsyncPrepare(); + void AsyncCommit(); + + bool WaitForResponse(int64_t block_id); + void BlockDone(); + void PrepareDone(std::unique_ptr<Proposal> vote); + void CommitDone(std::unique_ptr<Proposal> proposal); + + void NotifyView(); + bool WaitForNextView(int view); + + void Prepare(std::unique_ptr<Proposal> vote); + void Commit(std::unique_ptr<Proposal> proposal); + + bool IsFastCommit(const Proposal& proposal); + + private: + std::condition_variable bc_block_cv_, view_cv_; + LockFreeQueue<Transaction> txns_; + LockFreeQueue<Proposal> prepare_queue_, commit_queue_; + std::unique_ptr<ProposalManager> proposal_manager_; + SignatureVerifier* verifier_; + int execute_id_; + + int id_, total_num_, f_, batch_size_; + std::atomic<int> is_stop_; + int timeout_ms_; + + std::thread block_thread_, dissemi_thread_, consensus_thread_, prepare_thread_, commit_thread_; + + std::mutex block_mutex_, bc_mutex_, view_mutex_, vote_mutex_, commit_mutex_; + std::map<int, std::map<int, SignInfo>> block_ack_; + std::map<int, std::map<int, std::unique_ptr<Proposal>>> vote_ack_ ; + std::map<int, std::set<int>> commit_ack_; + Stats* global_stats_; +}; + +} // namespace autobahn +} // namespace resdb diff --git a/platform/consensus/ordering/cassandra/algorithm/proposal_graph.cpp b/platform/consensus/ordering/autobahn/algorithm/proposal_graph.cpp similarity index 91% copy from platform/consensus/ordering/cassandra/algorithm/proposal_graph.cpp copy to platform/consensus/ordering/autobahn/algorithm/proposal_graph.cpp index e5b91135..fce7718b 100644 --- a/platform/consensus/ordering/cassandra/algorithm/proposal_graph.cpp +++ b/platform/consensus/ordering/autobahn/algorithm/proposal_graph.cpp @@ -1,4 +1,4 @@ -#include "platform/consensus/ordering/cassandra/algorithm/proposal_graph.h" +#include "platform/consensus/ordering/autobahn/algorithm/proposal_graph.h" #include <glog/logging.h> @@ -8,22 +8,13 @@ #include "common/utils/utils.h" namespace resdb { -namespace cassandra { -namespace cassandra_recv { +namespace autobahn { -/* -std::vector<ProposalState> GetStates() { - return std::vector<ProposalState>{ProposalState::New, ProposalState::Prepared, - ProposalState::PreCommit}; -} -*/ ProposalGraph::ProposalGraph(int fault_num, int id, int total_num) : f_(fault_num),id_(id), total_num_(total_num) { - ranking_ = std::make_unique<Ranking>(); - current_height_ = 0; - global_stats_ = Stats::GetGlobalStats(); } +/* int ProposalGraph::GetBlockNum(const std::string& hash, int local_id, int proposer_id) { if(num_callback_) { return num_callback_(hash, local_id, proposer_id); @@ -72,15 +63,6 @@ int ProposalGraph::AddProposal(const Proposal& proposal) { << " current height:" << current_height_<<" from:"<<proposal.header().proposer_id()<<" proposal id:"<<proposal.header().proposal_id(); assert(current_height_ >= latest_commit_.header().height()); - - /* - if (proposal.header().height() < current_height_) { - LOG(ERROR) << "height not match:" << current_height_ - << " proposal height:" << proposal.header().height(); - return false; - } - */ - if (proposal.header().height() > current_height_) { pending_header_[proposal.header().height()].insert( proposal.header().proposer_id()); @@ -124,13 +106,6 @@ int ProposalGraph::AddProposal(const Proposal& proposal) { } //LOG(ERROR)<<"history size:"<<proposal.history_size(); - /* - for (const auto& history : proposal.history()) { - std::string hash = history.hash(); - auto node_it = node_info_.find(hash); - assert(node_it != node_info_.end()); - } - */ //LOG(ERROR)<<" proposal history:"<<proposal.history_size(); if(proposal.history_size()>0){ @@ -139,7 +114,7 @@ int ProposalGraph::AddProposal(const Proposal& proposal) { auto node_it = node_info_.find(hash); node_it->second->votes[ProposalState::New].insert(proposal.header().proposer_id()); CheckState(node_it->second.get(), - static_cast<resdb::cassandra::ProposalState>(history.state())); + static_cast<resdb::autobahn::ProposalState>(history.state())); if (node_it->second->state == ProposalState::PoR && node_it->second->proposal.header().proposer_id() == id_){ @@ -289,11 +264,6 @@ void ProposalGraph::Commit(const std::string& hash) { } //LOG(ERROR)<<" bfs sub block size :"<<p->sub_block_size(); - /* - for(auto block : p->sub_block()){ - LOG(ERROR)<<" get sub block proposer:"<<p->header().proposer_id()<<" local id:"<<block.local_id(); - } - */ it->second->state = ProposalState::Committed; if (is_main_hash.find(c_hash) != is_main_hash.end()) { commit_num_[p->header().proposer_id()]++; @@ -322,14 +292,6 @@ void ProposalGraph::Commit(const std::string& hash) { int p_num = 0; for (int i = commit_p.size() - 1; i >= 0; i--) { for (int j = 0; j < commit_p[i].size(); ++j) { - /* - if (j == 0) { - LOG(ERROR) << "commmit proposal lead from:" - << commit_p[i][j]->header().proposer_id() - << " height:" << commit_p[i][j]->header().height() - << " size:" << commit_p[i].size(); - } - */ //LOG(ERROR) << "commmit proposal:" // << commit_p[i][j]->header().proposer_id() // << " height:" << commit_p[i][j]->header().height() @@ -508,21 +470,6 @@ Proposal* ProposalGraph::GetStrongestProposal() { } } - /* - if(node_info->proposal.header().proposer_id() == sp->proposal.header().proposer_id()) { - continue; - } - - if(sp == node_info) { - continue; - } - */ - - /* - if(node_info_.find(last_hash) != node_info_.end()){ - node_info_.erase(node_info_.find(last_hash)); - } - */ } @@ -589,7 +536,7 @@ bool ProposalGraph::Compare(const NodeInfo& p1, const NodeInfo& p2) { int h = (p1.proposal.header().height())%total_num_; if ( h == 0) h = total_num_; - //LOG(ERROR)<<" check height :"<<h<<" cmp:"<<abs(p1.proposal.header().proposer_id() - h )<<" "<<abs(p2.proposal.header().proposer_id() - h); + LOG(ERROR)<<" check height :"<<h<<" cmp:"<<abs(p1.proposal.header().proposer_id() - h )<<" "<<abs(p2.proposal.header().proposer_id() - h)<<" from:"<<p1.proposal.header().proposer_id(); //if (p1.proposal.header().height() <= 120 && 220 <= proposal.header().height()) { return abs(p1.proposal.header().proposer_id() - h ) > abs(p2.proposal.header().proposer_id() - h); //} @@ -645,11 +592,6 @@ int ProposalGraph::GetCurrentHeight() { return current_height_; } std::vector<Proposal*> ProposalGraph::GetNewProposals(int height) { std::vector<Proposal*> ps; for (auto it : new_proposals_) { - /* - if (it.second->header().height() >= height) { - continue; - } - */ ps.push_back(it.second); } for (Proposal* p : ps) { @@ -667,7 +609,7 @@ std::vector<Block> ProposalGraph::GetNewBlocks() { return ps; } +*/ -} // namespace cassandra_recv -} // namespace cassandra +} // namespace autobahn } // namespace resdb diff --git a/platform/consensus/ordering/autobahn/algorithm/proposal_graph.h b/platform/consensus/ordering/autobahn/algorithm/proposal_graph.h new file mode 100644 index 00000000..7fff4a0b --- /dev/null +++ b/platform/consensus/ordering/autobahn/algorithm/proposal_graph.h @@ -0,0 +1,101 @@ +#pragma once + +#include <map> + +#include "platform/consensus/ordering/autobahn/algorithm/proposal_state.h" +//#include "platform/consensus/ordering/autobahn/algorithm/ranking.h" +#include "platform/consensus/ordering/autobahn/proto/proposal.pb.h" +#include "platform/statistic/stats.h" + +namespace resdb { +namespace autobahn { + +class ProposalGraph { + public: + ProposalGraph(int fault_num, int id,int total_num); + /* + inline void SetCommitCallBack(std::function<void(const Proposal&)> func) { + commit_callback_ = func; + } + + inline void SetBlockNumCallBack( + std::function<int(const std::string& hash, int id, int sender)> func) { + num_callback_ = func; + } + + int AddProposal(const Proposal& proposal); + void AddProposalOnly(const Proposal& proposal); + + Proposal* GetLatestStrongestProposal(); + const Proposal* GetProposalInfo(const std::string& hash) const; + + int GetCurrentHeight(); + + void Clear(const std::string& hash); + void IncreaseHeight(); + ProposalState GetProposalState(const std::string& hash) const; + + std::vector<std::unique_ptr<Proposal>> GetNotFound(int height, + const std::string& hash); + + std::vector<Proposal*> GetNewProposals(int height); + std::vector<Block> GetNewBlocks(); + */ + + private: + struct NodeInfo { + Proposal proposal; + ProposalState state; + int score; + int is_main; + // std::set<int> received_num[5]; + std::map<int, std::set<int>> votes; + + NodeInfo(const Proposal& proposal) + : proposal(proposal), state(ProposalState::New), score(0), is_main(0) {} + }; + + /* + bool VerifyParent(const Proposal& proposal); + + bool Compare(const NodeInfo& p1, const NodeInfo& p2); + bool Cmp(int id1, int id2); + int StateScore(const ProposalState& state); + int CompareState(const ProposalState& state1, const ProposalState& state2); + + Proposal* GetStrongestProposal(); + + void UpdateHistory(Proposal* proposal); + int CheckState(NodeInfo* node_info, ProposalState state); + void UpgradeState(ProposalState& state); + void TryUpgradeHeight(int height); + + void Commit(const std::string& hash); + int GetBlockNum(const std::string& hash, int local_id, int proposer_id); + */ + + private: + Proposal latest_commit_; + std::map<std::string, std::vector<std::string>> g_; + std::map<std::string, std::unique_ptr<NodeInfo>> node_info_; + std::map<std::string, std::vector<VoteMessage>> not_found_; + //std::unique_ptr<Ranking> ranking_; + std::map<int, int> commit_num_; + std::map<int, std::set<std::string>> last_node_; + int current_height_; + uint32_t f_; + std::function<void(const Proposal&)> commit_callback_; + std::function<int(const std::string&id, int, int&)> num_callback_; + std::map<int, std::set<int>> pending_header_; + std::map<int, std::map<std::string, std::vector<std::unique_ptr<Proposal>>>> + not_found_proposal_; + std::map<std::string, Proposal*> new_proposals_; + Stats* global_stats_; + int id_; + std::map<std::string, Block> new_blocks_; + int total_num_; + std::set<std::pair<int,int> > check_; +}; + +} // namespace autobahn +} // namespace resdb diff --git a/platform/consensus/ordering/autobahn/algorithm/proposal_manager.cpp b/platform/consensus/ordering/autobahn/algorithm/proposal_manager.cpp new file mode 100644 index 00000000..b34dcb79 --- /dev/null +++ b/platform/consensus/ordering/autobahn/algorithm/proposal_manager.cpp @@ -0,0 +1,257 @@ +#include "platform/consensus/ordering/autobahn/algorithm/proposal_manager.h" + +#include <glog/logging.h> + +#include "common/crypto/signature_verifier.h" +#include "common/utils/utils.h" + +namespace resdb { +namespace autobahn { + +namespace { +std::string Encode(const std::string& hash) { + std::string ret; + for (int i = 0; i < hash.size(); ++i) { + int x = hash[i]; + ret += std::to_string(x); + } + return ret; +} + +} + +ProposalManager::ProposalManager(int32_t id, int total_num, int f, SignatureVerifier* verifier) + : id_(id), total_num_(total_num), f_(f), verifier_(verifier) { + current_height_ = 0; + local_block_id_ = 1; + current_slot_ = 1; +} + +void ProposalManager::MakeBlock( + std::vector<std::unique_ptr<Transaction>>& txns) { + auto block = std::make_unique<Block>(); + Block::BlockData* data = block->mutable_data(); + for (const auto& txn : txns) { + *data->add_transaction() = *txn; + } + + std::string data_str; + data->SerializeToString(&data_str); + std::string hash = SignatureVerifier::CalculateHash(data_str); + block->set_hash(hash); + block->set_sender_id(id_); + block->set_create_time(GetCurrentTime()); + block->set_local_id(local_block_id_++); + AddLocalBlock(std::move(block)); + // LOG(ERROR)<<"make block time:"<<block->create_time(); +} + +void ProposalManager::AddBlock(std::unique_ptr<Block> block) { + std::unique_lock<std::mutex> lk(mutex_); + int sender = block->sender_id(); + int block_id = block->local_id(); + + //LOG(ERROR)<<"add block from sender:"<<sender<<" id:"<<block_id; + + if(block_id>1) { + assert(block->last_sign_info_size() >= f_+1); + assert(VerifyBlock(*block)); + assert(pending_blocks_[sender].find(block_id-1) != pending_blocks_[sender].end()); + *pending_blocks_[sender][block_id-1]->mutable_sign_info() = block->last_sign_info(); + } + pending_blocks_[sender][block_id] = std::move(block); +} + +Block* ProposalManager::GetBlock(int sender, int64_t block_id) { + std::unique_lock<std::mutex> lk(mutex_); + //LOG(ERROR)<<" get block from sender:"<<sender<<" block id:"<<block_id; + auto it = pending_blocks_[sender].find(block_id); + assert(it != pending_blocks_[sender].end()); + return it->second.get(); +} + +void ProposalManager::AddLocalBlock(std::unique_ptr<Block> block) { + std::unique_lock<std::mutex> lk(mutex_); + //LOG(ERROR)<<"add local block :"<<block->local_id(); + blocks_candidates_[block->local_id()] = std::move(block); +} + +const Block* ProposalManager::GetLocalBlock(int64_t block_id) { + std::unique_lock<std::mutex> lk(mutex_); + if(blocks_candidates_.find(block_id) == blocks_candidates_.end()) { + return nullptr; + } + //LOG(ERROR)<<"get local block :"<<block_id; + while(!blocks_candidates_.empty() && blocks_candidates_.begin()->first < block_id) { + blocks_candidates_.erase(blocks_candidates_.begin()); + } + Block * block = blocks_candidates_.begin()->second.get(); + UpdateLastSign(block); + return block; +} + +void ProposalManager::BlockReady(const std::map<int, SignInfo>& sign_info, int64_t local_id) { + std::unique_lock<std::mutex> lk(mutex_); + //LOG(ERROR)<<"ready block:"<<local_id; + auto it = blocks_candidates_.find(local_id); + if(it == blocks_candidates_.end()){ + return; + } + assert(it != blocks_candidates_.end()); + Block * block = it->second.get(); + for(auto sit : sign_info) { + assert(sit.second.hash() == block->hash()); + *block->add_sign_info() = sit.second; + LOG(ERROR)<<" add last sign:"<<sit.second.sender_id(); + } + + //LOG(ERROR)<<" update block sender:"<<id_<<" local id:"<<local_id; + assert(it->second != nullptr); + pending_blocks_[id_][local_id] = std::move(it->second); + blocks_candidates_.erase(it); + current_height_ = std::max(current_height_, local_id); +} + +int64_t ProposalManager::GetCurrentBlockId() { + std::unique_lock<std::mutex> lk(mutex_); + return current_height_; +} + +void ProposalManager:: UpdateLastSign(Block * block) { + int block_id = block->local_id(); + //LOG(ERROR)<<" update block sign:"<<block_id; + if(block_id>1) { + auto it = pending_blocks_[id_].find(block_id-1); + assert(it != pending_blocks_[id_].end()); + *block->mutable_last_sign_info() = it->second->sign_info(); + } +} + +bool ProposalManager::VerifyBlock(const Block& block) { + + if(block.last_sign_info_size() < f_+1) { + LOG(ERROR)<<" sign info size fail"; + return false; + } + + std::set<int> senders; + for(const auto& sign_info : block.last_sign_info()){ + if(sign_info.hash() != block.last_sign_info(0).hash()){ + LOG(ERROR)<<" sign info hash fail"; + return false; + } + if(sign_info.local_id() != block.last_sign_info(0).local_id()){ + LOG(ERROR)<<" sign info local id fail"; + return false; + } + //LOG(ERROR)<<" check sign :"<<sign_info.sender_id(); + senders.insert(sign_info.sender_id()); + + bool valid = verifier_->VerifyMessage(sign_info.hash(), + sign_info.sign()); + if (!valid) { + LOG(ERROR)<<" sign info sign fail"; + return false; + } + } + //LOG(ERROR)<<" sign info sender size"<< senders.size(); + return senders.size() >= f_+1; +} + +SignInfo ProposalManager::SignBlock(const Block& block) { + SignInfo sign_info; + sign_info.set_hash(block.hash()); + sign_info.set_sender_id(id_); + sign_info.set_local_id(block.local_id()); + + auto hash_signature_or = verifier_->SignMessage(block.hash()); + if (!hash_signature_or.ok()) { + LOG(ERROR) << "Sign message fail"; + return SignInfo(); + } + *sign_info.mutable_sign()=*hash_signature_or; + return sign_info; +} + +void ProposalManager::UpdateView(int sender, int64_t block_id) { + std::unique_lock<std::mutex> lk(slot_mutex_); + //LOG(ERROR)<<"update slot sender:"<<sender<<" slot:"<<current_slot_; + if(slot_state_[sender].first != current_slot_) { + new_blocks_[current_slot_]++; + } + slot_state_[sender] = std::make_pair(current_slot_, block_id); +} + +bool ProposalManager::ReadyView(int slot){ + std::unique_lock<std::mutex> lk(slot_mutex_); + //LOG(ERROR)<<"ready slot sender:"<<current_slot_<<" new blocks:"<<new_blocks_[current_slot_]; + return new_blocks_[current_slot_]>=2*f_+1; +} + +int ProposalManager::GetCurrentView() { + std::unique_lock<std::mutex> lk(slot_mutex_); + return current_slot_; +} + +void ProposalManager::IncreaseView() { + std::unique_lock<std::mutex> lk(slot_mutex_); + //LOG(ERROR)<<"increase slot:"<<current_slot_; + current_slot_++; +} + +std::pair<int, std::map<int, int64_t>> ProposalManager::GetCut() { + std::map<int, int64_t> blocks; + std::unique_lock<std::mutex> lk(slot_mutex_); + for(auto it : slot_state_) { + if(it.second.first == current_slot_) { + blocks[it.first]=it.second.second; + //LOG(ERROR)<<"get cut sender:"<<it.first<<" block:"<<it.second.second; + } + } + current_slot_++; + return std::make_pair(current_slot_-1, blocks); +} + +std::unique_ptr<Proposal> ProposalManager::GenerateProposal(int slot, const std::map<int, int64_t>& blocks) { + auto proposal = std::make_unique<Proposal>(); + std::string data_hash; + { + for (auto& it: blocks) { + Block* block = proposal->add_block(); + Block* data_block = GetBlock(it.first, it.second); + data_hash += data_block->hash(); + //LOG(ERROR)<<" gene proposal block from:"<<data_block->sender_id()<<" block id:"<<data_block->local_id(); + *block->mutable_sign_info() = data_block->sign_info(); + block->set_local_id(data_block->local_id()); + block->set_sender_id(data_block->sender_id()); + } + } + proposal->set_slot_id(slot); + proposal->set_sender_id(id_); + proposal->set_hash(data_hash); + return proposal; +} + +std::unique_ptr<Proposal> ProposalManager::GetProposalData(int slot) { + std::unique_lock<std::mutex> lk(p_mutex_); + //LOG(ERROR)<<" get proposal:"<<slot; + return std::move(pending_proposals_[slot]); +} + +void ProposalManager::AddProposalData(std::unique_ptr<Proposal> p) { + std::unique_lock<std::mutex> lk(p_mutex_); + int slot_id = p->slot_id(); + //LOG(ERROR)<<" add proposal:"<<slot_id; + /* + for(const auto& block : p->block()) { + int block_owner = block.sender_id(); + int block_id = block.local_id(); + LOG(ERROR)<<" add proposal block:"<<block_owner<<" block id :"<<block_id<<" slot:"<<slot_id; + } + */ + + pending_proposals_[slot_id] = std::move(p); +} + +} // namespace autobahn +} // namespace resdb diff --git a/platform/consensus/ordering/autobahn/algorithm/proposal_manager.h b/platform/consensus/ordering/autobahn/algorithm/proposal_manager.h new file mode 100644 index 00000000..638d6cff --- /dev/null +++ b/platform/consensus/ordering/autobahn/algorithm/proposal_manager.h @@ -0,0 +1,71 @@ +#pragma once + +#include <condition_variable> +#include <list> + +#include "platform/consensus/ordering/autobahn/algorithm/proposal_graph.h" +#include "platform/consensus/ordering/autobahn/proto/proposal.pb.h" +#include "platform/statistic/stats.h" +#include "common/crypto/signature_verifier.h" + +namespace resdb { +namespace autobahn { + +class ProposalManager { + public: + ProposalManager(int32_t id, int total_num, int f, SignatureVerifier* verifier); + + void MakeBlock( + std::vector<std::unique_ptr<Transaction>>& txn); + void AddBlock(std::unique_ptr<Block> block); + void AddLocalBlock(std::unique_ptr<Block> block); + const Block* GetLocalBlock(int64_t block_id); + Block* GetBlock(int sender, int64_t block_id); + int64_t GetCurrentBlockId(); + + void BlockReady(const std::map<int, SignInfo>& sign_info, int64_t local_id); + + SignInfo SignBlock(const Block& block); + bool VerifyBlock(const Block& block); + + bool ReadyView(int slot); + int GetCurrentView(); + void IncreaseView(); + void UpdateView(int sender, int64_t block_id); + + std::pair<int, std::map<int, int64_t>> GetCut(); + std::unique_ptr<Proposal> GenerateProposal(int slot, const std::map<int, int64_t>& blocks); + + std::unique_ptr<Proposal> GetProposalData(int slot); + void AddProposalData(std::unique_ptr<Proposal> p); + + + private: + void UpdateLastSign(Block * block); + + private: + int32_t id_; + int64_t local_block_id_ = 1; + + std::map<int64_t, std::unique_ptr<Block>> pending_blocks_[512]; + std::mutex mutex_, slot_mutex_, p_mutex_; + std::map<int, std::unique_ptr<Block>> blocks_candidates_; + + std::map<int, std::pair<int, int64_t>> slot_state_; + std::map<int,int> new_blocks_; + + //std::mutex t_mutex_; + //std::map<std::string, std::unique_ptr<Proposal>> local_proposal_; + //Stats* global_stats_; + int total_num_; + int f_; + int64_t current_height_; + int current_slot_; + + SignatureVerifier* verifier_; + + std::map<int, std::unique_ptr<Proposal> > pending_proposals_; +}; + +} // namespace autobahn +} // namespace resdb diff --git a/platform/consensus/ordering/cassandra/algorithm/proposal_state.h b/platform/consensus/ordering/autobahn/algorithm/proposal_state.h similarity index 88% copy from platform/consensus/ordering/cassandra/algorithm/proposal_state.h copy to platform/consensus/ordering/autobahn/algorithm/proposal_state.h index 876b1be0..5ff98e16 100644 --- a/platform/consensus/ordering/cassandra/algorithm/proposal_state.h +++ b/platform/consensus/ordering/autobahn/algorithm/proposal_state.h @@ -2,12 +2,12 @@ //#define GOPOA //#define GOTPOA -#define GOTPOR +//#define GOTPOR //#define NOPOA namespace resdb { -namespace cassandra { +namespace autobahn { /* enum ProposalState { diff --git a/platform/consensus/ordering/autobahn/algorithm/ranking.cpp b/platform/consensus/ordering/autobahn/algorithm/ranking.cpp new file mode 100644 index 00000000..d8454e43 --- /dev/null +++ b/platform/consensus/ordering/autobahn/algorithm/ranking.cpp @@ -0,0 +1,12 @@ + +#include "platform/consensus/ordering/cassandra/algorithm/ranking.h" + +namespace resdb { +namespace cassandra { +namespace cassandra_recv { + +int Ranking::GetRank(int proposer_id) { return proposer_id; } + +} // namespace cassandra_recv +} // namespace cassandra +} // namespace resdb diff --git a/platform/consensus/ordering/autobahn/algorithm/ranking.h b/platform/consensus/ordering/autobahn/algorithm/ranking.h new file mode 100644 index 00000000..d3d2dfb6 --- /dev/null +++ b/platform/consensus/ordering/autobahn/algorithm/ranking.h @@ -0,0 +1,14 @@ +#pragma once + +namespace resdb { +namespace cassandra { +namespace cassandra_recv { + +class Ranking { + public: + int GetRank(int proposer_id); +}; + +} // namespace cassandra_recv +} // namespace cassandra +} // namespace resdb diff --git a/platform/consensus/ordering/autobahn/framework/BUILD b/platform/consensus/ordering/autobahn/framework/BUILD new file mode 100644 index 00000000..8bf19829 --- /dev/null +++ b/platform/consensus/ordering/autobahn/framework/BUILD @@ -0,0 +1,16 @@ +package(default_visibility = ["//visibility:private"]) + +cc_library( + name = "consensus", + srcs = ["consensus.cpp"], + hdrs = ["consensus.h"], + visibility = [ + "//visibility:public", + ], + deps = [ + "//common/utils", + "//platform/consensus/ordering/common/framework:consensus", + "//platform/consensus/ordering/autobahn/algorithm:autobahn", + ], +) + diff --git a/platform/consensus/ordering/autobahn/framework/consensus.cpp b/platform/consensus/ordering/autobahn/framework/consensus.cpp new file mode 100644 index 00000000..0097a5c4 --- /dev/null +++ b/platform/consensus/ordering/autobahn/framework/consensus.cpp @@ -0,0 +1,207 @@ +/* + * Copyright (c) 2019-2022 ExpoLab, UC Davis + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + */ + +#include "platform/consensus/ordering/autobahn/framework/consensus.h" + +#include <glog/logging.h> +#include <unistd.h> + +#include "common/utils/utils.h" + +namespace resdb { +namespace autobahn { + +Consensus::Consensus(const ResDBConfig& config, + std::unique_ptr<TransactionManager> executor) + : common::Consensus(config, std::move(executor)){ + int total_replicas = config_.GetReplicaNum(); + int f = (total_replicas - 1) / 3; + + Init(); + + start_ = 0; + + if (config_.GetPublicKeyCertificateInfo() + .public_key() + .public_key_info() + .type() != CertificateKeyInfo::CLIENT) { + autobahn_ = std::make_unique<AutoBahn>( + config_.GetSelfInfo().id(), f, + total_replicas, config_.GetConfigData().block_size(), + GetSignatureVerifier()); + + InitProtocol(autobahn_.get()); + + } +} + +int Consensus::ProcessCustomConsensus(std::unique_ptr<Request> request) { + //LOG(ERROR)<<"receive commit:"<<request->type()<<" "<<MessageType_Name(request->user_type())<<" from:"<<request->sender_id(); + if (request->user_type() == MessageType::NewBlocks) { + std::unique_ptr<Block> block = std::make_unique<Block>(); + if (!block->ParseFromString(request->data())) { + assert(1 == 0); + LOG(ERROR) << "parse proposal fail"; + return -1; + } + autobahn_->ReceiveBlock(std::move(block)); + return 0; + } + else if (request->user_type() == MessageType::CMD_BlockACK) { + std::unique_ptr<BlockACK> block_ack = std::make_unique<BlockACK>(); + if (!block_ack->ParseFromString(request->data())) { + LOG(ERROR) << "parse proposal fail"; + assert(1 == 0); + return -1; + } + autobahn_->ReceiveBlockACK(std::move(block_ack)); + return 0; + + } else if (request->user_type() == MessageType::NewProposal) { + // LOG(ERROR)<<"receive proposal:"; + std::unique_ptr<Proposal> proposal = std::make_unique<Proposal>(); + if (!proposal->ParseFromString(request->data())) { + LOG(ERROR) << "parse proposal fail"; + assert(1 == 0); + return -1; + } + if (!autobahn_->ReceiveProposal(std::move(proposal))) { + return -1; + } + return 0; + } else if (request->user_type() == MessageType::ProposalAck) { + // LOG(ERROR)<<"receive proposal:"; + std::unique_ptr<Proposal> proposal = std::make_unique<Proposal>(); + if (!proposal->ParseFromString(request->data())) { + LOG(ERROR) << "parse proposal fail"; + assert(1 == 0); + return -1; + } + if (!autobahn_->ReceiveVote(std::move(proposal))) { + return -1; + } + return 0; + } else if (request->user_type() == MessageType::Prepare) { + std::unique_ptr<Proposal> proposal = std::make_unique<Proposal>(); + if (!proposal->ParseFromString(request->data())) { + LOG(ERROR) << "parse proposal fail"; + assert(1 == 0); + return -1; + } + if (!autobahn_->ReceivePrepare(std::move(proposal))) { + return -1; + } + return 0; + + } else if (request->user_type() == MessageType::Commit) { + std::unique_ptr<Proposal> proposal = std::make_unique<Proposal>(); + if (!proposal->ParseFromString(request->data())) { + LOG(ERROR) << "parse proposal fail"; + assert(1 == 0); + return -1; + } + if (!autobahn_->ReceiveCommit(std::move(proposal))) { + return -1; + } + return 0; + + /* + } else if (request->user_type() == MessageType::CMD_BlockQuery) { + std::unique_ptr<BlockQuery> block = std::make_unique<BlockQuery>(); + if (!block->ParseFromString(request->data())) { + assert(1 == 0); + LOG(ERROR) << "parse proposal fail"; + return -1; + } + autobahn_->SendBlock(*block); + return 0; + } else if (request->user_type() == MessageType::CMD_ProposalQuery) { + std::unique_ptr<ProposalQuery> query = + std::make_unique<ProposalQuery>(); + if (!query->ParseFromString(request->data())) { + assert(1 == 0); + LOG(ERROR) << "parse proposal fail"; + return -1; + } + autobahn_->SendProposal(*query); + } else if (request->user_type() == + MessageType::CMD_ProposalQueryResponse) { + std::unique_ptr<ProposalQueryResp> resp = + std::make_unique<ProposalQueryResp>(); + if (!resp->ParseFromString(request->data())) { + assert(1 == 0); + LOG(ERROR) << "parse proposal fail"; + return -1; + } + autobahn_->ReceiveProposalQueryResp(*resp); + */ + } + return 0; +} + +int Consensus::ProcessNewTransaction(std::unique_ptr<Request> request) { + std::unique_ptr<Transaction> txn = std::make_unique<Transaction>(); + txn->set_data(request->data()); + txn->set_hash(request->hash()); + txn->set_proxy_id(request->proxy_id()); + //LOG(ERROR)<<"receive txn"; + return autobahn_->ReceiveTransaction(std::move(txn)); +} + +int Consensus::CommitMsg(const google::protobuf::Message& msg) { + return CommitMsgInternal(dynamic_cast<const Transaction&>(msg)); +} + +int Consensus::CommitMsgInternal(const Transaction& txn) { + //LOG(ERROR)<<"commit txn:"<<txn.id()<<" proxy id:"<<txn.proxy_id()<<" uid:"<<txn.uid(); + std::unique_ptr<Request> request = std::make_unique<Request>(); + request->set_queuing_time(txn.queuing_time()); + request->set_data(txn.data()); + request->set_seq(txn.id()); + request->set_uid(txn.uid()); + //if (txn.proposer_id() == config_.GetSelfInfo().id()) { + request->set_proxy_id(txn.proxy_id()); + // LOG(ERROR)<<"commit txn:"<<txn.id()<<" proxy id:"<<request->uid(); + //assert(request->uid()>0); + //} + + transaction_executor_->AddExecuteMessage(std::move(request)); + return 0; +} + + +int Consensus::Prepare(const Transaction& txn) { + // LOG(ERROR)<<"prepare txn:"<<txn.id()<<" proxy id:"<<txn.proxy_id()<<" + // uid:"<<txn.uid(); + std::unique_ptr<Request> request = std::make_unique<Request>(); + request->set_data(txn.data()); + request->set_uid(txn.uid()); + transaction_executor_->Prepare(std::move(request)); + return 0; +} + + +} // namespace autobahn +} // namespace resdb diff --git a/platform/consensus/ordering/autobahn/framework/consensus.h b/platform/consensus/ordering/autobahn/framework/consensus.h new file mode 100644 index 00000000..a13e3eff --- /dev/null +++ b/platform/consensus/ordering/autobahn/framework/consensus.h @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2019-2022 ExpoLab, UC Davis + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + */ + +#pragma once + +#include "executor/common/transaction_manager.h" +#include "platform/consensus/ordering/common/framework/consensus.h" +#include "platform/consensus/ordering/autobahn/algorithm/autobahn.h" +#include "platform/networkstrate/consensus_manager.h" + +namespace resdb { +namespace autobahn { + +class Consensus : public common::Consensus { + public: + Consensus(const ResDBConfig& config, + std::unique_ptr<TransactionManager> transaction_manager); + virtual ~Consensus() = default; + + private: + int ProcessCustomConsensus(std::unique_ptr<Request> request) override; + int ProcessNewTransaction(std::unique_ptr<Request> request) override; + int CommitMsg(const google::protobuf::Message& msg) override; + int CommitMsgInternal(const Transaction& txn); + + int Prepare(const Transaction& txn); + + protected: + std::unique_ptr<AutoBahn> autobahn_; + Stats* global_stats_; + int64_t start_; + std::mutex mutex_; + int send_num_[200]; +}; + +} // namespace autobahn +} // namespace resdb diff --git a/platform/consensus/ordering/autobahn/framework/consensus_test.cpp b/platform/consensus/ordering/autobahn/framework/consensus_test.cpp new file mode 100644 index 00000000..2c8834a8 --- /dev/null +++ b/platform/consensus/ordering/autobahn/framework/consensus_test.cpp @@ -0,0 +1,179 @@ +/* + * Copyright (c) 2019-2022 ExpoLab, UC Davis + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + */ + +#include "platform/consensus/ordering/cassandra/framework/consensus.h" + +#include <glog/logging.h> +#include <gmock/gmock.h> +#include <gtest/gtest.h> + +#include <future> + +#include "common/test/test_macros.h" +#include "executor/common/mock_transaction_manager.h" +#include "platform/config/resdb_config_utils.h" +#include "platform/networkstrate/mock_replica_communicator.h" + +namespace resdb { +namespace cassandra { +namespace { + +using ::resdb::testing::EqualsProto; +using ::testing::_; +using ::testing::Invoke; +using ::testing::Test; + +ResDBConfig GetConfig() { + ResDBConfig config({GenerateReplicaInfo(1, "127.0.0.1", 1234), + GenerateReplicaInfo(2, "127.0.0.1", 1235), + GenerateReplicaInfo(3, "127.0.0.1", 1236), + GenerateReplicaInfo(4, "127.0.0.1", 1237)}, + GenerateReplicaInfo(1, "127.0.0.1", 1234)); + return config; +} + +class ConsensusTest : public Test { + public: + ConsensusTest() : config_(GetConfig()) { + auto transaction_manager = + std::make_unique<MockTransactionExecutorDataImpl>(); + mock_transaction_manager_ = transaction_manager.get(); + consensus_ = + std::make_unique<Consensus>(config_, std::move(transaction_manager)); + consensus_->SetCommunicator(&replica_communicator_); + } + + void AddTransaction(const std::string& data) { + auto request = std::make_unique<Request>(); + request->set_type(Request::TYPE_NEW_TXNS); + + Transaction txn; + + BatchUserRequest batch_request; + auto req = batch_request.add_user_requests(); + req->mutable_request()->set_data(data); + + batch_request.set_local_id(1); + batch_request.SerializeToString(txn.mutable_data()); + + txn.SerializeToString(request->mutable_data()); + + EXPECT_EQ(consensus_->ConsensusCommit(nullptr, std::move(request)), 0); + } + + protected: + ResDBConfig config_; + MockTransactionExecutorDataImpl* mock_transaction_manager_; + MockReplicaCommunicator replica_communicator_; + std::unique_ptr<TransactionManager> transaction_manager_; + std::unique_ptr<Consensus> consensus_; +}; + +TEST_F(ConsensusTest, NormalCase) { + std::promise<bool> commit_done; + std::future<bool> commit_done_future = commit_done.get_future(); + + EXPECT_CALL(replica_communicator_, BroadCast) + .WillRepeatedly(Invoke([&](const google::protobuf::Message& msg) { + Request request = *dynamic_cast<const Request*>(&msg); + + if (request.user_type() == MessageType::NewProposal) { + LOG(ERROR) << "bc new proposal"; + consensus_->ConsensusCommit(nullptr, + std::make_unique<Request>(request)); + LOG(ERROR) << "recv proposal done"; + } + if (request.user_type() == MessageType::Vote) { + LOG(ERROR) << "bc vote"; + + VoteMessage ack_msg; + assert(ack_msg.ParseFromString(request.data())); + for (int i = 1; i <= 3; ++i) { + ack_msg.set_proposer_id(i); + auto new_req = std::make_unique<Request>(request); + ack_msg.SerializeToString(new_req->mutable_data()); + + consensus_->ConsensusCommit(nullptr, std::move(new_req)); + } + } + // LOG(ERROR)<<"bc type:"<<request->type()<<" user + // type:"<<request->user_type(); + if (request.user_type() == MessageType::Prepare) { + LOG(ERROR) << "bc prepare"; + + VoteMessage ack_msg; + assert(ack_msg.ParseFromString(request.data())); + for (int i = 1; i <= 3; ++i) { + ack_msg.set_proposer_id(i); + auto new_req = std::make_unique<Request>(request); + ack_msg.SerializeToString(new_req->mutable_data()); + + consensus_->ConsensusCommit(nullptr, std::move(new_req)); + } + } + if (request.user_type() == MessageType::Voteprep) { + LOG(ERROR) << "bc voterep:"; + + VoteMessage ack_msg; + assert(ack_msg.ParseFromString(request.data())); + for (int i = 1; i <= 3; ++i) { + ack_msg.set_proposer_id(i); + auto new_req = std::make_unique<Request>(request); + ack_msg.SerializeToString(new_req->mutable_data()); + LOG(ERROR) << "new request type:" << new_req->user_type(); + + consensus_->ConsensusCommit(nullptr, std::move(new_req)); + } + } + LOG(ERROR) << "done"; + return 0; + })); + + EXPECT_CALL(*mock_transaction_manager_, ExecuteData) + .WillOnce(Invoke([&](const std::string& msg) { + LOG(ERROR) << "execute txn:" << msg; + EXPECT_EQ(msg, "transaction1"); + return nullptr; + })); + + EXPECT_CALL(replica_communicator_, SendMessage(_, 0)) + .WillRepeatedly( + Invoke([&](const google::protobuf::Message& msg, int64_t) { + Request request = *dynamic_cast<const Request*>(&msg); + if (request.type() == Request::TYPE_RESPONSE) { + LOG(ERROR) << "get response"; + commit_done.set_value(true); + } + return; + })); + + AddTransaction("transaction1"); + + commit_done_future.get(); +} + +} // namespace +} // namespace cassandra +} // namespace resdb diff --git a/platform/consensus/ordering/autobahn/proto/BUILD b/platform/consensus/ordering/autobahn/proto/BUILD new file mode 100644 index 00000000..58e3ed37 --- /dev/null +++ b/platform/consensus/ordering/autobahn/proto/BUILD @@ -0,0 +1,16 @@ +package(default_visibility = ["//platform/consensus/ordering/autobahn:__subpackages__"]) + +load("@rules_cc//cc:defs.bzl", "cc_proto_library") +load("@rules_proto//proto:defs.bzl", "proto_library") +load("@rules_proto_grpc//python:defs.bzl", "python_proto_library") + +proto_library( + name = "proposal_proto", + srcs = ["proposal.proto"], + deps = ["//common/proto:signature_info_proto"], +) + +cc_proto_library( + name = "proposal_cc_proto", + deps = [":proposal_proto"], +) diff --git a/platform/consensus/ordering/autobahn/proto/proposal.proto b/platform/consensus/ordering/autobahn/proto/proposal.proto new file mode 100644 index 00000000..9afd7567 --- /dev/null +++ b/platform/consensus/ordering/autobahn/proto/proposal.proto @@ -0,0 +1,133 @@ + +syntax = "proto3"; +import "common/proto/signature_info.proto"; + +package resdb.autobahn; + +message Transaction{ + int32 id = 1; + bytes data = 2; + bytes hash = 3; + int32 proxy_id = 4; + int32 proposer_id = 5; + int64 uid = 6; + int64 create_time = 7; + int64 queuing_time = 8; +} + +message Header { + bytes hash = 1; + int32 height = 2; + int32 proposer_id = 3; + int32 proposal_id = 4; + bytes prehash = 5; +} + +message History { + bytes hash = 1; + int32 state = 2; + int32 sender = 3; +} + +message Cert { + repeated SignatureInfo sign = 1; +} + +message Proposal { + int32 slot_id = 1; + int32 sender_id = 2; + repeated Block block = 3; + bool fast_commit = 4; + SignatureInfo sign = 5; + bytes hash = 6; + Cert cert = 7; +}; + +enum MessageType { + NewProposal = 0; + Vote = 1; + Prepare = 2; + VoteAck = 3; + Commit = 4; + Recovery = 5; + NewBlocks = 6; + ProposalAck = 7; + CMD_BlockACK = 8; + CMD_BlockQuery = 9; + CMD_SingleBlock = 10; + CMD_ProposalQuery = 11; + CMD_ProposalQueryResponse = 12; +}; + +message VoteMessage { + bytes hash = 1; + int32 proposer_id = 2; + MessageType type = 3; + int32 sender_id = 4; + int32 proposal_id = 5; +}; + +message CommittedProposals{ + repeated Proposal proposals = 1; + int32 sender_id = 2; +}; + +message HashValue{ + repeated uint64 bits = 1; +}; + +message SignInfo { + bytes hash = 1; + int32 sender_id = 2; + int32 local_id = 3; + SignatureInfo sign = 4; +} + +message Block { + message BlockData { + repeated Transaction transaction = 1; + } + BlockData data = 1; + bytes hash = 2; + int32 sender_id = 3; + int32 local_id = 4; + int64 create_time = 5; + repeated SignInfo last_sign_info = 6; + repeated SignInfo sign_info = 7; +} + +message BlockACK { + bytes hash = 2; + int32 sender_id = 3; + int32 local_id = 4; + int32 responder = 5; + SignInfo sign_info = 6; +} + +message BlockQuery { + bytes hash = 2; + int32 proposer = 3; + int32 local_id = 4; + int32 sender = 5; +} + +message ProposalQuery { + bytes hash = 2; + int32 proposer = 3; + int32 id = 4; + int32 sender = 5; +} + +message ProposalQueryResp { + repeated Proposal proposal = 1; +} + +message ProposalResponse { + Header header = 1; + int32 leader = 2; +} + +message VoteMsg { + int32 slot_id = 1; + int32 sender_id = 2; +} diff --git a/platform/consensus/ordering/cassandra/algorithm/cassandra.cpp b/platform/consensus/ordering/cassandra/algorithm/cassandra.cpp index ddad256f..ba79e744 100644 --- a/platform/consensus/ordering/cassandra/algorithm/cassandra.cpp +++ b/platform/consensus/ordering/cassandra/algorithm/cassandra.cpp @@ -18,8 +18,8 @@ Cassandra::Cassandra(int id, int f, int total_num, int block_size, SignatureVeri total_num_ = total_num; f_ = f; is_stop_ = false; - timeout_ms_ = 100; - //timeout_ms_ = 60000; + //timeout_ms_ = 100; + timeout_ms_ = 60000; local_txn_id_ = 1; local_proposal_id_ = 1; batch_size_ = block_size; @@ -32,7 +32,7 @@ Cassandra::Cassandra(int id, int f, int total_num, int block_size, SignatureVeri execute_id_ = 1; graph_ = std::make_unique<ProposalGraph>(f_, id, total_num); - proposal_manager_ = std::make_unique<ProposalManager>(id, graph_.get()); + proposal_manager_ = std::make_unique<ProposalManager>(id, graph_.get(), total_num_); graph_->SetCommitCallBack( [&](const Proposal& proposal) { CommitProposal(proposal); }); @@ -617,13 +617,16 @@ bool Cassandra::AddProposal(const Proposal& proposal) { << " proposal height:" << proposal.header().height() << " num:" << received_num_[graph_->GetCurrentHeight()].size() << " from:" << proposal.header().proposer_id() - << " last vote:" << last_vote_; + << " last vote?:" << last_vote_ + << " total num:"<<total_num_ + <<" check:"<<(received_num_[graph_->GetCurrentHeight()].size() == total_num_); if (received_num_[graph_->GetCurrentHeight()].size() == total_num_) { + LOG(ERROR)<<" last vote:"<<last_vote_<<" CurrentHeight:"<<graph_->GetCurrentHeight(); if (last_vote_ < graph_->GetCurrentHeight()) { last_vote_ = graph_->GetCurrentHeight(); can_vote_[graph_->GetCurrentHeight()] = true; vote_cv_.notify_all(); - //LOG(ERROR) << "can vote:"; + LOG(ERROR) << "can vote:"; } } //LOG(ERROR)<<"recv done"; diff --git a/platform/consensus/ordering/cassandra/algorithm/proposal_graph.cpp b/platform/consensus/ordering/cassandra/algorithm/proposal_graph.cpp index e5b91135..fb7a85f0 100644 --- a/platform/consensus/ordering/cassandra/algorithm/proposal_graph.cpp +++ b/platform/consensus/ordering/cassandra/algorithm/proposal_graph.cpp @@ -589,7 +589,7 @@ bool ProposalGraph::Compare(const NodeInfo& p1, const NodeInfo& p2) { int h = (p1.proposal.header().height())%total_num_; if ( h == 0) h = total_num_; - //LOG(ERROR)<<" check height :"<<h<<" cmp:"<<abs(p1.proposal.header().proposer_id() - h )<<" "<<abs(p2.proposal.header().proposer_id() - h); + LOG(ERROR)<<" check height :"<<h<<" cmp:"<<abs(p1.proposal.header().proposer_id() - h )<<" "<<abs(p2.proposal.header().proposer_id() - h)<<" from:"<<p1.proposal.header().proposer_id(); //if (p1.proposal.header().height() <= 120 && 220 <= proposal.header().height()) { return abs(p1.proposal.header().proposer_id() - h ) > abs(p2.proposal.header().proposer_id() - h); //} diff --git a/platform/consensus/ordering/cassandra/algorithm/proposal_manager.cpp b/platform/consensus/ordering/cassandra/algorithm/proposal_manager.cpp index 41c03cd2..708c4204 100644 --- a/platform/consensus/ordering/cassandra/algorithm/proposal_manager.cpp +++ b/platform/consensus/ordering/cassandra/algorithm/proposal_manager.cpp @@ -21,8 +21,8 @@ std::string Encode(const std::string& hash) { } -ProposalManager::ProposalManager(int32_t id, ProposalGraph* graph) - : id_(id), graph_(graph) { +ProposalManager::ProposalManager(int32_t id, ProposalGraph* graph, int total_num) + : id_(id), graph_(graph), total_num_(total_num) { local_proposal_id_ = 1; global_stats_ = Stats::GetGlobalStats(); } @@ -190,12 +190,22 @@ std::unique_ptr<Proposal> ProposalManager::GenerateProposal(int round, return nullptr; // LOG(ERROR) << "generate wait proposal block size:" << blocks_.size(); } + int max_block = 1; + + int h = round % total_num_; + if ( h == 0) h = total_num_; + if (id_ != h ) { + max_block = 0; + } + + int num = 0; int64_t current_time = GetCurrentTime(); proposal->set_create_time(current_time); //LOG(ERROR)<<"block size:"<<blocks_.size(); for (auto& block : blocks_) { + if (max_block <= 0) break; data += block->hash(); Block* ab = proposal->add_block(); *ab = *block; diff --git a/platform/consensus/ordering/cassandra/algorithm/proposal_manager.h b/platform/consensus/ordering/cassandra/algorithm/proposal_manager.h index 0ba35bb0..75b13fc9 100644 --- a/platform/consensus/ordering/cassandra/algorithm/proposal_manager.h +++ b/platform/consensus/ordering/cassandra/algorithm/proposal_manager.h @@ -13,7 +13,7 @@ namespace cassandra_recv { class ProposalManager { public: - ProposalManager(int32_t id, ProposalGraph* graph); + ProposalManager(int32_t id, ProposalGraph* graph, int total_num); int VerifyProposal(const Proposal& proposal); @@ -68,6 +68,7 @@ class ProposalManager { std::mutex t_mutex_; std::map<std::string, std::unique_ptr<Proposal>> local_proposal_; Stats* global_stats_; + int total_num_; }; } // namespace cassandra_recv diff --git a/platform/consensus/ordering/cassandra/algorithm/proposal_state.h b/platform/consensus/ordering/cassandra/algorithm/proposal_state.h index 876b1be0..ae497e7a 100644 --- a/platform/consensus/ordering/cassandra/algorithm/proposal_state.h +++ b/platform/consensus/ordering/cassandra/algorithm/proposal_state.h @@ -2,7 +2,7 @@ //#define GOPOA //#define GOTPOA -#define GOTPOR +//#define GOTPOR //#define NOPOA diff --git a/scripts/deploy/config/cassandra.config b/scripts/deploy/config/autobahn.config similarity index 91% copy from scripts/deploy/config/cassandra.config copy to scripts/deploy/config/autobahn.config index fb5ff19d..f445a70a 100644 --- a/scripts/deploy/config/cassandra.config +++ b/scripts/deploy/config/autobahn.config @@ -7,5 +7,5 @@ "worker_num": 10, "input_worker_num": 1, "output_worker_num": 5, - "block_size":1000 + "block_size":100 } diff --git a/scripts/deploy/config/cassandra.config b/scripts/deploy/config/cassandra.config index fb5ff19d..f445a70a 100644 --- a/scripts/deploy/config/cassandra.config +++ b/scripts/deploy/config/cassandra.config @@ -7,5 +7,5 @@ "worker_num": 10, "input_worker_num": 1, "output_worker_num": 5, - "block_size":1000 + "block_size":100 } diff --git a/scripts/deploy/config/kv_performance_server.conf b/scripts/deploy/config/kv_performance_server.conf index 823c66cb..c260a9bb 100644 --- a/scripts/deploy/config/kv_performance_server.conf +++ b/scripts/deploy/config/kv_performance_server.conf @@ -1,12 +1,12 @@ iplist=( 172.31.18.66 -172.31.27.193 172.31.27.153 -172.31.16.133 172.31.31.76 -172.31.17.121 172.31.24.63 172.31.20.159 +172.31.25.33 +172.31.30.103 +172.31.31.166 ) client_num=4 diff --git a/scripts/deploy/config/kv_performance_server_32.conf b/scripts/deploy/config/kv_performance_server_32.conf index 338c7b0a..50b7fcc2 100644 --- a/scripts/deploy/config/kv_performance_server_32.conf +++ b/scripts/deploy/config/kv_performance_server_32.conf @@ -63,8 +63,8 @@ iplist=( 172.31.21.148 172.31.17.147 172.31.21.207 -172.31.30.15 -172.31.26.29 +#172.31.30.15 +#172.31.26.29 ) key=~/.ssh/junchao.pem diff --git a/scripts/deploy/performance/autobahn_performance.sh b/scripts/deploy/performance/autobahn_performance.sh new file mode 100755 index 00000000..750be672 --- /dev/null +++ b/scripts/deploy/performance/autobahn_performance.sh @@ -0,0 +1,5 @@ +export server=//benchmark/protocols/autobahn:kv_server_performance +export TEMPLATE_PATH=$PWD/config/autobahn.config + +./performance/run_performance.sh $* +echo $0 diff --git a/scripts/deploy/performance/run_performance.sh b/scripts/deploy/performance/run_performance.sh index d1b9c22a..ece3816f 100755 --- a/scripts/deploy/performance/run_performance.sh +++ b/scripts/deploy/performance/run_performance.sh @@ -21,7 +21,7 @@ echo "get cofigfile:"$config_file ${BAZEL_WORKSPACE_PATH}/bazel-bin/benchmark/protocols/pbft/kv_service_tools $config_file done -sleep 120 +sleep 60 echo "benchmark done" count=1 diff --git a/scripts/null b/scripts/null index 93ada22f..1afefe98 100644 --- a/scripts/null +++ b/scripts/null @@ -1 +1 @@ -/home/ubuntu/asf-resilientdb/service/contract/benchmark/data/smallbank.json: No such file or directory +/home/ubuntu/asf_resilientdb/service/contract/benchmark/data/smallbank.json: No such file or directory
