This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 45bcd1d1a3c branch-3.0: [Enhancement](Test) Add test config to recycler #44761 (#45368)
45bcd1d1a3c is described below

commit 45bcd1d1a3c4255768cf2608d8ba216b5de020cc
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Dec 13 23:33:55 2024 +0800

    branch-3.0: [Enhancement](Test) Add test config to recycler #44761 (#45368)
    
    Cherry-picked from #44761
    
    Co-authored-by: abmdocrt <[email protected]>
---
 cloud/src/common/config.h           |  4 ++++
 cloud/src/recycler/meta_checker.cpp | 36 ++++++++++++++++++++++++++++----
 cloud/src/recycler/recycler.cpp     | 41 ++++++++++++++++++++++++++-----------
 3 files changed, 65 insertions(+), 16 deletions(-)

diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index c6b6e1ef290..1579ef9d627 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -87,6 +87,10 @@ CONF_mInt64(recycler_sleep_before_scheduling_seconds, "60");
 // log a warning if a recycle task takes longer than this duration
 CONF_mInt64(recycle_task_threshold_seconds, "10800"); // 3h
 
+// Force the recycler to treat every recyclable object as already expired, so it is
+// recycled immediately regardless of retention/expiration time. **For TEST only.**
+CONF_Bool(force_immediate_recycle, "false");
+
 CONF_String(test_s3_ak, "");
 CONF_String(test_s3_sk, "");
 CONF_String(test_s3_endpoint, "");
diff --git a/cloud/src/recycler/meta_checker.cpp 
b/cloud/src/recycler/meta_checker.cpp
index 522015555de..f1223068d4b 100644
--- a/cloud/src/recycler/meta_checker.cpp
+++ b/cloud/src/recycler/meta_checker.cpp
@@ -25,6 +25,7 @@
 #include <chrono>
 #include <set>
 
+#include "common/logging.h"
 #include "common/util.h"
 #include "meta-service/keys.h"
 #include "meta-service/txn_kv.h"
@@ -54,6 +55,7 @@ struct PartitionInfo {
     int64_t db_id;
     int64_t table_id;
     int64_t partition_id;
+    int64_t tablet_id;
     int64_t visible_version;
 };
 
@@ -173,6 +175,9 @@ bool MetaChecker::check_fdb_by_fe_meta(MYSQL* conn) {
                     MYSQL_ROW row = mysql_fetch_row(result);
                     TabletInfo tablet_info = {0};
                     tablet_info.tablet_id = atoll(row[0]);
+                    VLOG_DEBUG << "get tablet info log"
+                               << ", db name" << elem.first << ", table name" 
<< table
+                               << ",tablet id" << tablet_info.tablet_id;
                     tablet_info.schema_version = atoll(row[4]);
                     tablets.push_back(std::move(tablet_info));
                 }
@@ -201,6 +206,13 @@ bool MetaChecker::check_fdb_by_fe_meta(MYSQL* conn) {
                 partition_info.db_id = atoll(row[4]);
                 partition_info.table_id = atoll(row[5]);
                 partition_info.partition_id = atoll(row[6]);
+                partition_info.tablet_id = tablet_info.tablet_id;
+                VLOG_DEBUG << "get partition info log"
+                           << ", db id" << partition_info.db_id << ", table id"
+                           << partition_info.table_id << ", partition id"
+                           << partition_info.partition_id << ", tablet id"
+                           << partition_info.tablet_id;
+
                 partitions.insert({partition_info.partition_id, 
std::move(partition_info)});
             }
         }
