This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new fe1e0355617 [feat](checker) Add table/partition version key 
consistency checking for checker (#54527)
fe1e0355617 is described below

commit fe1e03556174c9e74c3247f0946d710d7e778aec
Author: Uniqueyou <[email protected]>
AuthorDate: Sat Sep 13 12:26:24 2025 +0800

    [feat](checker) Add table/partition version key consistency checking for 
checker (#54527)
    
    1. meta checker: check if the **partition and table** of the version key
    **exist** in the fe meta
    
    2. checker: check whether the **version** stored under each version key
    is reasonable. The version of a table should be **greater than or equal
    to** the version of each of its partitions.
    
    3. fix the incorrect check for the schema key
---
 cloud/src/common/config.h           |   3 +-
 cloud/src/recycler/checker.cpp      |  86 ++++++++
 cloud/src/recycler/checker.h        |   5 +
 cloud/src/recycler/meta_checker.cpp | 383 ++++++++++++++++++++++++------------
 cloud/src/recycler/meta_checker.h   |  83 ++++----
 cloud/test/recycler_test.cpp        |  95 ++++++++-
 6 files changed, 488 insertions(+), 167 deletions(-)

diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index 9d8fd6234ca..23b0e9fa128 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -123,7 +123,8 @@ CONF_mBool(enable_restore_job_check, "false");
 CONF_mBool(enable_tablet_stats_key_check, "false");
 CONF_mBool(enable_txn_key_check, "false");
 
-CONF_mBool(enable_checker_for_meta_key_check, "false");
+CONF_mBool(enable_meta_key_check, "false");
+CONF_mBool(enable_version_key_check, "false");
 CONF_mInt64(mow_job_key_check_expiration_diff_seconds, "600"); // 10min
 
 CONF_String(test_s3_ak, "");
diff --git a/cloud/src/recycler/checker.cpp b/cloud/src/recycler/checker.cpp
index 254b0e4ef21..661e4ab05a8 100644
--- a/cloud/src/recycler/checker.cpp
+++ b/cloud/src/recycler/checker.cpp
@@ -228,6 +228,12 @@ int Checker::start() {
                 }
             }
 
+            if (config::enable_version_key_check) {
+                if (int ret = checker->do_version_key_check(); ret != 0) {
+                    success = false;
+                }
+            }
+
             // If instance checker has been aborted, don't finish this job
             if (!checker->stopped()) {
                 finish_instance_recycle_job(txn_kv_.get(), check_job_key, 
instance.instance_id(),
@@ -2055,6 +2061,85 @@ int InstanceChecker::scan_and_handle_kv(
     return ret;
 }
 
+int InstanceChecker::do_version_key_check() {
+    std::unique_ptr<RangeGetIterator> it;
+    std::string begin = table_version_key({instance_id_, 0, 0});
+    std::string end = table_version_key({instance_id_, INT64_MAX, 0});
+    bool check_res = true;
+    do {
+        std::unique_ptr<Transaction> txn;
+        TxnErrorCode err = txn_kv_->create_txn(&txn);
+        if (err != TxnErrorCode::TXN_OK) {
+            LOG(WARNING) << "failed to create txn";
+            return -1;
+        }
+        err = txn->get(begin, end, &it);
+        if (err != TxnErrorCode::TXN_OK) {
+            LOG(WARNING) << "failed to get mow tablet job key, err=" << err;
+            return -1;
+        }
+        while (it->has_next() && !stopped()) {
+            auto [k, v] = it->next();
+            std::string_view k1 = k;
+            k1.remove_prefix(1);
+            std::vector<std::tuple<std::variant<int64_t, std::string>, int, 
int>> out;
+            decode_key(&k1, &out);
+            int64_t table_version = -1;
+            // 0x01 "version" ${instance_id} "table" ${db_id} ${tbl_id}
+            if (!txn->decode_atomic_int(v, &table_version)) {
+                LOG(WARNING) << "malformed table version value";
+                return -1;
+            }
+            auto table_id = std::get<int64_t>(std::get<0>(out[4]));
+            auto db_id = std::get<int64_t>(std::get<0>(out[3]));
+            std::string partition_version_key_begin =
+                    partition_version_key({instance_id_, db_id, table_id, 0});
+            std::string partition_version_key_end =
+                    partition_version_key({instance_id_, db_id, table_id, 
INT64_MAX});
+            VersionPB partition_version_pb;
+
+            do {
+                std::unique_ptr<Transaction> txn;
+                TxnErrorCode err = txn_kv_->create_txn(&txn);
+                if (err != TxnErrorCode::TXN_OK) {
+                    LOG(WARNING) << "failed to create txn";
+                    return -1;
+                }
+                err = txn->get(partition_version_key_begin, 
partition_version_key_end, &it);
+                if (err != TxnErrorCode::TXN_OK) {
+                    LOG(WARNING) << "failed to get mow tablet job key, err=" 
<< err;
+                    return -1;
+                }
+                while (it->has_next() && !stopped()) {
+                    auto [k, v] = it->next();
+                    // 0x01 "version" ${instance_id} "partition" ${db_id} 
${tbl_id} ${partition_id}
+                    std::string_view k1 = k;
+                    k1.remove_prefix(1);
+                    std::vector<std::tuple<std::variant<int64_t, std::string>, 
int, int>> out;
+                    decode_key(&k1, &out);
+                    if (!partition_version_pb.ParseFromArray(v.data(), 
v.size())) [[unlikely]] {
+                        LOG(WARNING) << "failed to parse partition VersionPB";
+                        return -1;
+                    }
+                    auto partition_id = std::get<int64_t>(std::get<0>(out[5]));
+                    int64_t partition_version = partition_version_pb.version();
+                    if (table_version < partition_version) {
+                        check_res = false;
+                        LOG(WARNING)
+                                << "table version is less than partition 
version,"
+                                << " table_id: " << table_id << 
"tablet_version: " << table_version
+                                << " partition_id: " << partition_id
+                                << " partition_version: " << partition_version;
+                    }
+                }
+                partition_version_key_begin = it->next_begin_key();
+            } while (it->more() && !stopped());
+        }
+        begin = it->next_begin_key(); // Update to next smallest key for 
iteration
+    } while (it->more() && !stopped());
+    return check_res ? 0 : -1;
+}
+
 int InstanceChecker::do_restore_job_check() {
     int64_t num_prepared = 0;
     int64_t num_committed = 0;
@@ -2103,6 +2188,7 @@ int InstanceChecker::do_restore_job_check() {
             LOG(WARNING) << "failed to get mow tablet job key, err=" << err;
             return -1;
         }
+
         if (!it->has_next()) {
             break;
         }
diff --git a/cloud/src/recycler/checker.h b/cloud/src/recycler/checker.h
index 5993aa0b5de..d97a6d4d55d 100644
--- a/cloud/src/recycler/checker.h
+++ b/cloud/src/recycler/checker.h
@@ -113,6 +113,11 @@ public:
 
     int do_txn_key_check();
 
+    // check table and partition version key
+    // table version should be greater than the versions of all its partitions
+    // Return 0 if success, otherwise error
+    int do_version_key_check();
+
     // If there are multiple buckets, return the minimum lifecycle; if there 
are no buckets (i.e.
     // all accessors are HdfsAccessor), return INT64_MAX.
     // Return 0 if success, otherwise error
diff --git a/cloud/src/recycler/meta_checker.cpp 
b/cloud/src/recycler/meta_checker.cpp
index 78d65fe0eea..2a478b68b79 100644
--- a/cloud/src/recycler/meta_checker.cpp
+++ b/cloud/src/recycler/meta_checker.cpp
@@ -122,6 +122,7 @@ bool MetaChecker::do_meta_tablet_key_check(MYSQL* conn) {
             LOG(WARNING) << "tablet_idx.db_id not found in fe meta, db_id = "
                          << tablet_index_meta.db_id()
                          << "tablet index meta: " << 
tablet_index_meta.DebugString();
+            check_res = false;
             continue;
         }
         std::string db_name = db_meta_.at(tablet_index_meta.db_id());
@@ -194,6 +195,7 @@ bool MetaChecker::do_meta_tablet_key_index_check(MYSQL* 
conn) {
     for (const TabletIndexPB& tablet_idx : tablet_indexes) {
         if (!db_meta_.contains(tablet_idx.db_id())) {
             LOG(WARNING) << "tablet_idx.db_id not found in fe meta, db_id = " 
<< tablet_idx.db_id();
+            check_res = false;
             continue;
         }
         std::string sql_stmt = "show tablet " + 
std::to_string(tablet_idx.tablet_id());
@@ -298,6 +300,7 @@ bool MetaChecker::do_meta_schema_key_check(MYSQL* conn) {
             LOG(WARNING) << "tablet_idx.db_id not found in fe meta, db_id = "
                          << tablet_index_meta.db_id()
                          << "tablet index meta: " << 
tablet_index_meta.DebugString();
+            check_res = false;
             continue;
         }
         std::string db_name = db_meta_.at(tablet_index_meta.db_id());
@@ -330,9 +333,8 @@ bool MetaChecker::do_meta_schema_key_check(MYSQL* conn) {
         }
 
         MYSQL_RES* result;
-        std::string sql_stmt =
-                fmt::format("SHOW PROC '/dbs/{}/{}/index_schema/{}'", 
tablet_index_meta.db_id(),
-                            tablet_meta.table_id(), tablet_meta.index_id());
+        std::string sql_stmt = fmt::format("SHOW PROC 
'/dbs/{}/{}/index_schema'",
+                                           tablet_index_meta.db_id(), 
tablet_meta.table_id());
         mysql_query(conn, sql_stmt.c_str());
 
         result = mysql_store_result(conn);
@@ -356,6 +358,188 @@ bool MetaChecker::do_meta_schema_key_check(MYSQL* conn) {
     return check_res;
 }
 
+bool MetaChecker::do_version_partition_key_check(MYSQL* conn) {
+    std::vector<PartitionInfo> partitions_info;
+    bool check_res = true;
+
+    // scan and collect tablet_idx
+    std::string start_key;
+    std::string end_key;
+    partition_version_key({instance_id_, 0, 0, 0}, &start_key);
+    partition_version_key({instance_id_, INT64_MAX, 0, 0}, &end_key);
+    scan_and_handle_kv(
+            start_key, end_key,
+            [&partitions_info](std::string_view key, std::string_view value) 
-> int {
+                VersionPB partition_version;
+                if (!partition_version.ParseFromArray(value.data(), 
value.size())) {
+                    LOG(WARNING) << "malformed tablet index value";
+                    return -1;
+                }
+                auto k1 = key;
+                k1.remove_prefix(1);
+                // 0x01 "version" ${instance_id} "partition" ${db_id} 
${tbl_id} ${partition_id}
+                std::vector<std::tuple<std::variant<int64_t, std::string>, 
int, int>> out;
+                decode_key(&k1, &out);
+                DCHECK_EQ(out.size(), 6) << key;
+                auto db_id = std::get<int64_t>(std::get<0>(out[3]));
+                auto table_id = std::get<int64_t>(std::get<0>(out[4]));
+                auto partition_id = std::get<int64_t>(std::get<0>(out[5]));
+                partitions_info.emplace_back(PartitionInfo {
+                        .db_id = db_id, .table_id = table_id, .partition_id = 
partition_id});
+                return 0;
+            });
+
+    for (const auto& partition_info : partitions_info) {
+        if (!db_meta_.contains(partition_info.db_id)) {
+            LOG(WARNING) << "partition_info.db_id not found in fe meta, db_id 
= "
+                         << partition_info.db_id
+                         << "partition_info meta: " << 
partition_info.debug_string();
+            check_res = false;
+            continue;
+        }
+        std::string db_name = db_meta_.at(partition_info.db_id);
+        if (db_name == "__internal_schema" || db_name == "information_schema" 
||
+            db_name == "mysql") {
+            continue;
+        }
+
+        if (mysql_select_db(conn, db_name.c_str())) {
+            LOG(WARNING) << "mysql select db error, db_name: " << db_name
+                         << " error: " << mysql_error(conn);
+            continue;
+        }
+        MYSQL_RES* result;
+        std::string sql_stmt = fmt::format("show partition {}", 
partition_info.partition_id);
+        mysql_query(conn, sql_stmt.c_str());
+
+        result = mysql_store_result(conn);
+        if (result) {
+            MYSQL_ROW row = mysql_fetch_row(result);
+            if (partition_info.table_id != atoll(row[4])) {
+                LOG(WARNING) << "check failed, fdb meta: " << 
partition_info.debug_string()
+                             << " fe partition of table_id: " << atoll(row[4]);
+                check_res = false;
+            } else if (partition_info.db_id != atoll(row[3])) {
+                LOG(WARNING) << "check failed, fdb meta: " << 
partition_info.debug_string()
+                             << " fe partition of db_id: " << atoll(row[3]);
+                check_res = false;
+            }
+            mysql_free_result(result);
+        } else {
+            LOG(WARNING) << "check failed, fdb meta: " << 
partition_info.debug_string()
+                         << " fe partition not found";
+            check_res = false;
+        }
+        stat_info_.check_fe_partition_version_num++;
+    }
+
+    return check_res;
+}
+
+bool MetaChecker::do_version_table_key_check(MYSQL* conn) {
+    std::vector<TableInfo> tables_info;
+    bool check_res = true;
+
+    // table id -> version
+    std::unordered_map<int64_t, int64_t> fe_tables_info;
+    std::string start_key;
+    std::string end_key;
+    table_version_key({instance_id_, 0, 0}, &start_key);
+    table_version_key({instance_id_, INT64_MAX, 0}, &end_key);
+
+    // collect table version from fdb
+    scan_and_handle_kv(
+            start_key, end_key,
+            [&tables_info, this](std::string_view key, std::string_view value) 
-> int {
+                int64_t version = 0;
+                std::unique_ptr<Transaction> txn;
+                TxnErrorCode err = txn_kv_->create_txn(&txn);
+                if (err != TxnErrorCode::TXN_OK) {
+                    LOG(WARNING) << "failed to create txn";
+                    return -1;
+                }
+                if (!txn->decode_atomic_int(value, &version)) {
+                    LOG(WARNING) << "malformed table version value";
+                    return -1;
+                }
+                auto k1 = key;
+                k1.remove_prefix(1);
+                // 0x01 "version" ${instance_id} "table" ${db_id} ${tbl_id} -> 
${version}
+                std::vector<std::tuple<std::variant<int64_t, std::string>, 
int, int>> out;
+                decode_key(&k1, &out);
+                DCHECK_EQ(out.size(), 5) << key;
+                auto db_id = std::get<int64_t>(std::get<0>(out[3]));
+                auto table_id = std::get<int64_t>(std::get<0>(out[4]));
+                tables_info.emplace_back(TableInfo {.db_id = db_id, .table_id 
= table_id});
+                return 0;
+            });
+
+    // collect table version from fe meta
+    for (const auto& table_info : tables_info) {
+        if (!db_meta_.contains(table_info.db_id)) {
+            LOG(WARNING) << "table_info.db_id not found in fe meta, db_id = " 
<< table_info.db_id
+                         << "table_info meta: " << table_info.debug_string();
+            check_res = false;
+            continue;
+        }
+        std::string db_name = db_meta_.at(table_info.db_id);
+        if (db_name == "__internal_schema" || db_name == "information_schema" 
||
+            db_name == "mysql") {
+            continue;
+        }
+
+        if (mysql_select_db(conn, db_name.c_str())) {
+            LOG(WARNING) << "mysql select db error, db_name: " << db_name
+                         << " error: " << mysql_error(conn);
+            continue;
+        }
+
+        MYSQL_RES* result;
+        std::string sql_stmt = fmt::format("show table {}", 
table_info.table_id);
+        mysql_query(conn, sql_stmt.c_str());
+
+        result = mysql_store_result(conn);
+        if (result) {
+            MYSQL_ROW row = mysql_fetch_row(result);
+            int64_t db_id = atoll(row[2]);
+            if (table_info.db_id != db_id) {
+                LOG(WARNING) << "check failed, fdb meta: " << 
table_info.debug_string()
+                             << " fe table of db_id: " << atoll(row[2]);
+                check_res = false;
+            }
+        } else {
+            LOG(WARNING) << "check failed, fdb meta: " << 
table_info.debug_string()
+                         << " fe db not found";
+            check_res = false;
+        }
+        stat_info_.check_fe_table_version_num++;
+    }
+
+    return check_res;
+}
+
+template <>
+bool MetaChecker::handle_check_fe_meta_by_fdb<CHECK_VERSION>(MYSQL* conn) {
+    bool check_res = true;
+
+    // check PartitionVersionKey
+    if (!do_version_partition_key_check(conn)) {
+        check_res = false;
+        LOG(WARNING) << "do_version_partition_key_check failed";
+    } else {
+        LOG(INFO) << "do_version_partition_key_check success";
+    }
+
+    // check TableVersionKey
+    if (!do_version_table_key_check(conn)) {
+        check_res = false;
+        LOG(WARNING) << "do_version_table_key_check failed";
+    } else {
+        LOG(INFO) << "do_version_table_key_check success";
+    }
+    return check_res;
+}
+
 template <>
 bool MetaChecker::handle_check_fe_meta_by_fdb<CHECK_META>(MYSQL* conn) {
     bool check_res = true;
@@ -387,19 +571,17 @@ bool 
MetaChecker::handle_check_fe_meta_by_fdb<CHECK_META>(MYSQL* conn) {
 
 bool MetaChecker::check_fe_meta_by_fdb(MYSQL* conn) {
     bool success = true;
-    if (config::enable_checker_for_meta_key_check) {
+    if (config::enable_meta_key_check) {
         success = handle_check_fe_meta_by_fdb<CHECK_META>(conn);
     }
 
-    // TODO(wyxxxcat) add check for version key
-    // if (config::enable_checker_for_version_key_check) {
-    //     success = handle_check_fe_meta_by_fdb<CHECK_VERSION>(conn);
-    // }
+    if (config::enable_version_key_check) {
+        success = handle_check_fe_meta_by_fdb<CHECK_VERSION>(conn);
+    }
     return success;
 }
 
-bool MetaChecker::do_meta_tablet_index_key_inverted_check(MYSQL* conn,
-                                                          const 
std::vector<TabletInfo>& tablets) {
+bool MetaChecker::do_meta_tablet_index_key_inverted_check(MYSQL* conn) {
     bool check_res = true;
     // check tablet idx
     for (const auto& tablet_info : tablets) {
@@ -471,8 +653,7 @@ bool 
MetaChecker::do_meta_tablet_index_key_inverted_check(MYSQL* conn,
     return check_res;
 }
 
-bool MetaChecker::do_meta_tablet_key_inverted_check(MYSQL* conn, 
std::vector<TabletInfo>& tablets,
-                                                    std::map<int64_t, 
PartitionInfo>& partitions) {
+bool MetaChecker::do_meta_tablet_key_inverted_check(MYSQL* conn) {
     bool check_res = true;
     // check tablet meta
     for (const auto& tablet_info : tablets) {
@@ -500,64 +681,10 @@ bool 
MetaChecker::do_meta_tablet_key_inverted_check(MYSQL* conn, std::vector<Tab
         stat_info_.check_fdb_tablet_meta_num++;
     }
 
-    // TODO(wyxxxcat):
-    // separate from this function to check partition version function
-    // for (const auto& elem : partitions) {
-    //     std::unique_ptr<Transaction> txn;
-    //     TxnErrorCode err = txn_kv_->create_txn(&txn);
-    //     if (err != TxnErrorCode::TXN_OK) {
-    //         LOG(WARNING) << "failed to init txn";
-    //         continue;
-    //     }
-    //     if (elem.second.visible_version == 0 || elem.second.visible_version 
== 1) {
-    //         continue;
-    //     }
-
-    //     int64_t db_id = elem.second.db_id;
-    //     int64_t table_id = elem.second.table_id;
-    //     int64_t partition_id = elem.second.partition_id;
-    //     int64_t tablet_id = elem.second.tablet_id;
-    //     std::string ver_key = partition_version_key({instance_id_, db_id, 
table_id, partition_id});
-    //     std::string ver_val;
-    //     err = txn->get(ver_key, &ver_val);
-    //     if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
-    //         LOG_WARNING("version key not found.")
-    //                 .tag("db id", db_id)
-    //                 .tag("table id", table_id)
-    //                 .tag("partition id", partition_id)
-    //                 .tag("tablet id", tablet_id);
-    //         check_res = false;
-    //         continue;
-    //     } else if (err != TxnErrorCode::TXN_OK) {
-    //         LOG_WARNING("failed to get version.")
-    //                 .tag("db id", db_id)
-    //                 .tag("table id", table_id)
-    //                 .tag("partition id", partition_id)
-    //                 .tag("tablet id", tablet_id);
-    //         check_res = false;
-    //         continue;
-    //     }
-
-    //     VersionPB version_pb;
-    //     if (!version_pb.ParseFromString(ver_val)) {
-    //         LOG(WARNING) << "malformed version value";
-    //         check_res = false;
-    //         continue;
-    //     }
-
-    //     if (version_pb.version() != elem.second.visible_version) {
-    //         LOG(WARNING) << "partition version check failed, FE partition 
version"
-    //                      << elem.second.visible_version << " ms version: " 
<< version_pb.version();
-    //         check_res = false;
-    //         continue;
-    //     }
-    //     stat_info_.check_fdb_partition_version_num++;
-    // }
     return check_res;
 }
 
-bool MetaChecker::do_meta_schema_key_inverted_check(MYSQL* conn, 
std::vector<TabletInfo>& tablets,
-                                                    std::map<int64_t, 
PartitionInfo>& partitions) {
+bool MetaChecker::do_meta_schema_key_inverted_check(MYSQL* conn) {
     bool check_res = true;
 
     for (const auto& tablet_info : tablets) {
@@ -589,14 +716,9 @@ bool MetaChecker::do_meta_schema_key_inverted_check(MYSQL* 
conn, std::vector<Tab
 
 template <>
 bool MetaChecker::handle_check_fdb_by_fe_meta<CHECK_META>(MYSQL* conn) {
-    std::vector<TabletInfo> tablets;
-    std::map<int64_t, PartitionInfo> partitions;
-
-    init_tablet_info_from_fe_meta(conn, tablets, partitions);
-
     bool check_res = true;
     // check MetaTabletIdxKey
-    if (!do_meta_tablet_index_key_inverted_check(conn, tablets)) {
+    if (!do_meta_tablet_index_key_inverted_check(conn)) {
         check_res = false;
         LOG(WARNING) << "do_meta_tablet_index_key_inverted_check failed";
     } else {
@@ -604,7 +726,7 @@ bool 
MetaChecker::handle_check_fdb_by_fe_meta<CHECK_META>(MYSQL* conn) {
     }
 
     // check MetaTabletKey
-    if (!do_meta_tablet_key_inverted_check(conn, tablets, partitions)) {
+    if (!do_meta_tablet_key_inverted_check(conn)) {
         check_res = false;
         LOG(WARNING) << "do_meta_tablet_key_inverted_check failed";
     } else {
@@ -612,7 +734,7 @@ bool 
MetaChecker::handle_check_fdb_by_fe_meta<CHECK_META>(MYSQL* conn) {
     }
 
     // check MetaSchemaKey
-    if (!do_meta_schema_key_inverted_check(conn, tablets, partitions)) {
+    if (!do_meta_schema_key_inverted_check(conn)) {
         check_res = false;
         LOG(WARNING) << "do_meta_schema_key_inverted_check failed";
     } else {
@@ -624,19 +746,15 @@ bool 
MetaChecker::handle_check_fdb_by_fe_meta<CHECK_META>(MYSQL* conn) {
 
 bool MetaChecker::check_fdb_by_fe_meta(MYSQL* conn) {
     bool success = true;
-    if (config::enable_checker_for_meta_key_check) {
+    if (config::enable_meta_key_check) {
         success = handle_check_fdb_by_fe_meta<CHECK_META>(conn);
     }
 
-    // TODO(wyxxxcat) add check for version key
-    // if (config::enable_checker_for_version_key_check) {
-    //     success = handle_check_fdb_by_fe_meta<CHECK_VERSION>(conn);
-    // }
-
     LOG(INFO) << "check_fdb_table_idx_num: " << 
stat_info_.check_fdb_tablet_idx_num
               << " check_fdb_table_meta_num: " << 
stat_info_.check_fdb_tablet_meta_num
               << " check_fdb_tablet_schema_num: " << 
stat_info_.check_fdb_tablet_schema_num
-              << " check_fdb_partition_version_num: " << 
stat_info_.check_fdb_partition_version_num;
+              << " check_fe_table_version_num: " << 
stat_info_.check_fe_table_version_num
+              << " check_fe_partition_version_num: " << 
stat_info_.check_fe_partition_version_num;
     return success;
 }
 
@@ -687,6 +805,7 @@ void MetaChecker::do_check(const std::string& host, const 
std::string& port,
     bool ret = false;
     do {
         init_db_meta(&conn);
+        init_tablet_and_partition_info_from_fe_meta(&conn);
         ret = check_fe_meta_by_fdb(&conn);
         if (!ret) {
             std::this_thread::sleep_for(seconds(10));
@@ -698,18 +817,23 @@ void MetaChecker::do_check(const std::string& host, const 
std::string& port,
         LOG(WARNING) << "check_fe_meta_by_fdb failed, there may be data leak";
         msg = "meta leak err";
     }
-    now = 
duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
+
     LOG(INFO) << "check_fe_meta_by_fdb finish, cost(second): " << now - start;
 
+    start = 
duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
     LOG(INFO) << "check_fdb_by_fe_meta begin";
-    init_db_meta(&conn);
-    ret = check_fdb_by_fe_meta(&conn);
-    if (!ret) {
-        LOG(WARNING) << "check_fdb_by_fe_meta failed, there may be data loss";
-        msg = "meta loss err";
-        return;
-    }
-    now = 
duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
+    do {
+        init_db_meta(&conn);
+        init_tablet_and_partition_info_from_fe_meta(&conn);
+        ret = check_fdb_by_fe_meta(&conn);
+        if (!ret) {
+            LOG(WARNING) << "check_fdb_by_fe_meta failed, there may be data 
loss";
+            msg = "meta loss err";
+            return;
+        }
+        now = 
duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
+    } while (now - start <= 180 && !ret);
+
     LOG(INFO) << "check_fdb_by_fe_meta finish, cost(second): " << now - start;
 
     mysql_close(&conn);
@@ -717,8 +841,7 @@ void MetaChecker::do_check(const std::string& host, const 
std::string& port,
     LOG(INFO) << "meta check finish";
 }
 
-void MetaChecker::init_tablet_info_from_fe_meta(MYSQL* conn, 
std::vector<TabletInfo>& tablets,
-                                                std::map<int64_t, 
PartitionInfo>& partitions) {
+void MetaChecker::init_tablet_and_partition_info_from_fe_meta(MYSQL* conn) {
     // init tablet info, partition info
     std::map<std::string, std::vector<std::string>> db_to_tables;
     std::string sql_stmt = "show databases";
@@ -731,7 +854,7 @@ void MetaChecker::init_tablet_info_from_fe_meta(MYSQL* 
conn, std::vector<TabletI
         for (int i = 0; i < num_row; ++i) {
             MYSQL_ROW row = mysql_fetch_row(result);
             if (strcmp(row[0], "__internal_schema") == 0 ||
-                strcmp(row[0], "information_schema") == 0 || strcmp(row[0], 
"mysql")) {
+                strcmp(row[0], "information_schema") == 0 || strcmp(row[0], 
"mysql") == 0) {
                 continue;
             }
             db_to_tables.insert({row[0], std::vector<std::string>()});
@@ -769,7 +892,6 @@ void MetaChecker::init_tablet_info_from_fe_meta(MYSQL* 
conn, std::vector<TabletI
                     VLOG_DEBUG << "get tablet info log"
                                << ", db name" << elem.first << ", table name" 
<< table
                                << ",tablet id" << tablet_info.tablet_id;
-                    tablet_info.schema_version = atoll(row[4]);
                     tablets.push_back(tablet_info);
                 }
                 mysql_free_result(result);
@@ -777,9 +899,36 @@ void MetaChecker::init_tablet_info_from_fe_meta(MYSQL* 
conn, std::vector<TabletI
         }
     }
 
+    // get table info from FE
+    for (const auto& [db_id, db_name] : db_meta_) {
+        MYSQL_RES* result;
+        if (db_name == "__internal_schema" || db_name == "information_schema" 
||
+            db_name == "mysql") {
+            continue;
+        }
+        std::string sql_stmt = fmt::format("SHOW PROC '/dbs/{}'", db_id);
+        mysql_query(conn, sql_stmt.c_str());
+
+        result = mysql_store_result(conn);
+        if (result) {
+            int num_row = mysql_num_rows(result);
+            for (int i = 0; i < num_row; ++i) {
+                MYSQL_ROW row = mysql_fetch_row(result);
+                int64_t table_id = atoll(row[0]);
+                tables.emplace_back(TableInfo {.db_id = db_id, .table_id = 
table_id});
+            }
+        }
+    }
+
     // get tablet info from FE
     // get Partition info from FE
     for (auto& tablet_info : tablets) {
+        std::string db_name = db_meta_.begin()->second;
+        if (mysql_select_db(conn, db_name.c_str())) {
+            LOG(WARNING) << "mysql select db error, db_name: " << db_name
+                         << " error: " << mysql_error(conn);
+            continue;
+        }
         std::string sql_stmt = "show tablet " + 
std::to_string(tablet_info.tablet_id);
         mysql_query(conn, sql_stmt.c_str());
         result = mysql_store_result(conn);
@@ -792,6 +941,24 @@ void MetaChecker::init_tablet_info_from_fe_meta(MYSQL* 
conn, std::vector<TabletI
                 tablet_info.partition_id = atoll(row[6]);
                 tablet_info.index_id = atoll(row[7]);
 
+                int schema_version = -1;
+                {
+                    MYSQL_RES* result;
+                    std::string sql_stmt = fmt::format("SHOW PROC 
'/dbs/{}/{}/index_schema'",
+                                                       tablet_info.db_id, 
tablet_info.table_id);
+                    mysql_query(conn, sql_stmt.c_str());
+
+                    result = mysql_store_result(conn);
+                    if (!result) {
+                        continue;
+                    }
+                    MYSQL_ROW row = mysql_fetch_row(result);
+                    schema_version = atoll(row[2]);
+                    mysql_free_result(result);
+                }
+
+                tablet_info.schema_version = schema_version;
+
                 PartitionInfo partition_info;
                 partition_info.db_id = atoll(row[4]);
                 partition_info.table_id = atoll(row[5]);
@@ -808,32 +975,6 @@ void MetaChecker::init_tablet_info_from_fe_meta(MYSQL* 
conn, std::vector<TabletI
             mysql_free_result(result);
         }
     }
-
-    // get partition version from FE
-    for (const auto& elem : db_to_tables) {
-        for (const std::string& table : elem.second) {
-            std::string sql_stmt = "show partitions from " + elem.first + "." 
+ table;
-            mysql_query(conn, sql_stmt.c_str());
-            result = mysql_store_result(conn);
-            if (result) {
-                int num_row = mysql_num_rows(result);
-                for (int i = 0; i < num_row; ++i) {
-                    MYSQL_ROW row = mysql_fetch_row(result);
-                    int64_t partition_id = atoll(row[0]);
-                    int64_t visible_version = atoll(row[2]);
-                    partitions[partition_id].visible_version = visible_version;
-                    VLOG_DEBUG << "get partition version log"
-                               << ", db name" << elem.first << ", table name" 
<< table
-                               << ", raw partition id" << row[0] << ", first 
partition id"
-                               << partition_id << ", db id" << 
partitions[partition_id].db_id
-                               << ", table id" << 
partitions[partition_id].table_id
-                               << ", second partition id" << 
partitions[partition_id].partition_id
-                               << ", tablet id" << 
partitions[partition_id].tablet_id;
-                }
-                mysql_free_result(result);
-            }
-        }
-    }
 }
 
 } // namespace doris::cloud
diff --git a/cloud/src/recycler/meta_checker.h 
b/cloud/src/recycler/meta_checker.h
index 4f16cdab7c7..40779b9886a 100644
--- a/cloud/src/recycler/meta_checker.h
+++ b/cloud/src/recycler/meta_checker.h
@@ -38,19 +38,23 @@ struct StatInfo {
     int64_t check_fe_tablet_num = 0;
     int64_t check_fe_partition_num = 0;
     int64_t check_fe_tablet_schema_num = 0;
+    int64_t check_fe_table_version_num = 0;
+    int64_t check_fe_partition_version_num = 0;
     // fdb
     int64_t check_fdb_tablet_idx_num = 0;
     int64_t check_fdb_tablet_meta_num = 0;
     int64_t check_fdb_tablet_schema_num = 0;
-    int64_t check_fdb_partition_version_num = 0;
 };
 
-enum CHECK_TYPE {
-    CHECK_TXN,
-    CHECK_VERSION,
-    CHECK_META,
-    CHECK_STATS,
-    CHECK_JOB,
+enum CHECK_TYPE { CHECK_VERSION, CHECK_META };
+
+struct TableInfo {
+    int64_t db_id;
+    int64_t table_id;
+
+    std::string debug_string() const {
+        return "db id: " + std::to_string(db_id) + " table id: " + std::to_string(table_id);
+    }
 };
 
 struct TabletInfo {
@@ -75,7 +79,14 @@ struct PartitionInfo {
     int64_t table_id;
     int64_t partition_id;
     int64_t tablet_id;
-    int64_t visible_version;
+
+    // clang-format off
+    std::string debug_string() const {
+        return "db id: " + std::to_string(db_id) + 
+               " table id: " + std::to_string(table_id) +
+               " partition id: " + std::to_string(partition_id);
+    }
+    // clang-format on
 };
 
 class MetaChecker {
@@ -93,27 +104,32 @@ public:
     bool handle_check_fdb_by_fe_meta(MYSQL* conn);
 
 private:
-    void init_tablet_info_from_fe_meta(MYSQL* conn, std::vector<TabletInfo>& tablets,
-                                       std::map<int64_t, PartitionInfo>& partitions);
+    // init this->tablets, this->partitions, this->tables
+    void init_tablet_and_partition_info_from_fe_meta(MYSQL* conn);
 
     bool scan_and_handle_kv(std::string& start_key, const std::string& end_key,
                             std::function<int(std::string_view, std::string_view)>);
 
+    // forward check meta key
     bool do_meta_tablet_key_index_check(MYSQL* conn);
 
     bool do_meta_tablet_key_check(MYSQL* conn);
 
     bool do_meta_schema_key_check(MYSQL* conn);
 
-    bool do_meta_tablet_index_key_inverted_check(MYSQL* conn,
-                                                 const std::vector<TabletInfo>& tablets);
+    // forward check version key
+    bool do_version_partition_key_check(MYSQL* conn);
+
+    bool do_version_table_key_check(MYSQL* conn);
 
-    bool do_meta_tablet_key_inverted_check(MYSQL* conn, std::vector<TabletInfo>& tablets,
-                                           std::map<int64_t, PartitionInfo>& partitions);
+    // inverted check meta key
+    bool do_meta_tablet_index_key_inverted_check(MYSQL* conn);
 
-    bool do_meta_schema_key_inverted_check(MYSQL* conn, std::vector<TabletInfo>& tablets,
-                                           std::map<int64_t, PartitionInfo>& partitions);
+    bool do_meta_tablet_key_inverted_check(MYSQL* conn);
 
+    bool do_meta_schema_key_inverted_check(MYSQL* conn);
+
+    // init this->db_meta_
     void init_db_meta(MYSQL* conn);
 
 private:
@@ -122,42 +138,23 @@ private:
     std::string instance_id_;
     // db_id -> db_name
     std::unordered_map<int64_t, std::string> db_meta_;
+    // tablet info
+    std::vector<TabletInfo> tablets;
+    // table info
+    std::vector<TableInfo> tables;
+    // partition_id -> partition_info
+    std::map<int64_t, PartitionInfo> partitions;
 };
 
 // not implemented yet
 template <>
-bool MetaChecker::handle_check_fe_meta_by_fdb<CHECK_STATS>(MYSQL* conn) = delete;
-
-// not implemented yet
-template <>
-bool MetaChecker::handle_check_fe_meta_by_fdb<CHECK_TXN>(MYSQL* conn) = delete;
-
-// not implemented yet
-template <>
-bool MetaChecker::handle_check_fe_meta_by_fdb<CHECK_VERSION>(MYSQL* conn) = delete;
-
-// not implemented yet
-template <>
-bool MetaChecker::handle_check_fe_meta_by_fdb<CHECK_JOB>(MYSQL* conn) = delete;
+bool MetaChecker::handle_check_fe_meta_by_fdb<CHECK_VERSION>(MYSQL* conn);
 
 template <>
 bool MetaChecker::handle_check_fe_meta_by_fdb<CHECK_META>(MYSQL* conn);
 
-// not implemented yet
-template <>
-bool MetaChecker::handle_check_fdb_by_fe_meta<CHECK_STATS>(MYSQL* conn) = delete;
-
-// not implemented yet
-template <>
-bool MetaChecker::handle_check_fdb_by_fe_meta<CHECK_TXN>(MYSQL* conn) = delete;
-
-// not implemented yet
-template <>
-bool MetaChecker::handle_check_fdb_by_fe_meta<CHECK_VERSION>(MYSQL* conn) = delete;
-
-// not implemented yet
 template <>
-bool MetaChecker::handle_check_fdb_by_fe_meta<CHECK_JOB>(MYSQL* conn) = delete;
+bool MetaChecker::handle_check_fdb_by_fe_meta<CHECK_VERSION>(MYSQL* conn);
 
 template <>
 bool MetaChecker::handle_check_fdb_by_fe_meta<CHECK_META>(MYSQL* conn);
diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp
index 1e7a1b918e2..906261d8180 100644
--- a/cloud/test/recycler_test.cpp
+++ b/cloud/test/recycler_test.cpp
@@ -878,10 +878,10 @@ static int create_delete_bitmap_update_lock_kv(TxnKv* txn_kv, int64_t table_id,
     return 0;
 }
 
-static int create_table_version_kv(TxnKv* txn_kv, int64_t table_id) {
+static int create_table_version_kv(TxnKv* txn_kv, int64_t table_id, int64_t version = 1) {
     auto key = table_version_key({instance_id, db_id, table_id});
     std::string val(sizeof(int64_t), 0);
-    *reinterpret_cast<int64_t*>(val.data()) = (int64_t)1;
+    *reinterpret_cast<int64_t*>(val.data()) = version;
     std::unique_ptr<Transaction> txn;
     if (txn_kv->create_txn(&txn) != TxnErrorCode::TXN_OK) {
         return -1;
@@ -5113,6 +5113,97 @@ TEST(CheckerTest, tablet_stats_key_check_normal) {
     ASSERT_EQ(checker.do_tablet_stats_key_check(), 0);
 }
 
+TEST(CheckerTest, version_key_check_normal) {
+    auto txn_kv = std::make_shared<MemTxnKv>();
+    ASSERT_EQ(txn_kv->init(), 0);
+    InstanceInfoPB instance;
+    instance.set_instance_id(instance_id);
+
+    auto* obj_info = instance.add_obj_info();
+    obj_info->set_id("version_key_check_normal");
+    obj_info->set_ak(config::test_s3_ak);
+    obj_info->set_sk(config::test_s3_sk);
+    obj_info->set_endpoint(config::test_s3_endpoint);
+    obj_info->set_region(config::test_s3_region);
+    obj_info->set_bucket(config::test_s3_bucket);
+    obj_info->set_prefix("version_key_check_normal");
+
+    InstanceChecker checker(txn_kv, instance_id);
+    ASSERT_EQ(checker.init(instance), 0);
+    auto accessor = checker.accessor_map_.begin()->second;
+
+    int64_t table_id = 998;
+    size_t part_num = 5;
+    size_t table_version = 20;
+    size_t part_version = 10;
+
+    std::unique_ptr<Transaction> txn;
+    ASSERT_EQ(TxnErrorCode::TXN_OK, txn_kv->create_txn(&txn));
+
+    create_table_version_kv(txn_kv.get(), table_id, table_version);
+    for (size_t partition_id = 0; partition_id < part_num; partition_id++) {
+        std::string part_ver_key =
+                partition_version_key({instance_id, db_id, table_id, partition_id});
+        std::string part_ver_val;
+        VersionPB version_pb;
+        version_pb.set_version(part_version);
+        version_pb.SerializeToString(&part_ver_val);
+        txn->put(part_ver_key, part_ver_val);
+    }
+
+    ASSERT_EQ(TxnErrorCode::TXN_OK, txn->commit());
+
+    ASSERT_EQ(checker.do_version_key_check(), 0);
+}
+
+TEST(CheckerTest, version_key_check_abnormal) {
+    auto txn_kv = std::make_shared<MemTxnKv>();
+    ASSERT_EQ(txn_kv->init(), 0);
+
+    InstanceInfoPB instance;
+    instance.set_instance_id(instance_id);
+    auto* obj_info = instance.add_obj_info();
+    obj_info->set_id("version_key_check_abnormal");
+    obj_info->set_ak(config::test_s3_ak);
+    obj_info->set_sk(config::test_s3_sk);
+    obj_info->set_endpoint(config::test_s3_endpoint);
+    obj_info->set_region(config::test_s3_region);
+    obj_info->set_bucket(config::test_s3_bucket);
+    obj_info->set_prefix("version_key_check_normal");
+
+    InstanceChecker checker(txn_kv, instance_id);
+    ASSERT_EQ(checker.init(instance), 0);
+    auto accessor = checker.accessor_map_.begin()->second;
+
+    int64_t table_id = 998;
+    size_t part_num = 6;
+    size_t table_version = 20;
+    size_t part_version_normal = 10;
+    size_t part_version_abnormal = 30;
+
+    std::unique_ptr<Transaction> txn;
+    ASSERT_EQ(TxnErrorCode::TXN_OK, txn_kv->create_txn(&txn));
+
+    create_table_version_kv(txn_kv.get(), table_id, table_version);
+    for (size_t partition_id = 0; partition_id < part_num; partition_id++) {
+        std::string part_ver_key =
+                partition_version_key({instance_id, db_id, table_id, partition_id});
+        std::string part_ver_val;
+        VersionPB version_pb;
+        if (partition_id < part_num / 2) {
+            version_pb.set_version(part_version_normal);
+        } else {
+            version_pb.set_version(part_version_abnormal);
+        }
+        version_pb.SerializeToString(&part_ver_val);
+        txn->put(part_ver_key, part_ver_val);
+    }
+
+    ASSERT_EQ(TxnErrorCode::TXN_OK, txn->commit());
+
+    ASSERT_EQ(checker.do_version_key_check(), -1);
+}
+
 TEST(RecyclerTest, delete_rowset_data) {
     auto txn_kv = std::make_shared<MemTxnKv>();
     ASSERT_EQ(txn_kv->init(), 0);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to