This is an automated email from the ASF dual-hosted git repository. harishgokul01 pushed a commit to branch demo in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git
commit 5f13dd200fcd2f976aa3b48211df2dee3b88fc5f Author: harish876 <[email protected]> AuthorDate: Sat Dec 20 09:09:36 2025 +0000 view change deadlock bug - temporary hotfix --- .../consensus/ordering/pbft/checkpoint_manager.cpp | 19 +++++++--- .../consensus/ordering/pbft/response_manager.cpp | 26 +++++++++++-- .../consensus/ordering/pbft/viewchange_manager.cpp | 44 ++++++++++++++++++---- .../ordering/pbft/viewchange_manager_test.cpp | 9 +++++ platform/statistic/stats.cpp | 4 +- service/tools/config/server/server.config | 7 ++-- .../kv/server_tools/start_kv_service_monitoring.sh | 8 ++-- 7 files changed, 90 insertions(+), 27 deletions(-) diff --git a/platform/consensus/ordering/pbft/checkpoint_manager.cpp b/platform/consensus/ordering/pbft/checkpoint_manager.cpp index a5a24ca8..574128b6 100644 --- a/platform/consensus/ordering/pbft/checkpoint_manager.cpp +++ b/platform/consensus/ordering/pbft/checkpoint_manager.cpp @@ -93,8 +93,10 @@ bool CheckPointManager::IsValidCheckpointProof( std::string hash = stable_ckpt_.hash(); std::set<uint32_t> senders; for (const auto& signature : stable_ckpt_.signatures()) { - if (!verifier_->VerifyMessage(hash, signature)) { - return false; + if (verifier_ && config_.SignatureVerifierEnabled()) { + if (!verifier_->VerifyMessage(hash, signature)) { + return false; + } } senders.insert(signature.node_id()); } @@ -296,9 +298,14 @@ void CheckPointManager::UpdateCheckPointStatus() { while (!stop_) { auto request = data_queue_.Pop(timeout_ms); if (request == nullptr) { - // if (last_seq > 0) { - // TimeoutHandler(); - // } + // If we've processed transactions before and now timing out, + // the primary might be faulty - trigger view change + { + std::lock_guard<std::mutex> lk(lt_mutex_); + if (last_seq_ > 0) { + TimeoutHandler(); + } + } continue; } std::string hash_ = request->hash(); @@ -359,7 +366,7 @@ CheckPointManager::PopStableSeqHash() { } uint64_t CheckPointManager::GetHighestPreparedSeq() { - std::lock_guard<std::mutex> lk(lt_mutex_); + // highest_prepared_seq_ is atomic, no lock needed return highest_prepared_seq_; } diff --git a/platform/consensus/ordering/pbft/response_manager.cpp b/platform/consensus/ordering/pbft/response_manager.cpp index f490c003..7cfa9abd 100644 --- a/platform/consensus/ordering/pbft/response_manager.cpp +++ b/platform/consensus/ordering/pbft/response_manager.cpp @@ -125,7 +125,13 @@ int ResponseManager::ProcessResponseMsg(std::unique_ptr<Context> context, // The callback will be triggered if it received f+1 messages. if (request->ret() == -2) { LOG(ERROR) << "get response fail:" << request->ret(); - send_num_--; + int current = send_num_.load(); + if (current > 0) { + send_num_--; + } else { + LOG(ERROR) << "send_num_ is already " << current << ", not decrementing"; + send_num_.store(0); + } return 0; } CollectorResultCode ret = @@ -220,7 +226,13 @@ void ResponseManager::SendResponseToClient( } else { LOG(ERROR) << "seq:" << local_id << " no resp"; } - send_num_--; + int current = send_num_.load(); + if (current > 0) { + send_num_--; + } else { + LOG(ERROR) << "send_num_ is already " << current << ", not decrementing"; + send_num_.store(0); + } if (config_.IsPerformanceRunning()) { return; @@ -253,8 +265,14 @@ int ResponseManager::BatchProposeMsg() { << " batch num:" << config_.ClientBatchNum(); std::vector<std::unique_ptr<QueueItem>> batch_req; while (!stop_) { - if (send_num_ > config_.GetMaxProcessTxn()) { - LOG(ERROR) << "send num too high, wait:" << send_num_; + int current_send_num = send_num_.load(); + if (current_send_num < 0) { + LOG(ERROR) << "send_num_ is negative (" << current_send_num << "), resetting to 0"; + send_num_.store(0); + current_send_num = 0; + } + if (current_send_num > config_.GetMaxProcessTxn()) { + LOG(ERROR) << "send num too high, wait:" << current_send_num; usleep(100); continue; } diff --git a/platform/consensus/ordering/pbft/viewchange_manager.cpp b/platform/consensus/ordering/pbft/viewchange_manager.cpp index 3084c7a1..bed36862 100644 --- a/platform/consensus/ordering/pbft/viewchange_manager.cpp +++ b/platform/consensus/ordering/pbft/viewchange_manager.cpp @@ -129,7 +129,14 @@ void ViewChangeManager::MayStart() { view_change_counter_++; } // std::lock_guard<std::mutex> lk(status_mutex_); - if (ChangeStatue(ViewChangeStatus::READY_VIEW_CHANGE)) { + LOG(ERROR) << "[VIEWCHANGE] Timeout handler triggered - current status: " + << status_ << " attempting to change to READY_VIEW_CHANGE"; + bool changed = ChangeStatue(ViewChangeStatus::READY_VIEW_CHANGE); + LOG(ERROR) << "[VIEWCHANGE] ChangeStatue returned: " << changed + << " current status: " << status_; + if (changed) { + LOG(ERROR) << "[VIEWCHANGE] Status changed to READY_VIEW_CHANGE, sending " + "view change msg"; SendViewChangeMsg(); auto viewchange_timer = std::make_shared<ViewChangeTimeout>( ViewChangeTimerType::TYPE_VIEWCHANGE, system_info_->GetCurrentView(), @@ -196,10 +203,13 @@ bool ViewChangeManager::IsValidViewChangeMsg( } std::string data; proof.request().SerializeToString(&data); - if (!verifier_->VerifyMessage(data, proof.signature())) { - LOG(ERROR) << "proof signature not valid"; - return false; - } + // Skip signature verification if verifier is null or disabled + // if (verifier_ && config_.SignatureVerifierEnabled()) { + // if (!verifier_->VerifyMessage(data, proof.signature())) { + // LOG(ERROR) << "proof signature not valid"; + // return false; + // } + // } } } return true; @@ -475,21 +485,35 @@ void ViewChangeManager::SendViewChangeMsg() { // n (sequence number of the latest checkpoint) and C (proof for the stable // checkpoint) + LOG(ERROR) << "[VIEWCHANGE] Getting stable checkpoint with votes"; *view_change_message.mutable_stable_ckpt() = checkpoint_manager_->GetStableCheckpointWithVotes(); + LOG(ERROR) << "[VIEWCHANGE] Got stable checkpoint, seq: " + << view_change_message.stable_ckpt().seq(); // P - P is a set containing a set Pm for each request m that prepared at i // with a sequence number higher than n. + LOG(ERROR) << "[VIEWCHANGE] Getting highest prepared seq"; int max_seq = checkpoint_manager_->GetHighestPreparedSeq(); - LOG(INFO) << "Check prepared or committed txns from " - << view_change_message.stable_ckpt().seq() + 1 << " to " << max_seq; - + LOG(ERROR) << "[VIEWCHANGE] Got highest prepared seq: " << max_seq; + LOG(ERROR) << "[VIEWCHANGE] Check prepared or committed txns from " + << view_change_message.stable_ckpt().seq() + 1 << " to " + << max_seq; + + LOG(ERROR) << "[VIEWCHANGE] Checking prepared messages from " + << (view_change_message.stable_ckpt().seq() + 1) << " to " + << max_seq; for (int i = view_change_message.stable_ckpt().seq() + 1; i <= max_seq; ++i) { // seq i has been prepared or committed. + LOG(ERROR) << "[VIEWCHANGE] Checking seq: " << i; if (message_manager_->GetTransactionState(i) >= TransactionStatue::READY_COMMIT) { + LOG(ERROR) << "[VIEWCHANGE] Seq " << i + << " is READY_COMMIT, getting proof"; std::vector<RequestInfo> proof_info = message_manager_->GetPreparedProof(i); + LOG(ERROR) << "[VIEWCHANGE] Got proof for seq " << i + << ", size: " << proof_info.size(); assert(proof_info.size() >= config_.GetMinDataReceiveNum()); auto txn = view_change_message.add_prepared_msg(); txn->set_seq(i); @@ -500,12 +524,16 @@ void ViewChangeManager::SendViewChangeMsg() { } } } + LOG(ERROR) << "[VIEWCHANGE] Finished checking prepared messages"; // Broadcast my view change request. + LOG(ERROR) << "[VIEWCHANGE] Creating view change request and broadcasting"; std::unique_ptr<Request> request = NewRequest( Request::TYPE_VIEWCHANGE, Request(), config_.GetSelfInfo().id()); view_change_message.SerializeToString(request->mutable_data()); + LOG(ERROR) << "[VIEWCHANGE] About to call BroadCast"; replica_communicator_->BroadCast(*request); + LOG(ERROR) << "[VIEWCHANGE] BroadCast called"; } void ViewChangeManager::AddComplaintTimer(uint64_t proxy_id, std::string hash) { diff --git a/platform/consensus/ordering/pbft/viewchange_manager_test.cpp b/platform/consensus/ordering/pbft/viewchange_manager_test.cpp index 94701535..43b87541 100644 --- a/platform/consensus/ordering/pbft/viewchange_manager_test.cpp +++ b/platform/consensus/ordering/pbft/viewchange_manager_test.cpp @@ -33,6 +33,9 @@ #include "platform/consensus/ordering/pbft/transaction_utils.h" #include "platform/networkstrate/mock_replica_communicator.h" #include "platform/proto/checkpoint_info.pb.h" +#include "common/crypto/signature_verifier.h" +#include <thread> +#include <chrono> namespace resdb { namespace { @@ -82,7 +85,13 @@ TEST_F(ViewChangeManagerTest, SendViewChange) { manager_->MayStart(); std::unique_ptr<Request> request = std::make_unique<Request>(); request->set_seq(1); + request->set_data("test_data"); + request->set_hash(SignatureVerifier::CalculateHash("test_data")); checkpoint_manager_->AddCommitData(std::move(request)); + + // Wait a bit for the request to be processed (last_seq_ to be updated) + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + std::promise<bool> propose_done; std::future<bool> propose_done_future = propose_done.get_future(); EXPECT_CALL(replica_communicator_, BroadCast) diff --git a/platform/statistic/stats.cpp b/platform/statistic/stats.cpp index dfc1957d..2d6099c4 100644 --- a/platform/statistic/stats.cpp +++ b/platform/statistic/stats.cpp @@ -422,7 +422,7 @@ void Stats::MonitorGlobal() { run_req_num = run_req_num_; run_req_run_time = run_req_run_time_; - LOG(ERROR) << "=========== monitor =========\n" + LOG(INFO) << "=========== monitor =========\n" << "server call:" << server_call - last_server_call << " server process:" << server_process - last_server_process << " socket recv:" << socket_recv - last_socket_recv @@ -472,7 +472,7 @@ void Stats::MonitorGlobal() { << " " "\n--------------- monitor ------------"; if (run_req_num - last_run_req_num > 0) { - LOG(ERROR) << " req client latency:" + LOG(INFO) << " req client latency:" << static_cast<double>(run_req_run_time - last_run_req_run_time) / (run_req_num - last_run_req_num) / 1000000000.0; diff --git a/service/tools/config/server/server.config b/service/tools/config/server/server.config index 8b435651..61c66f04 100644 --- a/service/tools/config/server/server.config +++ b/service/tools/config/server/server.config @@ -47,9 +47,10 @@ block_cache_capacity: 100 }, require_txn_validation:true, - enable_viewchange:true, enable_resview:true, - enable_faulty_switch:false, - recovery_enabled:false, + enable_faulty_switch:true, + "enable_viewchange":true, + "recovery_enabled":true, + "max_client_complaint_num":10 } diff --git a/service/tools/kv/server_tools/start_kv_service_monitoring.sh b/service/tools/kv/server_tools/start_kv_service_monitoring.sh index f13845dc..e3657a44 100755 --- a/service/tools/kv/server_tools/start_kv_service_monitoring.sh +++ b/service/tools/kv/server_tools/start_kv_service_monitoring.sh @@ -24,10 +24,10 @@ WORK_PATH=$PWD CERT_PATH=${WORK_PATH}/service/tools/data/cert/ GRAFANA_PORT=8090 -./service/tools/kv/server_tools/generate_keys_and_certs.sh || { - echo "Failed to generate configs/certificates" 1>&2 - exit 1 -} +# ./service/tools/kv/server_tools/generate_keys_and_certs.sh || { +# echo "Failed to generate configs/certificates" 1>&2 +# exit 1 +# } bazel build //service/kv:kv_service --define enable_leveldb=True $@ nohup $SERVER_PATH $SERVER_CONFIG $CERT_PATH/node1.key.pri $CERT_PATH/cert_1.cert 8090 > server0.log &
