This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 1f81225ab48 branch-3.1: [feat](checker) Add rowset key consistency
checking for checker #54623 (#57978)
1f81225ab48 is described below
commit 1f81225ab48dd04e05b860a86542c51d5d1bcc8d
Author: Yixuan Wang <[email protected]>
AuthorDate: Tue Nov 25 11:05:08 2025 +0800
branch-3.1: [feat](checker) Add rowset key consistency checking for checker
#54623 (#57978)
pick: #54623
---
cloud/src/common/config.h | 6 +-
cloud/src/recycler/checker.cpp | 131 +++++++++++++++++++++++++-
cloud/src/recycler/checker.h | 23 +++++
cloud/src/recycler/meta_checker.cpp | 4 +-
cloud/test/recycler_test.cpp | 180 ++++++++++++++++++++++++++++++++++++
5 files changed, 340 insertions(+), 4 deletions(-)
diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index ff59ef649fc..14b45bb48c4 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -118,12 +118,16 @@ CONF_mInt64(recycle_task_threshold_seconds, "10800"); // 3h
CONF_Bool(force_immediate_recycle, "false");
CONF_mBool(enable_mow_job_key_check, "false");
+
CONF_mBool(enable_restore_job_check, "false");
CONF_mBool(enable_tablet_stats_key_check, "false");
CONF_mBool(enable_txn_key_check, "false");
-CONF_mBool(enable_checker_for_meta_key_check, "false");
+CONF_mBool(enable_meta_key_check, "false");
+CONF_mBool(enable_version_key_check, "false");
+CONF_mBool(enable_meta_rowset_key_check, "false");
+
CONF_mInt64(mow_job_key_check_expiration_diff_seconds, "600"); // 10min
CONF_String(test_s3_ak, "");
diff --git a/cloud/src/recycler/checker.cpp b/cloud/src/recycler/checker.cpp
index 1d8716169d9..ca111906150 100644
--- a/cloud/src/recycler/checker.cpp
+++ b/cloud/src/recycler/checker.cpp
@@ -223,6 +223,12 @@ int Checker::start() {
}
}
+ if (config::enable_meta_rowset_key_check) {
+ if (int ret = checker->do_meta_rowset_key_check(); ret != 0) {
+ success = false;
+ }
+ }
+
if (config::enable_delete_bitmap_storage_optimize_v2_check) {
if (int ret =
checker->do_delete_bitmap_storage_optimize_check(2 /*version*/);
ret != 0) {
@@ -1786,7 +1792,6 @@ int InstanceChecker::do_mow_job_key_check() {
} while (it->more() && !stopped());
return 0;
}
-
int InstanceChecker::do_tablet_stats_key_check() {
int ret = 0;
@@ -2281,6 +2286,7 @@ int InstanceChecker::check_txn_index_key(std::string_view key, std::string_view
}
auto txn_id = std::get<int64_t>(std::get<0>(out[4]));
auto db_id = std::get<int64_t>(std::get<0>(out[3]));
+ /// get tablet id
std::string txn_index = txn_index_key({instance_id_, txn_id});
std::string txn_index_val;
TxnIndexPB txn_index_pb;
@@ -2427,4 +2433,127 @@ int InstanceChecker::do_txn_key_check() {
return 0;
}
+int InstanceChecker::check_meta_tmp_rowset_key(std::string_view key, std::string_view value) {
+ TxnInfoPB txn_info_pb;
+ std::string_view k1 = key;
+ k1.remove_prefix(1);
+ std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
+ decode_key(&k1, &out);
+ // 0x01 "txn" ${instance_id} "txn_info" ${db_id} ${txn_id}
+ if (!txn_info_pb.ParseFromArray(value.data(), value.size())) {
+ LOG(WARNING) << "failed to parse TxnInfoPB";
+ return -1;
+ }
+ /// get tablet id
+ auto txn_id = std::get<int64_t>(std::get<0>(out[4]));
+ std::string txn_index = txn_index_key({instance_id_, txn_id});
+ std::string txn_index_val;
+ TxnIndexPB txn_index_pb;
+ std::unique_ptr<Transaction> txn;
+ TxnErrorCode err = txn_kv_->create_txn(&txn);
+ if (err != TxnErrorCode::TXN_OK) {
+ LOG(WARNING) << "failed to init txn";
+ return -1;
+ }
+ if (txn->get(txn_index, &txn_index_val) != TxnErrorCode::TXN_OK) {
+ LOG(WARNING) << "failed to get txn index key, key=" << txn_index;
+ return -1;
+ }
+ txn_index_pb.ParseFromString(txn_index_val);
+ auto tablet_id = txn_index_pb.tablet_index().tablet_id();
+ std::string meta_tmp_rowset_key = meta_rowset_tmp_key({instance_id_, txn_id, tablet_id});
+ int is_key_exist = key_exist(txn_kv_.get(), meta_tmp_rowset_key);
+ if (is_key_exist == 1) {
+ if (txn_info_pb.status() != TxnStatusPB::TXN_STATUS_VISIBLE) {
+ // clang-format off
+ LOG(INFO) << "meta tmp rowset key not exist but txn status != TXN_STATUS_VISIBLE"
+ << "meta tmp rowset key=" << meta_tmp_rowset_key
+ << "txn_info=" << txn_info_pb.ShortDebugString();
+ // clang-format on
+ return 1;
+ }
+ } else if (is_key_exist == 0) {
+ if (txn_info_pb.status() != TxnStatusPB::TXN_STATUS_PREPARED) {
+ // clang-format off
+ LOG(INFO) << "meta tmp rowset key exist but txn status != TXN_STATUS_PREPARED"
+ << "meta tmp rowset key=" << meta_tmp_rowset_key
+ << "txn_info=" << txn_info_pb.ShortDebugString();
+ // clang-format on
+ return 1;
+ }
+ } else {
+ LOG(WARNING) << "failed to get key, key=" << meta_tmp_rowset_key;
+ return -1;
+ }
+ return 0;
+}
+
+int InstanceChecker::check_meta_rowset_key(std::string_view key, std::string_view value) {
+ RowsetMetaCloudPB meta_rowset_pb;
+ if (!meta_rowset_pb.ParseFromArray(value.data(), value.size())) {
+ LOG(WARNING) << "failed to parse RowsetMetaCloudPB";
+ return -1;
+ }
+ std::string tablet_index_key = meta_tablet_idx_key({instance_id_, meta_rowset_pb.tablet_id()});
+ if (key_exist(txn_kv_.get(), tablet_index_key) == 1) {
+ LOG(WARNING) << "rowset's tablet id not found in fdb"
+ << "tablet_index_key: " << tablet_index_key
+ << "rowset meta: " << meta_rowset_pb.ShortDebugString();
+ return 1;
+ }
+ return 0;
+}
+
+int InstanceChecker::do_meta_rowset_key_check() {
+ int ret = 0;
+
+ std::string begin = meta_rowset_key({instance_id_, 0, 0});
+ std::string end = meta_rowset_key({instance_id_, INT64_MAX, 0});
+ int64_t num_scanned = 0;
+ int64_t num_loss = 0;
+
+ ret = scan_and_handle_kv(begin, end, [&](std::string_view k, std::string_view v) {
+ num_scanned++;
+ int ret = check_meta_rowset_key(k, v);
+ if (ret == 1) {
+ num_loss++;
+ }
+ return ret;
+ });
+ if (ret == -1) {
+ LOG(WARNING) << "failed to check meta rowset key,";
+ return -1;
+ } else if (ret == 1) {
+ LOG(WARNING) << "meta rowset key may be loss, num_scanned=" << num_scanned
+ << ", num_loss=" << num_loss;
+ }
+ LOG(INFO) << "meta rowset key check finish, num_scanned=" << num_scanned
+ << ", num_loss=" << num_loss;
+
+ begin = txn_info_key({instance_id_, 0, 0});
+ end = txn_info_key({instance_id_, INT64_MAX, 0});
+ num_scanned = 0;
+ num_loss = 0;
+
+ ret = scan_and_handle_kv(begin, end, [&](std::string_view k, std::string_view v) {
+ num_scanned++;
+ int ret = check_meta_tmp_rowset_key(k, v);
+ if (ret == 1) {
+ num_loss++;
+ }
+ return ret;
+ });
+ if (ret == -1) {
+ LOG(WARNING) << "failed to check tmp meta rowset key";
+ return -1;
+ } else if (ret == 1) {
+ LOG(WARNING) << "meta tmp rowset key may be loss, num_scanned=" << num_scanned
+ << ", num_loss=" << num_loss;
+ }
+ LOG(INFO) << "meta tmp rowset key check finish, num_scanned=" << num_scanned
+ << ", num_loss=" << num_loss;
+
+ return ret;
+}
+
} // namespace doris::cloud
diff --git a/cloud/src/recycler/checker.h b/cloud/src/recycler/checker.h
index 5993aa0b5de..e098ab27e95 100644
--- a/cloud/src/recycler/checker.h
+++ b/cloud/src/recycler/checker.h
@@ -113,6 +113,16 @@ public:
int do_txn_key_check();
+ // check table and partition version key
+ // table version should be greater than the versions of all its partitions
+ // Return 0 if success, otherwise error
+ int do_version_key_check();
+
+ // Return 0 if success.
+ // Return 1 if meta rowset key leak or loss is identified.
+ // Return negative if a temporary error occurred during the check process.
+ int do_meta_rowset_key_check();
+
// If there are multiple buckets, return the minimum lifecycle; if there are no buckets (i.e.
// all accessors are HdfsAccessor), return INT64_MAX.
// Return 0 if success, otherwise error
@@ -181,6 +191,19 @@ private:
int check_txn_running_key(std::string_view key, std::string_view value);
+ // Only check whether the meta rowset key is leak
+ // in do_inverted_check() function, check whether the key is lost by comparing data file with key
+ // Return 0 if success.
+ // Return 1 if meta rowset key leak is identified.
+ // Return negative if a temporary error occurred during the check process.
+ int check_meta_rowset_key(std::string_view key, std::string_view value);
+
+ // if TxnInfoKey's finish time > current time, it should not find tmp rowset
+ // Return 0 if success.
+ // Return 1 if meta tmp rowset key is abnormal.
+ // Return negative if a temporary error occurred during the check process.
+ int check_meta_tmp_rowset_key(std::string_view key, std::string_view value);
+
/**
* It is used to scan the key in the range from start_key to end_key
* and then perform handle operations on each group of kv
diff --git a/cloud/src/recycler/meta_checker.cpp b/cloud/src/recycler/meta_checker.cpp
index 78d65fe0eea..2f60e1fa57e 100644
--- a/cloud/src/recycler/meta_checker.cpp
+++ b/cloud/src/recycler/meta_checker.cpp
@@ -387,7 +387,7 @@ bool MetaChecker::handle_check_fe_meta_by_fdb<CHECK_META>(MYSQL* conn) {
bool MetaChecker::check_fe_meta_by_fdb(MYSQL* conn) {
bool success = true;
- if (config::enable_checker_for_meta_key_check) {
+ if (config::enable_meta_key_check) {
success = handle_check_fe_meta_by_fdb<CHECK_META>(conn);
}
@@ -624,7 +624,7 @@ bool MetaChecker::handle_check_fdb_by_fe_meta<CHECK_META>(MYSQL* conn) {
bool MetaChecker::check_fdb_by_fe_meta(MYSQL* conn) {
bool success = true;
- if (config::enable_checker_for_meta_key_check) {
+ if (config::enable_meta_key_check) {
success = handle_check_fdb_by_fe_meta<CHECK_META>(conn);
}
diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp
index c02925b9496..1634984dd48 100644
--- a/cloud/test/recycler_test.cpp
+++ b/cloud/test/recycler_test.cpp
@@ -4402,6 +4402,186 @@ TEST(CheckerTest, delete_bitmap_storage_optimize_v2_check_abnormal) {
ASSERT_EQ(checker.do_delete_bitmap_storage_optimize_check(2), 1);
ASSERT_EQ(expected_abnormal_rowsets, real_abnormal_rowsets);
}
+TEST(CheckerTest, meta_rowset_key_check_normal) {
+ auto txn_kv = std::make_shared<MemTxnKv>();
+ ASSERT_EQ(txn_kv->init(), 0);
+
+ InstanceInfoPB instance;
+ instance.set_instance_id(instance_id);
+ auto obj_info = instance.add_obj_info();
+ obj_info->set_id("1");
+
+ InstanceChecker checker(txn_kv, instance_id);
+ ASSERT_EQ(checker.init(instance), 0);
+
+ int64_t tablet_id = 10001;
+ int64_t version = 1;
+ int64_t index_id = 1001;
+
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+ TabletIndexPB tablet_index;
+ tablet_index.set_index_id(index_id);
+ tablet_index.set_tablet_id(tablet_id);
+ std::string tablet_index_key = meta_tablet_idx_key({instance_id, tablet_id});
+ txn->put(tablet_index_key, tablet_index.SerializeAsString());
+ ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+
+ doris::RowsetMetaCloudPB rowset_pb;
+ rowset_pb.set_rowset_id(0);
+ rowset_pb.set_rowset_id_v2("rowset_id_1");
+ rowset_pb.set_tablet_id(tablet_id);
+ rowset_pb.set_resource_id("resource_id");
+
+ std::unique_ptr<Transaction> txn2;
+ ASSERT_EQ(txn_kv->create_txn(&txn2), TxnErrorCode::TXN_OK);
+ std::string rowset_key = meta_rowset_key({instance_id, tablet_id, version});
+ txn2->put(rowset_key, rowset_pb.SerializeAsString());
+ ASSERT_EQ(txn2->commit(), TxnErrorCode::TXN_OK);
+
+ std::string key = meta_rowset_key({instance_id, tablet_id, version});
+ std::string val;
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+ ASSERT_EQ(txn->get(key, &val), TxnErrorCode::TXN_OK);
+ ASSERT_EQ(checker.check_meta_rowset_key(key, val), 0);
+}
+
+TEST(CheckerTest, meta_rowset_key_check_abnormal) {
+ auto txn_kv = std::make_shared<MemTxnKv>();
+ ASSERT_EQ(txn_kv->init(), 0);
+
+ InstanceInfoPB instance;
+ instance.set_instance_id(instance_id);
+ auto obj_info = instance.add_obj_info();
+ obj_info->set_id("1");
+
+ InstanceChecker checker(txn_kv, instance_id);
+ ASSERT_EQ(checker.init(instance), 0);
+
+ int64_t version = 1;
+ int64_t non_existent_tablet_id = 10002;
+
+ doris::RowsetMetaCloudPB rowset_pb;
+ rowset_pb.set_rowset_id(0);
+ rowset_pb.set_rowset_id_v2("rowset_id_1");
+ rowset_pb.set_tablet_id(non_existent_tablet_id);
+ rowset_pb.set_resource_id("resource_id");
+
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+ std::string rowset_key = meta_rowset_key({instance_id, non_existent_tablet_id, version});
+ txn->put(rowset_key, rowset_pb.SerializeAsString());
+ ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+
+ std::string key = meta_rowset_key({instance_id, non_existent_tablet_id, version});
+ std::string val;
+ std::unique_ptr<Transaction> txn1;
+ ASSERT_EQ(txn_kv->create_txn(&txn1), TxnErrorCode::TXN_OK);
+ ASSERT_EQ(txn1->get(key, &val), TxnErrorCode::TXN_OK);
+ ASSERT_EQ(checker.check_meta_rowset_key(key, val), 1);
+}
+
+TEST(CheckerTest, meta_tmp_rowset_key_check_normal) {
+ auto txn_kv = std::make_shared<MemTxnKv>();
+ ASSERT_EQ(txn_kv->init(), 0);
+
+ InstanceInfoPB instance;
+ instance.set_instance_id(instance_id);
+ auto obj_info = instance.add_obj_info();
+ obj_info->set_id("1");
+
+ InstanceChecker checker(txn_kv, instance_id);
+ ASSERT_EQ(checker.init(instance), 0);
+
+ int64_t db_id = 1000;
+ int64_t txn_id = 2000;
+ int64_t tablet_id = 3000;
+
+ std::unique_ptr<Transaction> txn1;
+ ASSERT_EQ(txn_kv->create_txn(&txn1), TxnErrorCode::TXN_OK);
+ TxnIndexPB txn_index_pb;
+ auto* tablet_index = txn_index_pb.mutable_tablet_index();
+ tablet_index->set_tablet_id(tablet_id);
+ std::string txn_index_key_str = txn_index_key({instance_id, txn_id});
+ txn1->put(txn_index_key_str, txn_index_pb.SerializeAsString());
+ ASSERT_EQ(txn1->commit(), TxnErrorCode::TXN_OK);
+
+ std::unique_ptr<Transaction> txn2;
+ ASSERT_EQ(txn_kv->create_txn(&txn2), TxnErrorCode::TXN_OK);
+ TxnInfoPB txn_info_pb;
+ txn_info_pb.set_status(TxnStatusPB::TXN_STATUS_PREPARED);
+ std::string txn_info_key_str = txn_info_key({instance_id, db_id, txn_id});
+ txn2->put(txn_info_key_str, txn_info_pb.SerializeAsString());
+ ASSERT_EQ(txn2->commit(), TxnErrorCode::TXN_OK);
+
+ std::unique_ptr<Transaction> txn3;
+ ASSERT_EQ(txn_kv->create_txn(&txn3), TxnErrorCode::TXN_OK);
+ doris::RowsetMetaCloudPB rowset_pb;
+ rowset_pb.set_rowset_id(0);
+ rowset_pb.set_rowset_id_v2("rowset_id_1");
+ rowset_pb.set_tablet_id(tablet_id);
+ std::string meta_tmp_rowset_key_str = meta_rowset_tmp_key({instance_id, txn_id, tablet_id});
+ txn3->put(meta_tmp_rowset_key_str, rowset_pb.SerializeAsString());
+ ASSERT_EQ(txn3->commit(), TxnErrorCode::TXN_OK);
+
+ std::string key = txn_info_key({instance_id, db_id, txn_id});
+ std::string val;
+ std::unique_ptr<Transaction> txn4;
+ ASSERT_EQ(txn_kv->create_txn(&txn4), TxnErrorCode::TXN_OK);
+ ASSERT_EQ(txn4->get(key, &val), TxnErrorCode::TXN_OK);
+ ASSERT_EQ(checker.check_meta_tmp_rowset_key(key, val), 0);
+}
+
+TEST(CheckerTest, meta_tmp_rowset_key_check_abnormal) {
+ auto txn_kv = std::make_shared<MemTxnKv>();
+ ASSERT_EQ(txn_kv->init(), 0);
+
+ InstanceInfoPB instance;
+ instance.set_instance_id(instance_id);
+ auto obj_info = instance.add_obj_info();
+ obj_info->set_id("1");
+
+ InstanceChecker checker(txn_kv, instance_id);
+ ASSERT_EQ(checker.init(instance), 0);
+
+ constexpr int64_t db_id = 1000;
+ constexpr int64_t txn_id = 2000;
+ constexpr int64_t tablet_id = 3000;
+
+ std::unique_ptr<Transaction> txn1;
+ ASSERT_EQ(txn_kv->create_txn(&txn1), TxnErrorCode::TXN_OK);
+ TxnIndexPB txn_index_pb;
+ auto* tablet_index = txn_index_pb.mutable_tablet_index();
+ tablet_index->set_tablet_id(tablet_id);
+ std::string txn_index_key_str = txn_index_key({instance_id, txn_id});
+ txn1->put(txn_index_key_str, txn_index_pb.SerializeAsString());
+ ASSERT_EQ(txn1->commit(), TxnErrorCode::TXN_OK);
+
+ std::unique_ptr<Transaction> txn2;
+ ASSERT_EQ(txn_kv->create_txn(&txn2), TxnErrorCode::TXN_OK);
+ TxnInfoPB txn_info_pb;
+ txn_info_pb.set_status(TxnStatusPB::TXN_STATUS_VISIBLE);
+ std::string txn_info_key_str = txn_info_key({instance_id, db_id, txn_id});
+ txn2->put(txn_info_key_str, txn_info_pb.SerializeAsString());
+ ASSERT_EQ(txn2->commit(), TxnErrorCode::TXN_OK);
+
+ std::unique_ptr<Transaction> txn3;
+ ASSERT_EQ(txn_kv->create_txn(&txn3), TxnErrorCode::TXN_OK);
+ doris::RowsetMetaCloudPB rowset_pb;
+ rowset_pb.set_rowset_id(0);
+ rowset_pb.set_rowset_id_v2("rowset_id_1");
+ rowset_pb.set_tablet_id(tablet_id);
+ std::string meta_tmp_rowset_key_str = meta_rowset_tmp_key({instance_id, txn_id, tablet_id});
+ txn3->put(meta_tmp_rowset_key_str, rowset_pb.SerializeAsString());
+ ASSERT_EQ(txn3->commit(), TxnErrorCode::TXN_OK);
+
+ std::string key = txn_info_key({instance_id, db_id, txn_id});
+ std::string val;
+ std::unique_ptr<Transaction> txn4;
+ ASSERT_EQ(txn_kv->create_txn(&txn4), TxnErrorCode::TXN_OK);
+ ASSERT_EQ(txn4->get(key, &val), TxnErrorCode::TXN_OK);
+ ASSERT_EQ(checker.check_meta_tmp_rowset_key(key, val), 1);
+}
TEST(CheckerTest, tablet_stats_key_check_leak) {
auto txn_kv = std::make_shared<MemTxnKv>();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]