This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new bc3380f6cdc [feat](cloud) Repair tablet indexes versioned write (#54772)
bc3380f6cdc is described below
commit bc3380f6cdc0f0ec28996fa049a3e89419cd1a16
Author: walter <[email protected]>
AuthorDate: Fri Aug 15 16:24:53 2025 +0800
[feat](cloud) Repair tablet indexes versioned write (#54772)
---
cloud/src/meta-service/meta_service_txn.cpp | 23 ++++-
cloud/test/txn_lazy_commit_test.cpp | 144 +++++++++++++++++++++++++++-
2 files changed, 160 insertions(+), 7 deletions(-)
diff --git a/cloud/src/meta-service/meta_service_txn.cpp b/cloud/src/meta-service/meta_service_txn.cpp
index 83f1bf909df..c0228c95315 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -1604,7 +1604,8 @@ void MetaServiceImpl::commit_txn_immediately(
void repair_tablet_index(
std::shared_ptr<TxnKv>& txn_kv, MetaServiceCode& code, std::string&
msg,
const std::string& instance_id, int64_t db_id, int64_t txn_id,
- const std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>&
tmp_rowsets_meta) {
+ const std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>&
tmp_rowsets_meta,
+ bool is_versioned_write) {
std::stringstream ss;
std::vector<std::string> tablet_idx_keys;
for (auto& [_, i] : tmp_rowsets_meta) {
@@ -1673,9 +1674,24 @@ void repair_tablet_index(
return;
}
txn->put(sub_tablet_idx_keys[j], idx_val);
- LOG(INFO) << " repaire tablet index txn_id=" << txn_id
+ LOG(INFO) << " repair tablet index txn_id=" << txn_id
<< " tablet_idx_pb:" <<
tablet_idx_pb.ShortDebugString()
<< " key=" << hex(sub_tablet_idx_keys[j]);
+ if (is_versioned_write) {
+ std::string versioned_tablet_idx_key =
+ versioned::tablet_index_key({instance_id,
tablet_idx_pb.tablet_id()});
+ std::string versioned_tablet_inverted_idx_key =
+ versioned::tablet_inverted_index_key(
+ {instance_id, db_id,
tablet_idx_pb.table_id(),
+ tablet_idx_pb.index_id(),
tablet_idx_pb.partition_id(),
+ tablet_idx_pb.tablet_id()});
+ txn->put(versioned_tablet_idx_key, idx_val);
+ txn->put(versioned_tablet_inverted_idx_key, "");
+ LOG(INFO) << "repair tablet index and inverted index,
txn_id=" << txn_id
+ << " tablet_id=" << tablet_idx_pb.tablet_id()
+ << " index_key=" << hex(versioned_tablet_idx_key)
+ << " inverted_index_key=" <<
hex(versioned_tablet_inverted_idx_key);
+ }
}
}
@@ -1767,7 +1783,8 @@ void MetaServiceImpl::commit_txn_eventually(
stats.get_bytes += txn->get_bytes();
stats.get_counter += txn->num_get_keys();
txn.reset();
- repair_tablet_index(txn_kv_, code, msg, instance_id, db_id,
txn_id, tmp_rowsets_meta);
+ repair_tablet_index(txn_kv_, code, msg, instance_id, db_id,
txn_id, tmp_rowsets_meta,
+ is_versioned_write);
if (code != MetaServiceCode::OK) {
LOG(WARNING) << "repair_tablet_index failed, txn_id=" <<
txn_id << " code=" << code;
return;
diff --git a/cloud/test/txn_lazy_commit_test.cpp b/cloud/test/txn_lazy_commit_test.cpp
index 0b6b04752d4..cc4da09d575 100644
--- a/cloud/test/txn_lazy_commit_test.cpp
+++ b/cloud/test/txn_lazy_commit_test.cpp
@@ -54,7 +54,8 @@ namespace doris::cloud {
void repair_tablet_index(
std::shared_ptr<TxnKv>& txn_kv, MetaServiceCode& code, std::string&
msg,
const std::string& instance_id, int64_t db_id, int64_t txn_id,
- const std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>&
tmp_rowsets_meta);
+ const std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>&
tmp_rowsets_meta,
+ bool is_versioned_write);
};
static doris::cloud::RecyclerThreadPoolGroup thread_group;
@@ -344,11 +345,13 @@ static void
check_txn_not_exist(std::unique_ptr<Transaction>& txn, int64_t db_id
}
// Create a MULTI_VERSION_READ_WRITE instance and refresh the resource manager.
-static void create_and_refresh_instance(MetaServiceProxy* service, std::string
instance_id) {
+static void create_and_refresh_instance(
+ MetaServiceProxy* service, std::string instance_id,
+ MultiVersionStatus multi_version_status = MULTI_VERSION_READ_WRITE) {
// write instance
InstanceInfoPB instance_info;
instance_info.set_instance_id(instance_id);
- instance_info.set_multi_version_status(MULTI_VERSION_READ_WRITE);
+ instance_info.set_multi_version_status(multi_version_status);
std::unique_ptr<Transaction> txn;
ASSERT_EQ(service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
txn->put(instance_key(instance_id), instance_info.SerializeAsString());
@@ -458,7 +461,7 @@ TEST(TxnLazyCommitTest, RepairTabletIndexTest) {
MetaServiceCode code = MetaServiceCode::UNDEFINED_ERR;
std::string msg;
- repair_tablet_index(txn_kv, code, msg, mock_instance, db_id, txn_id,
tmp_rowsets_meta);
+ repair_tablet_index(txn_kv, code, msg, mock_instance, db_id, txn_id,
tmp_rowsets_meta, false);
ASSERT_EQ(code, MetaServiceCode::OK);
{
@@ -866,6 +869,139 @@ TEST(TxnLazyCommitVersionedReadTest, CommitTxnEventually)
{
sp->disable_processing();
}
+// Lazy commit must repair tablet indexes that were created without a db_id; on a
+// MULTI_VERSION_WRITE_ONLY instance the repaired index must also exist under the versioned key.
+TEST(TxnLazyCommitVersionedReadTest, CommitTxnEventuallyWithoutDbIdTest) {
+    auto txn_kv = get_mem_txn_kv();
+    int64_t db_id = 3131397513;
+    int64_t table_id = 3213867;
+    int64_t index_id = 123513;
+    int64_t partition_id = 113123;
+    bool commit_txn_eventually_finish_hit = false;
+    bool last_pending_txn_id_hit = false;
+    int repair_tablet_idx_count = 0;
+    // The first check must request an index repair; any later check must not.
+    auto sp = SyncPoint::get_instance();
+    sp->set_call_back("commit_txn_eventually::need_repair_tablet_idx", [&](auto&& args) {
+        bool need_repair_tablet_idx = *try_any_cast<bool*>(args[0]);
+        if (repair_tablet_idx_count == 0) {
+            ASSERT_TRUE(need_repair_tablet_idx);
+            repair_tablet_idx_count++;
+        } else {
+            ASSERT_FALSE(need_repair_tablet_idx);
+        }
+    });
+
+    sp->set_call_back("commit_txn_eventually::last_pending_txn_id", [&](auto&& args) {
+        int64_t last_pending_txn_id = *try_any_cast<int64_t*>(args[0]);
+        ASSERT_EQ(last_pending_txn_id, 0);
+        last_pending_txn_id_hit = true;
+    });
+
+    sp->set_call_back("commit_txn_eventually::finish", [&](auto&& args) {
+        MetaServiceCode code = *try_any_cast<MetaServiceCode*>(args[0]);
+        ASSERT_EQ(code, MetaServiceCode::OK);
+        commit_txn_eventually_finish_hit = true;
+    });
+    sp->enable_processing();
+
+    auto meta_service = get_meta_service(txn_kv, false);
+    std::string instance_id = "test_instance";
+    std::string cloud_unique_id = "1:test_instance:1";
+    DORIS_CLOUD_DEFER {
+        SyncPoint::get_instance()->clear_all_call_backs();
+    };
+    SyncPoint::get_instance()->set_call_back("get_instance_id", [&](auto&& args) {
+        auto* ret = try_any_cast_ret<std::string>(args);
+        ret->first = instance_id;
+        ret->second = true;
+    });
+    SyncPoint::get_instance()->enable_processing();
+    create_and_refresh_instance(meta_service.get(), instance_id, MULTI_VERSION_WRITE_ONLY);
+
+    int64_t txn_id = 0;
+    {
+        brpc::Controller cntl;
+        BeginTxnRequest req;
+        req.set_cloud_unique_id(cloud_unique_id);
+        TxnInfoPB txn_info_pb;
+        txn_info_pb.set_db_id(db_id);
+        txn_info_pb.set_label("test_label_commit_txn_eventually");
+        txn_info_pb.add_table_ids(table_id);
+        txn_info_pb.set_timeout_ms(36000);
+        req.mutable_txn_info()->CopyFrom(txn_info_pb);
+        BeginTxnResponse res;
+        meta_service->begin_txn(&cntl, &req, &res, nullptr);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+        txn_id = res.txn_id();
+    }
+
+    // Create five tablets without db_id and commit one tmp rowset of this txn into each.
+    int64_t tablet_id_base = 1103;
+    for (int i = 0; i < 5; ++i) {
+        create_tablet_without_db_id(meta_service.get(), table_id, index_id, partition_id,
+                                    tablet_id_base + i);
+        auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, index_id, partition_id);
+        CreateRowsetResponse res;
+        commit_rowset(meta_service.get(), tmp_rowset, res);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+    }
+    // Before the commit, every tablet must still hold its tmp rowset.
+    {
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        for (int i = 0; i < 5; ++i) {
+            int64_t tablet_id = tablet_id_base + i;
+            check_tmp_rowset_exist(txn, tablet_id, txn_id);
+        }
+    }
+    // Commit with lazy commit enabled; the sync-point callbacks above observe the commit path.
+    {
+        brpc::Controller cntl;
+        CommitTxnRequest req;
+        req.set_cloud_unique_id(cloud_unique_id);
+        req.set_db_id(db_id);
+        req.set_txn_id(txn_id);
+        req.set_is_2pc(false);
+        req.set_enable_txn_lazy_commit(true);
+        for (int i = 0; i < 5; ++i) {
+            req.add_base_tablet_ids(tablet_id_base + i);
+        }
+        CommitTxnResponse res;
+        meta_service->commit_txn(&cntl, &req, &res, nullptr);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+        // The counter only ever goes 0 -> 1, so 1 proves the repair branch was actually taken.
+        ASSERT_EQ(repair_tablet_idx_count, 1);
+        ASSERT_TRUE(last_pending_txn_id_hit);
+        ASSERT_TRUE(commit_txn_eventually_finish_hit);
+    }
+    // After the commit: db_id repaired, tmp rowsets gone, rowset meta visible at version 2.
+    {
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        for (int i = 0; i < 5; ++i) {
+            int64_t tablet_id = tablet_id_base + i;
+            check_tablet_idx_db_id(txn, db_id, tablet_id);
+            check_tmp_rowset_not_exist(txn, tablet_id, txn_id);
+            check_rowset_meta_exist(txn, tablet_id, 2);
+
+            {
+                // Versioned write: the repaired index must also be stored under the versioned key.
+                std::string key = versioned::tablet_index_key({instance_id, tablet_id});
+                std::string val;
+                ASSERT_EQ(txn->get(key, &val), TxnErrorCode::TXN_OK);
+                TabletIndexPB tablet_idx_pb;
+                ASSERT_TRUE(tablet_idx_pb.ParseFromString(val));
+                ASSERT_EQ(tablet_idx_pb.db_id(), db_id);
+            }
+        }
+    }
+
+    sp->clear_all_call_backs();
+    sp->clear_trace();
+    sp->disable_processing();
+}
+
TEST(TxnLazyCommitTest, CommitTxnImmediatelyTest) {
auto txn_kv = get_mem_txn_kv();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]