This is an automated email from the ASF dual-hosted git repository. harishgokul01 pushed a commit to branch demo-final in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git
commit 23cf12263c18b84cb310c7efbb0c59fa0ce44f3e Author: bchou9 <[email protected]> AuthorDate: Sat Jan 10 10:42:21 2026 +0000 very cool bpf tracing --- .../consensus/execution/transaction_executor.cpp | 28 +++++ platform/consensus/ordering/pbft/commitment.cpp | 44 ++++++++ .../ordering/pbft/consensus_manager_pbft.cpp | 7 ++ platform/statistic/BUILD | 10 +- platform/statistic/stats.cpp | 119 ++++++++++----------- platform/statistic/stats.h | 10 +- platform/statistic/trace_hooks.cpp | 101 +++++++++++++++++ platform/statistic/trace_hooks.h | 104 ++++++++++++++++++ 8 files changed, 359 insertions(+), 64 deletions(-) diff --git a/platform/consensus/execution/transaction_executor.cpp b/platform/consensus/execution/transaction_executor.cpp index 34eb3e3e..60148030 100644 --- a/platform/consensus/execution/transaction_executor.cpp +++ b/platform/consensus/execution/transaction_executor.cpp @@ -21,7 +21,10 @@ #include <glog/logging.h> +#include <chrono> + #include "common/utils/utils.h" +#include "platform/statistic/trace_hooks.h" namespace resdb { @@ -240,6 +243,8 @@ void TransactionExecutor::ExecuteMessageOutOfOrder() { } void TransactionExecutor::OnlyExecute(std::unique_ptr<Request> request) { + uint64_t req_ptr = reinterpret_cast<uint64_t>(request.get()); + uint32_t req_type = static_cast<uint32_t>(request->type()); // Only Execute the request. 
BatchUserRequest batch_request; if (!batch_request.ParseFromString(request->data())) { @@ -257,6 +262,11 @@ void TransactionExecutor::OnlyExecute(std::unique_ptr<Request> request) { // id:"<<request->proxy_id(); std::unique_ptr<BatchUserResponse> response; global_stats_->GetTransactionDetails(batch_request); + uint64_t meta_execute_start = + ResdbTracePackMeta(req_type, /*sender_id=*/0, + static_cast<uint32_t>(config_.GetSelfInfo().id())); + resdb_trace_pbft_execute_start(req_ptr, batch_request.seq(), meta_execute_start, + batch_request.proxy_id(), ResdbTraceEpochNs()); global_stats_->RecordExecuteStart(batch_request.seq()); if (transaction_manager_) { response = transaction_manager_->ExecuteBatchWithSeq(request->seq(), @@ -266,10 +276,17 @@ void TransactionExecutor::OnlyExecute(std::unique_ptr<Request> request) { // global_stats_->IncTotalRequest(batch_request.user_requests_size()); // global_stats_->IncExecuteDone(); global_stats_->RecordExecuteEnd(batch_request.seq()); + uint64_t meta_execute_end = + ResdbTracePackMeta(req_type, /*sender_id=*/0, + static_cast<uint32_t>(config_.GetSelfInfo().id())); + resdb_trace_pbft_execute_end(req_ptr, batch_request.seq(), meta_execute_end, + batch_request.proxy_id(), ResdbTraceEpochNs()); } void TransactionExecutor::Execute(std::unique_ptr<Request> request, bool need_execute) { + uint64_t req_ptr = reinterpret_cast<uint64_t>(request.get()); + uint32_t req_type = static_cast<uint32_t>(request->type()); std::unique_ptr<BatchUserRequest> batch_request = nullptr; std::unique_ptr<std::vector<std::unique_ptr<google::protobuf::Message>>> data; std::vector<std::unique_ptr<google::protobuf::Message>>* data_p = nullptr; @@ -306,6 +323,12 @@ void TransactionExecutor::Execute(std::unique_ptr<Request> request, global_stats_->GetTransactionDetails(*batch_request_p); uint64_t seq = request->seq(); // Timeline: execution start + uint64_t meta_execute_start = + ResdbTracePackMeta(req_type, /*sender_id=*/0, + 
static_cast<uint32_t>(config_.GetSelfInfo().id())); + resdb_trace_pbft_execute_start(req_ptr, seq, meta_execute_start, + batch_request_p->proxy_id(), + ResdbTraceEpochNs()); global_stats_->RecordExecuteStart(seq); if (transaction_manager_ && need_execute) { if (execute_thread_num_ == 1) { @@ -372,6 +395,11 @@ void TransactionExecutor::Execute(std::unique_ptr<Request> request, // Timeline: execution end global_stats_->RecordExecuteEnd(seq); + uint64_t meta_execute_end = + ResdbTracePackMeta(req_type, /*sender_id=*/0, + static_cast<uint32_t>(config_.GetSelfInfo().id())); + resdb_trace_pbft_execute_end(req_ptr, seq, meta_execute_end, + batch_request_p->proxy_id(), ResdbTraceEpochNs()); global_stats_->IncExecuteDone(); } diff --git a/platform/consensus/ordering/pbft/commitment.cpp b/platform/consensus/ordering/pbft/commitment.cpp index 0f4a970d..134cdc53 100644 --- a/platform/consensus/ordering/pbft/commitment.cpp +++ b/platform/consensus/ordering/pbft/commitment.cpp @@ -24,6 +24,7 @@ #include "common/utils/utils.h" #include "platform/consensus/ordering/pbft/transaction_utils.h" +#include "platform/statistic/trace_hooks.h" namespace resdb { @@ -136,6 +137,12 @@ int Commitment::ProcessNewRequest(std::unique_ptr<Context> context, } global_stats_->RecordStateTime(*seq, "request"); + uint64_t meta_request = ResdbTracePackMeta( + static_cast<uint32_t>(user_request->type()), /*sender_id=*/0, + static_cast<uint32_t>(config_.GetSelfInfo().id())); + resdb_trace_pbft_request(reinterpret_cast<uint64_t>(user_request.get()), *seq, + meta_request, user_request->proxy_id(), + ResdbTraceEpochNs()); user_request->set_type(Request::TYPE_PRE_PREPARE); user_request->set_current_view(message_manager_->GetCurrentView()); @@ -230,6 +237,19 @@ int Commitment::ProcessProposeMsg(std::unique_ptr<Context> context, global_stats_->IncPropose(); global_stats_->RecordStateTime(request->seq(), "pre-prepare"); + uint64_t meta_pre_prepare = ResdbTracePackMeta( + static_cast<uint32_t>(request->type()), + 
static_cast<uint32_t>(request->sender_id()), + static_cast<uint32_t>(config_.GetSelfInfo().id())); + resdb_trace_pbft_pre_prepare(reinterpret_cast<uint64_t>(request.get()), + request->seq(), meta_pre_prepare, + request->proxy_id(), ResdbTraceEpochNs()); + + // epoch_ns uses system_clock so it lines up with the Stats timeline. + // Note: system_clock can jump (NTP); for phase-to-phase deltas prefer the + // tracer's monotonic probe timestamp (e.g. bpftrace `nsecs`). Per-phase + // hooks carry epoch_ns; resdb_trace_consensus_commit takes no timestamp + // argument, so only the tracer's clock is available for that probe. std::unique_ptr<Request> prepare_request = resdb::NewRequest( Request::TYPE_PREPARE, *request, config_.GetSelfInfo().id()); prepare_request->clear_data(); @@ -265,7 +285,13 @@ int Commitment::ProcessPrepareMsg(std::unique_ptr<Context> context, } uint64_t seq = request->seq(); int sender_id = request->sender_id(); + uint64_t req_ptr = reinterpret_cast<uint64_t>(request.get()); + uint64_t meta_prepare_recv = ResdbTracePackMeta( + static_cast<uint32_t>(request->type()), static_cast<uint32_t>(sender_id), + static_cast<uint32_t>(config_.GetSelfInfo().id())); global_stats_->RecordPrepareRecv(seq, sender_id); + resdb_trace_pbft_prepare_recv( + req_ptr, seq, meta_prepare_recv, request->proxy_id(), ResdbTraceEpochNs()); global_stats_->IncPrepare(seq); std::unique_ptr<Request> commit_request = resdb::NewRequest( Request::TYPE_COMMIT, *request, config_.GetSelfInfo().id()); @@ -291,6 +317,12 @@ int Commitment::ProcessPrepareMsg(std::unique_ptr<Context> context, // << commit_request->data_signature().DebugString(); } global_stats_->RecordStateTime(seq, "prepare"); + uint64_t meta_prepare_state = ResdbTracePackMeta( + static_cast<uint32_t>(Request::TYPE_PREPARE), /*sender_id=*/0, + static_cast<uint32_t>(config_.GetSelfInfo().id())); + resdb_trace_pbft_prepare_state(req_ptr, seq, meta_prepare_state, + commit_request->proxy_id(), + ResdbTraceEpochNs()); replica_communicator_->BroadCast(*commit_request); } return ret == CollectorResultCode::INVALID ?
-2 : 0; @@ -306,11 +338,18 @@ int Commitment::ProcessCommitMsg(std::unique_ptr<Context> context, } uint64_t seq = request->seq(); int sender_id = request->sender_id(); + uint64_t proxy_id = request->proxy_id(); + uint64_t req_ptr = reinterpret_cast<uint64_t>(request.get()); + uint64_t meta_commit_recv = ResdbTracePackMeta( + static_cast<uint32_t>(request->type()), static_cast<uint32_t>(sender_id), + static_cast<uint32_t>(config_.GetSelfInfo().id())); if (request->is_recovery()) { return message_manager_->AddConsensusMsg(context->signature, std::move(request)); } global_stats_->RecordCommitRecv(seq, sender_id); + resdb_trace_pbft_commit_recv( + req_ptr, seq, meta_commit_recv, proxy_id, ResdbTraceEpochNs()); global_stats_->IncCommit(seq); // Add request to message_manager. // If it has received enough same requests(2f+1), message manager will @@ -321,6 +360,11 @@ int Commitment::ProcessCommitMsg(std::unique_ptr<Context> context, // LOG(ERROR)<<request->data().size(); // global_stats_->GetTransactionDetails(request->data()); global_stats_->RecordStateTime(seq, "commit"); + uint64_t meta_commit_state = ResdbTracePackMeta( + static_cast<uint32_t>(Request::TYPE_COMMIT), /*sender_id=*/0, + static_cast<uint32_t>(config_.GetSelfInfo().id())); + resdb_trace_pbft_commit_state(req_ptr, seq, meta_commit_state, proxy_id, + ResdbTraceEpochNs()); } return ret == CollectorResultCode::INVALID ? 
-2 : 0; } diff --git a/platform/consensus/ordering/pbft/consensus_manager_pbft.cpp b/platform/consensus/ordering/pbft/consensus_manager_pbft.cpp index d7b38807..7dab7340 100644 --- a/platform/consensus/ordering/pbft/consensus_manager_pbft.cpp +++ b/platform/consensus/ordering/pbft/consensus_manager_pbft.cpp @@ -19,10 +19,12 @@ #include "platform/consensus/ordering/pbft/consensus_manager_pbft.h" +#include <cstdint> #include <glog/logging.h> #include <unistd.h> #include "common/crypto/signature_verifier.h" +#include "platform/statistic/trace_hooks.h" namespace resdb { @@ -148,6 +150,11 @@ ConsensusManagerPBFT::PopComplainedRequest() { // The implementation of PBFT. int ConsensusManagerPBFT::ConsensusCommit(std::unique_ptr<Context> context, std::unique_ptr<Request> request) { + resdb_trace_consensus_commit( + reinterpret_cast<uint64_t>(request.get()), request->seq(), + static_cast<uint32_t>(request->type()), + static_cast<uint32_t>(request->sender_id()), request->proxy_id()); + LOG(INFO) << "recv impl type:" << request->type() << " " << "sender id:" << request->sender_id() << " primary:" << system_info_->GetPrimaryId(); diff --git a/platform/statistic/BUILD b/platform/statistic/BUILD index 81cafe7f..a094900d 100644 --- a/platform/statistic/BUILD +++ b/platform/statistic/BUILD @@ -24,8 +24,14 @@ package(default_visibility = [ cc_library( name = "stats", - srcs = ["stats.cpp"], - hdrs = ["stats.h"], + srcs = [ + "stats.cpp", + "trace_hooks.cpp", + ], + hdrs = [ + "stats.h", + "trace_hooks.h", + ], deps = [ ":prometheus_handler", "//common:asio", diff --git a/platform/statistic/stats.cpp b/platform/statistic/stats.cpp index 8fd1a57f..c1a328f1 100644 --- a/platform/statistic/stats.cpp +++ b/platform/statistic/stats.cpp @@ -142,11 +142,29 @@ void Stats::CrowRoute() { "Access-Control-Allow-Headers", "Content-Type, Authorization"); // Specify allowed headers - // Send your response - includes all consensus history with timeline events - { - std::lock_guard<std::mutex> 
lock(consensus_history_mutex_); - res.body = consensus_history_.dump(); + // Send only the most recently executed sequence telemetry. + // We intentionally do not retain an unbounded history here. + uint64_t seq = last_executed_seq_.load(); + nlohmann::json result; + result["seq_number"] = seq; + if (seq != 0) { + size_t shard = GetShardIndex(seq); + { + std::lock_guard<std::mutex> lock(telemetry_mutex_[shard]); + auto& map = transaction_telemetry_map_[shard]; + auto it = map.find(seq); + if (it != map.end()) { + result = it->second.to_json(); + } else { + result["error"] = + "Latest executed sequence not found in telemetry map"; + } + } + } else { + result["error"] = "No executed sequence recorded yet"; } + + res.body = result.dump(); res.end(); }); CROW_ROUTE(app, "/consensus_data/<int>") @@ -160,34 +178,14 @@ void Stats::CrowRoute() { "Content-Type, Authorization"); nlohmann::json result; - - // First check consensus_history_ - { - std::lock_guard<std::mutex> lock(consensus_history_mutex_); - std::string seq_str = std::to_string(seq_number); - if (consensus_history_.find(seq_str) != consensus_history_.end()) { - result = consensus_history_[seq_str]; - } - } - - // Also check live telemetry map (may have more recent data) uint64_t seq = static_cast<uint64_t>(seq_number); - size_t shard = seq % 256; // kTelemetryShards + size_t shard = GetShardIndex(seq); { std::lock_guard<std::mutex> lock(telemetry_mutex_[shard]); auto& map = transaction_telemetry_map_[shard]; auto it = map.find(seq); if (it != map.end()) { - // Merge live telemetry data (may be more complete) - nlohmann::json live_data = it->second.to_json(); - - // If we have data from consensus_history_, merge it - if (!result.empty()) { - // Prefer live data for most fields, but keep timeline events from both - result.update(live_data); - } else { - result = live_data; - } + result = it->second.to_json(); } } @@ -282,9 +280,18 @@ bool Stats::IsFaulty() { return make_faulty_.load(); } void 
Stats::ChangePrimary(int primary_id) { transaction_summary_.primary_id = primary_id; + static_telemetry_info_.primary_id = primary_id; make_faulty_.store(false); - // Detect view change + // Update all existing transaction telemetry entries with new primary + for (size_t shard = 0; shard < kTelemetryShards; ++shard) { + std::unique_lock<std::mutex> lock(telemetry_mutex_[shard]); + auto& map = transaction_telemetry_map_[shard]; + for (auto& entry : map) { + entry.second.primary_id = primary_id; + } + } + if (prometheus_ && previous_primary_id_ != -1 && previous_primary_id_ != primary_id) { prometheus_->Inc(VIEW_CHANGE, 1); @@ -312,6 +319,15 @@ void Stats::SetPrimaryId(int primary_id) { transaction_summary_.primary_id = primary_id; static_telemetry_info_.primary_id = primary_id; + // Update all existing transaction telemetry entries with new primary + for (size_t shard = 0; shard < kTelemetryShards; ++shard) { + std::unique_lock<std::mutex> lock(telemetry_mutex_[shard]); + auto& map = transaction_telemetry_map_[shard]; + for (auto& entry : map) { + entry.second.primary_id = primary_id; + } + } + if (prometheus_ && previous_primary_id_ != -1 && previous_primary_id_ != primary_id) { prometheus_->Inc(VIEW_CHANGE, 1); @@ -540,6 +556,13 @@ void Stats::RecordExecuteStart(uint64_t seq) { } void Stats::RecordExecuteEnd(uint64_t seq) { + // Track last executed sequence even if resview is disabled. 
+ { + uint64_t cur = last_executed_seq_.load(); + while (seq > cur && !last_executed_seq_.compare_exchange_weak(cur, seq)) { + // retry with updated cur + } + } if (!enable_resview) { return; } @@ -697,12 +720,6 @@ void Stats::SendSummary() { summary_json_["ext_cache_hit_ratio"] = transaction_summary_.ext_cache_hit_ratio_; - { - std::lock_guard<std::mutex> history_lock(consensus_history_mutex_); - consensus_history_[std::to_string(transaction_summary_.txn_number)] = - summary_json_; - } - LOG(ERROR) << summary_json_.dump(); // Reset Transaction Summary Parameters @@ -725,6 +742,15 @@ void Stats::SendSummary(uint64_t seq) { return; } + // Summary is generated after execution; treat this as another signal for the + // latest executed sequence. + { + uint64_t cur = last_executed_seq_.load(); + while (seq > cur && !last_executed_seq_.compare_exchange_weak(cur, seq)) { + // retry with updated cur + } + } + size_t shard = GetShardIndex(seq); std::unique_lock<std::mutex> lock(telemetry_mutex_[shard]); @@ -753,33 +779,6 @@ void Stats::SendSummary(uint64_t seq) { // it nlohmann::json summary_json = telemetry.to_json(); - { - std::lock_guard<std::mutex> history_lock(consensus_history_mutex_); - // Only update if we don't already have this sequence, or update existing - // This prevents overwriting good data with bad data - std::string seq_str = std::to_string(seq); - if (consensus_history_.find(seq_str) == consensus_history_.end()) { - // First time storing - always store - consensus_history_[seq_str] = summary_json; - } else { - // Already exists - only update if new data has more complete information - // (i.e., has transaction details or more timestamps) - auto& existing = consensus_history_[seq_str]; - bool new_has_details = !summary_json["txn_commands"].empty(); - bool existing_has_details = !existing["txn_commands"].empty(); - - // If new data has details and existing doesn't, merge them - if (new_has_details && !existing_has_details) { - existing["txn_commands"] = 
summary_json["txn_commands"]; - existing["txn_keys"] = summary_json["txn_keys"]; - existing["txn_values"] = summary_json["txn_values"]; - } - - // Always update execution_time as it's the latest - existing["execution_time"] = summary_json["execution_time"]; - } - } - LOG(ERROR) << summary_json.dump(); // DON'T delete the entry - per-sequence state means each seq has its own diff --git a/platform/statistic/stats.h b/platform/statistic/stats.h index 0f29d8aa..d678f3ac 100644 --- a/platform/statistic/stats.h +++ b/platform/statistic/stats.h @@ -25,6 +25,7 @@ #include <future> #include <mutex> #include <nlohmann/json.hpp> +#include <atomic> #include <unordered_map> #include "boost/asio.hpp" @@ -227,6 +228,10 @@ class Stats { void ServerProcess(); void SetPrometheus(const std::string& prometheus_address); + // Latest executed consensus sequence observed by this replica. + // This is updated when execution completes (and also when summaries are sent). + uint64_t GetLastExecutedSeq() const { return last_executed_seq_.load(); } + protected: Stats(int sleep_time = 5); ~Stats(); @@ -286,8 +291,9 @@ class Stats { std::atomic<uint64_t> prev_num_prepare_; std::atomic<uint64_t> prev_num_commit_; nlohmann::json summary_json_; - nlohmann::json consensus_history_; - std::mutex consensus_history_mutex_; // Protect consensus_history_ access + + // Tracks the most recently executed sequence number. + std::atomic<uint64_t> last_executed_seq_{0}; int previous_primary_id_ = -1; // Track for view change detection diff --git a/platform/statistic/trace_hooks.cpp b/platform/statistic/trace_hooks.cpp new file mode 100644 index 00000000..5cba98a8 --- /dev/null +++ b/platform/statistic/trace_hooks.cpp @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "platform/statistic/trace_hooks.h" + +// Keep the hook functions and their arguments from being optimized away. +// +// These functions intentionally do not do any work besides providing a stable +// symbol that uprobes can attach to and a well-defined argument ABI. + +extern "C" { + +__attribute__((noinline)) void resdb_trace_consensus_commit( + uint64_t req_ptr, uint64_t seq, uint32_t type, uint32_t sender_id, + uint64_t proxy_id) { + asm volatile("" : : "r"(req_ptr), "r"(seq), "r"(type), "r"(sender_id), + "r"(proxy_id) + : "memory"); +} + +__attribute__((noinline)) void resdb_trace_pbft_request( + uint64_t req_ptr, uint64_t seq, uint64_t meta, uint64_t proxy_id, + int64_t epoch_ns) { + asm volatile("" : : "r"(req_ptr), "r"(seq), "r"(meta), "r"(proxy_id), + "r"(epoch_ns) + : "memory"); +} + +__attribute__((noinline)) void resdb_trace_pbft_pre_prepare( + uint64_t req_ptr, uint64_t seq, uint64_t meta, uint64_t proxy_id, + int64_t epoch_ns) { + asm volatile("" : : "r"(req_ptr), "r"(seq), "r"(meta), "r"(proxy_id), + "r"(epoch_ns) + : "memory"); +} + +__attribute__((noinline)) void resdb_trace_pbft_prepare_recv( + uint64_t req_ptr, uint64_t seq, uint64_t meta, uint64_t proxy_id, + int64_t epoch_ns) { + asm volatile("" : : "r"(req_ptr), "r"(seq), "r"(meta), "r"(proxy_id), + "r"(epoch_ns) + : "memory"); +} + +__attribute__((noinline)) void resdb_trace_pbft_prepare_state( + 
uint64_t req_ptr, uint64_t seq, uint64_t meta, uint64_t proxy_id, + int64_t epoch_ns) { + asm volatile("" : : "r"(req_ptr), "r"(seq), "r"(meta), "r"(proxy_id), + "r"(epoch_ns) + : "memory"); +} + +__attribute__((noinline)) void resdb_trace_pbft_commit_recv( + uint64_t req_ptr, uint64_t seq, uint64_t meta, uint64_t proxy_id, + int64_t epoch_ns) { + asm volatile("" : : "r"(req_ptr), "r"(seq), "r"(meta), "r"(proxy_id), + "r"(epoch_ns) + : "memory"); +} + +__attribute__((noinline)) void resdb_trace_pbft_commit_state( + uint64_t req_ptr, uint64_t seq, uint64_t meta, uint64_t proxy_id, + int64_t epoch_ns) { + asm volatile("" : : "r"(req_ptr), "r"(seq), "r"(meta), "r"(proxy_id), + "r"(epoch_ns) + : "memory"); +} + +__attribute__((noinline)) void resdb_trace_pbft_execute_start( + uint64_t req_ptr, uint64_t seq, uint64_t meta, uint64_t proxy_id, + int64_t epoch_ns) { + asm volatile("" : : "r"(req_ptr), "r"(seq), "r"(meta), "r"(proxy_id), + "r"(epoch_ns) + : "memory"); +} + +__attribute__((noinline)) void resdb_trace_pbft_execute_end( + uint64_t req_ptr, uint64_t seq, uint64_t meta, uint64_t proxy_id, + int64_t epoch_ns) { + asm volatile("" : : "r"(req_ptr), "r"(seq), "r"(meta), "r"(proxy_id), + "r"(epoch_ns) + : "memory"); +} + +} // extern "C" diff --git a/platform/statistic/trace_hooks.h b/platform/statistic/trace_hooks.h new file mode 100644 index 00000000..438e5249 --- /dev/null +++ b/platform/statistic/trace_hooks.h @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include <chrono> +#include <cstdint> + +// Unmangled trace hooks intended for user-space uprobes (e.g., bpftrace). +// +// Design goals: +// - stable, easy-to-type symbol names (no C++ mangling) +// - minimal overhead +// - arguments are plain integers (no std::string, no protobuf types) +// +// Convention: +// seq: PBFT sequence number +// sender_id: sender replica id when applicable, else 0 +// self_id: local replica id +// proxy_id: client proxy id when available, else 0 + +extern "C" { + +// Existing request-level hook at ConsensusCommit entry. +// arg0=req_ptr, arg1=seq, arg2=type, arg3=sender_id, arg4=proxy_id +void resdb_trace_consensus_commit(uint64_t req_ptr, uint64_t seq, uint32_t type, + uint32_t sender_id, uint64_t proxy_id); + +// PBFT phase hooks (mirrors the phases Stats emits). +// NOTE: bpftrace user-space uprobes on x86_64 only expose arg0..arg5. +// To keep hooks universally traceable, we pack fields into a single 64-bit +// meta value: +// meta[0..31] = type (uint32) +// meta[32..47] = sender_id (uint16) +// meta[48..63] = self_id (uint16) +// epoch_ns is nanoseconds since Unix epoch (system_clock / CLOCK_REALTIME). 
+void resdb_trace_pbft_request(uint64_t req_ptr, uint64_t seq, uint64_t meta, + uint64_t proxy_id, int64_t epoch_ns); +void resdb_trace_pbft_pre_prepare(uint64_t req_ptr, uint64_t seq, uint64_t meta, + uint64_t proxy_id, int64_t epoch_ns); + +void resdb_trace_pbft_prepare_recv(uint64_t req_ptr, uint64_t seq, uint64_t meta, + uint64_t proxy_id, int64_t epoch_ns); +void resdb_trace_pbft_prepare_state(uint64_t req_ptr, uint64_t seq, uint64_t meta, + uint64_t proxy_id, int64_t epoch_ns); + +void resdb_trace_pbft_commit_recv(uint64_t req_ptr, uint64_t seq, uint64_t meta, + uint64_t proxy_id, int64_t epoch_ns); +void resdb_trace_pbft_commit_state(uint64_t req_ptr, uint64_t seq, uint64_t meta, + uint64_t proxy_id, int64_t epoch_ns); + +void resdb_trace_pbft_execute_start(uint64_t req_ptr, uint64_t seq, uint64_t meta, + uint64_t proxy_id, int64_t epoch_ns); +void resdb_trace_pbft_execute_end(uint64_t req_ptr, uint64_t seq, uint64_t meta, + uint64_t proxy_id, int64_t epoch_ns); + +} // extern "C" + +// Helpers for building/decoding the packed meta field used by PBFT phase hooks. +// Layout: +// meta[0..31] = type (uint32) +// meta[32..47] = sender_id (uint16) +// meta[48..63] = self_id (uint16) +inline uint64_t ResdbTracePackMeta(uint32_t type, uint32_t sender_id, + uint32_t self_id) { + return (static_cast<uint64_t>(type) & 0xffffffffULL) | + ((static_cast<uint64_t>(sender_id) & 0xffffULL) << 32) | + ((static_cast<uint64_t>(self_id) & 0xffffULL) << 48); +} + +inline uint32_t ResdbTraceMetaType(uint64_t meta) { + return static_cast<uint32_t>(meta & 0xffffffffULL); +} + +inline uint32_t ResdbTraceMetaSender(uint64_t meta) { + return static_cast<uint32_t>((meta >> 32) & 0xffffULL); +} + +inline uint32_t ResdbTraceMetaSelf(uint64_t meta) { + return static_cast<uint32_t>((meta >> 48) & 0xffffULL); +} + +// Epoch nanoseconds (system_clock) for compatibility with Stats.cpp. 
+inline int64_t ResdbTraceEpochNs() { + return std::chrono::duration_cast<std::chrono::nanoseconds>( + std::chrono::system_clock::now().time_since_epoch()) + .count(); +}
