This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1bb5b4e8f7b [Feature](Recycler) Batch delete optimization for 
versioned tablet recycling (#59247)
1bb5b4e8f7b is described below

commit 1bb5b4e8f7bc63f54c81ebb2f81efdcfbacd5bc2
Author: Jimmy <[email protected]>
AuthorDate: Tue Dec 23 02:36:12 2025 +0800

    [Feature](Recycler) Batch delete optimization for versioned tablet 
recycling (#59247)
    
    ### What problem does this PR solve?
    
    Issue Number: close #xxx
    
    Related PR: #xxx
    
    Problem Summary:
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
    - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
        - [ ] Yes. <!-- Add document PR link here. eg:
          https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
---
 cloud/src/common/bvars.cpp                 |   5 +
 cloud/src/common/bvars.h                   |   3 +
 cloud/src/recycler/recycler.cpp            | 365 +++++++++++++++++++++--
 cloud/src/recycler/recycler.h              |  20 +-
 cloud/test/recycle_versioned_keys_test.cpp | 450 +++++++++++++++++++++++++++++
 5 files changed, 813 insertions(+), 30 deletions(-)

diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp
index 886106146af..92c2dc6bd22 100644
--- a/cloud/src/common/bvars.cpp
+++ b/cloud/src/common/bvars.cpp
@@ -180,6 +180,11 @@ BvarStatusWithTag<int64_t> 
g_bvar_recycler_packed_file_bytes_object_deleted(
 BvarStatusWithTag<int64_t> g_bvar_recycler_packed_file_rowset_scanned_num(
         "recycler", "packed_file_rowset_scanned_num");
 
+BvarStatusWithTag<int64_t> g_bvar_recycler_batch_delete_rowset_plan_count(
+        "recycler", "batch_delete_rowset_plan_count");
+BvarStatusWithTag<int64_t> g_bvar_recycler_batch_delete_failures(
+        "recycler", "batch_delete_failures");
+
 // txn_kv's bvars
 bvar::LatencyRecorder g_bvar_txn_kv_get("txn_kv", "get");
 bvar::LatencyRecorder g_bvar_txn_kv_range_get("txn_kv", "range_get");
diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h
index cd6e3d2babb..254810b8f28 100644
--- a/cloud/src/common/bvars.h
+++ b/cloud/src/common/bvars.h
@@ -666,6 +666,9 @@ extern BvarStatusWithTag<int64_t> 
g_bvar_recycler_packed_file_recycle_cost_ms;
 extern BvarStatusWithTag<int64_t> g_bvar_recycler_packed_file_scanned_kv_num;
 extern BvarStatusWithTag<int64_t> g_bvar_recycler_packed_file_corrected_kv_num;
 extern BvarStatusWithTag<int64_t> 
g_bvar_recycler_packed_file_recycled_object_num;
+
+extern BvarStatusWithTag<int64_t> 
g_bvar_recycler_batch_delete_rowset_plan_count;
+extern BvarStatusWithTag<int64_t> g_bvar_recycler_batch_delete_failures;
 extern BvarStatusWithTag<int64_t> 
g_bvar_recycler_packed_file_bytes_object_deleted;
 extern BvarStatusWithTag<int64_t> 
g_bvar_recycler_packed_file_rowset_scanned_num;
 
diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index 8903907715e..7959a72efa3 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -4000,30 +4000,44 @@ int InstanceRecycler::recycle_versioned_tablet(int64_t 
tablet_id,
         max_rowset_expiration_time = std::max(max_rowset_expiration_time, 
rs_meta.txn_expiration());
     };
 
+    std::vector<RowsetDeleteTask> all_tasks;
+
+    auto create_delete_task = [this](const RowsetMetaCloudPB& rs_meta, 
std::string_view recycle_key,
+                                     std::string_view non_versioned_rowset_key 
=
+                                             "") -> RowsetDeleteTask {
+        RowsetDeleteTask task;
+        task.rowset_meta = rs_meta;
+        task.recycle_rowset_key = std::string(recycle_key);
+        task.non_versioned_rowset_key = std::string(non_versioned_rowset_key);
+        task.versioned_rowset_key = versioned::meta_rowset_key(
+                {instance_id_, rs_meta.tablet_id(), rs_meta.rowset_id_v2()});
+        return task;
+    };
+
     for (const auto& [rs_meta, versionstamp] : load_rowset_metas) {
         update_rowset_stats(rs_meta);
-        concurrent_delete_executor.add([tablet_id, versionstamp, rs_meta_pb = 
rs_meta, this]() {
-            // recycle both versioned and non-versioned rowset meta key
-            std::string rowset_load_key = versioned::meta_rowset_load_key(
-                    {instance_id_, tablet_id, rs_meta_pb.end_version()});
-            std::string rowset_key =
-                    meta_rowset_key({instance_id_, tablet_id, 
rs_meta_pb.end_version()});
-            return 
recycle_rowset_meta_and_data(encode_versioned_key(rowset_load_key, 
versionstamp),
-                                                rs_meta_pb, rowset_key);
-        });
+        // Version 0-1 rowset has no resource_id and no actual data files,
+        // but still needs ref_count key cleanup, so we add it to all_tasks.
+        // It will be filtered out in Phase 2 when building rowsets_to_delete.
+        std::string rowset_load_key =
+                versioned::meta_rowset_load_key({instance_id_, tablet_id, 
rs_meta.end_version()});
+        std::string rowset_key = meta_rowset_key({instance_id_, tablet_id, 
rs_meta.end_version()});
+        RowsetDeleteTask task = create_delete_task(
+                rs_meta, encode_versioned_key(rowset_load_key, versionstamp), 
rowset_key);
+        all_tasks.push_back(std::move(task));
     }
 
     for (const auto& [rs_meta, versionstamp] : compact_rowset_metas) {
         update_rowset_stats(rs_meta);
-        concurrent_delete_executor.add([tablet_id, versionstamp, rs_meta_pb = 
rs_meta, this]() {
-            // recycle both versioned and non-versioned rowset meta key
-            std::string rowset_load_key = versioned::meta_rowset_compact_key(
-                    {instance_id_, tablet_id, rs_meta_pb.end_version()});
-            std::string rowset_key =
-                    meta_rowset_key({instance_id_, tablet_id, 
rs_meta_pb.end_version()});
-            return 
recycle_rowset_meta_and_data(encode_versioned_key(rowset_load_key, 
versionstamp),
-                                                rs_meta_pb, rowset_key);
-        });
+        // Version 0-1 rowset has no resource_id and no actual data files,
+        // but still needs ref_count key cleanup, so we add it to all_tasks.
+        // It will be filtered out in Phase 2 when building rowsets_to_delete.
+        std::string rowset_compact_key = versioned::meta_rowset_compact_key(
+                {instance_id_, tablet_id, rs_meta.end_version()});
+        std::string rowset_key = meta_rowset_key({instance_id_, tablet_id, 
rs_meta.end_version()});
+        RowsetDeleteTask task = create_delete_task(
+                rs_meta, encode_versioned_key(rowset_compact_key, 
versionstamp), rowset_key);
+        all_tasks.push_back(std::move(task));
     }
 
     auto handle_recycle_rowset_kv = [&](std::string_view k, std::string_view 
v) {
@@ -4051,21 +4065,25 @@ int InstanceRecycler::recycle_versioned_tablet(int64_t 
tablet_id,
             decode_key(&k1, &out);
             // 0x01 "recycle" ${instance_id} "rowset" ${tablet_id} 
${rowset_id} -> RecycleRowsetPB
             const auto& rowset_id = std::get<std::string>(std::get<0>(out[4]));
-            LOG_INFO("delete rowset data")
+            LOG_INFO("delete old-version rowset data")
                     .tag("instance_id", instance_id_)
                     .tag("tablet_id", tablet_id)
                     .tag("rowset_id", rowset_id);
 
+            // Old version RecycleRowsetPB lacks full rowset_meta info 
(num_segments, schema, etc.),
+            // so we must use prefix deletion directly instead of batch delete.
             concurrent_delete_executor.add(
                     [tablet_id, resource_id = recycle_rowset.resource_id(), 
rowset_id, this]() {
                         // delete by prefix, the recycle rowset key will be 
deleted by range later.
                         return delete_rowset_data(resource_id, tablet_id, 
rowset_id);
                     });
         } else {
-            concurrent_delete_executor.add(
-                    [k = std::string(k), recycle_rowset = 
std::move(recycle_rowset), this]() {
-                        return recycle_rowset_meta_and_data(k, 
recycle_rowset.rowset_meta());
-                    });
+            const auto& rowset_meta = recycle_rowset.rowset_meta();
+            // Version 0-1 rowset has no resource_id and no actual data files,
+            // but still needs ref_count key cleanup, so we add it to 
all_tasks.
+            // It will be filtered out in Phase 2 when building 
rowsets_to_delete.
+            RowsetDeleteTask task = create_delete_task(rowset_meta, k);
+            all_tasks.push_back(std::move(task));
         }
         return 0;
     };
@@ -4078,6 +4096,75 @@ int InstanceRecycler::recycle_versioned_tablet(int64_t 
tablet_id,
         ret = -1;
     }
 
+    // Phase 1: Classify tasks by ref_count
+    std::vector<RowsetDeleteTask> batch_delete_tasks;
+    for (auto& task : all_tasks) {
+        int classify_ret = classify_rowset_task_by_ref_count(task, 
batch_delete_tasks);
+        if (classify_ret < 0) {
+            LOG_WARNING("failed to classify rowset task, fallback to old 
logic")
+                    .tag("instance_id", instance_id_)
+                    .tag("tablet_id", tablet_id)
+                    .tag("rowset_id", task.rowset_meta.rowset_id_v2());
+            concurrent_delete_executor.add([this, t = std::move(task)]() 
mutable {
+                return recycle_rowset_meta_and_data(t.recycle_rowset_key, 
t.rowset_meta,
+                                                    
t.non_versioned_rowset_key);
+            });
+        }
+    }
+
+    g_bvar_recycler_batch_delete_rowset_plan_count.put(instance_id_, 
batch_delete_tasks.size());
+
+    LOG_INFO("batch delete plan created")
+            .tag("instance_id", instance_id_)
+            .tag("tablet_id", tablet_id)
+            .tag("plan_count", batch_delete_tasks.size());
+
+    // Phase 2: Execute batch delete using existing delete_rowset_data
+    if (!batch_delete_tasks.empty()) {
+        std::map<std::string, RowsetMetaCloudPB> rowsets_to_delete;
+        for (const auto& task : batch_delete_tasks) {
+            // Version 0-1 rowset has no resource_id and no actual data files, 
skip it
+            if (task.rowset_meta.resource_id().empty()) {
+                LOG_INFO("skip rowset with empty resource_id in batch delete")
+                        .tag("instance_id", instance_id_)
+                        .tag("tablet_id", tablet_id)
+                        .tag("rowset_id", task.rowset_meta.rowset_id_v2());
+                continue;
+            }
+            rowsets_to_delete[task.rowset_meta.rowset_id_v2()] = 
task.rowset_meta;
+        }
+
+        // Only call delete_rowset_data if there are rowsets with actual data 
to delete
+        bool delete_success = true;
+        if (!rowsets_to_delete.empty()) {
+            RecyclerMetricsContext batch_metrics_context(instance_id_,
+                                                         
"batch_delete_versioned_tablet");
+            int delete_ret = delete_rowset_data(
+                    rowsets_to_delete, RowsetRecyclingState::FORMAL_ROWSET, 
batch_metrics_context);
+            if (delete_ret != 0) {
+                LOG_WARNING("batch delete execution failed")
+                        .tag("instance_id", instance_id_)
+                        .tag("tablet_id", tablet_id);
+                g_bvar_recycler_batch_delete_failures.put(instance_id_, 1);
+                ret = -1;
+                delete_success = false;
+            }
+        }
+
+        // Phase 3: Only cleanup metadata if data deletion succeeded.
+        // If deletion failed, keep recycle_rowset_key so next round will 
retry.
+        if (delete_success) {
+            int cleanup_ret = cleanup_rowset_metadata(batch_delete_tasks);
+            if (cleanup_ret != 0) {
+                LOG_WARNING("batch delete cleanup failed")
+                        .tag("instance_id", instance_id_)
+                        .tag("tablet_id", tablet_id);
+                ret = -1;
+            }
+        }
+    }
+
+    // Always wait for fallback tasks to complete before returning
     bool finished = true;
     std::vector<int> rets = concurrent_delete_executor.when_all(&finished);
     for (int r : rets) {
@@ -4089,7 +4176,7 @@ int InstanceRecycler::recycle_versioned_tablet(int64_t 
tablet_id,
     ret = finished ? ret : -1;
 
     if (ret != 0) { // failed recycle tablet data
-        LOG_WARNING("ret!=0")
+        LOG_WARNING("recycle versioned tablet failed")
                 .tag("finished", finished)
                 .tag("ret", ret)
                 .tag("instance_id", instance_id_)
@@ -4840,7 +4927,7 @@ int InstanceRecycler::recycle_versioned_rowsets() {
 
 int InstanceRecycler::recycle_rowset_meta_and_data(std::string_view 
recycle_rowset_key,
                                                    const RowsetMetaCloudPB& 
rowset_meta,
-                                                   std::string_view 
secondary_rowset_key) {
+                                                   std::string_view 
non_versioned_rowset_key) {
     constexpr int MAX_RETRY = 10;
     int64_t tablet_id = rowset_meta.tablet_id();
     const std::string& rowset_id = rowset_meta.rowset_id_v2();
@@ -4939,9 +5026,9 @@ int 
InstanceRecycler::recycle_rowset_meta_and_data(std::string_view recycle_rows
             txn->remove(recycle_rowset_key);
             LOG_INFO("remove recycle rowset key").tag("key", 
hex(recycle_rowset_key));
         }
-        if (!secondary_rowset_key.empty()) {
-            txn->remove(secondary_rowset_key);
-            LOG_INFO("remove secondary rowset key").tag("key", 
hex(secondary_rowset_key));
+        if (!non_versioned_rowset_key.empty()) {
+            txn->remove(non_versioned_rowset_key);
+            LOG_INFO("remove non versioned rowset key").tag("key", 
hex(non_versioned_rowset_key));
         }
 
         err = txn->commit();
@@ -6742,4 +6829,226 @@ int 
InstanceRecycler::scan_and_statistics_restore_jobs() {
     return ret;
 }
 
+int InstanceRecycler::classify_rowset_task_by_ref_count(
+        RowsetDeleteTask& task, std::vector<RowsetDeleteTask>& 
batch_delete_tasks) {
+    constexpr int MAX_RETRY = 10;
+    const auto& rowset_meta = task.rowset_meta;
+    int64_t tablet_id = rowset_meta.tablet_id();
+    const std::string& rowset_id = rowset_meta.rowset_id_v2();
+    std::string_view reference_instance_id = instance_id_;
+    if (rowset_meta.has_reference_instance_id()) {
+        reference_instance_id = rowset_meta.reference_instance_id();
+    }
+
+    for (int i = 0; i < MAX_RETRY; ++i) {
+        std::unique_ptr<Transaction> txn;
+        TxnErrorCode err = txn_kv_->create_txn(&txn);
+        if (err != TxnErrorCode::TXN_OK) {
+            LOG_WARNING("failed to create txn when classifying rowset task")
+                    .tag("instance_id", instance_id_)
+                    .tag("tablet_id", tablet_id)
+                    .tag("rowset_id", rowset_id)
+                    .tag("err", err);
+            return -1;
+        }
+
+        std::string rowset_ref_count_key =
+                versioned::data_rowset_ref_count_key({reference_instance_id, 
tablet_id, rowset_id});
+        task.rowset_ref_count_key = rowset_ref_count_key;
+
+        int64_t ref_count = 0;
+        {
+            std::string value;
+            TxnErrorCode err = txn->get(rowset_ref_count_key, &value);
+            if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+                ref_count = 1;
+            } else if (err != TxnErrorCode::TXN_OK) {
+                LOG_WARNING("failed to get rowset ref count key when 
classifying")
+                        .tag("instance_id", instance_id_)
+                        .tag("tablet_id", tablet_id)
+                        .tag("rowset_id", rowset_id)
+                        .tag("err", err);
+                return -1;
+            } else if (!txn->decode_atomic_int(value, &ref_count)) {
+                LOG_WARNING("failed to decode rowset data ref count when 
classifying")
+                        .tag("instance_id", instance_id_)
+                        .tag("tablet_id", tablet_id)
+                        .tag("rowset_id", rowset_id)
+                        .tag("value", hex(value));
+                return -1;
+            }
+        }
+
+        if (ref_count > 1) {
+            // ref_count > 1: decrement count, remove recycle keys, don't add 
to batch delete
+            txn->atomic_add(rowset_ref_count_key, -1);
+            LOG_INFO("decrease rowset data ref count in classification phase")
+                    .tag("instance_id", instance_id_)
+                    .tag("tablet_id", tablet_id)
+                    .tag("rowset_id", rowset_id)
+                    .tag("ref_count", ref_count - 1)
+                    .tag("ref_count_key", hex(rowset_ref_count_key));
+
+            if (!task.recycle_rowset_key.empty()) {
+                txn->remove(task.recycle_rowset_key);
+                LOG_INFO("remove recycle rowset key in classification phase")
+                        .tag("key", hex(task.recycle_rowset_key));
+            }
+            if (!task.non_versioned_rowset_key.empty()) {
+                txn->remove(task.non_versioned_rowset_key);
+                LOG_INFO("remove non versioned rowset key in classification 
phase")
+                        .tag("key", hex(task.non_versioned_rowset_key));
+            }
+
+            err = txn->commit();
+            if (err == TxnErrorCode::TXN_CONFLICT) {
+                VLOG_DEBUG << "decrease rowset ref count but txn conflict in 
classification, retry"
+                           << " tablet_id=" << tablet_id << " rowset_id=" << 
rowset_id
+                           << ", ref_count=" << ref_count << ", retry=" << i;
+                std::this_thread::sleep_for(std::chrono::milliseconds(500));
+                continue;
+            } else if (err != TxnErrorCode::TXN_OK) {
+                LOG_WARNING("failed to commit txn when classifying rowset 
task")
+                        .tag("instance_id", instance_id_)
+                        .tag("tablet_id", tablet_id)
+                        .tag("rowset_id", rowset_id)
+                        .tag("err", err);
+                return -1;
+            }
+            return 1; // handled, not added to batch delete
+        } else {
+            // ref_count == 1: Add to batch delete plan without modifying any 
KV.
+            // Keep recycle_rowset_key as "pending recycle" marker until data 
is actually deleted.
+            LOG_INFO("add rowset to batch delete plan")
+                    .tag("instance_id", instance_id_)
+                    .tag("tablet_id", tablet_id)
+                    .tag("rowset_id", rowset_id)
+                    .tag("resource_id", rowset_meta.resource_id())
+                    .tag("ref_count", ref_count);
+
+            batch_delete_tasks.push_back(std::move(task));
+            return 0; // added to batch delete
+        }
+    }
+
+    LOG_WARNING("failed to classify rowset task after retry")
+            .tag("instance_id", instance_id_)
+            .tag("tablet_id", tablet_id)
+            .tag("rowset_id", rowset_id)
+            .tag("retry", MAX_RETRY);
+    return -1;
+}
+
+int InstanceRecycler::cleanup_rowset_metadata(const 
std::vector<RowsetDeleteTask>& tasks) {
+    int ret = 0;
+    for (const auto& task : tasks) {
+        int64_t tablet_id = task.rowset_meta.tablet_id();
+        const std::string& rowset_id = task.rowset_meta.rowset_id_v2();
+
+        // Note: decrement_packed_file_ref_counts is already called in 
delete_rowset_data,
+        // so we don't need to call it again here.
+
+        // Remove all metadata keys in one transaction
+        std::unique_ptr<Transaction> txn;
+        TxnErrorCode err = txn_kv_->create_txn(&txn);
+        if (err != TxnErrorCode::TXN_OK) {
+            LOG_WARNING("failed to create txn when cleaning up metadata")
+                    .tag("instance_id", instance_id_)
+                    .tag("tablet_id", tablet_id)
+                    .tag("rowset_id", rowset_id)
+                    .tag("err", err);
+            ret = -1;
+            continue;
+        }
+
+        std::string_view reference_instance_id = instance_id_;
+        if (task.rowset_meta.has_reference_instance_id()) {
+            reference_instance_id = task.rowset_meta.reference_instance_id();
+        }
+
+        txn->remove(task.rowset_ref_count_key);
+        LOG_INFO("delete rowset data ref count key in cleanup phase")
+                .tag("instance_id", instance_id_)
+                .tag("tablet_id", tablet_id)
+                .tag("rowset_id", rowset_id)
+                .tag("ref_count_key", hex(task.rowset_ref_count_key));
+
+        std::string dbm_start_key =
+                meta_delete_bitmap_key({reference_instance_id, tablet_id, 
rowset_id, 0, 0});
+        std::string dbm_end_key = meta_delete_bitmap_key(
+                {reference_instance_id, tablet_id, rowset_id,
+                 std::numeric_limits<int64_t>::max(), 
std::numeric_limits<int64_t>::max()});
+        txn->remove(dbm_start_key, dbm_end_key);
+        LOG_INFO("remove delete bitmap kv in cleanup phase")
+                .tag("instance_id", instance_id_)
+                .tag("tablet_id", tablet_id)
+                .tag("rowset_id", rowset_id)
+                .tag("begin", hex(dbm_start_key))
+                .tag("end", hex(dbm_end_key));
+
+        std::string versioned_dbm_start_key =
+                versioned::meta_delete_bitmap_key({reference_instance_id, 
tablet_id, rowset_id});
+        std::string versioned_dbm_end_key = versioned_dbm_start_key;
+        encode_int64(INT64_MAX, &versioned_dbm_end_key);
+        txn->remove(versioned_dbm_start_key, versioned_dbm_end_key);
+        LOG_INFO("remove versioned delete bitmap kv in cleanup phase")
+                .tag("instance_id", instance_id_)
+                .tag("tablet_id", tablet_id)
+                .tag("rowset_id", rowset_id)
+                .tag("begin", hex(versioned_dbm_start_key))
+                .tag("end", hex(versioned_dbm_end_key));
+
+        // Remove versioned meta rowset key
+        if (!task.versioned_rowset_key.empty()) {
+            std::string versioned_rowset_key_end = task.versioned_rowset_key;
+            encode_int64(INT64_MAX, &versioned_rowset_key_end);
+            txn->remove(task.versioned_rowset_key, versioned_rowset_key_end);
+            LOG_INFO("remove versioned meta rowset key in cleanup phase")
+                    .tag("instance_id", instance_id_)
+                    .tag("tablet_id", tablet_id)
+                    .tag("rowset_id", rowset_id)
+                    .tag("begin", hex(task.versioned_rowset_key))
+                    .tag("end", hex(versioned_rowset_key_end));
+        }
+
+        if (!task.non_versioned_rowset_key.empty()) {
+            txn->remove(task.non_versioned_rowset_key);
+            LOG_INFO("remove non versioned rowset key in cleanup phase")
+                    .tag("instance_id", instance_id_)
+                    .tag("tablet_id", tablet_id)
+                    .tag("rowset_id", rowset_id)
+                    .tag("key", hex(task.non_versioned_rowset_key));
+        }
+
+        // Remove recycle_rowset_key last to ensure retry safety:
+        // if cleanup fails, this key remains and triggers next round retry.
+        if (!task.recycle_rowset_key.empty()) {
+            txn->remove(task.recycle_rowset_key);
+            LOG_INFO("remove recycle rowset key in cleanup phase")
+                    .tag("instance_id", instance_id_)
+                    .tag("tablet_id", tablet_id)
+                    .tag("rowset_id", rowset_id)
+                    .tag("key", hex(task.recycle_rowset_key));
+        }
+
+        err = txn->commit();
+        if (err != TxnErrorCode::TXN_OK) {
+            // Metadata cleanup failed. recycle_rowset_key remains, next round 
will retry.
+            LOG_WARNING("failed to commit cleanup metadata txn, will retry 
next round")
+                    .tag("instance_id", instance_id_)
+                    .tag("tablet_id", tablet_id)
+                    .tag("rowset_id", rowset_id)
+                    .tag("err", err);
+            ret = -1;
+            continue;
+        }
+
+        LOG_INFO("cleanup rowset metadata success")
+                .tag("instance_id", instance_id_)
+                .tag("tablet_id", tablet_id)
+                .tag("rowset_id", rowset_id);
+    }
+    return ret;
+}
+
 } // namespace doris::cloud
diff --git a/cloud/src/recycler/recycler.h b/cloud/src/recycler/recycler.h
index b69e5fa16dd..760b78aef81 100644
--- a/cloud/src/recycler/recycler.h
+++ b/cloud/src/recycler/recycler.h
@@ -131,6 +131,15 @@ enum class RowsetRecyclingState {
     TMP_ROWSET,
 };
 
+// Represents a single rowset deletion task for batch delete
+struct RowsetDeleteTask {
+    RowsetMetaCloudPB rowset_meta;
+    std::string recycle_rowset_key;       // Primary key marking "pending 
recycle"
+    std::string non_versioned_rowset_key; // Legacy non-versioned rowset meta 
key
+    std::string versioned_rowset_key;     // Versioned meta rowset key
+    std::string rowset_ref_count_key;
+};
+
 class RecyclerMetricsContext {
 public:
     RecyclerMetricsContext() = default;
@@ -470,12 +479,19 @@ private:
 
     // Recycle rowset meta and data, return 0 for success otherwise error
     //
-    // Both recycle_rowset_key and secondary_rowset_key will be removed in the 
same transaction.
+    // Both recycle_rowset_key and non_versioned_rowset_key will be removed in 
the same transaction.
     //
     // This function will decrease the rowset ref count and remove the rowset 
meta and data if the ref count is 1.
     int recycle_rowset_meta_and_data(std::string_view recycle_rowset_key,
                                      const RowsetMetaCloudPB& rowset_meta,
-                                     std::string_view secondary_rowset_key = 
"");
+                                     std::string_view non_versioned_rowset_key 
= "");
+
+    // Classify rowset task by ref_count, return 0 to add to batch delete, 1 
if handled (ref>1), -1 on error
+    int classify_rowset_task_by_ref_count(RowsetDeleteTask& task,
+                                          std::vector<RowsetDeleteTask>& 
batch_delete_tasks);
+
+    // Cleanup metadata for deleted rowsets, return 0 for success otherwise 
error
+    int cleanup_rowset_metadata(const std::vector<RowsetDeleteTask>& tasks);
 
     // Whether the instance has any snapshots, return 0 for success otherwise 
error.
     int has_cluster_snapshots(bool* any);
diff --git a/cloud/test/recycle_versioned_keys_test.cpp 
b/cloud/test/recycle_versioned_keys_test.cpp
index 4f0b54c3064..670cccfac4a 100644
--- a/cloud/test/recycle_versioned_keys_test.cpp
+++ b/cloud/test/recycle_versioned_keys_test.cpp
@@ -211,6 +211,8 @@ void add_tablet(CreateTabletsRequest& req, int64_t 
table_id, int64_t index_id, i
     first_rowset->set_tablet_id(tablet_id);
     first_rowset->set_start_version(0);
     first_rowset->set_end_version(1);
+    // Note: version 0-1 rowset has no resource_id and no actual data files,
+    // only KV metadata needs to be cleaned up during recycling.
     first_rowset->mutable_tablet_schema()->CopyFrom(*schema);
 }
 
@@ -1897,3 +1899,451 @@ TEST(RecycleVersionedKeysTest, RecycleDeletedInstance) {
         }
     }
 }
+
+// ============================================================================
+// Batch Delete Tests for recycle_versioned_tablet
+// ============================================================================
+
+// Test: ref_count==1 rowsets enter batch delete plan and recycle_rowset_key 
is cleaned up
+TEST(RecycleVersionedKeysTest, BatchDeleteRefCountOne) {
+    auto meta_service = get_meta_service();
+    auto txn_kv = meta_service->txn_kv();
+    std::string instance_id = "batch_delete_ref_count_one_test_instance";
+    std::string cloud_unique_id = fmt::format("1:{}:0", instance_id);
+    ASSERT_NO_FATAL_FAILURE(create_and_refresh_instance(meta_service.get(), 
instance_id));
+
+    int64_t db_id = 1, table_id = 2, index_id = 3, partition_id = 4, tablet_id 
= 5;
+
+    {
+        ASSERT_NO_FATAL_FAILURE(prepare_and_commit_index(meta_service.get(), 
cloud_unique_id, db_id,
+                                                         table_id, index_id));
+        ASSERT_NO_FATAL_FAILURE(prepare_and_commit_partition(
+                meta_service.get(), cloud_unique_id, db_id, table_id, 
partition_id, index_id));
+        ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), 
cloud_unique_id, db_id, table_id,
+                                              index_id, partition_id, 
tablet_id));
+    }
+
+    size_t num_rowsets = 5;
+    std::vector<std::string> rowset_ids;
+    std::shared_ptr<MockAccessor> accessor = std::make_shared<MockAccessor>();
+    {
+        for (size_t i = 0; i < num_rowsets; ++i) {
+            std::string rowset_id;
+            ASSERT_NO_FATAL_FAILURE(insert_rowset(meta_service.get(), 
cloud_unique_id, db_id,
+                                                  fmt::format("label_{}", i), 
table_id,
+                                                  partition_id, tablet_id, 
&rowset_id));
+            rowset_ids.push_back(rowset_id);
+            // Create mock files for each rowset
+            accessor->put_file(segment_path(tablet_id, rowset_id, 0), 
"segment_data");
+        }
+    }
+
+    // All rowsets have ref_count==1 (default), should enter batch delete
+    config::force_immediate_recycle = true;
+    DORIS_CLOUD_DEFER {
+        config::force_immediate_recycle = false;
+    };
+
+    InstanceInfoPB instance_info;
+    ASSERT_NO_FATAL_FAILURE(get_instance(meta_service.get(), cloud_unique_id, 
instance_info));
+    auto recycler = get_instance_recycler(meta_service.get(), instance_info, 
accessor);
+    RecyclerMetricsContext ctx;
+    ASSERT_EQ(0, recycler->recycle_tablet(tablet_id, ctx));
+
+    {
+        // Verify all rowset data is deleted from storage
+        std::unique_ptr<ListIterator> list_iter;
+        ASSERT_EQ(0, accessor->list_directory(tablet_path_prefix(tablet_id), 
&list_iter));
+        EXPECT_FALSE(list_iter->has_next()) << "All rowset data should be 
deleted";
+    }
+
+    {
+        // Verify all recycle_rowset keys are cleaned up
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        std::string begin_key = recycle_rowset_key({instance_id, tablet_id, 
""});
+        std::string end_key = recycle_rowset_key({instance_id, tablet_id + 1, 
""});
+        std::unique_ptr<RangeGetIterator> it;
+        ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK);
+        size_t count = 0;
+        while (it->has_next()) {
+            it->next();
+            ++count;
+        }
+        EXPECT_EQ(count, 0) << "All recycle_rowset keys should be cleaned up";
+    }
+
+    {
+        // Verify all ref_count keys are cleaned up
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        std::string begin_key = 
versioned::data_rowset_ref_count_key({instance_id, tablet_id, ""});
+        std::string end_key =
+                versioned::data_rowset_ref_count_key({instance_id, tablet_id + 
1, ""});
+        std::unique_ptr<RangeGetIterator> it;
+        ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK);
+        size_t count = 0;
+        while (it->has_next()) {
+            it->next();
+            ++count;
+        }
+        EXPECT_EQ(count, 0) << "All ref_count keys should be cleaned up";
+    }
+}
+
+// Test: ref_count>1 rowsets only decrement count, do not enter batch delete 
plan
+TEST(RecycleVersionedKeysTest, BatchDeleteRefCountGreaterThanOne) {
+    auto meta_service = get_meta_service();
+    auto txn_kv = meta_service->txn_kv();
+    std::string instance_id = "batch_delete_ref_count_gt_one_test_instance";
+    std::string cloud_unique_id = fmt::format("1:{}:0", instance_id);
+    ASSERT_NO_FATAL_FAILURE(create_and_refresh_instance(meta_service.get(), 
instance_id));
+
+    int64_t db_id = 1, table_id = 2, index_id = 3, partition_id = 4, tablet_id 
= 5;
+
+    {
+        ASSERT_NO_FATAL_FAILURE(prepare_and_commit_index(meta_service.get(), 
cloud_unique_id, db_id,
+                                                         table_id, index_id));
+        ASSERT_NO_FATAL_FAILURE(prepare_and_commit_partition(
+                meta_service.get(), cloud_unique_id, db_id, table_id, 
partition_id, index_id));
+        ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), 
cloud_unique_id, db_id, table_id,
+                                              index_id, partition_id, 
tablet_id));
+    }
+
+    size_t num_rowsets = 3;
+    std::vector<std::string> rowset_ids;
+    std::shared_ptr<MockAccessor> accessor = std::make_shared<MockAccessor>();
+    {
+        for (size_t i = 0; i < num_rowsets; ++i) {
+            std::string rowset_id;
+            ASSERT_NO_FATAL_FAILURE(insert_rowset(meta_service.get(), 
cloud_unique_id, db_id,
+                                                  fmt::format("label_{}", i), 
table_id,
+                                                  partition_id, tablet_id, 
&rowset_id));
+            rowset_ids.push_back(rowset_id);
+            accessor->put_file(segment_path(tablet_id, rowset_id, 0), 
"segment_data");
+        }
+    }
+
+    {
+        // Set ref_count > 1 for all rowsets (simulate shared rowsets)
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        for (const auto& rowset_id : rowset_ids) {
+            auto ref_count_key =
+                    versioned::data_rowset_ref_count_key({instance_id, 
tablet_id, rowset_id});
+            txn->atomic_add(ref_count_key, 1); // ref_count becomes 2
+        }
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+    }
+
+    config::force_immediate_recycle = true;
+    DORIS_CLOUD_DEFER {
+        config::force_immediate_recycle = false;
+    };
+
+    InstanceInfoPB instance_info;
+    ASSERT_NO_FATAL_FAILURE(get_instance(meta_service.get(), cloud_unique_id, 
instance_info));
+    auto recycler = get_instance_recycler(meta_service.get(), instance_info, 
accessor);
+    RecyclerMetricsContext ctx;
+    ASSERT_EQ(0, recycler->recycle_tablet(tablet_id, ctx));
+
+    {
+        // Verify rowset data is NOT deleted (ref_count > 1)
+        std::unique_ptr<ListIterator> list_iter;
+        ASSERT_EQ(0, accessor->list_directory(tablet_path_prefix(tablet_id), 
&list_iter));
+        size_t file_count = 0;
+        while (list_iter->has_next()) {
+            list_iter->next();
+            ++file_count;
+        }
+        EXPECT_EQ(file_count, num_rowsets)
+                << "Rowset data should NOT be deleted when ref_count > 1";
+    }
+
+    {
+        // Verify ref_count is decremented to 1
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        for (const auto& rowset_id : rowset_ids) {
+            auto ref_count_key =
+                    versioned::data_rowset_ref_count_key({instance_id, 
tablet_id, rowset_id});
+            std::string value;
+            auto rc = txn->get(ref_count_key, &value);
+            ASSERT_EQ(rc, TxnErrorCode::TXN_OK);
+            int64_t ref_count = 0;
+            ASSERT_TRUE(txn->decode_atomic_int(value, &ref_count));
+            EXPECT_EQ(ref_count, 1) << "ref_count should be decremented to 1";
+        }
+    }
+
+    {
+        // Verify recycle_rowset keys are cleaned up (since ref_count > 1 path 
removes them)
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        std::string begin_key = recycle_rowset_key({instance_id, tablet_id, 
""});
+        std::string end_key = recycle_rowset_key({instance_id, tablet_id + 1, 
""});
+        std::unique_ptr<RangeGetIterator> it;
+        ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK);
+        size_t count = 0;
+        while (it->has_next()) {
+            it->next();
+            ++count;
+        }
+        EXPECT_EQ(count, 0) << "recycle_rowset keys should be removed for 
ref_count > 1 case";
+    }
+}
+
+// Test: Mixed ref_count scenario - some rowsets have ref_count==1, others 
have ref_count>1
+TEST(RecycleVersionedKeysTest, BatchDeleteMixedRefCount) {
+    auto meta_service = get_meta_service();
+    auto txn_kv = meta_service->txn_kv();
+    std::string instance_id = "batch_delete_mixed_ref_count_test_instance";
+    std::string cloud_unique_id = fmt::format("1:{}:0", instance_id);
+    ASSERT_NO_FATAL_FAILURE(create_and_refresh_instance(meta_service.get(), 
instance_id));
+
+    int64_t db_id = 1, table_id = 2, index_id = 3, partition_id = 4, tablet_id 
= 5;
+
+    {
+        ASSERT_NO_FATAL_FAILURE(prepare_and_commit_index(meta_service.get(), 
cloud_unique_id, db_id,
+                                                         table_id, index_id));
+        ASSERT_NO_FATAL_FAILURE(prepare_and_commit_partition(
+                meta_service.get(), cloud_unique_id, db_id, table_id, 
partition_id, index_id));
+        ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), 
cloud_unique_id, db_id, table_id,
+                                              index_id, partition_id, 
tablet_id));
+    }
+
+    size_t num_rowsets = 4;
+    std::vector<std::string> rowset_ids;
+    std::shared_ptr<MockAccessor> accessor = std::make_shared<MockAccessor>();
+    {
+        for (size_t i = 0; i < num_rowsets; ++i) {
+            std::string rowset_id;
+            ASSERT_NO_FATAL_FAILURE(insert_rowset(meta_service.get(), 
cloud_unique_id, db_id,
+                                                  fmt::format("label_{}", i), 
table_id,
+                                                  partition_id, tablet_id, 
&rowset_id));
+            rowset_ids.push_back(rowset_id);
+            accessor->put_file(segment_path(tablet_id, rowset_id, 0), 
"segment_data");
+        }
+    }
+
+    // Set ref_count > 1 for first two rowsets only
+    std::vector<std::string> shared_rowset_ids = {rowset_ids[0], 
rowset_ids[1]};
+    std::vector<std::string> unique_rowset_ids = {rowset_ids[2], 
rowset_ids[3]};
+    {
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        for (const auto& rowset_id : shared_rowset_ids) {
+            auto ref_count_key =
+                    versioned::data_rowset_ref_count_key({instance_id, 
tablet_id, rowset_id});
+            txn->atomic_add(ref_count_key, 1); // ref_count becomes 2
+        }
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+    }
+
+    config::force_immediate_recycle = true;
+    DORIS_CLOUD_DEFER {
+        config::force_immediate_recycle = false;
+    };
+
+    InstanceInfoPB instance_info;
+    ASSERT_NO_FATAL_FAILURE(get_instance(meta_service.get(), cloud_unique_id, 
instance_info));
+    auto recycler = get_instance_recycler(meta_service.get(), instance_info, 
accessor);
+    RecyclerMetricsContext ctx;
+    ASSERT_EQ(0, recycler->recycle_tablet(tablet_id, ctx));
+
+    {
+        // Verify only shared rowsets' data remains
+        std::unique_ptr<ListIterator> list_iter;
+        ASSERT_EQ(0, accessor->list_directory(tablet_path_prefix(tablet_id), 
&list_iter));
+        std::set<std::string> remaining_files;
+        while (list_iter->has_next()) {
+            auto file = list_iter->next();
+            remaining_files.insert(file->path);
+        }
+        EXPECT_EQ(remaining_files.size(), shared_rowset_ids.size())
+                << "Only shared rowsets' data should remain";
+
+        for (const auto& rowset_id : shared_rowset_ids) {
+            std::string expected_path = segment_path(tablet_id, rowset_id, 0);
+            EXPECT_TRUE(remaining_files.count(expected_path) > 0)
+                    << "Shared rowset data should remain: " << expected_path;
+        }
+    }
+
+    {
+        // Verify shared rowsets' ref_count is decremented to 1
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        for (const auto& rowset_id : shared_rowset_ids) {
+            auto ref_count_key =
+                    versioned::data_rowset_ref_count_key({instance_id, 
tablet_id, rowset_id});
+            std::string value;
+            auto rc = txn->get(ref_count_key, &value);
+            ASSERT_EQ(rc, TxnErrorCode::TXN_OK);
+            int64_t ref_count = 0;
+            ASSERT_TRUE(txn->decode_atomic_int(value, &ref_count));
+            EXPECT_EQ(ref_count, 1) << "Shared rowset ref_count should be 1";
+        }
+
+        // Verify unique rowsets' ref_count keys are deleted
+        for (const auto& rowset_id : unique_rowset_ids) {
+            auto ref_count_key =
+                    versioned::data_rowset_ref_count_key({instance_id, 
tablet_id, rowset_id});
+            std::string value;
+            auto rc = txn->get(ref_count_key, &value);
+            EXPECT_EQ(rc, TxnErrorCode::TXN_KEY_NOT_FOUND)
+                    << "Unique rowset ref_count key should be deleted";
+        }
+    }
+}
+
+// Test: Batch delete with multiple vaults (resource_ids)
+TEST(RecycleVersionedKeysTest, BatchDeleteMultipleVaults) {
+    auto meta_service = get_meta_service();
+    auto txn_kv = meta_service->txn_kv();
+    std::string instance_id = "batch_delete_multi_vault_test_instance";
+    std::string cloud_unique_id = fmt::format("1:{}:0", instance_id);
+    ASSERT_NO_FATAL_FAILURE(create_and_refresh_instance(meta_service.get(), 
instance_id));
+
+    int64_t db_id = 1, table_id = 2, index_id = 3, partition_id = 4, tablet_id 
= 5;
+
+    {
+        ASSERT_NO_FATAL_FAILURE(prepare_and_commit_index(meta_service.get(), 
cloud_unique_id, db_id,
+                                                         table_id, index_id));
+        ASSERT_NO_FATAL_FAILURE(prepare_and_commit_partition(
+                meta_service.get(), cloud_unique_id, db_id, table_id, 
partition_id, index_id));
+        ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), 
cloud_unique_id, db_id, table_id,
+                                              index_id, partition_id, 
tablet_id));
+    }
+
+    // Create two accessors for different vaults
+    std::string resource_id_1 = std::string(RESOURCE_ID);
+    std::string resource_id_2 = "mock_resource_id_2";
+    auto accessor_1 = std::make_shared<MockAccessor>();
+    auto accessor_2 = std::make_shared<MockAccessor>();
+
+    std::vector<std::string> rowset_ids;
+    {
+        // Insert rowsets - they will use RESOURCE_ID by default
+        for (size_t i = 0; i < 3; ++i) {
+            std::string rowset_id;
+            ASSERT_NO_FATAL_FAILURE(insert_rowset(meta_service.get(), 
cloud_unique_id, db_id,
+                                                  fmt::format("label_{}", i), 
table_id,
+                                                  partition_id, tablet_id, 
&rowset_id));
+            rowset_ids.push_back(rowset_id);
+            accessor_1->put_file(segment_path(tablet_id, rowset_id, 0), 
"segment_data");
+        }
+    }
+
+    config::force_immediate_recycle = true;
+    DORIS_CLOUD_DEFER {
+        config::force_immediate_recycle = false;
+    };
+
+    InstanceInfoPB instance_info;
+    ASSERT_NO_FATAL_FAILURE(get_instance(meta_service.get(), cloud_unique_id, 
instance_info));
+
+    auto txn_lazy_committer = 
std::make_shared<TxnLazyCommitter>(meta_service->txn_kv());
+    auto recycler = std::make_unique<InstanceRecycler>(meta_service->txn_kv(), 
instance_info,
+                                                       thread_group, 
txn_lazy_committer);
+    // Add both accessors
+    recycler->TEST_add_accessor(resource_id_1, accessor_1);
+    recycler->TEST_add_accessor(resource_id_2, accessor_2);
+
+    RecyclerMetricsContext ctx;
+    ASSERT_EQ(0, recycler->recycle_tablet(tablet_id, ctx));
+
+    {
+        // Verify accessor_1's data is deleted
+        std::unique_ptr<ListIterator> list_iter;
+        ASSERT_EQ(0, accessor_1->list_directory(tablet_path_prefix(tablet_id), 
&list_iter));
+        EXPECT_FALSE(list_iter->has_next()) << "Vault 1 data should be 
deleted";
+    }
+
+    {
+        // Verify all metadata is cleaned up
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        std::string begin_key = 
versioned::data_rowset_ref_count_key({instance_id, tablet_id, ""});
+        std::string end_key =
+                versioned::data_rowset_ref_count_key({instance_id, tablet_id + 
1, ""});
+        std::unique_ptr<RangeGetIterator> it;
+        ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK);
+        size_t count = 0;
+        while (it->has_next()) {
+            it->next();
+            ++count;
+        }
+        EXPECT_EQ(count, 0) << "All ref_count keys should be cleaned up";
+    }
+}
+
+// Test: Batch size exceeds max_batch_size, verify correct batching
+TEST(RecycleVersionedKeysTest, BatchDeleteExceedsMaxBatchSize) {
+    auto meta_service = get_meta_service();
+    auto txn_kv = meta_service->txn_kv();
+    std::string instance_id = 
"batch_delete_exceeds_max_batch_size_test_instance";
+    std::string cloud_unique_id = fmt::format("1:{}:0", instance_id);
+    ASSERT_NO_FATAL_FAILURE(create_and_refresh_instance(meta_service.get(), 
instance_id));
+
+    int64_t db_id = 1, table_id = 2, index_id = 3, partition_id = 4, tablet_id 
= 5;
+
+    {
+        ASSERT_NO_FATAL_FAILURE(prepare_and_commit_index(meta_service.get(), 
cloud_unique_id, db_id,
+                                                         table_id, index_id));
+        ASSERT_NO_FATAL_FAILURE(prepare_and_commit_partition(
+                meta_service.get(), cloud_unique_id, db_id, table_id, 
partition_id, index_id));
+        ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), 
cloud_unique_id, db_id, table_id,
+                                              index_id, partition_id, 
tablet_id));
+    }
+
+    // Create multiple rowsets to test batch delete
+    size_t num_rowsets = 10;
+    std::vector<std::string> rowset_ids;
+    std::shared_ptr<MockAccessor> accessor = std::make_shared<MockAccessor>();
+    {
+        for (size_t i = 0; i < num_rowsets; ++i) {
+            std::string rowset_id;
+            ASSERT_NO_FATAL_FAILURE(insert_rowset(meta_service.get(), 
cloud_unique_id, db_id,
+                                                  fmt::format("label_{}", i), 
table_id,
+                                                  partition_id, tablet_id, 
&rowset_id));
+            rowset_ids.push_back(rowset_id);
+            accessor->put_file(segment_path(tablet_id, rowset_id, 0), 
"segment_data");
+        }
+    }
+
+    config::force_immediate_recycle = true;
+    DORIS_CLOUD_DEFER {
+        config::force_immediate_recycle = false;
+    };
+
+    InstanceInfoPB instance_info;
+    ASSERT_NO_FATAL_FAILURE(get_instance(meta_service.get(), cloud_unique_id, 
instance_info));
+    auto recycler = get_instance_recycler(meta_service.get(), instance_info, 
accessor);
+    RecyclerMetricsContext ctx;
+    ASSERT_EQ(0, recycler->recycle_tablet(tablet_id, ctx));
+
+    {
+        // Verify all data is deleted
+        std::unique_ptr<ListIterator> list_iter;
+        ASSERT_EQ(0, accessor->list_directory(tablet_path_prefix(tablet_id), 
&list_iter));
+        EXPECT_FALSE(list_iter->has_next()) << "All data should be deleted";
+    }
+
+    {
+        // Verify all metadata is cleaned up
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        std::string begin_key = 
versioned::data_rowset_ref_count_key({instance_id, tablet_id, ""});
+        std::string end_key =
+                versioned::data_rowset_ref_count_key({instance_id, tablet_id + 
1, ""});
+        std::unique_ptr<RangeGetIterator> it;
+        ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK);
+        size_t count = 0;
+        while (it->has_next()) {
+            it->next();
+            ++count;
+        }
+        EXPECT_EQ(count, 0) << "All ref_count keys should be cleaned up";
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to