This is an automated email from the ASF dual-hosted git repository. harishgokul01 pushed a commit to branch demo in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git
commit e25688b64aed807036e1d70ec5a79f6938bb53d3 Author: harish876 <[email protected]> AuthorDate: Wed Dec 31 08:47:36 2025 +0000 reverting view change bug fixes and removing reslens-tools-service --- .bazelrc | 3 +- .../consensus/ordering/pbft/response_manager.cpp | 28 +-- .../consensus/ordering/pbft/viewchange_manager.cpp | 46 +--- .../ordering/pbft/viewchange_manager_test.cpp | 10 +- scripts/deploy/script/restart_reslens.sh | 2 +- .../tools/kv/api_tools/monitoring/gunicorn.conf.py | 38 ---- .../tools/kv/api_tools/monitoring/requirements.txt | 10 - .../api_tools/monitoring/reslens_tools_service.py | 243 --------------------- .../monitoring/start_reslens_tools_service.sh | 75 ------- service/tools/kv/server_tools/start_kv_service.sh | 9 +- 10 files changed, 23 insertions(+), 441 deletions(-) diff --git a/.bazelrc b/.bazelrc index 2bb0a3a5..43484445 100644 --- a/.bazelrc +++ b/.bazelrc @@ -1,4 +1,5 @@ -build --cxxopt='-std=c++17' --copt="-pg" --linkopt="-pg" --strip=never --jobs=2 +build --cxxopt='-std=c++17' --copt="-g" --copt="-fno-omit-frame-pointer" --cxxopt="-fno-omit-frame-pointer" --linkopt="-g" --strip=never --jobs=2 +# build --cxxopt='-std=c++17' --copt="-pg" --linkopt="-pg" --linkopt="-g" --strip=never --jobs=2 #build --action_env=PYTHON_BIN_PATH="/usr/bin/python3.10" #build --action_env=PYTHON_LIB_PATH="/usr/include/python3.10" diff --git a/platform/consensus/ordering/pbft/response_manager.cpp b/platform/consensus/ordering/pbft/response_manager.cpp index 7cfa9abd..ca80c4a2 100644 --- a/platform/consensus/ordering/pbft/response_manager.cpp +++ b/platform/consensus/ordering/pbft/response_manager.cpp @@ -125,13 +125,7 @@ int ResponseManager::ProcessResponseMsg(std::unique_ptr<Context> context, // The callback will be triggered if it received f+1 messages. if (request->ret() == -2) { LOG(ERROR) << "get response fail:" << request->ret(); - int current = send_num_.load(); - if (current > 0) { - send_num_--; - } else { - LOG(ERROR) << "send_num_ is already " << current << ", not decrementing"; - send_num_.store(0); - } + send_num_--; return 0; } CollectorResultCode ret = @@ -226,13 +220,7 @@ void ResponseManager::SendResponseToClient( } else { LOG(ERROR) << "seq:" << local_id << " no resp"; } - int current = send_num_.load(); - if (current > 0) { - send_num_--; - } else { - LOG(ERROR) << "send_num_ is already " << current << ", not decrementing"; - send_num_.store(0); - } + send_num_--; if (config_.IsPerformanceRunning()) { return; @@ -265,14 +253,8 @@ int ResponseManager::BatchProposeMsg() { << " batch num:" << config_.ClientBatchNum(); std::vector<std::unique_ptr<QueueItem>> batch_req; while (!stop_) { - int current_send_num = send_num_.load(); - if (current_send_num < 0) { - LOG(ERROR) << "send_num_ is negative (" << current_send_num << "), resetting to 0"; - send_num_.store(0); - current_send_num = 0; - } - if (current_send_num > config_.GetMaxProcessTxn()) { - LOG(ERROR) << "send num too high, wait:" << current_send_num; + if (send_num_ > config_.GetMaxProcessTxn()) { + LOG(ERROR) << "send num too high, wait:" << send_num_; usleep(100); continue; } @@ -425,4 +407,4 @@ void ResponseManager::MonitoringClientTimeOut() { } } } -} // namespace resdb +} // namespace resdb \ No newline at end of file diff --git a/platform/consensus/ordering/pbft/viewchange_manager.cpp b/platform/consensus/ordering/pbft/viewchange_manager.cpp index bed36862..58900ce0 100644 --- a/platform/consensus/ordering/pbft/viewchange_manager.cpp +++ b/platform/consensus/ordering/pbft/viewchange_manager.cpp @@ -129,14 +129,7 @@ void ViewChangeManager::MayStart() { view_change_counter_++; } // std::lock_guard<std::mutex> lk(status_mutex_); - LOG(ERROR) << "[VIEWCHANGE] Timeout handler triggered - current status: " - << status_ << " attempting to change to READY_VIEW_CHANGE"; - bool changed = ChangeStatue(ViewChangeStatus::READY_VIEW_CHANGE); - LOG(ERROR) << "[VIEWCHANGE] ChangeStatue returned: " << changed - << " current status: " << status_; - if (changed) { - LOG(ERROR) << "[VIEWCHANGE] Status changed to READY_VIEW_CHANGE, sending " - "view change msg"; + if (ChangeStatue(ViewChangeStatus::READY_VIEW_CHANGE)) { SendViewChangeMsg(); auto viewchange_timer = std::make_shared<ViewChangeTimeout>( ViewChangeTimerType::TYPE_VIEWCHANGE, system_info_->GetCurrentView(), @@ -203,13 +196,10 @@ bool ViewChangeManager::IsValidViewChangeMsg( } std::string data; proof.request().SerializeToString(&data); - // Skip signature verification if verifier is null or disabled - // if (verifier_ && config_.SignatureVerifierEnabled()) { - // if (!verifier_->VerifyMessage(data, proof.signature())) { - // LOG(ERROR) << "proof signature not valid"; - // return false; - // } - // } + if (!verifier_->VerifyMessage(data, proof.signature())) { + LOG(ERROR) << "proof signature not valid"; + return false; + } } } return true; @@ -485,35 +475,21 @@ void ViewChangeManager::SendViewChangeMsg() { // n (sequence number of the latest checkpoint) and C (proof for the stable // checkpoint) - LOG(ERROR) << "[VIEWCHANGE] Getting stable checkpoint with votes"; *view_change_message.mutable_stable_ckpt() = checkpoint_manager_->GetStableCheckpointWithVotes(); - LOG(ERROR) << "[VIEWCHANGE] Got stable checkpoint, seq: " - << view_change_message.stable_ckpt().seq(); // P - P is a set containing a set Pm for each request m that prepared at i // with a sequence number higher than n. - LOG(ERROR) << "[VIEWCHANGE] Getting highest prepared seq"; int max_seq = checkpoint_manager_->GetHighestPreparedSeq(); - LOG(ERROR) << "[VIEWCHANGE] Got highest prepared seq: " << max_seq; - LOG(ERROR) << "[VIEWCHANGE] Check prepared or committed txns from " - << view_change_message.stable_ckpt().seq() + 1 << " to " - << max_seq; - - LOG(ERROR) << "[VIEWCHANGE] Checking prepared messages from " - << (view_change_message.stable_ckpt().seq() + 1) << " to " - << max_seq; + LOG(INFO) << "Check prepared or committed txns from " + << view_change_message.stable_ckpt().seq() + 1 << " to " << max_seq; + for (int i = view_change_message.stable_ckpt().seq() + 1; i <= max_seq; ++i) { // seq i has been prepared or committed. - LOG(ERROR) << "[VIEWCHANGE] Checking seq: " << i; if (message_manager_->GetTransactionState(i) >= TransactionStatue::READY_COMMIT) { - LOG(ERROR) << "[VIEWCHANGE] Seq " << i - << " is READY_COMMIT, getting proof"; std::vector<RequestInfo> proof_info = message_manager_->GetPreparedProof(i); - LOG(ERROR) << "[VIEWCHANGE] Got proof for seq " << i - << ", size: " << proof_info.size(); assert(proof_info.size() >= config_.GetMinDataReceiveNum()); auto txn = view_change_message.add_prepared_msg(); txn->set_seq(i); @@ -524,16 +500,12 @@ void ViewChangeManager::SendViewChangeMsg() { } } } - LOG(ERROR) << "[VIEWCHANGE] Finished checking prepared messages"; // Broadcast my view change request. - LOG(ERROR) << "[VIEWCHANGE] Creating view change request and broadcasting"; std::unique_ptr<Request> request = NewRequest( Request::TYPE_VIEWCHANGE, Request(), config_.GetSelfInfo().id()); view_change_message.SerializeToString(request->mutable_data()); - LOG(ERROR) << "[VIEWCHANGE] About to call BroadCast"; replica_communicator_->BroadCast(*request); - LOG(ERROR) << "[VIEWCHANGE] BroadCast called"; } void ViewChangeManager::AddComplaintTimer(uint64_t proxy_id, std::string hash) { @@ -643,4 +615,4 @@ void ViewChangeManager::SetDuplicateManager(DuplicateManager* manager) { duplicate_manager_ = manager; } -} // namespace resdb +} // namespace resdb \ No newline at end of file diff --git a/platform/consensus/ordering/pbft/viewchange_manager_test.cpp b/platform/consensus/ordering/pbft/viewchange_manager_test.cpp index 1803dd0b..7a2ec693 100644 --- a/platform/consensus/ordering/pbft/viewchange_manager_test.cpp +++ b/platform/consensus/ordering/pbft/viewchange_manager_test.cpp @@ -33,9 +33,6 @@ #include "platform/consensus/ordering/pbft/transaction_utils.h" #include "platform/networkstrate/mock_replica_communicator.h" #include "platform/proto/checkpoint_info.pb.h" -#include "common/crypto/signature_verifier.h" -#include <thread> -#include <chrono> namespace resdb { namespace { @@ -85,12 +82,7 @@ TEST_F(ViewChangeManagerTest, SendViewChange) { manager_->MayStart(); std::unique_ptr<Request> request = std::make_unique<Request>(); request->set_seq(1); - request->set_data("test_data"); - request->set_hash(SignatureVerifier::CalculateHash("test_data")); checkpoint_manager_->AddCommitData(std::move(request)); - - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - std::promise<bool> propose_done; std::future<bool> propose_done_future = propose_done.get_future(); EXPECT_CALL(replica_communicator_, BroadCast) @@ -129,4 +121,4 @@ TEST_F(ViewChangeManagerTest, SendNewView) { } // namespace -} // namespace resdb +} // namespace resdb \ No newline at end of file diff --git a/scripts/deploy/script/restart_reslens.sh b/scripts/deploy/script/restart_reslens.sh index 229fe830..66ffe4e8 100755 --- a/scripts/deploy/script/restart_reslens.sh +++ b/scripts/deploy/script/restart_reslens.sh @@ -2,7 +2,7 @@ set -euo pipefail -cd /home/ubuntu/resilient-monitoring/ResLens-Middleware +cd /home/ubuntu/resilient-ecosystem/DeepObserve/external/monitoring/ResLens-Middleware # Stop existing containers (if any) sudo docker-compose down || true diff --git a/service/tools/kv/api_tools/monitoring/gunicorn.conf.py b/service/tools/kv/api_tools/monitoring/gunicorn.conf.py deleted file mode 100644 index d763514e..00000000 --- a/service/tools/kv/api_tools/monitoring/gunicorn.conf.py +++ /dev/null @@ -1,38 +0,0 @@ -#!/usr/bin/env python3 -""" -Gunicorn configuration for ResLens Flamegraph Analysis Service -""" - -# Server socket -bind = "0.0.0.0:8080" -backlog = 2048 - -# Worker processes -workers = 1 # Single worker for this service -worker_class = "sync" -worker_connections = 1000 -max_requests = 1000 -max_requests_jitter = 50 - -# Timeout -timeout = 30 -keepalive = 2 - -# Logging -accesslog = "-" -errorlog = "-" -loglevel = "info" -access_log_format = '%(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s"' - -# Process naming -proc_name = "reslens-flamegraph-service" - -# Preload app for better performance -preload_app = True - -# Worker timeout -graceful_timeout = 30 - -# Restart workers after this many requests -max_requests = 1000 -max_requests_jitter = 50 \ No newline at end of file diff --git a/service/tools/kv/api_tools/monitoring/requirements.txt b/service/tools/kv/api_tools/monitoring/requirements.txt deleted file mode 100644 index 81e680cb..00000000 --- a/service/tools/kv/api_tools/monitoring/requirements.txt +++ /dev/null @@ -1,10 +0,0 @@ -blinker==1.9.0 -click==8.2.1 -Flask==3.1.1 -flask-cors==6.0.1 -gunicorn==23.0.0 -itsdangerous==2.2.0 -Jinja2==3.1.6 -MarkupSafe==3.0.2 -packaging==25.0 -Werkzeug==3.1.3 diff --git a/service/tools/kv/api_tools/monitoring/reslens_tools_service.py b/service/tools/kv/api_tools/monitoring/reslens_tools_service.py deleted file mode 100644 index 964e7311..00000000 --- a/service/tools/kv/api_tools/monitoring/reslens_tools_service.py +++ /dev/null @@ -1,243 +0,0 @@ -#!/usr/bin/env python3 -""" -ResLens Flamegraph Analysis Service - -A simple HTTP service for executing ResilientDB random data operations -as part of the ResLens monitoring and analysis toolkit. - -This utility is used mainly to seed data to create a long running data seeding job for flamegroah analysis as these flamegraph processes need to run for more than 30s. -""" - -import os -import sys -import json -import subprocess -import random -import threading -import time -from flask import Flask, request, jsonify -from flask_cors import CORS - -app = Flask(__name__) -CORS(app) - -class ResLensToolsService: - def __init__(self): - self.project_root = self._find_project_root() - self.seeding_running = False - self.seeding_thread = None - self.seeding_lock = threading.Lock() - print(f"ResLens Tools Service - Using project root: {self.project_root}") - - def _find_project_root(self): - """Find the ResilientDB project root directory.""" - resilientdb_root = os.getenv("RESILIENTDB_ROOT") - if resilientdb_root: - return resilientdb_root - - # Since we're now in service/tools/kv/api_tools/monitoring, go up to project root - current_dir = os.path.dirname(os.path.abspath(__file__)) - project_root = os.path.join(current_dir, "../../../../../../") - - # Verify this is the project root by checking for bazel-bin - if os.path.exists(os.path.join(project_root, "bazel-bin")): - return project_root - - # Fallback to common paths - possible_paths = [ - "/opt/resilientdb", - "/home/ubuntu/incubator-resilientdb", - os.path.join(os.path.expanduser("~"), "resilientdb") - ] - - for path in possible_paths: - tool_path = os.path.join(path, "bazel-bin/service/tools/kv/api_tools/kv_service_tools") - if os.path.exists(tool_path): - return path - - return "/opt/resilientdb" - - def _get_tool_path(self): - """Get the path to the kv_service_tools binary.""" - return f"{self.project_root}/bazel-bin/service/tools/kv/api_tools/kv_service_tools" - - def _check_tool_exists(self): - """Check if the CLI tool exists.""" - tool_path = self._get_tool_path() - return os.path.exists(tool_path) and os.access(tool_path, os.X_OK) - - def start_seeding(self, count): - """Start data seeding job in background thread.""" - if self.seeding_running: - return { - "service": "ResLens Flamegraph Analysis Service", - "status": "error", - "message": "Seeding job already running" - } - - self.seeding_running = True - - def seeding_worker(): - for i in range(count): - if not self.seeding_running: - break - - key = f"key{random.randint(0, 499)}" - value = f"value{random.randint(0, 499)}" - - cmd = [ - self._get_tool_path(), - "--config", f"{self.project_root}/service/tools/config/interface/service.config", - "--cmd", "set", - "--key", key, - "--value", value - ] - - try: - subprocess.run(cmd, capture_output=True, text=True, timeout=30) - except (subprocess.TimeoutExpired, Exception): - # Silently continue on errors - no logging needed - pass - - time.sleep(0.1) - - with self.seeding_lock: - self.seeding_running = False - - self.seeding_thread = threading.Thread(target=seeding_worker, daemon=True) - self.seeding_thread.start() - - return { - "service": "ResLens Flamegraph Analysis Service", - "status": "success", - "message": f"Started seeding job with {count} operations" - } - - def stop_seeding(self): - """Stop the data seeding job.""" - if not self.seeding_running: - return { - "service": "ResLens Flamegraph Analysis Service", - "status": "error", - "message": "No seeding job running" - } - - self.seeding_running = False - if self.seeding_thread and self.seeding_thread.is_alive(): - self.seeding_thread.join(timeout=5) - - return { - "service": "ResLens Flamegraph Analysis Service", - "status": "success", - "message": "Seeding job stopped" - } - - def get_seeding_status(self): - """Get current seeding job status.""" - with self.seeding_lock: - return { - "service": "ResLens Flamegraph Analysis Service", - "status": "running" if self.seeding_running else "stopped" - } - -# Global service instance -service = ResLensToolsService() - [email protected]('/health', methods=['GET']) -def health(): - """Health check endpoint.""" - return jsonify({ - "service": "ResLens Flamegraph Analysis Service", - "status": "ok" - }) - [email protected]('/seed', methods=['POST']) -def start_seeding(): - """Start data seeding job.""" - try: - data = request.get_json() - if not data: - return jsonify({ - "service": "ResLens Flamegraph Analysis Service", - "status": "error", - "message": "No JSON data provided" - }), 400 - - count = data.get('count') - if count is None: - return jsonify({ - "service": "ResLens Flamegraph Analysis Service", - "status": "error", - "message": "Missing 'count' parameter" - }), 400 - - if not isinstance(count, int) or count <= 0: - return jsonify({ - "service": "ResLens Flamegraph Analysis Service", - "status": "error", - "message": "Count must be a positive integer" - }), 400 - - # Check if CLI tool exists before starting - if not service._check_tool_exists(): - return jsonify({ - "service": "ResLens Flamegraph Analysis Service", - "status": "error", - "message": f"CLI tool not found at {service._get_tool_path()}" - }), 503 - - return jsonify(service.start_seeding(count)) - - except Exception as e: - return jsonify({ - "service": "ResLens Flamegraph Analysis Service", - "status": "error", - "message": str(e) - }), 500 - [email protected]('/stop', methods=['POST']) -def stop_seeding(): - """Stop data seeding job.""" - return jsonify(service.stop_seeding()) - [email protected]('/status', methods=['GET']) -def get_status(): - """Get seeding job status.""" - return jsonify(service.get_seeding_status()) - [email protected]('/', methods=['GET']) -def root(): - """Root endpoint with service information.""" - return jsonify({ - "service": "ResLens Flamegraph Analysis Service", - "description": "HTTP service for executing ResilientDB random data operations", - "endpoints": { - "GET /health": "Health check", - "POST /seed": "Start data seeding job (JSON body: {\"count\": 5})", - "POST /stop": "Stop data seeding job", - "GET /status": "Get seeding job status" - }, - "example": { - "POST /seed": { - "body": {"count": 10}, - "response": "Starts background job to execute 10 random set operations" - }, - "POST /stop": { - "body": "{}", - "response": "Stops the running seeding job" - } - } - }) - -if __name__ == '__main__': - port = int(sys.argv[1]) if len(sys.argv) > 1 else 8080 - - print(f"Starting ResLens Flamegraph Analysis Service on port {port}") - print("Available endpoints:") - print(" GET /health - Health check") - print(" POST /seed - Start data seeding job") - print(" POST /stop - Stop data seeding job") - print(" GET /status - Get seeding job status") - print(" GET / - Service information") - - app.run(host='0.0.0.0', port=port, debug=False, threaded=True) \ No newline at end of file diff --git a/service/tools/kv/api_tools/monitoring/start_reslens_tools_service.sh b/service/tools/kv/api_tools/monitoring/start_reslens_tools_service.sh deleted file mode 100755 index f4ac21f6..00000000 --- a/service/tools/kv/api_tools/monitoring/start_reslens_tools_service.sh +++ /dev/null @@ -1,75 +0,0 @@ -#!/bin/bash -# -# Startup script for ResLens Flamegraph Analysis Service -# - -# Set the directory to the script location -cd "$(dirname "$0")" - -# Function to find and activate virtual environment -activate_venv() { - # Check for common virtual environment locations - local venv_paths=( - "venv" - "env" - ".venv" - ".env" - "../venv" - "../env" - "../../venv" - "../../env" - "/opt/resilientdb/venv" - "/opt/resilientdb/env" - ) - - for venv_path in "${venv_paths[@]}"; do - if [[ -d "$venv_path" && -f "$venv_path/bin/activate" ]]; then - echo "Found virtual environment at: $venv_path" - source "$venv_path/bin/activate" - return 0 - fi - done - - # Check if we're already in a virtual environment - if [[ -n "$VIRTUAL_ENV" ]]; then - echo "Already in virtual environment: $VIRTUAL_ENV" - return 0 - fi - - echo "No virtual environment found. Using system Python." - return 1 -} - -# Try to activate virtual environment -if activate_venv; then - echo "Using virtual environment: $VIRTUAL_ENV" -else - echo "Using system Python" -fi - -# Check Python version -python_version=$(python3 --version 2>&1) -echo "Python version: $python_version" - -# Check if gunicorn is installed -if ! python3 -c "import gunicorn" &> /dev/null; then - echo "Gunicorn not found. Installing..." - pip install gunicorn -fi - -# Check if Flask is installed -if ! python3 -c "import flask" &> /dev/null; then - echo "Flask not found. Installing..." - pip install flask flask-cors -fi - -echo "Starting ResLens Flamegraph Analysis Service with Gunicorn..." - -# Run with gunicorn -gunicorn \ - --config gunicorn.conf.py \ - --bind 0.0.0.0:8080 \ - --workers 1 \ - --timeout 30 \ - --log-level info \ - reslens_tools_service:app \ No newline at end of file diff --git a/service/tools/kv/server_tools/start_kv_service.sh b/service/tools/kv/server_tools/start_kv_service.sh index 3fd53602..6bd71b74 100755 --- a/service/tools/kv/server_tools/start_kv_service.sh +++ b/service/tools/kv/server_tools/start_kv_service.sh @@ -23,10 +23,11 @@ SERVER_CONFIG=service/tools/config/server/server.config WORK_PATH=$PWD CERT_PATH=${WORK_PATH}/service/tools/data/cert/ -./service/tools/kv/server_tools/generate_keys_and_certs.sh || { - echo "Failed to generate configs/certificates" 1>&2 - exit 1 -} +# This has to be a one time operation +# ./service/tools/kv/server_tools/generate_keys_and_certs.sh || { +# echo "Failed to generate configs/certificates" 1>&2 +# exit 1 +# } bazel build //service/kv:kv_service $@ nohup $SERVER_PATH $SERVER_CONFIG $CERT_PATH/node1.key.pri $CERT_PATH/cert_1.cert > server0.log &
