This is an automated email from the ASF dual-hosted git repository.
harishgokul01 pushed a commit to branch development
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git
The following commit(s) were added to refs/heads/development by this push:
new 23cf1226 very cool bpf tracing
23cf1226 is described below
commit 23cf12263c18b84cb310c7efbb0c59fa0ce44f3e
Author: bchou9 <[email protected]>
AuthorDate: Sat Jan 10 10:42:21 2026 +0000
very cool bpf tracing
---
.../consensus/execution/transaction_executor.cpp | 28 +++++
platform/consensus/ordering/pbft/commitment.cpp | 44 ++++++++
.../ordering/pbft/consensus_manager_pbft.cpp | 7 ++
platform/statistic/BUILD | 10 +-
platform/statistic/stats.cpp | 119 ++++++++++-----------
platform/statistic/stats.h | 10 +-
platform/statistic/trace_hooks.cpp | 101 +++++++++++++++++
platform/statistic/trace_hooks.h | 104 ++++++++++++++++++
8 files changed, 359 insertions(+), 64 deletions(-)
diff --git a/platform/consensus/execution/transaction_executor.cpp b/platform/consensus/execution/transaction_executor.cpp
index 34eb3e3e..60148030 100644
--- a/platform/consensus/execution/transaction_executor.cpp
+++ b/platform/consensus/execution/transaction_executor.cpp
@@ -21,7 +21,10 @@
#include <glog/logging.h>
+#include <chrono>
+
#include "common/utils/utils.h"
+#include "platform/statistic/trace_hooks.h"
namespace resdb {
@@ -240,6 +243,8 @@ void TransactionExecutor::ExecuteMessageOutOfOrder() {
}
void TransactionExecutor::OnlyExecute(std::unique_ptr<Request> request) {
+ uint64_t req_ptr = reinterpret_cast<uint64_t>(request.get());
+ uint32_t req_type = static_cast<uint32_t>(request->type());
// Only Execute the request.
BatchUserRequest batch_request;
if (!batch_request.ParseFromString(request->data())) {
@@ -257,6 +262,11 @@ void TransactionExecutor::OnlyExecute(std::unique_ptr<Request> request) {
// id:"<<request->proxy_id();
std::unique_ptr<BatchUserResponse> response;
global_stats_->GetTransactionDetails(batch_request);
+ uint64_t meta_execute_start =
+ ResdbTracePackMeta(req_type, /*sender_id=*/0,
+ static_cast<uint32_t>(config_.GetSelfInfo().id()));
+ resdb_trace_pbft_execute_start(req_ptr, batch_request.seq(),
meta_execute_start,
+ batch_request.proxy_id(), ResdbTraceEpochNs());
global_stats_->RecordExecuteStart(batch_request.seq());
if (transaction_manager_) {
response = transaction_manager_->ExecuteBatchWithSeq(request->seq(),
@@ -266,10 +276,17 @@ void TransactionExecutor::OnlyExecute(std::unique_ptr<Request> request) {
// global_stats_->IncTotalRequest(batch_request.user_requests_size());
// global_stats_->IncExecuteDone();
global_stats_->RecordExecuteEnd(batch_request.seq());
+ uint64_t meta_execute_end =
+ ResdbTracePackMeta(req_type, /*sender_id=*/0,
+ static_cast<uint32_t>(config_.GetSelfInfo().id()));
+ resdb_trace_pbft_execute_end(req_ptr, batch_request.seq(), meta_execute_end,
+ batch_request.proxy_id(), ResdbTraceEpochNs());
}
void TransactionExecutor::Execute(std::unique_ptr<Request> request,
bool need_execute) {
+ uint64_t req_ptr = reinterpret_cast<uint64_t>(request.get());
+ uint32_t req_type = static_cast<uint32_t>(request->type());
std::unique_ptr<BatchUserRequest> batch_request = nullptr;
std::unique_ptr<std::vector<std::unique_ptr<google::protobuf::Message>>>
data;
std::vector<std::unique_ptr<google::protobuf::Message>>* data_p = nullptr;
@@ -306,6 +323,12 @@ void TransactionExecutor::Execute(std::unique_ptr<Request> request,
global_stats_->GetTransactionDetails(*batch_request_p);
uint64_t seq = request->seq();
// Timeline: execution start
+ uint64_t meta_execute_start =
+ ResdbTracePackMeta(req_type, /*sender_id=*/0,
+ static_cast<uint32_t>(config_.GetSelfInfo().id()));
+ resdb_trace_pbft_execute_start(req_ptr, seq, meta_execute_start,
+ batch_request_p->proxy_id(),
+ ResdbTraceEpochNs());
global_stats_->RecordExecuteStart(seq);
if (transaction_manager_ && need_execute) {
if (execute_thread_num_ == 1) {
@@ -372,6 +395,11 @@ void TransactionExecutor::Execute(std::unique_ptr<Request> request,
// Timeline: execution end
global_stats_->RecordExecuteEnd(seq);
+ uint64_t meta_execute_end =
+ ResdbTracePackMeta(req_type, /*sender_id=*/0,
+ static_cast<uint32_t>(config_.GetSelfInfo().id()));
+ resdb_trace_pbft_execute_end(req_ptr, seq, meta_execute_end,
+ batch_request_p->proxy_id(),
ResdbTraceEpochNs());
global_stats_->IncExecuteDone();
}
diff --git a/platform/consensus/ordering/pbft/commitment.cpp b/platform/consensus/ordering/pbft/commitment.cpp
index 0f4a970d..134cdc53 100644
--- a/platform/consensus/ordering/pbft/commitment.cpp
+++ b/platform/consensus/ordering/pbft/commitment.cpp
@@ -24,6 +24,7 @@
#include "common/utils/utils.h"
#include "platform/consensus/ordering/pbft/transaction_utils.h"
+#include "platform/statistic/trace_hooks.h"
namespace resdb {
@@ -136,6 +137,12 @@ int Commitment::ProcessNewRequest(std::unique_ptr<Context> context,
}
global_stats_->RecordStateTime(*seq, "request");
+ uint64_t meta_request = ResdbTracePackMeta(
+ static_cast<uint32_t>(user_request->type()), /*sender_id=*/0,
+ static_cast<uint32_t>(config_.GetSelfInfo().id()));
+ resdb_trace_pbft_request(reinterpret_cast<uint64_t>(user_request.get()),
*seq,
+ meta_request, user_request->proxy_id(),
+ ResdbTraceEpochNs());
user_request->set_type(Request::TYPE_PRE_PREPARE);
user_request->set_current_view(message_manager_->GetCurrentView());
@@ -230,6 +237,19 @@ int Commitment::ProcessProposeMsg(std::unique_ptr<Context> context,
global_stats_->IncPropose();
global_stats_->RecordStateTime(request->seq(), "pre-prepare");
+ uint64_t meta_pre_prepare = ResdbTracePackMeta(
+ static_cast<uint32_t>(request->type()),
+ static_cast<uint32_t>(request->sender_id()),
+ static_cast<uint32_t>(config_.GetSelfInfo().id()));
+ resdb_trace_pbft_pre_prepare(reinterpret_cast<uint64_t>(request.get()),
+ request->seq(), meta_pre_prepare,
+ request->proxy_id(), ResdbTraceEpochNs());
+
+ // Keep epoch_ns aligned to Stats' system_clock domain for drop-in timeline
+ // compatibility.
+ // Note: system_clock can jump (NTP); monotonic nsecs remains best for
deltas.
+ // We emit both.
+ // (epoch_ns passed to per-phase hooks; ConsensusCommit hook remains
monotonic-only)
std::unique_ptr<Request> prepare_request = resdb::NewRequest(
Request::TYPE_PREPARE, *request, config_.GetSelfInfo().id());
prepare_request->clear_data();
@@ -265,7 +285,13 @@ int Commitment::ProcessPrepareMsg(std::unique_ptr<Context> context,
}
uint64_t seq = request->seq();
int sender_id = request->sender_id();
+ uint64_t req_ptr = reinterpret_cast<uint64_t>(request.get());
+ uint64_t meta_prepare_recv = ResdbTracePackMeta(
+ static_cast<uint32_t>(request->type()), static_cast<uint32_t>(sender_id),
+ static_cast<uint32_t>(config_.GetSelfInfo().id()));
global_stats_->RecordPrepareRecv(seq, sender_id);
+ resdb_trace_pbft_prepare_recv(
+ req_ptr, seq, meta_prepare_recv, request->proxy_id(),
ResdbTraceEpochNs());
global_stats_->IncPrepare(seq);
std::unique_ptr<Request> commit_request = resdb::NewRequest(
Request::TYPE_COMMIT, *request, config_.GetSelfInfo().id());
@@ -291,6 +317,12 @@ int Commitment::ProcessPrepareMsg(std::unique_ptr<Context> context,
// << commit_request->data_signature().DebugString();
}
global_stats_->RecordStateTime(seq, "prepare");
+ uint64_t meta_prepare_state = ResdbTracePackMeta(
+ static_cast<uint32_t>(Request::TYPE_PREPARE), /*sender_id=*/0,
+ static_cast<uint32_t>(config_.GetSelfInfo().id()));
+ resdb_trace_pbft_prepare_state(req_ptr, seq, meta_prepare_state,
+ commit_request->proxy_id(),
+ ResdbTraceEpochNs());
replica_communicator_->BroadCast(*commit_request);
}
return ret == CollectorResultCode::INVALID ? -2 : 0;
@@ -306,11 +338,18 @@ int Commitment::ProcessCommitMsg(std::unique_ptr<Context> context,
}
uint64_t seq = request->seq();
int sender_id = request->sender_id();
+ uint64_t proxy_id = request->proxy_id();
+ uint64_t req_ptr = reinterpret_cast<uint64_t>(request.get());
+ uint64_t meta_commit_recv = ResdbTracePackMeta(
+ static_cast<uint32_t>(request->type()), static_cast<uint32_t>(sender_id),
+ static_cast<uint32_t>(config_.GetSelfInfo().id()));
if (request->is_recovery()) {
return message_manager_->AddConsensusMsg(context->signature,
std::move(request));
}
global_stats_->RecordCommitRecv(seq, sender_id);
+ resdb_trace_pbft_commit_recv(
+ req_ptr, seq, meta_commit_recv, proxy_id, ResdbTraceEpochNs());
global_stats_->IncCommit(seq);
// Add request to message_manager.
// If it has received enough same requests(2f+1), message manager will
@@ -321,6 +360,11 @@ int Commitment::ProcessCommitMsg(std::unique_ptr<Context> context,
// LOG(ERROR)<<request->data().size();
// global_stats_->GetTransactionDetails(request->data());
global_stats_->RecordStateTime(seq, "commit");
+ uint64_t meta_commit_state = ResdbTracePackMeta(
+ static_cast<uint32_t>(Request::TYPE_COMMIT), /*sender_id=*/0,
+ static_cast<uint32_t>(config_.GetSelfInfo().id()));
+ resdb_trace_pbft_commit_state(req_ptr, seq, meta_commit_state, proxy_id,
+ ResdbTraceEpochNs());
}
return ret == CollectorResultCode::INVALID ? -2 : 0;
}
diff --git a/platform/consensus/ordering/pbft/consensus_manager_pbft.cpp b/platform/consensus/ordering/pbft/consensus_manager_pbft.cpp
index d7b38807..7dab7340 100644
--- a/platform/consensus/ordering/pbft/consensus_manager_pbft.cpp
+++ b/platform/consensus/ordering/pbft/consensus_manager_pbft.cpp
@@ -19,10 +19,12 @@
#include "platform/consensus/ordering/pbft/consensus_manager_pbft.h"
+#include <cstdint>
#include <glog/logging.h>
#include <unistd.h>
#include "common/crypto/signature_verifier.h"
+#include "platform/statistic/trace_hooks.h"
namespace resdb {
@@ -148,6 +150,11 @@ ConsensusManagerPBFT::PopComplainedRequest() {
// The implementation of PBFT.
int ConsensusManagerPBFT::ConsensusCommit(std::unique_ptr<Context> context,
std::unique_ptr<Request> request) {
+ resdb_trace_consensus_commit(
+ reinterpret_cast<uint64_t>(request.get()), request->seq(),
+ static_cast<uint32_t>(request->type()),
+ static_cast<uint32_t>(request->sender_id()), request->proxy_id());
+
LOG(INFO) << "recv impl type:" << request->type() << " "
<< "sender id:" << request->sender_id()
<< " primary:" << system_info_->GetPrimaryId();
diff --git a/platform/statistic/BUILD b/platform/statistic/BUILD
index 81cafe7f..a094900d 100644
--- a/platform/statistic/BUILD
+++ b/platform/statistic/BUILD
@@ -24,8 +24,14 @@ package(default_visibility = [
cc_library(
name = "stats",
- srcs = ["stats.cpp"],
- hdrs = ["stats.h"],
+ srcs = [
+ "stats.cpp",
+ "trace_hooks.cpp",
+ ],
+ hdrs = [
+ "stats.h",
+ "trace_hooks.h",
+ ],
deps = [
":prometheus_handler",
"//common:asio",
diff --git a/platform/statistic/stats.cpp b/platform/statistic/stats.cpp
index 8fd1a57f..c1a328f1 100644
--- a/platform/statistic/stats.cpp
+++ b/platform/statistic/stats.cpp
@@ -142,11 +142,29 @@ void Stats::CrowRoute() {
"Access-Control-Allow-Headers",
"Content-Type, Authorization"); // Specify allowed headers
- // Send your response - includes all consensus history with
timeline events
- {
- std::lock_guard<std::mutex> lock(consensus_history_mutex_);
- res.body = consensus_history_.dump();
+ // Send only the most recently executed sequence telemetry.
+ // We intentionally do not retain an unbounded history here.
+ uint64_t seq = last_executed_seq_.load();
+ nlohmann::json result;
+ result["seq_number"] = seq;
+ if (seq != 0) {
+ size_t shard = GetShardIndex(seq);
+ {
+ std::lock_guard<std::mutex> lock(telemetry_mutex_[shard]);
+ auto& map = transaction_telemetry_map_[shard];
+ auto it = map.find(seq);
+ if (it != map.end()) {
+ result = it->second.to_json();
+ } else {
+ result["error"] =
+ "Latest executed sequence not found in telemetry map";
+ }
+ }
+ } else {
+ result["error"] = "No executed sequence recorded yet";
}
+
+ res.body = result.dump();
res.end();
});
CROW_ROUTE(app, "/consensus_data/<int>")
@@ -160,34 +178,14 @@ void Stats::CrowRoute() {
"Content-Type, Authorization");
nlohmann::json result;
-
- // First check consensus_history_
- {
- std::lock_guard<std::mutex> lock(consensus_history_mutex_);
- std::string seq_str = std::to_string(seq_number);
- if (consensus_history_.find(seq_str) !=
consensus_history_.end()) {
- result = consensus_history_[seq_str];
- }
- }
-
- // Also check live telemetry map (may have more recent data)
uint64_t seq = static_cast<uint64_t>(seq_number);
- size_t shard = seq % 256; // kTelemetryShards
+ size_t shard = GetShardIndex(seq);
{
std::lock_guard<std::mutex> lock(telemetry_mutex_[shard]);
auto& map = transaction_telemetry_map_[shard];
auto it = map.find(seq);
if (it != map.end()) {
- // Merge live telemetry data (may be more complete)
- nlohmann::json live_data = it->second.to_json();
-
- // If we have data from consensus_history_, merge it
- if (!result.empty()) {
- // Prefer live data for most fields, but keep timeline
events from both
- result.update(live_data);
- } else {
- result = live_data;
- }
+ result = it->second.to_json();
}
}
@@ -282,9 +280,18 @@ bool Stats::IsFaulty() { return make_faulty_.load(); }
void Stats::ChangePrimary(int primary_id) {
transaction_summary_.primary_id = primary_id;
+ static_telemetry_info_.primary_id = primary_id;
make_faulty_.store(false);
- // Detect view change
+ // Update all existing transaction telemetry entries with new primary
+ for (size_t shard = 0; shard < kTelemetryShards; ++shard) {
+ std::unique_lock<std::mutex> lock(telemetry_mutex_[shard]);
+ auto& map = transaction_telemetry_map_[shard];
+ for (auto& entry : map) {
+ entry.second.primary_id = primary_id;
+ }
+ }
+
if (prometheus_ && previous_primary_id_ != -1 &&
previous_primary_id_ != primary_id) {
prometheus_->Inc(VIEW_CHANGE, 1);
@@ -312,6 +319,15 @@ void Stats::SetPrimaryId(int primary_id) {
transaction_summary_.primary_id = primary_id;
static_telemetry_info_.primary_id = primary_id;
+ // Update all existing transaction telemetry entries with new primary
+ for (size_t shard = 0; shard < kTelemetryShards; ++shard) {
+ std::unique_lock<std::mutex> lock(telemetry_mutex_[shard]);
+ auto& map = transaction_telemetry_map_[shard];
+ for (auto& entry : map) {
+ entry.second.primary_id = primary_id;
+ }
+ }
+
if (prometheus_ && previous_primary_id_ != -1 &&
previous_primary_id_ != primary_id) {
prometheus_->Inc(VIEW_CHANGE, 1);
@@ -540,6 +556,13 @@ void Stats::RecordExecuteStart(uint64_t seq) {
}
void Stats::RecordExecuteEnd(uint64_t seq) {
+ // Track last executed sequence even if resview is disabled.
+ {
+ uint64_t cur = last_executed_seq_.load();
+ while (seq > cur && !last_executed_seq_.compare_exchange_weak(cur, seq)) {
+ // retry with updated cur
+ }
+ }
if (!enable_resview) {
return;
}
@@ -697,12 +720,6 @@ void Stats::SendSummary() {
summary_json_["ext_cache_hit_ratio"] =
transaction_summary_.ext_cache_hit_ratio_;
- {
- std::lock_guard<std::mutex> history_lock(consensus_history_mutex_);
- consensus_history_[std::to_string(transaction_summary_.txn_number)] =
- summary_json_;
- }
-
LOG(ERROR) << summary_json_.dump();
// Reset Transaction Summary Parameters
@@ -725,6 +742,15 @@ void Stats::SendSummary(uint64_t seq) {
return;
}
+ // Summary is generated after execution; treat this as another signal for the
+ // latest executed sequence.
+ {
+ uint64_t cur = last_executed_seq_.load();
+ while (seq > cur && !last_executed_seq_.compare_exchange_weak(cur, seq)) {
+ // retry with updated cur
+ }
+ }
+
size_t shard = GetShardIndex(seq);
std::unique_lock<std::mutex> lock(telemetry_mutex_[shard]);
@@ -753,33 +779,6 @@ void Stats::SendSummary(uint64_t seq) {
// it
nlohmann::json summary_json = telemetry.to_json();
- {
- std::lock_guard<std::mutex> history_lock(consensus_history_mutex_);
- // Only update if we don't already have this sequence, or update existing
- // This prevents overwriting good data with bad data
- std::string seq_str = std::to_string(seq);
- if (consensus_history_.find(seq_str) == consensus_history_.end()) {
- // First time storing - always store
- consensus_history_[seq_str] = summary_json;
- } else {
- // Already exists - only update if new data has more complete information
- // (i.e., has transaction details or more timestamps)
- auto& existing = consensus_history_[seq_str];
- bool new_has_details = !summary_json["txn_commands"].empty();
- bool existing_has_details = !existing["txn_commands"].empty();
-
- // If new data has details and existing doesn't, merge them
- if (new_has_details && !existing_has_details) {
- existing["txn_commands"] = summary_json["txn_commands"];
- existing["txn_keys"] = summary_json["txn_keys"];
- existing["txn_values"] = summary_json["txn_values"];
- }
-
- // Always update execution_time as it's the latest
- existing["execution_time"] = summary_json["execution_time"];
- }
- }
-
LOG(ERROR) << summary_json.dump();
// DON'T delete the entry - per-sequence state means each seq has its own
diff --git a/platform/statistic/stats.h b/platform/statistic/stats.h
index 0f29d8aa..d678f3ac 100644
--- a/platform/statistic/stats.h
+++ b/platform/statistic/stats.h
@@ -25,6 +25,7 @@
#include <future>
#include <mutex>
#include <nlohmann/json.hpp>
+#include <atomic>
#include <unordered_map>
#include "boost/asio.hpp"
@@ -227,6 +228,10 @@ class Stats {
void ServerProcess();
void SetPrometheus(const std::string& prometheus_address);
+ // Latest executed consensus sequence observed by this replica; 0 until a
+ // nonzero sequence is recorded. RecordExecuteEnd()/SendSummary(seq) only raise it.
+ uint64_t GetLastExecutedSeq() const { return last_executed_seq_.load(); }
+
protected:
Stats(int sleep_time = 5);
~Stats();
@@ -286,8 +291,9 @@ class Stats {
std::atomic<uint64_t> prev_num_prepare_;
std::atomic<uint64_t> prev_num_commit_;
nlohmann::json summary_json_;
- nlohmann::json consensus_history_;
- std::mutex consensus_history_mutex_; // Protect consensus_history_ access
+
+ // Tracks the most recently executed sequence number.
+ std::atomic<uint64_t> last_executed_seq_{0};
int previous_primary_id_ = -1; // Track for view change detection
diff --git a/platform/statistic/trace_hooks.cpp b/platform/statistic/trace_hooks.cpp
new file mode 100644
index 00000000..5cba98a8
--- /dev/null
+++ b/platform/statistic/trace_hooks.cpp
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "platform/statistic/trace_hooks.h"
+
+// Stable, unmangled attach points for user-space uprobes (e.g. bpftrace).
+// The hooks do no work; each empty asm takes every argument as a register
+// input so the arguments are not optimized away.
+//
+// The resdb_trace_pbft_* hooks share identical signatures and bodies, so they
+// are candidates for identical code folding (GCC -fipa-icf, enabled at -O2),
+// after which a probe on one hook could fire for another. GCC's noipa implies
+// noinline and no_icf. NOTE(review): linker --icf=all can still fold them.
+#if defined(__GNUC__) && !defined(__clang__) && __GNUC__ >= 8
+#define RESDB_TRACE_HOOK __attribute__((noipa))
+#else
+#define RESDB_TRACE_HOOK __attribute__((noinline))
+#endif
+
+extern "C" {
+
+RESDB_TRACE_HOOK void resdb_trace_consensus_commit(
+    uint64_t req_ptr, uint64_t seq, uint32_t type, uint32_t sender_id,
+    uint64_t proxy_id) {
+  asm volatile("" : : "r"(req_ptr), "r"(seq), "r"(type), "r"(sender_id),
+               "r"(proxy_id) : "memory");
+}
+
+RESDB_TRACE_HOOK void resdb_trace_pbft_request(
+    uint64_t req_ptr, uint64_t seq, uint64_t meta, uint64_t proxy_id,
+    int64_t epoch_ns) {
+  asm volatile("" : : "r"(req_ptr), "r"(seq), "r"(meta), "r"(proxy_id),
+               "r"(epoch_ns) : "memory");
+}
+
+RESDB_TRACE_HOOK void resdb_trace_pbft_pre_prepare(
+    uint64_t req_ptr, uint64_t seq, uint64_t meta, uint64_t proxy_id,
+    int64_t epoch_ns) {
+  asm volatile("" : : "r"(req_ptr), "r"(seq), "r"(meta), "r"(proxy_id),
+               "r"(epoch_ns) : "memory");
+}
+
+RESDB_TRACE_HOOK void resdb_trace_pbft_prepare_recv(
+    uint64_t req_ptr, uint64_t seq, uint64_t meta, uint64_t proxy_id,
+    int64_t epoch_ns) {
+  asm volatile("" : : "r"(req_ptr), "r"(seq), "r"(meta), "r"(proxy_id),
+               "r"(epoch_ns) : "memory");
+}
+
+RESDB_TRACE_HOOK void resdb_trace_pbft_prepare_state(
+    uint64_t req_ptr, uint64_t seq, uint64_t meta, uint64_t proxy_id,
+    int64_t epoch_ns) {
+  asm volatile("" : : "r"(req_ptr), "r"(seq), "r"(meta), "r"(proxy_id),
+               "r"(epoch_ns) : "memory");
+}
+
+RESDB_TRACE_HOOK void resdb_trace_pbft_commit_recv(
+    uint64_t req_ptr, uint64_t seq, uint64_t meta, uint64_t proxy_id,
+    int64_t epoch_ns) {
+  asm volatile("" : : "r"(req_ptr), "r"(seq), "r"(meta), "r"(proxy_id),
+               "r"(epoch_ns) : "memory");
+}
+
+RESDB_TRACE_HOOK void resdb_trace_pbft_commit_state(
+    uint64_t req_ptr, uint64_t seq, uint64_t meta, uint64_t proxy_id,
+    int64_t epoch_ns) {
+  asm volatile("" : : "r"(req_ptr), "r"(seq), "r"(meta), "r"(proxy_id),
+               "r"(epoch_ns) : "memory");
+}
+
+RESDB_TRACE_HOOK void resdb_trace_pbft_execute_start(
+    uint64_t req_ptr, uint64_t seq, uint64_t meta, uint64_t proxy_id,
+    int64_t epoch_ns) {
+  asm volatile("" : : "r"(req_ptr), "r"(seq), "r"(meta), "r"(proxy_id),
+               "r"(epoch_ns) : "memory");
+}
+
+RESDB_TRACE_HOOK void resdb_trace_pbft_execute_end(
+    uint64_t req_ptr, uint64_t seq, uint64_t meta, uint64_t proxy_id,
+    int64_t epoch_ns) {
+  asm volatile("" : : "r"(req_ptr), "r"(seq), "r"(meta), "r"(proxy_id),
+               "r"(epoch_ns) : "memory");
+}
+
+}  // extern "C"
diff --git a/platform/statistic/trace_hooks.h b/platform/statistic/trace_hooks.h
new file mode 100644
index 00000000..438e5249
--- /dev/null
+++ b/platform/statistic/trace_hooks.h
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <chrono>
+#include <cstdint>
+
+// Unmangled trace hooks intended for user-space uprobes (e.g., bpftrace).
+//
+// Design goals:
+// - stable, easy-to-type symbol names (no C++ mangling)
+// - minimal overhead
+// - arguments are plain integers (no std::string, no protobuf types)
+//
+// Convention:
+// seq: PBFT sequence number
+// sender_id: sender replica id when applicable, else 0
+// self_id: local replica id
+// proxy_id: client proxy id when available, else 0
+
+extern "C" {
+
+// Request-level hook fired on entry to ConsensusManagerPBFT::ConsensusCommit.
+// arg0=req_ptr, arg1=seq, arg2=type, arg3=sender_id, arg4=proxy_id
+void resdb_trace_consensus_commit(uint64_t req_ptr, uint64_t seq, uint32_t type,
+                                  uint32_t sender_id, uint64_t proxy_id);
+
+// PBFT phase hooks (mirrors the phases Stats emits).
+// NOTE: bpftrace user-space uprobes on x86_64 only expose arg0..arg5.
+// To keep hooks universally traceable, we pack fields into a single 64-bit
+// meta value:
+//   meta[0..31]  = type (uint32)
+//   meta[32..47] = sender_id (uint16)
+//   meta[48..63] = self_id (uint16)
+// epoch_ns is nanoseconds since Unix epoch (system_clock / CLOCK_REALTIME).
+void resdb_trace_pbft_request(uint64_t req_ptr, uint64_t seq, uint64_t meta,
+                              uint64_t proxy_id, int64_t epoch_ns);
+void resdb_trace_pbft_pre_prepare(uint64_t req_ptr, uint64_t seq, uint64_t meta,
+                                  uint64_t proxy_id, int64_t epoch_ns);
+
+void resdb_trace_pbft_prepare_recv(uint64_t req_ptr, uint64_t seq, uint64_t meta,
+                                   uint64_t proxy_id, int64_t epoch_ns);
+void resdb_trace_pbft_prepare_state(uint64_t req_ptr, uint64_t seq, uint64_t meta,
+                                    uint64_t proxy_id, int64_t epoch_ns);
+
+void resdb_trace_pbft_commit_recv(uint64_t req_ptr, uint64_t seq, uint64_t meta,
+                                  uint64_t proxy_id, int64_t epoch_ns);
+void resdb_trace_pbft_commit_state(uint64_t req_ptr, uint64_t seq, uint64_t meta,
+                                   uint64_t proxy_id, int64_t epoch_ns);
+
+void resdb_trace_pbft_execute_start(uint64_t req_ptr, uint64_t seq, uint64_t meta,
+                                    uint64_t proxy_id, int64_t epoch_ns);
+void resdb_trace_pbft_execute_end(uint64_t req_ptr, uint64_t seq, uint64_t meta,
+                                  uint64_t proxy_id, int64_t epoch_ns);
+
+}  // extern "C"
+
+// Helpers to build/decode the packed meta field of the PBFT phase hooks.
+// Layout (sender_id and self_id keep only their low 16 bits):
+//   meta[0..31]  = type (uint32)
+//   meta[32..47] = sender_id (uint16)
+//   meta[48..63] = self_id (uint16)
+constexpr uint64_t ResdbTracePackMeta(uint32_t type, uint32_t sender_id,
+                                      uint32_t self_id) {
+  return (static_cast<uint64_t>(type) & 0xffffffffULL) |
+         ((static_cast<uint64_t>(sender_id) & 0xffffULL) << 32) |
+         ((static_cast<uint64_t>(self_id) & 0xffffULL) << 48);
+}
+
+constexpr uint32_t ResdbTraceMetaType(uint64_t meta) {
+  return static_cast<uint32_t>(meta & 0xffffffffULL);
+}
+
+constexpr uint32_t ResdbTraceMetaSender(uint64_t meta) {
+  return static_cast<uint32_t>((meta >> 32) & 0xffffULL);
+}
+
+constexpr uint32_t ResdbTraceMetaSelf(uint64_t meta) {
+  return static_cast<uint32_t>((meta >> 48) & 0xffffULL);
+}
+
+// Epoch nanoseconds (system_clock) for compatibility with Stats (stats.cpp).
+inline int64_t ResdbTraceEpochNs() {
+  return std::chrono::duration_cast<std::chrono::nanoseconds>(
+             std::chrono::system_clock::now().time_since_epoch())
+      .count();
+}