This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new d893d6ec382 branch-3.1: [opt](txn lazy commit) Make convert tmp rowsets batch more adaptive #55035 (#55573)
d893d6ec382 is described below
commit d893d6ec38295b9039241987e033ddf4fb52a983
Author: Lei Zhang <[email protected]>
AuthorDate: Thu Sep 4 10:40:38 2025 +0800
branch-3.1: [opt](txn lazy commit) Make convert tmp rowsets batch more adaptive #55035 (#55573)
bp #55035
---
cloud/src/common/config.h | 2 +-
cloud/src/meta-service/txn_lazy_committer.cpp | 23 +++--
cloud/test/txn_lazy_commit_test.cpp | 139 ++++++++++++++++++++++++++
3 files changed, 156 insertions(+), 8 deletions(-)
diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index 4cd46c605fa..b1ed35bc0ba 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -284,7 +284,7 @@ CONF_mInt64(max_txn_commit_byte, "7340032");
CONF_Bool(enable_cloud_txn_lazy_commit, "true");
CONF_Int32(txn_lazy_commit_rowsets_thresold, "1000");
CONF_Int32(txn_lazy_commit_num_threads, "8");
-CONF_Int32(txn_lazy_max_rowsets_per_batch, "1000");
+CONF_mInt64(txn_lazy_max_rowsets_per_batch, "1000");
// max TabletIndexPB num for batch get
CONF_Int32(max_tablet_index_num_per_batch, "1000");
CONF_Int32(max_restore_job_rowsets_per_batch, "1000");
diff --git a/cloud/src/meta-service/txn_lazy_committer.cpp b/cloud/src/meta-service/txn_lazy_committer.cpp
index 795f2f21cb7..7a4755e44b9 100644
--- a/cloud/src/meta-service/txn_lazy_committer.cpp
+++ b/cloud/src/meta-service/txn_lazy_committer.cpp
@@ -315,7 +315,6 @@ void TxnLazyCommitTask::commit() {
LOG(WARNING) << "scan_tmp_rowset failed, txn_id=" << txn_id_
<< " code=" << code_;
break;
}
-
VLOG_DEBUG << "txn_id=" << txn_id_
<< " tmp_rowset_metas.size()=" << all_tmp_rowset_metas.size();
if (all_tmp_rowset_metas.size() == 0) {
@@ -325,23 +324,33 @@ void TxnLazyCommitTask::commit() {
// <partition_id, tmp_rowsets>
std::map<int64_t, std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>>
partition_to_tmp_rowset_metas;
+ size_t max_rowset_meta_size = 0;
for (auto& [tmp_rowset_key, tmp_rowset_pb] : all_tmp_rowset_metas) {
partition_to_tmp_rowset_metas[tmp_rowset_pb.partition_id()].emplace_back();
partition_to_tmp_rowset_metas[tmp_rowset_pb.partition_id()].back().first =
tmp_rowset_key;
partition_to_tmp_rowset_metas[tmp_rowset_pb.partition_id()].back().second =
tmp_rowset_pb;
+ max_rowset_meta_size = std::max(max_rowset_meta_size, tmp_rowset_pb.ByteSizeLong());
+ }
+
+ // fdb txn limit 10MB, we use 4MB as the max size for each batch.
+ size_t max_rowsets_per_batch = config::txn_lazy_max_rowsets_per_batch;
+ if (max_rowset_meta_size > 0) {
+ max_rowsets_per_batch = std::min((4UL << 20) / max_rowset_meta_size,
+ size_t(config::txn_lazy_max_rowsets_per_batch));
+ TEST_SYNC_POINT_CALLBACK("TxnLazyCommitTask::commit::max_rowsets_per_batch",
+ &max_rowsets_per_batch, &max_rowset_meta_size);
}
for (auto& [partition_id, tmp_rowset_metas] : partition_to_tmp_rowset_metas) {
// tablet_id -> TabletIndexPB
std::map<int64_t, TabletIndexPB> tablet_ids;
- for (size_t i = 0; i < tmp_rowset_metas.size();
- i += config::txn_lazy_max_rowsets_per_batch) {
- size_t end =
- (i + config::txn_lazy_max_rowsets_per_batch) > tmp_rowset_metas.size()
- ? tmp_rowset_metas.size()
- : i + config::txn_lazy_max_rowsets_per_batch;
+ for (size_t i = 0; i < tmp_rowset_metas.size(); i += max_rowsets_per_batch) {
+ size_t end = (i + max_rowsets_per_batch) > tmp_rowset_metas.size()
+ ? tmp_rowset_metas.size()
+ : i + max_rowsets_per_batch;
+
std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>
sub_partition_tmp_rowset_metas(tmp_rowset_metas.begin() + i,
tmp_rowset_metas.begin() + end);
diff --git a/cloud/test/txn_lazy_commit_test.cpp b/cloud/test/txn_lazy_commit_test.cpp
index 62f959167e6..5b1686eed28 100644
--- a/cloud/test/txn_lazy_commit_test.cpp
+++ b/cloud/test/txn_lazy_commit_test.cpp
@@ -178,6 +178,33 @@ static doris::RowsetMetaCloudPB create_rowset(int64_t txn_id, int64_t tablet_id,
return rowset;
}
+static doris::RowsetMetaCloudPB create_huge_rowset(int64_t txn_id, int64_t tablet_id, int index_id,
+ int partition_id, int64_t version = -1,
+ int num_rows = 100) {
+ doris::RowsetMetaCloudPB rowset;
+ rowset.set_rowset_id(0); // required
+ rowset.set_rowset_id_v2(next_rowset_id());
+ rowset.set_tablet_id(tablet_id);
+ rowset.set_partition_id(partition_id);
+ rowset.set_index_id(index_id);
+ rowset.set_txn_id(txn_id);
+ if (version > 0) {
+ rowset.set_start_version(version);
+ rowset.set_end_version(version);
+ }
+ rowset.set_num_segments(600);
+ for (int i = 0; i < 600; i++) {
+ auto ptr = rowset.add_segments_key_bounds();
+ ptr->set_min_key("xxsqewqeqweeqwewqeqeq");
+ ptr->set_max_key("dase23452rr234ewdw534523");
+ }
+ rowset.set_num_rows(0);
+ rowset.set_data_disk_size(0);
+ rowset.mutable_tablet_schema()->set_schema_version(0);
+ rowset.set_txn_expiration(::time(nullptr)); // Required by DCHECK
+ return rowset;
+}
+
static void commit_rowset(MetaServiceProxy* meta_service, const doris::RowsetMetaCloudPB& rowset,
CreateRowsetResponse& res) {
brpc::Controller cntl;
@@ -202,6 +229,18 @@ static std::shared_ptr<TxnKv> get_mem_txn_kv() {
return txn_kv;
}
+static std::shared_ptr<TxnKv> get_fdb_txn_kv() {
+ int ret = 0;
+ cloud::config::fdb_cluster_file_path = "fdb.cluster";
+ auto fdb_txn_kv = std::dynamic_pointer_cast<cloud::TxnKv>(std::make_shared<cloud::FdbTxnKv>());
+ if (fdb_txn_kv != nullptr) {
+ ret = fdb_txn_kv->init();
+ [&] { ASSERT_EQ(ret, 0); }();
+ }
+ [&] { ASSERT_NE(fdb_txn_kv.get(), nullptr); }();
+ return fdb_txn_kv;
+}
+
static void check_tablet_idx_db_id(std::unique_ptr<Transaction>& txn, int64_t db_id,
int64_t tablet_id) {
std::string mock_instance = "test_instance";
@@ -2949,4 +2988,104 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithMultiTableTest) {
sp->disable_processing();
}
+TEST(TxnLazyCommitTest, CommitTxnEventuallyWithHugeRowsetMetaTest) {
+ config::txn_lazy_max_rowsets_per_batch = 1000;
+ auto txn_kv = get_fdb_txn_kv();
+ int64_t db_id = 14135425;
+ int64_t table_id = 31245456;
+ int64_t index_id = 434324;
+ int64_t partition_id = 3215764;
+
+ int64_t table_id2 = 213476;
+ int64_t index_id2 = 126765;
+ int64_t partition_id2 = 214567;
+ bool commit_txn_eventually_finish_hit = false;
+
+ auto sp = SyncPoint::get_instance();
+ sp->set_call_back("commit_txn_eventually::task->wait", [&](auto&& args) {
+ auto [code, msg] = *try_any_cast<std::pair<MetaServiceCode, std::string>*>(args[0]);
+ ASSERT_EQ(code, MetaServiceCode::OK);
+ commit_txn_eventually_finish_hit = true;
+ });
+
+ sp->set_call_back("TxnLazyCommitTask::commit::max_rowsets_per_batch", [&](auto&& args) {
+ size_t max_rowsets_per_batch = *try_any_cast<size_t*>(args[0]);
+ size_t max_rowset_meta_size = *try_any_cast<size_t*>(args[1]);
+ LOG(INFO) << "max_rowsets_per_batch:" << max_rowsets_per_batch
+ << " max_rowset_meta_size:" << max_rowset_meta_size;
+ ASSERT_EQ(max_rowsets_per_batch, 134);
+ });
+
+ sp->enable_processing();
+
+ auto meta_service = get_meta_service(txn_kv, true);
+ brpc::Controller cntl;
+ BeginTxnRequest req;
+ req.set_cloud_unique_id("test_cloud_unique_id");
+ TxnInfoPB txn_info_pb;
+ txn_info_pb.set_db_id(db_id);
+ txn_info_pb.set_label("test_label_multi_table_commit_txn");
+ txn_info_pb.add_table_ids(table_id);
+ txn_info_pb.add_table_ids(table_id2);
+ txn_info_pb.set_timeout_ms(36000);
+ req.mutable_txn_info()->CopyFrom(txn_info_pb);
+ BeginTxnResponse res;
+ meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res,
+ nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ int64_t txn_id = res.txn_id();
+
+ // mock rowset and tablet
+ int64_t tablet_id_base = 3131124;
+ for (int i = 0; i < 1000; ++i) {
+ create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id,
+ tablet_id_base + i);
+ auto tmp_rowset = create_huge_rowset(txn_id, tablet_id_base + i, index_id, partition_id);
+ CreateRowsetResponse res;
+ commit_rowset(meta_service.get(), tmp_rowset, res);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ }
+
+ for (int i = 1000; i < 2000; ++i) {
+ create_tablet_with_db_id(meta_service.get(), db_id, table_id2, index_id2, partition_id2,
+ tablet_id_base + i);
+ auto tmp_rowset = create_huge_rowset(txn_id, tablet_id_base + i, index_id2, partition_id2);
+ CreateRowsetResponse res;
+ commit_rowset(meta_service.get(), tmp_rowset, res);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ }
+
+ {
+ brpc::Controller cntl;
+ CommitTxnRequest req;
+ req.set_cloud_unique_id("test_cloud_unique_id");
+ req.set_db_id(db_id);
+ req.set_txn_id(txn_id);
+ req.set_is_2pc(false);
+ req.set_enable_txn_lazy_commit(true);
+ CommitTxnResponse res;
+ meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req,
+ &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ ASSERT_TRUE(commit_txn_eventually_finish_hit);
+ }
+
+ {
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+ std::string mock_instance = "test_instance";
+ for (int i = 0; i < 2000; ++i) {
+ int64_t tablet_id = tablet_id_base + i;
+ check_tablet_idx_db_id(txn, db_id, tablet_id);
+ check_tmp_rowset_not_exist(txn, tablet_id, txn_id);
+ check_rowset_meta_exist(txn, tablet_id, 2);
+ }
+ }
+
+ sp->clear_all_call_backs();
+ sp->clear_trace();
+ sp->disable_processing();
+ config::txn_lazy_max_rowsets_per_batch = 2;
+}
+
} // namespace doris::cloud
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]