This is an automated email from the ASF dual-hosted git repository.

w41ter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new bca4689a5a4 [fix](cloud) fix unnecessary conflict range in lazy commit 
(#60274)
bca4689a5a4 is described below

commit bca4689a5a425b7e0d8e96efa31a2d13a38f0968
Author: walter <[email protected]>
AuthorDate: Thu Jan 29 15:59:02 2026 +0800

    [fix](cloud) fix unnecessary conflict range in lazy commit (#60274)
    
    Before:
    
    ```
    I20260127 07:50:10.853658 2639096 meta_service_util.cpp:78] update table 
version txn_id=885400033280 db_id=15246789 table_id=42356789
    I20260127 07:50:10.853765 2639096 meta_service_txn.cpp:2381] 
put_size=160197 del_size=0 num_put_keys=2002 num_del_keys=0 txn_size=12976511 
txn_id=885400033280
    W20260127 07:50:10.924083 2639096 txn_kv.cpp:782] fdb commit error, c [...]
    ```
    
    After:
    
    ```
    I20260127 09:02:46.518415 2656919 meta_service_util.cpp:78] update table 
version txn_id=5329084161024 db_id=15246789 table_id=42356789
    I20260127 09:02:46.518524 2656919 meta_service_txn.cpp:2402] 
put_size=162198 del_size=0 num_put_keys=2002 num_del_keys=0 txn_size=690512 
txn_id=5329084161024 the underlying txn size=846643
    I20260127 09:02:46.554267 2657045 txn_lazy_committer.cpp:658] lazy task 
commit txn_id=5329084161024 retry_times=0
    ```
---
 cloud/src/meta-service/meta_service_txn.cpp |  36 ++++++++-
 cloud/src/meta-store/mem_txn_kv.h           |   2 +-
 cloud/src/meta-store/txn_kv.cpp             |  53 ++++++++++++-
 cloud/src/meta-store/txn_kv.h               |   6 +-
 cloud/test/txn_lazy_commit_test.cpp         | 117 +++++++++++++++++++++++++---
 5 files changed, 191 insertions(+), 23 deletions(-)

diff --git a/cloud/src/meta-service/meta_service_txn.cpp 
b/cloud/src/meta-service/meta_service_txn.cpp
index e7f76f2b7ff..8d26652e0a1 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -1312,7 +1312,7 @@ std::pair<MetaServiceCode, std::string> 
get_tablet_indexes(
     }
 
     TxnErrorCode err = txn->batch_get(&tablet_idx_values, tablet_idx_keys,
-                                      Transaction::BatchGetOptions(false));
+                                      Transaction::BatchGetOptions(snapshot));
     if (err != TxnErrorCode::TXN_OK) {
         auto msg = fmt::format("failed to get tablet table index ids, err={}", 
err);
         LOG_WARNING(msg);
@@ -1891,6 +1891,9 @@ void MetaServiceImpl::commit_txn_immediately(
         if (err != TxnErrorCode::TXN_OK) {
             if (err == TxnErrorCode::TXN_CONFLICT) {
                 g_bvar_delete_bitmap_lock_txn_remove_conflict_by_load_counter 
<< 1;
+            } else if (err == TxnErrorCode::TXN_BYTES_TOO_LARGE) {
+                LOG(WARNING) << "commit txn failed due to txn size too large, 
txn_id=" << txn_id
+                             << " the underlying txn size=" << 
txn->approximate_bytes(true);
             }
             code = cast_as<ErrCategory::COMMIT>(err);
             ss << "failed to commit kv txn, txn_id=" << txn_id << " err=" << 
err;
@@ -2079,13 +2082,13 @@ void MetaServiceImpl::commit_txn_eventually(
                 std::ranges::views::transform(
                         [](const auto& pair) { return pair.second.tablet_id(); 
}));
         if (!is_versioned_read) {
-            std::tie(code, msg) =
-                    get_tablet_indexes(txn.get(), &tablet_ids, instance_id, 
acquired_tablet_ids);
+            std::tie(code, msg) = get_tablet_indexes(txn.get(), &tablet_ids, 
instance_id,
+                                                     acquired_tablet_ids, 
true);
             if (code != MetaServiceCode::OK) {
                 return;
             }
         } else {
-            err = meta_reader.get_tablet_indexes(txn.get(), 
acquired_tablet_ids, &tablet_ids);
+            err = meta_reader.get_tablet_indexes(txn.get(), 
acquired_tablet_ids, &tablet_ids, true);
             if (err != TxnErrorCode::TXN_OK) {
                 code = cast_as<ErrCategory::READ>(err);
                 msg = fmt::format("failed to get tablet indexes, err={}", err);
@@ -2113,6 +2116,24 @@ void MetaServiceImpl::commit_txn_eventually(
             continue;
         }
 
+        stats.get_bytes += txn->get_bytes();
+        stats.put_bytes += txn->put_bytes();
+        stats.del_bytes += txn->delete_bytes();
+        stats.get_counter += txn->num_get_keys();
+        stats.put_counter += txn->num_put_keys();
+        stats.del_counter += txn->num_del_keys();
+
+        // Reset txn to avoid txn is too old to perform reads or be committed
+        txn.reset();
+        err = txn_kv_->create_txn(&txn);
+        if (err != TxnErrorCode::TXN_OK) {
+            code = cast_as<ErrCategory::CREATE>(err);
+            ss << "failed to create txn, txn_id=" << txn_id << " err=" << err;
+            msg = ss.str();
+            LOG(WARNING) << msg;
+            return;
+        }
+
         CommitTxnLogPB commit_txn_log;
         commit_txn_log.set_txn_id(txn_id);
         commit_txn_log.set_db_id(db_id);
@@ -2384,6 +2405,9 @@ void MetaServiceImpl::commit_txn_eventually(
         if (err != TxnErrorCode::TXN_OK) {
             if (err == TxnErrorCode::TXN_CONFLICT) {
                 g_bvar_delete_bitmap_lock_txn_remove_conflict_by_load_counter 
<< 1;
+            } else if (err == TxnErrorCode::TXN_BYTES_TOO_LARGE) {
+                LOG(WARNING) << "commit txn failed due to txn size too large, 
txn_id=" << txn_id
+                             << " the underlying txn size=" << 
txn->approximate_bytes(true);
             }
             code = cast_as<ErrCategory::COMMIT>(err);
             ss << "failed to commit kv txn, txn_id=" << txn_id << " err=" << 
err;
@@ -2931,6 +2955,10 @@ void MetaServiceImpl::commit_txn_with_sub_txn(const 
CommitTxnRequest* request,
         if (err != TxnErrorCode::TXN_OK) {
             if (err == TxnErrorCode::TXN_CONFLICT) {
                 g_bvar_delete_bitmap_lock_txn_remove_conflict_by_load_counter 
<< 1;
+            } else if (err == TxnErrorCode::TXN_BYTES_TOO_LARGE) {
+                LOG(WARNING) << "commit txn with sub txn failed due to txn 
size too large, txn_id="
+                             << txn_id
+                             << " the underlying txn size=" << 
txn->approximate_bytes(true);
             }
             code = cast_as<ErrCategory::COMMIT>(err);
             ss << "failed to commit kv txn with sub txn, txn_id=" << txn_id << 
" err=" << err;
diff --git a/cloud/src/meta-store/mem_txn_kv.h 
b/cloud/src/meta-store/mem_txn_kv.h
index d7edd1bd5af..356fe3cedf6 100644
--- a/cloud/src/meta-store/mem_txn_kv.h
+++ b/cloud/src/meta-store/mem_txn_kv.h
@@ -245,7 +245,7 @@ public:
                             const std::vector<std::pair<std::string, 
std::string>>& ranges,
                             const BatchGetOptions& opts = BatchGetOptions()) 
override;
 
-    size_t approximate_bytes() const override { return approximate_bytes_; }
+    size_t approximate_bytes(bool = false) const override { return 
approximate_bytes_; }
 
     size_t num_get_keys() const override { return num_get_keys_; }
 
diff --git a/cloud/src/meta-store/txn_kv.cpp b/cloud/src/meta-store/txn_kv.cpp
index 6623fedb5dc..d55bcec618c 100644
--- a/cloud/src/meta-store/txn_kv.cpp
+++ b/cloud/src/meta-store/txn_kv.cpp
@@ -533,7 +533,10 @@ TxnErrorCode Transaction::get(std::string_view key, 
std::string* val, bool snaps
     may_logging_single_version_reading(key);
 
     StopWatch sw;
-    approximate_bytes_ += key.size() * 2; // See 
fdbclient/ReadYourWrites.actor.cpp for details
+    if (!snapshot) {
+        // See fdbclient/ReadYourWrites.actor.cpp for details
+        approximate_bytes_ += key.size() * 2;
+    }
     auto* fut = fdb_transaction_get(txn_, (uint8_t*)key.data(), key.size(), 
snapshot);
 
     g_bvar_txn_kv_get_count_normalized << 1;
@@ -577,7 +580,10 @@ TxnErrorCode Transaction::get(std::string_view begin, 
std::string_view end,
     may_logging_single_version_reading(begin);
 
     StopWatch sw;
-    approximate_bytes_ += begin.size() + end.size();
+    if (!opts.snapshot) {
+        // See fdbclient/ReadYourWrites.actor.cpp for details
+        approximate_bytes_ += begin.size() * 2 + end.size() * 2;
+    }
     DORIS_CLOUD_DEFER {
         g_bvar_txn_kv_range_get << sw.elapsed_us();
     };
@@ -881,6 +887,41 @@ TxnErrorCode Transaction::abort() {
     return TxnErrorCode::TXN_OK;
 }
 
+size_t Transaction::approximate_bytes(bool fetch_from_underlying_kv) const {
+    if (!fetch_from_underlying_kv) {
+        return approximate_bytes_;
+    }
+
+    auto* fut = fdb_transaction_get_approximate_size(txn_);
+    DORIS_CLOUD_DEFER {
+        fdb_future_destroy(fut);
+    };
+
+    auto code = await_future(fut);
+    if (code != TxnErrorCode::TXN_OK) {
+        LOG(WARNING) << "failed to await future for 
fdb_transaction_get_approximate_size, code="
+                     << code;
+        return static_cast<size_t>(-1);
+    }
+
+    auto err = fdb_future_get_error(fut);
+    if (err) {
+        LOG(WARNING) << "failed to get approximate size, code=" << err
+                     << " msg=" << fdb_get_error(err);
+        return static_cast<size_t>(-1);
+    }
+
+    int64_t size = 0;
+    err = fdb_future_get_int64(fut, &size);
+    if (err) {
+        LOG(WARNING) << "failed to extract int64 from approximate size future, 
code=" << err
+                     << " msg=" << fdb_get_error(err);
+        return static_cast<size_t>(-1);
+    }
+
+    return static_cast<size_t>(size);
+}
+
 void Transaction::enable_get_versionstamp() {
     versionstamp_enabled_ = true;
 }
@@ -1143,7 +1184,9 @@ TxnErrorCode 
Transaction::batch_get(std::vector<std::optional<std::string>>* res
             may_logging_single_version_reading(k);
             futures.emplace_back(
                     fdb_transaction_get(txn_, (uint8_t*)k.data(), k.size(), 
opts.snapshot));
-            approximate_bytes_ += k.size() * 2;
+            if (!opts.snapshot) {
+                approximate_bytes_ += k.size() * 2;
+            }
         }
 
         size_t num_futures = futures.size();
@@ -1221,7 +1264,9 @@ TxnErrorCode Transaction::batch_scan(
                     snapshot, reverse);
 
             futures.emplace_back(fut);
-            approximate_bytes_ += start.size() + end.size();
+            if (!opts.snapshot) {
+                approximate_bytes_ += start.size() + end.size();
+            }
         }
 
         size_t num_futures = futures.size();
diff --git a/cloud/src/meta-store/txn_kv.h b/cloud/src/meta-store/txn_kv.h
index ab97a1b4c97..8b9a5e69aa4 100644
--- a/cloud/src/meta-store/txn_kv.h
+++ b/cloud/src/meta-store/txn_kv.h
@@ -373,8 +373,10 @@ public:
 
     /**
      * @brief return the approximate bytes consumed by the underlying 
transaction buffer.
+     * @param fetch_from_underlying_kv if true, use an heavy operation to get 
the size from the underlying
+     *                kv store; otherwise, return the tracked size. Default is 
false.
      **/
-    virtual size_t approximate_bytes() const = 0;
+    virtual size_t approximate_bytes(bool fetch_from_underlying_kv = false) 
const = 0;
 
     /**
      * @brief return the num get keys submitted to this txn.
@@ -821,7 +823,7 @@ public:
                             const std::vector<std::pair<std::string, 
std::string>>& ranges,
                             const BatchGetOptions& opts = BatchGetOptions()) 
override;
 
-    size_t approximate_bytes() const override { return approximate_bytes_; }
+    size_t approximate_bytes(bool fetch_from_underlying_kv = false) const 
override;
 
     size_t num_get_keys() const override { return num_get_keys_; }
 
diff --git a/cloud/test/txn_lazy_commit_test.cpp 
b/cloud/test/txn_lazy_commit_test.cpp
index 1b3125970e3..b316a5e5f45 100644
--- a/cloud/test/txn_lazy_commit_test.cpp
+++ b/cloud/test/txn_lazy_commit_test.cpp
@@ -58,8 +58,27 @@ void repair_tablet_index(
         bool is_versioned_write);
 };
 
+static std::shared_ptr<TxnKv> txn_kv;
 static doris::cloud::RecyclerThreadPoolGroup thread_group;
 
+static std::shared_ptr<TxnKv> get_fdb_txn_kv() {
+    if (txn_kv) {
+        return txn_kv;
+    }
+
+    int ret = 0;
+    doris::cloud::config::fdb_cluster_file_path = "fdb.cluster";
+    auto fdb_txn_kv = std::dynamic_pointer_cast<doris::cloud::TxnKv>(
+            std::make_shared<doris::cloud::FdbTxnKv>());
+    if (fdb_txn_kv != nullptr) {
+        ret = fdb_txn_kv->init();
+        [&] { ASSERT_EQ(ret, 0); }();
+    }
+    [&] { ASSERT_NE(fdb_txn_kv.get(), nullptr); }();
+    txn_kv = fdb_txn_kv;
+    return fdb_txn_kv;
+}
+
 int main(int argc, char** argv) {
     const std::string conf_file = "doris_cloud.conf";
     if (!doris::cloud::config::init(conf_file.c_str(), true)) {
@@ -79,6 +98,9 @@ int main(int argc, char** argv) {
     }
     ::testing::InitGoogleTest(&argc, argv);
 
+    // Initialize FDB
+    get_fdb_txn_kv();
+
     auto s3_producer_pool = 
std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
     s3_producer_pool->start();
     auto recycle_tablet_pool = 
std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
@@ -238,18 +260,6 @@ static std::shared_ptr<TxnKv> get_mem_txn_kv() {
     return txn_kv;
 }
 
-static std::shared_ptr<TxnKv> get_fdb_txn_kv() {
-    int ret = 0;
-    cloud::config::fdb_cluster_file_path = "fdb.cluster";
-    auto fdb_txn_kv = 
std::dynamic_pointer_cast<cloud::TxnKv>(std::make_shared<cloud::FdbTxnKv>());
-    if (fdb_txn_kv != nullptr) {
-        ret = fdb_txn_kv->init();
-        [&] { ASSERT_EQ(ret, 0); }();
-    }
-    [&] { ASSERT_NE(fdb_txn_kv.get(), nullptr); }();
-    return fdb_txn_kv;
-}
-
 static void check_tablet_idx_db_id(std::unique_ptr<Transaction>& txn, int64_t 
db_id,
                                    int64_t tablet_id) {
     std::string mock_instance = "test_instance";
@@ -3336,4 +3346,87 @@ TEST(TxnLazyCommitTest, 
CommitTxnEventuallyWithAbortAfterCommitTest) {
     }
 }
 
+TEST(TxnLazyCommitTest, CommitTxnEventuallyWithManyPartitions) {
+    auto txn_kv = get_fdb_txn_kv();
+    int64_t db_id = 15246789;
+    int64_t table_id = 42356789;
+    int64_t index_id = 98765432;
+
+    std::atomic_bool commit_txn_eventually_finish_hit = false;
+
+    auto sp = SyncPoint::get_instance();
+    DORIS_CLOUD_DEFER {
+        sp->clear_all_call_backs();
+        sp->disable_processing();
+    };
+    sp->set_call_back("commit_txn_eventually::task->wait", [&](auto&& args) {
+        auto [code, msg] = *try_any_cast<std::pair<MetaServiceCode, 
std::string>*>(args[0]);
+        ASSERT_EQ(code, MetaServiceCode::OK);
+        commit_txn_eventually_finish_hit = true;
+    });
+
+    sp->enable_processing();
+
+    auto meta_service = get_meta_service(txn_kv, true);
+    brpc::Controller cntl;
+    BeginTxnRequest req;
+    req.set_cloud_unique_id("test_cloud_unique_id");
+    TxnInfoPB txn_info_pb;
+    txn_info_pb.set_db_id(db_id);
+    txn_info_pb.set_label("test_label_with_many_partitions");
+    txn_info_pb.add_table_ids(table_id);
+    txn_info_pb.set_timeout_ms(60000000);
+    req.mutable_txn_info()->CopyFrom(txn_info_pb);
+    BeginTxnResponse res;
+    
meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
 &req, &res,
+                            nullptr);
+    ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << 
res.ShortDebugString();
+    int64_t txn_id = res.txn_id();
+
+    // mock rowset and tablet: 2000 partitions, 64 tablets per partition
+    int64_t tablet_id_base = 1000000;
+    int64_t partition_id_base = 5000000;
+
+    // Change below parameters to large values if you want to do a more stress 
test
+    const int num_partitions = 20;
+    const int tablets_per_partition = 2;
+
+    for (int p = 0; p < num_partitions; ++p) {
+        int64_t partition_id = partition_id_base + p;
+        for (int t = 0; t < tablets_per_partition; ++t) {
+            int64_t tablet_id = tablet_id_base + p * tablets_per_partition + t;
+            create_tablet_with_db_id(meta_service.get(), db_id, table_id, 
index_id, partition_id,
+                                     tablet_id);
+            auto tmp_rowset = create_rowset(txn_id, tablet_id, index_id, 
partition_id);
+            CreateRowsetResponse res;
+            prepare_rowset(meta_service.get(), tmp_rowset, res);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+            commit_rowset(meta_service.get(), tmp_rowset, res);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+        }
+    }
+
+    {
+        brpc::Controller cntl;
+        CommitTxnRequest req;
+        req.set_cloud_unique_id("test_cloud_unique_id");
+        req.set_db_id(db_id);
+        req.set_txn_id(txn_id);
+        req.set_is_2pc(false);
+        req.set_enable_txn_lazy_commit(true);
+        CommitTxnResponse res;
+        
meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
 &req,
+                                 &res, nullptr);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+    }
+
+    for (size_t i = 0; i < 1000; i++) {
+        // just wait for a while to make sure the commit txn eventually task 
is finished
+        std::this_thread::sleep_for(std::chrono::milliseconds(100));
+        if (commit_txn_eventually_finish_hit.load()) {
+            break;
+        }
+    }
+}
+
 } // namespace doris::cloud


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to