This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new d893d6ec382 branch-3.1: [opt](txn lazy commit) Make convert tmp rowsets batch more adaptive #55035 (#55573)
d893d6ec382 is described below

commit d893d6ec38295b9039241987e033ddf4fb52a983
Author: Lei Zhang <[email protected]>
AuthorDate: Thu Sep 4 10:40:38 2025 +0800

    branch-3.1: [opt](txn lazy commit) Make convert tmp rowsets batch more adaptive #55035 (#55573)
    
    bp #55035
---
 cloud/src/common/config.h                     |   2 +-
 cloud/src/meta-service/txn_lazy_committer.cpp |  23 +++--
 cloud/test/txn_lazy_commit_test.cpp           | 139 ++++++++++++++++++++++++++
 3 files changed, 156 insertions(+), 8 deletions(-)

diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index 4cd46c605fa..b1ed35bc0ba 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -284,7 +284,7 @@ CONF_mInt64(max_txn_commit_byte, "7340032");
 CONF_Bool(enable_cloud_txn_lazy_commit, "true");
 CONF_Int32(txn_lazy_commit_rowsets_thresold, "1000");
 CONF_Int32(txn_lazy_commit_num_threads, "8");
-CONF_Int32(txn_lazy_max_rowsets_per_batch, "1000");
+CONF_mInt64(txn_lazy_max_rowsets_per_batch, "1000");
 // max TabletIndexPB num for batch get
 CONF_Int32(max_tablet_index_num_per_batch, "1000");
 CONF_Int32(max_restore_job_rowsets_per_batch, "1000");
diff --git a/cloud/src/meta-service/txn_lazy_committer.cpp b/cloud/src/meta-service/txn_lazy_committer.cpp
index 795f2f21cb7..7a4755e44b9 100644
--- a/cloud/src/meta-service/txn_lazy_committer.cpp
+++ b/cloud/src/meta-service/txn_lazy_committer.cpp
@@ -315,7 +315,6 @@ void TxnLazyCommitTask::commit() {
                 LOG(WARNING) << "scan_tmp_rowset failed, txn_id=" << txn_id_ 
<< " code=" << code_;
                 break;
             }
-
             VLOG_DEBUG << "txn_id=" << txn_id_
                        << " tmp_rowset_metas.size()=" << 
