This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new a7c827ddfd6 branch-3.0: [feat](recycler) Add http api for statistics recycler metrics #52523 (#53117)
a7c827ddfd6 is described below
commit a7c827ddfd62662201ee47c08152c6eb8518b8f0
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sat Jul 12 10:17:34 2025 +0800
branch-3.0: [feat](recycler) Add http api for statistics recycler metrics #52523 (#53117)
Cherry-picked from #52523
Co-authored-by: Uniqueyou <[email protected]>
---
cloud/src/common/bvars.cpp | 6 +-
cloud/src/common/bvars.h | 6 +-
cloud/src/common/config.h | 4 +-
cloud/src/recycler/recycler.cpp | 1532 +++++++++++++++++--------------
cloud/src/recycler/recycler.h | 55 +-
cloud/src/recycler/recycler_service.cpp | 261 ++++++
cloud/src/recycler/recycler_service.h | 2 +
gensrc/proto/cloud.proto | 5 +
8 files changed, 1172 insertions(+), 699 deletions(-)
diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp
index 4485fb9f5d6..5999320af83 100644
--- a/cloud/src/common/bvars.cpp
+++ b/cloud/src/common/bvars.cpp
@@ -110,11 +110,11 @@ bvar::Adder<int64_t> g_bvar_recycler_instance_recycle_task_concurrency;
 // recycler's mbvars
 bvar::Adder<int64_t> g_bvar_recycler_instance_running_counter("recycler_instance_running_counter");
 // cost time of the last whole recycle process
-mBvarStatus<int64_t> g_bvar_recycler_instance_last_recycle_duration("recycler_instance_last_recycle_duration",{"instance_id"});
+mBvarStatus<int64_t> g_bvar_recycler_instance_last_round_recycle_duration("recycler_instance_last_round_recycle_duration",{"instance_id"});
 mBvarStatus<int64_t> g_bvar_recycler_instance_next_ts("recycler_instance_next_ts",{"instance_id"});
 // start and end timestamps of the recycle process
-mBvarStatus<int64_t> g_bvar_recycler_instance_recycle_st_ts("recycler_instance_recycle_st_ts",{"instance_id"});
-mBvarStatus<int64_t> g_bvar_recycler_instance_recycle_ed_ts("recycler_instance_recycle_ed_ts",{"instance_id"});
+mBvarStatus<int64_t> g_bvar_recycler_instance_recycle_start_ts("recycler_instance_recycle_start_ts",{"instance_id"});
+mBvarStatus<int64_t> g_bvar_recycler_instance_recycle_end_ts("recycler_instance_recycle_end_ts",{"instance_id"});
 mBvarStatus<int64_t> g_bvar_recycler_instance_recycle_last_success_ts("recycler_instance_recycle_last_success_ts",{"instance_id"});
 // recycler's mbvars
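
For context, the metrics above are per-instance tagged bvars (tagged with instance_id), and this hunk only renames them; the values are still written from Recycler::recycle_callback(). A condensed C++ sketch of the update sequence, paraphrased from the recycler.cpp hunks further down rather than quoted verbatim:

    // Sketch: how one recycle round feeds the renamed bvars (condensed, not a verbatim excerpt).
    auto ctime_ms = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
    g_bvar_recycler_instance_recycle_start_ts.put({instance_id}, ctime_ms);      // round start
    int ret = instance_recycler->do_recycle();
    auto now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
    g_bvar_recycler_instance_recycle_end_ts.put({instance_id}, now);             // round end
    g_bvar_recycler_instance_last_round_recycle_duration.put({instance_id}, now - ctime_ms);
    g_bvar_recycler_instance_next_ts.put({instance_id},
                                         now + config::recycle_interval_seconds * 1000);
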
diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h
index 83ab481764f..3ca3e1b2cab 100644
--- a/cloud/src/common/bvars.h
+++ b/cloud/src/common/bvars.h
@@ -262,10 +262,10 @@ extern BvarStatusWithTag<int64_t> g_bvar_recycler_recycle_expired_txn_label_earl
 extern bvar::Status<int64_t> g_bvar_recycler_task_max_concurrency;
 extern bvar::Adder<int64_t> g_bvar_recycler_instance_recycle_task_concurrency;
 extern bvar::Adder<int64_t> g_bvar_recycler_instance_running_counter;
-extern mBvarStatus<int64_t> g_bvar_recycler_instance_last_recycle_duration;
+extern mBvarStatus<int64_t> g_bvar_recycler_instance_last_round_recycle_duration;
 extern mBvarStatus<int64_t> g_bvar_recycler_instance_next_ts;
-extern mBvarStatus<int64_t> g_bvar_recycler_instance_recycle_st_ts;
-extern mBvarStatus<int64_t> g_bvar_recycler_instance_recycle_ed_ts;
+extern mBvarStatus<int64_t> g_bvar_recycler_instance_recycle_start_ts;
+extern mBvarStatus<int64_t> g_bvar_recycler_instance_recycle_end_ts;
 extern mBvarStatus<int64_t> g_bvar_recycler_instance_recycle_last_success_ts;
 extern mBvarIntAdder g_bvar_recycler_vault_recycle_status;
diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index 62cd5c9343b..6fd933d3a15 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -89,6 +89,8 @@ CONF_Strings(recycle_whitelist, ""); // Comma seprated list
CONF_Strings(recycle_blacklist, ""); // Comma seprated list
// IO worker thread pool concurrency: object list, delete
CONF_mInt32(instance_recycler_worker_pool_size, "32");
+// The worker pool size for http api `statistics_recycle` worker pool
+CONF_mInt32(instance_recycler_statistics_recycle_worker_pool_size, "5");
CONF_Bool(enable_checker, "false");
// The parallelism for parallel recycle operation
// s3_producer_pool recycle_tablet_pool, delete single object in this pool
@@ -105,7 +107,7 @@ CONF_mInt32(scan_instances_interval_seconds, "60"); // 1min
// interval for check object
CONF_mInt32(check_object_interval_seconds, "43200"); // 12hours
// enable recycler metrics statistics
-CONF_Bool(enable_recycler_metrics, "false");
+CONF_Bool(enable_recycler_stats_metrics, "false");
CONF_mInt64(check_recycle_task_interval_seconds, "600"); // 10min
CONF_mInt64(recycler_sleep_before_scheduling_seconds, "60");
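
Two knobs are involved here: instance_recycler_statistics_recycle_worker_pool_size sizes the worker pool behind the new `statistics_recycle` HTTP API, and enable_recycler_stats_metrics (renamed from enable_recycler_metrics) gates the per-task statistics pre-pass inside the recycler. The gating pattern repeats across the recycler.cpp hunks below; shown here condensed for the indexes task:

    // Sketch of the gating pattern introduced by this patch; condensed from recycle_indexes() below.
    if (config::enable_recycler_stats_metrics) {
        scan_and_statistics_indexes();   // statistics-only pre-pass, feeds *_need_recycle_* bvars
    }
    // the actual recycle pass runs unconditionally
    return scan_and_recycle(index_key0, index_key1, std::move(recycle_func), std::move(loop_done));
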
diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index e285886acfb..e07dda0669a 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -68,8 +68,8 @@ namespace doris::cloud {
using namespace std::chrono;
-static RecyclerMetricsContext tablet_metrics_context_("global_recycler", "recycle_tablet");
-static RecyclerMetricsContext segment_metrics_context_("global_recycler", "recycle_segment");
+RecyclerMetricsContext tablet_metrics_context_("global_recycler", "recycle_tablet");
+RecyclerMetricsContext segment_metrics_context_("global_recycler", "recycle_segment");
 // return 0 for success get a key, 1 for key not found, negative for error
 [[maybe_unused]] static int txn_get(TxnKv* txn_kv, std::string_view key, std::string& val) {
@@ -288,7 +288,7 @@ void Recycler::recycle_callback() {
         auto ctime_ms = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
         g_bvar_recycler_instance_recycle_task_concurrency << 1;
         g_bvar_recycler_instance_running_counter << 1;
-        g_bvar_recycler_instance_recycle_st_ts.put({instance_id}, ctime_ms);
+        g_bvar_recycler_instance_recycle_start_ts.put({instance_id}, ctime_ms);
         tablet_metrics_context_.reset();
         segment_metrics_context_.reset();
         ret = instance_recycler->do_recycle();
@@ -308,8 +308,8 @@ void Recycler::recycle_callback() {
         auto now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
         auto elpased_ms = now - ctime_ms;
-        g_bvar_recycler_instance_recycle_ed_ts.put({instance_id}, now);
-        g_bvar_recycler_instance_last_recycle_duration.put({instance_id}, elpased_ms);
+        g_bvar_recycler_instance_recycle_end_ts.put({instance_id}, now);
+        g_bvar_recycler_instance_last_round_recycle_duration.put({instance_id}, elpased_ms);
         g_bvar_recycler_instance_next_ts.put({instance_id},
                                              now + config::recycle_interval_seconds * 1000);
         LOG(INFO) << "recycle instance done, "
@@ -812,6 +812,178 @@ int InstanceRecycler::recycle_deleted_instance() {
return ret;
}
+bool is_txn_finished(std::shared_ptr<TxnKv> txn_kv, const std::string& instance_id,
+                     int64_t txn_id) {
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        LOG(WARNING) << "failed to create txn, txn_id=" << txn_id << " instance_id=" << instance_id;
+        return false;
+    }
+
+    std::string index_val;
+    const std::string index_key = txn_index_key({instance_id, txn_id});
+    err = txn->get(index_key, &index_val);
+    if (err != TxnErrorCode::TXN_OK) {
+        if (TxnErrorCode::TXN_KEY_NOT_FOUND == err) {
+            TEST_SYNC_POINT_CALLBACK("is_txn_finished::txn_has_been_recycled");
+            // txn has been recycled;
+            LOG(INFO) << "txn index key has been recycled, txn_id=" << txn_id
+                      << " instance_id=" << instance_id;
+            return true;
+        }
+        LOG(WARNING) << "failed to get txn index key, txn_id=" << txn_id
+                     << " instance_id=" << instance_id << " key=" << hex(index_key)
+                     << " err=" << err;
+        return false;
+    }
+
+    TxnIndexPB index_pb;
+    if (!index_pb.ParseFromString(index_val)) {
+        LOG(WARNING) << "failed to parse txn_index_pb, txn_id=" << txn_id
+                     << " instance_id=" << instance_id;
+        return false;
+    }
+
+    DCHECK(index_pb.has_tablet_index() == true);
+    if (!index_pb.tablet_index().has_db_id()) {
+        // In the previous version, the db_id was not set in the index_pb.
+        // If updating to the version which enable txn lazy commit, the db_id will be set.
+        LOG(INFO) << "txn index has no db_id, txn_id=" << txn_id << " instance_id=" << instance_id
+                  << " index=" << index_pb.ShortDebugString();
+        return true;
+    }
+
+    int64_t db_id = index_pb.tablet_index().db_id();
+    DCHECK_GT(db_id, 0) << "db_id=" << db_id << " txn_id=" << txn_id
+                        << " instance_id=" << instance_id;
+
+    std::string info_val;
+    const std::string info_key = txn_info_key({instance_id, db_id, txn_id});
+    err = txn->get(info_key, &info_val);
+    if (err != TxnErrorCode::TXN_OK) {
+        if (TxnErrorCode::TXN_KEY_NOT_FOUND == err) {
+            // txn info has been recycled;
+            LOG(INFO) << "txn info key has been recycled, db_id=" << db_id << " txn_id=" << txn_id
+                      << " instance_id=" << instance_id;
+            return true;
+        }
+
+        DCHECK(err != TxnErrorCode::TXN_KEY_NOT_FOUND);
+        LOG(WARNING) << "failed to get txn info key, txn_id=" << txn_id
+                     << " instance_id=" << instance_id << " key=" << hex(info_key)
+                     << " err=" << err;
+        return false;
+    }
+
+    TxnInfoPB txn_info;
+    if (!txn_info.ParseFromString(info_val)) {
+        LOG(WARNING) << "failed to parse txn_info, txn_id=" << txn_id
+                     << " instance_id=" << instance_id;
+        return false;
+    }
+
+    DCHECK(txn_info.txn_id() == txn_id) << "txn_id=" << txn_id << " instance_id=" << instance_id
+                                        << " txn_info=" << txn_info.ShortDebugString();
+
+    if (TxnStatusPB::TXN_STATUS_ABORTED == txn_info.status() ||
+        TxnStatusPB::TXN_STATUS_VISIBLE == txn_info.status()) {
+        TEST_SYNC_POINT_CALLBACK("is_txn_finished::txn_has_been_aborted", &txn_info);
+        return true;
+    }
+
+    TEST_SYNC_POINT_CALLBACK("is_txn_finished::txn_not_finished", &txn_info);
+    return false;
+}
+
+int64_t calculate_rowset_expired_time(const std::string& instance_id_, const RecycleRowsetPB& rs,
+                                      int64_t* earlest_ts /* rowset earliest expiration ts */) {
+    if (config::force_immediate_recycle) {
+        return 0L;
+    }
+    // RecycleRowsetPB created by compacted or dropped rowset has no expiration time, and will be recycled when exceed retention time
+    int64_t expiration = rs.expiration() > 0 ? rs.expiration() : rs.creation_time();
+    int64_t retention_seconds = config::retention_seconds;
+    if (rs.type() == RecycleRowsetPB::COMPACT || rs.type() == RecycleRowsetPB::DROP) {
+        retention_seconds =
+                std::min(config::compacted_rowset_retention_seconds, retention_seconds);
+    }
+    int64_t final_expiration = expiration + retention_seconds;
+    if (*earlest_ts > final_expiration) {
+        *earlest_ts = final_expiration;
+        g_bvar_recycler_recycle_rowset_earlest_ts.put(instance_id_, *earlest_ts);
+    }
+    return final_expiration;
+}
+
+int64_t calculate_partition_expired_time(
+        const std::string& instance_id_, const RecyclePartitionPB& partition_meta_pb,
+        int64_t* earlest_ts /* partition earliest expiration ts */) {
+    if (config::force_immediate_recycle) {
+        return 0L;
+    }
+    int64_t expiration = partition_meta_pb.expiration() > 0 ? partition_meta_pb.expiration()
+                                                            : partition_meta_pb.creation_time();
+    int64_t retention_seconds = config::retention_seconds;
+    if (partition_meta_pb.state() == RecyclePartitionPB::DROPPED) {
+        retention_seconds =
+                std::min(config::dropped_partition_retention_seconds, retention_seconds);
+    }
+    int64_t final_expiration = expiration + retention_seconds;
+    if (*earlest_ts > final_expiration) {
+        *earlest_ts = final_expiration;
+        g_bvar_recycler_recycle_partition_earlest_ts.put(instance_id_, *earlest_ts);
+    }
+    return final_expiration;
+}
+
+int64_t calculate_index_expired_time(const std::string& instance_id_,
+                                     const RecycleIndexPB& index_meta_pb,
+                                     int64_t* earlest_ts /* index earliest expiration ts */) {
+    if (config::force_immediate_recycle) {
+        return 0L;
+    }
+    int64_t expiration = index_meta_pb.expiration() > 0 ? index_meta_pb.expiration()
+                                                        : index_meta_pb.creation_time();
+    int64_t retention_seconds = config::retention_seconds;
+    if (index_meta_pb.state() == RecycleIndexPB::DROPPED) {
+        retention_seconds = std::min(config::dropped_index_retention_seconds, retention_seconds);
+    }
+    int64_t final_expiration = expiration + retention_seconds;
+    if (*earlest_ts > final_expiration) {
+        *earlest_ts = final_expiration;
+        g_bvar_recycler_recycle_index_earlest_ts.put(instance_id_, *earlest_ts);
+    }
+    return final_expiration;
+}
+
+int64_t calculate_tmp_rowset_expired_time(
+        const std::string& instance_id_, const doris::RowsetMetaCloudPB& tmp_rowset_meta_pb,
+        int64_t* earlest_ts /* tmp_rowset earliest expiration ts */) {
+    // ATTN: `txn_expiration` should > 0, however we use `creation_time` + a large `retention_time` (> 1 day in production environment)
+    // when `txn_expiration` <= 0 in some unexpected situation (usually when there are bugs). This is usually safe, coz loading
+    // duration or timeout always < `retention_time` in practice.
+    int64_t expiration = tmp_rowset_meta_pb.txn_expiration() > 0
+                                 ? tmp_rowset_meta_pb.txn_expiration()
+                                 : tmp_rowset_meta_pb.creation_time();
+    expiration = config::force_immediate_recycle ? 0 : expiration;
+    int64_t final_expiration = expiration + config::retention_seconds;
+    if (*earlest_ts > final_expiration) {
+        *earlest_ts = final_expiration;
+        g_bvar_recycler_recycle_tmp_rowset_earlest_ts.put(instance_id_, *earlest_ts);
+    }
+    return final_expiration;
+}
+
+int64_t calculate_txn_expired_time(const std::string& instance_id_, const RecycleTxnPB& txn_meta_pb,
+                                   int64_t* earlest_ts /* txn earliest expiration ts */) {
+    int64_t final_expiration = txn_meta_pb.creation_time() + config::label_keep_max_second * 1000L;
+    if (*earlest_ts > final_expiration / 1000) {
+        *earlest_ts = final_expiration / 1000;
+        g_bvar_recycler_recycle_expired_txn_label_earlest_ts.put(instance_id_, *earlest_ts);
+    }
+    return final_expiration;
+}
+
int InstanceRecycler::recycle_indexes() {
const std::string task_name = "recycle_indexes";
int64_t num_scanned = 0;
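
To make the retention arithmetic in the calculate_*_expired_time() helpers above concrete, here is a minimal standalone sketch of the rowset case with made-up numbers (the real helper reads RecycleRowsetPB and the config:: globals, and additionally lowers the shared earlest_ts value that backs the *_earlest_ts bvars):

    #include <algorithm>
    #include <cstdint>
    #include <iostream>

    int main() {
        // Hypothetical inputs, for illustration only; not actual config defaults.
        int64_t creation_time = 1752000000;           // rowset with no explicit expiration()
        int64_t retention_seconds = 259200;           // stand-in for config::retention_seconds
        int64_t compacted_retention_seconds = 10800;  // stand-in for config::compacted_rowset_retention_seconds
        bool is_compact_or_drop = true;               // RecycleRowsetPB::COMPACT or ::DROP

        int64_t expiration = creation_time;           // expiration() <= 0 falls back to creation_time()
        int64_t retention = is_compact_or_drop
                                    ? std::min(compacted_retention_seconds, retention_seconds)
                                    : retention_seconds;
        int64_t final_expiration = expiration + retention;  // 1752000000 + 10800 = 1752010800
        std::cout << final_expiration << std::endl;   // recycled once ::time(nullptr) exceeds this
        return 0;
    }
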
@@ -845,24 +1017,6 @@ int InstanceRecycler::recycle_indexes() {
int64_t earlest_ts = std::numeric_limits<int64_t>::max();
- auto calc_expiration = [&earlest_ts, this](const RecycleIndexPB& index) {
- if (config::force_immediate_recycle) {
- return 0L;
- }
- int64_t expiration = index.expiration() > 0 ? index.expiration() :
index.creation_time();
- int64_t retention_seconds = config::retention_seconds;
- if (index.state() == RecycleIndexPB::DROPPED) {
- retention_seconds =
- std::min(config::dropped_index_retention_seconds,
retention_seconds);
- }
- int64_t final_expiration = expiration + retention_seconds;
- if (earlest_ts > final_expiration) {
- earlest_ts = final_expiration;
- g_bvar_recycler_recycle_index_earlest_ts.put(instance_id_,
earlest_ts);
- }
- return final_expiration;
- };
-
// Elements in `index_keys` has the same lifetime as `it` in
`scan_and_recycle`
std::vector<std::string_view> index_keys;
auto recycle_func = [&, this](std::string_view k, std::string_view v) ->
int {
@@ -873,7 +1027,8 @@ int InstanceRecycler::recycle_indexes() {
return -1;
}
int64_t current_time = ::time(nullptr);
- if (current_time < calc_expiration(index_pb)) { // not expired
+ if (current_time <
+ calculate_index_expired_time(instance_id_, index_pb, &earlest_ts))
{ // not expired
return 0;
}
++num_expired;
@@ -933,47 +1088,6 @@ int InstanceRecycler::recycle_indexes() {
return 0;
};
- // for calculate the total num or bytes of recyled objects
- auto scan_and_statistics = [&](std::string_view k, std::string_view v) ->
int {
- RecycleIndexPB index_pb;
- if (!index_pb.ParseFromArray(v.data(), v.size())) {
- return 0;
- }
- int64_t current_time = ::time(nullptr);
- if (current_time < calc_expiration(index_pb)) {
- return 0;
- }
- // decode index_id
- auto k1 = k;
- k1.remove_prefix(1);
- std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>>
out;
- decode_key(&k1, &out);
- // 0x01 "recycle" ${instance_id} "index" ${index_id} -> RecycleIndexPB
- auto index_id = std::get<int64_t>(std::get<0>(out[3]));
- std::unique_ptr<Transaction> txn;
- TxnErrorCode err = txn_kv_->create_txn(&txn);
- if (err != TxnErrorCode::TXN_OK) {
- return 0;
- }
- std::string val;
- err = txn->get(k, &val);
- if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
- return 0;
- }
- if (err != TxnErrorCode::TXN_OK) {
- return 0;
- }
- index_pb.Clear();
- if (!index_pb.ParseFromString(val)) {
- return 0;
- }
- if (scan_tablets_and_statistics(index_pb.table_id(), index_id,
metrics_context) != 0) {
- return 0;
- }
- metrics_context.total_need_recycle_num++;
- return 0;
- };
-
auto loop_done = [&index_keys, this]() -> int {
if (index_keys.empty()) return 0;
DORIS_CLOUD_DEFER {
@@ -986,9 +1100,11 @@ int InstanceRecycler::recycle_indexes() {
return 0;
};
-    return scan_for_recycle_and_statistics(index_key0, index_key1, "indexes", metrics_context,
-                                           std::move(scan_and_statistics), std::move(recycle_func),
-                                           std::move(loop_done));
+    if (config::enable_recycler_stats_metrics) {
+        scan_and_statistics_indexes();
+    }
+    // recycle_func and loop_done for scan and recycle
+    return scan_and_recycle(index_key0, index_key1, std::move(recycle_func),
+                            std::move(loop_done));
}
bool check_lazy_txn_finished(std::shared_ptr<TxnKv> txn_kv, const std::string
instance_id,
@@ -1109,25 +1225,6 @@ int InstanceRecycler::recycle_partitions() {
int64_t earlest_ts = std::numeric_limits<int64_t>::max();
- auto calc_expiration = [&earlest_ts, this](const RecyclePartitionPB&
partition) {
- if (config::force_immediate_recycle) {
- return 0L;
- }
- int64_t expiration =
- partition.expiration() > 0 ? partition.expiration() :
partition.creation_time();
- int64_t retention_seconds = config::retention_seconds;
- if (partition.state() == RecyclePartitionPB::DROPPED) {
- retention_seconds =
- std::min(config::dropped_partition_retention_seconds,
retention_seconds);
- }
- int64_t final_expiration = expiration + retention_seconds;
- if (earlest_ts > final_expiration) {
- earlest_ts = final_expiration;
- g_bvar_recycler_recycle_partition_earlest_ts.put(instance_id_,
earlest_ts);
- }
- return final_expiration;
- };
-
// Elements in `partition_keys` has the same lifetime as `it` in
`scan_and_recycle`
std::vector<std::string_view> partition_keys;
std::vector<std::string> partition_version_keys;
@@ -1139,7 +1236,8 @@ int InstanceRecycler::recycle_partitions() {
return -1;
}
int64_t current_time = ::time(nullptr);
- if (current_time < calc_expiration(part_pb)) { // not expired
+ if (current_time <
+ calculate_partition_expired_time(instance_id_, part_pb,
&earlest_ts)) { // not expired
return 0;
}
++num_expired;
@@ -1213,54 +1311,6 @@ int InstanceRecycler::recycle_partitions() {
return ret;
};
- // for calculate the total num or bytes of recyled objects
- auto scan_and_statistics = [&, this](std::string_view k, std::string_view
v) -> int {
- RecyclePartitionPB part_pb;
- if (!part_pb.ParseFromArray(v.data(), v.size())) {
- return 0;
- }
- int64_t current_time = ::time(nullptr);
- if (current_time < calc_expiration(part_pb)) {
- return 0;
- }
- // decode partition_id
- auto k1 = k;
- k1.remove_prefix(1);
- std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>>
out;
- decode_key(&k1, &out);
- // 0x01 "recycle" ${instance_id} "partition" ${partition_id} ->
RecyclePartitionPB
- auto partition_id = std::get<int64_t>(std::get<0>(out[3]));
- // Change state to RECYCLING
- std::unique_ptr<Transaction> txn;
- TxnErrorCode err = txn_kv_->create_txn(&txn);
- if (err != TxnErrorCode::TXN_OK) {
- return 0;
- }
- std::string val;
- err = txn->get(k, &val);
- if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
- return 0;
- }
- if (err != TxnErrorCode::TXN_OK) {
- return 0;
- }
- part_pb.Clear();
- if (!part_pb.ParseFromString(val)) {
- return 0;
- }
- // Partitions with PREPARED state MUST have no data
- bool is_empty_tablet = part_pb.state() == RecyclePartitionPB::PREPARED;
- int ret = 0;
- for (int64_t index_id : part_pb.index_id()) {
- if (scan_tablets_and_statistics(part_pb.table_id(), index_id,
metrics_context,
- partition_id, is_empty_tablet) !=
0) {
- ret = 0;
- }
- }
- metrics_context.total_need_recycle_num++;
- return ret;
- };
-
auto loop_done = [&partition_keys, &partition_version_keys, this]() -> int
{
if (partition_keys.empty()) return 0;
DORIS_CLOUD_DEFER {
@@ -1288,9 +1338,11 @@ int InstanceRecycler::recycle_partitions() {
return 0;
};
-    return scan_for_recycle_and_statistics(part_key0, part_key1, "partitions", metrics_context,
-                                           std::move(scan_and_statistics), std::move(recycle_func),
-                                           std::move(loop_done));
+    if (config::enable_recycler_stats_metrics) {
+        scan_and_statistics_partitions();
+    }
+    // recycle_func and loop_done for scan and recycle
+    return scan_and_recycle(part_key0, part_key1, std::move(recycle_func), std::move(loop_done));
}
int InstanceRecycler::recycle_versions() {
@@ -1370,48 +1422,11 @@ int InstanceRecycler::recycle_versions() {
return 0;
};
- int64_t last_scanned_table_id_t = 0;
- bool is_recycled_t = false; // Is last scanned kv recycled
- // for calculate the total num or bytes of recyled objects
- auto scan_and_statistics = [&metrics_context, &last_scanned_table_id_t,
&is_recycled_t, this](
- std::string_view k, std::string_view) {
- auto k1 = k;
- k1.remove_prefix(1);
- // 0x01 "version" ${instance_id} "partition" ${db_id} ${tbl_id}
${partition_id}
- std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>>
out;
- decode_key(&k1, &out);
- DCHECK_EQ(out.size(), 6) << k;
- auto table_id = std::get<int64_t>(std::get<0>(out[4]));
- if (table_id == last_scanned_table_id_t) { // Already handle kvs of
this table
- metrics_context.total_need_recycle_num +=
- is_recycled_t; // Version kv of this table has been
recycled
- return 0;
- }
- last_scanned_table_id_t = table_id;
- is_recycled_t = false;
- auto tablet_key_begin = stats_tablet_key({instance_id_, table_id, 0,
0, 0});
- auto tablet_key_end = stats_tablet_key({instance_id_, table_id,
INT64_MAX, 0, 0});
- std::unique_ptr<Transaction> txn;
- TxnErrorCode err = txn_kv_->create_txn(&txn);
- if (err != TxnErrorCode::TXN_OK) {
- return 0;
- }
- std::unique_ptr<RangeGetIterator> iter;
- err = txn->get(tablet_key_begin, tablet_key_end, &iter, false, 1);
- if (err != TxnErrorCode::TXN_OK) {
- return 0;
- }
- if (iter->has_next()) { // Table is useful, should not recycle table
and partition versions
- return 0;
- }
- metrics_context.total_need_recycle_num++;
- is_recycled_t = true;
- return 0;
- };
-
-    return scan_for_recycle_and_statistics(version_key_begin, version_key_end, "versions",
-                                           metrics_context, std::move(scan_and_statistics),
-                                           std::move(recycle_func));
+    if (config::enable_recycler_stats_metrics) {
+        scan_and_statistics_versions();
+    }
+    // recycle_func and loop_done for scan and recycle
+    return scan_and_recycle(version_key_begin, version_key_end, std::move(recycle_func));
}
int InstanceRecycler::recycle_tablets(int64_t table_id, int64_t index_id,
@@ -1935,7 +1950,12 @@ int
InstanceRecycler::scan_tablets_and_statistics(int64_t table_id, int64_t inde
}
return 0;
};
-    return scan_and_recycle(tablet_key_begin, tablet_key_end, std::move(scan_and_statistics));
+    return scan_and_recycle(tablet_key_begin, tablet_key_end, std::move(scan_and_statistics),
+                            [&metrics_context]() -> int {
+                                metrics_context.report();
+                                tablet_metrics_context_.report();
+                                return 0;
+                            });
}
int InstanceRecycler::scan_tablet_and_statistics(int64_t tablet_id,
@@ -2033,7 +2053,7 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id,
RecyclerMetricsContext&
std::string recyc_rs_key0 = recycle_rowset_key({instance_id_, tablet_id,
""});
std::string recyc_rs_key1 = recycle_rowset_key({instance_id_, tablet_id +
1, ""});
- std::vector<std::string> rowset_meta;
+ std::set<std::string> resource_ids;
int64_t recycle_rowsets_number = 0;
int64_t recycle_segments_number = 0;
int64_t recycle_rowsets_data_size = 0;
@@ -2139,17 +2159,16 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id,
RecyclerMetricsContext&
max_rowset_creation_time = std::max(max_rowset_creation_time,
rs_meta.creation_time());
min_rowset_expiration_time = std::min(min_rowset_expiration_time,
rs_meta.txn_expiration());
max_rowset_expiration_time = std::max(max_rowset_expiration_time,
rs_meta.txn_expiration());
- rowset_meta.emplace_back(rs_meta.resource_id());
- LOG(INFO) << "rs_meta.resource_id()=" << rs_meta.resource_id();
+ resource_ids.emplace(rs_meta.resource_id());
}
LOG_INFO("recycle tablet start to delete object")
.tag("instance id", instance_id_)
.tag("tablet id", tablet_id)
.tag("recycle tablet resource ids are",
- std::accumulate(rowset_meta.begin(), rowset_meta.begin(),
std::string(),
- [](std::string acc, const auto& it) {
- return acc.empty() ? it : acc + ", " + it;
+ std::accumulate(resource_ids.begin(), resource_ids.begin(),
std::string(),
+ [](std::string rs_id, const auto& it) {
+ return rs_id.empty() ? it : rs_id + ", "
+ it;
}));
SyncExecutor<int> concurrent_delete_executor(
@@ -2161,7 +2180,7 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id,
RecyclerMetricsContext&
// ATTN: there may be data leak if not all accessor initilized successfully
// partial data deleted if the tablet is stored cross-storage vault
// vault id is not attached to TabletMeta...
- for (const auto& resource_id : rowset_meta) {
+ for (const auto& resource_id : resource_ids) {
concurrent_delete_executor.add([&, rs_id = resource_id,
accessor_ptr =
accessor_map_[resource_id]]() {
std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01,
[&](int*) {
@@ -2208,8 +2227,11 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id,
RecyclerMetricsContext&
segment_metrics_context_.total_recycled_num += recycle_segments_number;
segment_metrics_context_.total_recycled_data_size +=
recycle_rowsets_data_size + recycle_rowsets_index_size;
+ metrics_context.total_recycled_data_size +=
+ recycle_rowsets_data_size + recycle_rowsets_index_size;
tablet_metrics_context_.report();
segment_metrics_context_.report();
+ metrics_context.report();
txn.reset();
if (txn_kv_->create_txn(&txn) != TxnErrorCode::TXN_OK) {
@@ -2339,77 +2361,23 @@ int InstanceRecycler::recycle_rowsets() {
int64_t earlest_ts = std::numeric_limits<int64_t>::max();
- auto calc_expiration = [&earlest_ts, this](const RecycleRowsetPB& rs) {
- if (config::force_immediate_recycle) {
- return 0L;
- }
- // RecycleRowsetPB created by compacted or dropped rowset has no
expiration time, and will be recycled when exceed retention time
- int64_t expiration = rs.expiration() > 0 ? rs.expiration() :
rs.creation_time();
- int64_t retention_seconds = config::retention_seconds;
- if (rs.type() == RecycleRowsetPB::COMPACT || rs.type() ==
RecycleRowsetPB::DROP) {
- retention_seconds =
- std::min(config::compacted_rowset_retention_seconds,
retention_seconds);
- }
- int64_t final_expiration = expiration + retention_seconds;
- if (earlest_ts > final_expiration) {
- earlest_ts = final_expiration;
- g_bvar_recycler_recycle_rowset_earlest_ts.put(instance_id_,
earlest_ts);
- }
- return final_expiration;
- };
-
- // for calculate the total num or bytes of recyled objects
- auto scan_and_statistics = [&](std::string_view k, std::string_view v) ->
int {
+ auto handle_rowset_kv = [&, this](std::string_view k, std::string_view v)
-> int {
+ ++num_scanned;
+ total_rowset_key_size += k.size();
+ total_rowset_value_size += v.size();
RecycleRowsetPB rowset;
if (!rowset.ParseFromArray(v.data(), v.size())) {
- return 0;
+ LOG_WARNING("malformed recycle rowset").tag("key", hex(k));
+ return -1;
}
+
+ int final_expiration = calculate_rowset_expired_time(instance_id_,
rowset, &earlest_ts);
+
+ VLOG_DEBUG << "recycle rowset scan, key=" << hex(k) << " num_scanned="
<< num_scanned
+ << " num_expired=" << num_expired << " expiration=" <<
final_expiration
+ << " RecycleRowsetPB=" << rowset.ShortDebugString();
int64_t current_time = ::time(nullptr);
- if (current_time < calc_expiration(rowset)) { // not expired
- return 0;
- }
- if (!rowset.has_type()) {
- if (!rowset.has_resource_id()) [[unlikely]] {
- return 0;
- }
- if (rowset.resource_id().empty()) [[unlikely]] {
- return 0;
- }
- return 0;
- }
- auto* rowset_meta = rowset.mutable_rowset_meta();
- if (!rowset_meta->has_resource_id()) [[unlikely]] {
- if (rowset.type() == RecycleRowsetPB::PREPARE ||
rowset_meta->num_segments() != 0) {
- return 0;
- }
- }
- if (rowset.type() != RecycleRowsetPB::PREPARE) {
- if (rowset_meta->num_segments() > 0) {
- metrics_context.total_need_recycle_num++;
- segment_metrics_context_.total_need_recycle_num +=
rowset_meta->num_segments();
- segment_metrics_context_.total_need_recycle_data_size +=
- rowset_meta->total_disk_size();
- metrics_context.total_need_recycle_data_size +=
rowset_meta->total_disk_size();
- }
- }
- return 0;
- };
-
- auto handle_rowset_kv = [&](std::string_view k, std::string_view v) -> int
{
- ++num_scanned;
- total_rowset_key_size += k.size();
- total_rowset_value_size += v.size();
- RecycleRowsetPB rowset;
- if (!rowset.ParseFromArray(v.data(), v.size())) {
- LOG_WARNING("malformed recycle rowset").tag("key", hex(k));
- return -1;
- }
-
- VLOG_DEBUG << "recycle rowset scan, key=" << hex(k) << " num_scanned="
<< num_scanned
- << " num_expired=" << num_expired << " expiration=" <<
calc_expiration(rowset)
- << " RecycleRowsetPB=" << rowset.ShortDebugString();
- int64_t current_time = ::time(nullptr);
- if (current_time < calc_expiration(rowset)) { // not expired
+ if (current_time < final_expiration) { // not expired
return 0;
}
++num_expired;
@@ -2504,9 +2472,13 @@ int InstanceRecycler::recycle_rowsets() {
return 0;
};
-    int ret = scan_for_recycle_and_statistics(recyc_rs_key0, recyc_rs_key1, "rowsets",
-                                              metrics_context, std::move(scan_and_statistics),
-                                              std::move(handle_rowset_kv), std::move(loop_done));
+    if (config::enable_recycler_stats_metrics) {
+        scan_and_statistics_rowsets();
+    }
+    // recycle_func and loop_done for scan and recycle
+    int ret = scan_and_recycle(recyc_rs_key0, recyc_rs_key1, std::move(handle_rowset_kv),
+                               std::move(loop_done));
+
worker_pool->stop();
if (!async_recycled_rowset_keys.empty()) {
@@ -2520,90 +2492,6 @@ int InstanceRecycler::recycle_rowsets() {
return ret;
}
-bool is_txn_finished(std::shared_ptr<TxnKv> txn_kv, const std::string&
instance_id,
- int64_t txn_id) {
- std::unique_ptr<Transaction> txn;
- TxnErrorCode err = txn_kv->create_txn(&txn);
- if (err != TxnErrorCode::TXN_OK) {
- LOG(WARNING) << "failed to create txn, txn_id=" << txn_id << "
instance_id=" << instance_id;
- return false;
- }
-
- std::string index_val;
- const std::string index_key = txn_index_key({instance_id, txn_id});
- err = txn->get(index_key, &index_val);
- if (err != TxnErrorCode::TXN_OK) {
- if (TxnErrorCode::TXN_KEY_NOT_FOUND == err) {
- TEST_SYNC_POINT_CALLBACK("is_txn_finished::txn_has_been_recycled");
- // txn has been recycled;
- LOG(INFO) << "txn index key has been recycled, txn_id=" << txn_id
- << " instance_id=" << instance_id;
- return true;
- }
- LOG(WARNING) << "failed to get txn index key, txn_id=" << txn_id
- << " instance_id=" << instance_id << " key=" <<
hex(index_key)
- << " err=" << err;
- return false;
- }
-
- TxnIndexPB index_pb;
- if (!index_pb.ParseFromString(index_val)) {
- LOG(WARNING) << "failed to parse txn_index_pb, txn_id=" << txn_id
- << " instance_id=" << instance_id;
- return false;
- }
-
- DCHECK(index_pb.has_tablet_index() == true);
- if (!index_pb.tablet_index().has_db_id()) {
- // In the previous version, the db_id was not set in the index_pb.
- // If updating to the version which enable txn lazy commit, the db_id
will be set.
- LOG(INFO) << "txn index has no db_id, txn_id=" << txn_id << "
instance_id=" << instance_id
- << " index=" << index_pb.ShortDebugString();
- return true;
- }
-
- int64_t db_id = index_pb.tablet_index().db_id();
- DCHECK_GT(db_id, 0) << "db_id=" << db_id << " txn_id=" << txn_id
- << " instance_id=" << instance_id;
-
- std::string info_val;
- const std::string info_key = txn_info_key({instance_id, db_id, txn_id});
- err = txn->get(info_key, &info_val);
- if (err != TxnErrorCode::TXN_OK) {
- if (TxnErrorCode::TXN_KEY_NOT_FOUND == err) {
- // txn info has been recycled;
- LOG(INFO) << "txn info key has been recycled, db_id=" << db_id <<
" txn_id=" << txn_id
- << " instance_id=" << instance_id;
- return true;
- }
-
- DCHECK(err != TxnErrorCode::TXN_KEY_NOT_FOUND);
- LOG(WARNING) << "failed to get txn info key, txn_id=" << txn_id
- << " instance_id=" << instance_id << " key=" <<
hex(info_key)
- << " err=" << err;
- return false;
- }
-
- TxnInfoPB txn_info;
- if (!txn_info.ParseFromString(info_val)) {
- LOG(WARNING) << "failed to parse txn_info, txn_id=" << txn_id
- << " instance_id=" << instance_id;
- return false;
- }
-
- DCHECK(txn_info.txn_id() == txn_id) << "txn_id=" << txn_id << "
instance_id=" << instance_id
- << " txn_info=" <<
txn_info.ShortDebugString();
-
- if (TxnStatusPB::TXN_STATUS_ABORTED == txn_info.status() ||
- TxnStatusPB::TXN_STATUS_VISIBLE == txn_info.status()) {
- TEST_SYNC_POINT_CALLBACK("is_txn_finished::txn_has_been_aborted",
&txn_info);
- return true;
- }
-
- TEST_SYNC_POINT_CALLBACK("is_txn_finished::txn_not_finished", &txn_info);
- return false;
-}
-
int InstanceRecycler::recycle_tmp_rowsets() {
const std::string task_name = "recycle_tmp_rowsets";
int64_t num_scanned = 0;
@@ -2648,25 +2536,10 @@ int InstanceRecycler::recycle_tmp_rowsets() {
std::map<std::string, doris::RowsetMetaCloudPB> tmp_rowsets;
int64_t earlest_ts = std::numeric_limits<int64_t>::max();
- auto calc_expiration = [&earlest_ts, this](const doris::RowsetMetaCloudPB&
rowset) {
- // ATTN: `txn_expiration` should > 0, however we use `creation_time` +
a large `retention_time` (> 1 day in production environment)
- // when `txn_expiration` <= 0 in some unexpected situation (usually
when there are bugs). This is usually safe, coz loading
- // duration or timeout always < `retention_time` in practice.
- int64_t expiration =
- rowset.txn_expiration() > 0 ? rowset.txn_expiration() :
rowset.creation_time();
- expiration = config::force_immediate_recycle ? 0 : expiration;
- int64_t final_expiration = expiration + config::retention_seconds;
- if (earlest_ts > final_expiration) {
- earlest_ts = final_expiration;
- g_bvar_recycler_recycle_tmp_rowset_earlest_ts.put(instance_id_,
earlest_ts);
- }
- return final_expiration;
- };
auto handle_rowset_kv = [&num_scanned, &num_expired, &tmp_rowset_keys,
&tmp_rowsets,
&expired_rowset_size, &total_rowset_key_size,
&total_rowset_value_size,
- &calc_expiration,
- this](std::string_view k, std::string_view v) ->
int {
+ &earlest_ts, this](std::string_view k,
std::string_view v) -> int {
++num_scanned;
total_rowset_key_size += k.size();
total_rowset_value_size += v.size();
@@ -2675,7 +2548,7 @@ int InstanceRecycler::recycle_tmp_rowsets() {
LOG_WARNING("malformed rowset meta").tag("key", hex(k));
return -1;
}
- int64_t expiration = calc_expiration(rowset);
+ int64_t expiration = calculate_tmp_rowset_expired_time(instance_id_,
rowset, &earlest_ts);
VLOG_DEBUG << "recycle tmp rowset scan, key=" << hex(k) << "
num_scanned=" << num_scanned
<< " num_expired=" << num_expired << " expiration=" <<
expiration
<< " txn_expiration=" << rowset.txn_expiration()
@@ -2724,41 +2597,6 @@ int InstanceRecycler::recycle_tmp_rowsets() {
return 0;
};
- // for calculate the total num or bytes of recyled objects
- auto scan_and_statistics = [&](std::string_view k, std::string_view v) ->
int {
- doris::RowsetMetaCloudPB rowset;
- if (!rowset.ParseFromArray(v.data(), v.size())) {
- return 0;
- }
- int64_t expiration = calc_expiration(rowset);
- int64_t current_time = ::time(nullptr);
- if (current_time < expiration) {
- return 0;
- }
-
- DCHECK_GT(rowset.txn_id(), 0)
- << "txn_id=" << rowset.txn_id() << " rowset=" <<
rowset.ShortDebugString();
- if (!is_txn_finished(txn_kv_, instance_id_, rowset.txn_id())) {
- return 0;
- }
-
- if (!rowset.has_resource_id()) {
- if (rowset.num_segments() > 0) [[unlikely]] { // impossible
- return 0;
- }
- metrics_context.total_need_recycle_num++;
- return 0;
- }
-
- metrics_context.total_need_recycle_num++;
- if (rowset.num_segments() > 0) {
- metrics_context.total_need_recycle_data_size +=
rowset.total_disk_size();
- segment_metrics_context_.total_need_recycle_data_size +=
rowset.total_disk_size();
- segment_metrics_context_.total_need_recycle_num +=
rowset.num_segments();
- }
- return 0;
- };
-
auto loop_done = [&tmp_rowset_keys, &tmp_rowsets, &num_recycled,
&metrics_context,
this]() -> int {
DORIS_CLOUD_DEFER {
@@ -2778,30 +2616,12 @@ int InstanceRecycler::recycle_tmp_rowsets() {
return 0;
};
-    return scan_for_recycle_and_statistics(tmp_rs_key0, tmp_rs_key1, "tmp_rowsets", metrics_context,
-                                           std::move(scan_and_statistics),
-                                           std::move(handle_rowset_kv), std::move(loop_done));
-}
-
-int InstanceRecycler::scan_for_recycle_and_statistics(
-        std::string begin, std::string_view end, std::string task_name,
-        RecyclerMetricsContext& metrics_context,
-        std::function<int(std::string_view k, std::string_view v)> statistics_func,
-        std::function<int(std::string_view k, std::string_view v)> recycle_func,
-        std::function<int()> loop_done) {
-    if (config::enable_recycler_metrics) {
-        scan_and_recycle(begin, end, std::move(statistics_func));
-
-        // report to bvar
-        metrics_context.report(true);
-        tablet_metrics_context_.report(true);
-        segment_metrics_context_.report(true);
-
-        int ret = scan_and_recycle(begin, end, std::move(recycle_func), std::move(loop_done));
-        return ret;
-    } else {
-        return scan_and_recycle(begin, end, std::move(recycle_func), std::move(loop_done));
+    if (config::enable_recycler_stats_metrics) {
+        scan_and_statistics_tmp_rowsets();
     }
+    // recycle_func and loop_done for scan and recycle
+    return scan_and_recycle(tmp_rs_key0, tmp_rs_key1, std::move(handle_rowset_kv),
+                            std::move(loop_done));
}
int InstanceRecycler::scan_and_recycle(
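
The removed scan_for_recycle_and_statistics() wrapper used to run the statistics pass and the recycle pass back to back; after this patch each recycle_*() calls scan_and_recycle() directly, and the statistics pass lives in dedicated scan_and_statistics_*() helpers (added near the end of this file) that publish their totals via report(true) from the loop_done callback. Condensed from scan_and_statistics_rowsets() below, not a verbatim excerpt:

    // Statistics-only pass: handle_rowset_kv just accumulates *_need_recycle_* counters,
    // and loop_done flushes them to bvar with report(true).
    return scan_and_recycle(recyc_rs_key0, recyc_rs_key1, std::move(handle_rowset_kv),
                            [&metrics_context]() -> int {
                                metrics_context.report(true);
                                segment_metrics_context_.report(true);
                                return 0;
                            });
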
@@ -3002,50 +2822,12 @@ int InstanceRecycler::abort_timeout_txn() {
return 0;
};
- // for calculate the total num or bytes of recyled objects
- auto scan_and_statistics = [&metrics_context, ¤t_time,
this](std::string_view k,
-
std::string_view v) -> int {
- std::unique_ptr<Transaction> txn;
- TxnErrorCode err = txn_kv_->create_txn(&txn);
- if (err != TxnErrorCode::TXN_OK) {
- return 0;
- }
- std::string_view k1 = k;
- k1.remove_prefix(1);
- std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>>
out;
- if (decode_key(&k1, &out) != 0) {
- return 0;
- }
- int64_t db_id = std::get<int64_t>(std::get<0>(out[3]));
- int64_t txn_id = std::get<int64_t>(std::get<0>(out[4]));
- // Update txn_info
- std::string txn_inf_key, txn_inf_val;
- txn_info_key({instance_id_, db_id, txn_id}, &txn_inf_key);
- err = txn->get(txn_inf_key, &txn_inf_val);
- if (err != TxnErrorCode::TXN_OK) {
- return 0;
- }
- TxnInfoPB txn_info;
- if (!txn_info.ParseFromString(txn_inf_val)) {
- return 0;
- }
-
- if (TxnStatusPB::TXN_STATUS_COMMITTED != txn_info.status()) {
- TxnRunningPB txn_running_pb;
- if (!txn_running_pb.ParseFromArray(v.data(), v.size())) {
- return 0;
- }
- if (!config::force_immediate_recycle &&
txn_running_pb.timeout_time() > current_time) {
- return 0;
- }
- metrics_context.total_need_recycle_num++;
- }
- return 0;
- };
-
-    return scan_for_recycle_and_statistics(
-            begin_txn_running_key, end_txn_running_key, "abort_timeout_txns", metrics_context,
-            std::move(scan_and_statistics), std::move(handle_txn_running_kv));
+    if (config::enable_recycler_stats_metrics) {
+        scan_and_statistics_abort_timeout_txn();
+    }
+    // recycle_func and loop_done for scan and recycle
+    return scan_and_recycle(begin_txn_running_key, end_txn_running_key,
+                            std::move(handle_txn_running_kv));
}
int InstanceRecycler::recycle_expired_txn_label() {
@@ -3081,15 +2863,6 @@ int InstanceRecycler::recycle_expired_txn_label() {
};
int64_t earlest_ts = std::numeric_limits<int64_t>::max();
- auto calc_expiration = [&earlest_ts, this](const RecycleTxnPB&
recycle_txn_pb) {
- int64_t final_expiration =
- recycle_txn_pb.creation_time() + config::label_keep_max_second
* 1000L;
- if (earlest_ts > final_expiration / 1000) {
- earlest_ts = final_expiration / 1000;
-
g_bvar_recycler_recycle_expired_txn_label_earlest_ts.put(instance_id_,
earlest_ts);
- }
- return final_expiration;
- };
SyncExecutor<int> concurrent_delete_executor(
_thread_pool_group.s3_producer_pool,
@@ -3099,7 +2872,7 @@ int InstanceRecycler::recycle_expired_txn_label() {
int64_t current_time_ms =
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
- auto handle_recycle_txn_kv = [&](std::string_view k, std::string_view v)
-> int {
+ auto handle_recycle_txn_kv = [&, this](std::string_view k,
std::string_view v) -> int {
++num_scanned;
RecycleTxnPB recycle_txn_pb;
if (!recycle_txn_pb.ParseFromArray(v.data(), v.size())) {
@@ -3108,7 +2881,8 @@ int InstanceRecycler::recycle_expired_txn_label() {
}
if ((config::force_immediate_recycle) ||
(recycle_txn_pb.has_immediate() && recycle_txn_pb.immediate()) ||
- (calc_expiration(recycle_txn_pb) <= current_time_ms)) {
+ (calculate_txn_expired_time(instance_id_, recycle_txn_pb,
&earlest_ts) <=
+ current_time_ms)) {
VLOG_DEBUG << "found recycle txn, key=" << hex(k);
num_expired++;
recycle_txn_info_keys.emplace_back(k);
@@ -3116,20 +2890,6 @@ int InstanceRecycler::recycle_expired_txn_label() {
return 0;
};
- // for calculate the total num or bytes of recyled objects
- auto scan_and_statistics = [&](std::string_view k, std::string_view v) ->
int {
- RecycleTxnPB recycle_txn_pb;
- if (!recycle_txn_pb.ParseFromArray(v.data(), v.size())) {
- return 0;
- }
- if ((config::force_immediate_recycle) ||
- (recycle_txn_pb.has_immediate() && recycle_txn_pb.immediate()) ||
- (calc_expiration(recycle_txn_pb) <= current_time_ms)) {
- metrics_context.total_need_recycle_num++;
- }
- return 0;
- };
-
auto delete_recycle_txn_kv = [&](const std::string& k) -> int {
std::string_view k1 = k;
//RecycleTxnKeyInfo 0:instance_id 1:db_id 2:txn_id
@@ -3256,9 +3016,12 @@ int InstanceRecycler::recycle_expired_txn_label() {
return ret;
};
-    return scan_for_recycle_and_statistics(
-            begin_recycle_txn_key, end_recycle_txn_key, "expired_txn_labels", metrics_context,
-            std::move(scan_and_statistics), std::move(handle_recycle_txn_kv), std::move(loop_done));
+    if (config::enable_recycler_stats_metrics) {
+        scan_and_statistics_expired_txn_label();
+    }
+    // recycle_func and loop_done for scan and recycle
+    return scan_and_recycle(begin_recycle_txn_key, end_recycle_txn_key,
+                            std::move(handle_recycle_txn_kv), std::move(loop_done));
}
struct CopyJobIdTuple {
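
One unit detail worth noting in the expired-label path: calculate_txn_expired_time() appears to work in milliseconds (RecycleTxnPB.creation_time() plus config::label_keep_max_second * 1000L, judging from the comparison against current_time_ms above), while the earliest-ts bvar it updates is published in seconds, hence the division by 1000. A tiny standalone illustration with hypothetical values:

    #include <cstdint>
    #include <iostream>

    int main() {
        int64_t creation_time_ms = 1752300000000;   // hypothetical RecycleTxnPB.creation_time()
        int64_t label_keep_max_second = 259200;     // hypothetical config::label_keep_max_second
        int64_t final_expiration_ms = creation_time_ms + label_keep_max_second * 1000L;
        int64_t current_time_ms = 1752600000000;    // "now" in milliseconds
        std::cout << "expired=" << (final_expiration_ms <= current_time_ms)  // 1752559200000 <= now -> 1
                  << " earliest_ts_s=" << final_expiration_ms / 1000 << std::endl;
        return 0;
    }
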
@@ -3523,46 +3286,12 @@ int InstanceRecycler::recycle_copy_jobs() {
return 0;
};
- // for calculate the total num or bytes of recyled objects
- auto scan_and_statistics = [&metrics_context](std::string_view k,
std::string_view v) -> int {
- CopyJobPB copy_job;
- if (!copy_job.ParseFromArray(v.data(), v.size())) {
- LOG_WARNING("malformed copy job").tag("key", hex(k));
- return 0;
- }
-
- if (copy_job.job_status() == CopyJobPB::FINISH) {
- if (copy_job.stage_type() == StagePB::EXTERNAL) {
- int64_t current_time =
-
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
- if (copy_job.finish_time_ms() > 0) {
- if (!config::force_immediate_recycle &&
- current_time < copy_job.finish_time_ms() +
-
config::copy_job_max_retention_second * 1000) {
- return 0;
- }
- } else {
- if (!config::force_immediate_recycle &&
- current_time < copy_job.start_time_ms() +
-
config::copy_job_max_retention_second * 1000) {
- return 0;
- }
- }
- }
- } else if (copy_job.job_status() == CopyJobPB::LOADING) {
- int64_t current_time =
-
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
- if (!config::force_immediate_recycle && current_time <=
copy_job.timeout_time_ms()) {
- return 0;
- }
- }
- metrics_context.total_need_recycle_num++;
- return 0;
- };
-
- return scan_for_recycle_and_statistics(key0, key1, "copy_jobs",
metrics_context,
- std::move(scan_and_statistics),
std::move(recycle_func));
-}
+ if (config::enable_recycler_stats_metrics) {
+ scan_and_statistics_copy_jobs();
+ }
+ // recycle_func and loop_done for scan and recycle
+ return scan_and_recycle(key0, key1, std::move(recycle_func));
+}
int InstanceRecycler::init_copy_job_accessor(const std::string& stage_id,
const StagePB::StageType&
stage_type,
@@ -3663,8 +3392,8 @@ int InstanceRecycler::init_copy_job_accessor(const
std::string& stage_id,
int InstanceRecycler::recycle_stage() {
int64_t num_scanned = 0;
int64_t num_recycled = 0;
- RecyclerMetricsContext metrics_context(instance_id_, "recycle_stage");
const std::string task_name = "recycle_stage";
+ RecyclerMetricsContext metrics_context(instance_id_, task_name);
LOG_WARNING("begin to recycle stage").tag("instance_id", instance_id_);
@@ -3751,51 +3480,6 @@ int InstanceRecycler::recycle_stage() {
return 0;
};
- // for calculate the total num or bytes of recyled objects
- auto scan_and_statistics = [&metrics_context, this](std::string_view k,
- std::string_view v) ->
int {
- RecycleStagePB recycle_stage;
- if (!recycle_stage.ParseFromArray(v.data(), v.size())) {
- LOG_WARNING("malformed recycle stage").tag("key", hex(k));
- return 0;
- }
-
- int idx = stoi(recycle_stage.stage().obj_info().id());
- if (idx > instance_info_.obj_info().size() || idx < 1) {
- LOG(WARNING) << "invalid idx: " << idx;
- return 0;
- }
-
- std::shared_ptr<StorageVaultAccessor> accessor;
- int ret = SYNC_POINT_HOOK_RETURN_VALUE(
- [&] {
- auto& old_obj = instance_info_.obj_info()[idx - 1];
- auto s3_conf = S3Conf::from_obj_store_info(old_obj);
- if (!s3_conf) {
- return 0;
- }
-
- s3_conf->prefix =
recycle_stage.stage().obj_info().prefix();
- std::shared_ptr<S3Accessor> s3_accessor;
- int ret = S3Accessor::create(std::move(s3_conf.value()),
&s3_accessor);
- if (ret != 0) {
- return 0;
- }
-
- accessor = std::move(s3_accessor);
- return 0;
- }(),
- "recycle_stage:get_accessor", &accessor);
-
- if (ret != 0) {
- LOG(WARNING) << "failed to init accessor ret=" << ret;
- return 0;
- }
-
- metrics_context.total_need_recycle_num++;
- return 0;
- };
-
auto loop_done = [&stage_keys, this]() -> int {
if (stage_keys.empty()) return 0;
DORIS_CLOUD_DEFER {
@@ -3807,9 +3491,11 @@ int InstanceRecycler::recycle_stage() {
}
return 0;
};
- return scan_for_recycle_and_statistics(key0, key1, "stages",
metrics_context,
- std::move(scan_and_statistics),
std::move(recycle_func),
- std::move(loop_done));
+ if (config::enable_recycler_stats_metrics) {
+ scan_and_statistics_stage();
+ }
+ // recycle_func and loop_done for scan and recycle
+ return scan_and_recycle(key0, key1, std::move(recycle_func),
std::move(loop_done));
}
int InstanceRecycler::recycle_expired_stage_objects() {
@@ -3827,108 +3513,69 @@ int InstanceRecycler::recycle_expired_stage_objects() {
};
int ret = 0;
- // for calculate the total num or bytes of recyled objects
- auto scan_and_statistics = [&metrics_context, this]() {
- for (const auto& stage : instance_info_.stages()) {
- if (stopped()) {
- break;
- }
- if (stage.type() == StagePB::EXTERNAL) {
- continue;
- }
- int idx = stoi(stage.obj_info().id());
- if (idx > instance_info_.obj_info().size() || idx < 1) {
- continue;
- }
- const auto& old_obj = instance_info_.obj_info()[idx - 1];
- auto s3_conf = S3Conf::from_obj_store_info(old_obj);
- if (!s3_conf) {
- continue;
- }
- s3_conf->prefix = stage.obj_info().prefix();
- std::shared_ptr<S3Accessor> accessor;
- int ret1 = S3Accessor::create(*s3_conf, &accessor);
- if (ret1 != 0) {
- continue;
- }
- if (s3_conf->prefix.find("/stage/") == std::string::npos) {
- continue;
- }
- metrics_context.total_need_recycle_num++;
- }
- };
-
- auto handle_recycle_func = [&, this]() {
- for (const auto& stage : instance_info_.stages()) {
- std::stringstream ss;
- ss << "instance_id=" << instance_id_ << ", stage_id=" <<
stage.stage_id()
- << ", user_name="
- << (stage.mysql_user_name().empty() ? "null" :
stage.mysql_user_name().at(0))
- << ", user_id="
- << (stage.mysql_user_id().empty() ? "null" :
stage.mysql_user_id().at(0))
- << ", prefix=" << stage.obj_info().prefix();
-
- if (stopped()) {
- break;
- }
- if (stage.type() == StagePB::EXTERNAL) {
- continue;
- }
- int idx = stoi(stage.obj_info().id());
- if (idx > instance_info_.obj_info().size() || idx < 1) {
- LOG(WARNING) << "invalid idx: " << idx << ", id: " <<
stage.obj_info().id();
- continue;
- }
- const auto& old_obj = instance_info_.obj_info()[idx - 1];
- auto s3_conf = S3Conf::from_obj_store_info(old_obj);
- if (!s3_conf) {
- LOG(WARNING) << "failed to init s3_conf with obj_info="
- << old_obj.ShortDebugString();
- continue;
- }
+ if (config::enable_recycler_stats_metrics) {
+ scan_and_statistics_expired_stage_objects();
+ }
- s3_conf->prefix = stage.obj_info().prefix();
- std::shared_ptr<S3Accessor> accessor;
- int ret1 = S3Accessor::create(*s3_conf, &accessor);
- if (ret1 != 0) {
- LOG(WARNING) << "failed to init s3 accessor ret=" << ret1 << "
" << ss.str();
- ret = -1;
- continue;
- }
+ for (const auto& stage : instance_info_.stages()) {
+ std::stringstream ss;
+ ss << "instance_id=" << instance_id_ << ", stage_id=" <<
stage.stage_id() << ", user_name="
+ << (stage.mysql_user_name().empty() ? "null" :
stage.mysql_user_name().at(0))
+ << ", user_id=" << (stage.mysql_user_id().empty() ? "null" :
stage.mysql_user_id().at(0))
+ << ", prefix=" << stage.obj_info().prefix();
- if (s3_conf->prefix.find("/stage/") == std::string::npos) {
- LOG(WARNING) << "try to delete illegal prefix, which is
catastrophic, " << ss.str();
- ret = -1;
- continue;
- }
+ if (stopped()) {
+ break;
+ }
+ if (stage.type() == StagePB::EXTERNAL) {
+ continue;
+ }
+ int idx = stoi(stage.obj_info().id());
+ if (idx > instance_info_.obj_info().size() || idx < 1) {
+ LOG(WARNING) << "invalid idx: " << idx << ", id: " <<
stage.obj_info().id();
+ continue;
+ }
- LOG(INFO) << "recycle expired stage objects, " << ss.str();
- int64_t expiration_time =
-
duration_cast<seconds>(system_clock::now().time_since_epoch()).count() -
- config::internal_stage_objects_expire_time_second;
- if (config::force_immediate_recycle) {
- expiration_time = INT64_MAX;
- }
- ret1 = accessor->delete_all(expiration_time);
- if (ret1 != 0) {
- LOG(WARNING) << "failed to recycle expired stage objects,
ret=" << ret1 << " "
- << ss.str();
- ret = -1;
- continue;
- }
- metrics_context.total_recycled_num++;
- metrics_context.report();
+ const auto& old_obj = instance_info_.obj_info()[idx - 1];
+ auto s3_conf = S3Conf::from_obj_store_info(old_obj);
+ if (!s3_conf) {
+ LOG(WARNING) << "failed to init s3_conf with obj_info=" <<
old_obj.ShortDebugString();
+ continue;
}
- };
- // for calculate the total num or bytes of recyled objects
- scan_and_statistics();
+ s3_conf->prefix = stage.obj_info().prefix();
+ std::shared_ptr<S3Accessor> accessor;
+ int ret1 = S3Accessor::create(*s3_conf, &accessor);
+ if (ret1 != 0) {
+ LOG(WARNING) << "failed to init s3 accessor ret=" << ret1 << " "
<< ss.str();
+ ret = -1;
+ continue;
+ }
- // report to bvar
- metrics_context.report(true);
+ if (s3_conf->prefix.find("/stage/") == std::string::npos) {
+ LOG(WARNING) << "try to delete illegal prefix, which is
catastrophic, " << ss.str();
+ ret = -1;
+ continue;
+ }
- handle_recycle_func();
+ LOG(INFO) << "recycle expired stage objects, " << ss.str();
+ int64_t expiration_time =
+
duration_cast<seconds>(system_clock::now().time_since_epoch()).count() -
+ config::internal_stage_objects_expire_time_second;
+ if (config::force_immediate_recycle) {
+ expiration_time = INT64_MAX;
+ }
+ ret1 = accessor->delete_all(expiration_time);
+ if (ret1 != 0) {
+ LOG(WARNING) << "failed to recycle expired stage objects, ret=" <<
ret1 << " "
+ << ss.str();
+ ret = -1;
+ continue;
+ }
+ metrics_context.total_recycled_num++;
+ metrics_context.report();
+ }
return ret;
}
@@ -3965,4 +3612,543 @@ bool InstanceRecycler::check_recycle_tasks() {
return found;
}
+// Scan and statistics indexes that need to be recycled
+int InstanceRecycler::scan_and_statistics_indexes() {
+ RecyclerMetricsContext metrics_context(instance_id_, "recycle_indexes");
+
+ RecycleIndexKeyInfo index_key_info0 {instance_id_, 0};
+ RecycleIndexKeyInfo index_key_info1 {instance_id_, INT64_MAX};
+ std::string index_key0;
+ std::string index_key1;
+ recycle_index_key(index_key_info0, &index_key0);
+ recycle_index_key(index_key_info1, &index_key1);
+ int64_t earlest_ts = std::numeric_limits<int64_t>::max();
+
+ auto handle_index_kv = [&, this](std::string_view k, std::string_view v)
-> int {
+ RecycleIndexPB index_pb;
+ if (!index_pb.ParseFromArray(v.data(), v.size())) {
+ return 0;
+ }
+ int64_t current_time = ::time(nullptr);
+ if (current_time <
+ calculate_index_expired_time(instance_id_, index_pb, &earlest_ts))
{ // not expired
+ return 0;
+ }
+ // decode index_id
+ auto k1 = k;
+ k1.remove_prefix(1);
+ std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>>
out;
+ decode_key(&k1, &out);
+ // 0x01 "recycle" ${instance_id} "index" ${index_id} -> RecycleIndexPB
+ auto index_id = std::get<int64_t>(std::get<0>(out[3]));
+ std::unique_ptr<Transaction> txn;
+ TxnErrorCode err = txn_kv_->create_txn(&txn);
+ if (err != TxnErrorCode::TXN_OK) {
+ return 0;
+ }
+ std::string val;
+ err = txn->get(k, &val);
+ if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+ return 0;
+ }
+ if (err != TxnErrorCode::TXN_OK) {
+ return 0;
+ }
+ index_pb.Clear();
+ if (!index_pb.ParseFromString(val)) {
+ return 0;
+ }
+ if (scan_tablets_and_statistics(index_pb.table_id(), index_id,
metrics_context) != 0) {
+ return 0;
+ }
+ metrics_context.total_need_recycle_num++;
+ return 0;
+ };
+
+ return scan_and_recycle(index_key0, index_key1, std::move(handle_index_kv),
+ [&metrics_context]() -> int {
+ metrics_context.report(true);
+ segment_metrics_context_.report(true);
+ tablet_metrics_context_.report(true);
+ return 0;
+ });
+}
+
+// Scan and statistics partitions that need to be recycled
+int InstanceRecycler::scan_and_statistics_partitions() {
+ RecyclerMetricsContext metrics_context(instance_id_, "recycle_partitions");
+
+ RecyclePartKeyInfo part_key_info0 {instance_id_, 0};
+ RecyclePartKeyInfo part_key_info1 {instance_id_, INT64_MAX};
+ std::string part_key0;
+ std::string part_key1;
+ int64_t earlest_ts = std::numeric_limits<int64_t>::max();
+
+ recycle_partition_key(part_key_info0, &part_key0);
+ recycle_partition_key(part_key_info1, &part_key1);
+ auto handle_partition_kv = [&, this](std::string_view k, std::string_view
v) -> int {
+ RecyclePartitionPB part_pb;
+ if (!part_pb.ParseFromArray(v.data(), v.size())) {
+ return 0;
+ }
+ int64_t current_time = ::time(nullptr);
+ if (current_time <
+ calculate_partition_expired_time(instance_id_, part_pb,
&earlest_ts)) { // not expired
+ return 0;
+ }
+ // decode partition_id
+ auto k1 = k;
+ k1.remove_prefix(1);
+ std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>>
out;
+ decode_key(&k1, &out);
+ // 0x01 "recycle" ${instance_id} "partition" ${partition_id} ->
RecyclePartitionPB
+ auto partition_id = std::get<int64_t>(std::get<0>(out[3]));
+ // Change state to RECYCLING
+ std::unique_ptr<Transaction> txn;
+ TxnErrorCode err = txn_kv_->create_txn(&txn);
+ if (err != TxnErrorCode::TXN_OK) {
+ return 0;
+ }
+ std::string val;
+ err = txn->get(k, &val);
+ if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+ return 0;
+ }
+ if (err != TxnErrorCode::TXN_OK) {
+ return 0;
+ }
+ part_pb.Clear();
+ if (!part_pb.ParseFromString(val)) {
+ return 0;
+ }
+ // Partitions with PREPARED state MUST have no data
+ bool is_empty_tablet = part_pb.state() == RecyclePartitionPB::PREPARED;
+ int ret = 0;
+ for (int64_t index_id : part_pb.index_id()) {
+ if (scan_tablets_and_statistics(part_pb.table_id(), index_id,
metrics_context,
+ partition_id, is_empty_tablet) !=
0) {
+ ret = 0;
+ }
+ }
+ metrics_context.total_need_recycle_num++;
+ return ret;
+ };
+ return scan_and_recycle(part_key0, part_key1,
std::move(handle_partition_kv),
+ [&metrics_context]() -> int {
+ metrics_context.report(true);
+ segment_metrics_context_.report(true);
+ tablet_metrics_context_.report(true);
+ return 0;
+ });
+}
+
+// Scan and statistics rowsets that need to be recycled
+int InstanceRecycler::scan_and_statistics_rowsets() {
+ RecyclerMetricsContext metrics_context(instance_id_, "recycle_rowsets");
+ RecycleRowsetKeyInfo recyc_rs_key_info0 {instance_id_, 0, ""};
+ RecycleRowsetKeyInfo recyc_rs_key_info1 {instance_id_, INT64_MAX, ""};
+ std::string recyc_rs_key0;
+ std::string recyc_rs_key1;
+ recycle_rowset_key(recyc_rs_key_info0, &recyc_rs_key0);
+ recycle_rowset_key(recyc_rs_key_info1, &recyc_rs_key1);
+ int64_t earlest_ts = std::numeric_limits<int64_t>::max();
+
+ auto handle_rowset_kv = [&, this](std::string_view k, std::string_view v)
-> int {
+ RecycleRowsetPB rowset;
+ if (!rowset.ParseFromArray(v.data(), v.size())) {
+ return 0;
+ }
+ int64_t current_time = ::time(nullptr);
+ if (current_time <
+            calculate_rowset_expired_time(instance_id_, rowset, &earlest_ts)) { // not expired
+ return 0;
+ }
+ if (!rowset.has_type()) {
+ if (!rowset.has_resource_id()) [[unlikely]] {
+ return 0;
+ }
+ if (rowset.resource_id().empty()) [[unlikely]] {
+ return 0;
+ }
+ return 0;
+ }
+ auto* rowset_meta = rowset.mutable_rowset_meta();
+ if (!rowset_meta->has_resource_id()) [[unlikely]] {
+            if (rowset.type() == RecycleRowsetPB::PREPARE || rowset_meta->num_segments() != 0) {
+ return 0;
+ }
+ }
+ if (rowset.type() != RecycleRowsetPB::PREPARE) {
+ if (rowset_meta->num_segments() > 0) {
+ metrics_context.total_need_recycle_num++;
+                segment_metrics_context_.total_need_recycle_num += rowset_meta->num_segments();
+                segment_metrics_context_.total_need_recycle_data_size +=
+                        rowset_meta->total_disk_size();
+                metrics_context.total_need_recycle_data_size += rowset_meta->total_disk_size();
+ }
+ }
+ return 0;
+ };
+    return scan_and_recycle(recyc_rs_key0, recyc_rs_key1, std::move(handle_rowset_kv),
+ [&metrics_context]() -> int {
+ metrics_context.report(true);
+ segment_metrics_context_.report(true);
+ return 0;
+ });
+}
+
+// Scan and collect statistics on tmp rowsets that need to be recycled
+int InstanceRecycler::scan_and_statistics_tmp_rowsets() {
+    RecyclerMetricsContext metrics_context(instance_id_, "recycle_tmp_rowsets");
+ MetaRowsetTmpKeyInfo tmp_rs_key_info0 {instance_id_, 0, 0};
+ MetaRowsetTmpKeyInfo tmp_rs_key_info1 {instance_id_, INT64_MAX, 0};
+ std::string tmp_rs_key0;
+ std::string tmp_rs_key1;
+ meta_rowset_tmp_key(tmp_rs_key_info0, &tmp_rs_key0);
+ meta_rowset_tmp_key(tmp_rs_key_info1, &tmp_rs_key1);
+
+ int64_t earlest_ts = std::numeric_limits<int64_t>::max();
+
+    auto handle_tmp_rowsets_kv = [&, this](std::string_view k, std::string_view v) -> int {
+ doris::RowsetMetaCloudPB rowset;
+ if (!rowset.ParseFromArray(v.data(), v.size())) {
+ return 0;
+ }
+        int64_t expiration = calculate_tmp_rowset_expired_time(instance_id_, rowset, &earlest_ts);
+ int64_t current_time = ::time(nullptr);
+ if (current_time < expiration) {
+ return 0;
+ }
+
+ DCHECK_GT(rowset.txn_id(), 0)
+                << "txn_id=" << rowset.txn_id() << " rowset=" << rowset.ShortDebugString();
+ if (!is_txn_finished(txn_kv_, instance_id_, rowset.txn_id())) {
+ return 0;
+ }
+
+ if (!rowset.has_resource_id()) {
+ if (rowset.num_segments() > 0) [[unlikely]] { // impossible
+ return 0;
+ }
+ metrics_context.total_need_recycle_num++;
+ return 0;
+ }
+
+ metrics_context.total_need_recycle_num++;
+ if (rowset.num_segments() > 0) {
+            metrics_context.total_need_recycle_data_size += rowset.total_disk_size();
+            segment_metrics_context_.total_need_recycle_data_size += rowset.total_disk_size();
+            segment_metrics_context_.total_need_recycle_num += rowset.num_segments();
+ }
+ return 0;
+ };
+    return scan_and_recycle(tmp_rs_key0, tmp_rs_key1, std::move(handle_tmp_rowsets_kv),
+ [&metrics_context]() -> int {
+ metrics_context.report(true);
+ segment_metrics_context_.report(true);
+ return 0;
+ });
+}
+
+// Scan and collect statistics on timed-out txns that need to be aborted
+int InstanceRecycler::scan_and_statistics_abort_timeout_txn() {
+ RecyclerMetricsContext metrics_context(instance_id_, "abort_timeout_txn");
+
+ TxnRunningKeyInfo txn_running_key_info0 {instance_id_, 0, 0};
+    TxnRunningKeyInfo txn_running_key_info1 {instance_id_, INT64_MAX, INT64_MAX};
+ std::string begin_txn_running_key;
+ std::string end_txn_running_key;
+ txn_running_key(txn_running_key_info0, &begin_txn_running_key);
+ txn_running_key(txn_running_key_info1, &end_txn_running_key);
+
+ int64_t current_time =
+            duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
+
+    auto handle_abort_timeout_txn_kv = [&metrics_context, &current_time, this](
+                                               std::string_view k, std::string_view v) -> int {
+ std::unique_ptr<Transaction> txn;
+ TxnErrorCode err = txn_kv_->create_txn(&txn);
+ if (err != TxnErrorCode::TXN_OK) {
+ return 0;
+ }
+ std::string_view k1 = k;
+ k1.remove_prefix(1);
+        std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
+ if (decode_key(&k1, &out) != 0) {
+ return 0;
+ }
+ int64_t db_id = std::get<int64_t>(std::get<0>(out[3]));
+ int64_t txn_id = std::get<int64_t>(std::get<0>(out[4]));
+ // Update txn_info
+ std::string txn_inf_key, txn_inf_val;
+ txn_info_key({instance_id_, db_id, txn_id}, &txn_inf_key);
+ err = txn->get(txn_inf_key, &txn_inf_val);
+ if (err != TxnErrorCode::TXN_OK) {
+ return 0;
+ }
+ TxnInfoPB txn_info;
+ if (!txn_info.ParseFromString(txn_inf_val)) {
+ return 0;
+ }
+
+ if (TxnStatusPB::TXN_STATUS_COMMITTED != txn_info.status()) {
+ TxnRunningPB txn_running_pb;
+ if (!txn_running_pb.ParseFromArray(v.data(), v.size())) {
+ return 0;
+ }
+        if (!config::force_immediate_recycle && txn_running_pb.timeout_time() > current_time) {
+ return 0;
+ }
+ metrics_context.total_need_recycle_num++;
+ }
+ return 0;
+ };
+ return scan_and_recycle(begin_txn_running_key, end_txn_running_key,
+                            std::move(handle_abort_timeout_txn_kv), [&metrics_context]() -> int {
+ metrics_context.report(true);
+ return 0;
+ });
+}
+
+// Scan and collect statistics on expired txn labels that need to be recycled
+int InstanceRecycler::scan_and_statistics_expired_txn_label() {
+    RecyclerMetricsContext metrics_context(instance_id_, "recycle_expired_txn_label");
+
+ RecycleTxnKeyInfo recycle_txn_key_info0 {instance_id_, 0, 0};
+    RecycleTxnKeyInfo recycle_txn_key_info1 {instance_id_, INT64_MAX, INT64_MAX};
+ std::string begin_recycle_txn_key;
+ std::string end_recycle_txn_key;
+ recycle_txn_key(recycle_txn_key_info0, &begin_recycle_txn_key);
+ recycle_txn_key(recycle_txn_key_info1, &end_recycle_txn_key);
+ int64_t earlest_ts = std::numeric_limits<int64_t>::max();
+ int64_t current_time_ms =
+            duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
+
+    // For calculating the total number or bytes of recycled objects
+    auto handle_expired_txn_label_kv = [&, this](std::string_view k, std::string_view v) -> int {
+ RecycleTxnPB recycle_txn_pb;
+ if (!recycle_txn_pb.ParseFromArray(v.data(), v.size())) {
+ return 0;
+ }
+ if ((config::force_immediate_recycle) ||
+ (recycle_txn_pb.has_immediate() && recycle_txn_pb.immediate()) ||
+            (calculate_txn_expired_time(instance_id_, recycle_txn_pb, &earlest_ts) <=
+ current_time_ms)) {
+ metrics_context.total_need_recycle_num++;
+ }
+ return 0;
+ };
+ return scan_and_recycle(begin_recycle_txn_key, end_recycle_txn_key,
+                            std::move(handle_expired_txn_label_kv), [&metrics_context]() -> int {
+ metrics_context.report(true);
+ return 0;
+ });
+}
+
+// Scan and collect statistics on copy jobs that need to be recycled
+int InstanceRecycler::scan_and_statistics_copy_jobs() {
+ RecyclerMetricsContext metrics_context(instance_id_, "recycle_copy_jobs");
+ CopyJobKeyInfo key_info0 {instance_id_, "", 0, "", 0};
+ CopyJobKeyInfo key_info1 {instance_id_, "\xff", 0, "", 0};
+ std::string key0;
+ std::string key1;
+ copy_job_key(key_info0, &key0);
+ copy_job_key(key_info1, &key1);
+
+    // For calculating the total number or bytes of recycled objects
+    auto scan_and_statistics = [&metrics_context](std::string_view k, std::string_view v) -> int {
+ CopyJobPB copy_job;
+ if (!copy_job.ParseFromArray(v.data(), v.size())) {
+ LOG_WARNING("malformed copy job").tag("key", hex(k));
+ return 0;
+ }
+
+ if (copy_job.job_status() == CopyJobPB::FINISH) {
+ if (copy_job.stage_type() == StagePB::EXTERNAL) {
+ int64_t current_time =
+                        duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
+ if (copy_job.finish_time_ms() > 0) {
+ if (!config::force_immediate_recycle &&
+ current_time < copy_job.finish_time_ms() +
+                                           config::copy_job_max_retention_second * 1000) {
+ return 0;
+ }
+ } else {
+ if (!config::force_immediate_recycle &&
+ current_time < copy_job.start_time_ms() +
+                                           config::copy_job_max_retention_second * 1000) {
+ return 0;
+ }
+ }
+ }
+ } else if (copy_job.job_status() == CopyJobPB::LOADING) {
+ int64_t current_time =
+                    duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
+            if (!config::force_immediate_recycle && current_time <= copy_job.timeout_time_ms()) {
+ return 0;
+ }
+ }
+ metrics_context.total_need_recycle_num++;
+ return 0;
+ };
+
+ return scan_and_recycle(key0, key1, std::move(scan_and_statistics),
+ [&metrics_context]() -> int {
+ metrics_context.report(true);
+ return 0;
+ });
+}
+
+// Scan and collect statistics on stages that need to be recycled
+int InstanceRecycler::scan_and_statistics_stage() {
+ RecyclerMetricsContext metrics_context(instance_id_, "recycle_stage");
+ RecycleStageKeyInfo key_info0 {instance_id_, ""};
+ RecycleStageKeyInfo key_info1 {instance_id_, "\xff"};
+ std::string key0 = recycle_stage_key(key_info0);
+ std::string key1 = recycle_stage_key(key_info1);
+
+    // For calculating the total number or bytes of recycled objects
+ auto scan_and_statistics = [&metrics_context, this](std::string_view k,
+                                                        std::string_view v) -> int {
+ RecycleStagePB recycle_stage;
+ if (!recycle_stage.ParseFromArray(v.data(), v.size())) {
+ LOG_WARNING("malformed recycle stage").tag("key", hex(k));
+ return 0;
+ }
+
+ int idx = stoi(recycle_stage.stage().obj_info().id());
+ if (idx > instance_info_.obj_info().size() || idx < 1) {
+ LOG(WARNING) << "invalid idx: " << idx;
+ return 0;
+ }
+
+ std::shared_ptr<StorageVaultAccessor> accessor;
+ int ret = SYNC_POINT_HOOK_RETURN_VALUE(
+ [&] {
+ auto& old_obj = instance_info_.obj_info()[idx - 1];
+ auto s3_conf = S3Conf::from_obj_store_info(old_obj);
+ if (!s3_conf) {
+ return 0;
+ }
+
+                s3_conf->prefix = recycle_stage.stage().obj_info().prefix();
+ std::shared_ptr<S3Accessor> s3_accessor;
+                int ret = S3Accessor::create(std::move(s3_conf.value()), &s3_accessor);
+ if (ret != 0) {
+ return 0;
+ }
+
+ accessor = std::move(s3_accessor);
+ return 0;
+ }(),
+ "recycle_stage:get_accessor", &accessor);
+
+ if (ret != 0) {
+ LOG(WARNING) << "failed to init accessor ret=" << ret;
+ return 0;
+ }
+
+ metrics_context.total_need_recycle_num++;
+ return 0;
+ };
+
+ return scan_and_recycle(key0, key1, std::move(scan_and_statistics),
+ [&metrics_context]() -> int {
+ metrics_context.report(true);
+ return 0;
+ });
+}
+
+// Scan and collect statistics on expired stage objects that need to be recycled
+int InstanceRecycler::scan_and_statistics_expired_stage_objects() {
+    RecyclerMetricsContext metrics_context(instance_id_, "recycle_expired_stage_objects");
+
+    // For calculating the total number or bytes of recycled objects
+ auto scan_and_statistics = [&metrics_context, this]() {
+ for (const auto& stage : instance_info_.stages()) {
+ if (stopped()) {
+ break;
+ }
+ if (stage.type() == StagePB::EXTERNAL) {
+ continue;
+ }
+ int idx = stoi(stage.obj_info().id());
+ if (idx > instance_info_.obj_info().size() || idx < 1) {
+ continue;
+ }
+ const auto& old_obj = instance_info_.obj_info()[idx - 1];
+ auto s3_conf = S3Conf::from_obj_store_info(old_obj);
+ if (!s3_conf) {
+ continue;
+ }
+ s3_conf->prefix = stage.obj_info().prefix();
+ std::shared_ptr<S3Accessor> accessor;
+ int ret1 = S3Accessor::create(*s3_conf, &accessor);
+ if (ret1 != 0) {
+ continue;
+ }
+ if (s3_conf->prefix.find("/stage/") == std::string::npos) {
+ continue;
+ }
+ metrics_context.total_need_recycle_num++;
+ }
+ };
+
+ scan_and_statistics();
+ metrics_context.report(true);
+ return 0;
+}
+
+// Scan and collect statistics on versions that need to be recycled
+int InstanceRecycler::scan_and_statistics_versions() {
+ RecyclerMetricsContext metrics_context(instance_id_, "recycle_versions");
+ auto version_key_begin = partition_version_key({instance_id_, 0, 0, 0});
+    auto version_key_end = partition_version_key({instance_id_, INT64_MAX, 0, 0});
+
+ int64_t last_scanned_table_id = 0;
+ bool is_recycled = false; // Is last scanned kv recycled
+    // For calculating the total number or bytes of recycled objects
+    auto scan_and_statistics = [&metrics_context, &last_scanned_table_id, &is_recycled, this](
+ std::string_view k, std::string_view) {
+ auto k1 = k;
+ k1.remove_prefix(1);
+        // 0x01 "version" ${instance_id} "partition" ${db_id} ${tbl_id} ${partition_id}
+        std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
+ decode_key(&k1, &out);
+ DCHECK_EQ(out.size(), 6) << k;
+ auto table_id = std::get<int64_t>(std::get<0>(out[4]));
+        if (table_id == last_scanned_table_id) { // Already handled kvs of this table
+ metrics_context.total_need_recycle_num +=
+ is_recycled; // Version kv of this table has been recycled
+ return 0;
+ }
+ last_scanned_table_id = table_id;
+ is_recycled = false;
+        auto tablet_key_begin = stats_tablet_key({instance_id_, table_id, 0, 0, 0});
+        auto tablet_key_end = stats_tablet_key({instance_id_, table_id, INT64_MAX, 0, 0});
+ std::unique_ptr<Transaction> txn;
+ TxnErrorCode err = txn_kv_->create_txn(&txn);
+ if (err != TxnErrorCode::TXN_OK) {
+ return 0;
+ }
+ std::unique_ptr<RangeGetIterator> iter;
+ err = txn->get(tablet_key_begin, tablet_key_end, &iter, false, 1);
+ if (err != TxnErrorCode::TXN_OK) {
+ return 0;
+ }
+        if (iter->has_next()) { // Table is useful, should not recycle table and partition versions
+ return 0;
+ }
+ metrics_context.total_need_recycle_num++;
+ is_recycled = true;
+ return 0;
+ };
+
+    return scan_and_recycle(version_key_begin, version_key_end, std::move(scan_and_statistics),
+ [&metrics_context]() -> int {
+ metrics_context.report(true);
+ return 0;
+ });
+}
+
} // namespace doris::cloud
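
For orientation, every scan_and_statistics_* helper added above follows the same shape: build the key range for one resource type, walk it with scan_and_recycle(), count what a real recycle round would remove into a RecyclerMetricsContext without deleting anything, and publish the totals from the loop_done callback via report(true). The snippet below is only a minimal, self-contained sketch of that control flow; scan_and_recycle and RecyclerMetricsContext here are trimmed stand-ins for the real members, and the KV filtering is purely illustrative.

// Sketch only: the control flow shared by the scan_and_statistics_* helpers,
// reduced to a self-contained toy (not the committed code).
#include <cstdint>
#include <functional>
#include <iostream>
#include <string>
#include <string_view>
#include <utility>
#include <vector>

namespace sketch {

// Stand-in for InstanceRecycler::scan_and_recycle(): feed every KV to handle_kv,
// then invoke loop_done once the range is exhausted.
int scan_and_recycle(const std::vector<std::pair<std::string, std::string>>& kvs,
                     std::function<int(std::string_view, std::string_view)> handle_kv,
                     std::function<int()> loop_done) {
    for (const auto& [k, v] : kvs) {
        if (int r = handle_kv(k, v); r != 0) return r;
    }
    return loop_done();
}

struct RecyclerMetricsContext { // trimmed-down stand-in; the real report() publishes bvars
    int64_t total_need_recycle_num = 0;
    void report(bool is_begin) {
        std::cout << (is_begin ? "to_recycle=" : "recycled=") << total_need_recycle_num << "\n";
    }
};

} // namespace sketch

int main() {
    sketch::RecyclerMetricsContext metrics_context;
    std::vector<std::pair<std::string, std::string>> kvs = {{"k1", "expired"}, {"k2", "fresh"}};
    // Statistics pass: count what a recycle round would remove, delete nothing,
    // and always return 0 so one malformed or unexpired KV never aborts the scan.
    auto handle_kv = [&](std::string_view, std::string_view v) -> int {
        if (v != "expired") return 0;
        metrics_context.total_need_recycle_num++;
        return 0;
    };
    return sketch::scan_and_recycle(kvs, std::move(handle_kv), [&]() -> int {
        metrics_context.report(true); // loop_done publishes the *_to_recycle_* totals
        return 0;
    });
}
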
diff --git a/cloud/src/recycler/recycler.h b/cloud/src/recycler/recycler.h
index d1ae8a056c8..97f9a8d57c5 100644
--- a/cloud/src/recycler/recycler.h
+++ b/cloud/src/recycler/recycler.h
@@ -204,6 +204,26 @@ public:
bool check_recycle_tasks();
+ int scan_and_statistics_indexes();
+
+ int scan_and_statistics_partitions();
+
+ int scan_and_statistics_rowsets();
+
+ int scan_and_statistics_tmp_rowsets();
+
+ int scan_and_statistics_abort_timeout_txn();
+
+ int scan_and_statistics_expired_txn_label();
+
+ int scan_and_statistics_copy_jobs();
+
+ int scan_and_statistics_stage();
+
+ int scan_and_statistics_expired_stage_objects();
+
+ int scan_and_statistics_versions();
+
private:
// returns 0 for success otherwise error
int init_obj_store_accessors();
@@ -222,13 +242,6 @@ private:
                          std::function<int(std::string_view k, std::string_view v)> recycle_func,
std::function<int()> loop_done = nullptr);
- int scan_for_recycle_and_statistics(
- std::string begin, std::string_view end, std::string task_name,
- RecyclerMetricsContext& metrics_context,
-            std::function<int(std::string_view k, std::string_view v)> statistics_func,
-            std::function<int(std::string_view k, std::string_view v)> recycle_func,
- std::function<int()> loop_done = nullptr);
-
// return 0 for success otherwise error
int delete_rowset_data(const doris::RowsetMetaCloudPB& rs_meta_pb);
@@ -363,30 +376,34 @@ public:
// `is_begin` is used to initialize total num of items need to be recycled
void report(bool is_begin = false) {
if (!operation_type.empty()) {
- if (total_need_recycle_data_size > 0) {
- // is init
- if (is_begin) {
+ // is init
+ if (is_begin) {
+ if (total_need_recycle_data_size) {
                     g_bvar_recycler_instance_last_round_to_recycle_bytes.put(
                             {instance_id, operation_type}, total_need_recycle_data_size);
-                } else {
+                }
+            } else {
+                if (total_recycled_data_size.load()) {
                     g_bvar_recycler_instance_last_round_recycled_bytes.put(
                             {instance_id, operation_type}, total_recycled_data_size.load());
-                    g_bvar_recycler_instance_recycle_total_bytes_since_started.put(
-                            {instance_id, operation_type}, total_recycled_data_size.load());
                 }
+                g_bvar_recycler_instance_recycle_total_bytes_since_started.put(
+                        {instance_id, operation_type}, total_recycled_data_size.load());
}
- if (total_need_recycle_num > 0) {
- // is init
- if (is_begin) {
+ // is init
+ if (is_begin) {
+ if (total_need_recycle_num) {
                     g_bvar_recycler_instance_last_round_to_recycle_num.put(
                             {instance_id, operation_type}, total_need_recycle_num);
-                } else {
+                }
+            } else {
+                if (total_recycled_num.load()) {
                     g_bvar_recycler_instance_last_round_recycled_num.put(
                             {instance_id, operation_type}, total_recycled_num.load());
-                    g_bvar_recycler_instance_recycle_total_num_since_started.put(
-                            {instance_id, operation_type}, total_recycled_num.load());
                 }
+                g_bvar_recycler_instance_recycle_total_num_since_started.put(
+                        {instance_id, operation_type}, total_recycled_num.load());
}
}
}
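
The reworked report() above now branches on is_begin first: a statistics pass calls report(true) and only writes the last_round_to_recycle_{num,bytes} gauges when they are non-zero, while report() at the end of a real round writes last_round_recycled_{num,bytes} only when non-zero but always refreshes the *_since_started gauges, even for a round that recycled nothing. Below is a self-contained toy of just those publishing rules; the "put ..." prints stand in for the bvar writes and are not the committed code.

// Sketch only: publishing rules of the reworked report(), shown for the *_num gauges.
#include <cstdint>
#include <iostream>

void report_sketch(bool is_begin, int64_t to_recycle, int64_t recycled) {
    if (is_begin) {
        // Statistics pass: only the last_round_to_recycle_* gauge, and only when non-zero.
        if (to_recycle) std::cout << "put last_round_to_recycle_num=" << to_recycle << "\n";
    } else {
        // End of a recycle round: last_round_recycled_* only when non-zero, but the
        // *_since_started gauge is refreshed unconditionally.
        if (recycled) std::cout << "put last_round_recycled_num=" << recycled << "\n";
        std::cout << "put recycle_total_num_since_started=" << recycled << "\n";
    }
}

int main() {
    report_sketch(/*is_begin=*/true, /*to_recycle=*/42, /*recycled=*/0);
    report_sketch(/*is_begin=*/false, /*to_recycle=*/42, /*recycled=*/0); // still refreshes the running total
    return 0;
}
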
diff --git a/cloud/src/recycler/recycler_service.cpp
b/cloud/src/recycler/recycler_service.cpp
index b812fbd27ee..48b2fe370ef 100644
--- a/cloud/src/recycler/recycler_service.cpp
+++ b/cloud/src/recycler/recycler_service.cpp
@@ -23,6 +23,13 @@
#include <gen_cpp/cloud.pb.h>
#include <google/protobuf/util/json_util.h>
+#include <algorithm>
+#include <functional>
+#include <numeric>
+#include <sstream>
+#include <utility>
+#include <vector>
+
#include "common/config.h"
#include "common/defer.h"
#include "common/logging.h"
@@ -34,6 +41,7 @@
#include "recycler/meta_checker.h"
#include "recycler/recycler.h"
#include "recycler/s3_accessor.h"
+#include "recycler/util.h"
namespace doris::cloud {
@@ -52,6 +60,245 @@ RecyclerServiceImpl::RecyclerServiceImpl(std::shared_ptr<TxnKv> txn_kv, Recycler
RecyclerServiceImpl::~RecyclerServiceImpl() = default;
+void RecyclerServiceImpl::statistics_recycle(StatisticsRecycleRequest& req, MetaServiceCode& code,
+ std::string& msg) {
+ std::unique_ptr<Transaction> txn;
+ TxnErrorCode err = txn_kv_->create_txn(&txn);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = MetaServiceCode::KV_TXN_CREATE_ERR;
+ msg = "failed to create txn";
+ return;
+ }
+
+    static std::map<std::string, std::function<void(InstanceRecycler&)>> resource_handlers = {
+ {"recycle_indexes",
+ [](InstanceRecycler& instance_recycler) {
+ instance_recycler.scan_and_statistics_indexes();
+ }},
+ {"recycle_partitions",
+ [](InstanceRecycler& instance_recycler) {
+ instance_recycler.scan_and_statistics_partitions();
+ }},
+ {"recycle_tmp_rowsets",
+ [](InstanceRecycler& instance_recycler) {
+ instance_recycler.scan_and_statistics_tmp_rowsets();
+ }},
+ {"recycle_rowsets",
+ [](InstanceRecycler& instance_recycler) {
+ instance_recycler.scan_and_statistics_rowsets();
+ }},
+ {"abort_timeout_txn",
+ [](InstanceRecycler& instance_recycler) {
+ instance_recycler.scan_and_statistics_abort_timeout_txn();
+ }},
+ {"recycle_expired_txn_label",
+ [](InstanceRecycler& instance_recycler) {
+ instance_recycler.scan_and_statistics_expired_txn_label();
+ }},
+ {"recycle_versions",
+ [](InstanceRecycler& instance_recycler) {
+ instance_recycler.scan_and_statistics_versions();
+ }},
+ {"recycle_copy_jobs",
+ [](InstanceRecycler& instance_recycler) {
+ instance_recycler.scan_and_statistics_copy_jobs();
+ }},
+ {"recycle_stage",
+ [](InstanceRecycler& instance_recycler) {
+ instance_recycler.scan_and_statistics_stage();
+ }},
+ {"recycle_expired_stage_objects",
+ [](InstanceRecycler& instance_recycler) {
+ instance_recycler.scan_and_statistics_expired_stage_objects();
+ }},
+ {"recycle_tablet",
+ [](InstanceRecycler& instance_recycler) {
+ instance_recycler.scan_and_statistics_partitions();
+ instance_recycler.scan_and_statistics_indexes();
+ }},
+ {"recycle_segment", [](InstanceRecycler& instance_recycler) {
+ instance_recycler.scan_and_statistics_partitions();
+ instance_recycler.scan_and_statistics_indexes();
+ instance_recycler.scan_and_statistics_rowsets();
+ instance_recycler.scan_and_statistics_tmp_rowsets();
+ }}};
+
+ std::set<std::string> resource_types;
+ for (const auto& resource_type : req.resource_type()) {
+ if (resource_type == "*") {
+ std::for_each(resource_handlers.begin(), resource_handlers.end(),
+                          [&](const auto& it) { resource_types.emplace(it.first); });
+ break;
+ } else {
+ if (!resource_handlers.contains(resource_type)) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = fmt::format(
+                        "invalid resource type: {}, valid resource_type have [{}]", resource_type,
+                        std::accumulate(resource_handlers.begin(), resource_handlers.end(),
+                                        std::string(), [](const std::string& acc, const auto& it) {
+                                            return acc.empty() ? it.first : acc + ", " + it.first;
+ }));
+ LOG_WARNING(msg);
+ return;
+ } else {
+ resource_types.emplace(resource_type);
+ }
+ }
+ }
+
+ std::set<std::string> instance_ids;
+ std::vector<InstanceInfoPB> instances;
+ get_all_instances(txn_kv_.get(), instances);
+
+ for (const auto& instance_id : req.instance_ids()) {
+ if (instance_id == "*") {
+            std::for_each(instances.begin(), instances.end(), [&](const InstanceInfoPB& instance) {
+ instance_ids.emplace(instance.instance_id());
+ });
+ break;
+ } else {
+ if (std::find_if(instances.begin(), instances.end(),
+ [&](const InstanceInfoPB& instance) {
+ return instance.instance_id() == instance_id;
+ }) == instances.end()) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = fmt::format("invalid instance id: {}", instance_id);
+ LOG_WARNING(msg);
+ return;
+ } else {
+ instance_ids.emplace(instance_id);
+ }
+ }
+ }
+
+ LOG(INFO) << "begin to statistics recycle for "
+              << std::accumulate(instance_ids.begin(), instance_ids.end(), std::string(),
+                                 [](const std::string& acc, const std::string& id) {
+ return acc.empty() ? id : acc + ", " + id;
+ });
+
+ auto worker_pool = std::make_unique<SimpleThreadPool>(
+            config::instance_recycler_statistics_recycle_worker_pool_size, "statistics_recycle");
+ worker_pool->start();
+
+ for (const auto& id : instance_ids) {
+ InstanceKeyInfo key_info {id};
+ std::string key;
+ instance_key(key_info, &key);
+ std::string val;
+ err = txn->get(key, &val);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = MetaServiceCode::KV_TXN_GET_ERR;
+            msg = fmt::format("failed to get instance, instance_id={}, err={}", id, err);
+ LOG_WARNING(msg);
+ continue;
+ }
+ InstanceInfoPB instance;
+ if (!instance.ParseFromString(val)) {
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+            msg = fmt::format("malformed instance info, key={}, val={}", hex(key), hex(val));
+ LOG_WARNING(msg);
+ continue;
+ }
+ auto instance_recycler = std::make_shared<InstanceRecycler>(
+                txn_kv_, instance, recycler_->_thread_pool_group, txn_lazy_committer_);
+
+ if (int r = instance_recycler->init(); r != 0) {
+            LOG(WARNING) << "failed to init instance recycler, instance_id=" << id << " ret=" << r;
+ continue;
+ }
+        // If resource_types is empty, collect statistics for all resources
+ if (resource_types.empty()) {
+ for (const auto& [_, func] : resource_handlers) {
+                worker_pool->submit([&instance_recycler, &func]() { func(*instance_recycler); });
+ }
+ } else {
+ for (const auto& resource_type : resource_types) {
+                if (auto it = resource_handlers.find(resource_type); it != resource_handlers.end()) {
+ worker_pool->submit(
+                            [&it, &instance_recycler]() { it->second(*instance_recycler); });
+ }
+ }
+ }
+ }
+
+ worker_pool->stop();
+ std::stringstream ss;
+    for_each(instance_ids.begin(), instance_ids.end(), [&](const std::string& id) {
+ ss << "Instance ID: " << id << "\n";
+ ss << "----------------------------------------\n";
+
+        for_each(resource_types.begin(), resource_types.end(), [&](const auto& resource_type) {
+ int64_t to_recycle_num = 0;
+ int64_t to_recycle_bytes = 0;
+            if (resource_type == "recycle_segment" || resource_type == "recycle_tablet") {
+                to_recycle_num = g_bvar_recycler_instance_last_round_to_recycle_num.get(
+                        {"global_recycler", resource_type});
+                to_recycle_bytes = g_bvar_recycler_instance_last_round_to_recycle_bytes.get(
+                        {"global_recycler", resource_type});
+            } else {
+                to_recycle_num =
+                        g_bvar_recycler_instance_last_round_to_recycle_num.get({id, resource_type});
+                to_recycle_bytes = g_bvar_recycler_instance_last_round_to_recycle_bytes.get(
+                        {id, resource_type});
+            }
+ }
+
+ ss << "Task Type: " << resource_type << "\n";
+
+ // Add specific counts for different resource types
+ if (resource_type == "recycle_partitions") {
+                ss << "  • Need to recycle partition count: " << to_recycle_num << " items\n";
+                ss << "  • Need to recycle partition size: " << to_recycle_bytes << " bytes\n";
+            } else if (resource_type == "recycle_rowsets") {
+                ss << "  • Need to recycle rowset count: " << to_recycle_num << " items\n";
+                ss << "  • Need to recycle rowset size: " << to_recycle_bytes << " bytes\n";
+            } else if (resource_type == "recycle_tmp_rowsets") {
+                ss << "  • Need to recycle tmp rowset count: " << to_recycle_num << " items\n";
+                ss << "  • Need to recycle tmp rowset size: " << to_recycle_bytes << " bytes\n";
+            } else if (resource_type == "recycle_indexes") {
+                ss << "  • Need to recycle index count: " << to_recycle_num << " items\n";
+                ss << "  • Need to recycle index size: " << to_recycle_bytes << " bytes\n";
+            } else if (resource_type == "recycle_segment") {
+                ss << "  • Need to recycle segment count: " << to_recycle_num << " items\n";
+                ss << "  • Need to recycle segment size: " << to_recycle_bytes << " bytes\n";
+            } else if (resource_type == "recycle_tablet") {
+                ss << "  • Need to recycle tablet count: " << to_recycle_num << " items\n";
+                ss << "  • Need to recycle tablet size: " << to_recycle_bytes << " bytes\n";
+            } else if (resource_type == "recycle_versions") {
+                ss << "  • Need to recycle version count: " << to_recycle_num << " items\n";
+                ss << "  • Need to recycle version size: " << to_recycle_bytes << " bytes\n";
+            } else if (resource_type == "abort_timeout_txn") {
+                ss << "  • Need to abort timeout txn count: " << to_recycle_num << " items\n";
+                ss << "  • Need to recycle timeout txn size: " << to_recycle_bytes << " bytes\n";
+            } else if (resource_type == "recycle_expired_txn_label") {
+                ss << "  • Need to recycle expired txn label count: " << to_recycle_num
+                   << " items\n";
+                ss << "  • Need to recycle expired txn label size: " << to_recycle_bytes
+                   << " bytes\n";
+            } else if (resource_type == "recycle_copy_jobs") {
+                ss << "  • Need to recycle copy job count: " << to_recycle_num << " items\n";
+                ss << "  • Need to recycle copy job size: " << to_recycle_bytes << " bytes\n";
+            } else if (resource_type == "recycle_stage") {
+                ss << "  • Need to recycle stage count: " << to_recycle_num << " items\n";
+                ss << "  • Need to recycle stage size: " << to_recycle_bytes << " bytes\n";
+            } else if (resource_type == "recycle_expired_stage_objects") {
+                ss << "  • Need to recycle expired stage object count: " << to_recycle_num
+                   << " items\n";
+                ss << "  • Need to recycle expired stage object size: " << to_recycle_bytes
+                   << " bytes\n";
+            } else {
+                ss << "  • Need to recycle count: " << to_recycle_num << " items\n";
+                ss << "  • Need to recycle size: " << to_recycle_bytes << " bytes\n";
+ }
+
+ ss << "----------------------------------------\n";
+ });
+ ss << "\n";
+ });
+ msg = ss.str();
+}
+
 void RecyclerServiceImpl::recycle_instance(::google::protobuf::RpcController* controller,
                                            const ::doris::cloud::RecycleInstanceRequest* request,
                                            ::doris::cloud::RecycleInstanceResponse* response,
@@ -330,6 +577,20 @@ void RecyclerServiceImpl::http(::google::protobuf::RpcController* controller,
return;
}
+ if (unresolved_path == "statistics_recycle") {
+ StatisticsRecycleRequest req;
+        auto st = google::protobuf::util::JsonStringToMessage(request_body, &req);
+ if (!st.ok()) {
+            msg = "failed to StatisticsRecycleRequest, error: " + st.message().ToString();
+ response_body = msg;
+ LOG(WARNING) << msg;
+ return;
+ }
+ statistics_recycle(req, code, msg);
+ response_body = msg;
+ return;
+ }
+
if (unresolved_path == "recycle_copy_jobs") {
auto instance_id = uri.GetQuery("instance_id");
if (instance_id == nullptr || instance_id->empty()) {
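
The handler added above is reached through the recycler's HTTP service at the unresolved path statistics_recycle and expects a request body that JsonStringToMessage() can map onto StatisticsRecycleRequest; both repeated fields accept "*" as a wildcard, an empty resource_type list means "statistics for all resources", and any explicit resource_type must be one of the keys of resource_handlers. Below is a hedged sketch of producing such a body with the protobuf JSON utilities; the surrounding HTTP client, host, and port are deployment-specific and omitted here.

// Sketch only: building the JSON body for the statistics_recycle HTTP request.
#include <gen_cpp/cloud.pb.h>
#include <google/protobuf/util/json_util.h>
#include <iostream>
#include <string>

int main() {
    doris::cloud::StatisticsRecycleRequest req;
    req.add_instance_ids("*");                // "*" expands to all known instances
    req.add_resource_type("recycle_rowsets"); // any key of resource_handlers, or "*"
    std::string body;
    auto status = google::protobuf::util::MessageToJsonString(req, &body);
    if (!status.ok()) return 1;
    // Default JSON options use lowerCamelCase field names, which JsonStringToMessage
    // also accepts on the server side, e.g. {"instanceIds":["*"],"resourceType":["recycle_rowsets"]}
    std::cout << body << std::endl;
    return 0;
}
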
diff --git a/cloud/src/recycler/recycler_service.h
b/cloud/src/recycler/recycler_service.h
index 67eb9d21333..2ddf311310b 100644
--- a/cloud/src/recycler/recycler_service.h
+++ b/cloud/src/recycler/recycler_service.h
@@ -44,6 +44,8 @@ public:
::google::protobuf::Closure* done) override;
private:
+    void statistics_recycle(StatisticsRecycleRequest& req, MetaServiceCode& code, std::string& msg);
+
void check_instance(const std::string& instance_id, MetaServiceCode& code,
std::string& msg);
private:
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 4e5c27f6a43..571bd0b26e2 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -1342,6 +1342,11 @@ message RecycleInstanceRequest {
repeated string instance_ids = 1;
}
+message StatisticsRecycleRequest {
+ repeated string instance_ids = 1;
+ repeated string resource_type = 2;
+}
+
message RecycleInstanceResponse {
optional MetaServiceResponseStatus status = 1;
}