@@ -217,9 +229,16 @@ bool MetaChecker::check_fdb_by_fe_meta(MYSQL* conn) {
                 int num_row = mysql_num_rows(result);
                 for (int i = 0; i < num_row; ++i) {
                     MYSQL_ROW row = mysql_fetch_row(result);
-                    int partition_id = atoll(row[0]);
-                    int visible_version = atoll(row[2]);
+                    int64_t partition_id = atoll(row[0]);
+                    int64_t visible_version = atoll(row[2]);
                     partitions[partition_id].visible_version = visible_version;
+                    VLOG_DEBUG << "get partition version log"
+                               << ", db name" << elem.first << ", table name" 
<< table
+                               << ", raw partition id" << row[0] << ", first 
partition id"
+                               << partition_id << ", db id" << 
partitions[partition_id].db_id
+                               << ", table id" << 
partitions[partition_id].table_id
+                               << ", second partition id" << 
partitions[partition_id].partition_id
+                               << ", tablet id" << 
partitions[partition_id].tablet_id;
                 }
             }
             mysql_free_result(result);
@@ -354,14 +373,23 @@ bool MetaChecker::check_fdb_by_fe_meta(MYSQL* conn) {
         int64_t db_id = elem.second.db_id;
         int64_t table_id = elem.second.table_id;
         int64_t partition_id = elem.second.partition_id;
+        int64_t tablet_id = elem.second.tablet_id;
         std::string ver_key = partition_version_key({instance_id_, db_id, 
table_id, partition_id});
         std::string ver_val;
         err = txn->get(ver_key, &ver_val);
         if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
-            LOG(WARNING) << "version key not found, partition id: " << 
partition_id;
+            LOG_WARNING("version key not found.")
+                    .tag("db id", db_id)
+                    .tag("table id", table_id)
+                    .tag("partition id", partition_id)
+                    .tag("tablet id", tablet_id);
             return false;
         } else if (err != TxnErrorCode::TXN_OK) {
-            LOG(WARNING) << "failed to get version: " << partition_id;
+            LOG_WARNING("failed to get version.")
+                    .tag("db id", db_id)
+                    .tag("table id", table_id)
+                    .tag("partition id", partition_id)
+                    .tag("tablet id", tablet_id);
             return false;
         }
 
diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index f7000ea3792..6877d7e433b 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -24,6 +24,7 @@
 
 #include <atomic>
 #include <chrono>
+#include <cstdint>
 #include <deque>
 #include <string>
 #include <string_view>
@@ -747,7 +748,10 @@ int InstanceRecycler::recycle_indexes() {
                 .tag("num_recycled", num_recycled);
     });
 
