This is an automated email from the ASF dual-hosted git repository. junchao pushed a commit to branch new_deploy in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git
commit ea3ba69d29a86783941d8aafbf7468bb0fd05c6e Author: cjcchen <[email protected]> AuthorDate: Fri May 3 01:12:05 2024 +0000 fix viewchange bug --- interface/common/resdb_txn_accessor.cpp | 21 +++-- .../consensus/ordering/pbft/checkpoint_manager.cpp | 5 +- platform/consensus/ordering/pbft/commitment.cpp | 12 +-- .../consensus/ordering/pbft/message_manager.cpp | 5 +- .../ordering/pbft/performance_manager.cpp | 10 ++- platform/consensus/ordering/pbft/query.cpp | 65 +++++++++++++--- .../consensus/ordering/pbft/response_manager.cpp | 31 +++++--- .../consensus/ordering/pbft/response_manager.h | 2 +- platform/consensus/recovery/recovery.cpp | 4 +- platform/networkstrate/async_acceptor.cpp | 2 +- platform/networkstrate/consensus_manager.cpp | 89 ++++++++++++++-------- platform/networkstrate/consensus_manager.h | 4 + platform/proto/resdb.proto | 5 ++ scripts/deploy/config/kv_server.conf | 8 +- scripts/deploy/script/deploy.sh | 23 +++++- service/kv/kv_service.cpp | 1 + service/tools/kv/api_tools/kv_client_txn_tools.cpp | 2 - 17 files changed, 203 insertions(+), 86 deletions(-) diff --git a/interface/common/resdb_txn_accessor.cpp b/interface/common/resdb_txn_accessor.cpp index 532d432a..ca19753d 100644 --- a/interface/common/resdb_txn_accessor.cpp +++ b/interface/common/resdb_txn_accessor.cpp @@ -59,15 +59,12 @@ ResDBTxnAccessor::GetTxn(uint64_t min_seq, uint64_t max_seq) { ths.push_back(std::thread( [&](NetChannel* client) { std::string response_str; - int ret = -1; - for (int i = 0; i < 3 && ret < 0; ++i) { - ret = client->SendRequest(request, Request::TYPE_QUERY); - if (ret) { - return; - } - client->SetRecvTimeout(1000); - ret = client->RecvRawMessageStr(&response_str); + int ret = client->SendRequest(request, Request::TYPE_QUERY); + if (ret) { + return; } + client->SetRecvTimeout(1000); + ret = client->RecvRawMessageStr(&response_str); if (ret == 0) { std::unique_lock<std::mutex> lck(mtx); recv_count[response_str]++; @@ -160,11 +157,11 @@ absl::StatusOr<uint64_t> ResDBTxnAccessor::GetBlockNumbers() { std::string final_str; std::mutex mtx; std::condition_variable resp_cv; + bool success = false; - std::unique_ptr<NetChannel> client = - GetNetChannel(replicas_[0].ip(), replicas_[0].port()); + std::unique_ptr<NetChannel> client = GetNetChannel(replicas_[0].ip(), replicas_[0].port()); - LOG(ERROR) << "ip:" << replicas_[0].ip() << " port:" << replicas_[0].port(); + LOG(INFO)<<"ip:"<<replicas_[0].ip()<<" port:"<<replicas_[0].port(); std::string response_str; int ret = 0; @@ -175,7 +172,7 @@ absl::StatusOr<uint64_t> ResDBTxnAccessor::GetBlockNumbers() { } client->SetRecvTimeout(100000); ret = client->RecvRawMessageStr(&response_str); - LOG(ERROR) << "receive str:" << ret << " len:" << response_str.size(); + LOG(INFO)<<"receive str:"<<ret<<" len:"<<response_str.size(); if (ret != 0) { continue; } diff --git a/platform/consensus/ordering/pbft/checkpoint_manager.cpp b/platform/consensus/ordering/pbft/checkpoint_manager.cpp index ef748308..db633c09 100644 --- a/platform/consensus/ordering/pbft/checkpoint_manager.cpp +++ b/platform/consensus/ordering/pbft/checkpoint_manager.cpp @@ -312,11 +312,14 @@ void CheckPointManager::UpdateCheckPointStatus() { last_hash_ = GetHash(last_hash_, request->hash()); last_seq_++; } + bool is_recovery = request->is_recovery(); txn_db_->Put(std::move(request)); if (current_seq == last_ckpt_seq + water_mark) { last_ckpt_seq = current_seq; - BroadcastCheckPoint(last_ckpt_seq, last_hash_, stable_hashs, stable_seqs); + if(!is_recovery){ + BroadcastCheckPoint(last_ckpt_seq, last_hash_, stable_hashs, stable_seqs); + } } } return; diff --git a/platform/consensus/ordering/pbft/commitment.cpp b/platform/consensus/ordering/pbft/commitment.cpp index 081c11d2..aeebe835 100644 --- a/platform/consensus/ordering/pbft/commitment.cpp +++ b/platform/consensus/ordering/pbft/commitment.cpp @@ -81,7 +81,7 @@ int Commitment::ProcessNewRequest(std::unique_ptr<Context> context, // << message_manager_->GetCurrentPrimary() // << " seq:" << user_request->seq() // << " hash:" << user_request->hash(); - LOG(ERROR)<<"NOT PRIMARY, Primary is "<<message_manager_->GetCurrentPrimary(); + LOG(INFO)<<"NOT PRIMARY, Primary is "<<message_manager_->GetCurrentPrimary(); replica_communicator_->SendMessage(*user_request, message_manager_->GetCurrentPrimary()); { @@ -117,7 +117,6 @@ int Commitment::ProcessNewRequest(std::unique_ptr<Context> context, global_stats_->IncClientRequest(); if (duplicate_manager_->CheckAndAddProposed(user_request->hash())) { - LOG(ERROR) << "duplicate check fail:"; return -2; } auto seq = message_manager_->AssignNextSeq(); @@ -160,10 +159,13 @@ int Commitment::ProcessProposeMsg(std::unique_ptr<Context> context, return -2; } if (request->is_recovery()) { - if (static_cast<int64_t>(request->seq()) >= - message_manager_->GetNextSeq()) { + if (message_manager_->GetNextSeq() == 0 || request->seq() == message_manager_->GetNextSeq()) { message_manager_->SetNextSeq(request->seq() + 1); } + else { + LOG(ERROR)<<" recovery request not valid:"<<" current seq:"<<message_manager_->GetNextSeq()<<" data seq:"<<request->seq(); + return 0; + } return message_manager_->AddConsensusMsg(context->signature, std::move(request)); } @@ -310,7 +312,7 @@ int Commitment::PostProcessExecutedMsg() { request.set_current_view(batch_resp->current_view()); request.set_proxy_id(batch_resp->proxy_id()); request.set_primary_id(batch_resp->primary_id()); - // LOG(ERROR)<<"send back to proxy:"<<batch_resp->proxy_id(); + //LOG(ERROR)<<"send back to proxy:"<<batch_resp->proxy_id(); batch_resp->SerializeToString(request.mutable_data()); replica_communicator_->SendMessage(request, request.proxy_id()); } diff --git a/platform/consensus/ordering/pbft/message_manager.cpp b/platform/consensus/ordering/pbft/message_manager.cpp index 37d3b5de..cc5e187c 100644 --- a/platform/consensus/ordering/pbft/message_manager.cpp +++ b/platform/consensus/ordering/pbft/message_manager.cpp @@ -39,6 +39,9 @@ MessageManager::MessageManager( [&](std::unique_ptr<Request> request, std::unique_ptr<BatchUserResponse> resp_msg) { if (request->is_recovery()) { + if (checkpoint_manager_) { + checkpoint_manager_->AddCommitData(std::move(request)); + } return; } resp_msg->set_proxy_id(request->proxy_id()); @@ -231,8 +234,6 @@ TransactionStatue MessageManager::GetTransactionState(uint64_t seq) { } int MessageManager::GetReplicaState(ReplicaState* state) { - state->set_view(GetCurrentView()); - *state->mutable_replica_info() = config_.GetSelfInfo(); *state->mutable_replica_config() = config_.GetConfigData(); return 0; } diff --git a/platform/consensus/ordering/pbft/performance_manager.cpp b/platform/consensus/ordering/pbft/performance_manager.cpp index a5ba979f..ebaf2ed4 100644 --- a/platform/consensus/ordering/pbft/performance_manager.cpp +++ b/platform/consensus/ordering/pbft/performance_manager.cpp @@ -191,8 +191,16 @@ CollectorResultCode PerformanceManager::AddResponseMsg( return CollectorResultCode::INVALID; } + std::unique_ptr<BatchUserResponse> batch_response = std::make_unique<BatchUserResponse>(); + if (!batch_response->ParseFromString(request->data())) { + LOG(ERROR) << "parse response fail:"<<request->data().size() + <<" seq:"<<request->seq(); + return CollectorResultCode::INVALID; + } + + uint64_t seq = batch_response->local_id(); + int type = request->type(); - uint64_t seq = request->seq(); int resp_received_count = 0; int ret = collector_pool_->GetCollector(seq)->AddRequest( std::move(request), signature, false, diff --git a/platform/consensus/ordering/pbft/query.cpp b/platform/consensus/ordering/pbft/query.cpp index 72bb4ed5..62a7d54f 100644 --- a/platform/consensus/ordering/pbft/query.cpp +++ b/platform/consensus/ordering/pbft/query.cpp @@ -50,28 +50,71 @@ int Query::ProcessGetReplicaState(std::unique_ptr<Context> context, int Query::ProcessQuery(std::unique_ptr<Context> context, std::unique_ptr<Request> request) { + + if (config_.GetPublicKeyCertificateInfo() + .public_key() + .public_key_info() + .type() == CertificateKeyInfo::CLIENT) { + + auto find_primary = [&](){ + auto config_data = config_.GetConfigData(); + for (const auto& r : config_data.region()) { + for (const auto& replica : r.replica_info()) { + if(replica.id() == 1){ + return replica; + } + } + } + }; + ReplicaInfo primary = find_primary(); + std::string ip = primary.ip(); + int port = primary.port(); + + LOG(ERROR)<<"redirect to primary:"<<ip<<" port:"<<port; + auto client = std::make_unique<NetChannel>(ip, port); + if (client->SendRawMessage(*request) == 0) { + QueryResponse resp; + if (client->RecvRawMessage(&resp) == 0) { + if (context != nullptr && context->client != nullptr) { + LOG(ERROR) << "send response from primary:"<<resp.transactions_size(); + int ret = context->client->SendRawMessage(resp); + if (ret) { + LOG(ERROR) << "send resp fail ret:" << ret; + } + } + } + } + return 0; + } + QueryRequest query; if (!query.ParseFromString(request->data())) { LOG(ERROR) << "parse data fail"; return -2; } - // LOG(ERROR) << "request:" << query.DebugString(); QueryResponse response; - for (uint64_t i = query.min_seq(); i <= query.max_seq(); ++i) { - Request* ret_request = message_manager_->GetRequest(i); - if (ret_request == nullptr) { - break; + if (query.max_seq() == 0 && query.min_seq() == 0){ + uint64_t mseq = message_manager_->GetNextSeq(); + response.set_max_seq(mseq-1); + LOG(ERROR)<<"get max seq:"<<mseq; + } + else { + for (uint64_t i = query.min_seq(); i <= query.max_seq(); ++i) { + Request* ret_request = message_manager_->GetRequest(i); + if (ret_request == nullptr) { + break; + } + Request* txn = response.add_transactions(); + txn->set_data(ret_request->data()); + txn->set_hash(ret_request->hash()); + txn->set_seq(ret_request->seq()); + txn->set_proxy_id(ret_request->proxy_id()); } - Request* txn = response.add_transactions(); - txn->set_data(ret_request->data()); - txn->set_hash(ret_request->hash()); - txn->set_seq(ret_request->seq()); - txn->set_proxy_id(ret_request->proxy_id()); } if (context != nullptr && context->client != nullptr) { - // LOG(ERROR) << "send response:" << response.DebugString(); + //LOG(ERROR) << "send response:" << response.DebugString(); int ret = context->client->SendRawMessage(response); if (ret) { LOG(ERROR) << "send resp fail ret:" << ret; diff --git a/platform/consensus/ordering/pbft/response_manager.cpp b/platform/consensus/ordering/pbft/response_manager.cpp index 6a7162d8..091a75cc 100644 --- a/platform/consensus/ordering/pbft/response_manager.cpp +++ b/platform/consensus/ordering/pbft/response_manager.cpp @@ -57,6 +57,7 @@ ResponseManager::ResponseManager(const ResDBConfig& config, verifier_(verifier) { stop_ = false; local_id_ = 1; + timeout_length_ = 5000000; if (config_.GetPublicKeyCertificateInfo() .public_key() @@ -172,8 +173,20 @@ CollectorResultCode ResponseManager::AddResponseMsg( return CollectorResultCode::INVALID; } + std::string hash = request->hash(); + + std::unique_ptr<BatchUserResponse> batch_response = std::make_unique<BatchUserResponse>(); + if (!batch_response->ParseFromString(request->data())) { + LOG(ERROR) << "parse response fail:"<<request->data().size() + <<" seq:"<<request->seq(); + RemoveWaitingResponseRequest(hash); + return CollectorResultCode::INVALID; + } + + uint64_t seq = batch_response->local_id(); + request->set_seq(seq); + int type = request->type(); - uint64_t seq = request->seq(); int resp_received_count = 0; int ret = collector_pool_->GetCollector(seq)->AddRequest( std::move(request), signature, false, @@ -190,6 +203,7 @@ CollectorResultCode ResponseManager::AddResponseMsg( } if (resp_received_count > 0) { collector_pool_->Update(seq); + RemoveWaitingResponseRequest(hash); return CollectorResultCode::STATE_CHANGED; } return CollectorResultCode::OK; @@ -295,7 +309,7 @@ int ResponseManager::DoBatch( if (!config_.IsPerformanceRunning()) { LOG(ERROR) << "add context list:" << new_request->seq() - << " list size:" << context_list.size(); + << " list size:" << context_list.size()<<" local_id:"<<local_id_; batch_request.set_local_id(local_id_); int ret = AddContextList(std::move(context_list), local_id_++); if (ret != 0) { @@ -318,13 +332,10 @@ int ResponseManager::DoBatch( batch_request.SerializeToString(new_request->mutable_data()); new_request->set_hash(SignatureVerifier::CalculateHash(new_request->data())); new_request->set_proxy_id(config_.GetSelfInfo().id()); - /*for(int i=1; i<=4; i++){ - replica_communicator_->SendMessage(*new_request, i); - }*/ replica_communicator_->SendMessage(*new_request, GetPrimary()); send_num_++; - LOG(INFO) << "send msg to primary:" << GetPrimary() - << " batch size:" << batch_req.size(); + //LOG(INFO) << "send msg to primary:" << GetPrimary() + // << " batch size:" << batch_req.size(); AddWaitingResponseRequest(std::move(new_request)); return 0; } @@ -335,7 +346,8 @@ void ResponseManager::AddWaitingResponseRequest( return; } pm_lock_.lock(); - uint64_t time = GetCurrentTime() + this->timeout_length_; + assert(timeout_length_>0); + uint64_t time = GetCurrentTime() + timeout_length_; client_timeout_min_heap_.push( ResponseClientTimeout(request->hash(), time)); waiting_response_batches_.insert( @@ -344,7 +356,7 @@ void ResponseManager::AddWaitingResponseRequest( sem_post(&request_sent_signal_); } -void ResponseManager::RemoveWaitingResponseRequest(std::string hash) { +void ResponseManager::RemoveWaitingResponseRequest(const std::string& hash) { if (!config_.GetConfigData().enable_viewchange()) { return; } @@ -390,7 +402,6 @@ void ResponseManager::MonitoringClientTimeOut() { if (CheckTimeOut(client_timeout.hash)) { auto request = GetTimeOutRequest(client_timeout.hash); if (request) { - LOG(ERROR) << "Client Request Timeout " << client_timeout.hash; replica_communicator_->BroadCast(*request); } } diff --git a/platform/consensus/ordering/pbft/response_manager.h b/platform/consensus/ordering/pbft/response_manager.h index b238e0df..36412eaf 100644 --- a/platform/consensus/ordering/pbft/response_manager.h +++ b/platform/consensus/ordering/pbft/response_manager.h @@ -77,7 +77,7 @@ class ResponseManager { int GetPrimary(); void AddWaitingResponseRequest(std::unique_ptr<Request> request); - void RemoveWaitingResponseRequest(std::string hash); + void RemoveWaitingResponseRequest(const std::string& hash); bool CheckTimeOut(std::string hash); void ResponseTimer(std::string hash); void MonitoringClientTimeOut(); diff --git a/platform/consensus/recovery/recovery.cpp b/platform/consensus/recovery/recovery.cpp index fb1f6d50..d106075a 100644 --- a/platform/consensus/recovery/recovery.cpp +++ b/platform/consensus/recovery/recovery.cpp @@ -511,15 +511,17 @@ void Recovery::ReadLogsFromFiles( if (request_list.size() == 0) { ftruncate(fd, 0); } + uint64_t max_seq = 0; for (std::unique_ptr<RecoveryData>& recovery_data : request_list) { if (ckpt < recovery_data->request->seq()) { recovery_data->request->set_is_recovery(true); + max_seq = recovery_data->request->seq(); call_back(std::move(recovery_data->context), std::move(recovery_data->request)); } } - LOG(INFO) << "read log from files:" << path << " done"; + LOG(ERROR) << "read log from files:" << path << " done"<<" recovery max seq:"<<max_seq; close(fd); } diff --git a/platform/networkstrate/async_acceptor.cpp b/platform/networkstrate/async_acceptor.cpp index 9fd85a06..b61a510b 100644 --- a/platform/networkstrate/async_acceptor.cpp +++ b/platform/networkstrate/async_acceptor.cpp @@ -77,7 +77,7 @@ void AsyncAcceptor::Session::ReadDone() { delete recv_buffer_; } else { data_size_ = *reinterpret_cast<size_t*>(recv_buffer_); - if (data_size_ > 1e6) { + if (data_size_ > 1e10) { LOG(ERROR) << "read data size:" << data_size_ << " data size:" << sizeof(data_size_) << " close socket"; Close(); diff --git a/platform/networkstrate/consensus_manager.cpp b/platform/networkstrate/consensus_manager.cpp index 00b70810..813ad1fc 100644 --- a/platform/networkstrate/consensus_manager.cpp +++ b/platform/networkstrate/consensus_manager.cpp @@ -90,21 +90,44 @@ void ConsensusManager::HeartBeat() { std::mutex mutex; std::condition_variable cv; while (IsRunning()) { + { + std::unique_lock<std::mutex> lk(hb_mutex_); + SendHeartBeat(); + } + std::unique_lock<std::mutex> lk(mutex); + cv.wait_for(lk, std::chrono::microseconds(sleep_time * 1000000), + [&] { return !IsRunning(); }); + if (is_ready_) { + if (config_.IsTestMode()) { + sleep_time = 1; + } else { + sleep_time = 60; + } + } + } +} + +void ConsensusManager::SendHeartBeat() { auto keys = verifier_->GetAllPublicKeys(); std::vector<ReplicaInfo> replicas = GetAllReplicas(); LOG(ERROR) << "all replicas:" << replicas.size(); std::vector<ReplicaInfo> client_replicas = GetClientReplicas(); HeartBeatInfo hb_info; + hb_info.set_sender(config_.GetSelfInfo().id()); + hb_info.set_ip(config_.GetSelfInfo().ip()); + hb_info.set_port(config_.GetSelfInfo().port()); + hb_info.set_hb_version(version_); for (const auto& key : keys) { *hb_info.add_public_keys() = key; + hb_info.add_node_version(hb_[key.public_key_info().node_id()]); } for (const auto& client : client_replicas) { replicas.push_back(client); } auto client = GetReplicaClient(replicas, false); if (client == nullptr) { - continue; + return; } // If it is not a client node, broadcost the current primary to the client. @@ -131,17 +154,6 @@ void ConsensusManager::HeartBeat() { LOG(ERROR) << " server:" << config_.GetSelfInfo().id() << " sends HB fail:" << ret; } - std::unique_lock<std::mutex> lk(mutex); - cv.wait_for(lk, std::chrono::microseconds(sleep_time * 1000000), - [&] { return !IsRunning(); }); - if (is_ready_) { - if (config_.IsTestMode()) { - sleep_time = 1; - } else { - sleep_time = 60 * 2; - } - } - } } // Porcess the packages received from the network. @@ -158,35 +170,35 @@ int ConsensusManager::Process(std::unique_ptr<Context> context, return -1; } + std::unique_ptr<Request> request = std::make_unique<Request>(); + if (!request->ParseFromString(message.data())) { + LOG(ERROR) << "parse data info fail"; + return -1; + } + + if (request->type() == Request::TYPE_HEART_BEAT) { + return Dispatch(std::move(context), std::move(request)); + } + // Check if the certificate is valid. if (message.has_signature() && verifier_) { bool valid = verifier_->VerifyMessage(message.data(), message.signature()); if (!valid) { LOG(ERROR) << "request is not valid:" - << message.signature().DebugString(); - LOG(ERROR) << " msg:" << message.data().size(); + << message.signature().DebugString(); + LOG(ERROR) << " msg:" << message.data().size()<<" is recovery:"<<request->is_recovery(); return -2; } } else { } - std::unique_ptr<Request> request = std::make_unique<Request>(); - if (!request->ParseFromString(message.data())) { - LOG(ERROR) << "parse data info fail"; - return -1; - } - - std::string tmp; - if (!request->SerializeToString(&tmp)) { - return -1; - } + // forward the signature to the request so that it can be included in the + // request/response set if needed. + context->signature = message.signature(); + // LOG(ERROR) << "======= server:" << config_.GetSelfInfo().id() + // << " get request type:" << request->type() + // << " from:" << request->sender_id(); - // forward the signature to the request so that it can be included in the - // request/response set if needed. - context->signature = message.signature(); - // LOG(ERROR) << "======= server:" << config_.GetSelfInfo().id() - // << " get request type:" << request->type() - // << " from:" << request->sender_id(); return Dispatch(std::move(context), std::move(request)); } @@ -202,6 +214,7 @@ int ConsensusManager::Dispatch(std::unique_ptr<Context> context, int ConsensusManager::ProcessHeartBeat(std::unique_ptr<Context> context, std::unique_ptr<Request> request) { + std::unique_lock<std::mutex> lk(hb_mutex_); std::vector<ReplicaInfo> replicas = GetReplicas(); HeartBeatInfo hb_info; if (!hb_info.ParseFromString(request->data())) { @@ -212,7 +225,10 @@ int ConsensusManager::ProcessHeartBeat(std::unique_ptr<Context> context, LOG(ERROR) << "receive public size:" << hb_info.public_keys().size() << " primary:" << hb_info.primary() << " version:" << hb_info.version() - << " from region:" << request->region_info().region_id(); + << " from region:" << request->region_info().region_id() + << " sender:"<<hb_info.sender() + << " last send:"<< hb_info.hb_version() + << " current v:"<<hb_[hb_info.sender()]; if (request->region_info().region_id() == config_.GetConfigData().self_region_id()) { @@ -261,6 +277,17 @@ int ConsensusManager::ProcessHeartBeat(std::unique_ptr<Context> context, } } } + + if(!hb_info.ip().empty() && hb_info.hb_version() > 0&&hb_[hb_info.sender()] != hb_info.hb_version()) { + ReplicaInfo info; + info.set_ip(hb_info.ip()); + info.set_port(hb_info.port()); + info.set_id(hb_info.sender()); + //bc_client_->Flush(info); + hb_[hb_info.sender()] = hb_info.hb_version(); + SendHeartBeat(); + } + if (!is_ready_ && replica_num >= config_.GetMinDataReceiveNum()) { LOG(ERROR) << "============ Server " << config_.GetSelfInfo().id() << " is ready " diff --git a/platform/networkstrate/consensus_manager.h b/platform/networkstrate/consensus_manager.h index fffc9fc7..d775f9d8 100644 --- a/platform/networkstrate/consensus_manager.h +++ b/platform/networkstrate/consensus_manager.h @@ -89,6 +89,7 @@ class ConsensusManager : public ServiceInterface { private: void HeartBeat(); + void SendHeartBeat(); void BroadCastThread(); protected: @@ -105,6 +106,9 @@ class ConsensusManager : public ServiceInterface { std::unique_ptr<ReplicaCommunicator> bc_client_; std::vector<ReplicaInfo> clients_; Stats* global_stats_; + uint64_t version_; + std::map<int,uint64_t> hb_; + std::mutex hb_mutex_; }; } // namespace resdb diff --git a/platform/proto/resdb.proto b/platform/proto/resdb.proto index 71ed4591..0efea6bf 100644 --- a/platform/proto/resdb.proto +++ b/platform/proto/resdb.proto @@ -115,8 +115,13 @@ message BatchUserResponse { message HeartBeatInfo{ repeated CertificateKey public_keys = 1; + repeated int64 node_version = 8; uint32 primary = 2; uint64 version= 3; + int32 sender = 4; + string ip = 5; + int32 port = 6; + int64 hb_version = 7; } message ClientCertInfo { diff --git a/scripts/deploy/config/kv_server.conf b/scripts/deploy/config/kv_server.conf index a811f768..500e34d6 100644 --- a/scripts/deploy/config/kv_server.conf +++ b/scripts/deploy/config/kv_server.conf @@ -1,8 +1,8 @@ iplist=( -172.31.52.247 -172.31.54.193 -172.31.55.48 -172.31.53.140 +172.31.57.186 +172.31.57.186 +172.31.57.186 +172.31.57.186 172.31.57.186 ) diff --git a/scripts/deploy/script/deploy.sh b/scripts/deploy/script/deploy.sh index 87ab7dc9..6da8deb8 100755 --- a/scripts/deploy/script/deploy.sh +++ b/scripts/deploy/script/deploy.sh @@ -14,6 +14,7 @@ server=//service/kv:kv_service fi # obtain the src path +main_folder=resilientdb_app server_path=`echo "$server" | sed 's/:/\//g'` server_path=${server_path:1} server_name=`echo "$server" | awk -F':' '{print $NF}'` @@ -60,10 +61,12 @@ fi # commands functions function run_cmd(){ count=1 + idx=1 for ip in ${deploy_iplist[@]}; do - ssh -i ${key} -n -o BatchMode=yes -o StrictHostKeyChecking=no ubuntu@${ip} "$1" & + ssh -i ${key} -n -o BatchMode=yes -o StrictHostKeyChecking=no ubuntu@${ip} "cd ${main_folder}/$idx; $1" & ((count++)) + ((idx++)) done while [ $count -gt 0 ]; do @@ -76,6 +79,15 @@ function run_one_cmd(){ ssh -i ${key} -n -o BatchMode=yes -o StrictHostKeyChecking=no ubuntu@${ip} "$1" } +idx=1 +for ip in ${deploy_iplist[@]}; +do + run_one_cmd "mkdir -p ${main_folder}/$idx" & + ((count++)) + ((idx++)) +done + + run_cmd "killall -9 ${server_bin}" run_cmd "rm -rf ${server_bin}; rm ${server_bin}*.log; rm -rf server.config; rm -rf cert;" @@ -83,11 +95,13 @@ sleep 1 # upload config files and binary echo "upload configs" +idx=1 count=0 for ip in ${deploy_iplist[@]}; do - scp -i ${key} -r ${bin_path} ${output_path}/server.config ${output_path}/cert ubuntu@${ip}:/home/ubuntu/ & + scp -i ${key} -r ${bin_path} ${output_path}/server.config ${output_path}/cert ubuntu@${ip}:/home/ubuntu/${main_folder}/$idx & ((count++)) + ((idx++)) done while [ $count -gt 0 ]; do @@ -103,9 +117,10 @@ for ip in ${deploy_iplist[@]}; do private_key="cert/node_"${idx}".key.pri" cert="cert/cert_"${idx}".cert" - run_one_cmd "nohup ./${server_bin} server.config ${private_key} ${cert} ${grafna_port} > ${server_bin}.log 2>&1 &" & + run_one_cmd "cd ${main_folder}/$idx; nohup ./${server_bin} server.config ${private_key} ${cert} ${grafna_port} > ${server_bin}.log 2>&1 &" & ((count++)) ((idx++)) + ((grafna_port++)) done while [ $count -gt 0 ]; do @@ -120,7 +135,7 @@ do resp="" while [ "$resp" = "" ] do - resp=`ssh -i ${key} -n -o BatchMode=yes -o StrictHostKeyChecking=no ubuntu@${ip} "grep \"receive public size:${#iplist[@]}\" ${server_bin}.log"` + resp=`ssh -i ${key} -n -o BatchMode=yes -o StrictHostKeyChecking=no ubuntu@${ip} "cd ${main_folder}/$idx; grep \"receive public size:${#iplist[@]}\" ${server_bin}.log"` if [ "$resp" = "" ]; then sleep 1 fi diff --git a/service/kv/kv_service.cpp b/service/kv/kv_service.cpp index 11272aa4..6cd20e1e 100644 --- a/service/kv/kv_service.cpp +++ b/service/kv/kv_service.cpp @@ -60,6 +60,7 @@ int main(int argc, char** argv) { exit(0); } google::InitGoogleLogging(argv[0]); + FLAGS_minloglevel = 1; char* config_file = argv[1]; char* private_key_file = argv[2]; diff --git a/service/tools/kv/api_tools/kv_client_txn_tools.cpp b/service/tools/kv/api_tools/kv_client_txn_tools.cpp index 4ebff4de..55f0f6c4 100644 --- a/service/tools/kv/api_tools/kv_client_txn_tools.cpp +++ b/service/tools/kv/api_tools/kv_client_txn_tools.cpp @@ -49,8 +49,6 @@ int main(int argc, char** argv) { ResDBTxnAccessor client(config); auto resp = client.GetTxn(min_seq, max_seq); - absl::StatusOr<std::vector<std::pair<uint64_t, std::string>>> GetTxn( - uint64_t min_seq, uint64_t max_seq); if (!resp.ok()) { LOG(ERROR) << "get replica state fail"; exit(1);
