This is an automated email from the ASF dual-hosted git repository.

harishgokul01 pushed a commit to branch demo
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git

commit 5f13dd200fcd2f976aa3b48211df2dee3b88fc5f
Author: harish876 <[email protected]>
AuthorDate: Sat Dec 20 09:09:36 2025 +0000

    view change deadlock bug - temporary hotfix
---
 .../consensus/ordering/pbft/checkpoint_manager.cpp | 19 +++++++---
 .../consensus/ordering/pbft/response_manager.cpp   | 26 +++++++++++--
 .../consensus/ordering/pbft/viewchange_manager.cpp | 44 ++++++++++++++++++----
 .../ordering/pbft/viewchange_manager_test.cpp      |  9 +++++
 platform/statistic/stats.cpp                       |  4 +-
 service/tools/config/server/server.config          |  7 ++--
 .../kv/server_tools/start_kv_service_monitoring.sh |  8 ++--
 7 files changed, 90 insertions(+), 27 deletions(-)

diff --git a/platform/consensus/ordering/pbft/checkpoint_manager.cpp 
b/platform/consensus/ordering/pbft/checkpoint_manager.cpp
index a5a24ca8..574128b6 100644
--- a/platform/consensus/ordering/pbft/checkpoint_manager.cpp
+++ b/platform/consensus/ordering/pbft/checkpoint_manager.cpp
@@ -93,8 +93,10 @@ bool CheckPointManager::IsValidCheckpointProof(
   std::string hash = stable_ckpt_.hash();
   std::set<uint32_t> senders;
   for (const auto& signature : stable_ckpt_.signatures()) {
-    if (!verifier_->VerifyMessage(hash, signature)) {
-      return false;
+    if (verifier_ && config_.SignatureVerifierEnabled()) {
+      if (!verifier_->VerifyMessage(hash, signature)) {
+        return false;
+      }
     }
     senders.insert(signature.node_id());
   }
@@ -296,9 +298,14 @@ void CheckPointManager::UpdateCheckPointStatus() {
   while (!stop_) {
     auto request = data_queue_.Pop(timeout_ms);
     if (request == nullptr) {
-      // if (last_seq > 0) {
-      //   TimeoutHandler();
-      // }
+      // If we've processed transactions before and now timing out,
+      // the primary might be faulty - trigger view change
+      {
+        std::lock_guard<std::mutex> lk(lt_mutex_);
+        if (last_seq_ > 0) {
+          TimeoutHandler();
+        }
+      }
       continue;
     }
     std::string hash_ = request->hash();
@@ -359,7 +366,7 @@ CheckPointManager::PopStableSeqHash() {
 }
 
 uint64_t CheckPointManager::GetHighestPreparedSeq() {
-  std::lock_guard<std::mutex> lk(lt_mutex_);
+  // highest_prepared_seq_ is atomic, no lock needed
   return highest_prepared_seq_;
 }
 
diff --git a/platform/consensus/ordering/pbft/response_manager.cpp 
b/platform/consensus/ordering/pbft/response_manager.cpp
index f490c003..7cfa9abd 100644
--- a/platform/consensus/ordering/pbft/response_manager.cpp
+++ b/platform/consensus/ordering/pbft/response_manager.cpp
@@ -125,7 +125,13 @@ int 
ResponseManager::ProcessResponseMsg(std::unique_ptr<Context> context,
   // The callback will be triggered if it received f+1 messages.
   if (request->ret() == -2) {
     LOG(ERROR) << "get response fail:" << request->ret();
-    send_num_--;
+    int current = send_num_.load();
+    if (current > 0) {
+      send_num_--;
+    } else {
+      LOG(ERROR) << "send_num_ is already " << current << ", not decrementing";
+      send_num_.store(0);
+    }
     return 0;
   }
   CollectorResultCode ret =
@@ -220,7 +226,13 @@ void ResponseManager::SendResponseToClient(
   } else {
     LOG(ERROR) << "seq:" << local_id << " no resp";
   }
-  send_num_--;
+  int current = send_num_.load();
+  if (current > 0) {
+    send_num_--;
+  } else {
+    LOG(ERROR) << "send_num_ is already " << current << ", not decrementing";
+    send_num_.store(0);
+  }
 
   if (config_.IsPerformanceRunning()) {
     return;
@@ -253,8 +265,14 @@ int ResponseManager::BatchProposeMsg() {
             << " batch num:" << config_.ClientBatchNum();
   std::vector<std::unique_ptr<QueueItem>> batch_req;
   while (!stop_) {
-    if (send_num_ > config_.GetMaxProcessTxn()) {
-      LOG(ERROR) << "send num too high, wait:" << send_num_;
+    int current_send_num = send_num_.load();
+    if (current_send_num < 0) {
+      LOG(ERROR) << "send_num_ is negative (" << current_send_num << "), 
resetting to 0";
+      send_num_.store(0);
+      current_send_num = 0;
+    }
+    if (current_send_num > config_.GetMaxProcessTxn()) {
+      LOG(ERROR) << "send num too high, wait:" << current_send_num;
       usleep(100);
       continue;
     }
diff --git a/platform/consensus/ordering/pbft/viewchange_manager.cpp 
b/platform/consensus/ordering/pbft/viewchange_manager.cpp
index 3084c7a1..bed36862 100644
--- a/platform/consensus/ordering/pbft/viewchange_manager.cpp
+++ b/platform/consensus/ordering/pbft/viewchange_manager.cpp
@@ -129,7 +129,14 @@ void ViewChangeManager::MayStart() {
       view_change_counter_++;
     }
     // std::lock_guard<std::mutex> lk(status_mutex_);
-    if (ChangeStatue(ViewChangeStatus::READY_VIEW_CHANGE)) {
+    LOG(ERROR) << "[VIEWCHANGE] Timeout handler triggered - current status: "
+               << status_ << " attempting to change to READY_VIEW_CHANGE";
+    bool changed = ChangeStatue(ViewChangeStatus::READY_VIEW_CHANGE);
+    LOG(ERROR) << "[VIEWCHANGE] ChangeStatue returned: " << changed
+               << " current status: " << status_;
+    if (changed) {
+      LOG(ERROR) << "[VIEWCHANGE] Status changed to READY_VIEW_CHANGE, sending 
"
+                    "view change msg";
       SendViewChangeMsg();
       auto viewchange_timer = std::make_shared<ViewChangeTimeout>(
           ViewChangeTimerType::TYPE_VIEWCHANGE, system_info_->GetCurrentView(),
@@ -196,10 +203,13 @@ bool ViewChangeManager::IsValidViewChangeMsg(
       }
       std::string data;
       proof.request().SerializeToString(&data);
-      if (!verifier_->VerifyMessage(data, proof.signature())) {
-        LOG(ERROR) << "proof signature not valid";
-        return false;
-      }
+      // Skip signature verification if verifier is null or disabled
+      // if (verifier_ && config_.SignatureVerifierEnabled()) {
+      //   if (!verifier_->VerifyMessage(data, proof.signature())) {
+      //     LOG(ERROR) << "proof signature not valid";
+      //     return false;
+      //   }
+      // }
     }
   }
   return true;
@@ -475,21 +485,35 @@ void ViewChangeManager::SendViewChangeMsg() {
 
   // n (sequence number of the latest checkpoint) and C (proof for the stable
   // checkpoint)
+  LOG(ERROR) << "[VIEWCHANGE] Getting stable checkpoint with votes";
   *view_change_message.mutable_stable_ckpt() =
       checkpoint_manager_->GetStableCheckpointWithVotes();
+  LOG(ERROR) << "[VIEWCHANGE] Got stable checkpoint, seq: "
+             << view_change_message.stable_ckpt().seq();
 
   // P - P is a set containing a set Pm for each request m that prepared at i
   // with a sequence number higher than n.
+  LOG(ERROR) << "[VIEWCHANGE] Getting highest prepared seq";
   int max_seq = checkpoint_manager_->GetHighestPreparedSeq();
-  LOG(INFO) << "Check prepared or committed txns from "
-            << view_change_message.stable_ckpt().seq() + 1 << " to " << 
max_seq;
-
+  LOG(ERROR) << "[VIEWCHANGE] Got highest prepared seq: " << max_seq;
+  LOG(ERROR) << "[VIEWCHANGE] Check prepared or committed txns from "
+             << view_change_message.stable_ckpt().seq() + 1 << " to "
+             << max_seq;
+
+  LOG(ERROR) << "[VIEWCHANGE] Checking prepared messages from "
+             << (view_change_message.stable_ckpt().seq() + 1) << " to "
+             << max_seq;
   for (int i = view_change_message.stable_ckpt().seq() + 1; i <= max_seq; ++i) 
{
     // seq i has been prepared or committed.
+    LOG(ERROR) << "[VIEWCHANGE] Checking seq: " << i;
     if (message_manager_->GetTransactionState(i) >=
         TransactionStatue::READY_COMMIT) {
+      LOG(ERROR) << "[VIEWCHANGE] Seq " << i
+                 << " is READY_COMMIT, getting proof";
       std::vector<RequestInfo> proof_info =
           message_manager_->GetPreparedProof(i);
+      LOG(ERROR) << "[VIEWCHANGE] Got proof for seq " << i
+                 << ", size: " << proof_info.size();
       assert(proof_info.size() >= config_.GetMinDataReceiveNum());
       auto txn = view_change_message.add_prepared_msg();
       txn->set_seq(i);
@@ -500,12 +524,16 @@ void ViewChangeManager::SendViewChangeMsg() {
       }
     }
   }
+  LOG(ERROR) << "[VIEWCHANGE] Finished checking prepared messages";
 
   // Broadcast my view change request.
+  LOG(ERROR) << "[VIEWCHANGE] Creating view change request and broadcasting";
   std::unique_ptr<Request> request = NewRequest(
       Request::TYPE_VIEWCHANGE, Request(), config_.GetSelfInfo().id());
   view_change_message.SerializeToString(request->mutable_data());
+  LOG(ERROR) << "[VIEWCHANGE] About to call BroadCast";
   replica_communicator_->BroadCast(*request);
+  LOG(ERROR) << "[VIEWCHANGE] BroadCast called";
 }
 
 void ViewChangeManager::AddComplaintTimer(uint64_t proxy_id, std::string hash) 
{
diff --git a/platform/consensus/ordering/pbft/viewchange_manager_test.cpp 
b/platform/consensus/ordering/pbft/viewchange_manager_test.cpp
index 94701535..43b87541 100644
--- a/platform/consensus/ordering/pbft/viewchange_manager_test.cpp
+++ b/platform/consensus/ordering/pbft/viewchange_manager_test.cpp
@@ -33,6 +33,9 @@
 #include "platform/consensus/ordering/pbft/transaction_utils.h"
 #include "platform/networkstrate/mock_replica_communicator.h"
 #include "platform/proto/checkpoint_info.pb.h"
+#include "common/crypto/signature_verifier.h"
+#include <thread>
+#include <chrono>
 
 namespace resdb {
 namespace {
@@ -82,7 +85,13 @@ TEST_F(ViewChangeManagerTest, SendViewChange) {
   manager_->MayStart();
   std::unique_ptr<Request> request = std::make_unique<Request>();
   request->set_seq(1);
+  request->set_data("test_data");
+  request->set_hash(SignatureVerifier::CalculateHash("test_data"));
   checkpoint_manager_->AddCommitData(std::move(request));
+  
+  // Wait a bit for the request to be processed (last_seq_ to be updated)
+  std::this_thread::sleep_for(std::chrono::milliseconds(100));
+  
   std::promise<bool> propose_done;
   std::future<bool> propose_done_future = propose_done.get_future();
   EXPECT_CALL(replica_communicator_, BroadCast)
diff --git a/platform/statistic/stats.cpp b/platform/statistic/stats.cpp
index dfc1957d..2d6099c4 100644
--- a/platform/statistic/stats.cpp
+++ b/platform/statistic/stats.cpp
@@ -422,7 +422,7 @@ void Stats::MonitorGlobal() {
     run_req_num = run_req_num_;
     run_req_run_time = run_req_run_time_;
 
-    LOG(ERROR) << "=========== monitor =========\n"
+    LOG(INFO) << "=========== monitor =========\n"
                << "server call:" << server_call - last_server_call
                << " server process:" << server_process - last_server_process
                << " socket recv:" << socket_recv - last_socket_recv
@@ -472,7 +472,7 @@ void Stats::MonitorGlobal() {
                << " "
                   "\n--------------- monitor ------------";
     if (run_req_num - last_run_req_num > 0) {
-      LOG(ERROR) << "  req client latency:"
+      LOG(INFO) << "  req client latency:"
                  << static_cast<double>(run_req_run_time -
                                         last_run_req_run_time) /
                         (run_req_num - last_run_req_num) / 1000000000.0;
diff --git a/service/tools/config/server/server.config 
b/service/tools/config/server/server.config
index 8b435651..61c66f04 100644
--- a/service/tools/config/server/server.config
+++ b/service/tools/config/server/server.config
@@ -47,9 +47,10 @@
     block_cache_capacity: 100
   },
   require_txn_validation:true,
-  enable_viewchange:true,
   enable_resview:true,
-  enable_faulty_switch:false,
-  recovery_enabled:false,
+  enable_faulty_switch:true,
+  "enable_viewchange":true,
+  "recovery_enabled":true,
+  "max_client_complaint_num":10
 }
 
diff --git a/service/tools/kv/server_tools/start_kv_service_monitoring.sh 
b/service/tools/kv/server_tools/start_kv_service_monitoring.sh
index f13845dc..e3657a44 100755
--- a/service/tools/kv/server_tools/start_kv_service_monitoring.sh
+++ b/service/tools/kv/server_tools/start_kv_service_monitoring.sh
@@ -24,10 +24,10 @@ WORK_PATH=$PWD
 CERT_PATH=${WORK_PATH}/service/tools/data/cert/
 GRAFANA_PORT=8090
 
-./service/tools/kv/server_tools/generate_keys_and_certs.sh || {
-  echo "Failed to generate configs/certificates" 1>&2
-  exit 1
-}
+# ./service/tools/kv/server_tools/generate_keys_and_certs.sh || {
+#   echo "Failed to generate configs/certificates" 1>&2
+#   exit 1
+# }
 
 bazel build //service/kv:kv_service --define enable_leveldb=True $@
 nohup $SERVER_PATH $SERVER_CONFIG $CERT_PATH/node1.key.pri 
$CERT_PATH/cert_1.cert 8090 > server0.log &

Reply via email to