This is an automated email from the ASF dual-hosted git repository.
w41ter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new afa679b6e41 [fix](cloud) recycle tmp rowsets before recycling deleted instance (#61167)
afa679b6e41 is described below
commit afa679b6e4148426d8c6ea0a67f1fac2488220d2
Author: walter <[email protected]>
AuthorDate: Tue Mar 10 17:49:28 2026 +0800
[fix](cloud) recycle tmp rowsets before recycling deleted instance (#61167)
Ensure orphan tmp rowsets (with ref_count=1) are cleaned up before
recycling versioned rowsets, otherwise the instance cannot be fully
deleted and data files leak.
---
cloud/src/recycler/recycler.cpp | 23 +++++++++--
cloud/test/recycler_test.cpp | 90 +++++++++++++++++++++++++++++++++++++++++
2 files changed, 110 insertions(+), 3 deletions(-)
diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index 23068ab66ff..47958da4f5b 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -819,19 +819,36 @@ int InstanceRecycler::recycle_deleted_instance() {
<< "s, instance_id=" << instance_id_;
};
- // Step 1: Recycle versioned rowsets in recycle space (already marked for deletion)
+ // Step 1: Recycle tmp rowsets (contains ref count but txn is not committed)
+ auto recycle_tmp_rowsets_with_mark_delete_enabled = [&]() -> int {
+ int res = recycle_tmp_rowsets();
+ if (res == 0 && config::enable_mark_delete_rowset_before_recycle) {
+ // If mark_delete_rowset_before_recycle is enabled, we will mark delete rowsets before recycling them,
+ // so we need to recycle tmp rowsets again to make sure all rowsets in recycle space are marked for
+ // deletion, otherwise we may meet some corner cases that some rowsets are not marked for deletion
+ // and cannot be recycled.
+ res = recycle_tmp_rowsets();
+ }
+ return res;
+ };
+ if (recycle_tmp_rowsets_with_mark_delete_enabled() != 0) {
+ LOG_WARNING("failed to recycle tmp rowsets").tag("instance_id", instance_id_);
+ return -1;
+ }
+
+ // Step 2: Recycle versioned rowsets in recycle space (already marked for deletion)
if (recycle_versioned_rowsets() != 0) {
LOG_WARNING("failed to recycle versioned rowsets").tag("instance_id", instance_id_);
return -1;
}
- // Step 2: Recycle operation logs (can recycle logs not referenced by snapshots)
+ // Step 3: Recycle operation logs (can recycle logs not referenced by snapshots)
if (recycle_operation_logs() != 0) {
LOG_WARNING("failed to recycle operation logs").tag("instance_id", instance_id_);
return -1;
}
- // Step 3: Check if there are still cluster snapshots
+ // Step 4: Check if there are still cluster snapshots
bool has_snapshots = false;
if (has_cluster_snapshots(&has_snapshots) != 0) {
LOG(WARNING) << "check instance cluster snapshots failed, instance_id=" << instance_id_;
diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp
index 976ca7a18d9..2f3981b439b 100644
--- a/cloud/test/recycler_test.cpp
+++ b/cloud/test/recycler_test.cpp
@@ -3326,6 +3326,96 @@ TEST(RecyclerTest, recycle_deleted_instance) {
}
}
+// Regression test: if commit_rowset is called but the txn is never committed,
+// the rowset stays in meta_rowset_tmp_key with data_rowset_ref_count_key=1.
+// recycle_deleted_instance() must call recycle_tmp_rowsets() first so that
+// the orphan ref_count is cleaned up before recycle_ref_rowsets() runs,
+// otherwise the instance is never fully deleted and data files are leaked.
+TEST(RecyclerTest, recycle_deleted_instance_with_orphan_tmp_rowset) {
+ config::retention_seconds = 0;
+ config::force_immediate_recycle = true;
+ DORIS_CLOUD_DEFER {
+ config::force_immediate_recycle = false;
+ };
+
+ auto txn_kv = std::dynamic_pointer_cast<TxnKv>(std::make_shared<MemTxnKv>());
+ ASSERT_NE(txn_kv.get(), nullptr);
+ ASSERT_EQ(txn_kv->init(), 0);
+
+ // Create instance with multi-version read/write and snapshot support
+ InstanceInfoPB instance_info;
+ instance_info.set_instance_id(instance_id);
+ instance_info.set_multi_version_status(MultiVersionStatus::MULTI_VERSION_READ_WRITE);
+ instance_info.set_snapshot_switch_status(SnapshotSwitchStatus::SNAPSHOT_SWITCH_ON);
+ auto* obj_info = instance_info.add_obj_info();
+ obj_info->set_id("orphan_tmp_rowset_test");
+
+ // Write instance info to FDB (required by OperationLogRecycleChecker::init())
+ {
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+ std::string key = instance_key({instance_id});
+ std::string val = instance_info.SerializeAsString();
+ txn->put(key, val);
+ ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+ }
+
+ InstanceRecycler recycler(txn_kv, instance_info, thread_group,
+ std::make_shared<TxnLazyCommitter>(txn_kv));
+ ASSERT_EQ(recycler.init(), 0);
+ auto accessor = recycler.accessor_map_.begin()->second;
+
+ constexpr int64_t tablet_id = 20001;
+ constexpr int64_t index_id = 20002;
+ constexpr int64_t txn_id = 888888;
+
+ // Simulate commit_rowset: write meta_rowset_tmp_key + increment ref_count,
+ // but do NOT commit the txn (no meta_rowset_key, no operation log).
+ doris::TabletSchemaCloudPB schema;
+ schema.set_schema_version(0);
+ auto rowset = create_rowset("orphan_tmp_rowset_test", tablet_id, index_id, 2, schema, txn_id);
+ ASSERT_EQ(0, create_tmp_rowset(txn_kv.get(), accessor.get(), rowset, false));
+
+ // Verify the data file exists
+ {
+ std::unique_ptr<ListIterator> list_iter;
+ ASSERT_EQ(0, accessor->list_all(&list_iter));
+ ASSERT_TRUE(list_iter->has_next());
+ }
+
+ // Verify the ref_count key exists
+ {
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+ std::unique_ptr<RangeGetIterator> it;
+ auto begin_key = versioned::data_rowset_ref_count_key({instance_id, 0, ""});
+ auto end_key = versioned::data_rowset_ref_count_key({instance_id, INT64_MAX, ""});
+ ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK);
+ ASSERT_EQ(it->size(), 1);
+ }
+
+ // Recycle deleted instance
+ ASSERT_EQ(0, recycler.recycle_deleted_instance());
+
+ // All data files must be deleted
+ {
+ std::unique_ptr<ListIterator> list_iter;
+ ASSERT_EQ(0, accessor->list_all(&list_iter));
+ ASSERT_FALSE(list_iter->has_next());
+ }
+
+ // All ref_count keys must be cleaned up
+ {
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+ std::unique_ptr<RangeGetIterator> it;
+ auto begin_key = versioned::data_rowset_ref_count_key({instance_id, 0, ""});
+ auto end_key = versioned::data_rowset_ref_count_key({instance_id, INT64_MAX, ""});
+ ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK);
+ ASSERT_EQ(it->size(), 0);
+ }
+}
+
TEST(RecyclerTest, multi_recycler) {
config::recycle_concurrency = 2;
config::recycle_interval_seconds = 10;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]