This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new a7c827ddfd6 branch-3.0: [feat](recycler) Add http api for statistics recycler metrics #52523 (#53117)
a7c827ddfd6 is described below

commit a7c827ddfd62662201ee47c08152c6eb8518b8f0
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sat Jul 12 10:17:34 2025 +0800

    branch-3.0: [feat](recycler) Add http api for statistics recycler metrics #52523 (#53117)
    
    Cherry-picked from #52523
    
    Co-authored-by: Uniqueyou <[email protected]>
---
 cloud/src/common/bvars.cpp              |    6 +-
 cloud/src/common/bvars.h                |    6 +-
 cloud/src/common/config.h               |    4 +-
 cloud/src/recycler/recycler.cpp         | 1532 +++++++++++++++++--------------
 cloud/src/recycler/recycler.h           |   55 +-
 cloud/src/recycler/recycler_service.cpp |  261 ++++++
 cloud/src/recycler/recycler_service.h   |    2 +
 gensrc/proto/cloud.proto                |    5 +
 8 files changed, 1172 insertions(+), 699 deletions(-)

diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp
index 4485fb9f5d6..5999320af83 100644
--- a/cloud/src/common/bvars.cpp
+++ b/cloud/src/common/bvars.cpp
@@ -110,11 +110,11 @@ bvar::Adder<int64_t> 
g_bvar_recycler_instance_recycle_task_concurrency;
 // recycler's mbvars
 bvar::Adder<int64_t> 
g_bvar_recycler_instance_running_counter("recycler_instance_running_counter");
 // cost time of the last whole recycle process
-mBvarStatus<int64_t> 
g_bvar_recycler_instance_last_recycle_duration("recycler_instance_last_recycle_duration",{"instance_id"});
+mBvarStatus<int64_t> 
g_bvar_recycler_instance_last_round_recycle_duration("recycler_instance_last_round_recycle_duration",{"instance_id"});
 mBvarStatus<int64_t> 
g_bvar_recycler_instance_next_ts("recycler_instance_next_ts",{"instance_id"});
 // start and end timestamps of the recycle process
-mBvarStatus<int64_t> 
g_bvar_recycler_instance_recycle_st_ts("recycler_instance_recycle_st_ts",{"instance_id"});
-mBvarStatus<int64_t> 
g_bvar_recycler_instance_recycle_ed_ts("recycler_instance_recycle_ed_ts",{"instance_id"});
+mBvarStatus<int64_t> 
g_bvar_recycler_instance_recycle_start_ts("recycler_instance_recycle_start_ts",{"instance_id"});
+mBvarStatus<int64_t> 
g_bvar_recycler_instance_recycle_end_ts("recycler_instance_recycle_end_ts",{"instance_id"});
 mBvarStatus<int64_t> 
g_bvar_recycler_instance_recycle_last_success_ts("recycler_instance_recycle_last_success_ts",{"instance_id"});
 
 // recycler's mbvars
diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h
index 83ab481764f..3ca3e1b2cab 100644
--- a/cloud/src/common/bvars.h
+++ b/cloud/src/common/bvars.h
@@ -262,10 +262,10 @@ extern BvarStatusWithTag<int64_t> 
g_bvar_recycler_recycle_expired_txn_label_earl
 extern bvar::Status<int64_t> g_bvar_recycler_task_max_concurrency;
 extern bvar::Adder<int64_t> g_bvar_recycler_instance_recycle_task_concurrency;
 extern bvar::Adder<int64_t> g_bvar_recycler_instance_running_counter;
-extern mBvarStatus<int64_t> g_bvar_recycler_instance_last_recycle_duration;
+extern mBvarStatus<int64_t> 
g_bvar_recycler_instance_last_round_recycle_duration;
 extern mBvarStatus<int64_t> g_bvar_recycler_instance_next_ts;
-extern mBvarStatus<int64_t> g_bvar_recycler_instance_recycle_st_ts;
-extern mBvarStatus<int64_t> g_bvar_recycler_instance_recycle_ed_ts;
+extern mBvarStatus<int64_t> g_bvar_recycler_instance_recycle_start_ts;
+extern mBvarStatus<int64_t> g_bvar_recycler_instance_recycle_end_ts;
 extern mBvarStatus<int64_t> g_bvar_recycler_instance_recycle_last_success_ts;
 
 extern mBvarIntAdder g_bvar_recycler_vault_recycle_status;
diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index 62cd5c9343b..6fd933d3a15 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -89,6 +89,8 @@ CONF_Strings(recycle_whitelist, ""); // Comma seprated list
 CONF_Strings(recycle_blacklist, ""); // Comma seprated list
 // IO worker thread pool concurrency: object list, delete
 CONF_mInt32(instance_recycler_worker_pool_size, "32");
+// The worker pool size for http api `statistics_recycle` worker pool
+CONF_mInt32(instance_recycler_statistics_recycle_worker_pool_size, "5");
 CONF_Bool(enable_checker, "false");
 // The parallelism for parallel recycle operation
 // s3_producer_pool recycle_tablet_pool, delete single object in this pool
@@ -105,7 +107,7 @@ CONF_mInt32(scan_instances_interval_seconds, "60"); // 1min
 // interval for check object
 CONF_mInt32(check_object_interval_seconds, "43200"); // 12hours
 // enable recycler metrics statistics
-CONF_Bool(enable_recycler_metrics, "false");
+CONF_Bool(enable_recycler_stats_metrics, "false");
 
 CONF_mInt64(check_recycle_task_interval_seconds, "600"); // 10min
 CONF_mInt64(recycler_sleep_before_scheduling_seconds, "60");
diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index e285886acfb..e07dda0669a 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -68,8 +68,8 @@ namespace doris::cloud {
 
 using namespace std::chrono;
 
-static RecyclerMetricsContext tablet_metrics_context_("global_recycler", 
"recycle_tablet");
-static RecyclerMetricsContext segment_metrics_context_("global_recycler", 
"recycle_segment");
+RecyclerMetricsContext tablet_metrics_context_("global_recycler", 
"recycle_tablet");
+RecyclerMetricsContext segment_metrics_context_("global_recycler", 
"recycle_segment");
 
 // return 0 for success get a key, 1 for key not found, negative for error
 [[maybe_unused]] static int txn_get(TxnKv* txn_kv, std::string_view key, 
std::string& val) {
@@ -288,7 +288,7 @@ void Recycler::recycle_callback() {
         auto ctime_ms = 
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
         g_bvar_recycler_instance_recycle_task_concurrency << 1;
         g_bvar_recycler_instance_running_counter << 1;
-        g_bvar_recycler_instance_recycle_st_ts.put({instance_id}, ctime_ms);
+        g_bvar_recycler_instance_recycle_start_ts.put({instance_id}, ctime_ms);
         tablet_metrics_context_.reset();
         segment_metrics_context_.reset();
         ret = instance_recycler->do_recycle();
@@ -308,8 +308,8 @@ void Recycler::recycle_callback() {
 
         auto now = 
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
         auto elpased_ms = now - ctime_ms;
-        g_bvar_recycler_instance_recycle_ed_ts.put({instance_id}, now);
-        g_bvar_recycler_instance_last_recycle_duration.put({instance_id}, 
elpased_ms);
+        g_bvar_recycler_instance_recycle_end_ts.put({instance_id}, now);
+        
g_bvar_recycler_instance_last_round_recycle_duration.put({instance_id}, 
elpased_ms);
         g_bvar_recycler_instance_next_ts.put({instance_id},
                                              now + 
config::recycle_interval_seconds * 1000);
         LOG(INFO) << "recycle instance done, "
@@ -812,6 +812,178 @@ int InstanceRecycler::recycle_deleted_instance() {
     return ret;
 }
 
+bool is_txn_finished(std::shared_ptr<TxnKv> txn_kv, const std::string& 
instance_id,
+                     int64_t txn_id) {
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        LOG(WARNING) << "failed to create txn, txn_id=" << txn_id << " 
instance_id=" << instance_id;
+        return false;
+    }
+
+    std::string index_val;
+    const std::string index_key = txn_index_key({instance_id, txn_id});
+    err = txn->get(index_key, &index_val);
+    if (err != TxnErrorCode::TXN_OK) {
+        if (TxnErrorCode::TXN_KEY_NOT_FOUND == err) {
+            TEST_SYNC_POINT_CALLBACK("is_txn_finished::txn_has_been_recycled");
+            // txn has been recycled;
+            LOG(INFO) << "txn index key has been recycled, txn_id=" << txn_id
+                      << " instance_id=" << instance_id;
+            return true;
+        }
+        LOG(WARNING) << "failed to get txn index key, txn_id=" << txn_id
+                     << " instance_id=" << instance_id << " key=" << 
hex(index_key)
+                     << " err=" << err;
+        return false;
+    }
+
+    TxnIndexPB index_pb;
+    if (!index_pb.ParseFromString(index_val)) {
+        LOG(WARNING) << "failed to parse txn_index_pb, txn_id=" << txn_id
+                     << " instance_id=" << instance_id;
+        return false;
+    }
+
+    DCHECK(index_pb.has_tablet_index() == true);
+    if (!index_pb.tablet_index().has_db_id()) {
+        // In the previous version, the db_id was not set in the index_pb.
+        // If updating to the version which enable txn lazy commit, the db_id 
will be set.
+        LOG(INFO) << "txn index has no db_id, txn_id=" << txn_id << " 
instance_id=" << instance_id
+                  << " index=" << index_pb.ShortDebugString();
+        return true;
+    }
+
+    int64_t db_id = index_pb.tablet_index().db_id();
+    DCHECK_GT(db_id, 0) << "db_id=" << db_id << " txn_id=" << txn_id
+                        << " instance_id=" << instance_id;
+
+    std::string info_val;
+    const std::string info_key = txn_info_key({instance_id, db_id, txn_id});
+    err = txn->get(info_key, &info_val);
+    if (err != TxnErrorCode::TXN_OK) {
+        if (TxnErrorCode::TXN_KEY_NOT_FOUND == err) {
+            // txn info has been recycled;
+            LOG(INFO) << "txn info key has been recycled, db_id=" << db_id << 
" txn_id=" << txn_id
+                      << " instance_id=" << instance_id;
+            return true;
+        }
+
+        DCHECK(err != TxnErrorCode::TXN_KEY_NOT_FOUND);
+        LOG(WARNING) << "failed to get txn info key, txn_id=" << txn_id
+                     << " instance_id=" << instance_id << " key=" << 
hex(info_key)
+                     << " err=" << err;
+        return false;
+    }
+
+    TxnInfoPB txn_info;
+    if (!txn_info.ParseFromString(info_val)) {
+        LOG(WARNING) << "failed to parse txn_info, txn_id=" << txn_id
+                     << " instance_id=" << instance_id;
+        return false;
+    }
+
+    DCHECK(txn_info.txn_id() == txn_id) << "txn_id=" << txn_id << " 
instance_id=" << instance_id
+                                        << " txn_info=" << 
txn_info.ShortDebugString();
+
+    if (TxnStatusPB::TXN_STATUS_ABORTED == txn_info.status() ||
+        TxnStatusPB::TXN_STATUS_VISIBLE == txn_info.status()) {
+        TEST_SYNC_POINT_CALLBACK("is_txn_finished::txn_has_been_aborted", 
&txn_info);
+        return true;
+    }
+
+    TEST_SYNC_POINT_CALLBACK("is_txn_finished::txn_not_finished", &txn_info);
+    return false;
+}
+
+int64_t calculate_rowset_expired_time(const std::string& instance_id_, const 
RecycleRowsetPB& rs,
+                                      int64_t* earlest_ts /* rowset earliest 
expiration ts */) {
+    if (config::force_immediate_recycle) {
+        return 0L;
+    }
+    // RecycleRowsetPB created by compacted or dropped rowset has no 
expiration time, and will be recycled when exceed retention time
+    int64_t expiration = rs.expiration() > 0 ? rs.expiration() : 
rs.creation_time();
+    int64_t retention_seconds = config::retention_seconds;
+    if (rs.type() == RecycleRowsetPB::COMPACT || rs.type() == 
RecycleRowsetPB::DROP) {
+        retention_seconds = 
std::min(config::compacted_rowset_retention_seconds, retention_seconds);
+    }
+    int64_t final_expiration = expiration + retention_seconds;
+    if (*earlest_ts > final_expiration) {
+        *earlest_ts = final_expiration;
+        g_bvar_recycler_recycle_rowset_earlest_ts.put(instance_id_, 
*earlest_ts);
+    }
+    return final_expiration;
+}
+
+int64_t calculate_partition_expired_time(
+        const std::string& instance_id_, const RecyclePartitionPB& 
partition_meta_pb,
+        int64_t* earlest_ts /* partition earliest expiration ts */) {
+    if (config::force_immediate_recycle) {
+        return 0L;
+    }
+    int64_t expiration = partition_meta_pb.expiration() > 0 ? 
partition_meta_pb.expiration()
+                                                            : 
partition_meta_pb.creation_time();
+    int64_t retention_seconds = config::retention_seconds;
+    if (partition_meta_pb.state() == RecyclePartitionPB::DROPPED) {
+        retention_seconds =
+                std::min(config::dropped_partition_retention_seconds, 
retention_seconds);
+    }
+    int64_t final_expiration = expiration + retention_seconds;
+    if (*earlest_ts > final_expiration) {
+        *earlest_ts = final_expiration;
+        g_bvar_recycler_recycle_partition_earlest_ts.put(instance_id_, 
*earlest_ts);
+    }
+    return final_expiration;
+}
+
+int64_t calculate_index_expired_time(const std::string& instance_id_,
+                                     const RecycleIndexPB& index_meta_pb,
+                                     int64_t* earlest_ts /* index earliest 
expiration ts */) {
+    if (config::force_immediate_recycle) {
+        return 0L;
+    }
+    int64_t expiration = index_meta_pb.expiration() > 0 ? 
index_meta_pb.expiration()
+                                                        : 
index_meta_pb.creation_time();
+    int64_t retention_seconds = config::retention_seconds;
+    if (index_meta_pb.state() == RecycleIndexPB::DROPPED) {
+        retention_seconds = std::min(config::dropped_index_retention_seconds, 
retention_seconds);
+    }
+    int64_t final_expiration = expiration + retention_seconds;
+    if (*earlest_ts > final_expiration) {
+        *earlest_ts = final_expiration;
+        g_bvar_recycler_recycle_index_earlest_ts.put(instance_id_, 
*earlest_ts);
+    }
+    return final_expiration;
+}
+
+int64_t calculate_tmp_rowset_expired_time(
+        const std::string& instance_id_, const doris::RowsetMetaCloudPB& 
tmp_rowset_meta_pb,
+        int64_t* earlest_ts /* tmp_rowset earliest expiration ts */) {
+    // ATTN: `txn_expiration` should > 0, however we use `creation_time` + a 
large `retention_time` (> 1 day in production environment)
+    //  when `txn_expiration` <= 0 in some unexpected situation (usually when 
there are bugs). This is usually safe, coz loading
+    //  duration or timeout always < `retention_time` in practice.
+    int64_t expiration = tmp_rowset_meta_pb.txn_expiration() > 0
+                                 ? tmp_rowset_meta_pb.txn_expiration()
+                                 : tmp_rowset_meta_pb.creation_time();
+    expiration = config::force_immediate_recycle ? 0 : expiration;
+    int64_t final_expiration = expiration + config::retention_seconds;
+    if (*earlest_ts > final_expiration) {
+        *earlest_ts = final_expiration;
+        g_bvar_recycler_recycle_tmp_rowset_earlest_ts.put(instance_id_, 
*earlest_ts);
+    }
+    return final_expiration;
+}
+
+int64_t calculate_txn_expired_time(const std::string& instance_id_, const 
RecycleTxnPB& txn_meta_pb,
+                                   int64_t* earlest_ts /* txn earliest 
expiration ts */) {
+    int64_t final_expiration = txn_meta_pb.creation_time() + 
config::label_keep_max_second * 1000L;
+    if (*earlest_ts > final_expiration / 1000) {
+        *earlest_ts = final_expiration / 1000;
+        g_bvar_recycler_recycle_expired_txn_label_earlest_ts.put(instance_id_, 
*earlest_ts);
+    }
+    return final_expiration;
+}
+
 int InstanceRecycler::recycle_indexes() {
     const std::string task_name = "recycle_indexes";
     int64_t num_scanned = 0;
@@ -845,24 +1017,6 @@ int InstanceRecycler::recycle_indexes() {
 
     int64_t earlest_ts = std::numeric_limits<int64_t>::max();
 
-    auto calc_expiration = [&earlest_ts, this](const RecycleIndexPB& index) {
-        if (config::force_immediate_recycle) {
-            return 0L;
-        }
-        int64_t expiration = index.expiration() > 0 ? index.expiration() : 
index.creation_time();
-        int64_t retention_seconds = config::retention_seconds;
-        if (index.state() == RecycleIndexPB::DROPPED) {
-            retention_seconds =
-                    std::min(config::dropped_index_retention_seconds, 
retention_seconds);
-        }
-        int64_t final_expiration = expiration + retention_seconds;
-        if (earlest_ts > final_expiration) {
-            earlest_ts = final_expiration;
-            g_bvar_recycler_recycle_index_earlest_ts.put(instance_id_, 
earlest_ts);
-        }
-        return final_expiration;
-    };
-
     // Elements in `index_keys` has the same lifetime as `it` in 
`scan_and_recycle`
     std::vector<std::string_view> index_keys;
     auto recycle_func = [&, this](std::string_view k, std::string_view v) -> 
int {
@@ -873,7 +1027,8 @@ int InstanceRecycler::recycle_indexes() {
             return -1;
         }
         int64_t current_time = ::time(nullptr);
-        if (current_time < calc_expiration(index_pb)) { // not expired
+        if (current_time <
+            calculate_index_expired_time(instance_id_, index_pb, &earlest_ts)) 
{ // not expired
             return 0;
         }
         ++num_expired;
@@ -933,47 +1088,6 @@ int InstanceRecycler::recycle_indexes() {
         return 0;
     };
 
-    // for calculate the total num or bytes of recyled objects
-    auto scan_and_statistics = [&](std::string_view k, std::string_view v) -> 
int {
-        RecycleIndexPB index_pb;
-        if (!index_pb.ParseFromArray(v.data(), v.size())) {
-            return 0;
-        }
-        int64_t current_time = ::time(nullptr);
-        if (current_time < calc_expiration(index_pb)) {
-            return 0;
-        }
-        // decode index_id
-        auto k1 = k;
-        k1.remove_prefix(1);
-        std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> 
out;
-        decode_key(&k1, &out);
-        // 0x01 "recycle" ${instance_id} "index" ${index_id} -> RecycleIndexPB
-        auto index_id = std::get<int64_t>(std::get<0>(out[3]));
-        std::unique_ptr<Transaction> txn;
-        TxnErrorCode err = txn_kv_->create_txn(&txn);
-        if (err != TxnErrorCode::TXN_OK) {
-            return 0;
-        }
-        std::string val;
-        err = txn->get(k, &val);
-        if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
-            return 0;
-        }
-        if (err != TxnErrorCode::TXN_OK) {
-            return 0;
-        }
-        index_pb.Clear();
-        if (!index_pb.ParseFromString(val)) {
-            return 0;
-        }
-        if (scan_tablets_and_statistics(index_pb.table_id(), index_id, 
metrics_context) != 0) {
-            return 0;
-        }
-        metrics_context.total_need_recycle_num++;
-        return 0;
-    };
-
     auto loop_done = [&index_keys, this]() -> int {
         if (index_keys.empty()) return 0;
         DORIS_CLOUD_DEFER {
@@ -986,9 +1100,11 @@ int InstanceRecycler::recycle_indexes() {
         return 0;
     };
 
-    return scan_for_recycle_and_statistics(index_key0, index_key1, "indexes", 
metrics_context,
-                                           std::move(scan_and_statistics), 
std::move(recycle_func),
-                                           std::move(loop_done));
+    if (config::enable_recycler_stats_metrics) {
+        scan_and_statistics_indexes();
+    }
+    // recycle_func and loop_done for scan and recycle
+    return scan_and_recycle(index_key0, index_key1, std::move(recycle_func), 
std::move(loop_done));
 }
 
 bool check_lazy_txn_finished(std::shared_ptr<TxnKv> txn_kv, const std::string 
instance_id,
@@ -1109,25 +1225,6 @@ int InstanceRecycler::recycle_partitions() {
 
     int64_t earlest_ts = std::numeric_limits<int64_t>::max();
 
-    auto calc_expiration = [&earlest_ts, this](const RecyclePartitionPB& 
partition) {
-        if (config::force_immediate_recycle) {
-            return 0L;
-        }
-        int64_t expiration =
-                partition.expiration() > 0 ? partition.expiration() : 
partition.creation_time();
-        int64_t retention_seconds = config::retention_seconds;
-        if (partition.state() == RecyclePartitionPB::DROPPED) {
-            retention_seconds =
-                    std::min(config::dropped_partition_retention_seconds, 
retention_seconds);
-        }
-        int64_t final_expiration = expiration + retention_seconds;
-        if (earlest_ts > final_expiration) {
-            earlest_ts = final_expiration;
-            g_bvar_recycler_recycle_partition_earlest_ts.put(instance_id_, 
earlest_ts);
-        }
-        return final_expiration;
-    };
-
     // Elements in `partition_keys` has the same lifetime as `it` in 
`scan_and_recycle`
     std::vector<std::string_view> partition_keys;
     std::vector<std::string> partition_version_keys;
@@ -1139,7 +1236,8 @@ int InstanceRecycler::recycle_partitions() {
             return -1;
         }
         int64_t current_time = ::time(nullptr);
-        if (current_time < calc_expiration(part_pb)) { // not expired
+        if (current_time <
+            calculate_partition_expired_time(instance_id_, part_pb, 
&earlest_ts)) { // not expired
             return 0;
         }
         ++num_expired;
@@ -1213,54 +1311,6 @@ int InstanceRecycler::recycle_partitions() {
         return ret;
     };
 
-    // for calculate the total num or bytes of recyled objects
-    auto scan_and_statistics = [&, this](std::string_view k, std::string_view 
v) -> int {
-        RecyclePartitionPB part_pb;
-        if (!part_pb.ParseFromArray(v.data(), v.size())) {
-            return 0;
-        }
-        int64_t current_time = ::time(nullptr);
-        if (current_time < calc_expiration(part_pb)) {
-            return 0;
-        }
-        // decode partition_id
-        auto k1 = k;
-        k1.remove_prefix(1);
-        std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> 
out;
-        decode_key(&k1, &out);
-        // 0x01 "recycle" ${instance_id} "partition" ${partition_id} -> 
RecyclePartitionPB
-        auto partition_id = std::get<int64_t>(std::get<0>(out[3]));
-        // Change state to RECYCLING
-        std::unique_ptr<Transaction> txn;
-        TxnErrorCode err = txn_kv_->create_txn(&txn);
-        if (err != TxnErrorCode::TXN_OK) {
-            return 0;
-        }
-        std::string val;
-        err = txn->get(k, &val);
-        if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
-            return 0;
-        }
-        if (err != TxnErrorCode::TXN_OK) {
-            return 0;
-        }
-        part_pb.Clear();
-        if (!part_pb.ParseFromString(val)) {
-            return 0;
-        }
-        // Partitions with PREPARED state MUST have no data
-        bool is_empty_tablet = part_pb.state() == RecyclePartitionPB::PREPARED;
-        int ret = 0;
-        for (int64_t index_id : part_pb.index_id()) {
-            if (scan_tablets_and_statistics(part_pb.table_id(), index_id, 
metrics_context,
-                                            partition_id, is_empty_tablet) != 
0) {
-                ret = 0;
-            }
-        }
-        metrics_context.total_need_recycle_num++;
-        return ret;
-    };
-
     auto loop_done = [&partition_keys, &partition_version_keys, this]() -> int 
{
         if (partition_keys.empty()) return 0;
         DORIS_CLOUD_DEFER {
@@ -1288,9 +1338,11 @@ int InstanceRecycler::recycle_partitions() {
         return 0;
     };
 
-    return scan_for_recycle_and_statistics(part_key0, part_key1, "partitions", 
metrics_context,
-                                           std::move(scan_and_statistics), 
std::move(recycle_func),
-                                           std::move(loop_done));
+    if (config::enable_recycler_stats_metrics) {
+        scan_and_statistics_partitions();
+    }
+    // recycle_func and loop_done for scan and recycle
+    return scan_and_recycle(part_key0, part_key1, std::move(recycle_func), 
std::move(loop_done));
 }
 
 int InstanceRecycler::recycle_versions() {
@@ -1370,48 +1422,11 @@ int InstanceRecycler::recycle_versions() {
         return 0;
     };
 
-    int64_t last_scanned_table_id_t = 0;
-    bool is_recycled_t = false; // Is last scanned kv recycled
-    // for calculate the total num or bytes of recyled objects
-    auto scan_and_statistics = [&metrics_context, &last_scanned_table_id_t, 
&is_recycled_t, this](
-                                       std::string_view k, std::string_view) {
-        auto k1 = k;
-        k1.remove_prefix(1);
-        // 0x01 "version" ${instance_id} "partition" ${db_id} ${tbl_id} 
${partition_id}
-        std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> 
out;
-        decode_key(&k1, &out);
-        DCHECK_EQ(out.size(), 6) << k;
-        auto table_id = std::get<int64_t>(std::get<0>(out[4]));
-        if (table_id == last_scanned_table_id_t) { // Already handle kvs of 
this table
-            metrics_context.total_need_recycle_num +=
-                    is_recycled_t; // Version kv of this table has been 
recycled
-            return 0;
-        }
-        last_scanned_table_id_t = table_id;
-        is_recycled_t = false;
-        auto tablet_key_begin = stats_tablet_key({instance_id_, table_id, 0, 
0, 0});
-        auto tablet_key_end = stats_tablet_key({instance_id_, table_id, 
INT64_MAX, 0, 0});
-        std::unique_ptr<Transaction> txn;
-        TxnErrorCode err = txn_kv_->create_txn(&txn);
-        if (err != TxnErrorCode::TXN_OK) {
-            return 0;
-        }
-        std::unique_ptr<RangeGetIterator> iter;
-        err = txn->get(tablet_key_begin, tablet_key_end, &iter, false, 1);
-        if (err != TxnErrorCode::TXN_OK) {
-            return 0;
-        }
-        if (iter->has_next()) { // Table is useful, should not recycle table 
and partition versions
-            return 0;
-        }
-        metrics_context.total_need_recycle_num++;
-        is_recycled_t = true;
-        return 0;
-    };
-
-    return scan_for_recycle_and_statistics(version_key_begin, version_key_end, 
"versions",
-                                           metrics_context, 
std::move(scan_and_statistics),
-                                           std::move(recycle_func));
+    if (config::enable_recycler_stats_metrics) {
+        scan_and_statistics_versions();
+    }
+    // recycle_func and loop_done for scan and recycle
+    return scan_and_recycle(version_key_begin, version_key_end, 
std::move(recycle_func));
 }
 
 int InstanceRecycler::recycle_tablets(int64_t table_id, int64_t index_id,
@@ -1935,7 +1950,12 @@ int 
InstanceRecycler::scan_tablets_and_statistics(int64_t table_id, int64_t inde
         }
         return 0;
     };
-    return scan_and_recycle(tablet_key_begin, tablet_key_end, 
std::move(scan_and_statistics));
+    return scan_and_recycle(tablet_key_begin, tablet_key_end, 
std::move(scan_and_statistics),
+                            [&metrics_context]() -> int {
+                                metrics_context.report();
+                                tablet_metrics_context_.report();
+                                return 0;
+                            });
 }
 
 int InstanceRecycler::scan_tablet_and_statistics(int64_t tablet_id,
@@ -2033,7 +2053,7 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id, 
RecyclerMetricsContext&
     std::string recyc_rs_key0 = recycle_rowset_key({instance_id_, tablet_id, 
""});
     std::string recyc_rs_key1 = recycle_rowset_key({instance_id_, tablet_id + 
1, ""});
 
-    std::vector<std::string> rowset_meta;
+    std::set<std::string> resource_ids;
     int64_t recycle_rowsets_number = 0;
     int64_t recycle_segments_number = 0;
     int64_t recycle_rowsets_data_size = 0;
@@ -2139,17 +2159,16 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id, 
RecyclerMetricsContext&
         max_rowset_creation_time = std::max(max_rowset_creation_time, 
rs_meta.creation_time());
         min_rowset_expiration_time = std::min(min_rowset_expiration_time, 
rs_meta.txn_expiration());
         max_rowset_expiration_time = std::max(max_rowset_expiration_time, 
rs_meta.txn_expiration());
-        rowset_meta.emplace_back(rs_meta.resource_id());
-        LOG(INFO) << "rs_meta.resource_id()=" << rs_meta.resource_id();
+        resource_ids.emplace(rs_meta.resource_id());
     }
 
     LOG_INFO("recycle tablet start to delete object")
             .tag("instance id", instance_id_)
             .tag("tablet id", tablet_id)
             .tag("recycle tablet resource ids are",
-                 std::accumulate(rowset_meta.begin(), rowset_meta.begin(), 
std::string(),
-                                 [](std::string acc, const auto& it) {
-                                     return acc.empty() ? it : acc + ", " + it;
+                 std::accumulate(resource_ids.begin(), resource_ids.begin(), 
std::string(),
+                                 [](std::string rs_id, const auto& it) {
+                                     return rs_id.empty() ? it : rs_id + ", " 
+ it;
                                  }));
 
     SyncExecutor<int> concurrent_delete_executor(
@@ -2161,7 +2180,7 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id, 
RecyclerMetricsContext&
     // ATTN: there may be data leak if not all accessor initilized successfully
     //       partial data deleted if the tablet is stored cross-storage vault
     //       vault id is not attached to TabletMeta...
-    for (const auto& resource_id : rowset_meta) {
+    for (const auto& resource_id : resource_ids) {
         concurrent_delete_executor.add([&, rs_id = resource_id,
                                         accessor_ptr = 
accessor_map_[resource_id]]() {
             std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01, 
[&](int*) {
@@ -2208,8 +2227,11 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id, 
RecyclerMetricsContext&
     segment_metrics_context_.total_recycled_num += recycle_segments_number;
     segment_metrics_context_.total_recycled_data_size +=
             recycle_rowsets_data_size + recycle_rowsets_index_size;
+    metrics_context.total_recycled_data_size +=
+            recycle_rowsets_data_size + recycle_rowsets_index_size;
     tablet_metrics_context_.report();
     segment_metrics_context_.report();
+    metrics_context.report();
 
     txn.reset();
     if (txn_kv_->create_txn(&txn) != TxnErrorCode::TXN_OK) {
@@ -2339,77 +2361,23 @@ int InstanceRecycler::recycle_rowsets() {
 
     int64_t earlest_ts = std::numeric_limits<int64_t>::max();
 
-    auto calc_expiration = [&earlest_ts, this](const RecycleRowsetPB& rs) {
-        if (config::force_immediate_recycle) {
-            return 0L;
-        }
-        // RecycleRowsetPB created by compacted or dropped rowset has no 
expiration time, and will be recycled when exceed retention time
-        int64_t expiration = rs.expiration() > 0 ? rs.expiration() : 
rs.creation_time();
-        int64_t retention_seconds = config::retention_seconds;
-        if (rs.type() == RecycleRowsetPB::COMPACT || rs.type() == 
RecycleRowsetPB::DROP) {
-            retention_seconds =
-                    std::min(config::compacted_rowset_retention_seconds, 
retention_seconds);
-        }
-        int64_t final_expiration = expiration + retention_seconds;
-        if (earlest_ts > final_expiration) {
-            earlest_ts = final_expiration;
-            g_bvar_recycler_recycle_rowset_earlest_ts.put(instance_id_, 
earlest_ts);
-        }
-        return final_expiration;
-    };
-
-    // for calculate the total num or bytes of recyled objects
-    auto scan_and_statistics = [&](std::string_view k, std::string_view v) -> 
int {
+    auto handle_rowset_kv = [&, this](std::string_view k, std::string_view v) 
-> int {
+        ++num_scanned;
+        total_rowset_key_size += k.size();
+        total_rowset_value_size += v.size();
         RecycleRowsetPB rowset;
         if (!rowset.ParseFromArray(v.data(), v.size())) {
-            return 0;
+            LOG_WARNING("malformed recycle rowset").tag("key", hex(k));
+            return -1;
         }
+
+        int final_expiration = calculate_rowset_expired_time(instance_id_, 
rowset, &earlest_ts);
+
+        VLOG_DEBUG << "recycle rowset scan, key=" << hex(k) << " num_scanned=" 
<< num_scanned
+                   << " num_expired=" << num_expired << " expiration=" << 
final_expiration
+                   << " RecycleRowsetPB=" << rowset.ShortDebugString();
         int64_t current_time = ::time(nullptr);
-        if (current_time < calc_expiration(rowset)) { // not expired
-            return 0;
-        }
-        if (!rowset.has_type()) {
-            if (!rowset.has_resource_id()) [[unlikely]] {
-                return 0;
-            }
-            if (rowset.resource_id().empty()) [[unlikely]] {
-                return 0;
-            }
-            return 0;
-        }
-        auto* rowset_meta = rowset.mutable_rowset_meta();
-        if (!rowset_meta->has_resource_id()) [[unlikely]] {
-            if (rowset.type() == RecycleRowsetPB::PREPARE || 
rowset_meta->num_segments() != 0) {
-                return 0;
-            }
-        }
-        if (rowset.type() != RecycleRowsetPB::PREPARE) {
-            if (rowset_meta->num_segments() > 0) {
-                metrics_context.total_need_recycle_num++;
-                segment_metrics_context_.total_need_recycle_num += 
rowset_meta->num_segments();
-                segment_metrics_context_.total_need_recycle_data_size +=
-                        rowset_meta->total_disk_size();
-                metrics_context.total_need_recycle_data_size += 
rowset_meta->total_disk_size();
-            }
-        }
-        return 0;
-    };
-
-    auto handle_rowset_kv = [&](std::string_view k, std::string_view v) -> int 
{
-        ++num_scanned;
-        total_rowset_key_size += k.size();
-        total_rowset_value_size += v.size();
-        RecycleRowsetPB rowset;
-        if (!rowset.ParseFromArray(v.data(), v.size())) {
-            LOG_WARNING("malformed recycle rowset").tag("key", hex(k));
-            return -1;
-        }
-
-        VLOG_DEBUG << "recycle rowset scan, key=" << hex(k) << " num_scanned=" 
<< num_scanned
-                   << " num_expired=" << num_expired << " expiration=" << 
calc_expiration(rowset)
-                   << " RecycleRowsetPB=" << rowset.ShortDebugString();
-        int64_t current_time = ::time(nullptr);
-        if (current_time < calc_expiration(rowset)) { // not expired
+        if (current_time < final_expiration) { // not expired
             return 0;
         }
         ++num_expired;
@@ -2504,9 +2472,13 @@ int InstanceRecycler::recycle_rowsets() {
         return 0;
     };
 
-    int ret = scan_for_recycle_and_statistics(recyc_rs_key0, recyc_rs_key1, 
"rowsets",
-                                              metrics_context, 
std::move(scan_and_statistics),
-                                              std::move(handle_rowset_kv), 
std::move(loop_done));
+    if (config::enable_recycler_stats_metrics) {
+        scan_and_statistics_rowsets();
+    }
+    // recycle_func and loop_done for scan and recycle
+    int ret = scan_and_recycle(recyc_rs_key0, recyc_rs_key1, 
std::move(handle_rowset_kv),
+                               std::move(loop_done));
+
     worker_pool->stop();
 
     if (!async_recycled_rowset_keys.empty()) {
@@ -2520,90 +2492,6 @@ int InstanceRecycler::recycle_rowsets() {
     return ret;
 }
 
-bool is_txn_finished(std::shared_ptr<TxnKv> txn_kv, const std::string& 
instance_id,
-                     int64_t txn_id) {
-    std::unique_ptr<Transaction> txn;
-    TxnErrorCode err = txn_kv->create_txn(&txn);
-    if (err != TxnErrorCode::TXN_OK) {
-        LOG(WARNING) << "failed to create txn, txn_id=" << txn_id << " 
instance_id=" << instance_id;
-        return false;
-    }
-
-    std::string index_val;
-    const std::string index_key = txn_index_key({instance_id, txn_id});
-    err = txn->get(index_key, &index_val);
-    if (err != TxnErrorCode::TXN_OK) {
-        if (TxnErrorCode::TXN_KEY_NOT_FOUND == err) {
-            TEST_SYNC_POINT_CALLBACK("is_txn_finished::txn_has_been_recycled");
-            // txn has been recycled;
-            LOG(INFO) << "txn index key has been recycled, txn_id=" << txn_id
-                      << " instance_id=" << instance_id;
-            return true;
-        }
-        LOG(WARNING) << "failed to get txn index key, txn_id=" << txn_id
-                     << " instance_id=" << instance_id << " key=" << 
hex(index_key)
-                     << " err=" << err;
-        return false;
-    }
-
-    TxnIndexPB index_pb;
-    if (!index_pb.ParseFromString(index_val)) {
-        LOG(WARNING) << "failed to parse txn_index_pb, txn_id=" << txn_id
-                     << " instance_id=" << instance_id;
-        return false;
-    }
-
-    DCHECK(index_pb.has_tablet_index() == true);
-    if (!index_pb.tablet_index().has_db_id()) {
-        // In the previous version, the db_id was not set in the index_pb.
-        // If updating to the version which enable txn lazy commit, the db_id 
will be set.
-        LOG(INFO) << "txn index has no db_id, txn_id=" << txn_id << " 
instance_id=" << instance_id
-                  << " index=" << index_pb.ShortDebugString();
-        return true;
-    }
-
-    int64_t db_id = index_pb.tablet_index().db_id();
-    DCHECK_GT(db_id, 0) << "db_id=" << db_id << " txn_id=" << txn_id
-                        << " instance_id=" << instance_id;
-
-    std::string info_val;
-    const std::string info_key = txn_info_key({instance_id, db_id, txn_id});
-    err = txn->get(info_key, &info_val);
-    if (err != TxnErrorCode::TXN_OK) {
-        if (TxnErrorCode::TXN_KEY_NOT_FOUND == err) {
-            // txn info has been recycled;
-            LOG(INFO) << "txn info key has been recycled, db_id=" << db_id << 
" txn_id=" << txn_id
-                      << " instance_id=" << instance_id;
-            return true;
-        }
-
-        DCHECK(err != TxnErrorCode::TXN_KEY_NOT_FOUND);
-        LOG(WARNING) << "failed to get txn info key, txn_id=" << txn_id
-                     << " instance_id=" << instance_id << " key=" << 
hex(info_key)
-                     << " err=" << err;
-        return false;
-    }
-
-    TxnInfoPB txn_info;
-    if (!txn_info.ParseFromString(info_val)) {
-        LOG(WARNING) << "failed to parse txn_info, txn_id=" << txn_id
-                     << " instance_id=" << instance_id;
-        return false;
-    }
-
-    DCHECK(txn_info.txn_id() == txn_id) << "txn_id=" << txn_id << " 
instance_id=" << instance_id
-                                        << " txn_info=" << 
txn_info.ShortDebugString();
-
-    if (TxnStatusPB::TXN_STATUS_ABORTED == txn_info.status() ||
-        TxnStatusPB::TXN_STATUS_VISIBLE == txn_info.status()) {
-        TEST_SYNC_POINT_CALLBACK("is_txn_finished::txn_has_been_aborted", 
&txn_info);
-        return true;
-    }
-
-    TEST_SYNC_POINT_CALLBACK("is_txn_finished::txn_not_finished", &txn_info);
-    return false;
-}
-
 int InstanceRecycler::recycle_tmp_rowsets() {
     const std::string task_name = "recycle_tmp_rowsets";
     int64_t num_scanned = 0;
@@ -2648,25 +2536,10 @@ int InstanceRecycler::recycle_tmp_rowsets() {
     std::map<std::string, doris::RowsetMetaCloudPB> tmp_rowsets;
 
     int64_t earlest_ts = std::numeric_limits<int64_t>::max();
-    auto calc_expiration = [&earlest_ts, this](const doris::RowsetMetaCloudPB& 
rowset) {
-        // ATTN: `txn_expiration` should > 0, however we use `creation_time` + 
a large `retention_time` (> 1 day in production environment)
-        //  when `txn_expiration` <= 0 in some unexpected situation (usually 
when there are bugs). This is usually safe, coz loading
-        //  duration or timeout always < `retention_time` in practice.
-        int64_t expiration =
-                rowset.txn_expiration() > 0 ? rowset.txn_expiration() : 
rowset.creation_time();
-        expiration = config::force_immediate_recycle ? 0 : expiration;
-        int64_t final_expiration = expiration + config::retention_seconds;
-        if (earlest_ts > final_expiration) {
-            earlest_ts = final_expiration;
-            g_bvar_recycler_recycle_tmp_rowset_earlest_ts.put(instance_id_, 
earlest_ts);
-        }
-        return final_expiration;
-    };
 
     auto handle_rowset_kv = [&num_scanned, &num_expired, &tmp_rowset_keys, 
&tmp_rowsets,
                              &expired_rowset_size, &total_rowset_key_size, 
&total_rowset_value_size,
-                             &calc_expiration,
-                             this](std::string_view k, std::string_view v) -> 
int {
+                             &earlest_ts, this](std::string_view k, 
std::string_view v) -> int {
         ++num_scanned;
         total_rowset_key_size += k.size();
         total_rowset_value_size += v.size();
@@ -2675,7 +2548,7 @@ int InstanceRecycler::recycle_tmp_rowsets() {
             LOG_WARNING("malformed rowset meta").tag("key", hex(k));
             return -1;
         }
-        int64_t expiration = calc_expiration(rowset);
+        int64_t expiration = calculate_tmp_rowset_expired_time(instance_id_, 
rowset, &earlest_ts);
         VLOG_DEBUG << "recycle tmp rowset scan, key=" << hex(k) << " 
num_scanned=" << num_scanned
                    << " num_expired=" << num_expired << " expiration=" << 
expiration
                    << " txn_expiration=" << rowset.txn_expiration()
@@ -2724,41 +2597,6 @@ int InstanceRecycler::recycle_tmp_rowsets() {
         return 0;
     };
 
-    // for calculate the total num or bytes of recyled objects
-    auto scan_and_statistics = [&](std::string_view k, std::string_view v) -> 
int {
-        doris::RowsetMetaCloudPB rowset;
-        if (!rowset.ParseFromArray(v.data(), v.size())) {
-            return 0;
-        }
-        int64_t expiration = calc_expiration(rowset);
-        int64_t current_time = ::time(nullptr);
-        if (current_time < expiration) {
-            return 0;
-        }
-
-        DCHECK_GT(rowset.txn_id(), 0)
-                << "txn_id=" << rowset.txn_id() << " rowset=" << 
rowset.ShortDebugString();
-        if (!is_txn_finished(txn_kv_, instance_id_, rowset.txn_id())) {
-            return 0;
-        }
-
-        if (!rowset.has_resource_id()) {
-            if (rowset.num_segments() > 0) [[unlikely]] { // impossible
-                return 0;
-            }
-            metrics_context.total_need_recycle_num++;
-            return 0;
-        }
-
-        metrics_context.total_need_recycle_num++;
-        if (rowset.num_segments() > 0) {
-            metrics_context.total_need_recycle_data_size += 
rowset.total_disk_size();
-            segment_metrics_context_.total_need_recycle_data_size += 
rowset.total_disk_size();
-            segment_metrics_context_.total_need_recycle_num += 
rowset.num_segments();
-        }
-        return 0;
-    };
-
     auto loop_done = [&tmp_rowset_keys, &tmp_rowsets, &num_recycled, 
&metrics_context,
                       this]() -> int {
         DORIS_CLOUD_DEFER {
@@ -2778,30 +2616,12 @@ int InstanceRecycler::recycle_tmp_rowsets() {
         return 0;
     };
 
-    return scan_for_recycle_and_statistics(tmp_rs_key0, tmp_rs_key1, 
"tmp_rowsets", metrics_context,
-                                           std::move(scan_and_statistics),
-                                           std::move(handle_rowset_kv), 
std::move(loop_done));
-}
-
-int InstanceRecycler::scan_for_recycle_and_statistics(
-        std::string begin, std::string_view end, std::string task_name,
-        RecyclerMetricsContext& metrics_context,
-        std::function<int(std::string_view k, std::string_view v)> 
statistics_func,
-        std::function<int(std::string_view k, std::string_view v)> 
recycle_func,
-        std::function<int()> loop_done) {
-    if (config::enable_recycler_metrics) {
-        scan_and_recycle(begin, end, std::move(statistics_func));
-
-        // report to bvar
-        metrics_context.report(true);
-        tablet_metrics_context_.report(true);
-        segment_metrics_context_.report(true);
-
-        int ret = scan_and_recycle(begin, end, std::move(recycle_func), 
std::move(loop_done));
-        return ret;
-    } else {
-        return scan_and_recycle(begin, end, std::move(recycle_func), 
std::move(loop_done));
+    if (config::enable_recycler_stats_metrics) {
+        scan_and_statistics_tmp_rowsets();
     }
+    // recycle_func and loop_done for scan and recycle
+    return scan_and_recycle(tmp_rs_key0, tmp_rs_key1, 
std::move(handle_rowset_kv),
+                            std::move(loop_done));
 }
 
 int InstanceRecycler::scan_and_recycle(
@@ -3002,50 +2822,12 @@ int InstanceRecycler::abort_timeout_txn() {
         return 0;
     };
 
-    // for calculate the total num or bytes of recyled objects
-    auto scan_and_statistics = [&metrics_context, &current_time, 
this](std::string_view k,
-                                                                       
std::string_view v) -> int {
-        std::unique_ptr<Transaction> txn;
-        TxnErrorCode err = txn_kv_->create_txn(&txn);
-        if (err != TxnErrorCode::TXN_OK) {
-            return 0;
-        }
-        std::string_view k1 = k;
-        k1.remove_prefix(1);
-        std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> 
out;
-        if (decode_key(&k1, &out) != 0) {
-            return 0;
-        }
-        int64_t db_id = std::get<int64_t>(std::get<0>(out[3]));
-        int64_t txn_id = std::get<int64_t>(std::get<0>(out[4]));
-        // Update txn_info
-        std::string txn_inf_key, txn_inf_val;
-        txn_info_key({instance_id_, db_id, txn_id}, &txn_inf_key);
-        err = txn->get(txn_inf_key, &txn_inf_val);
-        if (err != TxnErrorCode::TXN_OK) {
-            return 0;
-        }
-        TxnInfoPB txn_info;
-        if (!txn_info.ParseFromString(txn_inf_val)) {
-            return 0;
-        }
-
-        if (TxnStatusPB::TXN_STATUS_COMMITTED != txn_info.status()) {
-            TxnRunningPB txn_running_pb;
-            if (!txn_running_pb.ParseFromArray(v.data(), v.size())) {
-                return 0;
-            }
-            if (!config::force_immediate_recycle && 
txn_running_pb.timeout_time() > current_time) {
-                return 0;
-            }
-            metrics_context.total_need_recycle_num++;
-        }
-        return 0;
-    };
-
-    return scan_for_recycle_and_statistics(
-            begin_txn_running_key, end_txn_running_key, "abort_timeout_txns", 
metrics_context,
-            std::move(scan_and_statistics), std::move(handle_txn_running_kv));
+    if (config::enable_recycler_stats_metrics) {
+        scan_and_statistics_abort_timeout_txn();
+    }
+    // recycle_func and loop_done for scan and recycle
+    return scan_and_recycle(begin_txn_running_key, end_txn_running_key,
+                            std::move(handle_txn_running_kv));
 }
 
 int InstanceRecycler::recycle_expired_txn_label() {
@@ -3081,15 +2863,6 @@ int InstanceRecycler::recycle_expired_txn_label() {
     };
 
     int64_t earlest_ts = std::numeric_limits<int64_t>::max();
-    auto calc_expiration = [&earlest_ts, this](const RecycleTxnPB& 
recycle_txn_pb) {
-        int64_t final_expiration =
-                recycle_txn_pb.creation_time() + config::label_keep_max_second 
* 1000L;
-        if (earlest_ts > final_expiration / 1000) {
-            earlest_ts = final_expiration / 1000;
-            
g_bvar_recycler_recycle_expired_txn_label_earlest_ts.put(instance_id_, 
earlest_ts);
-        }
-        return final_expiration;
-    };
 
     SyncExecutor<int> concurrent_delete_executor(
             _thread_pool_group.s3_producer_pool,
@@ -3099,7 +2872,7 @@ int InstanceRecycler::recycle_expired_txn_label() {
     int64_t current_time_ms =
             
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
 
-    auto handle_recycle_txn_kv = [&](std::string_view k, std::string_view v) 
-> int {
+    auto handle_recycle_txn_kv = [&, this](std::string_view k, 
std::string_view v) -> int {
         ++num_scanned;
         RecycleTxnPB recycle_txn_pb;
         if (!recycle_txn_pb.ParseFromArray(v.data(), v.size())) {
@@ -3108,7 +2881,8 @@ int InstanceRecycler::recycle_expired_txn_label() {
         }
         if ((config::force_immediate_recycle) ||
             (recycle_txn_pb.has_immediate() && recycle_txn_pb.immediate()) ||
-            (calc_expiration(recycle_txn_pb) <= current_time_ms)) {
+            (calculate_txn_expired_time(instance_id_, recycle_txn_pb, 
&earlest_ts) <=
+             current_time_ms)) {
             VLOG_DEBUG << "found recycle txn, key=" << hex(k);
             num_expired++;
             recycle_txn_info_keys.emplace_back(k);
@@ -3116,20 +2890,6 @@ int InstanceRecycler::recycle_expired_txn_label() {
         return 0;
     };
 
-    // for calculate the total num or bytes of recyled objects
-    auto scan_and_statistics = [&](std::string_view k, std::string_view v) -> 
int {
-        RecycleTxnPB recycle_txn_pb;
-        if (!recycle_txn_pb.ParseFromArray(v.data(), v.size())) {
-            return 0;
-        }
-        if ((config::force_immediate_recycle) ||
-            (recycle_txn_pb.has_immediate() && recycle_txn_pb.immediate()) ||
-            (calc_expiration(recycle_txn_pb) <= current_time_ms)) {
-            metrics_context.total_need_recycle_num++;
-        }
-        return 0;
-    };
-
     auto delete_recycle_txn_kv = [&](const std::string& k) -> int {
         std::string_view k1 = k;
         //RecycleTxnKeyInfo 0:instance_id  1:db_id  2:txn_id
@@ -3256,9 +3016,12 @@ int InstanceRecycler::recycle_expired_txn_label() {
         return ret;
     };
 
-    return scan_for_recycle_and_statistics(
-            begin_recycle_txn_key, end_recycle_txn_key, "expired_txn_labels", 
metrics_context,
-            std::move(scan_and_statistics), std::move(handle_recycle_txn_kv), 
std::move(loop_done));
+    if (config::enable_recycler_stats_metrics) {
+        scan_and_statistics_expired_txn_label();
+    }
+    // recycle_func and loop_done for scan and recycle
+    return scan_and_recycle(begin_recycle_txn_key, end_recycle_txn_key,
+                            std::move(handle_recycle_txn_kv), 
std::move(loop_done));
 }
 
 struct CopyJobIdTuple {
@@ -3523,46 +3286,12 @@ int InstanceRecycler::recycle_copy_jobs() {
         return 0;
     };
 
-    // for calculate the total num or bytes of recyled objects
-    auto scan_and_statistics = [&metrics_context](std::string_view k, 
std::string_view v) -> int {
-        CopyJobPB copy_job;
-        if (!copy_job.ParseFromArray(v.data(), v.size())) {
-            LOG_WARNING("malformed copy job").tag("key", hex(k));
-            return 0;
-        }
-
-        if (copy_job.job_status() == CopyJobPB::FINISH) {
-            if (copy_job.stage_type() == StagePB::EXTERNAL) {
-                int64_t current_time =
-                        
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
-                if (copy_job.finish_time_ms() > 0) {
-                    if (!config::force_immediate_recycle &&
-                        current_time < copy_job.finish_time_ms() +
-                                               
config::copy_job_max_retention_second * 1000) {
-                        return 0;
-                    }
-                } else {
-                    if (!config::force_immediate_recycle &&
-                        current_time < copy_job.start_time_ms() +
-                                               
config::copy_job_max_retention_second * 1000) {
-                        return 0;
-                    }
-                }
-            }
-        } else if (copy_job.job_status() == CopyJobPB::LOADING) {
-            int64_t current_time =
-                    
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
-            if (!config::force_immediate_recycle && current_time <= 
copy_job.timeout_time_ms()) {
-                return 0;
-            }
-        }
-        metrics_context.total_need_recycle_num++;
-        return 0;
-    };
-
-    return scan_for_recycle_and_statistics(key0, key1, "copy_jobs", 
metrics_context,
-                                           std::move(scan_and_statistics), 
std::move(recycle_func));
-}
+    if (config::enable_recycler_stats_metrics) {
+        scan_and_statistics_copy_jobs();
+    }
+    // recycle_func and loop_done for scan and recycle
+    return scan_and_recycle(key0, key1, std::move(recycle_func));
+}
 
 int InstanceRecycler::init_copy_job_accessor(const std::string& stage_id,
                                              const StagePB::StageType& 
stage_type,
@@ -3663,8 +3392,8 @@ int InstanceRecycler::init_copy_job_accessor(const 
std::string& stage_id,
 int InstanceRecycler::recycle_stage() {
     int64_t num_scanned = 0;
     int64_t num_recycled = 0;
-    RecyclerMetricsContext metrics_context(instance_id_, "recycle_stage");
     const std::string task_name = "recycle_stage";
+    RecyclerMetricsContext metrics_context(instance_id_, task_name);
 
     LOG_WARNING("begin to recycle stage").tag("instance_id", instance_id_);
 
@@ -3751,51 +3480,6 @@ int InstanceRecycler::recycle_stage() {
         return 0;
     };
 
-    // for calculate the total num or bytes of recyled objects
-    auto scan_and_statistics = [&metrics_context, this](std::string_view k,
-                                                        std::string_view v) -> 
int {
-        RecycleStagePB recycle_stage;
-        if (!recycle_stage.ParseFromArray(v.data(), v.size())) {
-            LOG_WARNING("malformed recycle stage").tag("key", hex(k));
-            return 0;
-        }
-
-        int idx = stoi(recycle_stage.stage().obj_info().id());
-        if (idx > instance_info_.obj_info().size() || idx < 1) {
-            LOG(WARNING) << "invalid idx: " << idx;
-            return 0;
-        }
-
-        std::shared_ptr<StorageVaultAccessor> accessor;
-        int ret = SYNC_POINT_HOOK_RETURN_VALUE(
-                [&] {
-                    auto& old_obj = instance_info_.obj_info()[idx - 1];
-                    auto s3_conf = S3Conf::from_obj_store_info(old_obj);
-                    if (!s3_conf) {
-                        return 0;
-                    }
-
-                    s3_conf->prefix = 
recycle_stage.stage().obj_info().prefix();
-                    std::shared_ptr<S3Accessor> s3_accessor;
-                    int ret = S3Accessor::create(std::move(s3_conf.value()), 
&s3_accessor);
-                    if (ret != 0) {
-                        return 0;
-                    }
-
-                    accessor = std::move(s3_accessor);
-                    return 0;
-                }(),
-                "recycle_stage:get_accessor", &accessor);
-
-        if (ret != 0) {
-            LOG(WARNING) << "failed to init accessor ret=" << ret;
-            return 0;
-        }
-
-        metrics_context.total_need_recycle_num++;
-        return 0;
-    };
-
     auto loop_done = [&stage_keys, this]() -> int {
         if (stage_keys.empty()) return 0;
         DORIS_CLOUD_DEFER {
@@ -3807,9 +3491,11 @@ int InstanceRecycler::recycle_stage() {
         }
         return 0;
     };
-    return scan_for_recycle_and_statistics(key0, key1, "stages", 
metrics_context,
-                                           std::move(scan_and_statistics), 
std::move(recycle_func),
-                                           std::move(loop_done));
+    if (config::enable_recycler_stats_metrics) {
+        scan_and_statistics_stage();
+    }
+    // recycle_func and loop_done for scan and recycle
+    return scan_and_recycle(key0, key1, std::move(recycle_func), 
std::move(loop_done));
 }
 
 int InstanceRecycler::recycle_expired_stage_objects() {
@@ -3827,108 +3513,69 @@ int InstanceRecycler::recycle_expired_stage_objects() {
     };
 
     int ret = 0;
-    // for calculate the total num or bytes of recyled objects
-    auto scan_and_statistics = [&metrics_context, this]() {
-        for (const auto& stage : instance_info_.stages()) {
-            if (stopped()) {
-                break;
-            }
-            if (stage.type() == StagePB::EXTERNAL) {
-                continue;
-            }
-            int idx = stoi(stage.obj_info().id());
-            if (idx > instance_info_.obj_info().size() || idx < 1) {
-                continue;
-            }
-            const auto& old_obj = instance_info_.obj_info()[idx - 1];
-            auto s3_conf = S3Conf::from_obj_store_info(old_obj);
-            if (!s3_conf) {
-                continue;
-            }
-            s3_conf->prefix = stage.obj_info().prefix();
-            std::shared_ptr<S3Accessor> accessor;
-            int ret1 = S3Accessor::create(*s3_conf, &accessor);
-            if (ret1 != 0) {
-                continue;
-            }
-            if (s3_conf->prefix.find("/stage/") == std::string::npos) {
-                continue;
-            }
-            metrics_context.total_need_recycle_num++;
-        }
-    };
-
-    auto handle_recycle_func = [&, this]() {
-        for (const auto& stage : instance_info_.stages()) {
-            std::stringstream ss;
-            ss << "instance_id=" << instance_id_ << ", stage_id=" << 
stage.stage_id()
-               << ", user_name="
-               << (stage.mysql_user_name().empty() ? "null" : 
stage.mysql_user_name().at(0))
-               << ", user_id="
-               << (stage.mysql_user_id().empty() ? "null" : 
stage.mysql_user_id().at(0))
-               << ", prefix=" << stage.obj_info().prefix();
-
-            if (stopped()) {
-                break;
-            }
-            if (stage.type() == StagePB::EXTERNAL) {
-                continue;
-            }
-            int idx = stoi(stage.obj_info().id());
-            if (idx > instance_info_.obj_info().size() || idx < 1) {
-                LOG(WARNING) << "invalid idx: " << idx << ", id: " << 
stage.obj_info().id();
-                continue;
-            }
 
-            const auto& old_obj = instance_info_.obj_info()[idx - 1];
-            auto s3_conf = S3Conf::from_obj_store_info(old_obj);
-            if (!s3_conf) {
-                LOG(WARNING) << "failed to init s3_conf with obj_info="
-                             << old_obj.ShortDebugString();
-                continue;
-            }
+    if (config::enable_recycler_stats_metrics) {
+        scan_and_statistics_expired_stage_objects();
+    }
 
-            s3_conf->prefix = stage.obj_info().prefix();
-            std::shared_ptr<S3Accessor> accessor;
-            int ret1 = S3Accessor::create(*s3_conf, &accessor);
-            if (ret1 != 0) {
-                LOG(WARNING) << "failed to init s3 accessor ret=" << ret1 << " 
" << ss.str();
-                ret = -1;
-                continue;
-            }
+    for (const auto& stage : instance_info_.stages()) {
+        std::stringstream ss;
+        ss << "instance_id=" << instance_id_ << ", stage_id=" << 
stage.stage_id() << ", user_name="
+           << (stage.mysql_user_name().empty() ? "null" : 
stage.mysql_user_name().at(0))
+           << ", user_id=" << (stage.mysql_user_id().empty() ? "null" : 
stage.mysql_user_id().at(0))
+           << ", prefix=" << stage.obj_info().prefix();
 
-            if (s3_conf->prefix.find("/stage/") == std::string::npos) {
-                LOG(WARNING) << "try to delete illegal prefix, which is 
catastrophic, " << ss.str();
-                ret = -1;
-                continue;
-            }
+        if (stopped()) {
+            break;
+        }
+        if (stage.type() == StagePB::EXTERNAL) {
+            continue;
+        }
+        int idx = stoi(stage.obj_info().id());
+        if (idx > instance_info_.obj_info().size() || idx < 1) {
+            LOG(WARNING) << "invalid idx: " << idx << ", id: " << 
stage.obj_info().id();
+            continue;
+        }
 
-            LOG(INFO) << "recycle expired stage objects, " << ss.str();
-            int64_t expiration_time =
-                    
duration_cast<seconds>(system_clock::now().time_since_epoch()).count() -
-                    config::internal_stage_objects_expire_time_second;
-            if (config::force_immediate_recycle) {
-                expiration_time = INT64_MAX;
-            }
-            ret1 = accessor->delete_all(expiration_time);
-            if (ret1 != 0) {
-                LOG(WARNING) << "failed to recycle expired stage objects, 
ret=" << ret1 << " "
-                             << ss.str();
-                ret = -1;
-                continue;
-            }
-            metrics_context.total_recycled_num++;
-            metrics_context.report();
+        const auto& old_obj = instance_info_.obj_info()[idx - 1];
+        auto s3_conf = S3Conf::from_obj_store_info(old_obj);
+        if (!s3_conf) {
+            LOG(WARNING) << "failed to init s3_conf with obj_info=" << 
old_obj.ShortDebugString();
+            continue;
         }
-    };
 
-    // for calculate the total num or bytes of recyled objects
-    scan_and_statistics();
+        s3_conf->prefix = stage.obj_info().prefix();
+        std::shared_ptr<S3Accessor> accessor;
+        int ret1 = S3Accessor::create(*s3_conf, &accessor);
+        if (ret1 != 0) {
+            LOG(WARNING) << "failed to init s3 accessor ret=" << ret1 << " " 
<< ss.str();
+            ret = -1;
+            continue;
+        }
 
-    // report to bvar
-    metrics_context.report(true);
+        if (s3_conf->prefix.find("/stage/") == std::string::npos) {
+            LOG(WARNING) << "try to delete illegal prefix, which is 
catastrophic, " << ss.str();
+            ret = -1;
+            continue;
+        }
 
-    handle_recycle_func();
+        LOG(INFO) << "recycle expired stage objects, " << ss.str();
+        int64_t expiration_time =
+                
duration_cast<seconds>(system_clock::now().time_since_epoch()).count() -
+                config::internal_stage_objects_expire_time_second;
+        if (config::force_immediate_recycle) {
+            expiration_time = INT64_MAX;
+        }
+        ret1 = accessor->delete_all(expiration_time);
+        if (ret1 != 0) {
+            LOG(WARNING) << "failed to recycle expired stage objects, ret=" << 
ret1 << " "
+                         << ss.str();
+            ret = -1;
+            continue;
+        }
+        metrics_context.total_recycled_num++;
+        metrics_context.report();
+    }
     return ret;
 }
 
@@ -3965,4 +3612,543 @@ bool InstanceRecycler::check_recycle_tasks() {
     return found;
 }
 
+// Scan and statistics indexes that need to be recycled.
+//
+// Dry-run counterpart of the index recycling task: walks every recycle-index key of
+// this instance and, for each expired index, counts it (and, through
+// scan_tablets_and_statistics, its tablets/segments) without deleting anything.
+// Malformed or unreadable entries are skipped so one bad key does not abort the scan.
+// The collected totals are reported from the scan's loop-done callback.
+int InstanceRecycler::scan_and_statistics_indexes() {
+    RecyclerMetricsContext metrics_context(instance_id_, "recycle_indexes");
+
+    RecycleIndexKeyInfo index_key_info0 {instance_id_, 0};
+    RecycleIndexKeyInfo index_key_info1 {instance_id_, INT64_MAX};
+    std::string index_key0;
+    std::string index_key1;
+    recycle_index_key(index_key_info0, &index_key0);
+    recycle_index_key(index_key_info1, &index_key1);
+    int64_t earlest_ts = std::numeric_limits<int64_t>::max();
+
+    auto handle_index_kv = [&, this](std::string_view k, std::string_view v) -> int {
+        RecycleIndexPB index_pb;
+        if (!index_pb.ParseFromArray(v.data(), v.size())) {
+            return 0;
+        }
+        int64_t current_time = ::time(nullptr);
+        if (current_time <
+            calculate_index_expired_time(instance_id_, index_pb, &earlest_ts)) { // not expired
+            return 0;
+        }
+        // decode index_id
+        // 0x01 "recycle" ${instance_id} "index" ${index_id} -> RecycleIndexPB
+        auto k1 = k;
+        k1.remove_prefix(1);
+        std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
+        // Guard against malformed keys: out[3] must exist before it is read.
+        if (decode_key(&k1, &out) != 0 || out.size() < 4) {
+            return 0;
+        }
+        auto index_id = std::get<int64_t>(std::get<0>(out[3]));
+        std::unique_ptr<Transaction> txn;
+        TxnErrorCode err = txn_kv_->create_txn(&txn);
+        if (err != TxnErrorCode::TXN_OK) {
+            return 0;
+        }
+        // Re-read the key so that an index removed since the range scan is not counted.
+        std::string val;
+        err = txn->get(k, &val);
+        if (err != TxnErrorCode::TXN_OK) { // includes TXN_KEY_NOT_FOUND
+            return 0;
+        }
+        index_pb.Clear();
+        if (!index_pb.ParseFromString(val)) {
+            return 0;
+        }
+        if (scan_tablets_and_statistics(index_pb.table_id(), index_id, metrics_context) != 0) {
+            return 0;
+        }
+        metrics_context.total_need_recycle_num++;
+        return 0;
+    };
+
+    // `[&]` (instead of `[&metrics_context]`) also captures `this`, which is required
+    // to name segment_metrics_context_ / tablet_metrics_context_.
+    // NOTE(review): those are assumed to be InstanceRecycler members (trailing-underscore
+    // convention); `[&]` compiles whether or not they are.
+    return scan_and_recycle(index_key0, index_key1, std::move(handle_index_kv), [&]() -> int {
+        metrics_context.report(true);
+        segment_metrics_context_.report(true);
+        tablet_metrics_context_.report(true);
+        return 0;
+    });
+}
+
+// Scan and statistics partitions that need to be recycled.
+//
+// Dry-run counterpart of the partition recycling task: for every expired
+// recycle-partition key of this instance, counts the partition and (through
+// scan_tablets_and_statistics) the tablets/segments of each of its indexes.
+// Nothing is deleted and no state is changed. Per-entry failures are skipped so the
+// scan keeps going; totals are reported from the scan's loop-done callback.
+int InstanceRecycler::scan_and_statistics_partitions() {
+    RecyclerMetricsContext metrics_context(instance_id_, "recycle_partitions");
+
+    RecyclePartKeyInfo part_key_info0 {instance_id_, 0};
+    RecyclePartKeyInfo part_key_info1 {instance_id_, INT64_MAX};
+    std::string part_key0;
+    std::string part_key1;
+    int64_t earlest_ts = std::numeric_limits<int64_t>::max();
+
+    recycle_partition_key(part_key_info0, &part_key0);
+    recycle_partition_key(part_key_info1, &part_key1);
+    auto handle_partition_kv = [&, this](std::string_view k, std::string_view v) -> int {
+        RecyclePartitionPB part_pb;
+        if (!part_pb.ParseFromArray(v.data(), v.size())) {
+            return 0;
+        }
+        int64_t current_time = ::time(nullptr);
+        if (current_time <
+            calculate_partition_expired_time(instance_id_, part_pb, &earlest_ts)) { // not expired
+            return 0;
+        }
+        // decode partition_id
+        // 0x01 "recycle" ${instance_id} "partition" ${partition_id} -> RecyclePartitionPB
+        auto k1 = k;
+        k1.remove_prefix(1);
+        std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
+        // Guard against malformed keys: out[3] must exist before it is read.
+        if (decode_key(&k1, &out) != 0 || out.size() < 4) {
+            return 0;
+        }
+        auto partition_id = std::get<int64_t>(std::get<0>(out[3]));
+        // Re-read the key so that a partition removed since the range scan is not counted.
+        std::unique_ptr<Transaction> txn;
+        TxnErrorCode err = txn_kv_->create_txn(&txn);
+        if (err != TxnErrorCode::TXN_OK) {
+            return 0;
+        }
+        std::string val;
+        err = txn->get(k, &val);
+        if (err != TxnErrorCode::TXN_OK) { // includes TXN_KEY_NOT_FOUND
+            return 0;
+        }
+        part_pb.Clear();
+        if (!part_pb.ParseFromString(val)) {
+            return 0;
+        }
+        // Partitions with PREPARED state MUST have no data
+        bool is_empty_tablet = part_pb.state() == RecyclePartitionPB::PREPARED;
+        for (int64_t index_id : part_pb.index_id()) {
+            // Statistics are best-effort: a failure on one index is ignored and the
+            // remaining indexes are still scanned.
+            static_cast<void>(scan_tablets_and_statistics(part_pb.table_id(), index_id,
+                                                          metrics_context, partition_id,
+                                                          is_empty_tablet));
+        }
+        metrics_context.total_need_recycle_num++;
+        return 0;
+    };
+
+    // `[&]` (instead of `[&metrics_context]`) also captures `this`, which is required
+    // to name segment_metrics_context_ / tablet_metrics_context_.
+    // NOTE(review): those are assumed to be InstanceRecycler members (trailing-underscore
+    // convention); `[&]` compiles whether or not they are.
+    return scan_and_recycle(part_key0, part_key1, std::move(handle_partition_kv), [&]() -> int {
+        metrics_context.report(true);
+        segment_metrics_context_.report(true);
+        tablet_metrics_context_.report(true);
+        return 0;
+    });
+}
+
+// Scan and statistics rowsets that need to be recycled.
+//
+// Dry-run counterpart of the rowset recycling task: counts expired recycle-rowset
+// entries that still reference data (non-PREPARE, with a resource_id and at least
+// one segment), together with their segment count and total disk size. Nothing is
+// deleted; totals are reported from the scan's loop-done callback.
+int InstanceRecycler::scan_and_statistics_rowsets() {
+    RecyclerMetricsContext metrics_context(instance_id_, "recycle_rowsets");
+    RecycleRowsetKeyInfo recyc_rs_key_info0 {instance_id_, 0, ""};
+    RecycleRowsetKeyInfo recyc_rs_key_info1 {instance_id_, INT64_MAX, ""};
+    std::string recyc_rs_key0;
+    std::string recyc_rs_key1;
+    recycle_rowset_key(recyc_rs_key_info0, &recyc_rs_key0);
+    recycle_rowset_key(recyc_rs_key_info1, &recyc_rs_key1);
+    int64_t earlest_ts = std::numeric_limits<int64_t>::max();
+
+    auto handle_rowset_kv = [&, this](std::string_view /*k*/, std::string_view v) -> int {
+        RecycleRowsetPB rowset;
+        if (!rowset.ParseFromArray(v.data(), v.size())) {
+            return 0;
+        }
+        int64_t current_time = ::time(nullptr);
+        if (current_time <
+            calculate_rowset_expired_time(instance_id_, rowset, &earlest_ts)) { // not expired
+            return 0;
+        }
+        // Entries without a type are not counted.
+        if (!rowset.has_type()) {
+            return 0;
+        }
+        // Read-only access; no need to materialize the sub-message via mutable_*().
+        const auto& rowset_meta = rowset.rowset_meta();
+        // PREPARE rowsets and rowsets without a resource_id never contribute.
+        if (rowset.type() == RecycleRowsetPB::PREPARE || !rowset_meta.has_resource_id()) {
+            return 0;
+        }
+        if (rowset_meta.num_segments() > 0) {
+            metrics_context.total_need_recycle_num++;
+            segment_metrics_context_.total_need_recycle_num += rowset_meta.num_segments();
+            segment_metrics_context_.total_need_recycle_data_size += rowset_meta.total_disk_size();
+            metrics_context.total_need_recycle_data_size += rowset_meta.total_disk_size();
+        }
+        return 0;
+    };
+
+    // `[&]` (instead of `[&metrics_context]`) also captures `this`, which is required
+    // to name segment_metrics_context_.
+    // NOTE(review): assumed to be an InstanceRecycler member (trailing-underscore
+    // convention); `[&]` compiles whether or not it is.
+    return scan_and_recycle(recyc_rs_key0, recyc_rs_key1, std::move(handle_rowset_kv),
+                            [&]() -> int {
+                                metrics_context.report(true);
+                                segment_metrics_context_.report(true);
+                                return 0;
+                            });
+}
+
+// Scan and statistics tmp_rowsets that need to be recycled.
+//
+// Counts expired tmp rowsets whose transaction is finished (per is_txn_finished).
+// Rowsets with a resource_id and segments also add their segment count and disk size.
+// Nothing is deleted; totals are reported from the scan's loop-done callback.
+int InstanceRecycler::scan_and_statistics_tmp_rowsets() {
+    RecyclerMetricsContext metrics_context(instance_id_, "recycle_tmp_rowsets");
+    MetaRowsetTmpKeyInfo tmp_rs_key_info0 {instance_id_, 0, 0};
+    MetaRowsetTmpKeyInfo tmp_rs_key_info1 {instance_id_, INT64_MAX, 0};
+    std::string tmp_rs_key0;
+    std::string tmp_rs_key1;
+    meta_rowset_tmp_key(tmp_rs_key_info0, &tmp_rs_key0);
+    meta_rowset_tmp_key(tmp_rs_key_info1, &tmp_rs_key1);
+
+    int64_t earlest_ts = std::numeric_limits<int64_t>::max();
+
+    auto handle_tmp_rowsets_kv = [&, this](std::string_view /*k*/, std::string_view v) -> int {
+        doris::RowsetMetaCloudPB rowset;
+        if (!rowset.ParseFromArray(v.data(), v.size())) {
+            return 0;
+        }
+        int64_t expiration = calculate_tmp_rowset_expired_time(instance_id_, rowset, &earlest_ts);
+        int64_t current_time = ::time(nullptr);
+        if (current_time < expiration) {
+            return 0;
+        }
+
+        DCHECK_GT(rowset.txn_id(), 0)
+                << "txn_id=" << rowset.txn_id() << " rowset=" << rowset.ShortDebugString();
+        // Rowsets of unfinished transactions are not counted.
+        if (!is_txn_finished(txn_kv_, instance_id_, rowset.txn_id())) {
+            return 0;
+        }
+
+        if (!rowset.has_resource_id()) {
+            if (rowset.num_segments() > 0) [[unlikely]] { // impossible
+                return 0;
+            }
+            // No resource_id and no segments: counted as an item, with no bytes.
+            metrics_context.total_need_recycle_num++;
+            return 0;
+        }
+
+        metrics_context.total_need_recycle_num++;
+        if (rowset.num_segments() > 0) {
+            metrics_context.total_need_recycle_data_size += rowset.total_disk_size();
+            segment_metrics_context_.total_need_recycle_data_size += rowset.total_disk_size();
+            segment_metrics_context_.total_need_recycle_num += rowset.num_segments();
+        }
+        return 0;
+    };
+
+    // `[&]` (instead of `[&metrics_context]`) also captures `this`, which is required
+    // to name segment_metrics_context_.
+    // NOTE(review): assumed to be an InstanceRecycler member (trailing-underscore
+    // convention); `[&]` compiles whether or not it is.
+    return scan_and_recycle(tmp_rs_key0, tmp_rs_key1, std::move(handle_tmp_rowsets_kv),
+                            [&]() -> int {
+                                metrics_context.report(true);
+                                segment_metrics_context_.report(true);
+                                return 0;
+                            });
+}
+
+// Scan and statistics abort_timeout_txn that need to be recycled.
+//
+// Walks the txn-running keys of this instance and counts every transaction whose
+// txn_info status is not COMMITTED and whose timeout_time has passed (or every such
+// transaction when config::force_immediate_recycle is set). Nothing is aborted.
+// Unreadable or malformed entries are skipped.
+int InstanceRecycler::scan_and_statistics_abort_timeout_txn() {
+    RecyclerMetricsContext metrics_context(instance_id_, "abort_timeout_txn");
+
+    TxnRunningKeyInfo first_key_info {instance_id_, 0, 0};
+    TxnRunningKeyInfo last_key_info {instance_id_, INT64_MAX, INT64_MAX};
+    std::string begin_txn_running_key;
+    std::string end_txn_running_key;
+    txn_running_key(first_key_info, &begin_txn_running_key);
+    txn_running_key(last_key_info, &end_txn_running_key);
+
+    const int64_t now_ms =
+            duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
+
+    auto count_if_timed_out = [&metrics_context, &now_ms, this](std::string_view key,
+                                                                std::string_view value) -> int {
+        std::unique_ptr<Transaction> txn;
+        if (txn_kv_->create_txn(&txn) != TxnErrorCode::TXN_OK) {
+            return 0;
+        }
+
+        std::string_view encoded = key;
+        encoded.remove_prefix(1);
+        std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> fields;
+        if (decode_key(&encoded, &fields) != 0) {
+            return 0;
+        }
+        const int64_t db_id = std::get<int64_t>(std::get<0>(fields[3]));
+        const int64_t txn_id = std::get<int64_t>(std::get<0>(fields[4]));
+
+        std::string info_key;
+        std::string info_val;
+        txn_info_key({instance_id_, db_id, txn_id}, &info_key);
+        if (txn->get(info_key, &info_val) != TxnErrorCode::TXN_OK) {
+            return 0;
+        }
+        TxnInfoPB txn_info;
+        if (!txn_info.ParseFromString(info_val)) {
+            return 0;
+        }
+        // Only non-COMMITTED transactions are candidates.
+        if (txn_info.status() == TxnStatusPB::TXN_STATUS_COMMITTED) {
+            return 0;
+        }
+
+        TxnRunningPB running;
+        if (!running.ParseFromArray(value.data(), value.size())) {
+            return 0;
+        }
+        const bool not_yet_timed_out =
+                !config::force_immediate_recycle && running.timeout_time() > now_ms;
+        if (!not_yet_timed_out) {
+            metrics_context.total_need_recycle_num++;
+        }
+        return 0;
+    };
+
+    auto report_totals = [&metrics_context]() -> int {
+        metrics_context.report(true);
+        return 0;
+    };
+    return scan_and_recycle(begin_txn_running_key, end_txn_running_key,
+                            std::move(count_if_timed_out), std::move(report_totals));
+}
+
+// Scan and statistics expired_txn_label that need to be recycled.
+//
+// Counts recycle-txn entries that would be recycled now. An entry counts when
+// config::force_immediate_recycle is set, when it is marked immediate, or when its
+// computed expiration time is not after the current time (in milliseconds).
+// Nothing is deleted.
+int InstanceRecycler::scan_and_statistics_expired_txn_label() {
+    RecyclerMetricsContext metrics_context(instance_id_, "recycle_expired_txn_label");
+
+    RecycleTxnKeyInfo lower_info {instance_id_, 0, 0};
+    RecycleTxnKeyInfo upper_info {instance_id_, INT64_MAX, INT64_MAX};
+    std::string begin_recycle_txn_key;
+    std::string end_recycle_txn_key;
+    recycle_txn_key(lower_info, &begin_recycle_txn_key);
+    recycle_txn_key(upper_info, &end_recycle_txn_key);
+    int64_t earlest_ts = std::numeric_limits<int64_t>::max();
+    int64_t current_time_ms =
+            duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
+
+    auto count_expired_label = [&, this](std::string_view /*k*/, std::string_view v) -> int {
+        RecycleTxnPB recycle_txn_pb;
+        if (!recycle_txn_pb.ParseFromArray(v.data(), v.size())) {
+            return 0;
+        }
+        // Evaluated in the same short-circuit order as a single `||` chain, so the
+        // expiration time (which updates earlest_ts) is only computed when needed.
+        bool expired = config::force_immediate_recycle;
+        if (!expired) {
+            expired = recycle_txn_pb.has_immediate() && recycle_txn_pb.immediate();
+        }
+        if (!expired) {
+            expired = calculate_txn_expired_time(instance_id_, recycle_txn_pb, &earlest_ts) <=
+                      current_time_ms;
+        }
+        if (expired) {
+            metrics_context.total_need_recycle_num++;
+        }
+        return 0;
+    };
+
+    auto report_totals = [&metrics_context]() -> int {
+        metrics_context.report(true);
+        return 0;
+    };
+    return scan_and_recycle(begin_recycle_txn_key, end_recycle_txn_key,
+                            std::move(count_expired_label), std::move(report_totals));
+}
+
+// Scan and statistics copy_jobs that need to be recycled.
+//
+// Counts copy jobs that would be recycled now:
+//  - FINISH jobs of INTERNAL stages are always counted;
+//  - FINISH jobs of EXTERNAL stages once copy_job_max_retention_second has elapsed
+//    since finish_time_ms (or start_time_ms when finish_time_ms is not positive);
+//  - LOADING jobs once timeout_time_ms has passed;
+//  - jobs in any other status are always counted.
+// config::force_immediate_recycle bypasses the time checks. Nothing is deleted.
+int InstanceRecycler::scan_and_statistics_copy_jobs() {
+    RecyclerMetricsContext metrics_context(instance_id_, "recycle_copy_jobs");
+    CopyJobKeyInfo key_info0 {instance_id_, "", 0, "", 0};
+    CopyJobKeyInfo key_info1 {instance_id_, "\xff", 0, "", 0};
+    std::string key0;
+    std::string key1;
+    copy_job_key(key_info0, &key0);
+    copy_job_key(key_info1, &key1);
+
+    auto count_copy_job = [&metrics_context](std::string_view k, std::string_view v) -> int {
+        CopyJobPB copy_job;
+        if (!copy_job.ParseFromArray(v.data(), v.size())) {
+            LOG_WARNING("malformed copy job").tag("key", hex(k));
+            return 0;
+        }
+
+        auto now_ms = [] {
+            return duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
+        };
+
+        switch (copy_job.job_status()) {
+        case CopyJobPB::FINISH:
+            if (copy_job.stage_type() == StagePB::EXTERNAL) {
+                const auto retention_base_ms = copy_job.finish_time_ms() > 0
+                                                       ? copy_job.finish_time_ms()
+                                                       : copy_job.start_time_ms();
+                if (!config::force_immediate_recycle &&
+                    now_ms() < retention_base_ms + config::copy_job_max_retention_second * 1000) {
+                    return 0;
+                }
+            }
+            break;
+        case CopyJobPB::LOADING:
+            if (!config::force_immediate_recycle && now_ms() <= copy_job.timeout_time_ms()) {
+                return 0;
+            }
+            break;
+        default:
+            break;
+        }
+        metrics_context.total_need_recycle_num++;
+        return 0;
+    };
+
+    auto report_totals = [&metrics_context]() -> int {
+        metrics_context.report(true);
+        return 0;
+    };
+    return scan_and_recycle(key0, key1, std::move(count_copy_job), std::move(report_totals));
+}
+
+// Scan and statistics stage that need to be recycled.
+//
+// Counts recycle-stage entries whose stage references a valid obj_info id (1-based
+// index into instance_info_.obj_info()). For each entry an S3 accessor is built the
+// same way the recycle path does; the "recycle_stage:get_accessor" sync point may
+// override the result. Nothing is deleted.
+// NOTE(review): the accessor-building lambda returns 0 on every failure, so entries
+// whose accessor cannot be built are still counted; confirm this is intended.
+int InstanceRecycler::scan_and_statistics_stage() {
+    RecyclerMetricsContext metrics_context(instance_id_, "recycle_stage");
+    RecycleStageKeyInfo key_info0 {instance_id_, ""};
+    RecycleStageKeyInfo key_info1 {instance_id_, "\xff"};
+    std::string key0 = recycle_stage_key(key_info0);
+    std::string key1 = recycle_stage_key(key_info1);
+
+    // Count each recyclable stage entry.
+    auto scan_and_statistics = [&metrics_context, this](std::string_view k,
+                                                        std::string_view v) -> int {
+        RecycleStagePB recycle_stage;
+        if (!recycle_stage.ParseFromArray(v.data(), v.size())) {
+            LOG_WARNING("malformed recycle stage").tag("key", hex(k));
+            return 0;
+        }
+
+        // std::stoi throws on a non-numeric or out-of-range id; skip such an entry
+        // instead of letting the exception escape the statistics request.
+        int idx = 0;
+        try {
+            idx = std::stoi(recycle_stage.stage().obj_info().id());
+        } catch (...) {
+            LOG(WARNING) << "invalid obj_info id: " << recycle_stage.stage().obj_info().id();
+            return 0;
+        }
+        if (idx > instance_info_.obj_info().size() || idx < 1) {
+            LOG(WARNING) << "invalid idx: " << idx;
+            return 0;
+        }
+
+        std::shared_ptr<StorageVaultAccessor> accessor;
+        int ret = SYNC_POINT_HOOK_RETURN_VALUE(
+                [&] {
+                    auto& old_obj = instance_info_.obj_info()[idx - 1];
+                    auto s3_conf = S3Conf::from_obj_store_info(old_obj);
+                    if (!s3_conf) {
+                        return 0;
+                    }
+
+                    s3_conf->prefix = recycle_stage.stage().obj_info().prefix();
+                    std::shared_ptr<S3Accessor> s3_accessor;
+                    // Named create_ret so it does not shadow the enclosing `ret`.
+                    int create_ret = S3Accessor::create(std::move(s3_conf.value()), &s3_accessor);
+                    if (create_ret != 0) {
+                        return 0;
+                    }
+
+                    accessor = std::move(s3_accessor);
+                    return 0;
+                }(),
+                "recycle_stage:get_accessor", &accessor);
+
+        if (ret != 0) {
+            LOG(WARNING) << "failed to init accessor ret=" << ret;
+            return 0;
+        }
+
+        metrics_context.total_need_recycle_num++;
+        return 0;
+    };
+
+    return scan_and_recycle(key0, key1, std::move(scan_and_statistics),
+                            [&metrics_context]() -> int {
+                                metrics_context.report(true);
+                                return 0;
+                            });
+}
+
+// Scan and statistics expired_stage_objects that need to be recycled.
+//
+// Counts the non-EXTERNAL stages of this instance that pass the same checks as the
+// expired-stage-object recycling path: a valid 1-based obj_info id, a prefix
+// containing "/stage/", a usable S3Conf, and a successfully created S3 accessor.
+// Stops early if the recycler is stopped. Nothing is deleted. Always returns 0.
+int InstanceRecycler::scan_and_statistics_expired_stage_objects() {
+    RecyclerMetricsContext metrics_context(instance_id_, "recycle_expired_stage_objects");
+
+    for (const auto& stage : instance_info_.stages()) {
+        if (stopped()) {
+            break;
+        }
+        if (stage.type() == StagePB::EXTERNAL) {
+            continue;
+        }
+        // std::stoi throws on a non-numeric or out-of-range id; skip such a stage
+        // instead of letting the exception escape the statistics request.
+        int idx = 0;
+        try {
+            idx = std::stoi(stage.obj_info().id());
+        } catch (...) {
+            LOG(WARNING) << "invalid obj_info id: " << stage.obj_info().id();
+            continue;
+        }
+        if (idx > instance_info_.obj_info().size() || idx < 1) {
+            continue;
+        }
+        // Cheap safety check first: a stage whose prefix lacks "/stage/" is never
+        // counted, so there is no point building an accessor for it.
+        if (stage.obj_info().prefix().find("/stage/") == std::string::npos) {
+            continue;
+        }
+        const auto& old_obj = instance_info_.obj_info()[idx - 1];
+        auto s3_conf = S3Conf::from_obj_store_info(old_obj);
+        if (!s3_conf) {
+            continue;
+        }
+        s3_conf->prefix = stage.obj_info().prefix();
+        std::shared_ptr<S3Accessor> accessor;
+        if (S3Accessor::create(*s3_conf, &accessor) != 0) {
+            continue;
+        }
+        metrics_context.total_need_recycle_num++;
+    }
+
+    metrics_context.report(true);
+    return 0;
+}
+
+// Scan and statistics versions that need to be recycled.
+//
+// Walks the partition-version keys of this instance. For the first key of each
+// table it probes the table's tablet-stats key range. If that range is empty, the
+// key is counted and the table is remembered as recyclable. Every following key of
+// the same table is then counted as well. Nothing is deleted.
+int InstanceRecycler::scan_and_statistics_versions() {
+    RecyclerMetricsContext metrics_context(instance_id_, "recycle_versions");
+    auto version_key_begin = partition_version_key({instance_id_, 0, 0, 0});
+    auto version_key_end = partition_version_key({instance_id_, INT64_MAX, 0, 0});
+
+    int64_t prev_table_id = 0;
+    bool prev_table_recyclable = false; // verdict computed for prev_table_id
+    auto count_version_kv = [&metrics_context, &prev_table_id, &prev_table_recyclable, this](
+                                    std::string_view k, std::string_view) -> int {
+        // 0x01 "version" ${instance_id} "partition" ${db_id} ${tbl_id} ${partition_id}
+        std::string_view encoded = k;
+        encoded.remove_prefix(1);
+        std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> fields;
+        decode_key(&encoded, &fields);
+        DCHECK_EQ(fields.size(), 6) << k;
+        const auto table_id = std::get<int64_t>(std::get<0>(fields[4]));
+
+        if (table_id == prev_table_id) {
+            // Same table as the previous kv: reuse the verdict already computed.
+            if (prev_table_recyclable) {
+                metrics_context.total_need_recycle_num++;
+            }
+            return 0;
+        }
+        prev_table_id = table_id;
+        prev_table_recyclable = false;
+
+        const auto first_tablet_key = stats_tablet_key({instance_id_, table_id, 0, 0, 0});
+        const auto last_tablet_key = stats_tablet_key({instance_id_, table_id, INT64_MAX, 0, 0});
+        std::unique_ptr<Transaction> txn;
+        if (txn_kv_->create_txn(&txn) != TxnErrorCode::TXN_OK) {
+            return 0;
+        }
+        std::unique_ptr<RangeGetIterator> tablet_it;
+        if (txn->get(first_tablet_key, last_tablet_key, &tablet_it, false, 1) !=
+            TxnErrorCode::TXN_OK) {
+            return 0;
+        }
+        // Any remaining tablet stats means the table is still in use.
+        if (!tablet_it->has_next()) {
+            metrics_context.total_need_recycle_num++;
+            prev_table_recyclable = true;
+        }
+        return 0;
+    };
+
+    auto report_totals = [&metrics_context]() -> int {
+        metrics_context.report(true);
+        return 0;
+    };
+    return scan_and_recycle(version_key_begin, version_key_end, std::move(count_version_kv),
+                            std::move(report_totals));
+}
+
 } // namespace doris::cloud
diff --git a/cloud/src/recycler/recycler.h b/cloud/src/recycler/recycler.h
index d1ae8a056c8..97f9a8d57c5 100644
--- a/cloud/src/recycler/recycler.h
+++ b/cloud/src/recycler/recycler.h
@@ -204,6 +204,26 @@ public:
 
     bool check_recycle_tasks();
 
+    // Dry-run statistics entry points. Each scan_and_statistics_* scans the keys of
+    // one recycle task and counts what would be recycled. The counts go into a
+    // RecyclerMetricsContext named after the task (e.g. "recycle_indexes"), which is
+    // then reported. Nothing is deleted.
+    // Return value: result of the underlying scan.
+    // NOTE(review): presumably 0 on success; confirm against scan_and_recycle.
+    // Also reports the tablet and segment metrics contexts.
+    int scan_and_statistics_indexes();
+
+    // Also reports the tablet and segment metrics contexts.
+    int scan_and_statistics_partitions();
+
+    // Also accumulates and reports segment counts/bytes of the counted rowsets.
+    int scan_and_statistics_rowsets();
+
+    // Also accumulates and reports segment counts/bytes of the counted tmp rowsets.
+    int scan_and_statistics_tmp_rowsets();
+
+    int scan_and_statistics_abort_timeout_txn();
+
+    int scan_and_statistics_expired_txn_label();
+
+    int scan_and_statistics_copy_jobs();
+
+    int scan_and_statistics_stage();
+
+    // Iterates instance_info_ stages directly instead of scanning keys; always returns 0.
+    int scan_and_statistics_expired_stage_objects();
+
+    int scan_and_statistics_versions();
+
 private:
     // returns 0 for success otherwise error
     int init_obj_store_accessors();
@@ -222,13 +242,6 @@ private:
                          std::function<int(std::string_view k, 
std::string_view v)> recycle_func,
                          std::function<int()> loop_done = nullptr);
 
-    int scan_for_recycle_and_statistics(
-            std::string begin, std::string_view end, std::string task_name,
-            RecyclerMetricsContext& metrics_context,
-            std::function<int(std::string_view k, std::string_view v)> 
statistics_func,
-            std::function<int(std::string_view k, std::string_view v)> 
recycle_func,
-            std::function<int()> loop_done = nullptr);
-
     // return 0 for success otherwise error
     int delete_rowset_data(const doris::RowsetMetaCloudPB& rs_meta_pb);
 
@@ -363,30 +376,34 @@ public:
     // `is_begin` is used to initialize total num of items need to be recycled
     void report(bool is_begin = false) {
+        // is_begin == true: publish the "to recycle" totals (bytes/num), each only
+        // when non-zero. Otherwise publish the "recycled" totals: last_round_* only
+        // when non-zero, *_since_started unconditionally with the current value.
         if (!operation_type.empty()) {
-            if (total_need_recycle_data_size > 0) {
-                // is init
-                if (is_begin) {
+            // is_begin: publish bytes that need to be recycled (skipped when zero)
+            if (is_begin) {
+                if (total_need_recycle_data_size) {
                     g_bvar_recycler_instance_last_round_to_recycle_bytes.put(
                             {instance_id, operation_type}, 
total_need_recycle_data_size);
-                } else {
+                }
+            } else {
+                if (total_recycled_data_size.load()) {
                     g_bvar_recycler_instance_last_round_recycled_bytes.put(
                             {instance_id, operation_type}, 
total_recycled_data_size.load());
-                    
g_bvar_recycler_instance_recycle_total_bytes_since_started.put(
-                            {instance_id, operation_type}, 
total_recycled_data_size.load());
                 }
+                g_bvar_recycler_instance_recycle_total_bytes_since_started.put(
+                        {instance_id, operation_type}, 
total_recycled_data_size.load());
             }
 
-            if (total_need_recycle_num > 0) {
-                // is init
-                if (is_begin) {
+            // is_begin: publish number of items that need to be recycled (skipped when zero)
+            if (is_begin) {
+                if (total_need_recycle_num) {
                     g_bvar_recycler_instance_last_round_to_recycle_num.put(
                             {instance_id, operation_type}, 
total_need_recycle_num);
-                } else {
+                }
+            } else {
+                if (total_recycled_num.load()) {
                     g_bvar_recycler_instance_last_round_recycled_num.put(
                             {instance_id, operation_type}, 
total_recycled_num.load());
-                    
g_bvar_recycler_instance_recycle_total_num_since_started.put(
-                            {instance_id, operation_type}, 
total_recycled_num.load());
                 }
+                g_bvar_recycler_instance_recycle_total_num_since_started.put(
+                        {instance_id, operation_type}, 
total_recycled_num.load());
             }
         }
     }
diff --git a/cloud/src/recycler/recycler_service.cpp 
b/cloud/src/recycler/recycler_service.cpp
index b812fbd27ee..48b2fe370ef 100644
--- a/cloud/src/recycler/recycler_service.cpp
+++ b/cloud/src/recycler/recycler_service.cpp
@@ -23,6 +23,13 @@
 #include <gen_cpp/cloud.pb.h>
 #include <google/protobuf/util/json_util.h>
 
+#include <algorithm>
+#include <functional>
+#include <numeric>
+#include <sstream>
+#include <utility>
+#include <vector>
+
 #include "common/config.h"
 #include "common/defer.h"
 #include "common/logging.h"
@@ -34,6 +41,7 @@
 #include "recycler/meta_checker.h"
 #include "recycler/recycler.h"
 #include "recycler/s3_accessor.h"
+#include "recycler/util.h"
 
 namespace doris::cloud {
 
@@ -52,6 +60,245 @@ 
RecyclerServiceImpl::RecyclerServiceImpl(std::shared_ptr<TxnKv> txn_kv, Recycler
 
 RecyclerServiceImpl::~RecyclerServiceImpl() = default;
 
+void RecyclerServiceImpl::statistics_recycle(StatisticsRecycleRequest& req, MetaServiceCode& code,
+                                             std::string& msg) { // report text (or error) returned via msg
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = MetaServiceCode::KV_TXN_CREATE_ERR;
+        msg = "failed to create txn";
+        return;
+    }
+    // resource_type -> the InstanceRecycler scan_and_statistics_* call(s) run for that type.
+    static std::map<std::string, std::function<void(InstanceRecycler&)>> resource_handlers = {
+            {"recycle_indexes",
+             [](InstanceRecycler& instance_recycler) {
+                 instance_recycler.scan_and_statistics_indexes();
+             }},
+            {"recycle_partitions",
+             [](InstanceRecycler& instance_recycler) {
+                 instance_recycler.scan_and_statistics_partitions();
+             }},
+            {"recycle_tmp_rowsets",
+             [](InstanceRecycler& instance_recycler) {
+                 instance_recycler.scan_and_statistics_tmp_rowsets();
+             }},
+            {"recycle_rowsets",
+             [](InstanceRecycler& instance_recycler) {
+                 instance_recycler.scan_and_statistics_rowsets();
+             }},
+            {"abort_timeout_txn",
+             [](InstanceRecycler& instance_recycler) {
+                 instance_recycler.scan_and_statistics_abort_timeout_txn();
+             }},
+            {"recycle_expired_txn_label",
+             [](InstanceRecycler& instance_recycler) {
+                 instance_recycler.scan_and_statistics_expired_txn_label();
+             }},
+            {"recycle_versions",
+             [](InstanceRecycler& instance_recycler) {
+                 instance_recycler.scan_and_statistics_versions();
+             }},
+            {"recycle_copy_jobs",
+             [](InstanceRecycler& instance_recycler) {
+                 instance_recycler.scan_and_statistics_copy_jobs();
+             }},
+            {"recycle_stage",
+             [](InstanceRecycler& instance_recycler) {
+                 instance_recycler.scan_and_statistics_stage();
+             }},
+            {"recycle_expired_stage_objects",
+             [](InstanceRecycler& instance_recycler) {
+                 instance_recycler.scan_and_statistics_expired_stage_objects();
+             }},
+            {"recycle_tablet",
+             [](InstanceRecycler& instance_recycler) {
+                 instance_recycler.scan_and_statistics_partitions();
+                 instance_recycler.scan_and_statistics_indexes();
+             }},
+            {"recycle_segment", [](InstanceRecycler& instance_recycler) {
+                 instance_recycler.scan_and_statistics_partitions();
+                 instance_recycler.scan_and_statistics_indexes();
+                 instance_recycler.scan_and_statistics_rowsets();
+                 instance_recycler.scan_and_statistics_tmp_rowsets();
+             }}};
+    // Collect the requested resource types; "*" selects every key of resource_handlers.
+    std::set<std::string> resource_types;
+    for (const auto& resource_type : req.resource_type()) {
+        if (resource_type == "*") {
+            std::for_each(resource_handlers.begin(), resource_handlers.end(),
+                          [&](const auto& it) { resource_types.emplace(it.first); });
+            break;
+        } else {
+            if (!resource_handlers.contains(resource_type)) {
+                code = MetaServiceCode::INVALID_ARGUMENT;
+                msg = fmt::format(
+                        "invalid resource type: {}, valid resource_type have [{}]", resource_type,
+                        std::accumulate(resource_handlers.begin(), resource_handlers.end(),
+                                        std::string(), [](const std::string& acc, const auto& it) {
+                                            return acc.empty() ? it.first : acc + ", " + it.first;
+                                        }));
+                LOG_WARNING(msg);
+                return;
+            } else {
+                resource_types.emplace(resource_type);
+            }
+        }
+    }
+    // Collect the requested instance ids; "*" selects every instance from get_all_instances().
+    std::set<std::string> instance_ids;
+    std::vector<InstanceInfoPB> instances;
+    get_all_instances(txn_kv_.get(), instances);
+
+    for (const auto& instance_id : req.instance_ids()) {
+        if (instance_id == "*") {
+            std::for_each(instances.begin(), instances.end(), [&](const InstanceInfoPB& instance) {
+                instance_ids.emplace(instance.instance_id());
+            });
+            break;
+        } else {
+            if (std::find_if(instances.begin(), instances.end(),
+                             [&](const InstanceInfoPB& instance) {
+                                 return instance.instance_id() == instance_id;
+                             }) == instances.end()) {
+                code = MetaServiceCode::INVALID_ARGUMENT;
+                msg = fmt::format("invalid instance id: {}", instance_id);
+                LOG_WARNING(msg);
+                return;
+            } else {
+                instance_ids.emplace(instance_id);
+            }
+        }
+    }
+
+    LOG(INFO) << "begin to statistics recycle for "
+              << std::accumulate(instance_ids.begin(), instance_ids.end(), std::string(),
+                                 [](const std::string& acc, const std::string& id) {
+                                     return acc.empty() ? id : acc + ", " + id;
+                                 });
+
+    auto worker_pool = std::make_unique<SimpleThreadPool>(
+            config::instance_recycler_statistics_recycle_worker_pool_size, "statistics_recycle");
+    worker_pool->start();
+
+    for (const auto& id : instance_ids) {
+        InstanceKeyInfo key_info {id};
+        std::string key;
+        instance_key(key_info, &key);
+        std::string val;
+        err = txn->get(key, &val);
+        if (err != TxnErrorCode::TXN_OK) {
+            code = MetaServiceCode::KV_TXN_GET_ERR;
+            msg = fmt::format("failed to get instance, instance_id={}, err={}", id, err);
+            LOG_WARNING(msg);
+            continue;
+        }
+        InstanceInfoPB instance;
+        if (!instance.ParseFromString(val)) {
+            code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+            msg = fmt::format("malformed instance info, key={}, val={}", hex(key), hex(val));
+            LOG_WARNING(msg);
+            continue;
+        }
+        auto instance_recycler = std::make_shared<InstanceRecycler>(
+                txn_kv_, instance, recycler_->_thread_pool_group, txn_lazy_committer_);
+
+        if (int r = instance_recycler->init(); r != 0) {
+            LOG(WARNING) << "failed to init instance recycler, instance_id=" << id << " ret=" << r;
+            continue;
+        }
+        // if empty, statistics all resources; tasks copy instance_recycler so it outlives this loop
+        if (resource_types.empty()) {
+            for (const auto& [_, func] : resource_handlers) {
+                worker_pool->submit([instance_recycler, &func]() { func(*instance_recycler); });
+            }
+        } else {
+            for (const auto& resource_type : resource_types) {
+                if (auto it = resource_handlers.find(resource_type);
+                    it != resource_handlers.end()) {
+                    worker_pool->submit( // by value: the task may run after `it` leaves scope
+                            [it, instance_recycler]() { it->second(*instance_recycler); });
+                }
+            }
+        }
+    }
+    // NOTE(review): assumes stop() drains submitted tasks before the report reads the bvars.
+    worker_pool->stop();
+    std::stringstream ss;
+    for_each(instance_ids.begin(), instance_ids.end(), [&](const std::string& id) {
+        ss << "Instance ID: " << id << "\n";
+        ss << "----------------------------------------\n";
+
+        for_each(resource_types.begin(), resource_types.end(), [&](const auto& resource_type) {
+            int64_t to_recycle_num = 0;
+            int64_t to_recycle_bytes = 0;
+            if (resource_type == "recycle_segment" || resource_type == "recycle_tablet") {
+                to_recycle_num = g_bvar_recycler_instance_last_round_to_recycle_num.get(
+                        {"global_recycler", resource_type});
+                to_recycle_bytes = g_bvar_recycler_instance_last_round_to_recycle_bytes.get(
+                        {"global_recycler", resource_type});
+            } else {
+                to_recycle_num =
+                        g_bvar_recycler_instance_last_round_to_recycle_num.get({id, resource_type});
+                to_recycle_bytes = g_bvar_recycler_instance_last_round_to_recycle_bytes.get(
+                        {id, resource_type});
+            }
+
+            ss << "Task Type: " << resource_type << "\n";
+
+            // Add specific counts for different resource types
+            if (resource_type == "recycle_partitions") {
+                ss << "  • Need to recycle partition count: " << to_recycle_num << " items\n";
+                ss << "  • Need to recycle partition size: " << to_recycle_bytes << " bytes\n";
+            } else if (resource_type == "recycle_rowsets") {
+                ss << "  • Need to recycle rowset count: " << to_recycle_num << " items\n";
+                ss << "  • Need to recycle rowset size: " << to_recycle_bytes << " bytes\n";
+            } else if (resource_type == "recycle_tmp_rowsets") {
+                ss << "  • Need to recycle tmp rowset count: " << to_recycle_num << " items\n";
+                ss << "  • Need to recycle tmp rowset size: " << to_recycle_bytes << " bytes\n";
+            } else if (resource_type == "recycle_indexes") {
+                ss << "  • Need to recycle index count: " << to_recycle_num << " items\n";
+                ss << "  • Need to recycle index size: " << to_recycle_bytes << " bytes\n";
+            } else if (resource_type == "recycle_segment") {
+                ss << "  • Need to recycle segment count: " << to_recycle_num << " items\n";
+                ss << "  • Need to recycle segment size: " << to_recycle_bytes << " bytes\n";
+            } else if (resource_type == "recycle_tablet") {
+                ss << "  • Need to recycle tablet count: " << to_recycle_num << " items\n";
+                ss << "  • Need to recycle tablet size: " << to_recycle_bytes << " bytes\n";
+            } else if (resource_type == "recycle_versions") {
+                ss << "  • Need to recycle version count: " << to_recycle_num << " items\n";
+                ss << "  • Need to recycle version size: " << to_recycle_bytes << " bytes\n";
+            } else if (resource_type == "abort_timeout_txn") {
+                ss << "  • Need to abort timeout txn count: " << to_recycle_num << " items\n";
+                ss << "  • Need to recycle timeout txn size: " << to_recycle_bytes << " bytes\n";
+            } else if (resource_type == "recycle_expired_txn_label") {
+                ss << "  • Need to recycle expired txn label count: " << to_recycle_num
+                   << " items\n";
+                ss << "  • Need to recycle expired txn label size: " << to_recycle_bytes
+                   << " bytes\n";
+            } else if (resource_type == "recycle_copy_jobs") {
+                ss << "  • Need to recycle copy job count: " << to_recycle_num << " items\n";
+                ss << "  • Need to recycle copy job size: " << to_recycle_bytes << " bytes\n";
+            } else if (resource_type == "recycle_stage") {
+                ss << "  • Need to recycle stage count: " << to_recycle_num << " items\n";
+                ss << "  • Need to recycle stage size: " << to_recycle_bytes << " bytes\n";
+            } else if (resource_type == "recycle_expired_stage_objects") {
+                ss << "  • Need to recycle expired stage object count: " << to_recycle_num
+                   << " items\n";
+                ss << "  • Need to recycle expired stage object size: " << to_recycle_bytes
+                   << " bytes\n";
+            } else {
+                ss << "  • Need to recycle count: " << to_recycle_num << " items\n";
+                ss << "  • Need to recycle size: " << to_recycle_bytes << " bytes\n";
+            }
+
+            ss << "----------------------------------------\n";
+        });
+        ss << "\n";
+    });
+    msg = ss.str(); // NOTE(review): overwrites any per-instance error msg set in the loop above
+}
+
 void RecyclerServiceImpl::recycle_instance(::google::protobuf::RpcController* 
controller,
                                            const 
::doris::cloud::RecycleInstanceRequest* request,
                                            
::doris::cloud::RecycleInstanceResponse* response,
@@ -330,6 +577,20 @@ void 
RecyclerServiceImpl::http(::google::protobuf::RpcController* controller,
         return;
     }
 
+    if (unresolved_path == "statistics_recycle") {
+        StatisticsRecycleRequest req;
+        auto st = google::protobuf::util::JsonStringToMessage(request_body, 
&req);
+        if (!st.ok()) {
+            msg = "failed to StatisticsRecycleRequest, error: " + 
st.message().ToString();
+            response_body = msg;
+            LOG(WARNING) << msg;
+            return;
+        }
+        statistics_recycle(req, code, msg);
+        response_body = msg;
+        return;
+    }
+
     if (unresolved_path == "recycle_copy_jobs") {
         auto instance_id = uri.GetQuery("instance_id");
         if (instance_id == nullptr || instance_id->empty()) {
diff --git a/cloud/src/recycler/recycler_service.h 
b/cloud/src/recycler/recycler_service.h
index 67eb9d21333..2ddf311310b 100644
--- a/cloud/src/recycler/recycler_service.h
+++ b/cloud/src/recycler/recycler_service.h
@@ -44,6 +44,8 @@ public:
               ::google::protobuf::Closure* done) override;
 
 private:
+    void statistics_recycle(StatisticsRecycleRequest& req, MetaServiceCode& 
code, std::string& msg);
+
     void check_instance(const std::string& instance_id, MetaServiceCode& code, 
std::string& msg);
 
 private:
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 4e5c27f6a43..571bd0b26e2 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -1342,6 +1342,11 @@ message RecycleInstanceRequest {
     repeated string instance_ids = 1;
 }
 
+message StatisticsRecycleRequest { // JSON body of the recycler "statistics_recycle" HTTP API
+    repeated string instance_ids = 1; // instances to scan; "*" = all, unknown id -> INVALID_ARGUMENT
+    repeated string resource_type = 2; // e.g. "recycle_rowsets"; "*" = all, unknown -> INVALID_ARGUMENT
+}
+
 message RecycleInstanceResponse {
     optional MetaServiceResponseStatus status = 1;
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to