This is an automated email from the ASF dual-hosted git repository. junchao pushed a commit to branch poc_merge in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git
commit a90c055cd7e164dbb33a2cf2f90bc1492b1d6859 Author: cjcchen <[email protected]> AuthorDate: Sun Dec 28 06:46:48 2025 +0000 add poc --- platform/config/resdb_poc_config.cpp | 30 +- platform/config/resdb_poc_config.h | 35 +- platform/consensus/ordering/poc/pow/BUILD | 202 +++++++++ .../consensus/ordering/poc/pow/block_manager.cpp | 262 +++++++++++ .../consensus/ordering/poc/pow/block_manager.h | 86 ++++ .../ordering/poc/pow/block_manager_test.cpp | 400 +++++++++++++++++ .../ordering/poc/pow/consensus_service_pow.cpp | 53 +++ .../ordering/poc/pow/consensus_service_pow.h | 27 ++ .../poc/pow/consensus_service_pow_test.cpp | 330 ++++++++++++++ platform/consensus/ordering/poc/pow/merkle.cpp | 28 ++ platform/consensus/ordering/poc/pow/merkle.h | 12 + .../consensus/ordering/poc/pow/merkle_test.cpp | 46 ++ platform/consensus/ordering/poc/pow/miner.cpp | 161 +++++++ platform/consensus/ordering/poc/pow/miner.h | 44 ++ .../consensus/ordering/poc/pow/miner_manager.cpp | 12 + .../consensus/ordering/poc/pow/miner_manager.h | 18 + platform/consensus/ordering/poc/pow/miner_test.cpp | 170 ++++++++ .../consensus/ordering/poc/pow/miner_utils.cpp | 102 +++++ platform/consensus/ordering/poc/pow/miner_utils.h | 30 ++ .../ordering/poc/pow/miner_utils_test.cpp | 76 ++++ .../ordering/poc/pow/mock_transaction_accessor.h | 15 + .../consensus/ordering/poc/pow/pow_manager.cpp | 271 ++++++++++++ platform/consensus/ordering/poc/pow/pow_manager.h | 79 ++++ .../ordering/poc/pow/pow_manager_test.cpp | 482 +++++++++++++++++++++ .../consensus/ordering/poc/pow/shift_manager.cpp | 31 ++ .../consensus/ordering/poc/pow/shift_manager.h | 24 + .../ordering/poc/pow/transaction_accessor.cpp | 153 +++++++ .../ordering/poc/pow/transaction_accessor.h | 48 ++ .../ordering/poc/pow/transaction_accessor_test.cpp | 121 ++++++ platform/consensus/ordering/poc/proto/BUILD | 28 ++ platform/consensus/ordering/poc/proto/pow.proto | 52 +++ .../consensus/ordering/poc/proto/transaction.proto | 12 + platform/networkstrate/consensus_manager.h | 2 +- platform/proto/resdb.proto | 21 + service/poc/BUILD | 12 + service/poc/pow_server.cpp | 56 +++ 36 files changed, 3487 insertions(+), 44 deletions(-) diff --git a/platform/config/resdb_poc_config.cpp b/platform/config/resdb_poc_config.cpp index c644f391..d638e5f2 100644 --- a/platform/config/resdb_poc_config.cpp +++ b/platform/config/resdb_poc_config.cpp @@ -1,26 +1,18 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - #include "platform/config/resdb_poc_config.h" namespace resdb { +ResDBPoCConfig::ResDBPoCConfig(const ResDBConfig& bft_config, + const std::vector<ReplicaInfo>& replicas, + const ReplicaInfo& self_info, + const KeyInfo& private_key, + const CertificateInfo& public_key_cert_info) + : ResDBConfig(replicas, self_info, private_key, public_key_cert_info), + bft_config_(bft_config) { + SetHeartBeatEnabled(false); + SetSignatureVerifierEnabled(false); +} + ResDBPoCConfig::ResDBPoCConfig(const ResDBConfig& bft_config, const ResConfigData& config_data, const ReplicaInfo& self_info, diff --git a/platform/config/resdb_poc_config.h b/platform/config/resdb_poc_config.h index c0c73a5d..7a336589 100644 --- a/platform/config/resdb_poc_config.h +++ b/platform/config/resdb_poc_config.h @@ -1,22 +1,3 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - #pragma once #include "platform/config/resdb_config.h" @@ -27,10 +8,16 @@ namespace resdb { class ResDBPoCConfig : public ResDBConfig { public: ResDBPoCConfig(const ResDBConfig& bft_config, - const ResConfigData& config_data, const ReplicaInfo& self_info, - const KeyInfo& private_key, + const std::vector<ReplicaInfo>& replicas, + const ReplicaInfo& self_info, const KeyInfo& private_key, + const CertificateInfo& public_key_cert_info); + + ResDBPoCConfig(const ResDBConfig& bft_config, + const ResConfigData& config_data, + const ReplicaInfo& self_info, const KeyInfo& private_key, const CertificateInfo& public_key_cert_info); + const ResDBConfig* GetBFTConfig() const; void SetMaxNonceBit(uint32_t bit); @@ -54,8 +41,8 @@ class ResDBPoCConfig : public ResDBConfig { uint32_t GetWokerNum(); void SetWorkerNum(uint32_t worker_num); - uint32_t GetMiningTime() const { return mining_time_ms_; } - void SetMiningTime(uint32_t time_ms) { mining_time_ms_ = time_ms; } + uint32_t GetMiningTime() const { return mining_time_ms_;} + void SetMiningTime(uint32_t time_ms) {mining_time_ms_ = time_ms;} private: uint32_t difficulty_ = 0; @@ -63,7 +50,7 @@ class ResDBPoCConfig : public ResDBConfig { uint32_t target_value_ = 0; uint32_t batch_num_ = 12000; uint32_t worker_num_ = 16; - uint32_t mining_time_ms_ = 60000; // 60s + uint32_t mining_time_ms_ = 60000; // 60s std::vector<ReplicaInfo> bft_replicas_; ResDBConfig bft_config_; }; diff --git a/platform/consensus/ordering/poc/pow/BUILD b/platform/consensus/ordering/poc/pow/BUILD new file mode 100644 index 00000000..175907ff --- /dev/null +++ b/platform/consensus/ordering/poc/pow/BUILD @@ -0,0 +1,202 @@ +package(default_visibility = ["//visibility:public"]) + +cc_library( + name = "miner_utils", + srcs = ["miner_utils.cpp"], + hdrs = ["miner_utils.h"], + deps = [ + "//common/crypto:signature_verifier", + "//platform/consensus/ordering/poc/proto:pow_cc_proto", + "@boost//:format", + ], +) + +cc_test( + name = "miner_utils_test", + srcs = ["miner_utils_test.cpp"], + deps = [ + ":miner_utils", + "//common/test:test_main", + "//platform/config:resdb_config_utils", + "//common/crypto:signature_verifier", + ], +) + +cc_library( + name = "merkle", + srcs = ["merkle.cpp"], + hdrs = ["merkle.h"], + deps = [ + ":miner_utils", + ], +) + +cc_test( + name = "merkle_test", + srcs = ["merkle_test.cpp"], + deps = [ + ":merkle", + "//common/test:test_main", + ], +) + +cc_library( + name = "miner", + srcs = ["miner.cpp"], + hdrs = ["miner.h"], + deps = [ + ":miner_utils", + "//platform/config:resdb_poc_config", + "//common/crypto:signature_verifier", + "//platform/consensus/ordering/poc/proto:pow_cc_proto", + "@boost//:format", + ], +) + +cc_test( + name = "miner_test", + srcs = ["miner_test.cpp"], + deps = [ + ":miner", + "//common/test:test_main", + "//platform/config:resdb_config_utils", + ], +) + +cc_library( + name = "miner_manager", + srcs = ["miner_manager.cpp"], + hdrs = ["miner_manager.h"], + deps = [ + "//platform/config:resdb_poc_config", + ], +) + +cc_library( + name = "block_manager", + srcs = ["block_manager.cpp"], + hdrs = ["block_manager.h"], + deps = [ + ":merkle", + ":miner", + "//common:comm", + "//platform/config:resdb_poc_config", + "//platform/consensus/ordering/poc/proto:pow_cc_proto", + "//platform/statistic:stats", + "//platform/proto:resdb_cc_proto", + ], +) + +cc_test( + name = "block_manager_test", + srcs = ["block_manager_test.cpp"], + deps = [ + ":block_manager", + "//common/test:test_main", + "//platform/config:resdb_config_utils", + "//common/crypto:key_generator", + ], +) + +cc_library( + name = "transaction_accessor", + srcs = ["transaction_accessor.cpp"], + hdrs = ["transaction_accessor.h"], + deps = [ + ":miner_manager", + "//interface/common:resdb_txn_accessor", + "//platform/common/queue:lock_free_queue", + "//common/utils:utils", + "//platform/config:resdb_poc_config", + "//platform/consensus/ordering/poc/proto:pow_cc_proto", + "//platform/statistic:stats", + ], +) + +cc_library( + name = "mock_transaction_accessor", + hdrs = ["mock_transaction_accessor.h"], + deps = [ + ":transaction_accessor", + ], +) + +cc_test( + name = "transaction_accessor_test", + srcs = ["transaction_accessor_test.cpp"], + deps = [ + ":transaction_accessor", + "//interface/rdbc:net_channel", + "//interface/common:mock_resdb_txn_accessor", + "//common/test:test_main", + "//platform/config:resdb_config_utils", + ], +) + +cc_library( + name = "consensus_service_pow", + srcs = ["consensus_service_pow.cpp"], + hdrs = ["consensus_service_pow.h"], + deps = [ + ":block_manager", + ":transaction_accessor", + ":pow_manager", + "//platform/common/queue:blocking_queue", + "//common/utils", + "//platform/consensus/ordering/poc/proto:pow_cc_proto", + "//platform/networkstrate:service_network", + "//platform/consensus/ordering/common/framework:consensus" + ], +) + +''' +cc_test( + name = "consensus_service_pow_test", + srcs = ["consensus_service_pow_test.cpp"], + deps = [ + ":consensus_service_pow", + "//common/test:test_main", + "//config:resdb_config_utils", + "//crypto:key_generator", + "//statistic:stats", + ], +) +''' + +cc_library( + name = "shift_manager", + srcs = ["shift_manager.cpp"], + hdrs = ["shift_manager.h"], + deps = [ + "//platform/config:resdb_poc_config", + "//platform/consensus/ordering/poc/proto:pow_cc_proto", + ], +) + + +cc_library( + name = "pow_manager", + srcs = ["pow_manager.cpp"], + hdrs = ["pow_manager.h"], + deps = [ + ":block_manager", + ":shift_manager", + ":transaction_accessor", + "//platform/networkstrate:replica_communicator", + "//platform/common/queue:blocking_queue", + "//platform/consensus/ordering/poc/proto:pow_cc_proto", + "//common/utils", + ], +) + +cc_test( + name = "pow_manager_test", + srcs = ["pow_manager_test.cpp"], + deps = [ + ":pow_manager", + ":mock_transaction_accessor", + "//platform/networkstrate:mock_replica_communicator", + "//common/test:test_main", + "//platform/config:resdb_config_utils", + ], +) diff --git a/platform/consensus/ordering/poc/pow/block_manager.cpp b/platform/consensus/ordering/poc/pow/block_manager.cpp new file mode 100644 index 00000000..d252aac0 --- /dev/null +++ b/platform/consensus/ordering/poc/pow/block_manager.cpp @@ -0,0 +1,262 @@ +#include "platform/consensus/ordering/poc/pow/block_manager.h" + +#include <glog/logging.h> + +#include "common/utils/utils.h" +#include "common/crypto/signature_verifier.h" +#include "platform/consensus/ordering/poc/pow/merkle.h" +#include "platform/proto/resdb.pb.h" +#include "platform/consensus/ordering/poc/pow/miner_utils.h" +#include "platform/statistic/stats.h" + +namespace resdb { + + +BlockManager::BlockManager(const ResDBPoCConfig& config) : config_(config){ + miner_ = std::make_unique<Miner>(config); + global_stats_ = Stats::GetGlobalStats(); + last_update_time_ = 0; + //prometheus_handler_ = Stats::GetGlobalPrometheus(); +} + +void BlockManager::SaveClientTransactions(std::unique_ptr<BatchClientTransactions> client_request){ + for(const ClientTransactions& client_tx : client_request->transactions()){ + *request_candidate_.add_transactions() = client_tx; + } + if(request_candidate_.min_seq() == 0){ + request_candidate_.set_min_seq(client_request->min_seq()); + } + else { + request_candidate_.set_min_seq(std::min(request_candidate_.min_seq(), client_request->min_seq())); + } + request_candidate_.set_max_seq(std::max(request_candidate_.max_seq(), client_request->max_seq())); +} + +int BlockManager::SetNewMiningBlock( + std::unique_ptr<BatchClientTransactions> client_request) { +SaveClientTransactions(std::move(client_request)); + + std::unique_ptr<Block> new_block = std::make_unique<Block>(); + if (request_candidate_.min_seq() != GetLastSeq() + 1) { + LOG(ERROR) << "seq invalid:" << request_candidate_.min_seq() + << " last seq:" << GetLastSeq(); + return -2; + } + + int64_t new_time = 0; + for(ClientTransactions& client_tx : *request_candidate_.mutable_transactions()){ + new_time = client_tx.create_time(); + if(new_time>0){ + create_time_[client_tx.seq()] = new_time; + client_tx.clear_create_time(); + } + } + + new_block->mutable_header()->set_height(GetCurrentHeight() + 1); + *new_block->mutable_header()->mutable_pre_hash() = + GetPreviousBlcokHash(); // set the hash value of the parent block. + request_candidate_.SerializeToString(new_block->mutable_transaction_data()); + new_block->set_min_seq(request_candidate_.min_seq()); + new_block->set_max_seq(request_candidate_.max_seq()); + *new_block->mutable_header()->mutable_merkle_hash() = + Merkle::MakeHash(request_candidate_); + new_block->set_miner(config_.GetSelfInfo().id()); + new_block->set_block_time(GetCurrentTime()); + + LOG(ERROR) << "create new block:" << request_candidate_.transactions(0).create_time()<<"["<<new_block->min_seq()<<","<<new_block->max_seq()<<"]" << " miner:"<<config_.GetSelfInfo().id()<<" time:"<<new_block->block_time()<<" delay:"<<(new_block->block_time() - new_time)/1000000.0 << " current:"<<GetCurrentTime(); + new_mining_block_ = std::move(new_block); + miner_->SetSliceIdx(0); + return 0; +} + +Block* BlockManager::GetNewMiningBlock() { + return new_mining_block_ == nullptr ? nullptr : new_mining_block_.get(); +} + +// Mine the nonce. +absl::Status BlockManager::Mine() { + if (new_mining_block_ == nullptr) { + LOG(ERROR) << "don't contain mining block."; + return absl::InvalidArgumentError("height invalid"); + } + + if (new_mining_block_->header().height() != GetCurrentHeight() + 1) { + // a new block has been committed. + LOG(ERROR) << "new block height:" << new_mining_block_->header().height() + << " current height:" << GetCurrentHeight() << " not equal"; + return absl::InvalidArgumentError("height invalid"); + } + + return miner_->Mine(new_mining_block_.get()); +} + +int32_t BlockManager::GetSliceIdx() { return miner_->GetSliceIdx(); } + +int BlockManager::SetSliceIdx(const SliceInfo& slice_info) { + if (new_mining_block_ == nullptr) { + return 0; + } + if (slice_info.height() != new_mining_block_->header().height()) { + return -2; + } + size_t f = config_.GetMaxMaliciousReplicaNum(); + if(slice_info.shift_idx() > f ){ + // reset + LOG(ERROR)<<"reset slice"; + miner_->SetSliceIdx(0); + return 1; + } + miner_->SetSliceIdx(slice_info.shift_idx()); + return 0; +} + +int BlockManager::Commit() { + request_candidate_.Clear(); + uint64_t mining_time = GetCurrentTime() - new_mining_block_->block_time(); + new_mining_block_->set_mining_time(mining_time); + int ret = AddNewBlock(std::move(new_mining_block_)); + new_mining_block_ = nullptr; + return ret; +} + +// =============== Mining Related End ========================== +int BlockManager::Commit(std::unique_ptr<Block> new_block) { + return AddNewBlock(std::move(new_block)); +} + +int BlockManager::AddNewBlock(std::unique_ptr<Block> new_block) { + { + std::unique_lock<std::mutex> lck(mtx_); + if (new_block->header().height() != GetCurrentHeightNoLock() + 1) { + // a new block has been committed. + LOG(ERROR) << "new block height:" << new_block->header().height() + << " current height:" << GetCurrentHeightNoLock() + << " not equal"; + return -2; + } + LOG(INFO) << "============= commit new block:" + << new_block->header().height() + << " current height:" << GetCurrentHeightNoLock(); +LOG(ERROR)<<"commit:"<<new_block->header().height()<<" from:"<<new_block->miner()<<" mining time:"<<new_block->mining_time()/1000000.0; + //prometheus_handler_->SetValue(MINING_TIME, new_block->mining_time()/1000000.0); + //prometheus_handler_->Inc(MINING_LATENCY, new_block->mining_time()/1000000000.0); + miner_->Terminate(); + request_candidate_.Clear(); + block_list_.push_back(std::move(new_block)); + Execute(*block_list_.back()); + } + return 0; +} + +void BlockManager::Execute(const Block& block){ + +BatchClientTransactions batch_client_request; + if(!batch_client_request.ParseFromString(block.transaction_data())){ + LOG(ERROR)<<"parse client transaction fail"; + } + /* + uint64_t current_time = get_sys_clock(); + if(last_update_time_>0){ + LOG(ERROR)<<" update block:"<<(current_time - last_update_time_); + } + last_update_time_ = current_time; + */ + + LOG(ERROR)<<" execute seq:["<<batch_client_request.min_seq()<<","<<batch_client_request.max_seq()<<"]"; + uint64_t lat = 0; + int num = 0; + uint64_t total_tx = 0; + uint64_t run_time = GetCurrentTime(); + for(const ClientTransactions& client_tx : batch_client_request.transactions()){ + + BatchClientRequest batch_request; + if (!batch_request.ParseFromString(client_tx.transaction_data())) { + LOG(ERROR) << "parse data fail"; + } + total_tx += batch_request.client_requests_size(); + //global_stats_->IncTotalRequest(batch_request.client_requests_size()); + + if(block.miner() == config_.GetSelfInfo().id()){ + uint64_t create_time = create_time_[client_tx.seq()]; + uint64_t latency = run_time-create_time; + lat += latency; + num++; + //global_stats_->AddLatency(latency); + //LOG(ERROR)<<"get latency:"<<latency/1000000000.0<<" create time:"<<create_time<<" now:"<<run_time << " current time:"<<GetCurrentTime(); + /* + if(create_time>0){ + lat += latency; + num++; + global_stats_->AddLatency(latency); + } + */ + } + } + if(total_tx>0){ + uint64_t current_time = GetCurrentTime(); + if(last_update_time_>0){ + LOG(ERROR)<<" tps:"<<total_tx/((current_time - last_update_time_)/1000000.0)<< " wait:"<<((current_time - last_update_time_)/1000000.0); + //prometheus_handler_->SetValue(THROUGHPUT, total_tx/((current_time - last_update_time_)/1000000.0)); + } + last_update_time_ = current_time; + } + if(lat>0){ + //prometheus_handler_->SetValue(TRANSACTION_LATENCY, lat/1000000.0/num); + LOG(ERROR)<<" execute seq:["<<batch_client_request.min_seq()<<","<<batch_client_request.max_seq()<<"]:"<<" total:"<<total_tx<<" lat:"<<(lat/1000000.0/num); + } + else { + LOG(ERROR)<<" execute seq:["<<batch_client_request.min_seq()<<","<<batch_client_request.max_seq()<<"]:"" total:"<<total_tx; + } + +} + +bool BlockManager::VerifyBlock(const Block* block) { + if (!miner_->IsValidHash(block)) { + LOG(ERROR) << "hash not valid:" << block->hash().DebugString(); + return false; + } + + BatchClientTransactions client_request; + if (!client_request.ParseFromString(block->transaction_data())) { + LOG(ERROR) << "parse transaction fail"; + return false; + } + return Merkle::MakeHash(client_request) == block->header().merkle_hash(); +} + +uint64_t BlockManager::GetCurrentHeight() { + std::unique_lock<std::mutex> lck(mtx_); + return GetCurrentHeightNoLock(); +} + +uint64_t BlockManager::GetLastSeq() { + std::unique_lock<std::mutex> lck(mtx_); + return block_list_.empty() ? 0 : block_list_.back()->max_seq(); +} + +uint64_t BlockManager::GetLastCandidateSeq() { + return request_candidate_.max_seq(); +} + +uint64_t BlockManager::GetCurrentHeightNoLock() { + return block_list_.empty() ? 0 : block_list_.back()->header().height(); +} + +HashValue BlockManager::GetPreviousBlcokHash() { + std::unique_lock<std::mutex> lck(mtx_); + return block_list_.empty() ? HashValue() : block_list_.back()->hash(); +} + +Block* BlockManager::GetBlockByHeight(uint64_t height) { + std::unique_lock<std::mutex> lck(mtx_); + if (block_list_.empty() || block_list_.back()->header().height() < height) { + return nullptr; + } + return block_list_[height - 1].get(); +} + +void BlockManager::SetTargetValue(const HashValue& target_value) { + miner_->SetTargetValue(target_value); +} + +} // namespace resdb diff --git a/platform/consensus/ordering/poc/pow/block_manager.h b/platform/consensus/ordering/poc/pow/block_manager.h new file mode 100644 index 00000000..dbf626b8 --- /dev/null +++ b/platform/consensus/ordering/poc/pow/block_manager.h @@ -0,0 +1,86 @@ +#pragma once + +#include "absl/status/status.h" +#include "platform/config/resdb_poc_config.h" +#include "platform/consensus/ordering/poc/pow/miner.h" +#include "platform/consensus/ordering/poc/proto/pow.pb.h" +#include "platform/statistic/stats.h" + +namespace resdb { + +// Manager all the blocks and mine the new blocks. +class BlockManager { + public: + BlockManager(const ResDBPoCConfig& config); + virtual ~BlockManager() = default; + // ================ mining a new block ============================ + // All the mining functions below are not thread safe. + // They should be run in the same thread. + + // Setup a new mining block with the transactions in the client_request. + // new block will be saved to 'new_mining_block_'. + int SetNewMiningBlock( + std::unique_ptr<BatchClientTransactions> client_request); + + // Get the new mining block set from SetMiningBlock(). + Block* GetNewMiningBlock(); + + // Mine the nonce using the local slice. + virtual absl::Status Mine(); + + // Shift the slice. + virtual int SetSliceIdx(const SliceInfo& slice_info); + + // Commit the new mining block. + int Commit(); + + // =============================================================== + + // Commit a new block other than the new mining block. + // The new mining block should call the Commit() to commit. + int Commit(std::unique_ptr<Block> new_block); + + // 1 verify the hash value + // 2 verify the transaction + bool VerifyBlock(const Block* block); + + // Obtain the block with height 'height'. + Block* GetBlockByHeight(uint64_t height); + + // Get the height of committed block with max height. + uint64_t GetCurrentHeight(); + uint64_t GetLastSeq(); + uint64_t GetLastCandidateSeq(); + + // Obtain the slice shift of the current mining block. + int32_t GetSliceIdx(); + + // Update the miner info. + void SetTargetValue(const HashValue& target_value); + + private: + HashValue GetPreviousBlcokHash(); + int AddNewBlock(std::unique_ptr<Block> new_block); + uint64_t GetCurrentHeightNoLock(); + void Execute(const Block& block); + void SaveClientTransactions(std::unique_ptr<BatchClientTransactions> client_request); + + private: +ResDBPoCConfig config_; + std::mutex mtx_; + std::unique_ptr<Miner> miner_; + // Blocks that have been committed. + // TODO move to executor and write to ds. + std::vector<std::unique_ptr<Block> > block_list_ GUARDED_BY(mtx_); + // The current minning block. + std::unique_ptr<Block> new_mining_block_; + BatchClientTransactions request_candidate_; + + Stats* global_stats_; + uint64_t first_block_time_ = 0, first_mine_time_= 0; + std::map<uint64_t, uint64_t> create_time_; + std::atomic<uint64_t> last_update_time_; + PrometheusHandler * prometheus_handler_; +}; + +} // namespace resdb diff --git a/platform/consensus/ordering/poc/pow/block_manager_test.cpp b/platform/consensus/ordering/poc/pow/block_manager_test.cpp new file mode 100644 index 00000000..1862f61b --- /dev/null +++ b/platform/consensus/ordering/poc/pow/block_manager_test.cpp @@ -0,0 +1,400 @@ +#include "platform/consensus/ordering/poc/pow/block_manager.h" + +#include <glog/logging.h> +#include <gmock/gmock.h> +#include <gtest/gtest.h> + +#include "common/test/test_macros.h" +#include "platform/config/resdb_config_utils.h" +#include "common/crypto/key_generator.h" +#include "common/crypto/signature_verifier.h" +#include "platform/consensus/ordering/poc/pow/merkle.h" + +namespace resdb { +namespace { + +using ::resdb::testing::EqualsProto; +using ::testing::ElementsAre; +using ::testing::Invoke; +using ::testing::Pair; +using ::testing::Pointee; +using ::testing::Test; + +KeyInfo GetKeyInfo(SecretKey key) { + KeyInfo info; + info.set_key(key.private_key()); + info.set_hash_type(key.hash_type()); + return info; +} + +KeyInfo GetPublicKeyInfo(SecretKey key) { + KeyInfo info; + info.set_key(key.public_key()); + info.set_hash_type(key.hash_type()); + return info; +} + +CertificateInfo GetCertInfo(int64_t node_id) { + CertificateInfo cert_info; + cert_info.set_node_id(node_id); + return cert_info; +} + +class BlockManagerTest : public Test { + protected: + BlockManagerTest() + : bft_config_({GenerateReplicaInfo(1, "127.0.0.1", 1234), + GenerateReplicaInfo(2, "127.0.0.1", 1235), + GenerateReplicaInfo(3, "127.0.0.1", 1236), + GenerateReplicaInfo(4, "127.0.0.1", 1237)}, + GenerateReplicaInfo(3, "127.0.0.1", 1234), KeyInfo(), + CertificateInfo()), + config_(bft_config_, + {GenerateReplicaInfo(1, "127.0.0.1", 1234), + GenerateReplicaInfo(2, "127.0.0.1", 1235), + GenerateReplicaInfo(3, "127.0.0.1", 1236), + GenerateReplicaInfo(4, "127.0.0.1", 1237)}, + GenerateReplicaInfo(1, "127.0.0.1", 1234), KeyInfo(), + CertificateInfo()) { + config_.SetMaxNonceBit(10); + config_.SetDifficulty(1); + } + + Block* GenerateNewBlock( + BlockManager* block_manager, + std::unique_ptr<BatchClientTransactions> client_request) { + if (block_manager->SetNewMiningBlock(std::move(client_request)) != 0) { + return nullptr; + } + return block_manager->GetNewMiningBlock(); + } + + Block GenerateExpectedBlock( + const BatchClientTransactions& batch_client_request, int height) { + Block expected_block; + expected_block.mutable_header()->set_height(height); + batch_client_request.SerializeToString( + expected_block.mutable_transaction_data()); + *expected_block.mutable_header()->mutable_pre_hash() = HashValue(); + *expected_block.mutable_header()->mutable_merkle_hash() = + Merkle::MakeHash(batch_client_request); + expected_block.set_min_seq(batch_client_request.min_seq()); + expected_block.set_max_seq(batch_client_request.max_seq()); + return expected_block; + } + + ResDBConfig bft_config_; + ResDBPoCConfig config_; +}; + +TEST_F(BlockManagerTest, GenerateNewBlock) { + BlockManager block_manager(config_); + std::unique_ptr<BatchClientTransactions> batch_client_request = + std::make_unique<BatchClientTransactions>(); + auto client_request = batch_client_request->add_transactions(); + client_request->set_seq(1); + client_request->set_transaction_data("test"); + batch_client_request->set_min_seq(1); + batch_client_request->set_max_seq(1); + + Block expected_block = GenerateExpectedBlock(*batch_client_request, 1); + + Block* block = + GenerateNewBlock(&block_manager, std::move(batch_client_request)); + + EXPECT_THAT(block, Pointee(EqualsProto(expected_block))); +} + +TEST_F(BlockManagerTest, ResetSliceFail) { + BlockManager block_manager(config_); + BatchClientTransactions batch_client_request; + ClientTransactions* client_request = batch_client_request.add_transactions(); + client_request->set_seq(1); + client_request->set_transaction_data("test"); + batch_client_request.set_min_seq(1); + batch_client_request.set_max_seq(1); + + Block expected_block = GenerateExpectedBlock(batch_client_request, 1); + + Block* block = GenerateNewBlock( + &block_manager, + std::make_unique<BatchClientTransactions>(batch_client_request)); + EXPECT_THAT(block, Pointee(EqualsProto(expected_block))); + + SliceInfo slice_info; + slice_info.set_shift_idx(2); + slice_info.set_height(2); + block_manager.SetSliceIdx(slice_info); + EXPECT_EQ(block_manager.GetSliceIdx(), 0); +} + +TEST_F(BlockManagerTest, ResetSlice) { + BlockManager block_manager(config_); + BatchClientTransactions batch_client_request; + ClientTransactions* client_request = batch_client_request.add_transactions(); + client_request->set_seq(1); + client_request->set_transaction_data("test"); + batch_client_request.set_min_seq(1); + batch_client_request.set_max_seq(1); + + Block expected_block = GenerateExpectedBlock(batch_client_request, 1); + + Block* block = GenerateNewBlock( + &block_manager, + std::make_unique<BatchClientTransactions>(batch_client_request)); + EXPECT_THAT(block, Pointee(EqualsProto(expected_block))); + + SliceInfo slice_info; + slice_info.set_shift_idx(2); + slice_info.set_height(1); + block_manager.SetSliceIdx(slice_info); + EXPECT_EQ(block_manager.GetSliceIdx(), 2); + + block = GenerateNewBlock( + &block_manager, + std::make_unique<BatchClientTransactions>(batch_client_request)); + EXPECT_THAT(block, Pointee(EqualsProto(expected_block))); + + EXPECT_EQ(block_manager.GetSliceIdx(), 0); +} + +// successfully find out the solution. +TEST_F(BlockManagerTest, Mine) { + BlockManager block_manager(config_); + std::unique_ptr<BatchClientTransactions> batch_client_request = + std::make_unique<BatchClientTransactions>(); + auto client_request = batch_client_request->add_transactions(); + client_request->set_seq(1); + client_request->set_transaction_data("test"); + batch_client_request->set_min_seq(1); + batch_client_request->set_max_seq(1); + + Block expected_block = GenerateExpectedBlock(*batch_client_request, 1); + + Block* block = + GenerateNewBlock(&block_manager, std::move(batch_client_request)); + EXPECT_THAT(block, Pointee(EqualsProto(expected_block))); + + EXPECT_TRUE(block_manager.Mine().ok()); +} + +// could not find the solution. +TEST_F(BlockManagerTest, MineFail) { + config_.SetDifficulty(20); + BlockManager block_manager(config_); + std::unique_ptr<BatchClientTransactions> batch_client_request = + std::make_unique<BatchClientTransactions>(); + auto client_request = batch_client_request->add_transactions(); + client_request->set_seq(1); + client_request->set_transaction_data("test"); + batch_client_request->set_min_seq(1); + batch_client_request->set_max_seq(1); + + Block expected_block = GenerateExpectedBlock(*batch_client_request, 1); + + Block* block = + GenerateNewBlock(&block_manager, std::move(batch_client_request)); + EXPECT_THAT(block, Pointee(EqualsProto(expected_block))); + + EXPECT_FALSE(block_manager.Mine().ok()); +} + +// seq is too larger. +TEST_F(BlockManagerTest, MineSeqNotValid) { + BlockManager block_manager(config_); + std::unique_ptr<BatchClientTransactions> batch_client_request = + std::make_unique<BatchClientTransactions>(); + auto client_request = batch_client_request->add_transactions(); + client_request->set_seq(2); + client_request->set_transaction_data("test"); + batch_client_request->set_min_seq(2); + batch_client_request->set_max_seq(2); + + Block* block = + GenerateNewBlock(&block_manager, std::move(batch_client_request)); + EXPECT_EQ(block, nullptr); +} + +// Commit a mined block. +TEST_F(BlockManagerTest, Commit) { + BlockManager block_manager(config_); + std::unique_ptr<BatchClientTransactions> batch_client_request = + std::make_unique<BatchClientTransactions>(); + auto client_request = batch_client_request->add_transactions(); + client_request->set_seq(1); + client_request->set_transaction_data("test"); + batch_client_request->set_min_seq(1); + batch_client_request->set_max_seq(1); + + Block expected_block = GenerateExpectedBlock(*batch_client_request, 1); + + Block* block = + GenerateNewBlock(&block_manager, std::move(batch_client_request)); + EXPECT_THAT(block, Pointee(EqualsProto(expected_block))); + EXPECT_TRUE(block_manager.Mine().ok()); + EXPECT_GT(block->header().nonce(), 0); + + EXPECT_EQ(block_manager.Commit(), 0); + + EXPECT_TRUE(block_manager.GetBlockByHeight(1) != nullptr); +} + +TEST_F(BlockManagerTest, MineACommittedSeq) { + BlockManager block_manager(config_); + { + // Commit seq 1 + std::unique_ptr<BatchClientTransactions> batch_client_request = + std::make_unique<BatchClientTransactions>(); + auto client_request = batch_client_request->add_transactions(); + client_request->set_seq(1); + client_request->set_transaction_data("test"); + batch_client_request->set_min_seq(1); + batch_client_request->set_max_seq(1); + + Block expected_block = GenerateExpectedBlock(*batch_client_request, 1); + + Block* block = + GenerateNewBlock(&block_manager, std::move(batch_client_request)); + EXPECT_THAT(block, Pointee(EqualsProto(expected_block))); + + EXPECT_TRUE(block_manager.Mine().ok()); + EXPECT_EQ(block_manager.Commit(), 0); + } + { + // Fail to mind seq 1 again. + std::unique_ptr<BatchClientTransactions> batch_client_request = + std::make_unique<BatchClientTransactions>(); + auto client_request = batch_client_request->add_transactions(); + client_request->set_seq(1); + client_request->set_transaction_data("test"); + batch_client_request->set_min_seq(1); + batch_client_request->set_max_seq(1); + + Block expected_block = GenerateExpectedBlock(*batch_client_request, 1); + + Block* block = + GenerateNewBlock(&block_manager, std::move(batch_client_request)); + EXPECT_EQ(block, nullptr); + } +} + +// Commit a mined block with invalid height. +TEST_F(BlockManagerTest, CommitWithInvalidHeight) { + BlockManager block_manager(config_); + std::unique_ptr<BatchClientTransactions> batch_client_request = + std::make_unique<BatchClientTransactions>(); + auto client_request = batch_client_request->add_transactions(); + client_request->set_seq(1); + client_request->set_transaction_data("test"); + batch_client_request->set_min_seq(1); + batch_client_request->set_max_seq(1); + + Block expected_block = GenerateExpectedBlock(*batch_client_request, 1); + + Block* block = + GenerateNewBlock(&block_manager, std::move(batch_client_request)); + EXPECT_THAT(block, Pointee(EqualsProto(expected_block))); + EXPECT_TRUE(block_manager.Mine().ok()); + EXPECT_GT(block->header().nonce(), 0); + + std::unique_ptr<Block> old_block = std::make_unique<Block>(*block); + old_block->mutable_header()->set_height(0); + EXPECT_EQ(block_manager.Commit(), 0); + EXPECT_NE(block_manager.Commit(std::move(old_block)), 0); +} + +// Mine a block after it is committed from others. +TEST_F(BlockManagerTest, MineCommittedBlock) { + BlockManager block_manager(config_); + std::unique_ptr<BatchClientTransactions> batch_client_request = + std::make_unique<BatchClientTransactions>(); + auto client_request = batch_client_request->add_transactions(); + client_request->set_seq(1); + client_request->set_transaction_data("test"); + batch_client_request->set_min_seq(1); + batch_client_request->set_max_seq(1); + + Block expected_block = GenerateExpectedBlock(*batch_client_request, 1); + + Block* block = + GenerateNewBlock(&block_manager, std::move(batch_client_request)); + std::unique_ptr<Block> d_block = std::make_unique<Block>(*block); + + EXPECT_THAT(block, Pointee(EqualsProto(expected_block))); + EXPECT_TRUE(block_manager.Mine().ok()); + EXPECT_GT(block->header().nonce(), 0); + + EXPECT_EQ(block_manager.Commit(std::move(d_block)), 0); + + // mind the committed block, d_block + EXPECT_FALSE(block_manager.Mine().ok()); +} + +// Verify a mined block with invalid hash. +TEST_F(BlockManagerTest, VerifyWithInvalidHash) { + BlockManager block_manager(config_); + std::unique_ptr<BatchClientTransactions> batch_client_request = + std::make_unique<BatchClientTransactions>(); + auto client_request = batch_client_request->add_transactions(); + client_request->set_seq(1); + client_request->set_transaction_data("test"); + batch_client_request->set_min_seq(1); + batch_client_request->set_max_seq(1); + + Block expected_block = GenerateExpectedBlock(*batch_client_request, 1); + + Block* block = + GenerateNewBlock(&block_manager, std::move(batch_client_request)); + EXPECT_THAT(block, Pointee(EqualsProto(expected_block))); + EXPECT_TRUE(block_manager.Mine().ok()); + EXPECT_GT(block->header().nonce(), 0); + + *block->mutable_hash() = HashValue(); + EXPECT_FALSE(block_manager.VerifyBlock(block)); +} + +TEST_F(BlockManagerTest, VerifyBlockWithInvalidTxn) { + BlockManager block_manager(config_); + std::unique_ptr<BatchClientTransactions> batch_client_request = + std::make_unique<BatchClientTransactions>(); + auto client_request = batch_client_request->add_transactions(); + client_request->set_seq(1); + client_request->set_transaction_data("test"); + batch_client_request->set_min_seq(1); + batch_client_request->set_max_seq(1); + + Block expected_block = GenerateExpectedBlock(*batch_client_request, 1); + + Block* block = + GenerateNewBlock(&block_manager, std::move(batch_client_request)); + EXPECT_THAT(block, Pointee(EqualsProto(expected_block))); + EXPECT_TRUE(block_manager.Mine().ok()); + EXPECT_GT(block->header().nonce(), 0); + block->set_transaction_data("test1"); + + EXPECT_FALSE(block_manager.VerifyBlock(block)); +} + +TEST_F(BlockManagerTest, VerifyBlock) { + BlockManager block_manager(config_); + std::unique_ptr<BatchClientTransactions> batch_client_request = + std::make_unique<BatchClientTransactions>(); + auto client_request = batch_client_request->add_transactions(); + client_request->set_seq(1); + client_request->set_transaction_data("test"); + batch_client_request->set_min_seq(1); + batch_client_request->set_max_seq(1); + + Block expected_block = GenerateExpectedBlock(*batch_client_request, 1); + + Block* block = + GenerateNewBlock(&block_manager, std::move(batch_client_request)); + EXPECT_TRUE(block_manager.Mine().ok()); + LOG(ERROR) << "block:" << block->DebugString(); + EXPECT_GT(block->header().nonce(), 0); + + EXPECT_TRUE(block_manager.VerifyBlock(block)); +} + +} // namespace +} // namespace resdb diff --git a/platform/consensus/ordering/poc/pow/consensus_service_pow.cpp b/platform/consensus/ordering/poc/pow/consensus_service_pow.cpp new file mode 100644 index 00000000..2f3f4b1d --- /dev/null +++ b/platform/consensus/ordering/poc/pow/consensus_service_pow.cpp @@ -0,0 +1,53 @@ +#include "platform/consensus/ordering/poc/pow/consensus_service_pow.h" + +#include "common/utils/utils.h" +#include "glog/logging.h" + +namespace resdb { + +ConsensusServicePoW::ConsensusServicePoW(const ResDBPoCConfig& config) + : common::Consensus(config, nullptr) { + miner_manager_ = std::make_unique<MinerManager>(config); + pow_manager_ = std::make_unique<PoWManager>(config,GetBroadCastClient()); +} + +void ConsensusServicePoW::Start() { + common::Consensus::Start(); + pow_manager_->Start(); +} + +ConsensusServicePoW::~ConsensusServicePoW() { +} + +std::vector<ReplicaInfo> ConsensusServicePoW::GetReplicas() { + return miner_manager_->GetReplicas(); +} + +int ConsensusServicePoW::ProcessCustomConsensus(std::unique_ptr<Request> request) { + LOG(ERROR) << "recv impl type:" << request->type() << " " + << request->client_info().DebugString() + << "sender id:" << request->sender_id(); + switch (request->type()) { + case PoWRequest::TYPE_COMMITTED_BLOCK: { + std::unique_ptr<Block> block = std::make_unique<Block>(); + if (block->ParseFromString(request->data())) { + pow_manager_->Commit(std::move(block)); + } + break; + } + case PoWRequest::TYPE_SHIFT_MSG: { + std::unique_ptr<SliceInfo> slice_info = std::make_unique<SliceInfo>(); + if (slice_info->ParseFromString(request->data())) { + pow_manager_->AddShiftMsg(*slice_info); + } + else { + LOG(ERROR)<<"parse info fail"; + } + break; + } + } + + return 0; +} + +} // namespace resdb diff --git a/platform/consensus/ordering/poc/pow/consensus_service_pow.h b/platform/consensus/ordering/poc/pow/consensus_service_pow.h new file mode 100644 index 00000000..37f13e3e --- /dev/null +++ b/platform/consensus/ordering/poc/pow/consensus_service_pow.h @@ -0,0 +1,27 @@ +#pragma once + +#include "platform/config/resdb_poc_config.h" +#include "platform/consensus/ordering/poc/pow/pow_manager.h" +#include "platform/consensus/ordering/poc/pow/miner_manager.h" +#include "platform/consensus/ordering/common/framework/consensus.h" +#include "platform/networkstrate/consensus_manager.h" + +namespace resdb { + +class ConsensusServicePoW : public common::Consensus { + public: + ConsensusServicePoW(const ResDBPoCConfig& config); + virtual ~ConsensusServicePoW(); + + // Start the service. + void Start() override; + + int ProcessCustomConsensus(std::unique_ptr<Request> request) override; + std::vector<ReplicaInfo> GetReplicas() override; + + protected: + std::unique_ptr<PoWManager> pow_manager_; + std::unique_ptr<MinerManager> miner_manager_; +}; + +} // namespace resdb diff --git a/platform/consensus/ordering/poc/pow/consensus_service_pow_test.cpp b/platform/consensus/ordering/poc/pow/consensus_service_pow_test.cpp new file mode 100644 index 00000000..40de6fc7 --- /dev/null +++ b/platform/consensus/ordering/poc/pow/consensus_service_pow_test.cpp @@ -0,0 +1,330 @@ +#include "ordering/poc/pow/consensus_service_pow.h" + +#include <glog/logging.h> +#include <gmock/gmock.h> +#include <gtest/gtest.h> + +#include <future> + +#include "common/test/test_macros.h" +#include "config/resdb_config_utils.h" +#include "crypto/key_generator.h" +#include "crypto/signature_verifier.h" +#include "statistic/stats.h" + +namespace resdb { +namespace { + +using ::resdb::testing::EqualsProto; +using ::testing::ElementsAre; +using ::testing::Invoke; +using ::testing::Pair; +using ::testing::Pointee; +using ::testing::Test; + +KeyInfo GetKeyInfo(SecretKey key) { + KeyInfo info; + info.set_key(key.private_key()); + info.set_hash_type(key.hash_type()); + return info; +} + +KeyInfo GetPublicKeyInfo(SecretKey key) { + KeyInfo info; + info.set_key(key.public_key()); + info.set_hash_type(key.hash_type()); + return info; +} + +CertificateInfo GetCertInfo(int64_t node_id) { + CertificateInfo cert_info; + cert_info.set_node_id(node_id); + return cert_info; +} + +class MockConsensusServicePoW : public ConsensusServicePoW { + public: + MockConsensusServicePoW(const ResDBPoCConfig& config) + : ConsensusServicePoW(config) {} + + MOCK_METHOD(std::unique_ptr<BatchClientTransactions>, GetClientTransactions, + (uint64_t), (override)); + MOCK_METHOD(int, BroadCastNewBlock, (const Block&), (override)); + MOCK_METHOD(int, BroadCastShiftMsg, (const SliceInfo&), (override)); + MOCK_METHOD(void, BroadCast, (const Request&), (override)); +}; + +class ConsensusServicePoWTest : public Test { + protected: + ConsensusServicePoWTest() + : stats_(Stats::GetGlobalStats(/*sleep_seconds = */ 1)), + bft_config_({GenerateReplicaInfo(1, "127.0.0.1", 1234), + GenerateReplicaInfo(2, "127.0.0.1", 1235), + GenerateReplicaInfo(3, "127.0.0.1", 1236), + GenerateReplicaInfo(4, "127.0.0.1", 1237)}, + GenerateReplicaInfo(3, "127.0.0.1", 1234), KeyInfo(), + CertificateInfo()), + config_(bft_config_, + {GenerateReplicaInfo(1, "127.0.0.1", 1234), + GenerateReplicaInfo(2, "127.0.0.1", 1235), + GenerateReplicaInfo(3, "127.0.0.1", 1236), + GenerateReplicaInfo(4, "127.0.0.1", 1237)}, + GenerateReplicaInfo(2, "127.0.0.1", 1234), KeyInfo(), + CertificateInfo()) { + config_.SetMaxNonceBit(10); + config_.SetDifficulty(1); + } + + Stats* stats_; + ResDBConfig bft_config_; + ResDBPoCConfig config_; +}; + +TEST_F(ConsensusServicePoWTest, MineOneBlock) { + std::mutex mtx; + std::condition_variable cv; + + MockConsensusServicePoW service(config_); + EXPECT_CALL(service, GetClientTransactions) + .WillRepeatedly(Invoke([&](uint64_t seq) { + std::unique_ptr<BatchClientTransactions> batch_client_request = + std::make_unique<BatchClientTransactions>(); + ClientTransactions* client_request = + batch_client_request->add_transactions(); + client_request->set_seq(seq); + client_request->set_transaction_data("test"); + + batch_client_request->set_min_seq(seq); + batch_client_request->set_max_seq(seq); + LOG(ERROR) << "get seq:" << seq; + return batch_client_request; + })); + + EXPECT_CALL(service, BroadCastNewBlock) + .WillRepeatedly(Invoke([&](const Block& block) { + std::unique_lock<std::mutex> lck(mtx); + cv.notify_all(); + return 0; + })); + + service.Start(); + { + std::unique_lock<std::mutex> lck(mtx); + cv.wait(lck); + } + service.Stop(); +} + +TEST_F(ConsensusServicePoWTest, ReceiveCommittedBlock) { + Block new_block; + // Server 1 mines a new block. + { + std::mutex mtx; + std::condition_variable cv; + + MockConsensusServicePoW service(config_); + EXPECT_CALL(service, GetClientTransactions) + .WillRepeatedly(Invoke([&](uint64_t seq) { + std::unique_ptr<BatchClientTransactions> batch_client_request = + std::make_unique<BatchClientTransactions>(); + ClientTransactions* client_request = + batch_client_request->add_transactions(); + client_request->set_seq(seq); + client_request->set_transaction_data("test"); + + batch_client_request->set_min_seq(seq); + batch_client_request->set_max_seq(seq); + return batch_client_request; + })); + + EXPECT_CALL(service, BroadCastNewBlock) + .WillRepeatedly(Invoke([&](const Block& block) { + new_block = block; + std::unique_lock<std::mutex> lck(mtx); + cv.notify_all(); + return 0; + })); + + service.Start(); + { + std::unique_lock<std::mutex> lck(mtx); + cv.wait(lck); + } + service.Stop(); + } + + LOG(ERROR) << "start server 2"; + std::unique_ptr<Request> request = std::make_unique<Request>(); + new_block.SerializeToString(request->mutable_data()); + request->set_type(PoWRequest::TYPE_COMMITTED_BLOCK); + + MockConsensusServicePoW service2(config_); + service2.Start(); + EXPECT_EQ( + service2.ConsensusCommit(std::unique_ptr<Context>(), std::move(request)), + 0); + service2.Stop(); +} + +/* +TEST_F(ConsensusServicePoWTest, ReceiveShift) { + std::mutex mtx; + std::condition_variable cv; + + std::condition_variable shift_cv; + + config_.SetMaxNonceBit(8); + config_.SetDifficulty(0); + + MockConsensusServicePoW service(config_); + EXPECT_CALL(service, GetClientTransactions) + .WillOnce(Invoke([&](uint64_t seq) { + auto client_request = std::make_unique<ClientTransactions>(); + client_request->set_seq(seq); + client_request->set_transaction_data("test"); + return client_request; + })); + + EXPECT_CALL(service, BroadCastShiftMsg) + .WillRepeatedly(Invoke([&](const SliceInfo& slice_info) { + std::unique_lock<std::mutex> lck(mtx); + if(slice_info.shift_idx()==1){ + cv.notify_all(); +return 0; + } + shift_cv.notify_all(); + return 0; + })); + + service.Start(); + { + std::unique_lock<std::mutex> lck(mtx); + shift_cv.wait(lck); + for (int i = 0; i < 3; ++i) { + SliceInfo slice_info; + slice_info.set_height(1); + slice_info.set_shift_idx(0); + slice_info.set_sender(i + 1); + + std::unique_ptr<Request> request = std::make_unique<Request>(); + slice_info.SerializeToString(request->mutable_data()); + request->set_type(PoWRequest::TYPE_SHIFT_MSG); + service.ConsensusCommit(std::unique_ptr<Context>(), std::move(request)); + } + } + { + std::unique_lock<std::mutex> lck(mtx); + cv.wait(lck); + } + service.Stop(); +} + +TEST_F(ConsensusServicePoWTest, ReceiveShiftMore) { + std::mutex mtx; + std::condition_variable cv; + + std::condition_variable shift_cv; + + config_.SetMaxNonceBit(7); + config_.SetDifficulty(0); + + MockConsensusServicePoW service(config_); + EXPECT_CALL(service, GetClientTransactions) + .WillOnce(Invoke([&](uint64_t seq) { + auto client_request = std::make_unique<ClientTransactions>(); + client_request->set_seq(seq); + client_request->set_transaction_data("test"); + return client_request; + })); + + EXPECT_CALL(service, BroadCastShiftMsg) + .WillRepeatedly(Invoke([&](const SliceInfo& slice_info) { + std::unique_lock<std::mutex> lck(mtx); + if(slice_info.shift_idx()==2){ + cv.notify_all(); + return 0; + } + shift_cv.notify_all(); + return 0; + })); + + service.Start(); + for(int k = 0; k <2; ++k) { + std::unique_lock<std::mutex> lck(mtx); + shift_cv.wait(lck); + LOG(ERROR)<<"======:"<<k; + for (int i = 0; i < 3; ++i) { + SliceInfo slice_info; + slice_info.set_height(1); + slice_info.set_shift_idx(k); + slice_info.set_sender(i + 1); + + std::unique_ptr<Request> request = std::make_unique<Request>(); + slice_info.SerializeToString(request->mutable_data()); + request->set_type(PoWRequest::TYPE_SHIFT_MSG); + service.ConsensusCommit(std::unique_ptr<Context>(), std::move(request)); + } + } + { + std::unique_lock<std::mutex> lck(mtx); + cv.wait(lck); + } + service.Stop(); +} + +// The shift messages arrive before the mining id done. +TEST_F(ConsensusServicePoWTest, ReceiveShiftEarly) { + std::mutex mtx; + std::condition_variable cv; + + std::condition_variable shift_cv; + + config_.SetMaxNonceBit(8); + config_.SetDifficulty(0); + + MockConsensusServicePoW service(config_); + EXPECT_CALL(service, GetClientTransactions) + .WillRepeatedly(Invoke([&](uint64_t seq) { + auto client_request = std::make_unique<ClientTransactions>(); + client_request->set_seq(seq); + client_request->set_transaction_data("test"); + return client_request; + })); + + EXPECT_CALL(service, BroadCastShiftMsg) + .WillOnce(Invoke([&](const SliceInfo& slice_info) { + // Receive the shift messages first. + { + for (int i = 0; i < 3; ++i) { + SliceInfo slice_info; + slice_info.set_height(1); + slice_info.set_shift_idx(1); + slice_info.set_sender(i + 1); + + std::unique_ptr<Request> request = std::make_unique<Request>(); + slice_info.SerializeToString(request->mutable_data()); + request->set_type(PoWRequest::TYPE_SHIFT_MSG); + service.ConsensusCommit(std::unique_ptr<Context>(), + std::move(request)); + } + } + return 0; + })); + + EXPECT_CALL(service, BroadCastNewBlock) + .WillRepeatedly(Invoke([&](const Block& block) { + std::unique_lock<std::mutex> lck(mtx); + cv.notify_all(); + return 0; + })); + service.Start(); + { + std::unique_lock<std::mutex> lck(mtx); + cv.wait(lck); + } + service.Stop(); +} +*/ + +} // namespace +} // namespace resdb diff --git a/platform/consensus/ordering/poc/pow/merkle.cpp b/platform/consensus/ordering/poc/pow/merkle.cpp new file mode 100644 index 00000000..2475e2d3 --- /dev/null +++ b/platform/consensus/ordering/poc/pow/merkle.cpp @@ -0,0 +1,28 @@ +#include "platform/consensus/ordering/poc/pow/merkle.h" + +#include "platform/consensus/ordering/poc/pow/miner_utils.h" + +namespace resdb { +namespace { + +std::string TrverseMakeHash(const BatchClientTransactions& transaction, + int l_idx, int r_idx) { + if (l_idx == r_idx) { + return GetHashValue(transaction.transactions(l_idx).transaction_data()); + } + int mid = (l_idx + r_idx) >> 1; + + std::string l_chd = TrverseMakeHash(transaction, l_idx, mid); + std::string r_chd = TrverseMakeHash(transaction, mid + 1, r_idx); + return GetHashValue(l_chd + r_chd); +} + +} // namespace + +HashValue Merkle::MakeHash(const BatchClientTransactions& transaction) { + std::string root_hash = + TrverseMakeHash(transaction, 0, transaction.transactions_size() - 1); + return DigestToHash(root_hash); +} + +} // namespace resdb diff --git a/platform/consensus/ordering/poc/pow/merkle.h b/platform/consensus/ordering/poc/pow/merkle.h new file mode 100644 index 00000000..ef0fe468 --- /dev/null +++ b/platform/consensus/ordering/poc/pow/merkle.h @@ -0,0 +1,12 @@ +#pragma once + +#include "platform/consensus/ordering/poc/proto/pow.pb.h" + +namespace resdb { + +class Merkle { + public: + static HashValue MakeHash(const BatchClientTransactions& transaction); +}; + +} // namespace resdb diff --git a/platform/consensus/ordering/poc/pow/merkle_test.cpp b/platform/consensus/ordering/poc/pow/merkle_test.cpp new file mode 100644 index 00000000..897afa02 --- /dev/null +++ b/platform/consensus/ordering/poc/pow/merkle_test.cpp @@ -0,0 +1,46 @@ +#include "platform/consensus/ordering/poc/pow/merkle.h" + +#include <glog/logging.h> +#include <gmock/gmock.h> +#include <gtest/gtest.h> + +#include <boost/format.hpp> + +#include "common/test/test_macros.h" +#include "platform/consensus/ordering/poc/pow/miner_utils.h" + +namespace resdb { +namespace { + +TEST(MerkleTest, GetHashFromOneTxn) { + BatchClientTransactions transactions; + transactions.add_transactions()->set_transaction_data("txn_1"); + HashValue root_value = Merkle::MakeHash(transactions); + + EXPECT_EQ(GetDigestHexString(GetHashDigest(root_value)), + "875c5b1c32e0fd4826f1c8191a2a8abf840936e5a2aef9bba5ac8c4f54c14129"); +} + +TEST(MerkleTest, GetHash) { + BatchClientTransactions transactions; + transactions.add_transactions()->set_transaction_data("txn_1"); + transactions.add_transactions()->set_transaction_data("txn_2"); + HashValue root_value = Merkle::MakeHash(transactions); + + EXPECT_EQ(GetDigestHexString(GetHashDigest(root_value)), + "c659bb4eed97cf5bcc375df90b78742762ac731be916b19ce7efc8217bf33b0a"); +} + +TEST(MerkleTest, GetHashFromThreeTxn) { + BatchClientTransactions transactions; + transactions.add_transactions()->set_transaction_data("txn_1"); + transactions.add_transactions()->set_transaction_data("txn_2"); + transactions.add_transactions()->set_transaction_data("txn_3"); + HashValue root_value = Merkle::MakeHash(transactions); + + EXPECT_EQ(GetDigestHexString(GetHashDigest(root_value)), + "1cee75f552290940e8f7b195b5e573fafe3db67eadcea19a76bcecd1df94d8cd"); +} + +} // namespace +} // namespace resdb diff --git a/platform/consensus/ordering/poc/pow/miner.cpp b/platform/consensus/ordering/poc/pow/miner.cpp new file mode 100644 index 00000000..cd581400 --- /dev/null +++ b/platform/consensus/ordering/poc/pow/miner.cpp @@ -0,0 +1,161 @@ +#include "platform/consensus/ordering/poc/pow/miner.h" + +#include <assert.h> +#include <glog/logging.h> + +#include <boost/format.hpp> +#include <thread> + +#include "common/crypto/signature_verifier.h" +#include "platform/consensus/ordering/poc/pow/miner_utils.h" + +namespace resdb { + +Miner::Miner(const ResDBPoCConfig& config) : config_(config) { + // the number of zeros ahead of the binary value. + difficulty_ = config_.GetDifficulty(); + worker_num_ = config_.GetWokerNum(); + assert(difficulty_ > 0); + LOG(INFO) << " target config value: difficulty:" << difficulty_; + SetSliceIdx(0); +} + +std::vector<std::pair<uint64_t, uint64_t>> Miner::GetMiningSlices() { + return mining_slices_; +} + +int32_t Miner::GetSliceIdx() const { return shift_idx_; } + +void Miner::SetTargetValue(const HashValue& target_value) { + target_value_ = target_value; + LOG(INFO) << " set target value:" << target_value_.DebugString(); +} + +void Miner::SetSliceIdx(int slice_idx) { + shift_idx_ = slice_idx; + + // the maximun bits used for mining. + uint64_t total_slice = config_.GetMaxNonceBit(); + assert(total_slice <= 64); + total_slice = 1ll << total_slice; + + size_t replica_size = config_.GetReplicaNum(); + + // id starts from 1. + uint32_t replica_id = config_.GetSelfInfo().id(); + assert(total_slice >= replica_size); + + int idx = (replica_id - 1 + shift_idx_) % replica_size + 1; + + // the default slice is [min_slice, max_slice]. + uint64_t min_slice = 0; + uint64_t max_slice = total_slice-1; + LOG(ERROR) << "slice idx ??:" << slice_idx + << " total slice size:" << total_slice + << " replica id:" << replica_id << " replica size:" << replica_size + << " idx:" << idx << " min slice:" << min_slice + << " max slice:" << max_slice; + mining_slices_.clear(); + mining_slices_.push_back(std::make_pair(min_slice, max_slice)); +} + +absl::Status Miner::Mine(Block* new_block) { + if(new_block->header().height() == 10 && new_block->max_seq() - new_block->min_seq() +1<= config_.BatchTransactionNum() && shift_idx_ == 0){ + LOG(ERROR)<<"skip fake fail"; + return absl::NotFoundError("solution not found"); + } + if(new_block->header().height() == 30 && new_block->max_seq() - new_block->min_seq() +1<= config_.BatchTransactionNum() && shift_idx_ == 0){ + LOG(ERROR)<<"skip fake fail"; + return absl::NotFoundError("solution not found"); + } + LOG(ERROR) << " start mine block slice:" << shift_idx_; + stop_ = false; + std::vector<std::pair<uint64_t, uint64_t>> slices = GetMiningSlices(); + + Block::Header header(new_block->header()); + + for (const auto& slice : slices) { + uint64_t max_slice = slice.second; + uint64_t min_slice = slice.first; + + std::vector<std::thread> ths; + std::atomic<bool> solution_found = false; + + uint64_t step = static_cast<uint64_t>(worker_num_); + for (uint32_t i = 0; i < worker_num_; ++i) { + uint64_t current_slice_start = i + min_slice; + + ths.push_back(std::thread( + [&](Block::Header header, std::pair<uint64_t, uint64_t> slice) { + std::string header_hash; + header_hash += GetHashDigest(header.pre_hash()); + header_hash += GetHashDigest(header.merkle_hash()); + + for (uint64_t nonce = slice.first; + nonce <= max_slice && !stop_ && !solution_found; + nonce += step) { + //LOG(ERROR)<<" miner min:"<<slice.first<<" max:"<<max_slice<<" nonce"<<nonce; + header.set_nonce(nonce); + std::string str_value = + header_hash + std::to_string(header.nonce()); + std::string hash_digest = GetHashValue(str_value); + + if (IsValidDigest(hash_digest, difficulty_)) { + solution_found = true; + *new_block->mutable_hash() = DigestToHash(hash_digest); + *new_block->mutable_header() = header; + LOG(ERROR) << "nonce:" << nonce + << " hex string:" << GetDigestHexString(hash_digest) + << " target:" << difficulty_; + return; + } + } + LOG(ERROR) << "mine done slice:" << slice.first << " len:" << step; + }, + header, std::make_pair(current_slice_start, 0))); + } + + for (auto& th : ths) { + th.join(); + } + if (stop_) { + LOG(ERROR) << "minning has been terminated."; + return absl::CancelledError("terminated"); + } + + if (solution_found) { + LOG(ERROR) << "find solution:" << new_block->header().DebugString() + << " hashvalue:" << new_block->hash().DebugString(); + return absl::OkStatus(); + } + } + LOG(ERROR) << "solution not found"; + return absl::NotFoundError("solution not found"); +} + +void Miner::Terminate() { + LOG(ERROR) << "terminate mining"; + stop_ = true; +} + +// Calculate the hash value: SHA256(SHA256(header)) +// The hash value will be a 32bit integer. +std::string Miner::CalculatePoWHashDigest(const Block::Header& header) { + std::string str_value; + str_value += GetHashDigest(header.pre_hash()); + str_value += GetHashDigest(header.merkle_hash()); + str_value += std::to_string(header.nonce()); + return SignatureVerifier::CalculateHash( + SignatureVerifier::CalculateHash(str_value)); +} + +HashValue Miner::CalculatePoWHash(const Block* new_block) { + std::string digest = CalculatePoWHashDigest(new_block->header()); + return DigestToHash(digest); +} + +bool Miner::IsValidHash(const Block* block) { + return CalculatePoWHash(block) == block->hash(); +} + +} // namespace resdb diff --git a/platform/consensus/ordering/poc/pow/miner.h b/platform/consensus/ordering/poc/pow/miner.h new file mode 100644 index 00000000..4a0b2f66 --- /dev/null +++ b/platform/consensus/ordering/poc/pow/miner.h @@ -0,0 +1,44 @@ +#pragma once +#include "absl/status/status.h" +#include "platform/config/resdb_poc_config.h" +#include "platform/consensus/ordering/poc/proto/pow.pb.h" + +namespace resdb { + +// A miner used to mine the block according to its header. +// It contains the slices as the search space. +// It looks for a solution from the nonces inside the slices +// that the hash value of the header within the searching nonce +// (SHA256(SHA256(header))) is less than a target value which is +// defined from the config and can be reset later. +class Miner { + public: + Miner(const ResDBPoCConfig& config); + + std::vector<std::pair<uint64_t, uint64_t>> GetMiningSlices(); + + absl::Status Mine(Block* new_block); + void Terminate(); + bool IsValidHash(const Block* block); + + // Obtain the shift idx. + int32_t GetSliceIdx() const; + void SetSliceIdx(int slice_idx); + + void SetTargetValue(const HashValue& target_value); + + private: + std::string CalculatePoWHashDigest(const Block::Header& header); + HashValue CalculatePoWHash(const Block* new_block); + + private: + ResDBPoCConfig config_; + HashValue target_value_; + int shift_idx_ = 0; + std::atomic<bool> stop_; + std::vector<std::pair<uint64_t, uint64_t>> mining_slices_; + uint32_t difficulty_ = 1; + uint32_t worker_num_ = 16; +}; + +} // namespace resdb diff --git a/platform/consensus/ordering/poc/pow/miner_manager.cpp b/platform/consensus/ordering/poc/pow/miner_manager.cpp new file mode 100644 index 00000000..a8a2560c --- /dev/null +++ b/platform/consensus/ordering/poc/pow/miner_manager.cpp @@ -0,0 +1,12 @@ +#include "platform/consensus/ordering/poc/pow/miner_manager.h" + +#include <assert.h> +#include <glog/logging.h> + +namespace resdb { + +MinerManager::MinerManager(const ResDBPoCConfig& config) : config_(config) {} + +std::vector<ReplicaInfo> MinerManager::GetReplicas() { return replicas_; } + +} // namespace resdb diff --git a/platform/consensus/ordering/poc/pow/miner_manager.h b/platform/consensus/ordering/poc/pow/miner_manager.h new file mode 100644 index 00000000..23187130 --- /dev/null +++ b/platform/consensus/ordering/poc/pow/miner_manager.h @@ -0,0 +1,18 @@ +#pragma once + +#include "platform/config/resdb_poc_config.h" + +namespace resdb { + +class MinerManager { + public: + MinerManager(const ResDBPoCConfig& config); + + std::vector<ReplicaInfo> GetReplicas(); + + private: + ResDBPoCConfig config_; + std::vector<ReplicaInfo> replicas_; +}; + +} // namespace resdb diff --git a/platform/consensus/ordering/poc/pow/miner_test.cpp b/platform/consensus/ordering/poc/pow/miner_test.cpp new file mode 100644 index 00000000..0e44d891 --- /dev/null +++ b/platform/consensus/ordering/poc/pow/miner_test.cpp @@ -0,0 +1,170 @@ +#include "platform/consensus/ordering/poc/pow/miner.h" + +#include <glog/logging.h> +#include <gmock/gmock.h> +#include <gtest/gtest.h> + +#include <boost/format.hpp> + +#include "common/test/test_macros.h" +#include "platform/config/resdb_config_utils.h" +#include "platform/consensus/ordering/poc/pow/miner_utils.h" + +namespace resdb { +namespace { + +using ::google::protobuf::util::MessageDifferencer; +using ::testing::ElementsAre; +using ::testing::Pair; +using ::testing::Test; + +class MinerTest : public Test { + public: + ResDBPoCConfig GetConfig(int idx) { + ResDBConfig bft_config({GenerateReplicaInfo(1, "127.0.0.1", 1234), + GenerateReplicaInfo(2, "127.0.0.1", 1235), + GenerateReplicaInfo(3, "127.0.0.1", 1236), + GenerateReplicaInfo(4, "127.0.0.1", 1237)}, + GenerateReplicaInfo(idx, "127.0.0.1", 1234), + KeyInfo(), CertificateInfo()); + + return ResDBPoCConfig(bft_config, + {GenerateReplicaInfo(1, "127.0.0.1", 1234), + GenerateReplicaInfo(2, "127.0.0.1", 1235), + GenerateReplicaInfo(3, "127.0.0.1", 1236), + GenerateReplicaInfo(4, "127.0.0.1", 1237)}, + GenerateReplicaInfo(idx, "127.0.0.1", 1234), + KeyInfo(), CertificateInfo()); + } + + ResDBPoCConfig GetConfig(int size, int idx) { + std::vector<ReplicaInfo> replicas; + for (int i = 0; i < size; ++i) { + replicas.push_back(GenerateReplicaInfo(i + 1, "127.0.0.1", 1234 + i)); + } + ResDBConfig bft_config(replicas, + GenerateReplicaInfo(idx, "127.0.0.1", 1234), + KeyInfo(), CertificateInfo()); + + return ResDBPoCConfig(bft_config, replicas, + GenerateReplicaInfo(idx, "127.0.0.1", 1234), + KeyInfo(), CertificateInfo()); + } +}; + +TEST_F(MinerTest, Slices) { + for (int i = 1; i <= 4; ++i) { + ResDBPoCConfig config = GetConfig(i); + config.SetMaxNonceBit(3); + config.SetDifficulty(1); + Miner miner(config); + + std::vector<std::pair<uint64_t, uint64_t>> slices = miner.GetMiningSlices(); + EXPECT_THAT(slices, ElementsAre(Pair((i - 1) * 2, (i - 1) * 2 + 1))); + } +} + +TEST_F(MinerTest, AddNewBlock) { + ResDBPoCConfig config = GetConfig(1, 1); + config.SetMaxNonceBit(30); + config.SetDifficulty(7); + Miner miner(config); + + Block block; + block.mutable_header()->set_height(1); + + EXPECT_TRUE(miner.Mine(&block).ok()); +} + +TEST_F(MinerTest, Mine) { + ResDBPoCConfig config = GetConfig(3); + config.SetMaxNonceBit(20); + config.SetDifficulty(2); + config.SetWorkerNum(1); + Miner miner(config); + + Block block; + *block.mutable_header()->mutable_pre_hash() = + DigestToHash(GetHashValue("pre_hash")); + *block.mutable_header()->mutable_merkle_hash() = + DigestToHash(GetHashValue("merkle_hash")); + block.mutable_header()->set_height(1); + block.mutable_header()->set_nonce(524292); + + std::string expected_hash_header; + expected_hash_header += GetHashDigest(block.header().pre_hash()); + expected_hash_header += GetHashDigest(block.header().merkle_hash()); + expected_hash_header += std::to_string(block.header().nonce()); + + std::string expected_hash = GetHashValue(expected_hash_header); + + EXPECT_TRUE(miner.Mine(&block).ok()); + EXPECT_TRUE( + MessageDifferencer::Equals(block.hash(), DigestToHash(expected_hash))); +} + +TEST_F(MinerTest, MineFail) { + ResDBPoCConfig config = GetConfig(3); + config.SetMaxNonceBit(3); + config.SetDifficulty(2); + Miner miner(config); + + Block block; + block.mutable_header()->set_height(1); + + absl::Status status = miner.Mine(&block); + EXPECT_FALSE(status.ok()); +} + +TEST_F(MinerTest, HashValid) { + ResDBPoCConfig config = GetConfig(3); + config.SetMaxNonceBit(3); + config.SetDifficulty(2); + Miner miner(config); + + Block block; + *block.mutable_header()->mutable_pre_hash() = + DigestToHash(GetHashValue("pre_hash")); + *block.mutable_header()->mutable_merkle_hash() = + DigestToHash(GetHashValue("merkle_hash")); + block.mutable_header()->set_height(1); + block.mutable_header()->set_nonce(4); + + std::string expected_hash_header; + expected_hash_header += GetHashDigest(block.header().pre_hash()); + expected_hash_header += GetHashDigest(block.header().merkle_hash()); + expected_hash_header += std::to_string(block.header().nonce()); + + std::string expected_hash = GetHashValue(expected_hash_header); + *block.mutable_hash() = DigestToHash(expected_hash); + EXPECT_TRUE(miner.IsValidHash(&block)); +} + +TEST_F(MinerTest, HashNotValid) { + ResDBPoCConfig config = GetConfig(3); + config.SetMaxNonceBit(3); + config.SetDifficulty(2); + Miner miner(config); + + Block block; + *block.mutable_header()->mutable_pre_hash() = + DigestToHash(GetHashValue("pre_hash")); + *block.mutable_header()->mutable_merkle_hash() = + DigestToHash(GetHashValue("merkle_hash")); + block.mutable_header()->set_height(1); + block.mutable_header()->set_nonce(4); + + std::string expected_hash_header; + expected_hash_header += GetHashDigest(block.header().pre_hash()); + expected_hash_header += GetHashDigest(block.header().merkle_hash()); + expected_hash_header += std::to_string(block.header().nonce()); + + std::string expected_hash = GetHashValue(expected_hash_header); + *block.mutable_hash() = DigestToHash(expected_hash); + block.mutable_header()->set_nonce(5); + + EXPECT_FALSE(miner.IsValidHash(&block)); +} + +} // namespace +} // namespace resdb diff --git a/platform/consensus/ordering/poc/pow/miner_utils.cpp b/platform/consensus/ordering/poc/pow/miner_utils.cpp new file mode 100644 index 00000000..a4dbb046 --- /dev/null +++ b/platform/consensus/ordering/poc/pow/miner_utils.cpp @@ -0,0 +1,102 @@ +#include "platform/consensus/ordering/poc/pow/miner_utils.h" + +#include <boost/format.hpp> +#include <glog/logging.h> + +#include "common/crypto/signature_verifier.h" +namespace resdb { + +HashValue DigestToHash(const std::string& value) { + HashValue hash; + const char* pos = value.c_str(); + for (int j = 0; j < 4; ++j) { + uint64_t v = 0; + for (int i = 56; i >= 0; i -= 8) { + uint64_t tmp = *(pos++) & 0xff; + v |= tmp << i; + } + hash.add_bits(v); + } + return hash; +} + +std::string GetHashDigest(const HashValue& hash) { + if (hash.bits().empty()) { + return ""; + } + char mem[32]; + int mem_idx = 0; + for (int i = 0; i < 4; ++i) { + uint64_t bit = hash.bits(i); + for (int j = 56; j >= 0; j -= 8) { + mem[mem_idx++] = (bit >> j) & 0xff; + } + } + return std::string(mem, 32); +} + +std::string GetDigestHexString(const std::string digest) { + std::string hex; + for (size_t i = 0; i < digest.size(); ++i) { + boost::format fmt("%02x"); + fmt % ((uint32_t)digest[i] & 0xff); + hex += fmt.str(); + } + return hex; +} + +bool IsValidDigest(const std::string& digest, uint32_t difficulty) { + uint32_t num = 0; + for (size_t i = 0; i < digest.size(); ++i) { + int zeros = 8; + if (digest[i] == 0) { + } else { + uint8_t m = digest[i]; + while (m > 0) { + zeros--; + m >>= 1; + } + } + num += zeros; + if (zeros != 8) break; + } + //LOG(ERROR)<<"digest:"<<digest<<" num:"<<num; + return num >= difficulty; +} + +std::string GetHashValue(const std::string& data) { + return SignatureVerifier::CalculateHash( + SignatureVerifier::CalculateHash(data)); +} + +int CmpHash(const HashValue& h1, const HashValue& h2) { + if (h1.bits_size() != 4 || h2.bits_size() != 4) return -1; + for (int i = 0; i < 4; ++i) { + if (h1.bits(i) != h2.bits(i)) { + return h1.bits(i) < h2.bits(i) ? -1 : 1; + } + } + return 0; +} + +bool operator<(const HashValue& h1, const HashValue& h2) { + return CmpHash(h1, h2) < 0; +} + +bool operator<=(const HashValue& h1, const HashValue& h2) { + return CmpHash(h1, h2) <= 0; +} + +bool operator>(const HashValue& h1, const HashValue& h2) { + return CmpHash(h1, h2) > 0; +} + +bool operator>=(const HashValue& h1, const HashValue& h2) { + return CmpHash(h1, h2) >= 0; +} + +bool operator==(const HashValue& h1, const HashValue& h2) { + return CmpHash(h1, h2) == 0; +} + +} // namespace resdb diff --git a/platform/consensus/ordering/poc/pow/miner_utils.h b/platform/consensus/ordering/poc/pow/miner_utils.h new file mode 100644 index 00000000..2e04f82a --- /dev/null +++ b/platform/consensus/ordering/poc/pow/miner_utils.h @@ -0,0 +1,30 @@ +#pragma once + +#include "platform/consensus/ordering/poc/proto/pow.pb.h" + +namespace resdb { + +// Calculate the hash value by sha256(sha256(data)) +// and return the binary string. +std::string GetHashValue(const std::string& data); + +// Convert the hexadecimal string digest to binary type. +HashValue DigestToHash(const std::string& value); + +// Convert the binrary value to the hexadecimal string. +std::string GetHashDigest(const HashValue& hash); + +// Convert the hexadecimal string value from the binary value +// obtained from the hash function. +std::string GetDigestHexString(const std::string digest); + +// Check if the digest contains 'difficulty' number of zeros. +bool IsValidDigest(const std::string& digest, uint32_t difficulty); + +bool operator<(const HashValue& h1, const HashValue& h2); +bool operator<=(const HashValue& h1, const HashValue& h2); +bool operator>(const HashValue& h1, const HashValue& h2); +bool operator>=(const HashValue& h1, const HashValue& h2); +bool operator==(const HashValue& h1, const HashValue& h2); + +} // namespace resdb diff --git a/platform/consensus/ordering/poc/pow/miner_utils_test.cpp b/platform/consensus/ordering/poc/pow/miner_utils_test.cpp new file mode 100644 index 00000000..ac6691f1 --- /dev/null +++ b/platform/consensus/ordering/poc/pow/miner_utils_test.cpp @@ -0,0 +1,76 @@ +#include "platform/consensus/ordering/poc/pow/miner_utils.h" + +#include <glog/logging.h> +#include <gmock/gmock.h> +#include <gtest/gtest.h> + +#include <boost/format.hpp> + +#include "common/test/test_macros.h" +#include "common/crypto/signature_verifier.h" + +namespace resdb { +namespace { + +using ::google::protobuf::util::MessageDifferencer; +using ::testing::ElementsAre; +using ::testing::Pair; +using ::testing::Test; + +TEST(MinerUtilsTest, DigestToHashValue) { + std::string str_value; + str_value += std::to_string(100); + str_value += std::to_string(1); + + std::string ret = SignatureVerifier::CalculateHash( + SignatureVerifier::CalculateHash(str_value)); + + HashValue hash_value = DigestToHash(ret); + + std::string hash_digest = GetHashDigest(hash_value); + + EXPECT_EQ(hash_digest, ret); +} + +TEST(MinerUtilsTest, CmpLeadingZerosFromRawDigest) { + std::string str_value; + str_value += std::to_string(100); + str_value += std::to_string(1); + + for (int i = 1; i <= 64; ++i) { + std::string ret = SignatureVerifier::CalculateHash( + SignatureVerifier::CalculateHash(str_value)); + + int j = 0; + for (j = 0; j < (i + 1) / 2; ++j) { + ret[j] = 0x00; + } + + if (i % 2 != 0) { + ret[j - 1] = 0x0f; + } else { + ret[j] = 0xff; + } + + EXPECT_TRUE(IsValidDigest(ret, i * 4)); + if (i + 1 <= 64) { + EXPECT_FALSE(IsValidDigest(ret, (i + 1) * 4)); + } + } +} + +TEST(MinerUtilsTest, DigestString) { + std::string str_value; + str_value += std::to_string(100); + str_value += std::to_string(1); + + std::string ret = SignatureVerifier::CalculateHash( + SignatureVerifier::CalculateHash(str_value)); + ret[0] = 0x03; + std::string hex_string = GetDigestHexString(ret); + EXPECT_EQ(hex_string, + "039411d480d5867a111f06aaf9e5cba7fd5ebd730a54962d5752f74dc49b0898"); +} + +} // namespace +} // namespace resdb diff --git a/platform/consensus/ordering/poc/pow/mock_transaction_accessor.h b/platform/consensus/ordering/poc/pow/mock_transaction_accessor.h new file mode 100644 index 00000000..9fe5ef15 --- /dev/null +++ b/platform/consensus/ordering/poc/pow/mock_transaction_accessor.h @@ -0,0 +1,15 @@ +#pragma once + +#include "platform/consensus/ordering/poc/pow/transaction_accessor.h" +#include "gmock/gmock.h" + +namespace resdb { + +class MockTransactionAccessor : public TransactionAccessor { + public: + MockTransactionAccessor(const ResDBPoCConfig& config):TransactionAccessor(config, false){} + MOCK_METHOD(std::unique_ptr<BatchClientTransactions>, ConsumeTransactions, (uint64_t seq), + (override)); +}; + +} // namespace resdb diff --git a/platform/consensus/ordering/poc/pow/pow_manager.cpp b/platform/consensus/ordering/poc/pow/pow_manager.cpp new file mode 100644 index 00000000..c19b2973 --- /dev/null +++ b/platform/consensus/ordering/poc/pow/pow_manager.cpp @@ -0,0 +1,271 @@ +#include "platform/consensus/ordering/poc/pow/pow_manager.h" + +#include "common/utils/utils.h" +#include "glog/logging.h" + +namespace resdb { +namespace { + +std::unique_ptr<Request> NewRequest(PoWRequest type, + const ::google::protobuf::Message& message, + int32_t sender) { + auto new_request = std::make_unique<Request>(); + new_request->set_type(type); + new_request->set_sender_id(sender); + message.SerializeToString(new_request->mutable_data()); + return new_request; +} + +} // namespace + + +PoWManager::PoWManager(const ResDBPoCConfig& config, + ReplicaCommunicator* client + ) : config_(config),bc_client_(client){ +Reset(); + is_stop_ = false; + //prometheus_handler_ = Stats::GetGlobalPrometheus(); +} + +PoWManager::~PoWManager() { + Stop(); + if (miner_thread_.joinable()) { + miner_thread_.join(); + } + if (gossip_thread_.joinable()) { + gossip_thread_.join(); + } +} + +std::unique_ptr<TransactionAccessor> PoWManager::GetTransactionAccessor(const ResDBPoCConfig& config){ + return std::make_unique<TransactionAccessor>(config); +} + +std::unique_ptr<ShiftManager> PoWManager::GetShiftManager(const ResDBPoCConfig& config){ + return std::make_unique<ShiftManager>(config); +} + +std::unique_ptr<BlockManager> PoWManager::GetBlockManager(const ResDBPoCConfig& config){ + return std::make_unique<BlockManager>(config); +} + +void PoWManager::Commit(std::unique_ptr<Block> block){ + std::unique_lock<std::mutex> lck(tx_mutex_); + if(block_manager_->Commit(std::move(block))==0){ + LOG(ERROR)<<"commit block succ"; + NotifyNextBlock(); + } + LOG(ERROR)<<"commit block done"; +} + +void PoWManager::NotifyNextBlock(){ + std::unique_lock<std::mutex> lk(mutex_); + cv_.notify_one(); + if(current_status_ == GENERATE_NEW){ + current_status_ = NEXT_NEWBLOCK; + } + LOG(ERROR)<<"notify block:"<<current_status_; +} + +PoWManager::BlockStatus PoWManager::GetBlockStatus() { + return current_status_; +} + +absl::Status PoWManager::WaitBlockDone(){ + LOG(ERROR)<<"wait block:"<<current_status_; + int timeout_ms = config_.GetMiningTime(); + //60000000; + //int timeout_ms = 300000000; + std::unique_lock<std::mutex> lk(mutex_); + cv_.wait_for(lk, std::chrono::microseconds(timeout_ms), [&] { + return current_status_ == NEXT_NEWBLOCK; + }); + LOG(ERROR)<<"wait block done:" << current_status_; + if (current_status_ == NEXT_NEWBLOCK){ + return absl::OkStatus(); + } + return absl::NotFoundError("No new transaction."); +} + +void PoWManager::Reset(){ + transaction_accessor_ = GetTransactionAccessor(config_); + shift_manager_ = GetShiftManager(config_); + block_manager_ = GetBlockManager(config_); + LOG(ERROR)<<"reset:"<<transaction_accessor_.get(); +} + +void PoWManager::Start() { + miner_thread_ = std::thread(&PoWManager::MiningProcess, this); + gossip_thread_ = std::thread(&PoWManager::GossipProcess, this); + is_stop_ = false; +} + +void PoWManager::Stop() { + is_stop_ = true; +} + +bool PoWManager::IsRunning() { + return !is_stop_; +} + +void PoWManager::AddShiftMsg(const SliceInfo& slice_info) { + shift_manager_->AddSliceInfo(slice_info); +} + +int PoWManager::GetShiftMsg(const SliceInfo& slice_info) { + LOG(ERROR)<<"check shift msg:"<<slice_info.DebugString(); + if(!shift_manager_->Check(slice_info)){ + return -1; + } + LOG(ERROR)<<"slice info is ok:"<<slice_info.DebugString(); + if(block_manager_->SetSliceIdx(slice_info)==1){ + return 1; + } + return 0; +} + +int PoWManager::GetMiningTxn(MiningType type){ + std::unique_lock<std::mutex> lck(tx_mutex_); + LOG(ERROR)<<"get mining txn status:"<<current_status_; + if(current_status_ == NEXT_NEWBLOCK){ + type = MiningType::NEWBLOCK; + } + if(type == NEWBLOCK){ + uint64_t max_seq = std::max(block_manager_->GetLastSeq(), block_manager_->GetLastCandidateSeq()); + LOG(ERROR)<<"get block last max:"<<block_manager_->GetLastSeq()<<" "<<block_manager_->GetLastCandidateSeq(); + auto client_tx = transaction_accessor_->ConsumeTransactions(max_seq+1); + if(client_tx == nullptr){ + return -2; + } + block_manager_->SetNewMiningBlock(std::move(client_tx)); + } + else { + //prometheus_handler_->Inc(SHIFT_MSG, 1); + int ret = GetShiftMsg(need_slice_info_); + LOG(ERROR)<<"get shift msg ret:"<<ret; + if(ret==1){ + // no solution after enought shift. + return 1; + } + if (ret !=0){ + BroadCastShiftMsg(need_slice_info_); + return -2; + } + LOG(ERROR)<<"get shift msg:"<<need_slice_info_.DebugString(); + } + return 0; +} + +PoWManager::MiningStatus PoWManager::Wait(){ + current_status_ = GENERATE_NEW; + auto mining_thread = std::thread([&](){ + LOG(ERROR)<<"mine"; + absl::Status status = block_manager_->Mine(); + if(status.ok()){ + if(block_manager_->Commit()==0){ + NotifyNextBlock(); + } + LOG(ERROR)<<"done mine"; + } + LOG(ERROR)<<"mine:"<<status.ok(); + }); + + auto status = WaitBlockDone(); + if(mining_thread.joinable()){ + mining_thread.join(); + } + LOG(ERROR)<<"success:"<<status.ok()<<" status:"<<current_status_; + if(status.ok()||current_status_ == NEXT_NEWBLOCK){ + return MiningStatus::OK; + } + return MiningStatus::TIMEOUT; +} + +// receive a block before send and after need send +void PoWManager::SendShiftMsg(){ + LOG(ERROR)<<"send shift"; + SliceInfo slice_info; + slice_info.set_shift_idx(block_manager_->GetSliceIdx()+1); + slice_info.set_height( + block_manager_->GetNewMiningBlock()->header().height()); + slice_info.set_sender(config_.GetSelfInfo().id()); + BroadCastShiftMsg(slice_info); + + need_slice_info_ = slice_info; + LOG(ERROR)<<"send shift msg"; +} + +int PoWManager::BroadCastNewBlock(const Block& block) { + auto request = NewRequest(PoWRequest::TYPE_COMMITTED_BLOCK, block, + config_.GetSelfInfo().id()); + bc_client_->BroadCast(*request); + return 0; +} + +int PoWManager::BroadCastShiftMsg(const SliceInfo& slice_info) { + auto request = NewRequest(PoWRequest::TYPE_SHIFT_MSG, slice_info, + config_.GetSelfInfo().id()); + bc_client_->BroadCast(*request); + return 0; +} + +// Broadcast the new block if once it is committed. +void PoWManager::GossipProcess() { + uint64_t last_height = 0; + while (IsRunning()) { + std::unique_lock<std::mutex> lck(broad_cast_mtx_); + broad_cast_cv_.wait_until( + lck, std::chrono::system_clock::now() + std::chrono::seconds(1)); + + Block* block = block_manager_->GetBlockByHeight(last_height + 1); + if (block == nullptr) { + // LOG(ERROR) << "======= get block fail:" << last_height + // << " id:" << config_.GetSelfInfo().id(); + continue; + } + BroadCastNewBlock(*block); + last_height++; + } +} + +void PoWManager::NotifyBroadCast() { + std::unique_lock<std::mutex> lck(broad_cast_mtx_); + broad_cast_cv_.notify_all(); +} + +// Mining the new blocks got from PBFT cluster. +void PoWManager::MiningProcess() { + if(config_.GetSelfInfo().id() > 8 && config_.GetSelfInfo().id() < 13){ + //return; + } + LOG(ERROR)<<"start"; + MiningType type = MiningType::NEWBLOCK; + while (IsRunning()) { + int ret = GetMiningTxn(type); + LOG(ERROR)<<"get mining ret:"<<ret; + if(ret<0){ + usleep(10000); + continue; + } + else if(ret>0){ + LOG(ERROR)<<"get new block"; + type = MiningType::NEWBLOCK; + continue; + } + LOG(ERROR)<<"get ok"; + + auto mining_status = Wait(); + LOG(ERROR)<<"done:"<<mining_status; + if(mining_status == MiningStatus::TIMEOUT){ + type = MiningType::SHIFTING; + SendShiftMsg(); + } + else { + type = MiningType::NEWBLOCK; + LOG(ERROR)<<"done"; + NotifyBroadCast(); + } + } +} + +} diff --git a/platform/consensus/ordering/poc/pow/pow_manager.h b/platform/consensus/ordering/poc/pow/pow_manager.h new file mode 100644 index 00000000..c4065528 --- /dev/null +++ b/platform/consensus/ordering/poc/pow/pow_manager.h @@ -0,0 +1,79 @@ +#pragma once + +//#include "server/resdb_replica_client.h" +#include "platform/networkstrate/replica_communicator.h" +#include "platform/common/queue/blocking_queue.h" +#include "platform/config/resdb_poc_config.h" +#include "platform/consensus/ordering/poc/pow/block_manager.h" +#include "platform/consensus/ordering/poc/pow/miner_manager.h" +#include "platform/consensus/ordering/poc/pow/shift_manager.h" +#include "platform/consensus/ordering/poc/pow/transaction_accessor.h" +#include "platform/consensus/ordering/poc/proto/pow.pb.h" + +namespace resdb { + +class PoWManager { + public: + PoWManager(const ResDBPoCConfig& config, ReplicaCommunicator * bc_client); + virtual ~PoWManager(); + + void Start(); + void Stop(); + bool IsRunning(); + void Reset(); + void Commit(std::unique_ptr<Block> block); + void AddShiftMsg(const SliceInfo& slice_info); + + enum MiningStatus { + OK = 0, + TIMEOUT = 1, + FAIL = 2, + }; + + enum BlockStatus { + GENERATE_NEW = 0, + NEXT_NEWBLOCK = 1, + }; + + enum MiningType { + NEWBLOCK = 0, + SHIFTING = 1, + }; + + protected: + virtual std::unique_ptr<TransactionAccessor> GetTransactionAccessor(const ResDBPoCConfig& config); + virtual std::unique_ptr<ShiftManager> GetShiftManager(const ResDBPoCConfig& config); + virtual std::unique_ptr<BlockManager> GetBlockManager(const ResDBPoCConfig& config); + + virtual MiningStatus Wait(); + virtual void NotifyBroadCast(); + virtual int GetShiftMsg(const SliceInfo& slice_info); + + int GetMiningTxn(MiningType type); + void NotifyNextBlock(); + absl::Status WaitBlockDone(); + BlockStatus GetBlockStatus(); + + void SendShiftMsg(); + void MiningProcess(); + int BroadCastNewBlock(const Block& block); + int BroadCastShiftMsg(const SliceInfo& slice_info); + void GossipProcess(); + + private: + ResDBPoCConfig config_; + std::unique_ptr<BlockManager> block_manager_; + std::unique_ptr<ShiftManager> shift_manager_; + std::unique_ptr<TransactionAccessor> transaction_accessor_; + std::thread miner_thread_, gossip_thread_; + std::atomic<bool> is_stop_; + + std::mutex broad_cast_mtx_, mutex_, tx_mutex_; + std::condition_variable broad_cast_cv_,cv_; + std::atomic<BlockStatus> current_status_ = BlockStatus::GENERATE_NEW; + ReplicaCommunicator* bc_client_; + SliceInfo need_slice_info_; + PrometheusHandler * prometheus_handler_; +}; + +} // namespace resdb diff --git a/platform/consensus/ordering/poc/pow/pow_manager_test.cpp b/platform/consensus/ordering/poc/pow/pow_manager_test.cpp new file mode 100644 index 00000000..83627dfe --- /dev/null +++ b/platform/consensus/ordering/poc/pow/pow_manager_test.cpp @@ -0,0 +1,482 @@ +#include "platform/consensus/ordering/poc/pow/pow_manager.h" + +#include <glog/logging.h> +#include <gmock/gmock.h> +#include <gtest/gtest.h> + +#include "common/test/test_macros.h" +#include "platform/config/resdb_config_utils.h" +#include "platform/consensus/ordering/poc/pow/mock_transaction_accessor.h" +#include "platform/networkstrate/mock_replica_communicator.h" + +namespace resdb { +namespace { + +using ::resdb::testing::EqualsProto; +using ::testing::ElementsAre; +using ::testing::Invoke; +using ::testing::Return; +using ::testing::Pair; +using ::testing::_; +using ::testing::Pointee; +using ::testing::Test; + +class MockPoWManager : public PoWManager { +public: + MockPoWManager(const ResDBPoCConfig& config):PoWManager(config, &mock_replica_client_){ + } + + std::unique_ptr<TransactionAccessor> GetTransactionAccessor(const ResDBPoCConfig& config) override { + auto accessor = std::make_unique<MockTransactionAccessor>(config); + mock_transaction_accessor_ = accessor.get(); + return accessor; + } + MOCK_METHOD(int, GetShiftMsg, (const SliceInfo& slice_info), (override)); + MOCK_METHOD(void, NotifyBroadCast, (), (override)); + MockTransactionAccessor * mock_transaction_accessor_; + MockReplicaCommunicator mock_replica_client_; +}; + +class PoWManagerBaseTest: public Test { + protected: + PoWManagerBaseTest(): + bft_config_({GenerateReplicaInfo(1, "127.0.0.1", 1234), + GenerateReplicaInfo(2, "127.0.0.1", 1235), + GenerateReplicaInfo(3, "127.0.0.1", 1236), + GenerateReplicaInfo(4, "127.0.0.1", 1237)}, + GenerateReplicaInfo(3, "127.0.0.1", 1234), KeyInfo(), + CertificateInfo()), + config_(bft_config_, + {GenerateReplicaInfo(1, "127.0.0.1", 1234), + GenerateReplicaInfo(2, "127.0.0.1", 1235), + GenerateReplicaInfo(3, "127.0.0.1", 1236), + GenerateReplicaInfo(4, "127.0.0.1", 1237)}, + GenerateReplicaInfo(1, "127.0.0.1", 1234), KeyInfo(), + CertificateInfo()){ + config_.SetMaxNonceBit(10); + config_.SetDifficulty(1); + pow_manager_= std::make_unique<MockPoWManager>(config_); + } + + ResDBConfig bft_config_; + ResDBPoCConfig config_; + std::unique_ptr<MockPoWManager> pow_manager_; +}; + +/* +TEST_F(PoWManagerBaseTest, NoData) { + pow_manager_->Reset(); + std::promise<bool> done; + std::future<bool> done_future = done.get_future(); + int call_time = 0; + ASSERT_TRUE(pow_manager_->mock_transaction_accessor_!=nullptr); + EXPECT_CALL(*pow_manager_->mock_transaction_accessor_, ConsumeTransactions).WillRepeatedly(Invoke([&](uint64_t seq){ + LOG(ERROR)<<"seq:"<<seq; + call_time++; + if(call_time>2){ + done.set_value(true); + } + return nullptr; + })); + EXPECT_CALL(*pow_manager_, NotifyBroadCast).Times(0); + pow_manager_->Start(); + done_future.get(); +} +*/ + +TEST_F(PoWManagerBaseTest, GenerateOneBlock) { + pow_manager_->Reset(); + std::promise<bool> done; + std::future<bool> done_future = done.get_future(); + ASSERT_TRUE(pow_manager_->mock_transaction_accessor_!=nullptr); + EXPECT_CALL(*pow_manager_->mock_transaction_accessor_, ConsumeTransactions(_)).WillRepeatedly(Invoke([&](uint64_t seq){ + int batch_num = 1; + std::unique_ptr<BatchClientRequest> batch_request = std::make_unique<BatchClientRequest>(); + for(int j = 0; j < 10; ++j){ + auto req = batch_request->add_client_requests(); + req->mutable_request()->set_data(std::to_string(rand()%10000)); + req->mutable_request()->set_seq(j+seq); + } + std::string data; + batch_request->SerializeToString(&data); + + std::unique_ptr<BatchClientTransactions> batch_transactions = + std::make_unique<BatchClientTransactions>(); + for (uint32_t i = 0; i < batch_num; ++i) { + std::unique_ptr<ClientTransactions> client_txn = std::make_unique<ClientTransactions>(); + client_txn->set_transaction_data(data); + client_txn->set_seq(seq+i); + + *batch_transactions->add_transactions() = *client_txn; + } + batch_transactions->set_min_seq(seq); + batch_transactions->set_max_seq(seq + batch_num - 1); + return batch_transactions; + })); + EXPECT_CALL(*pow_manager_, NotifyBroadCast).WillOnce(Invoke([&](){ + done.set_value(true); + })); + pow_manager_->Start(); + done_future.get(); +} + +/* +TEST_F(PoWManagerBaseTest, MineBlockFail) { + config_.SetMaxNonceBit(5); + config_.SetDifficulty(9); + pow_manager_= std::make_unique<MockPoWManager>(config_); + pow_manager_->Reset(); + std::promise<bool> done; + std::future<bool> done_future = done.get_future(); + int call_time = 0; + ASSERT_TRUE(pow_manager_->mock_transaction_accessor_!=nullptr); + + EXPECT_CALL(*pow_manager_->mock_transaction_accessor_, ConsumeTransactions(_)).WillRepeatedly(Invoke([&](uint64_t seq){ + if(seq>1) { + return (std::unique_ptr<resdb::BatchClientTransactions>)nullptr; + } + int batch_num = 1; + std::unique_ptr<BatchClientRequest> batch_request = std::make_unique<BatchClientRequest>(); + for(int j = 0; j < 10; ++j){ + auto req = batch_request->add_client_requests(); + req->mutable_request()->set_data(std::to_string(rand()%10000)); + req->mutable_request()->set_seq(j+seq); + } + std::string data; + batch_request->SerializeToString(&data); + + std::unique_ptr<BatchClientTransactions> batch_transactions = + std::make_unique<BatchClientTransactions>(); + for (uint32_t i = 0; i < batch_num; ++i) { + std::unique_ptr<ClientTransactions> client_txn = std::make_unique<ClientTransactions>(); + client_txn->set_transaction_data(data); + client_txn->set_seq(seq+i); + + *batch_transactions->add_transactions() = *client_txn; + } + batch_transactions->set_min_seq(seq); + batch_transactions->set_max_seq(seq + batch_num - 1); + return batch_transactions; + })); + EXPECT_CALL(*pow_manager_, GetShiftMsg).WillOnce(Invoke([&](const SliceInfo& slice_info){ + done.set_value(true); + return absl::NotFoundError("No new transaction."); + })); + pow_manager_->Start(); + done_future.get(); +} + +TEST_F(PoWManagerBaseTest, RecvCommitMsg) { + config_.SetMaxNonceBit(5); + config_.SetDifficulty(9); + pow_manager_= std::make_unique<MockPoWManager>(config_); + pow_manager_->Reset(); + std::promise<bool> done; + std::future<bool> done_future = done.get_future(); + int call_time = 0; + ASSERT_TRUE(pow_manager_->mock_transaction_accessor_!=nullptr); + + EXPECT_CALL(*pow_manager_->mock_transaction_accessor_, ConsumeTransactions(_)).WillRepeatedly(Invoke([&](uint64_t seq){ + if(seq>1) { + return (std::unique_ptr<resdb::BatchClientTransactions>)nullptr; + } + int batch_num = 1; + std::unique_ptr<BatchClientRequest> batch_request = std::make_unique<BatchClientRequest>(); + for(int j = 0; j < 10; ++j){ + auto req = batch_request->add_client_requests(); + req->mutable_request()->set_data(std::to_string(rand()%10000)); + req->mutable_request()->set_seq(j+seq); + } + std::string data; + batch_request->SerializeToString(&data); + + std::unique_ptr<BatchClientTransactions> batch_transactions = + std::make_unique<BatchClientTransactions>(); + for (uint32_t i = 0; i < batch_num; ++i) { + std::unique_ptr<ClientTransactions> client_txn = std::make_unique<ClientTransactions>(); + client_txn->set_transaction_data(data); + client_txn->set_seq(seq+i); + + *batch_transactions->add_transactions() = *client_txn; + } + batch_transactions->set_min_seq(seq); + batch_transactions->set_max_seq(seq + batch_num - 1); + return batch_transactions; + })); + EXPECT_CALL(*pow_manager_, GetShiftMsg).WillOnce(Invoke([&](const SliceInfo& slice_info){ + done.set_value(true); + return absl::NotFoundError("No new transaction."); + })); + pow_manager_->Start(); + done_future.get(); +} +*/ + +class MockBlockManager : public BlockManager { + public: + MockBlockManager(const ResDBPoCConfig& config):BlockManager(config){} + MOCK_METHOD(absl::Status, Mine, (), (override)); +}; + +class MockShiftManager : public ShiftManager { + public: + MockShiftManager(const ResDBPoCConfig& config):ShiftManager(config){} + MOCK_METHOD(bool, Check, (const SliceInfo& slice_info, int), (override)); +}; + +class MockPoWManagerWithBC : public PoWManager { +public: + MockPoWManagerWithBC(const ResDBPoCConfig& config):PoWManager(config, &mock_replica_client_){ + } + + std::unique_ptr<TransactionAccessor> GetTransactionAccessor(const ResDBPoCConfig& config) override { + auto accessor = std::make_unique<MockTransactionAccessor>(config); + mock_transaction_accessor_ = accessor.get(); + return accessor; + } + + std::unique_ptr<ShiftManager> GetShiftManager(const ResDBPoCConfig& config) override { + auto manager = std::make_unique<MockShiftManager>(config); + mock_shift_manager_= manager.get(); + return manager ; + } + std::unique_ptr<BlockManager> GetBlockManager(const ResDBPoCConfig& config) override { + auto manager = std::make_unique<MockBlockManager>(config); + mock_block_manager_= manager.get(); + return manager ; + } + + + MockTransactionAccessor * mock_transaction_accessor_; + MockReplicaCommunicator mock_replica_client_; + MockShiftManager * mock_shift_manager_; + MockBlockManager * mock_block_manager_; +}; + +class PoWManagerTest: public Test { + protected: + PoWManagerTest(): + bft_config_({GenerateReplicaInfo(1, "127.0.0.1", 1234), + GenerateReplicaInfo(2, "127.0.0.1", 1235), + GenerateReplicaInfo(3, "127.0.0.1", 1236), + GenerateReplicaInfo(4, "127.0.0.1", 1237)}, + GenerateReplicaInfo(3, "127.0.0.1", 1234), KeyInfo(), + CertificateInfo()), + config_(bft_config_, + {GenerateReplicaInfo(1, "127.0.0.1", 1234), + GenerateReplicaInfo(2, "127.0.0.1", 1235), + GenerateReplicaInfo(3, "127.0.0.1", 1236), + GenerateReplicaInfo(4, "127.0.0.1", 1237)}, + GenerateReplicaInfo(1, "127.0.0.1", 1234), KeyInfo(), + CertificateInfo()){ + config_.SetMaxNonceBit(10); + config_.SetDifficulty(1); + pow_manager_= std::make_unique<MockPoWManagerWithBC>(config_); + } + + ResDBConfig bft_config_; + ResDBPoCConfig config_; + std::unique_ptr<MockPoWManagerWithBC> pow_manager_; +}; + +TEST_F(PoWManagerTest, Broadcast) { + pow_manager_->Reset(); + std::promise<bool> done; + std::future<bool> done_future = done.get_future(); + int call_time = 0; + ASSERT_TRUE(pow_manager_->mock_transaction_accessor_!=nullptr); + EXPECT_CALL(*pow_manager_->mock_transaction_accessor_, ConsumeTransactions(_)).WillRepeatedly(Invoke([&](uint64_t seq){ + int batch_num = 1; + std::unique_ptr<BatchClientRequest> batch_request = std::make_unique<BatchClientRequest>(); + for(int j = 0; j < 10; ++j){ + auto req = batch_request->add_client_requests(); + req->mutable_request()->set_data(std::to_string(rand()%10000)); + req->mutable_request()->set_seq(j+seq); + } + std::string data; + batch_request->SerializeToString(&data); + + std::unique_ptr<BatchClientTransactions> batch_transactions = + std::make_unique<BatchClientTransactions>(); + for (uint32_t i = 0; i < batch_num; ++i) { + std::unique_ptr<ClientTransactions> client_txn = std::make_unique<ClientTransactions>(); + client_txn->set_transaction_data(data); + client_txn->set_seq(seq+i); + + *batch_transactions->add_transactions() = *client_txn; + } + batch_transactions->set_min_seq(seq); + batch_transactions->set_max_seq(seq + batch_num - 1); + return batch_transactions; + })); + EXPECT_CALL(pow_manager_->mock_replica_client_, BroadCast).WillOnce(Invoke([&](const google::protobuf::Message& message){ + done.set_value(true); + })); + pow_manager_->Start(); + done_future.get(); +} + +TEST_F(PoWManagerTest, Broadcast2) { + pow_manager_->Reset(); + std::promise<bool> done; + std::future<bool> done_future = done.get_future(); + int call_time = 0; + ASSERT_TRUE(pow_manager_->mock_transaction_accessor_!=nullptr); + EXPECT_CALL(*pow_manager_->mock_transaction_accessor_, ConsumeTransactions(_)).WillRepeatedly(Invoke([&](uint64_t seq){ + int batch_num = 1; + std::unique_ptr<BatchClientRequest> batch_request = std::make_unique<BatchClientRequest>(); + for(int j = 0; j < 10; ++j){ + auto req = batch_request->add_client_requests(); + req->mutable_request()->set_data(std::to_string(rand()%10000)); + req->mutable_request()->set_seq(j+seq); + } + std::string data; + batch_request->SerializeToString(&data); + + std::unique_ptr<BatchClientTransactions> batch_transactions = + std::make_unique<BatchClientTransactions>(); + for (uint32_t i = 0; i < batch_num; ++i) { + std::unique_ptr<ClientTransactions> client_txn = std::make_unique<ClientTransactions>(); + client_txn->set_transaction_data(data); + client_txn->set_seq(seq+i); + + *batch_transactions->add_transactions() = *client_txn; + } + batch_transactions->set_min_seq(seq); + batch_transactions->set_max_seq(seq + batch_num - 1); + return batch_transactions; + })); + EXPECT_CALL(pow_manager_->mock_replica_client_, BroadCast).WillRepeatedly(Invoke([&](const google::protobuf::Message& message){ + call_time++; + if(call_time>1){ + done.set_value(true); + } + })); + pow_manager_->Start(); + done_future.get(); +} + +TEST_F(PoWManagerTest, SendShift) { + config_.SetMaxNonceBit(5); + config_.SetDifficulty(9); + pow_manager_= std::make_unique<MockPoWManagerWithBC>(config_); + pow_manager_->Reset(); + std::promise<bool> done; + std::future<bool> done_future = done.get_future(); + ASSERT_TRUE(pow_manager_->mock_transaction_accessor_!=nullptr); + + EXPECT_CALL(*pow_manager_->mock_shift_manager_, Check).WillRepeatedly(Invoke([&](const SliceInfo &info, int){ + LOG(ERROR)<<"check:"; + return true; + })); + EXPECT_CALL(*pow_manager_->mock_transaction_accessor_, ConsumeTransactions(_)).WillRepeatedly(Invoke([&](uint64_t seq){ + if(seq>1) { + return (std::unique_ptr<resdb::BatchClientTransactions>)nullptr; + } + int batch_num = 1; + std::unique_ptr<BatchClientRequest> batch_request = std::make_unique<BatchClientRequest>(); + for(int j = 0; j < 10; ++j){ + auto req = batch_request->add_client_requests(); + req->mutable_request()->set_data(std::to_string(rand()%10000)); + req->mutable_request()->set_seq(j+seq); + } + std::string data; + batch_request->SerializeToString(&data); + + std::unique_ptr<BatchClientTransactions> batch_transactions = + std::make_unique<BatchClientTransactions>(); + for (uint32_t i = 0; i < batch_num; ++i) { + std::unique_ptr<ClientTransactions> client_txn = std::make_unique<ClientTransactions>(); + client_txn->set_transaction_data(data); + client_txn->set_seq(seq+i); + + *batch_transactions->add_transactions() = *client_txn; + } + batch_transactions->set_min_seq(seq); + batch_transactions->set_max_seq(seq + batch_num - 1); + return batch_transactions; + })); + EXPECT_CALL(pow_manager_->mock_replica_client_, BroadCast).WillOnce(Invoke([&](const google::protobuf::Message& message){ + const Request *request = (const Request *)&message; + EXPECT_EQ(request->type(), PoWRequest::TYPE_SHIFT_MSG); + return absl::OkStatus(); + })); + int call_time = 0; + EXPECT_CALL(*pow_manager_->mock_block_manager_, Mine).Times(2).WillRepeatedly(Invoke([&](){ + LOG(ERROR)<<"mine time:"<<call_time<<" "<<pow_manager_->mock_block_manager_->GetSliceIdx(); + EXPECT_EQ(pow_manager_->mock_block_manager_->GetSliceIdx(), call_time); + call_time++; + if(call_time>1){ + done.set_value(true); + return absl::OkStatus(); + } + return absl::NotFoundError("No new transaction."); + })); + pow_manager_->Start(); + done_future.get(); +} + +TEST_F(PoWManagerTest, ReSendShift) { + config_.SetMaxNonceBit(5); + config_.SetDifficulty(9); + config_.SetMiningTime(1000); + pow_manager_= std::make_unique<MockPoWManagerWithBC>(config_); + pow_manager_->Reset(); + std::promise<bool> done; + std::future<bool> done_future = done.get_future(); + ASSERT_TRUE(pow_manager_->mock_transaction_accessor_!=nullptr); + + EXPECT_CALL(*pow_manager_->mock_shift_manager_, Check).WillRepeatedly(Invoke([&](const SliceInfo &info, int){ + return false; + })); + EXPECT_CALL(*pow_manager_->mock_transaction_accessor_, ConsumeTransactions(_)).WillRepeatedly(Invoke([&](uint64_t seq){ + if(seq>1) { + return (std::unique_ptr<resdb::BatchClientTransactions>)nullptr; + } + int batch_num = 1; + std::unique_ptr<BatchClientRequest> batch_request = std::make_unique<BatchClientRequest>(); + for(int j = 0; j < 10; ++j){ + auto req = batch_request->add_client_requests(); + req->mutable_request()->set_data(std::to_string(rand()%10000)); + req->mutable_request()->set_seq(j+seq); + } + std::string data; + batch_request->SerializeToString(&data); + + std::unique_ptr<BatchClientTransactions> batch_transactions = + std::make_unique<BatchClientTransactions>(); + for (uint32_t i = 0; i < batch_num; ++i) { + std::unique_ptr<ClientTransactions> client_txn = std::make_unique<ClientTransactions>(); + client_txn->set_transaction_data(data); + client_txn->set_seq(seq+i); + + *batch_transactions->add_transactions() = *client_txn; + } + batch_transactions->set_min_seq(seq); + batch_transactions->set_max_seq(seq + batch_num - 1); + return batch_transactions; + })); + + int call_time = 0; + EXPECT_CALL(pow_manager_->mock_replica_client_, BroadCast).WillRepeatedly(Invoke([&](const google::protobuf::Message& message){ + const Request *request = (const Request *)&message; + EXPECT_EQ(request->type(), PoWRequest::TYPE_SHIFT_MSG); + LOG(ERROR)<<"call broad cast:"<<call_time; + call_time++; + if(call_time>1){ + done.set_value(true); + return absl::OkStatus(); + } + return absl::OkStatus(); + })); + + EXPECT_CALL(*pow_manager_->mock_block_manager_, Mine).Times(1).WillRepeatedly(Invoke([&](){ + return absl::NotFoundError("No new transaction."); + })); + + pow_manager_->Start(); + done_future.get(); +} + +} // namespace +} // namespace resdb diff --git a/platform/consensus/ordering/poc/pow/shift_manager.cpp b/platform/consensus/ordering/poc/pow/shift_manager.cpp new file mode 100644 index 00000000..0d44d08c --- /dev/null +++ b/platform/consensus/ordering/poc/pow/shift_manager.cpp @@ -0,0 +1,31 @@ +#include "platform/consensus/ordering/poc/pow/shift_manager.h" + +#include <glog/logging.h> + +namespace resdb { + +ShiftManager::ShiftManager(const ResDBPoCConfig& config) : config_(config) {} + +void ShiftManager::AddSliceInfo(const SliceInfo& slice_info) { + std::unique_lock<std::mutex> lk(mutex_); + cv_.notify_one(); + LOG(ERROR)<<"add shift info:"<<slice_info.DebugString(); + data_[std::make_pair(slice_info.height(), slice_info.shift_idx())].insert(slice_info.sender()); +} + + +bool ShiftManager::Check(const SliceInfo& slice_info, int timeout_ms) { + auto check_done = [&](){ + size_t n = config_.GetReplicaNum(); + size_t f = config_.GetMaxMaliciousReplicaNum(); + return data_[std::make_pair(slice_info.height(), slice_info.shift_idx())].size()>=n-f; + }; + std::unique_lock<std::mutex> lk(mutex_); + cv_.wait_for(lk, std::chrono::microseconds(timeout_ms), [&] { + return check_done(); + }); + LOG(ERROR)<<"get shift msg:"<<slice_info.height()<<" "<<slice_info.shift_idx()<<":"<<data_[std::make_pair(slice_info.height(), slice_info.shift_idx())].size(); + return check_done(); +} + +} // namespace resdb diff --git a/platform/consensus/ordering/poc/pow/shift_manager.h b/platform/consensus/ordering/poc/pow/shift_manager.h new file mode 100644 index 00000000..ee6e7587 --- /dev/null +++ b/platform/consensus/ordering/poc/pow/shift_manager.h @@ -0,0 +1,24 @@ +#pragma once + +#include "platform/config/resdb_poc_config.h" +#include "platform/consensus/ordering/poc/proto/pow.pb.h" +#include <condition_variable> + +namespace resdb { + +class ShiftManager { + public: + ShiftManager(const ResDBPoCConfig& config); + virtual ~ShiftManager() = default; + + void AddSliceInfo(const SliceInfo& slice_info); + virtual bool Check(const SliceInfo& slice_info, int timeout_ms=10000); + + private: + ResDBPoCConfig config_; + std::map<std::pair<uint64_t, uint64_t>, std::set<uint32_t>> data_; + std::mutex mutex_; + std::condition_variable cv_; +}; + +} // namespace resdb diff --git a/platform/consensus/ordering/poc/pow/transaction_accessor.cpp b/platform/consensus/ordering/poc/pow/transaction_accessor.cpp new file mode 100644 index 00000000..90e61ac9 --- /dev/null +++ b/platform/consensus/ordering/poc/pow/transaction_accessor.cpp @@ -0,0 +1,153 @@ +#include "platform/consensus/ordering/poc/pow/transaction_accessor.h" + +#include <glog/logging.h> +#include "common/utils/utils.h" + +namespace resdb { + +TransactionAccessor::TransactionAccessor(const ResDBPoCConfig& config, + bool auto_start) + : config_(config) { + stop_ = false; + max_received_seq_ = 0; + next_consume_ = 1; + if (auto_start) { + fetching_thread_ = + std::thread(&TransactionAccessor::TransactionFetching, this); + } + //prometheus_handler_ = Stats::GetGlobalPrometheus(); +} + +uint64_t GetCurrentTime(){ + uint64_t ret = 0; + struct timeval tv; + struct timezone tz; + gettimeofday (&tv, &tz); + return (uint64_t)(tv.tv_sec)*1000000 + tv.tv_usec; +} + +TransactionAccessor::~TransactionAccessor() { + stop_ = true; + if (fetching_thread_.joinable()) { + fetching_thread_.join(); + } +} + +void TransactionAccessor::Start() { + fetching_thread_ = + std::thread(&TransactionAccessor::TransactionFetching, this); +} + +void TransactionAccessor::TransactionFetching() { + std::unique_ptr<ResDBTxnAccessor> client = GetResDBTxnAccessor(); + assert(client != nullptr); + + uint64_t last_time = 0; + uint64_t current_time = GetCurrentTime(); + //double sleep_time = 1.0/8928; + //double sleep_time = 1.0/1063; + sleep(10); + int cur_seq_pbft = 1; + srand(12345); + //srand(1234); + //srand(time(0)); + + while (!stop_) { + + uint64_t cur_seq = max_received_seq_ + 1; + if(cur_seq > 10000000)break; + auto ret = client->GetTxn(cur_seq, cur_seq+1000); + if (!ret.ok() || (*ret).size()==0 || (*ret)[0].first != cur_seq) { + // LOG(ERROR) << "get txn fail:" << cur_seq; + sleep(1); + continue; + } + /* + absl::StatusOr<std::vector<std::pair<uint64_t, std::string>>> ret; + std::vector<std::pair<uint64_t, std::string>> ret1; + for(int i = 0; i < 1000; ++i){ + BatchClientRequest batch_request; + for(int j = 0; j < 100; ++j){ + auto req = batch_request.add_client_requests(); + req->mutable_request()->set_data(std::to_string(rand()%10000)); + req->mutable_request()->set_seq(cur_seq+i); + } + std::string data; + batch_request.SerializeToString(&data); + ret1.push_back(std::make_pair(cur_seq+i, data)); + } + ret=ret1; + */ + + double need_time = 2.0/1000; + uint64_t current_time1 = GetCurrentTime(); + for(auto& res : (*ret)){ + uint64_t current_time = GetCurrentTime(); + + if(last_time>0){ + double sleep_time = std::max((uint64_t)0,current_time - last_time)/1000000.0; + // LOG(ERROR)<<"sleep:"<<sleep_time<<" "<<(need_time-sleep_time)<<" "<<(int)((need_time - sleep_time)*1000000); + if(need_time - sleep_time>0){ + usleep((int)((need_time - sleep_time)*1000000)); + } + } + + //LOG(ERROR)<<"current time:"<<current_time; + last_time = current_time; + std::unique_ptr<ClientTransactions> client_txn = + std::make_unique<ClientTransactions>(); + client_txn->set_transaction_data(res.second); + client_txn->set_seq(cur_seq); + client_txn->set_create_time(GetCurrentTime()); + //prometheus_handler_->Inc(TRANSACTION_INPUT, 1); + queue_.Push(std::move(client_txn)); + cur_seq++; + + } + max_received_seq_ = cur_seq; + + std::lock_guard<std::mutex> lk(mutex_); + cv_.notify_all(); + } + return; +} + +std::unique_ptr<ResDBTxnAccessor> TransactionAccessor::GetResDBTxnAccessor() { + return std::make_unique<ResDBTxnAccessor>(*config_.GetBFTConfig()); +} + +// obtain [seq, seq+batch_num-1] transactions +std::unique_ptr<BatchClientTransactions> +TransactionAccessor::ConsumeTransactions(uint64_t seq) { + LOG(ERROR) << "consume transaction:" << seq + << " batch:" << config_.BatchTransactionNum() + << " received max seq:" << max_received_seq_; + if (seq + config_.BatchTransactionNum() > max_received_seq_ + 1) { + std::unique_lock<std::mutex> lk(mutex_); + cv_.wait_for(lk, std::chrono::seconds(1), + [&] { return seq + config_.BatchTransactionNum() <= max_received_seq_ + 1; }); + + return nullptr; + } + while(seq > next_consume_){ + *queue_.Pop(); + next_consume_++; + } + if (seq != next_consume_) { + LOG(ERROR) << "next should consume:" << next_consume_; + return nullptr; + } + + std::unique_ptr<BatchClientTransactions> batch_transactions = + std::make_unique<BatchClientTransactions>(); + for (uint32_t i = 0; i < config_.BatchTransactionNum(); ++i) { + *batch_transactions->add_transactions() = *queue_.Pop(); + } + batch_transactions->set_min_seq(seq); + batch_transactions->set_max_seq(seq + config_.BatchTransactionNum() - 1); + next_consume_ = next_consume_ + config_.BatchTransactionNum(); + LOG(ERROR)<<"get batch:"<<GetCurrentTime(); + return batch_transactions; +} + +} // namespace resdb diff --git a/platform/consensus/ordering/poc/pow/transaction_accessor.h b/platform/consensus/ordering/poc/pow/transaction_accessor.h new file mode 100644 index 00000000..063d24d4 --- /dev/null +++ b/platform/consensus/ordering/poc/pow/transaction_accessor.h @@ -0,0 +1,48 @@ +#pragma once + +#include <thread> + +#include "interface/common/resdb_txn_accessor.h" +#include "platform/common/queue/lock_free_queue.h" +#include "platform/config/resdb_poc_config.h" +#include "platform/consensus/ordering/poc/proto/pow.pb.h" +#include "platform/statistic/stats.h" + +namespace resdb { + +// TransactionAccessor obtains the transaction from BFT cluster. +// It broadcasts the request to all the replicas in BFT cluster +// and waits for 2f+1 same response, but only return one to the +// caller. +class TransactionAccessor { + public: + // For test, it is started by the tester. + TransactionAccessor(const ResDBPoCConfig& config, bool auto_start = true); + virtual ~TransactionAccessor(); + + // consume the transaction between [seq, seq+batch_num-1] + virtual std::unique_ptr<BatchClientTransactions> ConsumeTransactions(uint64_t seq); + + // For test. + void Start(); + + protected: + void TransactionFetching(); + virtual std::unique_ptr<ResDBTxnAccessor> GetResDBTxnAccessor(); + + private: + ResDBPoCConfig config_; + std::atomic<bool> stop_; + std::thread fetching_thread_; + std::atomic<uint64_t> max_received_seq_; + LockFreeQueue<ClientTransactions> queue_; + uint64_t next_consume_ = 0; + + std::condition_variable cv_; + std::mutex mutex_; + uint64_t last_time_ = 0; + + PrometheusHandler * prometheus_handler_; +}; + +} // namespace resdb diff --git a/platform/consensus/ordering/poc/pow/transaction_accessor_test.cpp b/platform/consensus/ordering/poc/pow/transaction_accessor_test.cpp new file mode 100644 index 00000000..3027aa29 --- /dev/null +++ b/platform/consensus/ordering/poc/pow/transaction_accessor_test.cpp @@ -0,0 +1,121 @@ +#include "platform/consensus/ordering/poc/pow/transaction_accessor.h" + +#include <glog/logging.h> +#include <gmock/gmock.h> +#include <gtest/gtest.h> + +#include <future> + +#include "interface/common/mock_resdb_txn_accessor.h" +#include "common/test/test_macros.h" +#include "platform/config/resdb_config_utils.h" + +namespace resdb { +namespace { + +using ::resdb::testing::EqualsProto; +using ::testing::_; +using ::testing::Invoke; +using ::testing::Pointee; +using ::testing::Return; + +class MockTransactionAccessor : public TransactionAccessor { + public: + MockTransactionAccessor(const ResDBPoCConfig& config) + : TransactionAccessor(config, false) {} + MOCK_METHOD(std::unique_ptr<ResDBTxnAccessor>, GetResDBTxnAccessor, (), + (override)); +}; + +ResDBPoCConfig GetConfig() { + ResDBConfig bft_config({GenerateReplicaInfo(1, "127.0.0.1", 2001), + GenerateReplicaInfo(2, "127.0.0.1", 2002), + GenerateReplicaInfo(3, "127.0.0.1", 2003), + GenerateReplicaInfo(4, "127.0.0.1", 2004)}, + GenerateReplicaInfo(1, "127.0.0.1", 2001), KeyInfo(), + CertificateInfo()); + + return ResDBPoCConfig(bft_config, + {GenerateReplicaInfo(1, "127.0.0.1", 1234), + GenerateReplicaInfo(2, "127.0.0.1", 1235), + GenerateReplicaInfo(3, "127.0.0.1", 1236), + GenerateReplicaInfo(4, "127.0.0.1", 1237)}, + GenerateReplicaInfo(1, "127.0.0.1", 1234), KeyInfo(), + CertificateInfo()); +} + +TEST(TransactionAccessorTest, GetTransactionsFail) { + std::promise<bool> cli_done; + std::future<bool> cli_done_future = cli_done.get_future(); + + ResDBPoCConfig config = GetConfig(); + config.SetBatchTransactionNum(1); + QueryRequest request; + request.set_min_seq(1); + request.set_max_seq(1); + MockTransactionAccessor accessor(config); + EXPECT_CALL(accessor, GetResDBTxnAccessor).WillRepeatedly(Invoke([&]() { + auto client = std::make_unique<MockResDBTxnAccessor>(*config.GetBFTConfig()); + EXPECT_CALL(*client, GetTxn(1, 1)).WillOnce(Invoke([&]() { + cli_done.set_value(true); + return absl::InternalError("recv data fail."); + })); + return client; + })); + accessor.Start(); + cli_done_future.get(); + std::unique_ptr<BatchClientTransactions> resp = + accessor.ConsumeTransactions(0); + EXPECT_EQ(resp, nullptr); +} + +TEST(TransactionAccessorTest, GetTransactions) { + std::promise<bool> cli_done; + std::future<bool> cli_done_future = cli_done.get_future(); + ResDBPoCConfig config = GetConfig(); + config.SetBatchTransactionNum(1); + + ClientTransactions expected_resp; + expected_resp.set_seq(1); + expected_resp.set_transaction_data("test"); + + BatchClientTransactions expected_batch_txn; + *expected_batch_txn.add_transactions() = expected_resp; + expected_batch_txn.set_min_seq(1); + expected_batch_txn.set_max_seq(1); + + QueryRequest request; + request.set_min_seq(1); + request.set_max_seq(1); + MockTransactionAccessor accessor(config); + EXPECT_CALL(accessor, GetResDBTxnAccessor).WillRepeatedly(Invoke([&]() { + auto client = std::make_unique<MockResDBTxnAccessor>(*config.GetBFTConfig()); + ON_CALL(*client, GetTxn(1, 1)).WillByDefault(Invoke([&]() { + std::vector<std::pair<uint64_t, std::string>> resp; + resp.push_back(std::make_pair(expected_resp.seq(), + expected_resp.transaction_data())); + cli_done.set_value(true); + return resp; + })); + ON_CALL(*client, GetTxn(2, 2)).WillByDefault(Invoke([&]() { + return absl::InternalError("recv data fail."); + })); + + return client; + })); + accessor.Start(); + + cli_done_future.get(); + while (true) { + std::unique_ptr<BatchClientTransactions> resp = + accessor.ConsumeTransactions(1); + if (resp == nullptr) { + continue; + } + EXPECT_THAT(resp, Pointee(EqualsProto(expected_batch_txn))); + break; + } +} + +} // namespace +} // namespace resdb diff --git a/platform/consensus/ordering/poc/proto/BUILD b/platform/consensus/ordering/poc/proto/BUILD new file mode 100644 index 00000000..18a32c64 --- /dev/null +++ b/platform/consensus/ordering/poc/proto/BUILD @@ -0,0 +1,28 @@ +package(default_visibility = ["//visibility:public"]) + +load("@rules_cc//cc:defs.bzl", "cc_proto_library") +load("@rules_proto//proto:defs.bzl", "proto_library") + +proto_library( + name = "transaction_proto", + srcs = ["transaction.proto"], + deps = [ + "//common/proto:signature_info_proto" + ], +) + +cc_proto_library( + name = "transaction_cc_proto", + deps = [":transaction_proto"], +) + +proto_library( + name = "pow_proto", + srcs = ["pow.proto"], + deps = [":transaction_proto"], +) + +cc_proto_library( + name = "pow_cc_proto", + deps = [":pow_proto"], +) diff --git a/platform/consensus/ordering/poc/proto/pow.proto b/platform/consensus/ordering/poc/proto/pow.proto new file mode 100644 index 00000000..bce48b32 --- /dev/null +++ b/platform/consensus/ordering/poc/proto/pow.proto @@ -0,0 +1,52 @@ +syntax = "proto3"; + +package resdb; + + +message ClientTransactions { + bytes transaction_data = 1; + uint64 seq = 2; + uint64 create_time = 3; +} + +message BatchClientTransactions { + repeated ClientTransactions transactions = 1; + uint64 min_seq = 2; + uint64 max_seq = 3; +} + +enum PoWRequest { + NONE = 0; + TYPE_COMMITTED_BLOCK = 101; + TYPE_SHIFT_MSG = 102; +}; + +message SliceInfo { + uint64 height = 1; + int32 shift_idx = 2; + int32 sender = 3; +} + +// 256 bits hash value +message HashValue{ + repeated uint64 bits = 1; +}; + +message Block { + message Header { + uint64 height = 1; + HashValue pre_hash = 2; + HashValue merkle_hash = 3; + uint64 nonce = 4; + }; + Header header = 1; + bytes transaction_data = 2; + HashValue hash = 3; + uint64 min_seq = 4; + uint64 max_seq = 5; + uint64 miner = 6; + uint64 block_time = 7; + uint64 mining_time = 8; +} + + diff --git a/platform/consensus/ordering/poc/proto/transaction.proto b/platform/consensus/ordering/poc/proto/transaction.proto new file mode 100644 index 00000000..4b03b447 --- /dev/null +++ b/platform/consensus/ordering/poc/proto/transaction.proto @@ -0,0 +1,12 @@ +syntax = "proto3"; + +import "common/proto/signature_info.proto"; + +package resdb; + +message Transaction { + bytes data = 1; + KeyInfo public_key = 2; + bytes signature = 3; +} + diff --git a/platform/networkstrate/consensus_manager.h b/platform/networkstrate/consensus_manager.h index 57ecc836..61113ab9 100644 --- a/platform/networkstrate/consensus_manager.h +++ b/platform/networkstrate/consensus_manager.h @@ -49,7 +49,7 @@ class ConsensusManager : public ServiceInterface { void Stop(); // Should be called by the instance or test. - void Start(); + virtual void Start(); protected: // BroadCast will generate signatures whiling sending data to other replicas. diff --git a/platform/proto/resdb.proto b/platform/proto/resdb.proto index 47edac38..207dee9d 100644 --- a/platform/proto/resdb.proto +++ b/platform/proto/resdb.proto @@ -207,3 +207,24 @@ message CustomQueryResponse { bytes resp_str = 1; } +message BatchClientRequest { + message ClientRequest { + Request request = 1; + SignatureInfo signature = 2; + int32 id = 3; + }; + repeated ClientRequest client_requests = 1; + uint64 createtime = 2; + uint64 local_id = 3; +} + +message BatchClientResponse { + repeated bytes response = 1; + repeated SignatureInfo signatures = 2; + int32 proxy_id = 3; + uint64 seq = 4; + uint64 current_view = 5; + uint64 createtime = 6; + uint64 local_id = 7; +} + diff --git a/service/poc/BUILD b/service/poc/BUILD new file mode 100644 index 00000000..21fe49b3 --- /dev/null +++ b/service/poc/BUILD @@ -0,0 +1,12 @@ +package(default_visibility = ["//visibility:public"]) + +cc_binary( + name = "pow_server", + srcs = ["pow_server.cpp"], + deps = [ + "//platform/config:resdb_config_utils", + "//platform/consensus/ordering/poc/pow:consensus_service_pow", + "//service/utils:server_factory", + "//platform/statistic:stats" + ], +) diff --git a/service/poc/pow_server.cpp b/service/poc/pow_server.cpp new file mode 100644 index 00000000..5e8e451e --- /dev/null +++ b/service/poc/pow_server.cpp @@ -0,0 +1,56 @@ +#include "platform/config/resdb_config_utils.h" +#include "platform/consensus/ordering/poc/pow/consensus_service_pow.h" +#include "service/utils/server_factory.h" +#include "platform/statistic/stats.h" + +using resdb::CertificateInfo; +using resdb::ConsensusServicePoW; +using resdb::GenerateReplicaInfo; +using resdb::GenerateResDBConfig; +using resdb::KeyInfo; +using resdb::ReadConfig; +using resdb::Stats; +using resdb::ReplicaInfo; +using resdb::ResDBConfig; +using resdb::ResConfigData; +using resdb::ResDBPoCConfig; +using resdb::ServiceNetwork; + +void ShowUsage() { printf("<bft config> <pow config>\n"); } + +int main(int argc, char** argv) { + if (argc < 5) { + ShowUsage(); + exit(0); + } + + std::string bft_config_file = argv[1]; + std::string pow_config_file = argv[2]; + std::string private_key_file = argv[3]; + std::string cert_file = argv[4]; + LOG(ERROR) << "pow_config:" << pow_config_file; + + ResDBConfig bft_config = GenerateResDBConfig(bft_config_file); + + std::unique_ptr<ResDBConfig> pow_config = GenerateResDBConfig( + pow_config_file, private_key_file, cert_file, std::nullopt, + [&](const ResConfigData& config_data, + const ReplicaInfo& self_info, const KeyInfo& private_key, + const CertificateInfo& public_key_cert_info) { + return std::make_unique<ResDBPoCConfig>( + bft_config, config_data, self_info, private_key, public_key_cert_info); + }); + + + LOG(ERROR)<<"elf ip:"<<pow_config->GetSelfInfo().ip(); + ResDBPoCConfig* pow_config_ptr = + static_cast<ResDBPoCConfig*>(pow_config.get()); + + pow_config_ptr->SetMaxNonceBit(42); + pow_config_ptr->SetDifficulty(36); + + auto server = + std::make_unique<ServiceNetwork>(*pow_config_ptr, std::make_unique<ConsensusServicePoW>(*pow_config_ptr)); + server->Run(); +} +