-    auto calc_expiration = [](const RecycleIndexPB& index) {
+    auto calc_expiration = [](const RecycleIndexPB& index) -> int64_t {
+        if (config::force_immediate_recycle) {
+            return 0;
+        }
         int64_t expiration = index.expiration() > 0 ? index.expiration() : 
index.creation_time();
         int64_t retention_seconds = config::retention_seconds;
         if (index.state() == RecycleIndexPB::DROPPED) {
@@ -942,7 +946,10 @@ int InstanceRecycler::recycle_partitions() {
                 .tag("num_recycled", num_recycled);
     });
 
-    auto calc_expiration = [](const RecyclePartitionPB& partition) {
+    auto calc_expiration = [](const RecyclePartitionPB& partition) -> int64_t {
+        if (config::force_immediate_recycle) {
+            return 0;
+        }
         int64_t expiration =
                 partition.expiration() > 0 ? partition.expiration() : 
partition.creation_time();
         int64_t retention_seconds = config::retention_seconds;
@@ -1686,7 +1693,10 @@ int InstanceRecycler::recycle_rowsets() {
         return 0;
     };
 
-    auto calc_expiration = [](const RecycleRowsetPB& rs) {
+    auto calc_expiration = [](const RecycleRowsetPB& rs) -> int64_t {
+        if (config::force_immediate_recycle) {
+            return 0;
+        }
         // RecycleRowsetPB created by compacted or dropped rowset has no 
expiration time, and will be recycled when exceed retention time
         int64_t expiration = rs.expiration() > 0 ? rs.expiration() : 
rs.creation_time();
         int64_t retention_seconds = config::retention_seconds;
@@ -1923,8 +1933,9 @@ int InstanceRecycler::recycle_tmp_rowsets() {
         // ATTN: `txn_expiration` should > 0, however we use `creation_time` + 
a large `retention_time` (> 1 day in production environment)
         //  when `txn_expiration` <= 0 in some unexpected situation (usually 
when there are bugs). This is usually safe, coz loading
         //  duration or timeout always < `retention_time` in practice.
-        int64_t expiration =
-                rowset.txn_expiration() > 0 ? rowset.txn_expiration() : 
rowset.creation_time();
+        int64_t expiration = config::force_immediate_recycle ? 0
+                             : rowset.txn_expiration() > 0   ? 
rowset.txn_expiration()
+                                                             : 
rowset.creation_time();
         VLOG_DEBUG << "recycle tmp rowset scan, key=" << hex(k) << " 
num_scanned=" << num_scanned
                    << " num_expired=" << num_expired << " expiration=" << 
expiration
                    << " txn_expiration=" << rowset.txn_expiration()
@@ -2106,7 +2117,7 @@ int InstanceRecycler::abort_timeout_txn() {
                 LOG_WARNING("malformed txn_running_pb").tag("key", hex(k));
                 return -1;
             }
-            if (txn_running_pb.timeout_time() > current_time) {
+            if (!config::force_immediate_recycle && 
txn_running_pb.timeout_time() > current_time) {
                 return 0;
             }
             ++num_timeout;
@@ -2196,7 +2207,8 @@ int InstanceRecycler::recycle_expired_txn_label() {
             LOG_WARNING("malformed txn_running_pb").tag("key", hex(k));
             return -1;
         }
-        if ((recycle_txn_pb.has_immediate() && recycle_txn_pb.immediate()) ||
+        if ((config::force_immediate_recycle) ||
+            (recycle_txn_pb.has_immediate() && recycle_txn_pb.immediate()) ||
             (recycle_txn_pb.creation_time() + config::label_keep_max_second * 
1000L <=
              current_time)) {
             LOG_INFO("found recycle txn").tag("key", hex(k));
@@ -2492,14 +2504,16 @@ int InstanceRecycler::recycle_copy_jobs() {
                 int64_t current_time =
                         
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
                 if (copy_job.finish_time_ms() > 0) {
-                    if (current_time <
-                        copy_job.finish_time_ms() + 
config::copy_job_max_retention_second * 1000) {
+                    if (!config::force_immediate_recycle &&
+                        current_time < copy_job.finish_time_ms() +
+                                               
config::copy_job_max_retention_second * 1000) {
                         return 0;
                     }
                 } else {
                     // For compatibility, copy job does not contain finish 
time before 2.2.2, use start time
-                    if (current_time <
-                        copy_job.start_time_ms() + 
config::copy_job_max_retention_second * 1000) {
+                    if (!config::force_immediate_recycle &&
+                        current_time < copy_job.start_time_ms() +
+                                               
config::copy_job_max_retention_second * 1000) {
                         return 0;
                     }
                 }
@@ -2508,7 +2522,7 @@ int InstanceRecycler::recycle_copy_jobs() {
             int64_t current_time =
                     
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
             // if copy job is timeout: delete all copy file kvs and copy job kv
-            if (current_time <= copy_job.timeout_time_ms()) {
+            if (!config::force_immediate_recycle && current_time <= 
copy_job.timeout_time_ms()) {
                 return 0;
             }
             ++num_expired;
@@ -2796,6 +2810,9 @@ int InstanceRecycler::recycle_expired_stage_objects() {
         int64_t expiration_time =
                 
duration_cast<seconds>(system_clock::now().time_since_epoch()).count() -
                 config::internal_stage_objects_expire_time_second;
+        if (config::force_immediate_recycle) {
+            expiration_time = INT64_MAX;
+        }
         ret1 = accessor->delete_all(expiration_time);
         if (ret1 != 0) {
             LOG(WARNING) << "failed to recycle expired stage objects, 
instance_id=" << instance_id_


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to