This is an automated email from the ASF dual-hosted git repository.
plat1ko pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 7131a3b39b3 [fix](checker) Clarified the semantics of the checker return value (#38785)
7131a3b39b3 is described below
commit 7131a3b39b32bcafad3b4a6c062d24dfdb7bd5d8
Author: plat1ko <[email protected]>
AuthorDate: Mon Aug 26 19:50:56 2024 +0800
[fix](checker) Clarified the semantics of the checker return value (#38785)
---
cloud/src/recycler/checker.cpp | 68 ++++++++++++++++++++++++++++--------------
cloud/src/recycler/checker.h | 13 ++++----
2 files changed, 53 insertions(+), 28 deletions(-)
diff --git a/cloud/src/recycler/checker.cpp b/cloud/src/recycler/checker.cpp
index 49421f97ca0..f8289160269 100644
--- a/cloud/src/recycler/checker.cpp
+++ b/cloud/src/recycler/checker.cpp
@@ -167,10 +167,20 @@ int Checker::start() {
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
g_bvar_checker_enqueue_cost_s.put(instance_id, ctime_ms / 1000 - enqueue_time_s);
ret = checker->do_check();
+
if (config::enable_inverted_check) {
- if (checker->do_inverted_check() != 0) ret = -1;
+ if (ret == 0) {
+ ret = checker->do_inverted_check();
+ }
+ }
+
+ if (ret < 0) {
+ // If ret < 0, it means that a temporary error occurred during the check process.
+ // The check job should not be considered finished, and the next round of check job
+ // should be retried as soon as possible.
+ return;
}
- if (ret == -1) return;
+
// If instance checker has been aborted, don't finish this job
if (!checker->stopped()) {
finish_instance_recycle_job(txn_kv_.get(), check_job_key, instance.instance_id(),
@@ -444,9 +454,10 @@ int InstanceChecker::init_storage_vault_accessors(const InstanceInfoPB& instance
int InstanceChecker::do_check() {
TEST_SYNC_POINT("InstanceChecker.do_check");
LOG(INFO) << "begin to check instance objects instance_id=" <<
instance_id_;
+ int check_ret = 0;
long num_scanned = 0;
long num_scanned_with_segment = 0;
- long num_check_failed = 0;
+ long num_rowset_loss = 0;
long instance_volume = 0;
using namespace std::chrono;
auto start_time = steady_clock::now();
@@ -455,11 +466,11 @@ int InstanceChecker::do_check() {
LOG(INFO) << "check instance objects finished, cost=" << cost
<< "s. instance_id=" << instance_id_ << " num_scanned=" <<
num_scanned
<< " num_scanned_with_segment=" << num_scanned_with_segment
- << " num_check_failed=" << num_check_failed
+ << " num_rowset_loss=" << num_rowset_loss
<< " instance_volume=" << instance_volume;
g_bvar_checker_num_scanned.put(instance_id_, num_scanned);
g_bvar_checker_num_scanned_with_segment.put(instance_id_, num_scanned_with_segment);
- g_bvar_checker_num_check_failed.put(instance_id_, num_check_failed);
+ g_bvar_checker_num_check_failed.put(instance_id_, num_rowset_loss);
g_bvar_checker_check_cost_s.put(instance_id_, static_cast<long>(cost));
// FIXME(plat1ko): What if some list operation failed?
g_bvar_checker_instance_volume.put(instance_id_, instance_volume);
@@ -490,7 +501,7 @@ int InstanceChecker::do_check() {
.tag("resource_id", rs_meta.resource_id())
.tag("tablet_id", rs_meta.tablet_id())
.tag("rowset_id", rs_meta.rowset_id_v2());
- ++num_check_failed;
+ check_ret = -1;
return;
}
@@ -498,7 +509,7 @@ int InstanceChecker::do_check() {
int ret = find_it->second->list_directory(tablet_path_prefix(rs_meta.tablet_id()), &list_iter);
if (ret != 0) { // No need to log, because S3Accessor has logged this error
- ++num_check_failed;
+ check_ret = -1;
return;
}
@@ -510,6 +521,7 @@ int InstanceChecker::do_check() {
instance_volume += tablet_volume;
}
+ bool data_loss = false;
for (int i = 0; i < rs_meta.num_segments(); ++i) {
auto path = segment_path(rs_meta.tablet_id(), rs_meta.rowset_id_v2(), i);
if (tablet_files_cache.files.contains(path)) {
@@ -518,13 +530,16 @@ int InstanceChecker::do_check() {
if (1 == key_exist(txn_kv_.get(), key)) {
// Rowset has been deleted instead of data loss
- continue;
+ break;
}
-
- ++num_check_failed;
+ data_loss = true;
TEST_SYNC_POINT_CALLBACK("InstanceChecker.do_check1", &path);
LOG(WARNING) << "object not exist, path=" << path << " key=" <<
hex(key);
}
+
+ if (data_loss) {
+ ++num_rowset_loss;
+ }
};
// scan visible rowsets
@@ -553,7 +568,7 @@ int InstanceChecker::do_check() {
doris::RowsetMetaCloudPB rs_meta;
if (!rs_meta.ParseFromArray(v.data(), v.size())) {
- ++num_check_failed;
+ ++num_rowset_loss;
LOG(WARNING) << "malformed rowset meta. key=" << hex(k) << "
val=" << hex(v);
continue;
}
@@ -561,7 +576,8 @@ int InstanceChecker::do_check() {
}
start_key.push_back('\x00'); // Update to next smallest key for iteration
} while (it->more() && !stopped());
- return num_check_failed == 0 ? 0 : -2;
+
+ return num_rowset_loss > 0 ? 1 : check_ret;
}
int InstanceChecker::get_bucket_lifecycle(int64_t* lifecycle_days) {
@@ -599,15 +615,16 @@ int InstanceChecker::do_inverted_check() {
}
LOG(INFO) << "begin to inverted check objects instance_id=" <<
instance_id_;
+ int check_ret = 0;
long num_scanned = 0;
- long num_check_failed = 0;
+ long num_file_leak = 0;
using namespace std::chrono;
auto start_time = steady_clock::now();
std::unique_ptr<int, std::function<void(int*)>> defer_log_statistics((int*)0x01, [&](int*) {
auto cost = duration<float>(steady_clock::now() - start_time).count();
LOG(INFO) << "inverted check instance objects finished, cost=" << cost
<< "s. instance_id=" << instance_id_ << " num_scanned=" <<
num_scanned
- << " num_check_failed=" << num_check_failed;
+ << " num_file_leak=" << num_file_leak;
});
struct TabletRowsets {
@@ -616,6 +633,7 @@ int InstanceChecker::do_inverted_check() {
};
TabletRowsets tablet_rowsets_cache;
+ // Return 0 if check success, return 1 if file is garbage data, negative if error occurred
auto check_segment_file = [&](const std::string& obj_key) {
std::vector<std::string> str;
butil::SplitString(obj_key, '/', &str);
@@ -683,12 +701,13 @@ int InstanceChecker::do_inverted_check() {
}
}
} while (it->more() && !stopped());
- if (tablet_rowsets_cache.rowset_ids.contains(rowset_id)) {
- return 0;
- } else {
- LOG(WARNING) << "rowset not exists, key=" << obj_key;
- return -1;
+
+ if (!tablet_rowsets_cache.rowset_ids.contains(rowset_id)) {
+ // Garbage data leak
+ LOG(WARNING) << "rowset should be recycled, key=" << obj_key;
+ return 1;
}
+
return 0;
};
@@ -705,10 +724,15 @@ int InstanceChecker::do_inverted_check() {
for (auto file = list_iter->next(); file.has_value(); file = list_iter->next()) {
++num_scanned;
- if (check_segment_file(file->path) != 0) {
+ int ret = check_segment_file(file->path);
+ if (ret != 0) {
LOG(WARNING) << "failed to check segment file, uri=" <<
accessor->uri()
<< " path=" << file->path;
- ++num_check_failed;
+ if (ret == 1) {
+ ++num_file_leak;
+ } else {
+ check_ret = -1;
+ }
}
}
@@ -717,7 +741,7 @@ int InstanceChecker::do_inverted_check() {
return -1;
}
}
- return num_check_failed == 0 ? 0 : -1;
+ return num_file_leak > 0 ? 1 : check_ret;
}
} // namespace doris::cloud
diff --git a/cloud/src/recycler/checker.h b/cloud/src/recycler/checker.h
index 4cd851d5218..03717a69b5e 100644
--- a/cloud/src/recycler/checker.h
+++ b/cloud/src/recycler/checker.h
@@ -77,13 +77,14 @@ public:
explicit InstanceChecker(std::shared_ptr<TxnKv> txn_kv, const std::string& instance_id);
// Return 0 if success, otherwise error
int init(const InstanceInfoPB& instance);
- // Check whether the objects in the object store of the instance belong to the visible rowsets.
- // This function is used to verify that there is no garbage data leakage, should only be called in recycler test.
- // Return 0 if success, otherwise failed
+ // Return 0 if success.
+ // Return 1 if data leak is identified.
+ // Return negative if a temporary error occurred during the check process.
int do_inverted_check();
- // Return 0 if success, the definition of success is the absence of S3 access errors and data loss
- // Return -1 if encountering the situation that need to abort checker.
- // Return -2 if having S3 access errors or data loss
+
+ // Return 0 if success.
+ // Return 1 if data loss is identified.
+ // Return negative if a temporary error occurred during the check process.
int do_check();
// If there are multiple buckets, return the minimum lifecycle; if there are no buckets (i.e.
// all accessors are HdfsAccessor), return INT64_MAX.
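For reference, the net effect of the return-value contract documented in checker.h (0 = success, 1 = data loss or garbage-data leak identified, negative = temporary error) is that only negative values abort the current round without finishing the check job. The sketch below is illustrative only and not part of the patch; it restates the caller logic added to Checker::start() above, with any surrounding setup assumed:

    // Hypothetical caller, mirroring the logic this patch adds to Checker::start().
    int ret = checker->do_check();
    if (config::enable_inverted_check) {
        if (ret == 0) {
            // Only run the inverted check if the forward check was clean.
            ret = checker->do_inverted_check();
        }
    }
    if (ret < 0) {
        // Temporary error (e.g. a failed S3 list): do not mark the check job
        // finished, so the next round retries it as soon as possible.
        return;
    }
    // ret == 0 (clean) or ret == 1 (data loss / leak already logged and counted
    // in the checker bvars): the job can be finished normally.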
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]