This is an automated email from the ASF dual-hosted git repository.

junchao pushed a commit to branch new_deploy
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git

commit ea3ba69d29a86783941d8aafbf7468bb0fd05c6e
Author: cjcchen <[email protected]>
AuthorDate: Fri May 3 01:12:05 2024 +0000

    fix viewchange bug
---
 interface/common/resdb_txn_accessor.cpp            | 21 +++--
 .../consensus/ordering/pbft/checkpoint_manager.cpp |  5 +-
 platform/consensus/ordering/pbft/commitment.cpp    | 12 +--
 .../consensus/ordering/pbft/message_manager.cpp    |  5 +-
 .../ordering/pbft/performance_manager.cpp          | 10 ++-
 platform/consensus/ordering/pbft/query.cpp         | 65 +++++++++++++---
 .../consensus/ordering/pbft/response_manager.cpp   | 31 +++++---
 .../consensus/ordering/pbft/response_manager.h     |  2 +-
 platform/consensus/recovery/recovery.cpp           |  4 +-
 platform/networkstrate/async_acceptor.cpp          |  2 +-
 platform/networkstrate/consensus_manager.cpp       | 89 ++++++++++++++--------
 platform/networkstrate/consensus_manager.h         |  4 +
 platform/proto/resdb.proto                         |  5 ++
 scripts/deploy/config/kv_server.conf               |  8 +-
 scripts/deploy/script/deploy.sh                    | 23 +++++-
 service/kv/kv_service.cpp                          |  1 +
 service/tools/kv/api_tools/kv_client_txn_tools.cpp |  2 -
 17 files changed, 203 insertions(+), 86 deletions(-)

