This is an automated email from the ASF dual-hosted git repository.
gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c2b48d08764 [feat](checker) Add txn key consistency checking for
checker (#54620)
c2b48d08764 is described below
commit c2b48d08764aeed83b6ea932e0c2f934c70de9f6
Author: Uniqueyou <[email protected]>
AuthorDate: Fri Sep 12 14:56:52 2025 +0800
[feat](checker) Add txn key consistency checking for checker (#54620)
1. check txn label key
2. check txn index key
3. check txn info key
4. check txn running key
---
cloud/src/common/config.h | 1 +
cloud/src/meta-store/keys.cpp | 2 +-
cloud/src/recycler/checker.cpp | 261 ++++++++++++++++++++++++++-
cloud/src/recycler/checker.h | 9 +
cloud/test/recycler_test.cpp | 391 +++++++++++++++++++++++++++++++++++++++++
5 files changed, 662 insertions(+), 2 deletions(-)
diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index 13f6ec0a72f..9d8fd6234ca 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -121,6 +121,7 @@ CONF_mBool(enable_mow_job_key_check, "false");
CONF_mBool(enable_restore_job_check, "false");
CONF_mBool(enable_tablet_stats_key_check, "false");
+CONF_mBool(enable_txn_key_check, "false");
CONF_mBool(enable_checker_for_meta_key_check, "false");
CONF_mInt64(mow_job_key_check_expiration_diff_seconds, "600"); // 10min
diff --git a/cloud/src/meta-store/keys.cpp b/cloud/src/meta-store/keys.cpp
index ce334c254d3..cbc6a405f45 100644
--- a/cloud/src/meta-store/keys.cpp
+++ b/cloud/src/meta-store/keys.cpp
@@ -222,7 +222,7 @@ std::string txn_key_prefix(std::string_view instance_id) {
void txn_label_key(const TxnLabelKeyInfo& in, std::string* out) {
encode_prefix(in, out); // 0x01 "txn" ${instance_id}
- encode_bytes(TXN_KEY_INFIX_LABEL, out); // "txn_index"
+ encode_bytes(TXN_KEY_INFIX_LABEL, out); // "txn_label"
encode_int64(std::get<1>(in), out); // db_id
encode_bytes(std::get<2>(in), out); // label
}
diff --git a/cloud/src/recycler/checker.cpp b/cloud/src/recycler/checker.cpp
index f172eee779a..254b0e4ef21 100644
--- a/cloud/src/recycler/checker.cpp
+++ b/cloud/src/recycler/checker.cpp
@@ -29,6 +29,7 @@
#include <algorithm>
#include <chrono>
#include <cstdint>
+#include <functional>
#include <memory>
#include <mutex>
#include <numeric>
@@ -70,6 +71,8 @@ extern std::vector<std::string> recycle_blacklist;
extern bool enable_inverted_check;
} // namespace config
+using namespace std::chrono;
+
Checker::Checker(std::shared_ptr<TxnKv> txn_kv) : txn_kv_(std::move(txn_kv)) {
ip_port_ = std::string(butil::my_ip_cstr()) + ":" +
std::to_string(config::brpc_listen_port);
}
@@ -212,6 +215,12 @@ int Checker::start() {
}
}
+ if (config::enable_txn_key_check) {
+ if (int ret = checker->do_txn_key_check(); ret != 0) {
+ success = false;
+ }
+ }
+
if (config::enable_delete_bitmap_storage_optimize_v2_check) {
if (int ret =
checker->do_delete_bitmap_storage_optimize_check(2 /*version*/);
ret != 0) {
@@ -281,7 +290,6 @@ void Checker::lease_check_jobs() {
}
}
}
-
#define LOG_CHECK_INTERVAL_ALARM LOG(WARNING) << "Err for check interval: "
void Checker::do_inspect(const InstanceInfoPB& instance) {
std::string check_job_key = job_check_key({instance.instance_id()});
@@ -2150,4 +2158,255 @@ int InstanceChecker::do_restore_job_check() {
return 0;
}
+// Checks one txn label record against the txn info records of the same db: every txn info
+// record whose txn_id is listed in the label value must carry the label encoded in the key.
+//
+// key:   0x01 "txn" ${instance_id} "txn_label" ${db_id} ${label}
+// value: serialized TxnLabelPB followed by VERSION_STAMP_LEN trailing bytes
+//
+// Returns 0 when no mismatch is found, 1 when a listed txn info record carries a different
+// label, and -1 when a key or value cannot be decoded. Any other value is whatever
+// scan_and_handle_kv reports for the txn info range scan.
+int InstanceChecker::check_txn_info_key(std::string_view key, std::string_view value) {
+    using DecodedKey = std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>>;
+
+    if (key.empty()) {
+        LOG(WARNING) << "failed to decode txn label key, key is empty";
+        return -1;
+    }
+    std::string_view label_key = key;
+    label_key.remove_prefix(1);
+    DecodedKey label_fields;
+    // Field 3 is db_id and field 4 is the label; refuse to index a shorter decode result.
+    if (decode_key(&label_key, &label_fields) != 0 || label_fields.size() < 5) {
+        LOG(WARNING) << "failed to decode txn label key, key=" << hex(key);
+        return -1;
+    }
+    const auto* db_id_ptr = std::get_if<int64_t>(&std::get<0>(label_fields[3]));
+    const auto* label_ptr = std::get_if<std::string>(&std::get<0>(label_fields[4]));
+    if (db_id_ptr == nullptr || label_ptr == nullptr) {
+        LOG(WARNING) << "unexpected field types in txn label key, key=" << hex(key);
+        return -1;
+    }
+    const int64_t db_id = *db_id_ptr;
+    const std::string& label = *label_ptr;
+
+    // Guard the subtraction: a value shorter than the trailer would wrap around to a huge size.
+    TxnLabelPB txn_label_pb;
+    if (value.size() < VERSION_STAMP_LEN ||
+        !txn_label_pb.ParseFromArray(value.data(), value.size() - VERSION_STAMP_LEN)) {
+        LOG(WARNING) << "failed to parse TxnLabelPB";
+        return -1;
+    }
+
+    // Sorted copy of the txn ids registered under this label, for binary search below.
+    std::vector<int64_t> labeled_txn_ids(txn_label_pb.txn_ids().begin(),
+                                         txn_label_pb.txn_ids().end());
+    std::sort(labeled_txn_ids.begin(), labeled_txn_ids.end());
+
+    // Handler for one txn info record:
+    // 0x01 "txn" ${instance_id} "txn_info" ${db_id} ${txn_id}
+    auto check_one_txn_info = [&](std::string_view info_key, std::string_view info_value) -> int {
+        if (info_key.empty()) {
+            LOG(WARNING) << "failed to decode txn info key, key is empty";
+            return -1;
+        }
+        std::string_view k1 = info_key;
+        k1.remove_prefix(1);
+        DecodedKey info_fields;
+        if (decode_key(&k1, &info_fields) != 0 || info_fields.size() < 5) {
+            LOG(WARNING) << "failed to decode txn info key, key=" << hex(info_key);
+            return -1;
+        }
+        // Parse before looking at the txn id so an unparsable record is always reported.
+        TxnInfoPB txn_info_pb;
+        if (!txn_info_pb.ParseFromArray(info_value.data(), info_value.size())) {
+            LOG(WARNING) << "failed to parse TxnInfoPB";
+            return -1;
+        }
+        const auto* txn_id_ptr = std::get_if<int64_t>(&std::get<0>(info_fields[4]));
+        if (txn_id_ptr == nullptr) {
+            LOG(WARNING) << "unexpected field types in txn info key, key=" << hex(info_key);
+            return -1;
+        }
+        // Records that are not listed under this label are not this check's concern.
+        if (!std::binary_search(labeled_txn_ids.begin(), labeled_txn_ids.end(), *txn_id_ptr)) {
+            return 0;
+        }
+        if (txn_info_pb.label() != label) {
+            LOG(WARNING) << "txn_info_pb's txn_label not same with txn_label_pb's txn_label,"
+                         << " txn_id: " << *txn_id_ptr
+                         << " txn_info_pb's txn_label: " << txn_info_pb.label()
+                         << " txn_label_pb meta: " << txn_label_pb.ShortDebugString();
+            return 1;
+        }
+        return 0;
+    };
+
+    // Walk the txn info keys of this db, from txn_id 0 up to INT64_MAX.
+    std::string txn_info_key_begin = txn_info_key({instance_id_, db_id, 0});
+    std::string txn_info_key_end = txn_info_key({instance_id_, db_id, INT64_MAX});
+    return scan_and_handle_kv(txn_info_key_begin, txn_info_key_end,
+                              [&](std::string_view k, std::string_view v) -> int {
+                                  return check_one_txn_info(k, v);
+                              });
+}
+
+// Checks one txn info record against its txn label record: the txn_id encoded in the key must
+// be listed in the TxnLabelPB stored under (db_id, label of the txn info).
+//
+// key:   0x01 "txn" ${instance_id} "txn_info" ${db_id} ${txn_id}
+// value: serialized TxnInfoPB
+//
+// Returns 0 when the txn_id is listed, 1 when it is missing from the label record, and -1 when
+// the key/value cannot be decoded or the label record cannot be read.
+int InstanceChecker::check_txn_label_key(std::string_view key, std::string_view value) {
+    if (key.empty()) {
+        LOG(WARNING) << "failed to decode txn info key, key is empty";
+        return -1;
+    }
+    std::string_view k1 = key;
+    k1.remove_prefix(1);
+    std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
+    // Field 3 is db_id and field 4 is txn_id; refuse to index a shorter decode result.
+    if (decode_key(&k1, &out) != 0 || out.size() < 5) {
+        LOG(WARNING) << "failed to decode txn info key, key=" << hex(key);
+        return -1;
+    }
+    TxnInfoPB txn_info_pb;
+    if (!txn_info_pb.ParseFromArray(value.data(), value.size())) {
+        LOG(WARNING) << "failed to parse TxnInfoPB";
+        return -1;
+    }
+    const auto* db_id_ptr = std::get_if<int64_t>(&std::get<0>(out[3]));
+    const auto* txn_id_ptr = std::get_if<int64_t>(&std::get<0>(out[4]));
+    if (db_id_ptr == nullptr || txn_id_ptr == nullptr) {
+        LOG(WARNING) << "unexpected field types in txn info key, key=" << hex(key);
+        return -1;
+    }
+    const int64_t db_id = *db_id_ptr;
+    const int64_t txn_id = *txn_id_ptr;
+
+    // Point lookup of the label record this txn info claims to belong to.
+    const std::string& label = txn_info_pb.label();
+    std::string txn_label = txn_label_key({instance_id_, db_id, label});
+    std::string txn_label_val;
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        LOG(WARNING) << "failed to init txn";
+        return -1;
+    }
+    if (txn->get(txn_label, &txn_label_val) != TxnErrorCode::TXN_OK) {
+        LOG(WARNING) << "failed to get txn label key, key=" << hex(txn_label);
+        return -1;
+    }
+
+    // The parse result is deliberately not treated as fatal, matching the existing behavior.
+    // NOTE(review): check_txn_info_key strips VERSION_STAMP_LEN trailing bytes before parsing a
+    // label value while this path parses the whole buffer; presumably a strict parse can fail
+    // on that trailer even though txn_ids were decoded. Confirm the stored layout and strip the
+    // trailer explicitly once all writers (including tests) agree on it.
+    TxnLabelPB txn_label_pb;
+    static_cast<void>(txn_label_pb.ParseFromString(txn_label_val));
+
+    // Read the repeated field in place instead of copying it.
+    const auto& txn_ids = txn_label_pb.txn_ids();
+    if (std::find(txn_ids.begin(), txn_ids.end(), txn_id) == txn_ids.end()) {
+        LOG(WARNING) << "txn_info_pb's txn_id not found in txn_label_pb info,"
+                     << " txn_id: " << txn_id
+                     << " txn_label_pb meta: " << txn_label_pb.ShortDebugString();
+        return 1;
+    }
+    return 0;
+}
+
+// Checks one txn info record against its txn index record: the db_id stored in the
+// TxnIndexPB for this txn_id must equal the db_id encoded in the txn info key.
+//
+// key:   0x01 "txn" ${instance_id} "txn_info" ${db_id} ${txn_id}
+// value: serialized TxnInfoPB
+//
+// Returns 0 when the db ids agree, 1 when they differ, and -1 when the key/value cannot be
+// decoded or the txn index record cannot be read or parsed.
+int InstanceChecker::check_txn_index_key(std::string_view key, std::string_view value) {
+    if (key.empty()) {
+        LOG(WARNING) << "failed to decode txn info key, key is empty";
+        return -1;
+    }
+    std::string_view k1 = key;
+    k1.remove_prefix(1);
+    std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
+    // Field 3 is db_id and field 4 is txn_id; refuse to index a shorter decode result.
+    if (decode_key(&k1, &out) != 0 || out.size() < 5) {
+        LOG(WARNING) << "failed to decode txn info key, key=" << hex(key);
+        return -1;
+    }
+    TxnInfoPB txn_info_pb;
+    if (!txn_info_pb.ParseFromArray(value.data(), value.size())) {
+        LOG(WARNING) << "failed to parse TxnInfoPB";
+        return -1;
+    }
+    const auto* db_id_ptr = std::get_if<int64_t>(&std::get<0>(out[3]));
+    const auto* txn_id_ptr = std::get_if<int64_t>(&std::get<0>(out[4]));
+    if (db_id_ptr == nullptr || txn_id_ptr == nullptr) {
+        LOG(WARNING) << "unexpected field types in txn info key, key=" << hex(key);
+        return -1;
+    }
+    const int64_t db_id = *db_id_ptr;
+    const int64_t txn_id = *txn_id_ptr;
+
+    // Point lookup of the index record for this txn_id.
+    std::string txn_index = txn_index_key({instance_id_, txn_id});
+    std::string txn_index_val;
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        LOG(WARNING) << "failed to init txn";
+        return -1;
+    }
+    if (txn->get(txn_index, &txn_index_val) != TxnErrorCode::TXN_OK) {
+        LOG(WARNING) << "failed to get txn index key, key=" << hex(txn_index);
+        return -1;
+    }
+    // An unparsable index value would otherwise be compared as an empty message and be
+    // reported as a db_id mismatch; report it as a decode error instead.
+    TxnIndexPB txn_index_pb;
+    if (!txn_index_pb.ParseFromString(txn_index_val)) {
+        LOG(WARNING) << "failed to parse TxnIndexPB, key=" << hex(txn_index);
+        return -1;
+    }
+    if (txn_index_pb.tablet_index().db_id() != db_id) {
+        LOG(WARNING) << "txn_index_pb's db_id not same with txn_info_pb's db_id,"
+                     << " txn_index_pb meta: " << txn_index_pb.ShortDebugString()
+                     << " txn_info_pb meta: " << txn_info_pb.ShortDebugString();
+        return 1;
+    }
+    return 0;
+}
+
+// Flags a txn running record whose timeout has already passed.
+//
+// value: serialized TxnRunningPB; the key is not inspected.
+//
+// Returns 0 when timeout_time() is later than the current wall-clock time in milliseconds,
+// 1 when it is at or before it, and -1 when the value cannot be parsed.
+int InstanceChecker::check_txn_running_key(std::string_view key, std::string_view value) {
+    // Sample the clock first, as milliseconds since the epoch.
+    const auto since_epoch = std::chrono::system_clock::now().time_since_epoch();
+    const int64_t now_ms =
+            std::chrono::duration_cast<std::chrono::milliseconds>(since_epoch).count();
+
+    TxnRunningPB running_pb;
+    const bool parsed = running_pb.ParseFromArray(value.data(), value.size());
+    if (!parsed) {
+        LOG(WARNING) << "failed to parse TxnRunningPB";
+        return -1;
+    }
+
+    // Still within its timeout: nothing to report.
+    if (running_pb.timeout_time() > now_ms) {
+        return 0;
+    }
+    LOG(WARNING) << "txn_running_pb.timeout_time() is less than current_time,"
+                 << " but txn_running_key exists, "
+                 << " txn_running_pb meta: " << running_pb.ShortDebugString();
+    return 1;
+}
+
+// Runs the four txn key consistency phases for this instance, in order:
+//   1. txn info records against the txn label records that list them
+//   2. txn label records against the txn info records that reference them
+//   3. txn index records against txn info records
+//   4. txn running records against the current time
+//
+// Stops at the first phase whose scan reports 1 (inconsistency found) or -1 (check failed)
+// and returns that value; returns 0 otherwise.
+int InstanceChecker::do_txn_key_check() {
+    using KeyCheck = int (InstanceChecker::*)(std::string_view, std::string_view);
+
+    // Scans [begin, end) with `check`, counting scanned and abnormal keys, and logs a
+    // begin/finish pair tagged with `phase` so every phase is traceable in the log.
+    // The range bounds are taken by value so they can be handed to scan_and_handle_kv as
+    // mutable strings, as the per-phase code did before.
+    auto run_phase = [this](std::string begin, std::string end, KeyCheck check,
+                            const char* phase, const char* error_msg) -> int {
+        int64_t num_scanned = 0;
+        int64_t num_abnormal = 0;
+        LOG(INFO) << "begin check " << phase;
+        int ret = scan_and_handle_kv(begin, end,
+                                     [&](std::string_view k, std::string_view v) -> int {
+                                         num_scanned++;
+                                         int ret = (this->*check)(k, v);
+                                         if (ret == 1) {
+                                             num_abnormal++;
+                                         }
+                                         return ret;
+                                     });
+        if (ret == 1) {
+            LOG(WARNING) << "failed to check " << phase << ", num_scanned=" << num_scanned
+                         << ", num_abnormal=" << num_abnormal;
+            return 1;
+        } else if (ret == -1) {
+            LOG(WARNING) << error_msg;
+            return -1;
+        }
+        // Any other scan result is not treated as fatal; the next phase still runs.
+        LOG(INFO) << "finish check " << phase << ", num_scanned=" << num_scanned
+                  << ", num_abnormal=" << num_abnormal;
+        return 0;
+    };
+
+    // check txn info key depend on txn label key
+    if (int ret = run_phase(txn_label_key({instance_id_, 0, ""}),
+                            txn_label_key({instance_id_, INT64_MAX, ""}),
+                            &InstanceChecker::check_txn_info_key,
+                            "txn_info_key depending on txn_label_key",
+                            "failed to check txn label key and txn info key");
+        ret != 0) {
+        return ret;
+    }
+
+    // check txn label key depend on txn info key
+    if (int ret = run_phase(txn_info_key({instance_id_, 0, 0}),
+                            txn_info_key({instance_id_, INT64_MAX, 0}),
+                            &InstanceChecker::check_txn_label_key,
+                            "txn_label_key depending on txn_info_key",
+                            "failed to inverted check txn label key and txn info key");
+        ret != 0) {
+        return ret;
+    }
+
+    // check txn index key depend on txn info key
+    if (int ret = run_phase(txn_info_key({instance_id_, 0, 0}),
+                            txn_info_key({instance_id_, INT64_MAX, 0}),
+                            &InstanceChecker::check_txn_index_key,
+                            "txn_index_key depending on txn_info_key",
+                            "failed to check txn index key");
+        ret != 0) {
+        return ret;
+    }
+
+    // check txn running key
+    if (int ret = run_phase(txn_running_key({instance_id_, 0, 0}),
+                            txn_running_key({instance_id_, INT64_MAX, 0}),
+                            &InstanceChecker::check_txn_running_key, "txn_running_key",
+                            "failed to check txn running key");
+        ret != 0) {
+        return ret;
+    }
+    return 0;
+}
+
} // namespace doris::cloud
diff --git a/cloud/src/recycler/checker.h b/cloud/src/recycler/checker.h
index 39c990d8831..5993aa0b5de 100644
--- a/cloud/src/recycler/checker.h
+++ b/cloud/src/recycler/checker.h
@@ -111,6 +111,8 @@ public:
int do_restore_job_check();
+ int do_txn_key_check();
+
// If there are multiple buckets, return the minimum lifecycle; if there
are no buckets (i.e.
// all accessors are HdfsAccessor), return INT64_MAX.
// Return 0 if success, otherwise error
@@ -171,6 +173,13 @@ private:
// Return 1 if key leak is identified.
// Return negative if a temporary error occurred during the check process.
int check_stats_tablet_key_leaked(std::string_view key, std::string_view
value);
+ int check_txn_info_key(std::string_view key, std::string_view value);
+
+ int check_txn_label_key(std::string_view key, std::string_view value);
+
+ int check_txn_index_key(std::string_view key, std::string_view value);
+
+ int check_txn_running_key(std::string_view key, std::string_view value);
/**
* It is used to scan the key in the range from start_key to end_key
diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp
index 026a4cc3897..1e7a1b918e2 100644
--- a/cloud/test/recycler_test.cpp
+++ b/cloud/test/recycler_test.cpp
@@ -6785,4 +6785,395 @@ TEST(RecyclerTest, recycle_restore_job_complete_state) {
ASSERT_EQ(recycler.recycle_restore_jobs(), 0);
}
+// Exercises InstanceChecker::check_txn_info_key with: consistent label/info records (0),
+// an info record whose label disagrees with the label key (1), and a label value that is
+// not a valid TxnLabelPB (-1).
+TEST(CheckerTest, check_txn_info_key) {
+    auto kv = std::make_shared<MemTxnKv>();
+    ASSERT_EQ(kv->init(), 0);
+
+    InstanceInfoPB instance_pb;
+    instance_pb.set_instance_id(instance_id);
+    instance_pb.add_obj_info()->set_id("1");
+
+    InstanceChecker checker(kv, instance_id);
+    ASSERT_EQ(checker.init(instance_pb), 0);
+
+    const int64_t db_id = 1001;
+    const std::string label = "test_label";
+    const std::vector<int64_t> txn_ids = {2001, 2002, 2003};
+
+    // Seed one label record listing every txn id, plus a matching info record per txn id.
+    {
+        std::unique_ptr<Transaction> writer;
+        ASSERT_EQ(kv->create_txn(&writer), TxnErrorCode::TXN_OK);
+
+        TxnLabelPB label_pb;
+        for (int64_t id : txn_ids) {
+            label_pb.add_txn_ids(id);
+        }
+        std::string stamped_val;
+        ASSERT_TRUE(label_pb.SerializeToString(&stamped_val));
+
+        // Trailer: 10 bytes for versionstamp, then the 4-byte offset of that stamp.
+        const uint32_t stamp_offset = stamped_val.size();
+        stamped_val.append(10, '\x00');
+        stamped_val.append(reinterpret_cast<const char*>(&stamp_offset), 4);
+        MemTxnKv::gen_version_timestamp(123456790, 0, &stamped_val);
+
+        writer->put(txn_label_key({instance_id, db_id, label}), stamped_val);
+
+        for (int64_t id : txn_ids) {
+            TxnInfoPB info_pb;
+            info_pb.set_label(label);
+            std::string serialized_info;
+            ASSERT_TRUE(info_pb.SerializeToString(&serialized_info));
+            writer->put(txn_info_key({instance_id, db_id, id}), serialized_info);
+        }
+
+        ASSERT_EQ(writer->commit(), TxnErrorCode::TXN_OK);
+    }
+
+    // All info records agree with the label record.
+    {
+        const std::string key = txn_label_key({instance_id, db_id, label});
+        std::unique_ptr<Transaction> reader;
+        ASSERT_EQ(kv->create_txn(&reader), TxnErrorCode::TXN_OK);
+        std::string stored_val;
+        ASSERT_EQ(reader->get(key, &stored_val), TxnErrorCode::TXN_OK);
+
+        ASSERT_EQ(checker.check_txn_info_key(key, stored_val), 0);
+    }
+
+    // Overwrite the first info record with a different label: the check must flag it.
+    {
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+
+        TxnInfoPB mismatched_pb;
+        mismatched_pb.set_label("mismatched_label");
+        std::string serialized_info;
+        ASSERT_TRUE(mismatched_pb.SerializeToString(&serialized_info));
+        txn->put(txn_info_key({instance_id, db_id, txn_ids[0]}), serialized_info);
+        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+
+        const std::string key = txn_label_key({instance_id, db_id, label});
+        std::string stored_val;
+        ASSERT_EQ(kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        ASSERT_EQ(txn->get(key, &stored_val), TxnErrorCode::TXN_OK);
+
+        ASSERT_EQ(checker.check_txn_info_key(key, stored_val), 1);
+    }
+
+    // A label value that is not a serialized TxnLabelPB is reported as a check failure.
+    {
+        std::unique_ptr<Transaction> writer;
+        ASSERT_EQ(kv->create_txn(&writer), TxnErrorCode::TXN_OK);
+
+        const std::string key = txn_label_key({instance_id, db_id, "invalid_label"});
+        std::string garbage_val = "invalid_protobuf_data";
+        const uint32_t stamp_offset = garbage_val.size();
+        garbage_val.append(10, '\x00');
+        garbage_val.append(reinterpret_cast<const char*>(&stamp_offset), 4);
+
+        writer->put(key, garbage_val);
+        ASSERT_EQ(writer->commit(), TxnErrorCode::TXN_OK);
+
+        ASSERT_EQ(checker.check_txn_info_key(key, garbage_val), -1);
+    }
+}
+
+// Exercises InstanceChecker::check_txn_label_key with: info records listed under their label
+// (0), an info record whose txn id is not listed (1), an unparsable info value (-1), and an
+// info record whose label key does not exist (-1).
+TEST(CheckerTest, check_txn_label_key) {
+    auto kv = std::make_shared<MemTxnKv>();
+    ASSERT_EQ(kv->init(), 0);
+
+    InstanceInfoPB instance_pb;
+    instance_pb.set_instance_id(instance_id);
+    instance_pb.add_obj_info()->set_id("1");
+
+    InstanceChecker checker(kv, instance_id);
+    ASSERT_EQ(checker.init(instance_pb), 0);
+
+    const int64_t db_id = 1001;
+    const std::string label = "test_label";
+    const std::vector<int64_t> txn_ids = {2001, 2002, 2003};
+
+    // Seed one label record listing every txn id, plus a matching info record per txn id.
+    {
+        std::unique_ptr<Transaction> writer;
+        ASSERT_EQ(kv->create_txn(&writer), TxnErrorCode::TXN_OK);
+
+        TxnLabelPB label_pb;
+        for (int64_t id : txn_ids) {
+            label_pb.add_txn_ids(id);
+        }
+        std::string serialized_label;
+        ASSERT_TRUE(label_pb.SerializeToString(&serialized_label));
+        writer->put(txn_label_key({instance_id, db_id, label}), serialized_label);
+
+        for (int64_t id : txn_ids) {
+            TxnInfoPB info_pb;
+            info_pb.set_label(label);
+            std::string serialized_info;
+            ASSERT_TRUE(info_pb.SerializeToString(&serialized_info));
+            writer->put(txn_info_key({instance_id, db_id, id}), serialized_info);
+        }
+
+        ASSERT_EQ(writer->commit(), TxnErrorCode::TXN_OK);
+    }
+
+    // Every seeded info record is listed under its label.
+    for (int64_t id : txn_ids) {
+        const std::string key = txn_info_key({instance_id, db_id, id});
+        std::unique_ptr<Transaction> reader;
+        ASSERT_EQ(kv->create_txn(&reader), TxnErrorCode::TXN_OK);
+        std::string stored_val;
+        ASSERT_EQ(reader->get(key, &stored_val), TxnErrorCode::TXN_OK);
+
+        ASSERT_EQ(checker.check_txn_label_key(key, stored_val), 0);
+    }
+
+    // An info record whose txn id (2004) is absent from the label record is flagged.
+    {
+        std::unique_ptr<Transaction> writer;
+        ASSERT_EQ(kv->create_txn(&writer), TxnErrorCode::TXN_OK);
+
+        const int64_t unlisted_txn_id = 2004;
+        const std::string key = txn_info_key({instance_id, db_id, unlisted_txn_id});
+        TxnInfoPB info_pb;
+        info_pb.set_label(label);
+        std::string serialized_info;
+        ASSERT_TRUE(info_pb.SerializeToString(&serialized_info));
+        writer->put(key, serialized_info);
+        ASSERT_EQ(writer->commit(), TxnErrorCode::TXN_OK);
+
+        ASSERT_EQ(checker.check_txn_label_key(key, serialized_info), 1);
+    }
+
+    // An info value that is not a serialized TxnInfoPB is reported as a check failure.
+    {
+        const std::string key = txn_info_key({instance_id, db_id, 9999});
+        const std::string garbage_val = "invalid_protobuf_data";
+
+        ASSERT_EQ(checker.check_txn_label_key(key, garbage_val), -1);
+    }
+
+    // An info record pointing at a label that was never written is a check failure.
+    {
+        std::unique_ptr<Transaction> writer;
+        ASSERT_EQ(kv->create_txn(&writer), TxnErrorCode::TXN_OK);
+
+        const std::string absent_label = "non_existent_label";
+        const int64_t orphan_txn_id = 3001;
+        const std::string key = txn_info_key({instance_id, db_id, orphan_txn_id});
+        TxnInfoPB info_pb;
+        info_pb.set_label(absent_label);
+        std::string serialized_info;
+        ASSERT_TRUE(info_pb.SerializeToString(&serialized_info));
+        writer->put(key, serialized_info);
+        ASSERT_EQ(writer->commit(), TxnErrorCode::TXN_OK);
+
+        ASSERT_EQ(checker.check_txn_label_key(key, serialized_info), -1);
+    }
+}
+
+// Exercises InstanceChecker::check_txn_index_key with: an index record whose db_id matches
+// the info key (0), one whose db_id differs (1), an unparsable info value (-1), and an info
+// record without any index record (-1).
+TEST(CheckerTest, check_txn_index_key) {
+    auto kv = std::make_shared<MemTxnKv>();
+    ASSERT_EQ(kv->init(), 0);
+
+    InstanceInfoPB instance_pb;
+    instance_pb.set_instance_id(instance_id);
+    instance_pb.add_obj_info()->set_id("1");
+
+    InstanceChecker checker(kv, instance_id);
+    ASSERT_EQ(checker.init(instance_pb), 0);
+
+    const int64_t db_id = 1001;
+    const int64_t txn_id = 2001;
+
+    // Seed an info record and an index record that agree on db_id.
+    {
+        std::unique_ptr<Transaction> writer;
+        ASSERT_EQ(kv->create_txn(&writer), TxnErrorCode::TXN_OK);
+
+        TxnInfoPB info_pb;
+        info_pb.set_label("test_label");
+        std::string serialized_info;
+        ASSERT_TRUE(info_pb.SerializeToString(&serialized_info));
+        writer->put(txn_info_key({instance_id, db_id, txn_id}), serialized_info);
+
+        TxnIndexPB index_pb;
+        index_pb.mutable_tablet_index()->set_db_id(db_id);
+        std::string serialized_index;
+        ASSERT_TRUE(index_pb.SerializeToString(&serialized_index));
+        writer->put(txn_index_key({instance_id, txn_id}), serialized_index);
+
+        ASSERT_EQ(writer->commit(), TxnErrorCode::TXN_OK);
+    }
+
+    // Matching db ids pass the check.
+    {
+        const std::string key = txn_info_key({instance_id, db_id, txn_id});
+        std::unique_ptr<Transaction> reader;
+        ASSERT_EQ(kv->create_txn(&reader), TxnErrorCode::TXN_OK);
+        std::string stored_val;
+        ASSERT_EQ(reader->get(key, &stored_val), TxnErrorCode::TXN_OK);
+
+        ASSERT_EQ(checker.check_txn_index_key(key, stored_val), 0);
+    }
+
+    // An index record carrying another db_id (1002) is flagged.
+    {
+        std::unique_ptr<Transaction> writer;
+        ASSERT_EQ(kv->create_txn(&writer), TxnErrorCode::TXN_OK);
+
+        const int64_t other_db_id = 1002;
+        const int64_t second_txn_id = 2002;
+
+        const std::string key = txn_info_key({instance_id, db_id, second_txn_id});
+        TxnInfoPB info_pb;
+        info_pb.set_label("test_label_2");
+        std::string serialized_info;
+        ASSERT_TRUE(info_pb.SerializeToString(&serialized_info));
+        writer->put(key, serialized_info);
+
+        TxnIndexPB index_pb;
+        index_pb.mutable_tablet_index()->set_db_id(other_db_id);
+        std::string serialized_index;
+        ASSERT_TRUE(index_pb.SerializeToString(&serialized_index));
+        writer->put(txn_index_key({instance_id, second_txn_id}), serialized_index);
+
+        ASSERT_EQ(writer->commit(), TxnErrorCode::TXN_OK);
+
+        ASSERT_EQ(checker.check_txn_index_key(key, serialized_info), 1);
+    }
+
+    // An info value that is not a serialized TxnInfoPB is reported as a check failure.
+    {
+        const std::string key = txn_info_key({instance_id, db_id, 9999});
+        const std::string garbage_val = "invalid_protobuf_data";
+
+        ASSERT_EQ(checker.check_txn_index_key(key, garbage_val), -1);
+    }
+
+    // An info record with no index record at all is a check failure.
+    {
+        std::unique_ptr<Transaction> writer;
+        ASSERT_EQ(kv->create_txn(&writer), TxnErrorCode::TXN_OK);
+
+        const int64_t unindexed_txn_id = 3001;
+        const std::string key = txn_info_key({instance_id, db_id, unindexed_txn_id});
+        TxnInfoPB info_pb;
+        info_pb.set_label("test_label_3");
+        std::string serialized_info;
+        ASSERT_TRUE(info_pb.SerializeToString(&serialized_info));
+        writer->put(key, serialized_info);
+        ASSERT_EQ(writer->commit(), TxnErrorCode::TXN_OK);
+
+        ASSERT_EQ(checker.check_txn_index_key(key, serialized_info), -1);
+    }
+}
+
+// Exercises InstanceChecker::check_txn_running_key with: a running record that times out one
+// hour from now (0), one that timed out one hour ago (1), and an unparsable value (-1).
+TEST(CheckerTest, check_txn_running_key) {
+    auto kv = std::make_shared<MemTxnKv>();
+    ASSERT_EQ(kv->init(), 0);
+
+    InstanceInfoPB instance_pb;
+    instance_pb.set_instance_id(instance_id);
+    instance_pb.add_obj_info()->set_id("1");
+
+    InstanceChecker checker(kv, instance_id);
+    ASSERT_EQ(checker.init(instance_pb), 0);
+
+    const int64_t db_id = 1001;
+    const int64_t txn_id = 2001;
+
+    // Seed a live record under txn_id and an expired one under txn_id + 1.
+    {
+        std::unique_ptr<Transaction> writer;
+        ASSERT_EQ(kv->create_txn(&writer), TxnErrorCode::TXN_OK);
+
+        const auto since_epoch = std::chrono::system_clock::now().time_since_epoch();
+        const int64_t now_ms =
+                std::chrono::duration_cast<std::chrono::milliseconds>(since_epoch).count();
+
+        TxnRunningPB live_pb;
+        live_pb.set_timeout_time(now_ms + 3600000);
+        std::string live_val;
+        ASSERT_TRUE(live_pb.SerializeToString(&live_val));
+        writer->put(txn_running_key({instance_id, db_id, txn_id}), live_val);
+
+        TxnRunningPB stale_pb;
+        stale_pb.set_timeout_time(now_ms - 3600000);
+        std::string stale_val;
+        ASSERT_TRUE(stale_pb.SerializeToString(&stale_val));
+        writer->put(txn_running_key({instance_id, db_id, txn_id + 1}), stale_val);
+
+        ASSERT_EQ(writer->commit(), TxnErrorCode::TXN_OK);
+    }
+
+    // The record that has not timed out passes.
+    {
+        const std::string key = txn_running_key({instance_id, db_id, txn_id});
+        std::unique_ptr<Transaction> reader;
+        ASSERT_EQ(kv->create_txn(&reader), TxnErrorCode::TXN_OK);
+        std::string stored_val;
+        ASSERT_EQ(reader->get(key, &stored_val), TxnErrorCode::TXN_OK);
+
+        ASSERT_EQ(checker.check_txn_running_key(key, stored_val), 0);
+    }
+
+    // The record that timed out is flagged.
+    {
+        const std::string key = txn_running_key({instance_id, db_id, txn_id + 1});
+        std::unique_ptr<Transaction> reader;
+        ASSERT_EQ(kv->create_txn(&reader), TxnErrorCode::TXN_OK);
+        std::string stored_val;
+        ASSERT_EQ(reader->get(key, &stored_val), TxnErrorCode::TXN_OK);
+
+        ASSERT_EQ(checker.check_txn_running_key(key, stored_val), 1);
+    }
+
+    // A value that is not a serialized TxnRunningPB is reported as a check failure.
+    {
+        const std::string key = txn_running_key({instance_id, db_id, txn_id + 2});
+        const std::string garbage_val = "invalid_protobuf_data";
+
+        std::unique_ptr<Transaction> writer;
+        ASSERT_EQ(kv->create_txn(&writer), TxnErrorCode::TXN_OK);
+        writer->put(key, garbage_val);
+        ASSERT_EQ(writer->commit(), TxnErrorCode::TXN_OK);
+
+        std::unique_ptr<Transaction> reader;
+        ASSERT_EQ(kv->create_txn(&reader), TxnErrorCode::TXN_OK);
+        std::string stored_val;
+        ASSERT_EQ(reader->get(key, &stored_val), TxnErrorCode::TXN_OK);
+
+        ASSERT_EQ(checker.check_txn_running_key(key, stored_val), -1);
+    }
+}
+
} // namespace doris::cloud
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]