This is an automated email from the ASF dual-hosted git repository.

plat1ko pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 7131a3b39b3 [fix](checker) Clarified the semantics of the checker return value (#38785)
7131a3b39b3 is described below

commit 7131a3b39b32bcafad3b4a6c062d24dfdb7bd5d8
Author: plat1ko <[email protected]>
AuthorDate: Mon Aug 26 19:50:56 2024 +0800

    [fix](checker) Clarified the semantics of the checker return value (#38785)
---
 cloud/src/recycler/checker.cpp | 68 ++++++++++++++++++++++++++++--------------
 cloud/src/recycler/checker.h   | 13 ++++----
 2 files changed, 53 insertions(+), 28 deletions(-)

diff --git a/cloud/src/recycler/checker.cpp b/cloud/src/recycler/checker.cpp
index 49421f97ca0..f8289160269 100644
--- a/cloud/src/recycler/checker.cpp
+++ b/cloud/src/recycler/checker.cpp
@@ -167,10 +167,20 @@ int Checker::start() {
                     
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
             g_bvar_checker_enqueue_cost_s.put(instance_id, ctime_ms / 1000 - 
enqueue_time_s);
             ret = checker->do_check();
+
             if (config::enable_inverted_check) {
-                if (checker->do_inverted_check() != 0) ret = -1;
+                if (ret == 0) {
+                    ret = checker->do_inverted_check();
+                }
+            }
+
+            if (ret < 0) {
+                // If ret < 0, it means that a temporary error occurred during 
the check process.
+                // The check job should not be considered finished, and the 
next round of check job
+                // should be retried as soon as possible.
+                return;
             }
-            if (ret == -1) return;
+
             // If instance checker has been aborted, don't finish this job
             if (!checker->stopped()) {
                 finish_instance_recycle_job(txn_kv_.get(), check_job_key, 
instance.instance_id(),
@@ -444,9 +454,10 @@ int InstanceChecker::init_storage_vault_accessors(const 
InstanceInfoPB& instance
 int InstanceChecker::do_check() {
     TEST_SYNC_POINT("InstanceChecker.do_check");
     LOG(INFO) << "begin to check instance objects instance_id=" << 
instance_id_;
+    int check_ret = 0;
     long num_scanned = 0;
     long num_scanned_with_segment = 0;
-    long num_check_failed = 0;
+    long num_rowset_loss = 0;
     long instance_volume = 0;
     using namespace std::chrono;
     auto start_time = steady_clock::now();
@@ -455,11 +466,11 @@ int InstanceChecker::do_check() {
         LOG(INFO) << "check instance objects finished, cost=" << cost
                   << "s. instance_id=" << instance_id_ << " num_scanned=" << 
num_scanned
                   << " num_scanned_with_segment=" << num_scanned_with_segment
-                  << " num_check_failed=" << num_check_failed
+                  << " num_rowset_loss=" << num_rowset_loss
                   << " instance_volume=" << instance_volume;
         g_bvar_checker_num_scanned.put(instance_id_, num_scanned);
         g_bvar_checker_num_scanned_with_segment.put(instance_id_, 
num_scanned_with_segment);
-        g_bvar_checker_num_check_failed.put(instance_id_, num_check_failed);
+        g_bvar_checker_num_check_failed.put(instance_id_, num_rowset_loss);
         g_bvar_checker_check_cost_s.put(instance_id_, static_cast<long>(cost));
         // FIXME(plat1ko): What if some list operation failed?
         g_bvar_checker_instance_volume.put(instance_id_, instance_volume);
@@ -490,7 +501,7 @@ int InstanceChecker::do_check() {
                         .tag("resource_id", rs_meta.resource_id())
                         .tag("tablet_id", rs_meta.tablet_id())
                         .tag("rowset_id", rs_meta.rowset_id_v2());
-                ++num_check_failed;
+                check_ret = -1;
                 return;
             }
 
@@ -498,7 +509,7 @@ int InstanceChecker::do_check() {
             int ret = 
find_it->second->list_directory(tablet_path_prefix(rs_meta.tablet_id()),
                                                       &list_iter);
             if (ret != 0) { // No need to log, because S3Accessor has logged 
this error
-                ++num_check_failed;
+                check_ret = -1;
                 return;
             }
 
@@ -510,6 +521,7 @@ int InstanceChecker::do_check() {
             instance_volume += tablet_volume;
         }
 
+        bool data_loss = false;
         for (int i = 0; i < rs_meta.num_segments(); ++i) {
             auto path = segment_path(rs_meta.tablet_id(), 
rs_meta.rowset_id_v2(), i);
             if (tablet_files_cache.files.contains(path)) {
@@ -518,13 +530,16 @@ int InstanceChecker::do_check() {
 
             if (1 == key_exist(txn_kv_.get(), key)) {
                 // Rowset has been deleted instead of data loss
-                continue;
+                break;
             }
-
-            ++num_check_failed;
+            data_loss = true;
             TEST_SYNC_POINT_CALLBACK("InstanceChecker.do_check1", &path);
             LOG(WARNING) << "object not exist, path=" << path << " key=" << 
hex(key);
         }
+
+        if (data_loss) {
+            ++num_rowset_loss;
+        }
     };
 
     // scan visible rowsets
@@ -553,7 +568,7 @@ int InstanceChecker::do_check() {
 
             doris::RowsetMetaCloudPB rs_meta;
             if (!rs_meta.ParseFromArray(v.data(), v.size())) {
-                ++num_check_failed;
+                ++num_rowset_loss;
                 LOG(WARNING) << "malformed rowset meta. key=" << hex(k) << " 
val=" << hex(v);
                 continue;
             }
@@ -561,7 +576,8 @@ int InstanceChecker::do_check() {
         }
         start_key.push_back('\x00'); // Update to next smallest key for 
iteration
     } while (it->more() && !stopped());
-    return num_check_failed == 0 ? 0 : -2;
+
+    return num_rowset_loss > 0 ? 1 : check_ret;
 }
 
 int InstanceChecker::get_bucket_lifecycle(int64_t* lifecycle_days) {
@@ -599,15 +615,16 @@ int InstanceChecker::do_inverted_check() {
     }
 
     LOG(INFO) << "begin to inverted check objects instance_id=" << 
instance_id_;
+    int check_ret = 0;
     long num_scanned = 0;
-    long num_check_failed = 0;
+    long num_file_leak = 0;
     using namespace std::chrono;
     auto start_time = steady_clock::now();
     std::unique_ptr<int, std::function<void(int*)>> 
defer_log_statistics((int*)0x01, [&](int*) {
         auto cost = duration<float>(steady_clock::now() - start_time).count();
         LOG(INFO) << "inverted check instance objects finished, cost=" << cost
                   << "s. instance_id=" << instance_id_ << " num_scanned=" << 
num_scanned
-                  << " num_check_failed=" << num_check_failed;
+                  << " num_file_leak=" << num_file_leak;
     });
 
     struct TabletRowsets {
@@ -616,6 +633,7 @@ int InstanceChecker::do_inverted_check() {
     };
     TabletRowsets tablet_rowsets_cache;
 
+    // Return 0 if check success, return 1 if file is garbage data, negative 
if error occurred
     auto check_segment_file = [&](const std::string& obj_key) {
         std::vector<std::string> str;
         butil::SplitString(obj_key, '/', &str);
@@ -683,12 +701,13 @@ int InstanceChecker::do_inverted_check() {
                 }
             }
         } while (it->more() && !stopped());
-        if (tablet_rowsets_cache.rowset_ids.contains(rowset_id)) {
-            return 0;
-        } else {
-            LOG(WARNING) << "rowset not exists, key=" << obj_key;
-            return -1;
+
+        if (!tablet_rowsets_cache.rowset_ids.contains(rowset_id)) {
+            // Garbage data leak
+            LOG(WARNING) << "rowset should be recycled, key=" << obj_key;
+            return 1;
         }
+
         return 0;
     };
 
@@ -705,10 +724,15 @@ int InstanceChecker::do_inverted_check() {
 
         for (auto file = list_iter->next(); file.has_value(); file = 
list_iter->next()) {
             ++num_scanned;
-            if (check_segment_file(file->path) != 0) {
+            int ret = check_segment_file(file->path);
+            if (ret != 0) {
                 LOG(WARNING) << "failed to check segment file, uri=" << 
accessor->uri()
                              << " path=" << file->path;
-                ++num_check_failed;
+                if (ret == 1) {
+                    ++num_file_leak;
+                } else {
+                    check_ret = -1;
+                }
             }
         }
 
@@ -717,7 +741,7 @@ int InstanceChecker::do_inverted_check() {
             return -1;
         }
     }
-    return num_check_failed == 0 ? 0 : -1;
+    return num_file_leak > 0 ? 1 : check_ret;
 }
 
 } // namespace doris::cloud
diff --git a/cloud/src/recycler/checker.h b/cloud/src/recycler/checker.h
index 4cd851d5218..03717a69b5e 100644
--- a/cloud/src/recycler/checker.h
+++ b/cloud/src/recycler/checker.h
@@ -77,13 +77,14 @@ public:
     explicit InstanceChecker(std::shared_ptr<TxnKv> txn_kv, const std::string& 
instance_id);
     // Return 0 if success, otherwise error
     int init(const InstanceInfoPB& instance);
-    // Check whether the objects in the object store of the instance belong to 
the visible rowsets.
-    // This function is used to verify that there is no garbage data leakage, 
should only be called in recycler test.
-    // Return 0 if success, otherwise failed
+    // Return 0 if success.
+    // Return 1 if data leak is identified.
+    // Return negative if a temporary error occurred during the check process.
     int do_inverted_check();
-    // Return 0 if success, the definition of success is the absence of S3 
access errors and data loss
-    // Return -1 if encountering the situation that need to abort checker.
-    // Return -2 if having S3 access errors or data loss
+
+    // Return 0 if success.
+    // Return 1 if data loss is identified.
+    // Return negative if a temporary error occurred during the check process.
     int do_check();
     // If there are multiple buckets, return the minimum lifecycle; if there 
are no buckets (i.e.
     // all accessors are HdfsAccessor), return INT64_MAX.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to