diff --git a/interface/common/resdb_txn_accessor.cpp b/interface/common/resdb_txn_accessor.cpp
index 532d432a..ca19753d 100644
--- a/interface/common/resdb_txn_accessor.cpp
+++ b/interface/common/resdb_txn_accessor.cpp
@@ -59,15 +59,12 @@ ResDBTxnAccessor::GetTxn(uint64_t min_seq, uint64_t max_seq) {
     ths.push_back(std::thread(
         [&](NetChannel* client) {
           std::string response_str;
-          int ret = -1;
-          for (int i = 0; i < 3 && ret < 0; ++i) {
-            ret = client->SendRequest(request, Request::TYPE_QUERY);
-            if (ret) {
-              return;
-            }
-            client->SetRecvTimeout(1000);
-            ret = client->RecvRawMessageStr(&response_str);
+          int ret = client->SendRequest(request, Request::TYPE_QUERY);
+          if (ret) {
+            return;
           }
+          client->SetRecvTimeout(1000);
+          ret = client->RecvRawMessageStr(&response_str);
           if (ret == 0) {
             std::unique_lock<std::mutex> lck(mtx);
             recv_count[response_str]++;
@@ -160,11 +157,11 @@ absl::StatusOr<uint64_t> ResDBTxnAccessor::GetBlockNumbers() {
   std::string final_str;
   std::mutex mtx;
   std::condition_variable resp_cv;
+  bool success = false;
 
-  std::unique_ptr<NetChannel> client =
-      GetNetChannel(replicas_[0].ip(), replicas_[0].port());
+  std::unique_ptr<NetChannel> client = GetNetChannel(replicas_[0].ip(), 
replicas_[0].port());
 
-  LOG(ERROR) << "ip:" << replicas_[0].ip() << " port:" << replicas_[0].port();
+  LOG(INFO)<<"ip:"<<replicas_[0].ip()<<" port:"<<replicas_[0].port();
 
   std::string response_str;
   int ret = 0;
@@ -175,7 +172,7 @@ absl::StatusOr<uint64_t> ResDBTxnAccessor::GetBlockNumbers() {
     }
     client->SetRecvTimeout(100000);
     ret = client->RecvRawMessageStr(&response_str);
-    LOG(ERROR) << "receive str:" << ret << " len:" << response_str.size();
+         LOG(INFO)<<"receive str:"<<ret<<" len:"<<response_str.size();
     if (ret != 0) {
       continue;
     }
diff --git a/platform/consensus/ordering/pbft/checkpoint_manager.cpp b/platform/consensus/ordering/pbft/checkpoint_manager.cpp
index ef748308..db633c09 100644
--- a/platform/consensus/ordering/pbft/checkpoint_manager.cpp
+++ b/platform/consensus/ordering/pbft/checkpoint_manager.cpp
@@ -312,11 +312,14 @@ void CheckPointManager::UpdateCheckPointStatus() {
       last_hash_ = GetHash(last_hash_, request->hash());
       last_seq_++;
     }
+    bool is_recovery = request->is_recovery();
     txn_db_->Put(std::move(request));
 
     if (current_seq == last_ckpt_seq + water_mark) {
       last_ckpt_seq = current_seq;
-      BroadcastCheckPoint(last_ckpt_seq, last_hash_, stable_hashs, 
stable_seqs);
+      if(!is_recovery){
+        BroadcastCheckPoint(last_ckpt_seq, last_hash_, stable_hashs, 
stable_seqs);
+      }
     }
   }
   return;
diff --git a/platform/consensus/ordering/pbft/commitment.cpp b/platform/consensus/ordering/pbft/commitment.cpp
index 081c11d2..aeebe835 100644
--- a/platform/consensus/ordering/pbft/commitment.cpp
+++ b/platform/consensus/ordering/pbft/commitment.cpp
@@ -81,7 +81,7 @@ int Commitment::ProcessNewRequest(std::unique_ptr<Context> context,
     //            << message_manager_->GetCurrentPrimary()
     //            << " seq:" << user_request->seq()
     //            << " hash:" << user_request->hash();
-    LOG(ERROR)<<"NOT PRIMARY, Primary is 
"<<message_manager_->GetCurrentPrimary();
+    LOG(INFO)<<"NOT PRIMARY, Primary is 
"<<message_manager_->GetCurrentPrimary();
     replica_communicator_->SendMessage(*user_request,
                                        message_manager_->GetCurrentPrimary());
     {
@@ -117,7 +117,6 @@ int Commitment::ProcessNewRequest(std::unique_ptr<Context> context,
 
   global_stats_->IncClientRequest();
   if (duplicate_manager_->CheckAndAddProposed(user_request->hash())) {
-    LOG(ERROR) << "duplicate check fail:";
     return -2;
   }
   auto seq = message_manager_->AssignNextSeq();
@@ -160,10 +159,13 @@ int Commitment::ProcessProposeMsg(std::unique_ptr<Context> context,
     return -2;
   }
   if (request->is_recovery()) {
-    if (static_cast<int64_t>(request->seq()) >=
-        message_manager_->GetNextSeq()) {
+    if (message_manager_->GetNextSeq() == 0 || request->seq() == 
message_manager_->GetNextSeq()) {
       message_manager_->SetNextSeq(request->seq() + 1);
     }
+    else {
+      LOG(ERROR)<<" recovery request not valid:"<<" current 
seq:"<<message_manager_->GetNextSeq()<<" data seq:"<<request->seq();
+      return 0;
+    }
     return message_manager_->AddConsensusMsg(context->signature,
                                              std::move(request));
   }
@@ -310,7 +312,7 @@ int Commitment::PostProcessExecutedMsg() {
     request.set_current_view(batch_resp->current_view());
     request.set_proxy_id(batch_resp->proxy_id());
     request.set_primary_id(batch_resp->primary_id());
-    // LOG(ERROR)<<"send back to proxy:"<<batch_resp->proxy_id();
+    //LOG(ERROR)<<"send back to proxy:"<<batch_resp->proxy_id();
     batch_resp->SerializeToString(request.mutable_data());
     replica_communicator_->SendMessage(request, request.proxy_id());
   }
diff --git a/platform/consensus/ordering/pbft/message_manager.cpp b/platform/consensus/ordering/pbft/message_manager.cpp
index 37d3b5de..cc5e187c 100644
--- a/platform/consensus/ordering/pbft/message_manager.cpp
+++ b/platform/consensus/ordering/pbft/message_manager.cpp
@@ -39,6 +39,9 @@ MessageManager::MessageManager(
           [&](std::unique_ptr<Request> request,
               std::unique_ptr<BatchUserResponse> resp_msg) {
             if (request->is_recovery()) {
+              if (checkpoint_manager_) {
+                checkpoint_manager_->AddCommitData(std::move(request));
+              }
               return;
             }
             resp_msg->set_proxy_id(request->proxy_id());
@@ -231,8 +234,6 @@ TransactionStatue MessageManager::GetTransactionState(uint64_t seq) {
 }
 
 int MessageManager::GetReplicaState(ReplicaState* state) {
-  state->set_view(GetCurrentView());
-  *state->mutable_replica_info() = config_.GetSelfInfo();
   *state->mutable_replica_config() = config_.GetConfigData();
   return 0;
 }
diff --git a/platform/consensus/ordering/pbft/performance_manager.cpp b/platform/consensus/ordering/pbft/performance_manager.cpp
index a5ba979f..ebaf2ed4 100644
--- a/platform/consensus/ordering/pbft/performance_manager.cpp
+++ b/platform/consensus/ordering/pbft/performance_manager.cpp
@@ -191,8 +191,16 @@ CollectorResultCode PerformanceManager::AddResponseMsg(
     return CollectorResultCode::INVALID;
   }
 
+  std::unique_ptr<BatchUserResponse> batch_response = 
std::make_unique<BatchUserResponse>();
+  if (!batch_response->ParseFromString(request->data())) {
+    LOG(ERROR) << "parse response fail:"<<request->data().size()
+    <<" seq:"<<request->seq(); 
+    return CollectorResultCode::INVALID;
+  }
+
+  uint64_t seq = batch_response->local_id();
+
   int type = request->type();
-  uint64_t seq = request->seq();
   int resp_received_count = 0;
   int ret = collector_pool_->GetCollector(seq)->AddRequest(
       std::move(request), signature, false,
diff --git a/platform/consensus/ordering/pbft/query.cpp b/platform/consensus/ordering/pbft/query.cpp
index 72bb4ed5..62a7d54f 100644
--- a/platform/consensus/ordering/pbft/query.cpp
+++ b/platform/consensus/ordering/pbft/query.cpp
@@ -50,28 +50,71 @@ int Query::ProcessGetReplicaState(std::unique_ptr<Context> context,
 
 int Query::ProcessQuery(std::unique_ptr<Context> context,
                         std::unique_ptr<Request> request) {
+
+  if (config_.GetPublicKeyCertificateInfo()
+      .public_key()
+      .public_key_info()
+      .type() == CertificateKeyInfo::CLIENT) {
+
+    auto find_primary = [&](){
+      auto config_data = config_.GetConfigData();
+      for (const auto& r : config_data.region()) {
+        for (const auto& replica : r.replica_info()) {
+          if(replica.id() == 1){
+            return replica;
+          }
+        }
+      }
+    };
+    ReplicaInfo primary = find_primary();
+    std::string ip = primary.ip();
+    int port = primary.port();
+
+    LOG(ERROR)<<"redirect to primary:"<<ip<<" port:"<<port;
+    auto client = std::make_unique<NetChannel>(ip, port);
+    if (client->SendRawMessage(*request) == 0) {
+      QueryResponse resp;
+      if (client->RecvRawMessage(&resp) == 0) {
+        if (context != nullptr && context->client != nullptr) {
+          LOG(ERROR) << "send response from 
primary:"<<resp.transactions_size();
+          int ret = context->client->SendRawMessage(resp);
+          if (ret) {
+            LOG(ERROR) << "send resp fail ret:" << ret;
+          }
+        }
+      } 
+    } 
+    return 0;
+  }
+
   QueryRequest query;
   if (!query.ParseFromString(request->data())) {
     LOG(ERROR) << "parse data fail";
     return -2;
   }
-  // LOG(ERROR) << "request:" << query.DebugString();
 
   QueryResponse response;
-  for (uint64_t i = query.min_seq(); i <= query.max_seq(); ++i) {
-    Request* ret_request = message_manager_->GetRequest(i);
-    if (ret_request == nullptr) {
-      break;
+  if (query.max_seq() == 0 && query.min_seq() == 0){
+    uint64_t mseq = message_manager_->GetNextSeq();
+    response.set_max_seq(mseq-1);
+    LOG(ERROR)<<"get max seq:"<<mseq;
+  }
+  else {
+    for (uint64_t i = query.min_seq(); i <= query.max_seq(); ++i) {
+      Request* ret_request = message_manager_->GetRequest(i);
+      if (ret_request == nullptr) {
+        break;
+      }
+      Request* txn = response.add_transactions();
+      txn->set_data(ret_request->data());
+      txn->set_hash(ret_request->hash());
+      txn->set_seq(ret_request->seq());
+      txn->set_proxy_id(ret_request->proxy_id());
     }
-    Request* txn = response.add_transactions();
-    txn->set_data(ret_request->data());
-    txn->set_hash(ret_request->hash());
-    txn->set_seq(ret_request->seq());
-    txn->set_proxy_id(ret_request->proxy_id());
   }
 
   if (context != nullptr && context->client != nullptr) {
-    // LOG(ERROR) << "send response:" << response.DebugString();
+    //LOG(ERROR) << "send response:" << response.DebugString();
     int ret = context->client->SendRawMessage(response);
     if (ret) {
       LOG(ERROR) << "send resp fail ret:" << ret;
diff --git a/platform/consensus/ordering/pbft/response_manager.cpp b/platform/consensus/ordering/pbft/response_manager.cpp
index 6a7162d8..091a75cc 100644
--- a/platform/consensus/ordering/pbft/response_manager.cpp
+++ b/platform/consensus/ordering/pbft/response_manager.cpp
@@ -57,6 +57,7 @@ ResponseManager::ResponseManager(const ResDBConfig& config,
       verifier_(verifier) {
   stop_ = false;
   local_id_ = 1;
+  timeout_length_ = 5000000;
 
   if (config_.GetPublicKeyCertificateInfo()
               .public_key()
@@ -172,8 +173,20 @@ CollectorResultCode ResponseManager::AddResponseMsg(
     return CollectorResultCode::INVALID;
   }
 
+  std::string hash = request->hash();
+
+  std::unique_ptr<BatchUserResponse> batch_response = 
std::make_unique<BatchUserResponse>();
+  if (!batch_response->ParseFromString(request->data())) {
+    LOG(ERROR) << "parse response fail:"<<request->data().size()
+    <<" seq:"<<request->seq(); 
+    RemoveWaitingResponseRequest(hash);
+    return CollectorResultCode::INVALID;
+  }
+
+  uint64_t seq = batch_response->local_id();
+  request->set_seq(seq);
+
   int type = request->type();
-  uint64_t seq = request->seq();
   int resp_received_count = 0;
   int ret = collector_pool_->GetCollector(seq)->AddRequest(
       std::move(request), signature, false,
@@ -190,6 +203,7 @@ CollectorResultCode ResponseManager::AddResponseMsg(
   }
   if (resp_received_count > 0) {
     collector_pool_->Update(seq);
+    RemoveWaitingResponseRequest(hash);
     return CollectorResultCode::STATE_CHANGED;
   }
   return CollectorResultCode::OK;
@@ -295,7 +309,7 @@ int ResponseManager::DoBatch(
 
   if (!config_.IsPerformanceRunning()) {
     LOG(ERROR) << "add context list:" << new_request->seq()
-               << " list size:" << context_list.size();
+               << " list size:" << context_list.size()<<" 
local_id:"<<local_id_;
     batch_request.set_local_id(local_id_);
     int ret = AddContextList(std::move(context_list), local_id_++);
     if (ret != 0) {
@@ -318,13 +332,10 @@ int ResponseManager::DoBatch(
   batch_request.SerializeToString(new_request->mutable_data());
   new_request->set_hash(SignatureVerifier::CalculateHash(new_request->data()));
   new_request->set_proxy_id(config_.GetSelfInfo().id());
-  /*for(int i=1; i<=4; i++){
-      replica_communicator_->SendMessage(*new_request, i);
-  }*/
   replica_communicator_->SendMessage(*new_request, GetPrimary());
   send_num_++;
-  LOG(INFO) << "send msg to primary:" << GetPrimary()
-            << " batch size:" << batch_req.size();
+  //LOG(INFO) << "send msg to primary:" << GetPrimary()
+  //          << " batch size:" << batch_req.size();
   AddWaitingResponseRequest(std::move(new_request));
   return 0;
 }
@@ -335,7 +346,8 @@ void ResponseManager::AddWaitingResponseRequest(
     return;
   }
   pm_lock_.lock();
-  uint64_t time = GetCurrentTime() + this->timeout_length_;
+  assert(timeout_length_>0);
+  uint64_t time = GetCurrentTime() + timeout_length_;
   client_timeout_min_heap_.push(
       ResponseClientTimeout(request->hash(), time));
   waiting_response_batches_.insert(
@@ -344,7 +356,7 @@ void ResponseManager::AddWaitingResponseRequest(
   sem_post(&request_sent_signal_);
 }
 
-void ResponseManager::RemoveWaitingResponseRequest(std::string hash) {
+void ResponseManager::RemoveWaitingResponseRequest(const std::string& hash) {
   if (!config_.GetConfigData().enable_viewchange()) {
     return;
   }
@@ -390,7 +402,6 @@ void ResponseManager::MonitoringClientTimeOut() {
     if (CheckTimeOut(client_timeout.hash)) {
       auto request = GetTimeOutRequest(client_timeout.hash);
       if (request) {
-        LOG(ERROR) << "Client Request Timeout " << client_timeout.hash;
         replica_communicator_->BroadCast(*request);
       }
     }
diff --git a/platform/consensus/ordering/pbft/response_manager.h b/platform/consensus/ordering/pbft/response_manager.h
index b238e0df..36412eaf 100644
--- a/platform/consensus/ordering/pbft/response_manager.h
+++ b/platform/consensus/ordering/pbft/response_manager.h
@@ -77,7 +77,7 @@ class ResponseManager {
   int GetPrimary();
 
   void AddWaitingResponseRequest(std::unique_ptr<Request> request);
-  void RemoveWaitingResponseRequest(std::string hash);
+  void RemoveWaitingResponseRequest(const std::string& hash);
   bool CheckTimeOut(std::string hash);
   void ResponseTimer(std::string hash);
   void MonitoringClientTimeOut();
diff --git a/platform/consensus/recovery/recovery.cpp b/platform/consensus/recovery/recovery.cpp
index fb1f6d50..d106075a 100644
--- a/platform/consensus/recovery/recovery.cpp
+++ b/platform/consensus/recovery/recovery.cpp
@@ -511,15 +511,17 @@ void Recovery::ReadLogsFromFiles(
   if (request_list.size() == 0) {
     ftruncate(fd, 0);
   }
+  uint64_t max_seq = 0;
   for (std::unique_ptr<RecoveryData>& recovery_data : request_list) {
     if (ckpt < recovery_data->request->seq()) {
       recovery_data->request->set_is_recovery(true);
+      max_seq = recovery_data->request->seq();
       call_back(std::move(recovery_data->context),
                 std::move(recovery_data->request));
     }
   }
 
-  LOG(INFO) << "read log from files:" << path << " done";
+  LOG(ERROR) << "read log from files:" << path << " done"<<" recovery max 
seq:"<<max_seq;
 
   close(fd);
 }
diff --git a/platform/networkstrate/async_acceptor.cpp b/platform/networkstrate/async_acceptor.cpp
index 9fd85a06..b61a510b 100644
--- a/platform/networkstrate/async_acceptor.cpp
+++ b/platform/networkstrate/async_acceptor.cpp
@@ -77,7 +77,7 @@ void AsyncAcceptor::Session::ReadDone() {
     delete recv_buffer_;
   } else {
     data_size_ = *reinterpret_cast<size_t*>(recv_buffer_);
-    if (data_size_ > 1e6) {
+    if (data_size_ > 1e10) {
       LOG(ERROR) << "read data size:" << data_size_
                  << " data size:" << sizeof(data_size_) << " close socket";
       Close();
diff --git a/platform/networkstrate/consensus_manager.cpp b/platform/networkstrate/consensus_manager.cpp
index 00b70810..813ad1fc 100644
--- a/platform/networkstrate/consensus_manager.cpp
+++ b/platform/networkstrate/consensus_manager.cpp
@@ -90,21 +90,44 @@ void ConsensusManager::HeartBeat() {
   std::mutex mutex;
   std::condition_variable cv;
   while (IsRunning()) {
+    {
+      std::unique_lock<std::mutex> lk(hb_mutex_);
+      SendHeartBeat();
+    }
+    std::unique_lock<std::mutex> lk(mutex);
+    cv.wait_for(lk, std::chrono::microseconds(sleep_time * 1000000),
+                [&] { return !IsRunning(); });
+    if (is_ready_) {
+      if (config_.IsTestMode()) {
+        sleep_time = 1;
+      } else {
+        sleep_time = 60;
+      }
+    }
+  }
+}
+
+void ConsensusManager::SendHeartBeat() {
     auto keys = verifier_->GetAllPublicKeys();
 
     std::vector<ReplicaInfo> replicas = GetAllReplicas();
     LOG(ERROR) << "all replicas:" << replicas.size();
     std::vector<ReplicaInfo> client_replicas = GetClientReplicas();
     HeartBeatInfo hb_info;
+    hb_info.set_sender(config_.GetSelfInfo().id());
+    hb_info.set_ip(config_.GetSelfInfo().ip());
+    hb_info.set_port(config_.GetSelfInfo().port());
+    hb_info.set_hb_version(version_);
     for (const auto& key : keys) {
       *hb_info.add_public_keys() = key;
+      hb_info.add_node_version(hb_[key.public_key_info().node_id()]);
     }
     for (const auto& client : client_replicas) {
       replicas.push_back(client);
     }
     auto client = GetReplicaClient(replicas, false);
     if (client == nullptr) {
-      continue;
+      return;
     }
 
     // If it is not a client node, broadcost the current primary to the client.
@@ -131,17 +154,6 @@ void ConsensusManager::HeartBeat() {
       LOG(ERROR) << " server:" << config_.GetSelfInfo().id()
                  << " sends HB fail:" << ret;
     }
-    std::unique_lock<std::mutex> lk(mutex);
-    cv.wait_for(lk, std::chrono::microseconds(sleep_time * 1000000),
-                [&] { return !IsRunning(); });
-    if (is_ready_) {
-      if (config_.IsTestMode()) {
-        sleep_time = 1;
-      } else {
-        sleep_time = 60 * 2;
-      }
-    }
-  }
 }
 
 // Porcess the packages received from the network.
@@ -158,35 +170,35 @@ int ConsensusManager::Process(std::unique_ptr<Context> context,
     return -1;
   }
 
+  std::unique_ptr<Request> request = std::make_unique<Request>();
+  if (!request->ParseFromString(message.data())) {
+    LOG(ERROR) << "parse data info fail";
+    return -1;
+  }
+
+  if (request->type() == Request::TYPE_HEART_BEAT) {
+    return Dispatch(std::move(context), std::move(request));
+  }
+
   // Check if the certificate is valid.
   if (message.has_signature() && verifier_) {
     bool valid = verifier_->VerifyMessage(message.data(), message.signature());
     if (!valid) {
       LOG(ERROR) << "request is not valid:"
-                 << message.signature().DebugString();
-      LOG(ERROR) << " msg:" << message.data().size();
+        << message.signature().DebugString();
+      LOG(ERROR) << " msg:" << message.data().size()<<" is 
recovery:"<<request->is_recovery();
       return -2;
     }
   } else {
   }
 
-  std::unique_ptr<Request> request = std::make_unique<Request>();
-  if (!request->ParseFromString(message.data())) {
-    LOG(ERROR) << "parse data info fail";
-    return -1;
-  }
-
-  std::string tmp;
-  if (!request->SerializeToString(&tmp)) {
-    return -1;
-  }
+    // forward the signature to the request so that it can be included in the
+    // request/response set if needed.
+    context->signature = message.signature();
+    // LOG(ERROR) << "======= server:" << config_.GetSelfInfo().id()
+    //          << " get request type:" << request->type()
+    //         << " from:" << request->sender_id();
 
-  // forward the signature to the request so that it can be included in the
-  // request/response set if needed.
-  context->signature = message.signature();
-  // LOG(ERROR) << "======= server:" << config_.GetSelfInfo().id()
-  //          << " get request type:" << request->type()
-  //         << " from:" << request->sender_id();
   return Dispatch(std::move(context), std::move(request));
 }
 
@@ -202,6 +214,7 @@ int ConsensusManager::Dispatch(std::unique_ptr<Context> context,
 
 int ConsensusManager::ProcessHeartBeat(std::unique_ptr<Context> context,
                                        std::unique_ptr<Request> request) {
+  std::unique_lock<std::mutex> lk(hb_mutex_);
   std::vector<ReplicaInfo> replicas = GetReplicas();
   HeartBeatInfo hb_info;
   if (!hb_info.ParseFromString(request->data())) {
@@ -212,7 +225,10 @@ int ConsensusManager::ProcessHeartBeat(std::unique_ptr<Context> context,
   LOG(ERROR) << "receive public size:" << hb_info.public_keys().size()
              << " primary:" << hb_info.primary()
              << " version:" << hb_info.version()
-             << " from region:" << request->region_info().region_id();
+            << " from region:" << request->region_info().region_id() 
+            << " sender:"<<hb_info.sender() 
+            << " last send:"<< hb_info.hb_version() 
+            << " current v:"<<hb_[hb_info.sender()];
 
   if (request->region_info().region_id() ==
       config_.GetConfigData().self_region_id()) {
@@ -261,6 +277,17 @@ int ConsensusManager::ProcessHeartBeat(std::unique_ptr<Context> context,
       }
     }
   }
+
+  if(!hb_info.ip().empty() && hb_info.hb_version() > 0&&hb_[hb_info.sender()] 
!= hb_info.hb_version()) {
+    ReplicaInfo info;
+    info.set_ip(hb_info.ip());
+    info.set_port(hb_info.port());
+    info.set_id(hb_info.sender());
+    //bc_client_->Flush(info);
+    hb_[hb_info.sender()] = hb_info.hb_version();
+    SendHeartBeat();
+  }
+
   if (!is_ready_ && replica_num >= config_.GetMinDataReceiveNum()) {
     LOG(ERROR) << "============ Server " << config_.GetSelfInfo().id()
                << " is ready "
diff --git a/platform/networkstrate/consensus_manager.h b/platform/networkstrate/consensus_manager.h
index fffc9fc7..d775f9d8 100644
--- a/platform/networkstrate/consensus_manager.h
+++ b/platform/networkstrate/consensus_manager.h
@@ -89,6 +89,7 @@ class ConsensusManager : public ServiceInterface {
 
  private:
   void HeartBeat();
+  void SendHeartBeat();
   void BroadCastThread();
 
  protected:
@@ -105,6 +106,9 @@ class ConsensusManager : public ServiceInterface {
   std::unique_ptr<ReplicaCommunicator> bc_client_;
   std::vector<ReplicaInfo> clients_;
   Stats* global_stats_;
+  uint64_t version_;
+  std::map<int,uint64_t> hb_;
+  std::mutex hb_mutex_;
 };
 
 }  // namespace resdb
diff --git a/platform/proto/resdb.proto b/platform/proto/resdb.proto
index 71ed4591..0efea6bf 100644
--- a/platform/proto/resdb.proto
+++ b/platform/proto/resdb.proto
@@ -115,8 +115,13 @@ message BatchUserResponse {
 
 message HeartBeatInfo{
   repeated CertificateKey public_keys = 1;
+  repeated int64 node_version = 8;
   uint32 primary = 2;
   uint64 version= 3;
+  int32 sender = 4;
+  string ip = 5;
+  int32 port = 6;
+  int64 hb_version = 7;
 }
 
 message ClientCertInfo {
diff --git a/scripts/deploy/config/kv_server.conf b/scripts/deploy/config/kv_server.conf
index a811f768..500e34d6 100644
--- a/scripts/deploy/config/kv_server.conf
+++ b/scripts/deploy/config/kv_server.conf
@@ -1,8 +1,8 @@
 iplist=(
-172.31.52.247
-172.31.54.193
-172.31.55.48
-172.31.53.140
+172.31.57.186
+172.31.57.186
+172.31.57.186
+172.31.57.186
 172.31.57.186
 )
 
diff --git a/scripts/deploy/script/deploy.sh b/scripts/deploy/script/deploy.sh
index 87ab7dc9..6da8deb8 100755
--- a/scripts/deploy/script/deploy.sh
+++ b/scripts/deploy/script/deploy.sh
@@ -14,6 +14,7 @@ server=//service/kv:kv_service
 fi
 
 # obtain the src path
+main_folder=resilientdb_app
 server_path=`echo "$server" | sed 's/:/\//g'`
 server_path=${server_path:1}
 server_name=`echo "$server" | awk -F':' '{print $NF}'`
@@ -60,10 +61,12 @@ fi
 # commands functions
 function run_cmd(){
   count=1
+  idx=1
   for ip in ${deploy_iplist[@]};
   do
-     ssh -i ${key} -n -o BatchMode=yes -o StrictHostKeyChecking=no 
ubuntu@${ip} "$1" &
+     ssh -i ${key} -n -o BatchMode=yes -o StrictHostKeyChecking=no 
ubuntu@${ip} "cd ${main_folder}/$idx; $1" &
     ((count++))
+    ((idx++))
   done
 
   while [ $count -gt 0 ]; do
@@ -76,6 +79,15 @@ function run_one_cmd(){
   ssh -i ${key} -n -o BatchMode=yes -o StrictHostKeyChecking=no ubuntu@${ip} 
"$1" 
 }
 
+idx=1
+for ip in ${deploy_iplist[@]};
+do
+  run_one_cmd "mkdir -p ${main_folder}/$idx" &
+  ((count++))
+  ((idx++))
+done
+
+
 run_cmd "killall -9 ${server_bin}"
 run_cmd "rm -rf ${server_bin}; rm ${server_bin}*.log; rm -rf server.config; rm 
-rf cert;"
 
@@ -83,11 +95,13 @@ sleep 1
 
 # upload config files and binary
 echo "upload configs"
+idx=1
 count=0
 for ip in ${deploy_iplist[@]};
 do
-  scp -i ${key} -r ${bin_path} ${output_path}/server.config 
${output_path}/cert ubuntu@${ip}:/home/ubuntu/ &
+  scp -i ${key} -r ${bin_path} ${output_path}/server.config 
${output_path}/cert ubuntu@${ip}:/home/ubuntu/${main_folder}/$idx &
   ((count++))
+  ((idx++))
 done
 
 while [ $count -gt 0 ]; do
@@ -103,9 +117,10 @@ for ip in ${deploy_iplist[@]};
 do
   private_key="cert/node_"${idx}".key.pri"
   cert="cert/cert_"${idx}".cert"
-  run_one_cmd "nohup ./${server_bin} server.config ${private_key} ${cert} 
${grafna_port} > ${server_bin}.log 2>&1 &" &
+  run_one_cmd "cd ${main_folder}/$idx; nohup ./${server_bin} server.config 
${private_key} ${cert} ${grafna_port} > ${server_bin}.log 2>&1 &" &
   ((count++))
   ((idx++))
+  ((grafna_port++))
 done
 
 while [ $count -gt 0 ]; do
@@ -120,7 +135,7 @@ do
   resp=""
   while [ "$resp" = "" ]
   do
-    resp=`ssh -i ${key} -n -o BatchMode=yes -o StrictHostKeyChecking=no 
ubuntu@${ip} "grep \"receive public size:${#iplist[@]}\" ${server_bin}.log"` 
+    resp=`ssh -i ${key} -n -o BatchMode=yes -o StrictHostKeyChecking=no 
ubuntu@${ip} "cd ${main_folder}/$idx; grep \"receive public 
size:${#iplist[@]}\" ${server_bin}.log"` 
     if [ "$resp" = "" ]; then
       sleep 1
     fi
diff --git a/service/kv/kv_service.cpp b/service/kv/kv_service.cpp
index 11272aa4..6cd20e1e 100644
--- a/service/kv/kv_service.cpp
+++ b/service/kv/kv_service.cpp
@@ -60,6 +60,7 @@ int main(int argc, char** argv) {
     exit(0);
   }
   google::InitGoogleLogging(argv[0]);
+  FLAGS_minloglevel = 1;
 
   char* config_file = argv[1];
   char* private_key_file = argv[2];
diff --git a/service/tools/kv/api_tools/kv_client_txn_tools.cpp b/service/tools/kv/api_tools/kv_client_txn_tools.cpp
index 4ebff4de..55f0f6c4 100644
--- a/service/tools/kv/api_tools/kv_client_txn_tools.cpp
+++ b/service/tools/kv/api_tools/kv_client_txn_tools.cpp
@@ -49,8 +49,6 @@ int main(int argc, char** argv) {
 
   ResDBTxnAccessor client(config);
   auto resp = client.GetTxn(min_seq, max_seq);
-  absl::StatusOr<std::vector<std::pair<uint64_t, std::string>>> GetTxn(
-      uint64_t min_seq, uint64_t max_seq);
   if (!resp.ok()) {
     LOG(ERROR) << "get replica state fail";
     exit(1);

Reply via email to