This is an automated email from the ASF dual-hosted git repository.
w41ter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 6bf54a8ebda [Enhancement](Recycler) Add bvar metrics for operation
logs (#60520)
6bf54a8ebda is described below
commit 6bf54a8ebda2cfc57a6c5e64c8c656a5967b1745
Author: Jimmy <[email protected]>
AuthorDate: Mon Feb 9 10:57:15 2026 +0800
[Enhancement](Recycler) Add bvar metrics for operation logs (#60520)
- Add dedicated bvar metrics for the recycle_operation_logs path, which
previously had no observability through the monitoring system
- Introduce OplogRecycleStats to track per-round recycling statistics
including skipped-by-snapshot count, earliest recyclable version,
failure count, and generated recycle key counts
(partition/index/rowset/txn)
- Add scan_and_statistics_operation_logs() to collect pre-recycling
statistics via RecyclerMetricsContext, consistent with existing
scan_and_statistics_* functions
- Defer per-type counter updates in recycle_operation_log() to avoid
inflating committed metrics on transaction failure — the matching
OplogRecycleStats atomic is only incremented after a successful
commit()
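- Only the expensive pre-recycling scan (scan_and_statistics_operation_logs)
is gated behind the enable_recycler_stats_metrics config flag; the
lightweight per-round recycled counters are always collected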
---
cloud/src/common/bvars.cpp | 40 +++
cloud/src/common/bvars.h | 23 ++
cloud/src/recycler/recycler.cpp | 40 +++
cloud/src/recycler/recycler.h | 25 +-
cloud/src/recycler/recycler_operation_log.cpp | 90 ++++-
cloud/test/recycler_operation_log_test.cpp | 478 ++++++++++++++++++++++++++
6 files changed, 692 insertions(+), 4 deletions(-)
diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp
index 6c3ba1eee50..91b1e0bb649 100644
--- a/cloud/src/common/bvars.cpp
+++ b/cloud/src/common/bvars.cpp
@@ -187,6 +187,46 @@ BvarStatusWithTag<int64_t>
g_bvar_recycler_batch_delete_rowset_plan_count(
BvarStatusWithTag<int64_t> g_bvar_recycler_batch_delete_failures(
"recycler", "batch_delete_failures");
+// Operation Log Recycler BVars
+mBvarStatus<int64_t> g_bvar_recycler_oplog_last_round_total_num(
+ "recycler_oplog_last_round_total_num", {"instance_id"});
+mBvarStatus<int64_t> g_bvar_recycler_oplog_last_round_not_recycled_num(
+ "recycler_oplog_last_round_not_recycled_num", {"instance_id"});
+mBvarIntAdder g_bvar_recycler_oplog_recycle_failed_num(
+ "recycler_oplog_recycle_failed_num", {"instance_id"});
+mBvarStatus<int64_t>
g_bvar_recycler_oplog_last_round_recycled_commit_partition_num(
+ "recycler_oplog_last_round_recycled_commit_partition_num",
{"instance_id"});
+mBvarStatus<int64_t>
g_bvar_recycler_oplog_last_round_recycled_drop_partition_num(
+ "recycler_oplog_last_round_recycled_drop_partition_num",
{"instance_id"});
+mBvarStatus<int64_t>
g_bvar_recycler_oplog_last_round_recycled_commit_index_num(
+ "recycler_oplog_last_round_recycled_commit_index_num",
{"instance_id"});
+mBvarStatus<int64_t> g_bvar_recycler_oplog_last_round_recycled_drop_index_num(
+ "recycler_oplog_last_round_recycled_drop_index_num", {"instance_id"});
+mBvarStatus<int64_t>
g_bvar_recycler_oplog_last_round_recycled_update_tablet_num(
+ "recycler_oplog_last_round_recycled_update_tablet_num",
{"instance_id"});
+mBvarStatus<int64_t> g_bvar_recycler_oplog_last_round_recycled_compaction_num(
+ "recycler_oplog_last_round_recycled_compaction_num", {"instance_id"});
+mBvarStatus<int64_t>
g_bvar_recycler_oplog_last_round_recycled_schema_change_num(
+ "recycler_oplog_last_round_recycled_schema_change_num",
{"instance_id"});
+mBvarStatus<int64_t> g_bvar_recycler_oplog_last_round_recycled_commit_txn_num(
+ "recycler_oplog_last_round_recycled_commit_txn_num", {"instance_id"});
+mBvarIntAdder g_bvar_recycler_oplog_recycled_commit_partition_num(
+ "recycler_oplog_recycled_commit_partition_num", {"instance_id"});
+mBvarIntAdder g_bvar_recycler_oplog_recycled_drop_partition_num(
+ "recycler_oplog_recycled_drop_partition_num", {"instance_id"});
+mBvarIntAdder g_bvar_recycler_oplog_recycled_commit_index_num(
+ "recycler_oplog_recycled_commit_index_num", {"instance_id"});
+mBvarIntAdder g_bvar_recycler_oplog_recycled_drop_index_num(
+ "recycler_oplog_recycled_drop_index_num", {"instance_id"});
+mBvarIntAdder g_bvar_recycler_oplog_recycled_update_tablet_num(
+ "recycler_oplog_recycled_update_tablet_num", {"instance_id"});
+mBvarIntAdder g_bvar_recycler_oplog_recycled_compaction_num(
+ "recycler_oplog_recycled_compaction_num", {"instance_id"});
+mBvarIntAdder g_bvar_recycler_oplog_recycled_schema_change_num(
+ "recycler_oplog_recycled_schema_change_num", {"instance_id"});
+mBvarIntAdder g_bvar_recycler_oplog_recycled_commit_txn_num(
+ "recycler_oplog_recycled_commit_txn_num", {"instance_id"});
+
// txn_kv's bvars
bvar::LatencyRecorder g_bvar_txn_kv_get("txn_kv", "get");
bvar::LatencyRecorder g_bvar_txn_kv_range_get("txn_kv", "range_get");
diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h
index 0e7a6314ad2..e4b9789c1bf 100644
--- a/cloud/src/common/bvars.h
+++ b/cloud/src/common/bvars.h
@@ -674,6 +674,29 @@ extern BvarStatusWithTag<int64_t>
g_bvar_recycler_batch_delete_failures;
extern BvarStatusWithTag<int64_t>
g_bvar_recycler_packed_file_bytes_object_deleted;
extern BvarStatusWithTag<int64_t>
g_bvar_recycler_packed_file_rowset_scanned_num;
+// Operation Log Recycler BVars
+// Note: generic metrics (last_round_to_recycle_num/bytes,
last_round_recycled_num/bytes, etc.)
+// are reported by RecyclerMetricsContext with operation_type =
"recycle_operation_logs".
+extern mBvarStatus<int64_t> g_bvar_recycler_oplog_last_round_total_num;
+extern mBvarStatus<int64_t> g_bvar_recycler_oplog_last_round_not_recycled_num;
+extern mBvarIntAdder g_bvar_recycler_oplog_recycle_failed_num;
+extern mBvarStatus<int64_t>
g_bvar_recycler_oplog_last_round_recycled_commit_partition_num;
+extern mBvarStatus<int64_t>
g_bvar_recycler_oplog_last_round_recycled_drop_partition_num;
+extern mBvarStatus<int64_t>
g_bvar_recycler_oplog_last_round_recycled_commit_index_num;
+extern mBvarStatus<int64_t>
g_bvar_recycler_oplog_last_round_recycled_drop_index_num;
+extern mBvarStatus<int64_t>
g_bvar_recycler_oplog_last_round_recycled_update_tablet_num;
+extern mBvarStatus<int64_t>
g_bvar_recycler_oplog_last_round_recycled_compaction_num;
+extern mBvarStatus<int64_t>
g_bvar_recycler_oplog_last_round_recycled_schema_change_num;
+extern mBvarStatus<int64_t>
g_bvar_recycler_oplog_last_round_recycled_commit_txn_num;
+extern mBvarIntAdder g_bvar_recycler_oplog_recycled_commit_partition_num;
+extern mBvarIntAdder g_bvar_recycler_oplog_recycled_drop_partition_num;
+extern mBvarIntAdder g_bvar_recycler_oplog_recycled_commit_index_num;
+extern mBvarIntAdder g_bvar_recycler_oplog_recycled_drop_index_num;
+extern mBvarIntAdder g_bvar_recycler_oplog_recycled_update_tablet_num;
+extern mBvarIntAdder g_bvar_recycler_oplog_recycled_compaction_num;
+extern mBvarIntAdder g_bvar_recycler_oplog_recycled_schema_change_num;
+extern mBvarIntAdder g_bvar_recycler_oplog_recycled_commit_txn_num;
+
// txn_kv's bvars
extern bvar::LatencyRecorder g_bvar_txn_kv_get;
extern bvar::LatencyRecorder g_bvar_txn_kv_range_get;
diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index 67484c81605..a92a02f1953 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -7057,6 +7057,46 @@ int InstanceRecycler::scan_and_statistics_restore_jobs()
{
return ret;
}
+void InstanceRecycler::scan_and_statistics_operation_logs() {
+ if (!should_recycle_versioned_keys()) {
+ return;
+ }
+
+ RecyclerMetricsContext metrics_context(instance_id_,
"recycle_operation_logs");
+
+ OperationLogRecycleChecker recycle_checker(instance_id_, txn_kv_.get(),
instance_info_);
+ if (recycle_checker.init() != 0) {
+ return;
+ }
+
+ std::string log_key_prefix = versioned::log_key(instance_id_);
+ std::string begin_key = encode_versioned_key(log_key_prefix,
Versionstamp::min());
+ std::string end_key = encode_versioned_key(log_key_prefix,
Versionstamp::max());
+
+ std::unique_ptr<BlobIterator> iter = blob_get_range(txn_kv_, begin_key,
end_key);
+ for (; iter->valid(); iter->next()) {
+ OperationLogPB operation_log;
+ if (!iter->parse_value(&operation_log)) {
+ continue;
+ }
+
+ std::string_view key = iter->key();
+ Versionstamp log_versionstamp;
+ if (!decode_versioned_key(&key, &log_versionstamp)) {
+ continue;
+ }
+
+ OperationLogReferenceInfo ref_info;
+ if (recycle_checker.can_recycle(log_versionstamp,
operation_log.min_timestamp(),
+ &ref_info)) {
+ metrics_context.total_need_recycle_num++;
+ metrics_context.total_need_recycle_data_size +=
operation_log.ByteSizeLong();
+ }
+ }
+
+ metrics_context.report(true);
+}
+
int InstanceRecycler::classify_rowset_task_by_ref_count(
RowsetDeleteTask& task, std::vector<RowsetDeleteTask>&
batch_delete_tasks) {
constexpr int MAX_RETRY = 10;
diff --git a/cloud/src/recycler/recycler.h b/cloud/src/recycler/recycler.h
index a24a68516d1..f722bef29ef 100644
--- a/cloud/src/recycler/recycler.h
+++ b/cloud/src/recycler/recycler.h
@@ -244,6 +244,8 @@ public:
: RecyclerMetricsContext("global_recycler", "recycle_segment") {}
};
+struct OplogRecycleStats;
+
class InstanceRecycler {
public:
struct PackedFileRecycleStats {
@@ -398,6 +400,8 @@ public:
int scan_and_statistics_restore_jobs();
+ void scan_and_statistics_operation_logs();
+
/**
* Decode the key of a packed-file metadata record into the persisted
object path.
*
@@ -476,7 +480,8 @@ private:
//
// Both `operation_log` and `raw_keys` will be removed in the same
transaction, to ensure atomicity.
int recycle_operation_log(Versionstamp log_version, const
std::vector<std::string>& raw_keys,
- OperationLogPB operation_log);
+ OperationLogPB operation_log,
+ OplogRecycleStats* oplog_stats = nullptr);
// Recycle rowset meta and data, return 0 for success otherwise error
//
@@ -605,6 +610,24 @@ struct OperationLogReferenceInfo {
Versionstamp referenced_snapshot_timestamp;
};
+struct OplogRecycleStats {
+ // Total oplog count scanned per round
+ std::atomic<int64_t> total_num {0};
+ // Oplogs not recycled this round (per round, written to mBvarStatus)
+ std::atomic<int64_t> not_recycled_num {0};
+ // Recycle failures (per round, accumulated to mBvarIntAdder at end)
+ std::atomic<int64_t> failed_num {0};
+ // Per-oplog-type recycled counts (incremented after successful commit)
+ std::atomic<int64_t> recycled_commit_partition {0};
+ std::atomic<int64_t> recycled_drop_partition {0};
+ std::atomic<int64_t> recycled_commit_index {0};
+ std::atomic<int64_t> recycled_drop_index {0};
+ std::atomic<int64_t> recycled_update_tablet {0};
+ std::atomic<int64_t> recycled_compaction {0};
+ std::atomic<int64_t> recycled_schema_change {0};
+ std::atomic<int64_t> recycled_commit_txn {0};
+};
+
// Helper class to check if operation logs can be recycled based on snapshots
and versionstamps
class OperationLogRecycleChecker {
public:
diff --git a/cloud/src/recycler/recycler_operation_log.cpp
b/cloud/src/recycler/recycler_operation_log.cpp
index 71eda034d5f..2caa32ef705 100644
--- a/cloud/src/recycler/recycler_operation_log.cpp
+++ b/cloud/src/recycler/recycler_operation_log.cpp
@@ -31,6 +31,8 @@
#include <utility>
#include <vector>
+#include "common/bvars.h"
+#include "common/config.h"
#include "common/defer.h"
#include "common/encryption_util.h"
#include "common/logging.h"
@@ -650,6 +652,50 @@ static TxnErrorCode get_txn_info(TxnKv* txn_kv,
std::string_view instance_id, in
return TxnErrorCode::TXN_OK;
}
+static void report_oplog_recycle_stats(const std::string& instance_id,
+ const OplogRecycleStats& stats) {
+ g_bvar_recycler_oplog_last_round_total_num.put({instance_id},
stats.total_num.load());
+ g_bvar_recycler_oplog_last_round_not_recycled_num.put({instance_id},
+
stats.not_recycled_num.load());
+ if (stats.failed_num.load() > 0) {
+ g_bvar_recycler_oplog_recycle_failed_num.put({instance_id},
stats.failed_num.load());
+ }
+ // Per-type last round counts (mBvarStatus, overwritten each round)
+ g_bvar_recycler_oplog_last_round_recycled_commit_partition_num.put(
+ {instance_id}, stats.recycled_commit_partition.load());
+ g_bvar_recycler_oplog_last_round_recycled_drop_partition_num.put(
+ {instance_id}, stats.recycled_drop_partition.load());
+ g_bvar_recycler_oplog_last_round_recycled_commit_index_num.put(
+ {instance_id}, stats.recycled_commit_index.load());
+ g_bvar_recycler_oplog_last_round_recycled_drop_index_num.put({instance_id},
+
stats.recycled_drop_index.load());
+ g_bvar_recycler_oplog_last_round_recycled_update_tablet_num.put(
+ {instance_id}, stats.recycled_update_tablet.load());
+ g_bvar_recycler_oplog_last_round_recycled_compaction_num.put({instance_id},
+
stats.recycled_compaction.load());
+ g_bvar_recycler_oplog_last_round_recycled_schema_change_num.put(
+ {instance_id}, stats.recycled_schema_change.load());
+ g_bvar_recycler_oplog_last_round_recycled_commit_txn_num.put({instance_id},
+
stats.recycled_commit_txn.load());
+ // Per-type cumulative counts (mBvarIntAdder, accumulated across rounds)
+ g_bvar_recycler_oplog_recycled_commit_partition_num.put({instance_id},
+
stats.recycled_commit_partition.load());
+ g_bvar_recycler_oplog_recycled_drop_partition_num.put({instance_id},
+
stats.recycled_drop_partition.load());
+ g_bvar_recycler_oplog_recycled_commit_index_num.put({instance_id},
+
stats.recycled_commit_index.load());
+ g_bvar_recycler_oplog_recycled_drop_index_num.put({instance_id},
+
stats.recycled_drop_index.load());
+ g_bvar_recycler_oplog_recycled_update_tablet_num.put({instance_id},
+
stats.recycled_update_tablet.load());
+ g_bvar_recycler_oplog_recycled_compaction_num.put({instance_id},
+
stats.recycled_compaction.load());
+ g_bvar_recycler_oplog_recycled_schema_change_num.put({instance_id},
+
stats.recycled_schema_change.load());
+ g_bvar_recycler_oplog_recycled_commit_txn_num.put({instance_id},
+
stats.recycled_commit_txn.load());
+}
+
int InstanceRecycler::recycle_operation_logs() {
if (!should_recycle_versioned_keys()) {
VLOG_DEBUG << "instance " << instance_id_
@@ -664,6 +710,18 @@ int InstanceRecycler::recycle_operation_logs() {
AnnotateTag tag("instance_id", instance_id_);
LOG_WARNING("begin to recycle operation logs");
+ const std::string task_name = "recycle_operation_logs";
+ RecyclerMetricsContext metrics_context(instance_id_, task_name);
+ OplogRecycleStats oplog_stats;
+
+ // scan_and_statistics_operation_logs() is expensive (scans lots of KVs),
+ // so it's controlled by enable_recycler_stats_metrics.
+ // The other stats (counting what was actually recycled) are lightweight
+ // and always collected.
+ if (config::enable_recycler_stats_metrics) {
+ scan_and_statistics_operation_logs();
+ }
+
StopWatch stop_watch;
size_t total_operation_logs = 0;
size_t recycled_operation_logs = 0;
@@ -672,6 +730,9 @@ int InstanceRecycler::recycle_operation_logs() {
size_t recycled_operation_log_data_size = 0;
DORIS_CLOUD_DEFER {
+ metrics_context.finish_report();
+ report_oplog_recycle_stats(instance_id_, oplog_stats);
+
int64_t cost = stop_watch.elapsed_us() / 1000'000;
LOG_WARNING("recycle operation logs, cost={}s", cost)
.tag("total_operation_logs", total_operation_logs)
@@ -705,16 +766,24 @@ int InstanceRecycler::recycle_operation_logs() {
OperationLogReferenceInfo reference_info;
if (recycle_checker.can_recycle(log_versionstamp,
operation_log.min_timestamp(),
&reference_info)) {
+ metrics_context.total_need_recycle_num++;
+ metrics_context.total_need_recycle_data_size += value_size;
+
AnnotateTag tag("log_key", hex(key));
- int res = recycle_operation_log(log_versionstamp, raw_keys,
std::move(operation_log));
+ int res = recycle_operation_log(log_versionstamp, raw_keys,
std::move(operation_log),
+ &oplog_stats);
if (res != 0) {
LOG_WARNING("failed to recycle operation
log").tag("error_code", res);
+ oplog_stats.failed_num.fetch_add(1, std::memory_order_relaxed);
return res;
}
recycled_operation_logs++;
recycled_operation_log_data_size += value_size;
+ metrics_context.total_recycled_num++;
+ metrics_context.total_recycled_data_size += value_size;
} else {
+ oplog_stats.not_recycled_num.fetch_add(1,
std::memory_order_relaxed);
int res = calculator.calculate_operation_log_data_size(key,
operation_log,
reference_info);
if (res != 0) {
@@ -726,6 +795,8 @@ int InstanceRecycler::recycle_operation_logs() {
total_operation_logs++;
operation_log_data_size += value_size;
max_operation_log_data_size = std::max(max_operation_log_data_size,
value_size);
+ oplog_stats.total_num.fetch_add(1, std::memory_order_relaxed);
+ metrics_context.report();
return 0;
};
@@ -800,8 +871,11 @@ int InstanceRecycler::recycle_operation_logs() {
int InstanceRecycler::recycle_operation_log(Versionstamp log_version,
const std::vector<std::string>&
raw_keys,
- OperationLogPB operation_log) {
+ OperationLogPB operation_log,
+ OplogRecycleStats* oplog_stats) {
int recycle_log_count = 0;
+ // Track which oplog type was recycled (only one per log entry)
+ std::atomic<int64_t>* recycled_counter = nullptr;
OperationLogRecycler log_recycler(instance_id_, txn_kv_.get(), log_version,
operation_log.min_timestamp(), raw_keys);
RETURN_ON_FAILURE(log_recycler.begin());
@@ -817,6 +891,9 @@ int InstanceRecycler::recycle_operation_log(Versionstamp
log_version,
return res; \
} \
recycle_log_count++; \
+ if (oplog_stats) { \
+ recycled_counter = &oplog_stats->recycled_##log_type; \
+ } \
} \
} while (0)
@@ -858,6 +935,9 @@ int InstanceRecycler::recycle_operation_log(Versionstamp
log_version,
}
recycle_log_count++;
+ if (oplog_stats) {
+ recycled_counter = &oplog_stats->recycled_commit_txn;
+ }
}
if (recycle_log_count > 1) {
@@ -868,7 +948,11 @@ int InstanceRecycler::recycle_operation_log(Versionstamp
log_version,
return -1; // This is an unexpected condition, should not happen
}
- return log_recycler.commit();
+ int ret = log_recycler.commit();
+ if (ret == 0 && recycled_counter) {
+ recycled_counter->fetch_add(1, std::memory_order_relaxed);
+ }
+ return ret;
}
} // namespace doris::cloud
diff --git a/cloud/test/recycler_operation_log_test.cpp
b/cloud/test/recycler_operation_log_test.cpp
index 8eb1cb6f4b2..1d3c3e65065 100644
--- a/cloud/test/recycler_operation_log_test.cpp
+++ b/cloud/test/recycler_operation_log_test.cpp
@@ -16,15 +16,18 @@
// under the License.
#include <butil/strings/string_split.h>
+#include <bvar/variable.h>
#include <fmt/core.h>
#include <gen_cpp/cloud.pb.h>
#include <gen_cpp/olap_file.pb.h>
#include <gtest/gtest.h>
+#include <climits>
#include <cstdint>
#include <memory>
#include <string>
+#include "common/bvars.h"
#include "common/config.h"
#include "common/util.h"
#include "cpp/sync_point.h"
@@ -2807,4 +2810,479 @@ TEST(RecycleOperationLogTest,
RecycleCompactionLogKeepKeyBoundsWhenDisabled) {
}
}
+// ============================================================================
+// Tests for OplogRecycleStats and recycle_operation_logs metrics integration
+// ============================================================================
+
+TEST(RecycleOperationLogTest, OplogRecycleStatsAtomicOperations) {
+ OplogRecycleStats stats;
+
+ // Test initial values
+ EXPECT_EQ(stats.total_num.load(), 0);
+ EXPECT_EQ(stats.not_recycled_num.load(), 0);
+ EXPECT_EQ(stats.failed_num.load(), 0);
+ EXPECT_EQ(stats.recycled_commit_partition.load(), 0);
+ EXPECT_EQ(stats.recycled_drop_partition.load(), 0);
+ EXPECT_EQ(stats.recycled_commit_index.load(), 0);
+ EXPECT_EQ(stats.recycled_drop_index.load(), 0);
+ EXPECT_EQ(stats.recycled_update_tablet.load(), 0);
+ EXPECT_EQ(stats.recycled_compaction.load(), 0);
+ EXPECT_EQ(stats.recycled_schema_change.load(), 0);
+ EXPECT_EQ(stats.recycled_commit_txn.load(), 0);
+
+ // Test fetch_add
+ stats.not_recycled_num.fetch_add(3, std::memory_order_relaxed);
+ EXPECT_EQ(stats.not_recycled_num.load(), 3);
+
+ stats.failed_num.fetch_add(1, std::memory_order_relaxed);
+ EXPECT_EQ(stats.failed_num.load(), 1);
+
+ stats.recycled_drop_partition.fetch_add(5, std::memory_order_relaxed);
+ stats.recycled_drop_index.fetch_add(2, std::memory_order_relaxed);
+ stats.recycled_compaction.fetch_add(10, std::memory_order_relaxed);
+ stats.recycled_commit_txn.fetch_add(7, std::memory_order_relaxed);
+ EXPECT_EQ(stats.recycled_drop_partition.load(), 5);
+ EXPECT_EQ(stats.recycled_drop_index.load(), 2);
+ EXPECT_EQ(stats.recycled_compaction.load(), 10);
+ EXPECT_EQ(stats.recycled_commit_txn.load(), 7);
+
+ // Test total_num
+ stats.total_num.fetch_add(10, std::memory_order_relaxed);
+ EXPECT_EQ(stats.total_num.load(), 10);
+}
+
+TEST(RecycleOperationLogTest, OplogRecycleStatsPerTypeCounters) {
+ OplogRecycleStats stats;
+
+ // Simulate multiple oplog recycling events
+ stats.recycled_drop_partition.fetch_add(1, std::memory_order_relaxed);
+ stats.recycled_drop_partition.fetch_add(1, std::memory_order_relaxed);
+ stats.recycled_drop_index.fetch_add(1, std::memory_order_relaxed);
+ stats.recycled_compaction.fetch_add(1, std::memory_order_relaxed);
+ stats.recycled_compaction.fetch_add(1, std::memory_order_relaxed);
+ stats.recycled_compaction.fetch_add(1, std::memory_order_relaxed);
+ stats.recycled_commit_txn.fetch_add(1, std::memory_order_relaxed);
+
+ EXPECT_EQ(stats.recycled_commit_partition.load(), 0);
+ EXPECT_EQ(stats.recycled_drop_partition.load(), 2);
+ EXPECT_EQ(stats.recycled_commit_index.load(), 0);
+ EXPECT_EQ(stats.recycled_drop_index.load(), 1);
+ EXPECT_EQ(stats.recycled_update_tablet.load(), 0);
+ EXPECT_EQ(stats.recycled_compaction.load(), 3);
+ EXPECT_EQ(stats.recycled_schema_change.load(), 0);
+ EXPECT_EQ(stats.recycled_commit_txn.load(), 1);
+}
+
+// Test recycle_operation_logs with stats enabled: drop partition + drop index
logs
+// Verifies per-type recycled counts and bvar reporting
+TEST(RecycleOperationLogTest, RecycleOperationLogsWithStatsEnabled) {
+ auto old_flag = config::enable_recycler_stats_metrics;
+ config::enable_recycler_stats_metrics = true;
+ DORIS_CLOUD_DEFER {
+ config::enable_recycler_stats_metrics = old_flag;
+ };
+
+ auto txn_kv = std::make_shared<MemTxnKv>();
+ txn_kv->update_commit_version(1000);
+ ASSERT_EQ(txn_kv->init(), 0);
+
+ InstanceInfoPB instance;
+ instance.set_instance_id(instance_id);
+
instance.set_multi_version_status(MultiVersionStatus::MULTI_VERSION_ENABLED);
+ auto* obj_info = instance.add_obj_info();
+ obj_info->set_id("recycle_oplog_stats");
+ obj_info->set_ak(config::test_s3_ak);
+ obj_info->set_sk(config::test_s3_sk);
+ obj_info->set_endpoint(config::test_s3_endpoint);
+ obj_info->set_region(config::test_s3_region);
+ obj_info->set_bucket(config::test_s3_bucket);
+ obj_info->set_prefix("recycle_oplog_stats");
+ update_instance_info(txn_kv.get(), instance);
+
+ InstanceRecycler recycler(txn_kv, instance, thread_group,
+ std::make_shared<TxnLazyCommitter>(txn_kv));
+ ASSERT_EQ(recycler.init(), 0);
+
+ uint64_t db_id = 1;
+ uint64_t table_id = 100;
+ uint64_t index_id1 = 201;
+ uint64_t index_id2 = 202;
+ uint64_t partition_id1 = 301;
+ uint64_t partition_id2 = 302;
+ int64_t expiration = ::time(nullptr) + 3600;
+
+ // Create table version so the recycler can remove it
+ {
+ std::string ver_key = versioned::table_version_key({instance_id,
table_id});
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+ versioned_put(txn.get(), ver_key, "");
+ ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+ }
+
+ // Put a drop partition log with 2 partitions
+ {
+ std::string log_key = versioned::log_key(instance_id);
+ OperationLogPB operation_log;
+ operation_log.set_min_timestamp(100);
+ auto* drop_partition = operation_log.mutable_drop_partition();
+ drop_partition->set_db_id(db_id);
+ drop_partition->set_table_id(table_id);
+ drop_partition->add_index_ids(index_id1);
+ drop_partition->add_partition_ids(partition_id1);
+ drop_partition->add_partition_ids(partition_id2);
+ drop_partition->set_expired_at_s(expiration);
+ drop_partition->set_update_table_version(true);
+
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+ versioned::blob_put(txn.get(), log_key, operation_log);
+ ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+ }
+
+ // Put a drop index log with 2 indexes
+ {
+ std::string log_key = versioned::log_key(instance_id);
+ OperationLogPB operation_log;
+ operation_log.set_min_timestamp(101);
+ auto* drop_index = operation_log.mutable_drop_index();
+ drop_index->set_db_id(db_id);
+ drop_index->set_table_id(table_id);
+ drop_index->add_index_ids(index_id1);
+ drop_index->add_index_ids(index_id2);
+
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+ versioned::blob_put(txn.get(), log_key, operation_log);
+ ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+ }
+
+ // Save cumulative baselines before recycling (mBvarIntAdder accumulates
across tests)
+ int64_t baseline_drop_partition =
+
g_bvar_recycler_oplog_recycled_drop_partition_num.get({instance_id});
+ int64_t baseline_drop_index =
g_bvar_recycler_oplog_recycled_drop_index_num.get({instance_id});
+
+ // Recycle the operation logs - stats should be collected
+ ASSERT_EQ(recycler.recycle_operation_logs(), 0);
+
+ // Verify recycle partition records were created
+ {
+ std::string recycle_key1 = recycle_partition_key({instance_id,
partition_id1});
+ std::string recycle_key2 = recycle_partition_key({instance_id,
partition_id2});
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+ std::string value;
+ ASSERT_EQ(txn->get(recycle_key1, &value), TxnErrorCode::TXN_OK);
+ ASSERT_EQ(txn->get(recycle_key2, &value), TxnErrorCode::TXN_OK);
+ }
+
+ // Verify recycle index records were created
+ {
+ std::string recycle_key1 = recycle_index_key({instance_id, index_id1});
+ std::string recycle_key2 = recycle_index_key({instance_id, index_id2});
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+ std::string value;
+ ASSERT_EQ(txn->get(recycle_key1, &value), TxnErrorCode::TXN_OK);
+ ASSERT_EQ(txn->get(recycle_key2, &value), TxnErrorCode::TXN_OK);
+ }
+
+ // Verify total oplog count: 2 logs (1 drop_partition + 1 drop_index)
+ {
+ int64_t total =
g_bvar_recycler_oplog_last_round_total_num.get({instance_id});
+ EXPECT_EQ(total, 2) << "Should have scanned 2 operation logs in total";
+ }
+
+ // Verify per-type last round recycled counts: 1 drop_partition log and 1
drop_index log
+ {
+ int last_round_drop_partition =
+
g_bvar_recycler_oplog_last_round_recycled_drop_partition_num.get({instance_id});
+ EXPECT_EQ(last_round_drop_partition, 1) << "Should have recycled 1
drop_partition log";
+ }
+ {
+ int last_round_drop_index =
+
g_bvar_recycler_oplog_last_round_recycled_drop_index_num.get({instance_id});
+ EXPECT_EQ(last_round_drop_index, 1) << "Should have recycled 1
drop_index log";
+ }
+
+ // Verify per-type cumulative recycled counts (check delta from baseline)
+ {
+ int64_t recycled_drop_partition =
+
g_bvar_recycler_oplog_recycled_drop_partition_num.get({instance_id});
+ EXPECT_EQ(recycled_drop_partition - baseline_drop_partition, 1)
+ << "Cumulative drop_partition delta should be 1";
+ }
+ {
+ int64_t recycled_drop_index =
+
g_bvar_recycler_oplog_recycled_drop_index_num.get({instance_id});
+ EXPECT_EQ(recycled_drop_index - baseline_drop_index, 1)
+ << "Cumulative drop_index delta should be 1";
+ }
+}
+
+// Test recycle_operation_logs with snapshot protection: some logs skipped
+TEST(RecycleOperationLogTest, RecycleOperationLogsSkippedBySnapshot) {
+ auto old_flag = config::enable_recycler_stats_metrics;
+ config::enable_recycler_stats_metrics = true;
+ DORIS_CLOUD_DEFER {
+ config::enable_recycler_stats_metrics = old_flag;
+ };
+
+ auto txn_kv = std::make_shared<MemTxnKv>();
+ txn_kv->update_commit_version(1000);
+ ASSERT_EQ(txn_kv->init(), 0);
+
+ std::string test_instance_id = "test_oplog_not_recycled";
+
+ InstanceInfoPB instance;
+ instance.set_instance_id(test_instance_id);
+
instance.set_multi_version_status(MultiVersionStatus::MULTI_VERSION_ENABLED);
+ auto* obj_info = instance.add_obj_info();
+ obj_info->set_id("recycle_snapshot_skip");
+ obj_info->set_ak(config::test_s3_ak);
+ obj_info->set_sk(config::test_s3_sk);
+ obj_info->set_endpoint(config::test_s3_endpoint);
+ obj_info->set_region(config::test_s3_region);
+ obj_info->set_bucket(config::test_s3_bucket);
+ obj_info->set_prefix("recycle_snapshot_skip");
+
+ // Store instance info
+ {
+ std::string key = instance_key({test_instance_id});
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+ txn->put(key, instance.SerializeAsString());
+ ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+ }
+
+ auto insert_empty_value = [&]() {
+ std::unique_ptr<Transaction> txn;
+ EXPECT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+ txn->put("dummy_snapshot_test", "");
+ EXPECT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+ };
+
+ // Create table version
+ uint64_t table_id = 500;
+ {
+ std::string ver_key = versioned::table_version_key({test_instance_id,
table_id});
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+ versioned_put(txn.get(), ver_key, "");
+ ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+ }
+
+ // Put an old operation log (will be recyclable)
+ {
+ std::string log_key = versioned::log_key(test_instance_id);
+ OperationLogPB operation_log;
+ operation_log.set_min_timestamp(100);
+
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+ versioned::blob_put(txn.get(), log_key, operation_log);
+ ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+ }
+
+ insert_empty_value();
+
+ // Write a snapshot (this creates a protection boundary)
+ {
+ SnapshotPB snapshot;
+ std::string snapshot_key =
versioned::snapshot_full_key(test_instance_id);
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+ versioned_put(txn.get(), snapshot_key, snapshot.SerializeAsString());
+ ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+ }
+
+ insert_empty_value();
+
+ // Put a new operation log after the snapshot (min_timestamp before
snapshot)
+ // This log should be skipped by snapshot protection
+ {
+ std::string log_key = versioned::log_key(test_instance_id);
+ OperationLogPB operation_log;
+ // min_timestamp is set to a value before the snapshot, so the snapshot
+ // falls within [min_timestamp, log_version) and the log can't be
recycled
+ operation_log.set_min_timestamp(100);
+
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+ versioned::blob_put(txn.get(), log_key, operation_log);
+ ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+ }
+
+ insert_empty_value();
+
+ InstanceRecycler recycler(txn_kv, instance, thread_group,
+ std::make_shared<TxnLazyCommitter>(txn_kv));
+ ASSERT_EQ(recycler.init(), 0);
+
+ // Recycle - some logs should be recycled, some not recycled due to
snapshot
+ ASSERT_EQ(recycler.recycle_operation_logs(), 0);
+
+ // Verify not_recycled bvar is set
+ {
+ int64_t not_recycled =
+
g_bvar_recycler_oplog_last_round_not_recycled_num.get({test_instance_id});
+ EXPECT_GE(not_recycled, 1) << "At least one log should not be
recycled";
+ }
+}
+
+// Runs recycle_operation_logs() with enable_recycler_stats_metrics switched off
+// and checks that a single empty operation log is still recycled.
+TEST(RecycleOperationLogTest, RecycleOperationLogsStatsDisabled) {
+    // Force the stats flag off for this test; the saved value is put back on scope exit.
+    auto saved_stats_flag = config::enable_recycler_stats_metrics;
+    config::enable_recycler_stats_metrics = false;
+    DORIS_CLOUD_DEFER {
+        config::enable_recycler_stats_metrics = saved_stats_flag;
+    };
+
+    auto txn_kv = std::make_shared<MemTxnKv>();
+    txn_kv->update_commit_version(1000);
+    ASSERT_EQ(txn_kv->init(), 0);
+
+    // Multi-version-enabled instance whose object store uses the test S3 settings.
+    InstanceInfoPB instance;
+    instance.set_instance_id(instance_id);
+    instance.set_multi_version_status(MultiVersionStatus::MULTI_VERSION_ENABLED);
+    auto* storage = instance.add_obj_info();
+    storage->set_id("recycle_stats_disabled");
+    storage->set_prefix("recycle_stats_disabled");
+    storage->set_bucket(config::test_s3_bucket);
+    storage->set_region(config::test_s3_region);
+    storage->set_endpoint(config::test_s3_endpoint);
+    storage->set_ak(config::test_s3_ak);
+    storage->set_sk(config::test_s3_sk);
+    update_instance_info(txn_kv.get(), instance);
+
+    InstanceRecycler recycler(txn_kv, instance, thread_group,
+                              std::make_shared<TxnLazyCommitter>(txn_kv));
+    ASSERT_EQ(recycler.init(), 0);
+
+    // Write one operation log that carries nothing but its min_timestamp.
+    {
+        Versionstamp log_version(123, 0);
+        OperationLogPB empty_log;
+        empty_log.set_min_timestamp(log_version.version());
+        std::string oplog_key = versioned::log_key(instance_id);
+
+        std::unique_ptr<Transaction> put_txn;
+        ASSERT_EQ(txn_kv->create_txn(&put_txn), TxnErrorCode::TXN_OK);
+        versioned::blob_put(put_txn.get(), oplog_key, log_version, empty_log);
+        ASSERT_EQ(put_txn->commit(), TxnErrorCode::TXN_OK);
+    }
+
+    // Recycling must succeed even though stats collection is turned off.
+    ASSERT_EQ(recycler.recycle_operation_logs(), 0);
+
+    // Once the instance info is removed, nothing may remain in the KV range.
+    remove_instance_info(txn_kv.get());
+    ASSERT_TRUE(is_empty_range(txn_kv.get())) << dump_range(txn_kv.get());
+}
+
+// Test recycle_operation_logs on a compaction log with stats enabled: checks the
+// generated recycle rowset keys and the recycled_compaction bvar counters.
+TEST(RecycleOperationLogTest, RecycleCompactionLogWithStats) {
+    // Enable stats for this test only; the previous flag value is restored on scope exit.
+    auto old_flag = config::enable_recycler_stats_metrics;
+    config::enable_recycler_stats_metrics = true;
+    DORIS_CLOUD_DEFER {
+        config::enable_recycler_stats_metrics = old_flag;
+    };
+
+    auto txn_kv = std::make_shared<MemTxnKv>();
+    txn_kv->update_commit_version(1000);
+    ASSERT_EQ(txn_kv->init(), 0);
+
+    // A dedicated instance id keeps the bvar values read below separate from
+    // those of tests that use other instance ids.
+    std::string test_instance_id = "test_compaction_log_stats";
+
+    InstanceInfoPB instance;
+    instance.set_instance_id(test_instance_id);
+    instance.set_multi_version_status(MultiVersionStatus::MULTI_VERSION_ENABLED);
+    auto* obj_info = instance.add_obj_info();
+    obj_info->set_id("compaction_stats");
+    obj_info->set_ak(config::test_s3_ak);
+    obj_info->set_sk(config::test_s3_sk);
+    obj_info->set_endpoint(config::test_s3_endpoint);
+    obj_info->set_region(config::test_s3_region);
+    obj_info->set_bucket(config::test_s3_bucket);
+    obj_info->set_prefix("compaction_stats");
+
+    // Persist the instance info under this test's own instance key.
+    {
+        std::string key = instance_key({test_instance_id});
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        txn->put(key, instance.SerializeAsString());
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+    }
+
+    InstanceRecycler recycler(txn_kv, instance, thread_group,
+                              std::make_shared<TxnLazyCommitter>(txn_kv));
+    ASSERT_EQ(recycler.init(), 0);
+
+    int64_t table_id = 600;
+    int64_t tablet_id = 603;
+
+    // Create table version
+    {
+        std::string ver_key = versioned::table_version_key({test_instance_id, table_id});
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        versioned_put(txn.get(), ver_key, "");
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+    }
+
+    // Put a compaction log with 3 recycle rowsets
+    {
+        std::string log_key = versioned::log_key(test_instance_id);
+        OperationLogPB operation_log;
+        operation_log.set_min_timestamp(100);
+        auto* compaction = operation_log.mutable_compaction();
+        compaction->set_tablet_id(tablet_id);
+
+        // Rowset i is named compaction_rowset_<i> and covers versions [10*i, 10*i + 9].
+        for (int i = 0; i < 3; i++) {
+            auto* recycle_rowset = compaction->add_recycle_rowsets();
+            auto* rowset_meta = recycle_rowset->mutable_rowset_meta();
+            rowset_meta->set_rowset_id(0);
+            rowset_meta->set_tablet_id(tablet_id);
+            rowset_meta->set_rowset_id_v2(fmt::format("compaction_rowset_{}", i));
+            rowset_meta->set_start_version(i * 10);
+            rowset_meta->set_end_version(i * 10 + 9);
+        }
+
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        versioned::blob_put(txn.get(), log_key, operation_log);
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+    }
+
+    // Recycle the operation logs
+    ASSERT_EQ(recycler.recycle_operation_logs(), 0);
+
+    // Verify recycle rowset records were created (3 rowsets from compaction log)
+    {
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        for (int i = 0; i < 3; i++) {
+            std::string rowset_id = fmt::format("compaction_rowset_{}", i);
+            std::string recycle_key = recycle_rowset_key({test_instance_id, tablet_id, rowset_id});
+            std::string value;
+            TxnErrorCode err = txn->get(recycle_key, &value);
+            EXPECT_EQ(err, TxnErrorCode::TXN_OK)
+                    << "Recycle rowset key not found for rowset " << rowset_id;
+        }
+    }
+
+    // Verify last_round recycled_compaction bvar count (1 compaction log recycled).
+    // The value is held as int64_t, matching how the other stats tests in this file
+    // read oplog bvars, so it is not narrowed to int before the comparison.
+    {
+        int64_t last_round_compaction =
+                g_bvar_recycler_oplog_last_round_recycled_compaction_num.get({test_instance_id});
+        EXPECT_EQ(last_round_compaction, 1)
+                << "Last round should have recycled 1 compaction log";
+    }
+    // Verify cumulative recycled_compaction bvar count
+    {
+        int64_t recycled_compaction =
+                g_bvar_recycler_oplog_recycled_compaction_num.get({test_instance_id});
+        EXPECT_EQ(recycled_compaction, 1) << "Cumulative compaction should be 1";
+    }
+}
+
} // namespace doris::cloud
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]