This is an automated email from the ASF dual-hosted git repository.
harishgokul01 pushed a commit to branch development
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git
The following commit(s) were added to refs/heads/development by this push:
new e25688b6 reverting view change bug fixes and removing reslens-tools-service
e25688b6 is described below
commit e25688b64aed807036e1d70ec5a79f6938bb53d3
Author: harish876 <[email protected]>
AuthorDate: Wed Dec 31 08:47:36 2025 +0000
reverting view change bug fixes and removing reslens-tools-service
---
.bazelrc | 3 +-
.../consensus/ordering/pbft/response_manager.cpp | 28 +--
.../consensus/ordering/pbft/viewchange_manager.cpp | 46 +---
.../ordering/pbft/viewchange_manager_test.cpp | 10 +-
scripts/deploy/script/restart_reslens.sh | 2 +-
.../tools/kv/api_tools/monitoring/gunicorn.conf.py | 38 ----
.../tools/kv/api_tools/monitoring/requirements.txt | 10 -
.../api_tools/monitoring/reslens_tools_service.py | 243 ---------------------
.../monitoring/start_reslens_tools_service.sh | 75 -------
service/tools/kv/server_tools/start_kv_service.sh | 9 +-
10 files changed, 23 insertions(+), 441 deletions(-)
diff --git a/.bazelrc b/.bazelrc
index 2bb0a3a5..43484445 100644
--- a/.bazelrc
+++ b/.bazelrc
@@ -1,4 +1,5 @@
-build --cxxopt='-std=c++17' --copt="-pg" --linkopt="-pg" --strip=never --jobs=2
+build --cxxopt='-std=c++17' --copt="-g" --copt="-fno-omit-frame-pointer"
--cxxopt="-fno-omit-frame-pointer" --linkopt="-g" --strip=never --jobs=2
+# build --cxxopt='-std=c++17' --copt="-pg" --linkopt="-pg" --linkopt="-g"
--strip=never --jobs=2
#build --action_env=PYTHON_BIN_PATH="/usr/bin/python3.10"
#build --action_env=PYTHON_LIB_PATH="/usr/include/python3.10"
diff --git a/platform/consensus/ordering/pbft/response_manager.cpp
b/platform/consensus/ordering/pbft/response_manager.cpp
index 7cfa9abd..ca80c4a2 100644
--- a/platform/consensus/ordering/pbft/response_manager.cpp
+++ b/platform/consensus/ordering/pbft/response_manager.cpp
@@ -125,13 +125,7 @@ int
ResponseManager::ProcessResponseMsg(std::unique_ptr<Context> context,
// The callback will be triggered if it received f+1 messages.
if (request->ret() == -2) {
LOG(ERROR) << "get response fail:" << request->ret();
- int current = send_num_.load();
- if (current > 0) {
- send_num_--;
- } else {
- LOG(ERROR) << "send_num_ is already " << current << ", not decrementing";
- send_num_.store(0);
- }
+ send_num_--;
return 0;
}
CollectorResultCode ret =
@@ -226,13 +220,7 @@ void ResponseManager::SendResponseToClient(
} else {
LOG(ERROR) << "seq:" << local_id << " no resp";
}
- int current = send_num_.load();
- if (current > 0) {
- send_num_--;
- } else {
- LOG(ERROR) << "send_num_ is already " << current << ", not decrementing";
- send_num_.store(0);
- }
+ send_num_--;
if (config_.IsPerformanceRunning()) {
return;
@@ -265,14 +253,8 @@ int ResponseManager::BatchProposeMsg() {
<< " batch num:" << config_.ClientBatchNum();
std::vector<std::unique_ptr<QueueItem>> batch_req;
while (!stop_) {
- int current_send_num = send_num_.load();
- if (current_send_num < 0) {
- LOG(ERROR) << "send_num_ is negative (" << current_send_num << "),
resetting to 0";
- send_num_.store(0);
- current_send_num = 0;
- }
- if (current_send_num > config_.GetMaxProcessTxn()) {
- LOG(ERROR) << "send num too high, wait:" << current_send_num;
+ if (send_num_ > config_.GetMaxProcessTxn()) {
+ LOG(ERROR) << "send num too high, wait:" << send_num_;
usleep(100);
continue;
}
@@ -425,4 +407,4 @@ void ResponseManager::MonitoringClientTimeOut() {
}
}
}
-} // namespace resdb
+} // namespace resdb
\ No newline at end of file
diff --git a/platform/consensus/ordering/pbft/viewchange_manager.cpp
b/platform/consensus/ordering/pbft/viewchange_manager.cpp
index bed36862..58900ce0 100644
--- a/platform/consensus/ordering/pbft/viewchange_manager.cpp
+++ b/platform/consensus/ordering/pbft/viewchange_manager.cpp
@@ -129,14 +129,7 @@ void ViewChangeManager::MayStart() {
view_change_counter_++;
}
// std::lock_guard<std::mutex> lk(status_mutex_);
- LOG(ERROR) << "[VIEWCHANGE] Timeout handler triggered - current status: "
- << status_ << " attempting to change to READY_VIEW_CHANGE";
- bool changed = ChangeStatue(ViewChangeStatus::READY_VIEW_CHANGE);
- LOG(ERROR) << "[VIEWCHANGE] ChangeStatue returned: " << changed
- << " current status: " << status_;
- if (changed) {
- LOG(ERROR) << "[VIEWCHANGE] Status changed to READY_VIEW_CHANGE, sending
"
- "view change msg";
+ if (ChangeStatue(ViewChangeStatus::READY_VIEW_CHANGE)) {
SendViewChangeMsg();
auto viewchange_timer = std::make_shared<ViewChangeTimeout>(
ViewChangeTimerType::TYPE_VIEWCHANGE, system_info_->GetCurrentView(),
@@ -203,13 +196,10 @@ bool ViewChangeManager::IsValidViewChangeMsg(
}
std::string data;
proof.request().SerializeToString(&data);
- // Skip signature verification if verifier is null or disabled
- // if (verifier_ && config_.SignatureVerifierEnabled()) {
- // if (!verifier_->VerifyMessage(data, proof.signature())) {
- // LOG(ERROR) << "proof signature not valid";
- // return false;
- // }
- // }
+ if (!verifier_->VerifyMessage(data, proof.signature())) {
+ LOG(ERROR) << "proof signature not valid";
+ return false;
+ }
}
}
return true;
@@ -485,35 +475,21 @@ void ViewChangeManager::SendViewChangeMsg() {
// n (sequence number of the latest checkpoint) and C (proof for the stable
// checkpoint)
- LOG(ERROR) << "[VIEWCHANGE] Getting stable checkpoint with votes";
*view_change_message.mutable_stable_ckpt() =
checkpoint_manager_->GetStableCheckpointWithVotes();
- LOG(ERROR) << "[VIEWCHANGE] Got stable checkpoint, seq: "
- << view_change_message.stable_ckpt().seq();
// P - P is a set containing a set Pm for each request m that prepared at i
// with a sequence number higher than n.
- LOG(ERROR) << "[VIEWCHANGE] Getting highest prepared seq";
int max_seq = checkpoint_manager_->GetHighestPreparedSeq();
- LOG(ERROR) << "[VIEWCHANGE] Got highest prepared seq: " << max_seq;
- LOG(ERROR) << "[VIEWCHANGE] Check prepared or committed txns from "
- << view_change_message.stable_ckpt().seq() + 1 << " to "
- << max_seq;
-
- LOG(ERROR) << "[VIEWCHANGE] Checking prepared messages from "
- << (view_change_message.stable_ckpt().seq() + 1) << " to "
- << max_seq;
+ LOG(INFO) << "Check prepared or committed txns from "
+ << view_change_message.stable_ckpt().seq() + 1 << " to " <<
max_seq;
+
for (int i = view_change_message.stable_ckpt().seq() + 1; i <= max_seq; ++i)
{
// seq i has been prepared or committed.
- LOG(ERROR) << "[VIEWCHANGE] Checking seq: " << i;
if (message_manager_->GetTransactionState(i) >=
TransactionStatue::READY_COMMIT) {
- LOG(ERROR) << "[VIEWCHANGE] Seq " << i
- << " is READY_COMMIT, getting proof";
std::vector<RequestInfo> proof_info =
message_manager_->GetPreparedProof(i);
- LOG(ERROR) << "[VIEWCHANGE] Got proof for seq " << i
- << ", size: " << proof_info.size();
assert(proof_info.size() >= config_.GetMinDataReceiveNum());
auto txn = view_change_message.add_prepared_msg();
txn->set_seq(i);
@@ -524,16 +500,12 @@ void ViewChangeManager::SendViewChangeMsg() {
}
}
}
- LOG(ERROR) << "[VIEWCHANGE] Finished checking prepared messages";
// Broadcast my view change request.
- LOG(ERROR) << "[VIEWCHANGE] Creating view change request and broadcasting";
std::unique_ptr<Request> request = NewRequest(
Request::TYPE_VIEWCHANGE, Request(), config_.GetSelfInfo().id());
view_change_message.SerializeToString(request->mutable_data());
- LOG(ERROR) << "[VIEWCHANGE] About to call BroadCast";
replica_communicator_->BroadCast(*request);
- LOG(ERROR) << "[VIEWCHANGE] BroadCast called";
}
void ViewChangeManager::AddComplaintTimer(uint64_t proxy_id, std::string hash)
{
@@ -643,4 +615,4 @@ void
ViewChangeManager::SetDuplicateManager(DuplicateManager* manager) {
duplicate_manager_ = manager;
}
-} // namespace resdb
+} // namespace resdb
\ No newline at end of file
diff --git a/platform/consensus/ordering/pbft/viewchange_manager_test.cpp
b/platform/consensus/ordering/pbft/viewchange_manager_test.cpp
index 1803dd0b..7a2ec693 100644
--- a/platform/consensus/ordering/pbft/viewchange_manager_test.cpp
+++ b/platform/consensus/ordering/pbft/viewchange_manager_test.cpp
@@ -33,9 +33,6 @@
#include "platform/consensus/ordering/pbft/transaction_utils.h"
#include "platform/networkstrate/mock_replica_communicator.h"
#include "platform/proto/checkpoint_info.pb.h"
-#include "common/crypto/signature_verifier.h"
-#include <thread>
-#include <chrono>
namespace resdb {
namespace {
@@ -85,12 +82,7 @@ TEST_F(ViewChangeManagerTest, SendViewChange) {
manager_->MayStart();
std::unique_ptr<Request> request = std::make_unique<Request>();
request->set_seq(1);
- request->set_data("test_data");
- request->set_hash(SignatureVerifier::CalculateHash("test_data"));
checkpoint_manager_->AddCommitData(std::move(request));
-
- std::this_thread::sleep_for(std::chrono::milliseconds(100));
-
std::promise<bool> propose_done;
std::future<bool> propose_done_future = propose_done.get_future();
EXPECT_CALL(replica_communicator_, BroadCast)
@@ -129,4 +121,4 @@ TEST_F(ViewChangeManagerTest, SendNewView) {
} // namespace
-} // namespace resdb
+} // namespace resdb
\ No newline at end of file
diff --git a/scripts/deploy/script/restart_reslens.sh
b/scripts/deploy/script/restart_reslens.sh
index 229fe830..66ffe4e8 100755
--- a/scripts/deploy/script/restart_reslens.sh
+++ b/scripts/deploy/script/restart_reslens.sh
@@ -2,7 +2,7 @@
set -euo pipefail
-cd /home/ubuntu/resilient-monitoring/ResLens-Middleware
+cd
/home/ubuntu/resilient-ecosystem/DeepObserve/external/monitoring/ResLens-Middleware
# Stop existing containers (if any)
sudo docker-compose down || true
diff --git a/service/tools/kv/api_tools/monitoring/gunicorn.conf.py
b/service/tools/kv/api_tools/monitoring/gunicorn.conf.py
deleted file mode 100644
index d763514e..00000000
--- a/service/tools/kv/api_tools/monitoring/gunicorn.conf.py
+++ /dev/null
@@ -1,38 +0,0 @@
-#!/usr/bin/env python3
-"""
-Gunicorn configuration for ResLens Flamegraph Analysis Service
-"""
-
-# Server socket
-bind = "0.0.0.0:8080"
-backlog = 2048
-
-# Worker processes
-workers = 1 # Single worker for this service
-worker_class = "sync"
-worker_connections = 1000
-max_requests = 1000
-max_requests_jitter = 50
-
-# Timeout
-timeout = 30
-keepalive = 2
-
-# Logging
-accesslog = "-"
-errorlog = "-"
-loglevel = "info"
-access_log_format = '%(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s"
"%(a)s"'
-
-# Process naming
-proc_name = "reslens-flamegraph-service"
-
-# Preload app for better performance
-preload_app = True
-
-# Worker timeout
-graceful_timeout = 30
-
-# Restart workers after this many requests
-max_requests = 1000
-max_requests_jitter = 50
\ No newline at end of file
diff --git a/service/tools/kv/api_tools/monitoring/requirements.txt
b/service/tools/kv/api_tools/monitoring/requirements.txt
deleted file mode 100644
index 81e680cb..00000000
--- a/service/tools/kv/api_tools/monitoring/requirements.txt
+++ /dev/null
@@ -1,10 +0,0 @@
-blinker==1.9.0
-click==8.2.1
-Flask==3.1.1
-flask-cors==6.0.1
-gunicorn==23.0.0
-itsdangerous==2.2.0
-Jinja2==3.1.6
-MarkupSafe==3.0.2
-packaging==25.0
-Werkzeug==3.1.3
diff --git a/service/tools/kv/api_tools/monitoring/reslens_tools_service.py
b/service/tools/kv/api_tools/monitoring/reslens_tools_service.py
deleted file mode 100644
index 964e7311..00000000
--- a/service/tools/kv/api_tools/monitoring/reslens_tools_service.py
+++ /dev/null
@@ -1,243 +0,0 @@
-#!/usr/bin/env python3
-"""
-ResLens Flamegraph Analysis Service
-
-A simple HTTP service for executing ResilientDB random data operations
-as part of the ResLens monitoring and analysis toolkit.
-
-This utility is used mainly to seed data to create a long running data seeding
job for flamegroah analysis as these flamegraph processes need to run for more
than 30s.
-"""
-
-import os
-import sys
-import json
-import subprocess
-import random
-import threading
-import time
-from flask import Flask, request, jsonify
-from flask_cors import CORS
-
-app = Flask(__name__)
-CORS(app)
-
-class ResLensToolsService:
- def __init__(self):
- self.project_root = self._find_project_root()
- self.seeding_running = False
- self.seeding_thread = None
- self.seeding_lock = threading.Lock()
- print(f"ResLens Tools Service - Using project root:
{self.project_root}")
-
- def _find_project_root(self):
- """Find the ResilientDB project root directory."""
- resilientdb_root = os.getenv("RESILIENTDB_ROOT")
- if resilientdb_root:
- return resilientdb_root
-
- # Since we're now in service/tools/kv/api_tools/monitoring, go up to
project root
- current_dir = os.path.dirname(os.path.abspath(__file__))
- project_root = os.path.join(current_dir, "../../../../../../")
-
- # Verify this is the project root by checking for bazel-bin
- if os.path.exists(os.path.join(project_root, "bazel-bin")):
- return project_root
-
- # Fallback to common paths
- possible_paths = [
- "/opt/resilientdb",
- "/home/ubuntu/incubator-resilientdb",
- os.path.join(os.path.expanduser("~"), "resilientdb")
- ]
-
- for path in possible_paths:
- tool_path = os.path.join(path,
"bazel-bin/service/tools/kv/api_tools/kv_service_tools")
- if os.path.exists(tool_path):
- return path
-
- return "/opt/resilientdb"
-
- def _get_tool_path(self):
- """Get the path to the kv_service_tools binary."""
- return
f"{self.project_root}/bazel-bin/service/tools/kv/api_tools/kv_service_tools"
-
- def _check_tool_exists(self):
- """Check if the CLI tool exists."""
- tool_path = self._get_tool_path()
- return os.path.exists(tool_path) and os.access(tool_path, os.X_OK)
-
- def start_seeding(self, count):
- """Start data seeding job in background thread."""
- if self.seeding_running:
- return {
- "service": "ResLens Flamegraph Analysis Service",
- "status": "error",
- "message": "Seeding job already running"
- }
-
- self.seeding_running = True
-
- def seeding_worker():
- for i in range(count):
- if not self.seeding_running:
- break
-
- key = f"key{random.randint(0, 499)}"
- value = f"value{random.randint(0, 499)}"
-
- cmd = [
- self._get_tool_path(),
- "--config",
f"{self.project_root}/service/tools/config/interface/service.config",
- "--cmd", "set",
- "--key", key,
- "--value", value
- ]
-
- try:
- subprocess.run(cmd, capture_output=True, text=True,
timeout=30)
- except (subprocess.TimeoutExpired, Exception):
- # Silently continue on errors - no logging needed
- pass
-
- time.sleep(0.1)
-
- with self.seeding_lock:
- self.seeding_running = False
-
- self.seeding_thread = threading.Thread(target=seeding_worker,
daemon=True)
- self.seeding_thread.start()
-
- return {
- "service": "ResLens Flamegraph Analysis Service",
- "status": "success",
- "message": f"Started seeding job with {count} operations"
- }
-
- def stop_seeding(self):
- """Stop the data seeding job."""
- if not self.seeding_running:
- return {
- "service": "ResLens Flamegraph Analysis Service",
- "status": "error",
- "message": "No seeding job running"
- }
-
- self.seeding_running = False
- if self.seeding_thread and self.seeding_thread.is_alive():
- self.seeding_thread.join(timeout=5)
-
- return {
- "service": "ResLens Flamegraph Analysis Service",
- "status": "success",
- "message": "Seeding job stopped"
- }
-
- def get_seeding_status(self):
- """Get current seeding job status."""
- with self.seeding_lock:
- return {
- "service": "ResLens Flamegraph Analysis Service",
- "status": "running" if self.seeding_running else "stopped"
- }
-
-# Global service instance
-service = ResLensToolsService()
-
[email protected]('/health', methods=['GET'])
-def health():
- """Health check endpoint."""
- return jsonify({
- "service": "ResLens Flamegraph Analysis Service",
- "status": "ok"
- })
-
[email protected]('/seed', methods=['POST'])
-def start_seeding():
- """Start data seeding job."""
- try:
- data = request.get_json()
- if not data:
- return jsonify({
- "service": "ResLens Flamegraph Analysis Service",
- "status": "error",
- "message": "No JSON data provided"
- }), 400
-
- count = data.get('count')
- if count is None:
- return jsonify({
- "service": "ResLens Flamegraph Analysis Service",
- "status": "error",
- "message": "Missing 'count' parameter"
- }), 400
-
- if not isinstance(count, int) or count <= 0:
- return jsonify({
- "service": "ResLens Flamegraph Analysis Service",
- "status": "error",
- "message": "Count must be a positive integer"
- }), 400
-
- # Check if CLI tool exists before starting
- if not service._check_tool_exists():
- return jsonify({
- "service": "ResLens Flamegraph Analysis Service",
- "status": "error",
- "message": f"CLI tool not found at {service._get_tool_path()}"
- }), 503
-
- return jsonify(service.start_seeding(count))
-
- except Exception as e:
- return jsonify({
- "service": "ResLens Flamegraph Analysis Service",
- "status": "error",
- "message": str(e)
- }), 500
-
[email protected]('/stop', methods=['POST'])
-def stop_seeding():
- """Stop data seeding job."""
- return jsonify(service.stop_seeding())
-
[email protected]('/status', methods=['GET'])
-def get_status():
- """Get seeding job status."""
- return jsonify(service.get_seeding_status())
-
[email protected]('/', methods=['GET'])
-def root():
- """Root endpoint with service information."""
- return jsonify({
- "service": "ResLens Flamegraph Analysis Service",
- "description": "HTTP service for executing ResilientDB random data
operations",
- "endpoints": {
- "GET /health": "Health check",
- "POST /seed": "Start data seeding job (JSON body: {\"count\": 5})",
- "POST /stop": "Stop data seeding job",
- "GET /status": "Get seeding job status"
- },
- "example": {
- "POST /seed": {
- "body": {"count": 10},
- "response": "Starts background job to execute 10 random set
operations"
- },
- "POST /stop": {
- "body": "{}",
- "response": "Stops the running seeding job"
- }
- }
- })
-
-if __name__ == '__main__':
- port = int(sys.argv[1]) if len(sys.argv) > 1 else 8080
-
- print(f"Starting ResLens Flamegraph Analysis Service on port {port}")
- print("Available endpoints:")
- print(" GET /health - Health check")
- print(" POST /seed - Start data seeding job")
- print(" POST /stop - Stop data seeding job")
- print(" GET /status - Get seeding job status")
- print(" GET / - Service information")
-
- app.run(host='0.0.0.0', port=port, debug=False, threaded=True)
\ No newline at end of file
diff --git
a/service/tools/kv/api_tools/monitoring/start_reslens_tools_service.sh
b/service/tools/kv/api_tools/monitoring/start_reslens_tools_service.sh
deleted file mode 100755
index f4ac21f6..00000000
--- a/service/tools/kv/api_tools/monitoring/start_reslens_tools_service.sh
+++ /dev/null
@@ -1,75 +0,0 @@
-#!/bin/bash
-#
-# Startup script for ResLens Flamegraph Analysis Service
-#
-
-# Set the directory to the script location
-cd "$(dirname "$0")"
-
-# Function to find and activate virtual environment
-activate_venv() {
- # Check for common virtual environment locations
- local venv_paths=(
- "venv"
- "env"
- ".venv"
- ".env"
- "../venv"
- "../env"
- "../../venv"
- "../../env"
- "/opt/resilientdb/venv"
- "/opt/resilientdb/env"
- )
-
- for venv_path in "${venv_paths[@]}"; do
- if [[ -d "$venv_path" && -f "$venv_path/bin/activate" ]]; then
- echo "Found virtual environment at: $venv_path"
- source "$venv_path/bin/activate"
- return 0
- fi
- done
-
- # Check if we're already in a virtual environment
- if [[ -n "$VIRTUAL_ENV" ]]; then
- echo "Already in virtual environment: $VIRTUAL_ENV"
- return 0
- fi
-
- echo "No virtual environment found. Using system Python."
- return 1
-}
-
-# Try to activate virtual environment
-if activate_venv; then
- echo "Using virtual environment: $VIRTUAL_ENV"
-else
- echo "Using system Python"
-fi
-
-# Check Python version
-python_version=$(python3 --version 2>&1)
-echo "Python version: $python_version"
-
-# Check if gunicorn is installed
-if ! python3 -c "import gunicorn" &> /dev/null; then
- echo "Gunicorn not found. Installing..."
- pip install gunicorn
-fi
-
-# Check if Flask is installed
-if ! python3 -c "import flask" &> /dev/null; then
- echo "Flask not found. Installing..."
- pip install flask flask-cors
-fi
-
-echo "Starting ResLens Flamegraph Analysis Service with Gunicorn..."
-
-# Run with gunicorn
-gunicorn \
- --config gunicorn.conf.py \
- --bind 0.0.0.0:8080 \
- --workers 1 \
- --timeout 30 \
- --log-level info \
- reslens_tools_service:app
\ No newline at end of file
diff --git a/service/tools/kv/server_tools/start_kv_service.sh
b/service/tools/kv/server_tools/start_kv_service.sh
index 3fd53602..6bd71b74 100755
--- a/service/tools/kv/server_tools/start_kv_service.sh
+++ b/service/tools/kv/server_tools/start_kv_service.sh
@@ -23,10 +23,11 @@ SERVER_CONFIG=service/tools/config/server/server.config
WORK_PATH=$PWD
CERT_PATH=${WORK_PATH}/service/tools/data/cert/
-./service/tools/kv/server_tools/generate_keys_and_certs.sh || {
- echo "Failed to generate configs/certificates" 1>&2
- exit 1
-}
+# This has to be a one time operation
+# ./service/tools/kv/server_tools/generate_keys_and_certs.sh || {
+# echo "Failed to generate configs/certificates" 1>&2
+# exit 1
+# }
bazel build //service/kv:kv_service $@
nohup $SERVER_PATH $SERVER_CONFIG $CERT_PATH/node1.key.pri
$CERT_PATH/cert_1.cert > server0.log &