This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new fe1e0355617 [feat](checker) Add table/partition version key
consistency checking for checker (#54527)
fe1e0355617 is described below
commit fe1e03556174c9e74c3247f0946d710d7e778aec
Author: Uniqueyou <[email protected]>
AuthorDate: Sat Sep 13 12:26:24 2025 +0800
[feat](checker) Add table/partition version key consistency checking for
checker (#54527)
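1. meta checker: check that the **partition and table** referenced by each version key
**exist** in the FE meta
2. checker: check whether the **version** stored in each version key is
reasonable. The version of a table should be **greater than or equal to** the
version of every one of its partitions.
3. fix wrong checking for schema key: query the table's `index_schema` proc node, and
fail the tablet/schema key checks when a tablet's db_id is missing from the FE meta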
---
cloud/src/common/config.h | 3 +-
cloud/src/recycler/checker.cpp | 86 ++++++++
cloud/src/recycler/checker.h | 5 +
cloud/src/recycler/meta_checker.cpp | 383 ++++++++++++++++++++++++------------
cloud/src/recycler/meta_checker.h | 83 ++++----
cloud/test/recycler_test.cpp | 95 ++++++++-
6 files changed, 488 insertions(+), 167 deletions(-)
diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index 9d8fd6234ca..23b0e9fa128 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -123,7 +123,8 @@ CONF_mBool(enable_restore_job_check, "false");
CONF_mBool(enable_tablet_stats_key_check, "false");
CONF_mBool(enable_txn_key_check, "false");
-CONF_mBool(enable_checker_for_meta_key_check, "false");
+CONF_mBool(enable_meta_key_check, "false");
+CONF_mBool(enable_version_key_check, "false");
CONF_mInt64(mow_job_key_check_expiration_diff_seconds, "600"); // 10min
CONF_String(test_s3_ak, "");
diff --git a/cloud/src/recycler/checker.cpp b/cloud/src/recycler/checker.cpp
index 254b0e4ef21..661e4ab05a8 100644
--- a/cloud/src/recycler/checker.cpp
+++ b/cloud/src/recycler/checker.cpp
@@ -228,6 +228,12 @@ int Checker::start() {
}
}
+ if (config::enable_version_key_check) {
+ if (int ret = checker->do_version_key_check(); ret != 0) {
+ success = false;
+ }
+ }
+
// If instance checker has been aborted, don't finish this job
if (!checker->stopped()) {
finish_instance_recycle_job(txn_kv_.get(), check_job_key,
instance.instance_id(),
@@ -2055,6 +2061,85 @@ int InstanceChecker::scan_and_handle_kv(
return ret;
}
+int InstanceChecker::do_version_key_check() {
+    std::unique_ptr<RangeGetIterator> it;
+    std::string begin = table_version_key({instance_id_, 0, 0});
+    std::string end = table_version_key({instance_id_, INT64_MAX, 0});
+    bool check_res = true;
+    do {
+        std::unique_ptr<Transaction> txn;
+        TxnErrorCode err = txn_kv_->create_txn(&txn);
+        if (err != TxnErrorCode::TXN_OK) {
+            LOG(WARNING) << "failed to create txn";
+            return -1;
+        }
+        err = txn->get(begin, end, &it);
+        if (err != TxnErrorCode::TXN_OK) {
+            LOG(WARNING) << "failed to get table version key, err=" << err;
+            return -1;
+        }
+        while (it->has_next() && !stopped()) {
+            auto [k, v] = it->next();
+            // 0x01 "version" ${instance_id} "table" ${db_id} ${tbl_id}
+            std::string_view k1 = k;
+            k1.remove_prefix(1);
+            std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
+            decode_key(&k1, &out);
+            int64_t table_version = -1;
+            if (!txn->decode_atomic_int(v, &table_version)) {
+                LOG(WARNING) << "malformed table version value";
+                return -1;
+            }
+            auto db_id = std::get<int64_t>(std::get<0>(out[3]));
+            auto table_id = std::get<int64_t>(std::get<0>(out[4]));
+            auto part_begin = partition_version_key({instance_id_, db_id, table_id, 0});
+            auto part_end = partition_version_key({instance_id_, db_id, table_id, INT64_MAX});
+            // Use a dedicated iterator for the partition scan: reassigning `it` here would
+            // destroy the table-level iterator that `k`/`v` and the outer loop still use.
+            std::unique_ptr<RangeGetIterator> part_it;
+            VersionPB partition_version_pb;
+            do {
+                std::unique_ptr<Transaction> part_txn;
+                err = txn_kv_->create_txn(&part_txn);
+                if (err != TxnErrorCode::TXN_OK) {
+                    LOG(WARNING) << "failed to create txn";
+                    return -1;
+                }
+                err = part_txn->get(part_begin, part_end, &part_it);
+                if (err != TxnErrorCode::TXN_OK) {
+                    LOG(WARNING) << "failed to get partition version key, err=" << err;
+                    return -1;
+                }
+                while (part_it->has_next() && !stopped()) {
+                    auto [pk, pv] = part_it->next();
+                    // 0x01 "version" ${instance_id} "partition" ${db_id} ${tbl_id} ${partition_id}
+                    std::string_view pk1 = pk;
+                    pk1.remove_prefix(1);
+                    std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> pout;
+                    decode_key(&pk1, &pout);
+                    if (!partition_version_pb.ParseFromArray(pv.data(), pv.size())) [[unlikely]] {
+                        LOG(WARNING) << "failed to parse partition VersionPB";
+                        return -1;
+                    }
+                    auto partition_id = std::get<int64_t>(std::get<0>(pout[5]));
+                    int64_t partition_version = partition_version_pb.version();
+                    if (table_version < partition_version) {
+                        check_res = false;
+                        LOG(WARNING) << "table version is less than partition version,"
+                                     << " table_id: " << table_id
+                                     << " table_version: " << table_version
+                                     << " partition_id: " << partition_id
+                                     << " partition_version: " << partition_version;
+                    }
+                }
+                part_begin = part_it->next_begin_key();
+            } while (part_it->more() && !stopped());
+        }
+        begin = it->next_begin_key(); // Update to next smallest key for iteration
+    } while (it->more() && !stopped());
+    return check_res ? 0 : -1;
+}
+
int InstanceChecker::do_restore_job_check() {
int64_t num_prepared = 0;
int64_t num_committed = 0;
@@ -2103,6 +2188,7 @@ int InstanceChecker::do_restore_job_check() {
LOG(WARNING) << "failed to get mow tablet job key, err=" << err;
return -1;
}
+
if (!it->has_next()) {
break;
}
diff --git a/cloud/src/recycler/checker.h b/cloud/src/recycler/checker.h
index 5993aa0b5de..d97a6d4d55d 100644
--- a/cloud/src/recycler/checker.h
+++ b/cloud/src/recycler/checker.h
@@ -113,6 +113,11 @@ public:
int do_txn_key_check();
+    // Check table and partition version keys: a table's version must be greater than
+    // or equal to the version of each of its partitions.
+    // Return 0 if all versions are consistent, -1 on inconsistency or error
+    int do_version_key_check();
+
// If there are multiple buckets, return the minimum lifecycle; if there
are no buckets (i.e.
// all accessors are HdfsAccessor), return INT64_MAX.
// Return 0 if success, otherwise error
diff --git a/cloud/src/recycler/meta_checker.cpp
b/cloud/src/recycler/meta_checker.cpp
index 78d65fe0eea..2a478b68b79 100644
--- a/cloud/src/recycler/meta_checker.cpp
+++ b/cloud/src/recycler/meta_checker.cpp
@@ -122,6 +122,7 @@ bool MetaChecker::do_meta_tablet_key_check(MYSQL* conn) {
LOG(WARNING) << "tablet_idx.db_id not found in fe meta, db_id = "
<< tablet_index_meta.db_id()
<< "tablet index meta: " <<
tablet_index_meta.DebugString();
+ check_res = false;
continue;
}
std::string db_name = db_meta_.at(tablet_index_meta.db_id());
@@ -194,6 +195,7 @@ bool MetaChecker::do_meta_tablet_key_index_check(MYSQL*
conn) {
for (const TabletIndexPB& tablet_idx : tablet_indexes) {
if (!db_meta_.contains(tablet_idx.db_id())) {
LOG(WARNING) << "tablet_idx.db_id not found in fe meta, db_id = "
<< tablet_idx.db_id();
+ check_res = false;
continue;
}
std::string sql_stmt = "show tablet " +
std::to_string(tablet_idx.tablet_id());
@@ -298,6 +300,7 @@ bool MetaChecker::do_meta_schema_key_check(MYSQL* conn) {
LOG(WARNING) << "tablet_idx.db_id not found in fe meta, db_id = "
<< tablet_index_meta.db_id()
<< "tablet index meta: " <<
tablet_index_meta.DebugString();
+ check_res = false;
continue;
}
std::string db_name = db_meta_.at(tablet_index_meta.db_id());
@@ -330,9 +333,8 @@ bool MetaChecker::do_meta_schema_key_check(MYSQL* conn) {
}
MYSQL_RES* result;
- std::string sql_stmt =
- fmt::format("SHOW PROC '/dbs/{}/{}/index_schema/{}'",
tablet_index_meta.db_id(),
- tablet_meta.table_id(), tablet_meta.index_id());
+ std::string sql_stmt = fmt::format("SHOW PROC
'/dbs/{}/{}/index_schema'",
+ tablet_index_meta.db_id(),
tablet_meta.table_id());
mysql_query(conn, sql_stmt.c_str());
result = mysql_store_result(conn);
@@ -356,6 +358,188 @@ bool MetaChecker::do_meta_schema_key_check(MYSQL* conn) {
return check_res;
}
+bool MetaChecker::do_version_partition_key_check(MYSQL* conn) {
+    std::vector<PartitionInfo> partitions_info;
+    bool check_res = true;
+
+    // scan and collect partition version keys from fdb
+    std::string start_key;
+    std::string end_key;
+    partition_version_key({instance_id_, 0, 0, 0}, &start_key);
+    partition_version_key({instance_id_, INT64_MAX, 0, 0}, &end_key);
+    scan_and_handle_kv(
+            start_key, end_key,
+            [&partitions_info](std::string_view key, std::string_view value) -> int {
+                VersionPB partition_version;
+                if (!partition_version.ParseFromArray(value.data(), value.size())) {
+                    LOG(WARNING) << "malformed partition version value";
+                    return -1;
+                }
+                auto k1 = key;
+                k1.remove_prefix(1);
+                // 0x01 "version" ${instance_id} "partition" ${db_id} ${tbl_id} ${partition_id}
+                std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
+                decode_key(&k1, &out);
+                DCHECK_EQ(out.size(), 6) << key;
+                auto db_id = std::get<int64_t>(std::get<0>(out[3]));
+                auto table_id = std::get<int64_t>(std::get<0>(out[4]));
+                auto partition_id = std::get<int64_t>(std::get<0>(out[5]));
+                partitions_info.emplace_back(PartitionInfo {
+                        .db_id = db_id, .table_id = table_id, .partition_id = partition_id});
+                return 0;
+            });
+
+    for (const auto& partition_info : partitions_info) {
+        if (!db_meta_.contains(partition_info.db_id)) {
+            LOG(WARNING) << "partition_info.db_id not found in fe meta, db_id = "
+                         << partition_info.db_id
+                         << " partition_info meta: " << partition_info.debug_string();
+            check_res = false;
+            continue;
+        }
+        std::string db_name = db_meta_.at(partition_info.db_id);
+        if (db_name == "__internal_schema" || db_name == "information_schema" ||
+            db_name == "mysql") {
+            continue;
+        }
+
+        if (mysql_select_db(conn, db_name.c_str())) {
+            LOG(WARNING) << "mysql select db error, db_name: " << db_name
+                         << " error: " << mysql_error(conn);
+            continue;
+        }
+        std::string sql_stmt = fmt::format("show partition {}", partition_info.partition_id);
+        mysql_query(conn, sql_stmt.c_str());
+
+        // No result set or no row both mean FE did not return this partition.
+        MYSQL_RES* result = mysql_store_result(conn);
+        MYSQL_ROW row = result != nullptr ? mysql_fetch_row(result) : nullptr;
+        if (row == nullptr) {
+            LOG(WARNING) << "check failed, fdb meta: " << partition_info.debug_string()
+                         << " fe partition not found";
+            check_res = false;
+        } else if (partition_info.table_id != atoll(row[4])) {
+            LOG(WARNING) << "check failed, fdb meta: " << partition_info.debug_string()
+                         << " fe partition of table_id: " << atoll(row[4]);
+            check_res = false;
+        } else if (partition_info.db_id != atoll(row[3])) {
+            LOG(WARNING) << "check failed, fdb meta: " << partition_info.debug_string()
+                         << " fe partition of db_id: " << atoll(row[3]);
+            check_res = false;
+        }
+        if (result != nullptr) {
+            mysql_free_result(result);
+        }
+        stat_info_.check_fe_partition_version_num++;
+    }
+
+    return check_res;
+}
+
+bool MetaChecker::do_version_table_key_check(MYSQL* conn) {
+    std::vector<TableInfo> tables_info;
+    bool check_res = true;
+
+    // Every TableVersionKey in fdb must name a table that FE knows, and FE must
+    // report the same db_id for it.
+    std::string start_key;
+    std::string end_key;
+    table_version_key({instance_id_, 0, 0}, &start_key);
+    table_version_key({instance_id_, INT64_MAX, 0}, &end_key);
+
+    // collect table version keys from fdb
+    scan_and_handle_kv(
+            start_key, end_key,
+            [&tables_info, this](std::string_view key, std::string_view value) -> int {
+                int64_t version = 0;
+                std::unique_ptr<Transaction> txn;
+                TxnErrorCode err = txn_kv_->create_txn(&txn);
+                if (err != TxnErrorCode::TXN_OK) {
+                    LOG(WARNING) << "failed to create txn";
+                    return -1;
+                }
+                if (!txn->decode_atomic_int(value, &version)) {
+                    LOG(WARNING) << "malformed table version value";
+                    return -1;
+                }
+                auto k1 = key;
+                k1.remove_prefix(1);
+                // 0x01 "version" ${instance_id} "table" ${db_id} ${tbl_id} -> ${version}
+                std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
+                decode_key(&k1, &out);
+                DCHECK_EQ(out.size(), 5) << key;
+                auto db_id = std::get<int64_t>(std::get<0>(out[3]));
+                auto table_id = std::get<int64_t>(std::get<0>(out[4]));
+                tables_info.emplace_back(TableInfo {.db_id = db_id, .table_id = table_id});
+                return 0;
+            });
+
+    // check each collected table against fe meta
+    for (const auto& table_info : tables_info) {
+        if (!db_meta_.contains(table_info.db_id)) {
+            LOG(WARNING) << "table_info.db_id not found in fe meta, db_id = " << table_info.db_id
+                         << " table_info meta: " << table_info.debug_string();
+            check_res = false;
+            continue;
+        }
+        std::string db_name = db_meta_.at(table_info.db_id);
+        if (db_name == "__internal_schema" || db_name == "information_schema" ||
+            db_name == "mysql") {
+            continue;
+        }
+
+        if (mysql_select_db(conn, db_name.c_str())) {
+            LOG(WARNING) << "mysql select db error, db_name: " << db_name
+                         << " error: " << mysql_error(conn);
+            continue;
+        }
+
+        std::string sql_stmt = fmt::format("show table {}", table_info.table_id);
+        mysql_query(conn, sql_stmt.c_str());
+
+        // No result set or no row both mean FE did not return this table.
+        MYSQL_RES* result = mysql_store_result(conn);
+        MYSQL_ROW row = result != nullptr ? mysql_fetch_row(result) : nullptr;
+        if (row == nullptr) {
+            LOG(WARNING) << "check failed, fdb meta: " << table_info.debug_string()
+                         << " fe table not found";
+            check_res = false;
+        } else if (const int64_t fe_db_id = atoll(row[2]); table_info.db_id != fe_db_id) {
+            LOG(WARNING) << "check failed, fdb meta: " << table_info.debug_string()
+                         << " fe table of db_id: " << fe_db_id;
+            check_res = false;
+        }
+        if (result != nullptr) {
+            mysql_free_result(result);
+        }
+        stat_info_.check_fe_table_version_num++;
+    }
+
+    return check_res;
+}
+
+template <>
+bool MetaChecker::handle_check_fe_meta_by_fdb<CHECK_VERSION>(MYSQL* conn) {
+ bool check_res = true;
+
+ // check PartitionVersionKey
+ if (!do_version_partition_key_check(conn)) {
+ check_res = false;
+ LOG(WARNING) << "do_version_partition_key_check failed";
+ } else {
+ LOG(INFO) << "do_version_partition_key_check success";
+ }
+
+ // check TableVersionKey
+ if (!do_version_table_key_check(conn)) {
+ check_res = false;
+ LOG(WARNING) << "do_version_table_key_check failed";
+ } else {
+ LOG(INFO) << "do_version_table_key_check success";
+ }
+ return check_res;
+}
+
template <>
bool MetaChecker::handle_check_fe_meta_by_fdb<CHECK_META>(MYSQL* conn) {
bool check_res = true;
@@ -387,19 +571,17 @@ bool
MetaChecker::handle_check_fe_meta_by_fdb<CHECK_META>(MYSQL* conn) {
bool MetaChecker::check_fe_meta_by_fdb(MYSQL* conn) {
bool success = true;
- if (config::enable_checker_for_meta_key_check) {
+ if (config::enable_meta_key_check) {
success = handle_check_fe_meta_by_fdb<CHECK_META>(conn);
}
- // TODO(wyxxxcat) add check for version key
- // if (config::enable_checker_for_version_key_check) {
- // success = handle_check_fe_meta_by_fdb<CHECK_VERSION>(conn);
- // }
+    if (config::enable_version_key_check) {
+        success = handle_check_fe_meta_by_fdb<CHECK_VERSION>(conn) && success;
+    }
return success;
}
-bool MetaChecker::do_meta_tablet_index_key_inverted_check(MYSQL* conn,
- const
std::vector<TabletInfo>& tablets) {
+bool MetaChecker::do_meta_tablet_index_key_inverted_check(MYSQL* conn) {
bool check_res = true;
// check tablet idx
for (const auto& tablet_info : tablets) {
@@ -471,8 +653,7 @@ bool
MetaChecker::do_meta_tablet_index_key_inverted_check(MYSQL* conn,
return check_res;
}
-bool MetaChecker::do_meta_tablet_key_inverted_check(MYSQL* conn,
std::vector<TabletInfo>& tablets,
- std::map<int64_t,
PartitionInfo>& partitions) {
+bool MetaChecker::do_meta_tablet_key_inverted_check(MYSQL* conn) {
bool check_res = true;
// check tablet meta
for (const auto& tablet_info : tablets) {
@@ -500,64 +681,10 @@ bool
MetaChecker::do_meta_tablet_key_inverted_check(MYSQL* conn, std::vector<Tab
stat_info_.check_fdb_tablet_meta_num++;
}
- // TODO(wyxxxcat):
- // separate from this function to check partition version function
- // for (const auto& elem : partitions) {
- // std::unique_ptr<Transaction> txn;
- // TxnErrorCode err = txn_kv_->create_txn(&txn);
- // if (err != TxnErrorCode::TXN_OK) {
- // LOG(WARNING) << "failed to init txn";
- // continue;
- // }
- // if (elem.second.visible_version == 0 || elem.second.visible_version
== 1) {
- // continue;
- // }
-
- // int64_t db_id = elem.second.db_id;
- // int64_t table_id = elem.second.table_id;
- // int64_t partition_id = elem.second.partition_id;
- // int64_t tablet_id = elem.second.tablet_id;
- // std::string ver_key = partition_version_key({instance_id_, db_id,
table_id, partition_id});
- // std::string ver_val;
- // err = txn->get(ver_key, &ver_val);
- // if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
- // LOG_WARNING("version key not found.")
- // .tag("db id", db_id)
- // .tag("table id", table_id)
- // .tag("partition id", partition_id)
- // .tag("tablet id", tablet_id);
- // check_res = false;
- // continue;
- // } else if (err != TxnErrorCode::TXN_OK) {
- // LOG_WARNING("failed to get version.")
- // .tag("db id", db_id)
- // .tag("table id", table_id)
- // .tag("partition id", partition_id)
- // .tag("tablet id", tablet_id);
- // check_res = false;
- // continue;
- // }
-
- // VersionPB version_pb;
- // if (!version_pb.ParseFromString(ver_val)) {
- // LOG(WARNING) << "malformed version value";
- // check_res = false;
- // continue;
- // }
-
- // if (version_pb.version() != elem.second.visible_version) {
- // LOG(WARNING) << "partition version check failed, FE partition
version"
- // << elem.second.visible_version << " ms version: "
<< version_pb.version();
- // check_res = false;
- // continue;
- // }
- // stat_info_.check_fdb_partition_version_num++;
- // }
return check_res;
}
-bool MetaChecker::do_meta_schema_key_inverted_check(MYSQL* conn,
std::vector<TabletInfo>& tablets,
- std::map<int64_t,
PartitionInfo>& partitions) {
+bool MetaChecker::do_meta_schema_key_inverted_check(MYSQL* conn) {
bool check_res = true;
for (const auto& tablet_info : tablets) {
@@ -589,14 +716,9 @@ bool MetaChecker::do_meta_schema_key_inverted_check(MYSQL*
conn, std::vector<Tab
template <>
bool MetaChecker::handle_check_fdb_by_fe_meta<CHECK_META>(MYSQL* conn) {
- std::vector<TabletInfo> tablets;
- std::map<int64_t, PartitionInfo> partitions;
-
- init_tablet_info_from_fe_meta(conn, tablets, partitions);
-
bool check_res = true;
// check MetaTabletIdxKey
- if (!do_meta_tablet_index_key_inverted_check(conn, tablets)) {
+ if (!do_meta_tablet_index_key_inverted_check(conn)) {
check_res = false;
LOG(WARNING) << "do_meta_tablet_index_key_inverted_check failed";
} else {
@@ -604,7 +726,7 @@ bool
MetaChecker::handle_check_fdb_by_fe_meta<CHECK_META>(MYSQL* conn) {
}
// check MetaTabletKey
- if (!do_meta_tablet_key_inverted_check(conn, tablets, partitions)) {
+ if (!do_meta_tablet_key_inverted_check(conn)) {
check_res = false;
LOG(WARNING) << "do_meta_tablet_key_inverted_check failed";
} else {
@@ -612,7 +734,7 @@ bool
MetaChecker::handle_check_fdb_by_fe_meta<CHECK_META>(MYSQL* conn) {
}
// check MetaSchemaKey
- if (!do_meta_schema_key_inverted_check(conn, tablets, partitions)) {
+ if (!do_meta_schema_key_inverted_check(conn)) {
check_res = false;
LOG(WARNING) << "do_meta_schema_key_inverted_check failed";
} else {
@@ -624,19 +746,15 @@ bool
MetaChecker::handle_check_fdb_by_fe_meta<CHECK_META>(MYSQL* conn) {
bool MetaChecker::check_fdb_by_fe_meta(MYSQL* conn) {
bool success = true;
- if (config::enable_checker_for_meta_key_check) {
+ if (config::enable_meta_key_check) {
success = handle_check_fdb_by_fe_meta<CHECK_META>(conn);
}
- // TODO(wyxxxcat) add check for version key
- // if (config::enable_checker_for_version_key_check) {
- // success = handle_check_fdb_by_fe_meta<CHECK_VERSION>(conn);
- // }
-
LOG(INFO) << "check_fdb_table_idx_num: " <<
stat_info_.check_fdb_tablet_idx_num
<< " check_fdb_table_meta_num: " <<
stat_info_.check_fdb_tablet_meta_num
<< " check_fdb_tablet_schema_num: " <<
stat_info_.check_fdb_tablet_schema_num
- << " check_fdb_partition_version_num: " <<
stat_info_.check_fdb_partition_version_num;
+ << " check_fe_table_version_num: " <<
stat_info_.check_fe_table_version_num
+ << " check_fe_partition_version_num: " <<
stat_info_.check_fe_partition_version_num;
return success;
}
@@ -687,6 +805,7 @@ void MetaChecker::do_check(const std::string& host, const
std::string& port,
bool ret = false;
do {
init_db_meta(&conn);
+ init_tablet_and_partition_info_from_fe_meta(&conn);
ret = check_fe_meta_by_fdb(&conn);
if (!ret) {
std::this_thread::sleep_for(seconds(10));
@@ -698,18 +817,23 @@ void MetaChecker::do_check(const std::string& host, const
std::string& port,
LOG(WARNING) << "check_fe_meta_by_fdb failed, there may be data leak";
msg = "meta leak err";
}
- now =
duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
+
LOG(INFO) << "check_fe_meta_by_fdb finish, cost(second): " << now - start;
+ start =
duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
LOG(INFO) << "check_fdb_by_fe_meta begin";
- init_db_meta(&conn);
- ret = check_fdb_by_fe_meta(&conn);
- if (!ret) {
- LOG(WARNING) << "check_fdb_by_fe_meta failed, there may be data loss";
- msg = "meta loss err";
- return;
- }
- now =
duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
+    do {
+        init_db_meta(&conn);
+        init_tablet_and_partition_info_from_fe_meta(&conn);
+        if (ret = check_fdb_by_fe_meta(&conn); !ret) {
+            std::this_thread::sleep_for(seconds(10));
+        }
+        now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
+    } while (now - start <= 180 && !ret);
+    if (!ret) {
+        LOG(WARNING) << "check_fdb_by_fe_meta failed, there may be data loss";
+        msg = "meta loss err";
+    }
LOG(INFO) << "check_fdb_by_fe_meta finish, cost(second): " << now - start;
mysql_close(&conn);
@@ -717,8 +841,7 @@ void MetaChecker::do_check(const std::string& host, const
std::string& port,
LOG(INFO) << "meta check finish";
}
-void MetaChecker::init_tablet_info_from_fe_meta(MYSQL* conn,
std::vector<TabletInfo>& tablets,
- std::map<int64_t,
PartitionInfo>& partitions) {
+void MetaChecker::init_tablet_and_partition_info_from_fe_meta(MYSQL* conn) {
-    // init tablet info, partition info
+    tablets.clear(); // `tablets` is a member rebuilt on every call, so drop stale entries
std::map<std::string, std::vector<std::string>> db_to_tables;
std::string sql_stmt = "show databases";
@@ -731,7 +854,7 @@ void MetaChecker::init_tablet_info_from_fe_meta(MYSQL*
conn, std::vector<TabletI
for (int i = 0; i < num_row; ++i) {
MYSQL_ROW row = mysql_fetch_row(result);
if (strcmp(row[0], "__internal_schema") == 0 ||
- strcmp(row[0], "information_schema") == 0 || strcmp(row[0],
"mysql")) {
+ strcmp(row[0], "information_schema") == 0 || strcmp(row[0],
"mysql") == 0) {
continue;
}
db_to_tables.insert({row[0], std::vector<std::string>()});
@@ -769,7 +892,6 @@ void MetaChecker::init_tablet_info_from_fe_meta(MYSQL*
conn, std::vector<TabletI
VLOG_DEBUG << "get tablet info log"
<< ", db name" << elem.first << ", table name"
<< table
<< ",tablet id" << tablet_info.tablet_id;
- tablet_info.schema_version = atoll(row[4]);
tablets.push_back(tablet_info);
}
mysql_free_result(result);
@@ -777,9 +899,36 @@ void MetaChecker::init_tablet_info_from_fe_meta(MYSQL*
conn, std::vector<TabletI
}
}
+    // get table info from FE; `tables` is a member and do_check calls this function
+    // on every retry, so rebuild it from scratch instead of appending duplicates.
+    tables.clear();
+    for (const auto& [db_id, db_name] : db_meta_) {
+        if (db_name == "__internal_schema" || db_name == "information_schema" ||
+            db_name == "mysql") {
+            continue;
+        }
+        std::string sql_stmt = fmt::format("SHOW PROC '/dbs/{}'", db_id);
+        mysql_query(conn, sql_stmt.c_str());
+        MYSQL_RES* result = mysql_store_result(conn);
+        if (result) {
+            int num_row = mysql_num_rows(result);
+            for (int i = 0; i < num_row; ++i) {
+                MYSQL_ROW row = mysql_fetch_row(result);
+                int64_t table_id = atoll(row[0]);
+                tables.emplace_back(TableInfo {.db_id = db_id, .table_id = table_id});
+            }
+            mysql_free_result(result);
+        }
+    }
+
// get tablet info from FE
// get Partition info from FE
for (auto& tablet_info : tablets) {
+ std::string db_name = db_meta_.begin()->second;
+ if (mysql_select_db(conn, db_name.c_str())) {
+ LOG(WARNING) << "mysql select db error, db_name: " << db_name
+ << " error: " << mysql_error(conn);
+ continue;
+ }
std::string sql_stmt = "show tablet " +
std::to_string(tablet_info.tablet_id);
mysql_query(conn, sql_stmt.c_str());
result = mysql_store_result(conn);
@@ -792,6 +941,24 @@ void MetaChecker::init_tablet_info_from_fe_meta(MYSQL*
conn, std::vector<TabletI
tablet_info.partition_id = atoll(row[6]);
tablet_info.index_id = atoll(row[7]);
+ int schema_version = -1;
+ {
+ MYSQL_RES* result;
+ std::string sql_stmt = fmt::format("SHOW PROC
'/dbs/{}/{}/index_schema'",
+ tablet_info.db_id,
tablet_info.table_id);
+ mysql_query(conn, sql_stmt.c_str());
+
+ result = mysql_store_result(conn);
+ if (!result) {
+ continue;
+ }
+ MYSQL_ROW row = mysql_fetch_row(result);
+ schema_version = atoll(row[2]);
+ mysql_free_result(result);
+ }
+
+ tablet_info.schema_version = schema_version;
+
PartitionInfo partition_info;
partition_info.db_id = atoll(row[4]);
partition_info.table_id = atoll(row[5]);
@@ -808,32 +975,6 @@ void MetaChecker::init_tablet_info_from_fe_meta(MYSQL*
conn, std::vector<TabletI
mysql_free_result(result);
}
}
-
- // get partition version from FE
- for (const auto& elem : db_to_tables) {
- for (const std::string& table : elem.second) {
- std::string sql_stmt = "show partitions from " + elem.first + "."
+ table;
- mysql_query(conn, sql_stmt.c_str());
- result = mysql_store_result(conn);
- if (result) {
- int num_row = mysql_num_rows(result);
- for (int i = 0; i < num_row; ++i) {
- MYSQL_ROW row = mysql_fetch_row(result);
- int64_t partition_id = atoll(row[0]);
- int64_t visible_version = atoll(row[2]);
- partitions[partition_id].visible_version = visible_version;
- VLOG_DEBUG << "get partition version log"
- << ", db name" << elem.first << ", table name"
<< table
- << ", raw partition id" << row[0] << ", first
partition id"
- << partition_id << ", db id" <<
partitions[partition_id].db_id
- << ", table id" <<
partitions[partition_id].table_id
- << ", second partition id" <<
partitions[partition_id].partition_id
- << ", tablet id" <<
partitions[partition_id].tablet_id;
- }
- mysql_free_result(result);
- }
- }
- }
}
} // namespace doris::cloud
diff --git a/cloud/src/recycler/meta_checker.h
b/cloud/src/recycler/meta_checker.h
index 4f16cdab7c7..40779b9886a 100644
--- a/cloud/src/recycler/meta_checker.h
+++ b/cloud/src/recycler/meta_checker.h
@@ -38,19 +38,23 @@ struct StatInfo {
int64_t check_fe_tablet_num = 0;
int64_t check_fe_partition_num = 0;
int64_t check_fe_tablet_schema_num = 0;
+ int64_t check_fe_table_version_num = 0;
+ int64_t check_fe_partition_version_num = 0;
// fdb
int64_t check_fdb_tablet_idx_num = 0;
int64_t check_fdb_tablet_meta_num = 0;
int64_t check_fdb_tablet_schema_num = 0;
- int64_t check_fdb_partition_version_num = 0;
};
-enum CHECK_TYPE {
- CHECK_TXN,
- CHECK_VERSION,
- CHECK_META,
- CHECK_STATS,
- CHECK_JOB,
+enum CHECK_TYPE { CHECK_VERSION, CHECK_META };
+
+struct TableInfo {
+ int64_t db_id;
+ int64_t table_id;
+
+ std::string debug_string() const {
+ return "db id: " + std::to_string(db_id) + " table id: " +
std::to_string(table_id);
+ }
};
struct TabletInfo {
@@ -75,7 +79,14 @@ struct PartitionInfo {
int64_t table_id;
int64_t partition_id;
int64_t tablet_id;
- int64_t visible_version;
+
+ // clang-format off
+ std::string debug_string() const {
+ return "db id: " + std::to_string(db_id) +
+ " table id: " + std::to_string(table_id) +
+ " partition id: " + std::to_string(partition_id);
+ }
+ // clang-format on
};
class MetaChecker {
@@ -93,27 +104,32 @@ public:
bool handle_check_fdb_by_fe_meta(MYSQL* conn);
private:
- void init_tablet_info_from_fe_meta(MYSQL* conn, std::vector<TabletInfo>&
tablets,
- std::map<int64_t, PartitionInfo>&
partitions);
+ // init this->tablets, this->partitions, this->tables
+ void init_tablet_and_partition_info_from_fe_meta(MYSQL* conn);
bool scan_and_handle_kv(std::string& start_key, const std::string& end_key,
std::function<int(std::string_view,
std::string_view)>);
+ // forward check meta key
bool do_meta_tablet_key_index_check(MYSQL* conn);
bool do_meta_tablet_key_check(MYSQL* conn);
bool do_meta_schema_key_check(MYSQL* conn);
- bool do_meta_tablet_index_key_inverted_check(MYSQL* conn,
- const
std::vector<TabletInfo>& tablets);
+ // forward check version key
+ bool do_version_partition_key_check(MYSQL* conn);
+
+ bool do_version_table_key_check(MYSQL* conn);
- bool do_meta_tablet_key_inverted_check(MYSQL* conn,
std::vector<TabletInfo>& tablets,
- std::map<int64_t, PartitionInfo>&
partitions);
+ // inverted check meta key
+ bool do_meta_tablet_index_key_inverted_check(MYSQL* conn);
- bool do_meta_schema_key_inverted_check(MYSQL* conn,
std::vector<TabletInfo>& tablets,
- std::map<int64_t, PartitionInfo>&
partitions);
+ bool do_meta_tablet_key_inverted_check(MYSQL* conn);
+ bool do_meta_schema_key_inverted_check(MYSQL* conn);
+
+ // init this->db_meta_
void init_db_meta(MYSQL* conn);
private:
@@ -122,42 +138,23 @@ private:
std::string instance_id_;
// db_id -> db_name
std::unordered_map<int64_t, std::string> db_meta_;
+ // tablet info
+ std::vector<TabletInfo> tablets;
+ // table info
+ std::vector<TableInfo> tables;
+ // partition_id -> partition_info
+ std::map<int64_t, PartitionInfo> partitions;
};
-// not implemented yet
 template <>
-bool MetaChecker::handle_check_fe_meta_by_fdb<CHECK_STATS>(MYSQL* conn) = delete;
-
-// not implemented yet
-template <>
-bool MetaChecker::handle_check_fe_meta_by_fdb<CHECK_TXN>(MYSQL* conn) = delete;
-
-// not implemented yet
-template <>
-bool MetaChecker::handle_check_fe_meta_by_fdb<CHECK_VERSION>(MYSQL* conn) = delete;
-
-// not implemented yet
-template <>
-bool MetaChecker::handle_check_fe_meta_by_fdb<CHECK_JOB>(MYSQL* conn) = delete;
+bool MetaChecker::handle_check_fe_meta_by_fdb<CHECK_VERSION>(MYSQL* conn);
 template <>
 bool MetaChecker::handle_check_fe_meta_by_fdb<CHECK_META>(MYSQL* conn);
-// not implemented yet
-template <>
-bool MetaChecker::handle_check_fdb_by_fe_meta<CHECK_STATS>(MYSQL* conn) = delete;
-
-// not implemented yet
-template <>
-bool MetaChecker::handle_check_fdb_by_fe_meta<CHECK_TXN>(MYSQL* conn) = delete;
-
-// not implemented yet
-template <>
-bool MetaChecker::handle_check_fdb_by_fe_meta<CHECK_VERSION>(MYSQL* conn) = delete;
-
 // not implemented yet
 template <>
-bool MetaChecker::handle_check_fdb_by_fe_meta<CHECK_JOB>(MYSQL* conn) = delete;
+bool MetaChecker::handle_check_fdb_by_fe_meta<CHECK_VERSION>(MYSQL* conn);
 template <>
 bool MetaChecker::handle_check_fdb_by_fe_meta<CHECK_META>(MYSQL* conn);
diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp
index 1e7a1b918e2..906261d8180 100644
--- a/cloud/test/recycler_test.cpp
+++ b/cloud/test/recycler_test.cpp
@@ -878,10 +878,10 @@ static int create_delete_bitmap_update_lock_kv(TxnKv*
txn_kv, int64_t table_id,
return 0;
}
-static int create_table_version_kv(TxnKv* txn_kv, int64_t table_id) {
+static int create_table_version_kv(TxnKv* txn_kv, int64_t table_id, int64_t
version = 1) {
auto key = table_version_key({instance_id, db_id, table_id});
std::string val(sizeof(int64_t), 0);
- *reinterpret_cast<int64_t*>(val.data()) = (int64_t)1;
+ *reinterpret_cast<int64_t*>(val.data()) = version;
std::unique_ptr<Transaction> txn;
if (txn_kv->create_txn(&txn) != TxnErrorCode::TXN_OK) {
return -1;
@@ -5113,6 +5113,97 @@ TEST(CheckerTest, tablet_stats_key_check_normal) {
ASSERT_EQ(checker.do_tablet_stats_key_check(), 0);
}
+TEST(CheckerTest, version_key_check_normal) {
+ auto txn_kv = std::make_shared<MemTxnKv>();
+ ASSERT_EQ(txn_kv->init(), 0);
+ InstanceInfoPB instance;
+ instance.set_instance_id(instance_id);
+
+ auto* obj_info = instance.add_obj_info();
+ obj_info->set_id("version_key_check_normal");
+ obj_info->set_ak(config::test_s3_ak);
+ obj_info->set_sk(config::test_s3_sk);
+ obj_info->set_endpoint(config::test_s3_endpoint);
+ obj_info->set_region(config::test_s3_region);
+ obj_info->set_bucket(config::test_s3_bucket);
+ obj_info->set_prefix("version_key_check_normal");
+
+ InstanceChecker checker(txn_kv, instance_id);
+ ASSERT_EQ(checker.init(instance), 0);
+ auto accessor = checker.accessor_map_.begin()->second;
+
+ int64_t table_id = 998;
+ size_t part_num = 5;
+ size_t table_version = 20;
+ size_t part_version = 10;
+
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(TxnErrorCode::TXN_OK, txn_kv->create_txn(&txn));
+
+ create_table_version_kv(txn_kv.get(), table_id, table_version);
+ for (size_t partition_id = 0; partition_id < part_num; partition_id++) {
+ std::string part_ver_key =
+ partition_version_key({instance_id, db_id, table_id,
partition_id});
+ std::string part_ver_val;
+ VersionPB version_pb;
+ version_pb.set_version(part_version);
+ version_pb.SerializeToString(&part_ver_val);
+ txn->put(part_ver_key, part_ver_val);
+ }
+
+ ASSERT_EQ(TxnErrorCode::TXN_OK, txn->commit());
+
+ ASSERT_EQ(checker.do_version_key_check(), 0);
+}
+
+TEST(CheckerTest, version_key_check_abnormal) {
+    auto txn_kv = std::make_shared<MemTxnKv>();
+    ASSERT_EQ(txn_kv->init(), 0);
+
+    InstanceInfoPB instance;
+    instance.set_instance_id(instance_id);
+    auto* obj_info = instance.add_obj_info();
+    obj_info->set_id("version_key_check_abnormal");
+    obj_info->set_ak(config::test_s3_ak);
+    obj_info->set_sk(config::test_s3_sk);
+    obj_info->set_endpoint(config::test_s3_endpoint);
+    obj_info->set_region(config::test_s3_region);
+    obj_info->set_bucket(config::test_s3_bucket);
+    obj_info->set_prefix("version_key_check_abnormal");
+
+    InstanceChecker checker(txn_kv, instance_id);
+    ASSERT_EQ(checker.init(instance), 0);
+    // Two tables: only the second one (scanned after the first) has partitions newer
+    // than the table, so the check must keep scanning tables past the first one.
+    int64_t normal_table_id = 998;
+    int64_t abnormal_table_id = 999;
+    size_t part_num = 6;
+    size_t table_version = 20;
+    size_t part_version_normal = 10;
+    size_t part_version_abnormal = 30;
+
+    std::unique_ptr<Transaction> txn;
+    ASSERT_EQ(TxnErrorCode::TXN_OK, txn_kv->create_txn(&txn));
+
+    create_table_version_kv(txn_kv.get(), normal_table_id, table_version);
+    create_table_version_kv(txn_kv.get(), abnormal_table_id, table_version);
+    for (size_t partition_id = 0; partition_id < part_num; partition_id++) {
+        bool abnormal = partition_id >= part_num / 2;
+        int64_t table_id = abnormal ? abnormal_table_id : normal_table_id;
+        std::string part_ver_key =
+                partition_version_key({instance_id, db_id, table_id, partition_id});
+        std::string part_ver_val;
+        VersionPB version_pb;
+        version_pb.set_version(abnormal ? part_version_abnormal : part_version_normal);
+        version_pb.SerializeToString(&part_ver_val);
+        txn->put(part_ver_key, part_ver_val);
+    }
+
+    ASSERT_EQ(TxnErrorCode::TXN_OK, txn->commit());
+
+    ASSERT_EQ(checker.do_version_key_check(), -1);
+}
+
TEST(RecyclerTest, delete_rowset_data) {
auto txn_kv = std::make_shared<MemTxnKv>();
ASSERT_EQ(txn_kv->init(), 0);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]