all_tmp_rowset_metas.size();
             if (all_tmp_rowset_metas.size() == 0) {
@@ -325,23 +324,33 @@ void TxnLazyCommitTask::commit() {
             // <partition_id, tmp_rowsets>
             std::map<int64_t, std::vector<std::pair<std::string, 
doris::RowsetMetaCloudPB>>>
                     partition_to_tmp_rowset_metas;
+            size_t max_rowset_meta_size = 0;
             for (auto& [tmp_rowset_key, tmp_rowset_pb] : all_tmp_rowset_metas) 
{
                 
partition_to_tmp_rowset_metas[tmp_rowset_pb.partition_id()].emplace_back();
                 
partition_to_tmp_rowset_metas[tmp_rowset_pb.partition_id()].back().first =
                         tmp_rowset_key;
                 
partition_to_tmp_rowset_metas[tmp_rowset_pb.partition_id()].back().second =
                         tmp_rowset_pb;
+                max_rowset_meta_size = std::max(max_rowset_meta_size, 
tmp_rowset_pb.ByteSizeLong());
+            }
+
+            // FDB caps a transaction at 10MB; budget ~4MB of rowset meta per batch, using
+            // the largest meta seen as the per-rowset size estimate. Clamp the result to at
+            // least 1: a single meta larger than 4MB, or a non-positive config value, would
+            // otherwise yield a batch size of 0 (or wrap to a huge size_t for a negative
+            // config), and the `i += max_rowsets_per_batch` loop below would never advance.
+            size_t max_rowsets_per_batch = static_cast<size_t>(
+                    std::max<int64_t>(1, config::txn_lazy_max_rowsets_per_batch));
+            if (max_rowset_meta_size > 0) {
+                max_rowsets_per_batch = std::max<size_t>(
+                        1, std::min<size_t>((4UL << 20) / max_rowset_meta_size,
+                                            max_rowsets_per_batch));
+                TEST_SYNC_POINT_CALLBACK("TxnLazyCommitTask::commit::max_rowsets_per_batch",
+                                         &max_rowsets_per_batch, &max_rowset_meta_size);
             }
 
             for (auto& [partition_id, tmp_rowset_metas] : 
partition_to_tmp_rowset_metas) {
                 // tablet_id -> TabletIndexPB
                 std::map<int64_t, TabletIndexPB> tablet_ids;
-                for (size_t i = 0; i < tmp_rowset_metas.size();
-                     i += config::txn_lazy_max_rowsets_per_batch) {
-                    size_t end =
-                            (i + config::txn_lazy_max_rowsets_per_batch) > 
tmp_rowset_metas.size()
-                                    ? tmp_rowset_metas.size()
-                                    : i + 
config::txn_lazy_max_rowsets_per_batch;
+                for (size_t i = 0; i < tmp_rowset_metas.size(); i += 
max_rowsets_per_batch) {
+                    size_t end = (i + max_rowsets_per_batch) > 
tmp_rowset_metas.size()
+                                         ? tmp_rowset_metas.size()
+                                         : i + max_rowsets_per_batch;
+
                     std::vector<std::pair<std::string, 
doris::RowsetMetaCloudPB>>
                             
sub_partition_tmp_rowset_metas(tmp_rowset_metas.begin() + i,
                                                            
tmp_rowset_metas.begin() + end);
diff --git a/cloud/test/txn_lazy_commit_test.cpp b/cloud/test/txn_lazy_commit_test.cpp
index 62f959167e6..5b1686eed28 100644
--- a/cloud/test/txn_lazy_commit_test.cpp
+++ b/cloud/test/txn_lazy_commit_test.cpp
@@ -178,6 +178,33 @@ static doris::RowsetMetaCloudPB create_rowset(int64_t txn_id, int64_t tablet_id,
     return rowset;
 }
 
+// Builds a tmp rowset meta that is deliberately large: it carries 600 segment key bounds,
+// which inflates its serialized size so that lazy commit has to shrink its batch size to
+// stay within its per-batch byte budget.
+// NOTE: `num_rows` is currently unused; the rowset always reports 0 rows.
+static doris::RowsetMetaCloudPB create_huge_rowset(int64_t txn_id, int64_t tablet_id,
+                                                   int64_t index_id, int64_t partition_id,
+                                                   int64_t version = -1,
+                                                   [[maybe_unused]] int num_rows = 100) {
+    // One constant drives both the segment count and the number of key bounds so the
+    // two cannot drift apart.
+    constexpr int kNumSegments = 600;
+    doris::RowsetMetaCloudPB rowset;
+    rowset.set_rowset_id(0); // required
+    rowset.set_rowset_id_v2(next_rowset_id());
+    rowset.set_tablet_id(tablet_id);
+    rowset.set_partition_id(partition_id);
+    rowset.set_index_id(index_id);
+    rowset.set_txn_id(txn_id);
+    if (version > 0) {
+        rowset.set_start_version(version);
+        rowset.set_end_version(version);
+    }
+    rowset.set_num_segments(kNumSegments);
+    for (int i = 0; i < kNumSegments; ++i) {
+        auto* key_bounds = rowset.add_segments_key_bounds();
+        key_bounds->set_min_key("xxsqewqeqweeqwewqeqeq");
+        key_bounds->set_max_key("dase23452rr234ewdw534523");
+    }
+    rowset.set_num_rows(0);
+    rowset.set_data_disk_size(0);
+    rowset.mutable_tablet_schema()->set_schema_version(0);
+    rowset.set_txn_expiration(::time(nullptr)); // Required by DCHECK
+    return rowset;
+}
+
 static void commit_rowset(MetaServiceProxy* meta_service, const 
doris::RowsetMetaCloudPB& rowset,
                           CreateRowsetResponse& res) {
     brpc::Controller cntl;
@@ -202,6 +229,18 @@ static std::shared_ptr<TxnKv> get_mem_txn_kv() {
     return txn_kv;
 }
 
+// Creates and initializes a FoundationDB-backed TxnKv using the cluster file
+// "fdb.cluster" (relative to the working directory). Note that this overwrites the
+// global cloud::config::fdb_cluster_file_path.
+static std::shared_ptr<TxnKv> get_fdb_txn_kv() {
+    cloud::config::fdb_cluster_file_path = "fdb.cluster";
+    // Plain upcast; std::make_shared throws on failure instead of returning null, so no
+    // null check is needed.
+    std::shared_ptr<TxnKv> fdb_txn_kv = std::make_shared<cloud::FdbTxnKv>();
+    int ret = fdb_txn_kv->init();
+    // ASSERT_* expands to a `return;`, so it is wrapped in a void lambda to be usable in
+    // a function that returns a value.
+    [&] { ASSERT_EQ(ret, 0); }();
+    return fdb_txn_kv;
+}
+
 static void check_tablet_idx_db_id(std::unique_ptr<Transaction>& txn, int64_t 
db_id,
                                    int64_t tablet_id) {
     std::string mock_instance = "test_instance";
@@ -2949,4 +2988,104 @@ TEST(TxnLazyCommitTest, CommitTxnEventuallyWithMultiTableTest) {
     sp->disable_processing();
 }
 
+// Commits one txn over 2000 tablets (two tables) whose tmp rowset metas come from
+// create_huge_rowset, against a real FdbTxnKv (see get_fdb_txn_kv). Each meta is large
+// enough that the ~4MB per-batch byte budget in TxnLazyCommitTask::commit, rather than
+// the configured count limit (1000), decides the batch size; the test expects 134
+// rowsets per batch and a successful lazy commit.
+TEST(TxnLazyCommitTest, CommitTxnEventuallyWithHugeRowsetMetaTest) {
+    int64_t db_id = 14135425;
+    int64_t table_id = 31245456;
+    int64_t index_id = 434324;
+    int64_t partition_id = 3215764;
+
+    int64_t table_id2 = 213476;
+    int64_t index_id2 = 126765;
+    int64_t partition_id2 = 214567;
+    bool commit_txn_eventually_finish_hit = false;
+
+    auto sp = SyncPoint::get_instance();
+
+    // The sync point callbacks below capture locals by reference, and this test changes a
+    // global config. Undo both on every exit path: a failed ASSERT_* returns early and
+    // would otherwise leave dangling callbacks and a modified config behind for later
+    // tests.
+    const auto saved_max_rowsets_per_batch = config::txn_lazy_max_rowsets_per_batch;
+    std::unique_ptr<int, std::function<void(int*)>> defer(
+            reinterpret_cast<int*>(0x01), [&](int*) {
+                sp->clear_all_call_backs();
+                sp->clear_trace();
+                sp->disable_processing();
+                config::txn_lazy_max_rowsets_per_batch = saved_max_rowsets_per_batch;
+            });
+
+    config::txn_lazy_max_rowsets_per_batch = 1000;
+    auto txn_kv = get_fdb_txn_kv();
+
+    sp->set_call_back("commit_txn_eventually::task->wait", [&](auto&& args) {
+        auto [code, msg] = *try_any_cast<std::pair<MetaServiceCode, std::string>*>(args[0]);
+        ASSERT_EQ(code, MetaServiceCode::OK);
+        commit_txn_eventually_finish_hit = true;
+    });
+
+    sp->set_call_back("TxnLazyCommitTask::commit::max_rowsets_per_batch", [&](auto&& args) {
+        size_t max_rowsets_per_batch = *try_any_cast<size_t*>(args[0]);
+        size_t max_rowset_meta_size = *try_any_cast<size_t*>(args[1]);
+        LOG(INFO) << "max_rowsets_per_batch:" << max_rowsets_per_batch
+                  << " max_rowset_meta_size:" << max_rowset_meta_size;
+        // The byte budget, not the configured limit of 1000, must decide the batch size.
+        ASSERT_EQ(max_rowsets_per_batch, 134);
+    });
+
+    sp->enable_processing();
+
+    auto meta_service = get_meta_service(txn_kv, true);
+    brpc::Controller cntl;
+    BeginTxnRequest req;
+    req.set_cloud_unique_id("test_cloud_unique_id");
+    TxnInfoPB txn_info_pb;
+    txn_info_pb.set_db_id(db_id);
+    txn_info_pb.set_label("test_label_multi_table_commit_txn");
+    txn_info_pb.add_table_ids(table_id);
+    txn_info_pb.add_table_ids(table_id2);
+    txn_info_pb.set_timeout_ms(36000);
+    req.mutable_txn_info()->CopyFrom(txn_info_pb);
+    BeginTxnResponse res;
+    meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res,
+                            nullptr);
+    ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+    int64_t txn_id = res.txn_id();
+
+    // mock rowset and tablet
+    int64_t tablet_id_base = 3131124;
+    for (int i = 0; i < 1000; ++i) {
+        create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id,
+                                 tablet_id_base + i);
+        auto tmp_rowset = create_huge_rowset(txn_id, tablet_id_base + i, index_id, partition_id);
+        CreateRowsetResponse res;
+        commit_rowset(meta_service.get(), tmp_rowset, res);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+    }
+
+    for (int i = 1000; i < 2000; ++i) {
+        create_tablet_with_db_id(meta_service.get(), db_id, table_id2, index_id2, partition_id2,
+                                 tablet_id_base + i);
+        auto tmp_rowset = create_huge_rowset(txn_id, tablet_id_base + i, index_id2, partition_id2);
+        CreateRowsetResponse res;
+        commit_rowset(meta_service.get(), tmp_rowset, res);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+    }
+
+    {
+        brpc::Controller cntl;
+        CommitTxnRequest req;
+        req.set_cloud_unique_id("test_cloud_unique_id");
+        req.set_db_id(db_id);
+        req.set_txn_id(txn_id);
+        req.set_is_2pc(false);
+        req.set_enable_txn_lazy_commit(true);
+        CommitTxnResponse res;
+        meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req,
+                                 &res, nullptr);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+        ASSERT_TRUE(commit_txn_eventually_finish_hit);
+    }
+
+    {
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        std::string mock_instance = "test_instance";
+        for (int i = 0; i < 2000; ++i) {
+            int64_t tablet_id = tablet_id_base + i;
+            check_tablet_idx_db_id(txn, db_id, tablet_id);
+            check_tmp_rowset_not_exist(txn, tablet_id, txn_id);
+            check_rowset_meta_exist(txn, tablet_id, 2);
+        }
+    }
+}
+
 } // namespace doris::cloud


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to