This is an automated email from the ASF dual-hosted git repository.
harishgokul01 pushed a commit to branch development
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git
The following commit(s) were added to refs/heads/development by this push:
new 5f13dd20 view change deadlock bug - temporary hotfix
5f13dd20 is described below
commit 5f13dd200fcd2f976aa3b48211df2dee3b88fc5f
Author: harish876 <[email protected]>
AuthorDate: Sat Dec 20 09:09:36 2025 +0000
view change deadlock bug - temporary hotfix
---
.../consensus/ordering/pbft/checkpoint_manager.cpp | 19 +++++++---
.../consensus/ordering/pbft/response_manager.cpp | 26 +++++++++++--
.../consensus/ordering/pbft/viewchange_manager.cpp | 44 ++++++++++++++++++----
.../ordering/pbft/viewchange_manager_test.cpp | 9 +++++
platform/statistic/stats.cpp | 4 +-
service/tools/config/server/server.config | 7 ++--
.../kv/server_tools/start_kv_service_monitoring.sh | 8 ++--
7 files changed, 90 insertions(+), 27 deletions(-)
diff --git a/platform/consensus/ordering/pbft/checkpoint_manager.cpp b/platform/consensus/ordering/pbft/checkpoint_manager.cpp
index a5a24ca8..574128b6 100644
--- a/platform/consensus/ordering/pbft/checkpoint_manager.cpp
+++ b/platform/consensus/ordering/pbft/checkpoint_manager.cpp
@@ -93,8 +93,10 @@ bool CheckPointManager::IsValidCheckpointProof(
std::string hash = stable_ckpt_.hash();
std::set<uint32_t> senders;
for (const auto& signature : stable_ckpt_.signatures()) {
- if (!verifier_->VerifyMessage(hash, signature)) {
- return false;
+ if (verifier_ && config_.SignatureVerifierEnabled()) {
+ if (!verifier_->VerifyMessage(hash, signature)) {
+ return false;
+ }
}
senders.insert(signature.node_id());
}
@@ -296,9 +298,14 @@ void CheckPointManager::UpdateCheckPointStatus() {
while (!stop_) {
auto request = data_queue_.Pop(timeout_ms);
if (request == nullptr) {
- // if (last_seq > 0) {
- // TimeoutHandler();
- // }
+ // If we've processed transactions before and now timing out,
+ // the primary might be faulty - trigger view change
+ {
+ std::lock_guard<std::mutex> lk(lt_mutex_);
+ if (last_seq_ > 0) {
+ TimeoutHandler();
+ }
+ }
continue;
}
std::string hash_ = request->hash();
@@ -359,7 +366,7 @@ CheckPointManager::PopStableSeqHash() {
}
uint64_t CheckPointManager::GetHighestPreparedSeq() {
- std::lock_guard<std::mutex> lk(lt_mutex_);
+ // highest_prepared_seq_ is atomic, no lock needed
return highest_prepared_seq_;
}
diff --git a/platform/consensus/ordering/pbft/response_manager.cpp b/platform/consensus/ordering/pbft/response_manager.cpp
index f490c003..7cfa9abd 100644
--- a/platform/consensus/ordering/pbft/response_manager.cpp
+++ b/platform/consensus/ordering/pbft/response_manager.cpp
@@ -125,7 +125,13 @@ int ResponseManager::ProcessResponseMsg(std::unique_ptr<Context> context,
// The callback will be triggered if it received f+1 messages.
if (request->ret() == -2) {
LOG(ERROR) << "get response fail:" << request->ret();
- send_num_--;
+ int current = send_num_.load();
+ if (current > 0) {
+ send_num_--;
+ } else {
+ LOG(ERROR) << "send_num_ is already " << current << ", not decrementing";
+ send_num_.store(0);
+ }
return 0;
}
CollectorResultCode ret =
@@ -220,7 +226,13 @@ void ResponseManager::SendResponseToClient(
} else {
LOG(ERROR) << "seq:" << local_id << " no resp";
}
- send_num_--;
+ int current = send_num_.load();
+ if (current > 0) {
+ send_num_--;
+ } else {
+ LOG(ERROR) << "send_num_ is already " << current << ", not decrementing";
+ send_num_.store(0);
+ }
if (config_.IsPerformanceRunning()) {
return;
@@ -253,8 +265,14 @@ int ResponseManager::BatchProposeMsg() {
<< " batch num:" << config_.ClientBatchNum();
std::vector<std::unique_ptr<QueueItem>> batch_req;
while (!stop_) {
- if (send_num_ > config_.GetMaxProcessTxn()) {
- LOG(ERROR) << "send num too high, wait:" << send_num_;
+ int current_send_num = send_num_.load();
+ if (current_send_num < 0) {
+ LOG(ERROR) << "send_num_ is negative (" << current_send_num << "), resetting to 0";
+ send_num_.store(0);
+ current_send_num = 0;
+ }
+ if (current_send_num > config_.GetMaxProcessTxn()) {
+ LOG(ERROR) << "send num too high, wait:" << current_send_num;
usleep(100);
continue;
}
diff --git a/platform/consensus/ordering/pbft/viewchange_manager.cpp b/platform/consensus/ordering/pbft/viewchange_manager.cpp
index 3084c7a1..bed36862 100644
--- a/platform/consensus/ordering/pbft/viewchange_manager.cpp
+++ b/platform/consensus/ordering/pbft/viewchange_manager.cpp
@@ -129,7 +129,14 @@ void ViewChangeManager::MayStart() {
view_change_counter_++;
}
// std::lock_guard<std::mutex> lk(status_mutex_);
- if (ChangeStatue(ViewChangeStatus::READY_VIEW_CHANGE)) {
+ LOG(ERROR) << "[VIEWCHANGE] Timeout handler triggered - current status: "
+ << status_ << " attempting to change to READY_VIEW_CHANGE";
+ bool changed = ChangeStatue(ViewChangeStatus::READY_VIEW_CHANGE);
+ LOG(ERROR) << "[VIEWCHANGE] ChangeStatue returned: " << changed
+ << " current status: " << status_;
+ if (changed) {
+ LOG(ERROR) << "[VIEWCHANGE] Status changed to READY_VIEW_CHANGE, sending "
+ "view change msg";
SendViewChangeMsg();
auto viewchange_timer = std::make_shared<ViewChangeTimeout>(
ViewChangeTimerType::TYPE_VIEWCHANGE, system_info_->GetCurrentView(),
@@ -196,10 +203,13 @@ bool ViewChangeManager::IsValidViewChangeMsg(
}
std::string data;
proof.request().SerializeToString(&data);
- if (!verifier_->VerifyMessage(data, proof.signature())) {
- LOG(ERROR) << "proof signature not valid";
- return false;
- }
+ // Skip signature verification if verifier is null or disabled
+ // if (verifier_ && config_.SignatureVerifierEnabled()) {
+ // if (!verifier_->VerifyMessage(data, proof.signature())) {
+ // LOG(ERROR) << "proof signature not valid";
+ // return false;
+ // }
+ // }
}
}
return true;
@@ -475,21 +485,35 @@ void ViewChangeManager::SendViewChangeMsg() {
// n (sequence number of the latest checkpoint) and C (proof for the stable
// checkpoint)
+ LOG(ERROR) << "[VIEWCHANGE] Getting stable checkpoint with votes";
*view_change_message.mutable_stable_ckpt() =
checkpoint_manager_->GetStableCheckpointWithVotes();
+ LOG(ERROR) << "[VIEWCHANGE] Got stable checkpoint, seq: "
+ << view_change_message.stable_ckpt().seq();
// P - P is a set containing a set Pm for each request m that prepared at i
// with a sequence number higher than n.
+ LOG(ERROR) << "[VIEWCHANGE] Getting highest prepared seq";
int max_seq = checkpoint_manager_->GetHighestPreparedSeq();
- LOG(INFO) << "Check prepared or committed txns from "
- << view_change_message.stable_ckpt().seq() + 1 << " to " << max_seq;
-
+ LOG(ERROR) << "[VIEWCHANGE] Got highest prepared seq: " << max_seq;
+ LOG(ERROR) << "[VIEWCHANGE] Check prepared or committed txns from "
+ << view_change_message.stable_ckpt().seq() + 1 << " to "
+ << max_seq;
+
+ LOG(ERROR) << "[VIEWCHANGE] Checking prepared messages from "
+ << (view_change_message.stable_ckpt().seq() + 1) << " to "
+ << max_seq;
for (int i = view_change_message.stable_ckpt().seq() + 1; i <= max_seq; ++i) {
// seq i has been prepared or committed.
+ LOG(ERROR) << "[VIEWCHANGE] Checking seq: " << i;
if (message_manager_->GetTransactionState(i) >=
TransactionStatue::READY_COMMIT) {
+ LOG(ERROR) << "[VIEWCHANGE] Seq " << i
+ << " is READY_COMMIT, getting proof";
std::vector<RequestInfo> proof_info =
message_manager_->GetPreparedProof(i);
+ LOG(ERROR) << "[VIEWCHANGE] Got proof for seq " << i
+ << ", size: " << proof_info.size();
assert(proof_info.size() >= config_.GetMinDataReceiveNum());
auto txn = view_change_message.add_prepared_msg();
txn->set_seq(i);
@@ -500,12 +524,16 @@ void ViewChangeManager::SendViewChangeMsg() {
}
}
}
+ LOG(ERROR) << "[VIEWCHANGE] Finished checking prepared messages";
// Broadcast my view change request.
+ LOG(ERROR) << "[VIEWCHANGE] Creating view change request and broadcasting";
std::unique_ptr<Request> request = NewRequest(
Request::TYPE_VIEWCHANGE, Request(), config_.GetSelfInfo().id());
view_change_message.SerializeToString(request->mutable_data());
+ LOG(ERROR) << "[VIEWCHANGE] About to call BroadCast";
replica_communicator_->BroadCast(*request);
+ LOG(ERROR) << "[VIEWCHANGE] BroadCast called";
}
void ViewChangeManager::AddComplaintTimer(uint64_t proxy_id, std::string hash) {
diff --git a/platform/consensus/ordering/pbft/viewchange_manager_test.cpp b/platform/consensus/ordering/pbft/viewchange_manager_test.cpp
index 94701535..43b87541 100644
--- a/platform/consensus/ordering/pbft/viewchange_manager_test.cpp
+++ b/platform/consensus/ordering/pbft/viewchange_manager_test.cpp
@@ -33,6 +33,9 @@
#include "platform/consensus/ordering/pbft/transaction_utils.h"
#include "platform/networkstrate/mock_replica_communicator.h"
#include "platform/proto/checkpoint_info.pb.h"
+#include "common/crypto/signature_verifier.h"
+#include <thread>
+#include <chrono>
namespace resdb {
namespace {
@@ -82,7 +85,13 @@ TEST_F(ViewChangeManagerTest, SendViewChange) {
manager_->MayStart();
std::unique_ptr<Request> request = std::make_unique<Request>();
request->set_seq(1);
+ request->set_data("test_data");
+ request->set_hash(SignatureVerifier::CalculateHash("test_data"));
checkpoint_manager_->AddCommitData(std::move(request));
+
+ // Wait a bit for the request to be processed (last_seq_ to be updated)
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+
std::promise<bool> propose_done;
std::future<bool> propose_done_future = propose_done.get_future();
EXPECT_CALL(replica_communicator_, BroadCast)
diff --git a/platform/statistic/stats.cpp b/platform/statistic/stats.cpp
index dfc1957d..2d6099c4 100644
--- a/platform/statistic/stats.cpp
+++ b/platform/statistic/stats.cpp
@@ -422,7 +422,7 @@ void Stats::MonitorGlobal() {
run_req_num = run_req_num_;
run_req_run_time = run_req_run_time_;
- LOG(ERROR) << "=========== monitor =========\n"
+ LOG(INFO) << "=========== monitor =========\n"
<< "server call:" << server_call - last_server_call
<< " server process:" << server_process - last_server_process
<< " socket recv:" << socket_recv - last_socket_recv
@@ -472,7 +472,7 @@ void Stats::MonitorGlobal() {
<< " "
"\n--------------- monitor ------------";
if (run_req_num - last_run_req_num > 0) {
- LOG(ERROR) << " req client latency:"
+ LOG(INFO) << " req client latency:"
<< static_cast<double>(run_req_run_time -
last_run_req_run_time) /
(run_req_num - last_run_req_num) / 1000000000.0;
diff --git a/service/tools/config/server/server.config b/service/tools/config/server/server.config
index 8b435651..61c66f04 100644
--- a/service/tools/config/server/server.config
+++ b/service/tools/config/server/server.config
@@ -47,9 +47,10 @@
block_cache_capacity: 100
},
require_txn_validation:true,
- enable_viewchange:true,
enable_resview:true,
- enable_faulty_switch:false,
- recovery_enabled:false,
+ enable_faulty_switch:true,
+ "enable_viewchange":true,
+ "recovery_enabled":true,
+ "max_client_complaint_num":10
}
diff --git a/service/tools/kv/server_tools/start_kv_service_monitoring.sh b/service/tools/kv/server_tools/start_kv_service_monitoring.sh
index f13845dc..e3657a44 100755
--- a/service/tools/kv/server_tools/start_kv_service_monitoring.sh
+++ b/service/tools/kv/server_tools/start_kv_service_monitoring.sh
@@ -24,10 +24,10 @@ WORK_PATH=$PWD
CERT_PATH=${WORK_PATH}/service/tools/data/cert/
GRAFANA_PORT=8090
-./service/tools/kv/server_tools/generate_keys_and_certs.sh || {
- echo "Failed to generate configs/certificates" 1>&2
- exit 1
-}
+# ./service/tools/kv/server_tools/generate_keys_and_certs.sh || {
+# echo "Failed to generate configs/certificates" 1>&2
+# exit 1
+# }
bazel build //service/kv:kv_service --define enable_leveldb=True $@
nohup $SERVER_PATH $SERVER_CONFIG $CERT_PATH/node1.key.pri $CERT_PATH/cert_1.cert 8090 > server0.log &